diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java index 4bdade5da7..c75a025cba 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java @@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.index.EntityIndex; @@ -477,9 +478,9 @@ private Map getApplications(final String edgeType) throws Exceptio logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}", new Object[]{edgeType, managementId.getType(), managementId.getUuid()}); - Observable edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType( - managementId, edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, Optional.absent() )); + Observable edges = gm.loadEdgesFromSource( + new SimpleSearchByEdgeType( managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, + Optional.absent() ) ); final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( appScope ); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index aad76104fd..b4cabc4f2b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -59,6 +59,7 @@ import org.apache.usergrid.persistence.entities.User; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; @@ -227,14 +228,9 @@ Map> getContainers( final int limit, final String edgeTyp Observable edges = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) ) - .flatMap( new Func1>() { - @Override - public Observable call( final String edgeType ) { - return gm.loadEdgesToTarget( - new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE, - SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); - } - } ); + .flatMap( edgeType1 -> gm.loadEdgesToTarget( + new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType1, Long.MAX_VALUE, + SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ) ); //if our limit is set, take them. Note this logic is still borked, we can't possibly fit everything in memmory if ( limit > -1 ) { @@ -268,7 +264,7 @@ public boolean isConnectionMember( String connectionType, EntityRef entity ) thr } ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - Observable edges = gm.loadEdgeVersions( CpNamingUtils + Observable edges = gm.loadEdgeVersions( CpNamingUtils .createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType, entityId ) ); @@ -288,7 +284,7 @@ public boolean isCollectionMember( String collectionName, EntityRef entity ) thr } ); GraphManager gm = managerCache.getGraphManager( applicationScope ); - Observable edges = gm.loadEdgeVersions( CpNamingUtils + Observable edges = gm.loadEdgeVersions( CpNamingUtils .createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName, entityId ) ); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index fe53776f82..d93e304813 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -324,6 +324,8 @@ else if ( event instanceof EdgeIndexEvent ) { } else if ( event instanceof EntityDeleteEvent ) { indexoperationObservable = handleEntityDelete( message ); + validateEmptySets = false; // do not check this one for an empty set b/c it can be empty + } else if ( event instanceof EntityIndexEvent ) { indexoperationObservable = handleEntityIndexUpdate( message ); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java index 3c41a2bd60..915af032a3 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java @@ -53,4 +53,11 @@ public Optional getPath() { } + @Override + public String toString() { + return "FilterResult{" + + "path=" + path + + ", value=" + value + + '}'; + } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java index d3e034539d..621edd22de 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java @@ -23,13 +23,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter; import org.apache.usergrid.corepersistence.pipeline.read.EdgePath; import org.apache.usergrid.corepersistence.pipeline.read.FilterResult; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; import org.apache.usergrid.persistence.model.entity.Id; @@ -42,18 +45,21 @@ /** * Command for reading graph edges */ -public abstract class AbstractReadGraphFilter extends AbstractPathFilter { +public abstract class AbstractReadGraphFilter extends AbstractPathFilter { private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class ); private final GraphManagerFactory graphManagerFactory; + private final AsyncEventService asyncEventService; /** * Create a new instance of our command */ - public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) { + public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory, + final AsyncEventService asyncEventService ) { this.graphManagerFactory = graphManagerFactory; + this.asyncEventService = asyncEventService; } @@ -61,9 +67,11 @@ public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) public Observable> call( final Observable> previousIds ) { + final ApplicationScope applicationScope = pipelineContext.getApplicationScope(); + //get the graph manager final GraphManager graphManager = - graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() ); + graphManagerFactory.createEdgeManager( applicationScope ); final String edgeName = getEdgeTypeName(); @@ -74,18 +82,60 @@ public Observable> call( final Observable> pre return previousIds.flatMap( previousFilterValue -> { //set our our constant state - final Optional startFromCursor = getSeekValue(); + final Optional startFromCursor = getSeekValue(); final Id id = previousFilterValue.getValue(); + final Optional typeWrapper = Optional.fromNullable(startFromCursor.orNull()); + + /** + * We do not want to filter. This is intentional DO NOT REMOVE!!! + * + * We want to fire events on these edges if they exist, the delete was missed. + */ final SimpleSearchByEdgeType search = new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - startFromCursor ); + typeWrapper, false ); /** * TODO, pass a message with pointers to our cursor values to be generated later */ - return graphManager.loadEdgesFromSource( search ) + return graphManager.loadEdgesFromSource( search ).filter(markedEdge -> { + + final boolean isDeleted = markedEdge.isDeleted(); + final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete(); + final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted(); + + + + if(isDeleted){ + logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge ); + asyncEventService.queueDeleteEdge( applicationScope, markedEdge ); + } + + if(isSourceNodeDeleted){ + final Id sourceNodeId = markedEdge.getSourceNode(); + + logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId ); + + asyncEventService.queueEntityDelete( applicationScope, sourceNodeId ); + } + + if(isTargetNodeDelete){ + + final Id targetNodeId = markedEdge.getTargetNode(); + + logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId ); + + asyncEventService.queueEntityDelete( applicationScope, targetNodeId ); + } + + + //filter if any of them are marked + return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete; + + + }) //set the edge state for cursors .doOnNext( edge -> { logger.trace( "Seeking over edge {}", edge ); @@ -100,7 +150,7 @@ public Observable> call( final Observable> pre @Override - protected FilterResult createFilterResult( final Id emit, final Edge cursorValue, + protected FilterResult createFilterResult( final Id emit, final MarkedEdge cursorValue, final Optional parent ) { //if it's our first pass, there's no cursor to generate @@ -113,7 +163,7 @@ protected FilterResult createFilterResult( final Id emit, final Edge cursorV @Override - protected CursorSerializer getCursorSerializer() { + protected CursorSerializer getCursorSerializer() { return EdgeCursorSerializer.INSTANCE; } @@ -131,14 +181,14 @@ protected CursorSerializer getCursorSerializer() { */ private final class EdgeState { - private Edge cursorEdge = null; - private Edge currentEdge = null; + private MarkedEdge cursorEdge = null; + private MarkedEdge currentEdge = null; /** * Update the pointers */ - private void update( final Edge newEdge ) { + private void update( final MarkedEdge newEdge ) { cursorEdge = currentEdge; currentEdge = newEdge; } @@ -147,7 +197,7 @@ private void update( final Edge newEdge ) { /** * Get the edge to use in cursors for resume */ - private Edge getCursorEdge() { + private MarkedEdge getCursorEdge() { return cursorEdge; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java index 8d9bf6fde0..d54e54742f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EdgeCursorSerializer.java @@ -22,20 +22,22 @@ import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer; import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; /** * Edge cursor serializer */ -public class EdgeCursorSerializer extends AbstractCursorSerializer { +public class EdgeCursorSerializer extends AbstractCursorSerializer { public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer(); @Override - protected Class getType() { - return SimpleEdge.class; + protected Class getType() { + return SimpleMarkedEdge.class; } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java index dc39f5cb46..db5a0a857f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; @@ -40,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) { - super( graphManagerFactory ); + public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) { + super( graphManagerFactory, asyncEventService ); this.collectionName = collectionName; } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java index 61ba4adc08..054a52bac6 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionByTypeFilter.java @@ -26,6 +26,7 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType; import org.apache.usergrid.persistence.model.entity.Id; @@ -42,7 +43,7 @@ /** * Command for reading graph edges on a connection */ -public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter{ +public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter{ private final GraphManagerFactory graphManagerFactory; private final String connectionName; @@ -77,12 +78,14 @@ public Observable> call( final Observable> fil return filterResultObservable.flatMap( idFilterResult -> { //set our our constant state - final Optional startFromCursor = getSeekValue(); + final Optional startFromCursor = getSeekValue(); final Id id = idFilterResult.getValue(); + final Optional typeWrapper = Optional.fromNullable(startFromCursor.orNull()); + final SimpleSearchByIdType search = new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, - entityType, startFromCursor ); + entityType, typeWrapper ); return graphManager.loadEdgesFromSourceByType( search ).map( edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() )); @@ -91,7 +94,7 @@ public Observable> call( final Observable> fil @Override - protected CursorSerializer getCursorSerializer() { + protected CursorSerializer getCursorSerializer() { return EdgeCursorSerializer.INSTANCE; } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java index 11ec5f8090..93e8fd466e 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java @@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.persistence.graph.GraphManagerFactory; import com.google.inject.Inject; @@ -40,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter { * Create a new instance of our command */ @Inject - public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) { - super( graphManagerFactory ); + public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) { + super( graphManagerFactory, asyncEventService ); this.connectionName = connectionName; } diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 2863cbf326..74f9ce0ab3 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -24,6 +24,7 @@ import java.util.UUID; import org.apache.usergrid.persistence.EntityManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.index.*; import org.junit.Before; import org.junit.Rule; @@ -135,7 +136,7 @@ public void testMessageIndexing() throws InterruptedException { */ - final List connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> { + final List connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> { final Id connectingId = createId("connecting"); final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId()); diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java index 6001dd4e60..90d6c5a052 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.UUID; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.junit.Before; import org.junit.Test; @@ -244,7 +245,7 @@ public void testConnectingIndexingBatches() throws InterruptedException { // final int edgeCount = indexFig.getIndexBatchSize()*2; final int edgeCount = 100; - final List connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { + final List connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { final Id connectingId = createId( "connecting" ); final Edge edge = CpNamingUtils.createConnectionEdge( connectingId, "likes", testEntity.getId() ); @@ -377,7 +378,8 @@ public void testDeleteMultipleConnectingEdges() throws InterruptedException { //Write multiple connection edges final int edgeCount = 5; - final List connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount ); + final List + connectionSearchEdges = createConnectionSearchEdges( testEntity, graphManager, edgeCount ); indexService.indexEntity( applicationScope, testEntity ).flatMap(mesage -> indexProducer.put(mesage)).toBlocking().getIterator(); @@ -485,10 +487,10 @@ private Edge createEntityandCollectionEdge( final ApplicationScope applicationSc } - private List createConnectionSearchEdges( - final Entity testEntity, final GraphManager graphManager, final int edgeCount ) { + private List createConnectionSearchEdges( final Entity testEntity, final GraphManager graphManager, + final int edgeCount ) { - final List connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { + final List connectionSearchEdges = Observable.range( 0, edgeCount ).flatMap( integer -> { //create our connection edge. final Id connectingId = createId( "connecting" ); diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java index 7128dcf07e..c9dcbf1662 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorTest.java @@ -28,7 +28,9 @@ import org.apache.usergrid.corepersistence.pipeline.read.search.ElasticsearchCursorSerializer; import org.apache.usergrid.corepersistence.pipeline.read.traverse.EdgeCursorSerializer; import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.impl.SimpleEdge; +import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; import com.google.common.base.Optional; @@ -40,19 +42,12 @@ public class CursorTest { @Test public void testCursors(){ + //test encoding edge + final MarkedEdge edge1 = new SimpleMarkedEdge( createId("source1"), "edgeType1", createId("target1"), 100, false, false, false ); - - - - - //test encoding edge - - final Edge edge1 = new SimpleEdge( createId("source1"), "edgeType1", createId("target1"), 100 ); - - - final Edge edge2 = new SimpleEdge( createId("source2"), "edgeType2", createId("target2"), 110 ); + final MarkedEdge edge2 = new SimpleMarkedEdge( createId("source2"), "edgeType2", createId("target2"), 110, false, false, false ); @@ -64,11 +59,12 @@ public void testCursors(){ final EdgePath filter3Path = new EdgePath<>( 3, query2, ElasticsearchCursorSerializer.INSTANCE, Optional.absent() ); - final EdgePath filter2Path = new EdgePath(2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path )); + final EdgePath filter2Path = + new EdgePath<>( 2, edge2, EdgeCursorSerializer.INSTANCE, Optional.of( filter3Path ) ); final EdgePath filter1Path = new EdgePath<>( 1, query1, ElasticsearchCursorSerializer.INSTANCE, Optional.of(filter2Path) ); - final EdgePath filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) ); + final EdgePath filter0Path = new EdgePath<>( 0, edge1, EdgeCursorSerializer.INSTANCE, Optional.of( filter1Path ) ); @@ -91,7 +87,7 @@ public void testCursors(){ assertEquals(query2, parsedQuery2); - final Edge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE ); + final MarkedEdge parsedEdge2 = requestCursor.getCursor( 2, EdgeCursorSerializer.INSTANCE ); assertEquals( edge2, parsedEdge2 ); @@ -100,7 +96,7 @@ public void testCursors(){ assertEquals( query1, parsedQuery1 ); - final Edge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE ); + final MarkedEdge parsedEdge1 = requestCursor.getCursor( 0, EdgeCursorSerializer.INSTANCE ); assertEquals(edge1, parsedEdge1); diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java index 326e128b02..6929d878ea 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/service/ConnectionServiceImplTest.java @@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; @@ -118,7 +119,7 @@ public void testSingleConnection() { new SimpleSearchByEdge( source, connectionEdge.getType(), target, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ); - final List edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); + final List edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); assertEquals( 1, edges.size() ); @@ -209,7 +210,7 @@ public void testDuplicateConnections() { SearchByEdgeType.Order.DESCENDING, Optional.absent() ); //check only 1 exists - final List edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); + final List edges = gm.loadEdgeVersions( simpleSearchByEdge ).toList().toBlocking().last(); assertEquals( 1, edges.size() ); diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java index d870114fc0..658d3ebad5 100644 --- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java +++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java @@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; @@ -86,7 +87,8 @@ public void testDeletes() throws Exception{ , Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ); - Iterator results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator(); + Iterator + results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator(); if(results.hasNext()){ Assert.fail("should be empty"); diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java index 6100725828..000c633374 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java @@ -59,7 +59,7 @@ public interface GraphManager extends CPManager { * Create or update an edge. Note that the implementation should also create incoming (reversed) edges for this * edge. */ - Observable writeEdge( Edge edge ); + Observable writeEdge( Edge edge ); /** @@ -68,7 +68,7 @@ public interface GraphManager extends CPManager { * * Implementation should also mark the incoming (reversed) edge. Only marks the specific version */ - Observable markEdge( Edge edge ); + Observable markEdge( Edge edge ); /** * @param edge Remove the edge in the graph @@ -98,7 +98,7 @@ public interface GraphManager extends CPManager { /** * Get all versions of this edge where versions <= max version */ - Observable loadEdgeVersions( SearchByEdge edge ); + Observable loadEdgeVersions( SearchByEdge edge ); /** * Returns an observable that emits all edges where the specified node is the source node. The edges will match the @@ -108,7 +108,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable loadEdgesFromSource( SearchByEdgeType search ); + Observable loadEdgesFromSource( SearchByEdgeType search ); /** * Returns an observable that emits all edges where the specified node is the target node. The edges will match the @@ -118,7 +118,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable loadEdgesToTarget( SearchByEdgeType search ); + Observable loadEdgesToTarget( SearchByEdgeType search ); /** @@ -129,7 +129,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable loadEdgesFromSourceByType( SearchByIdType search ); + Observable loadEdgesFromSourceByType( SearchByIdType search ); /** @@ -140,7 +140,7 @@ public interface GraphManager extends CPManager { * * @return An observable that emits Edges. The observer will need to unsubscribe when it has completed consumption. */ - Observable loadEdgesToTargetByType( SearchByIdType search ); + Observable loadEdgesToTargetByType( SearchByIdType search ); /** * Get all edge types to this node. The node provided by search is the target node. diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java index 4b5eeaa001..54d1596dd4 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java @@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.graph; -import org.apache.usergrid.persistence.graph.impl.SimpleEdge; import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; @@ -30,6 +29,7 @@ * An edge. With the additional info of if it is marked for deletion * */ +@JsonDeserialize(as = SimpleMarkedEdge.class) public interface MarkedEdge extends Edge{ /** @@ -38,4 +38,17 @@ public interface MarkedEdge extends Edge{ */ boolean isDeleted(); + /** + * Return true if the source node is deleted + * @return + */ + boolean isSourceNodeDelete(); + + /** + * Return true if the target node is deleted + * @return + */ + boolean isTargetNodeDeleted(); + + } diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java index e119c59e72..93ae7535b2 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java @@ -144,12 +144,12 @@ public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerializati @Override - public Observable writeEdge( final Edge edge ) { + public Observable writeEdge( final Edge edge ) { GraphValidation.validateEdge( edge ); final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, false ); - final Observable observable = Observable.just( markedEdge ).map( edge1 -> { + final Observable observable = Observable.just( markedEdge ).map( edge1 -> { final UUID timestamp = UUIDGenerator.newTimeUUID(); @@ -175,12 +175,12 @@ public Observable writeEdge( final Edge edge ) { @Override - public Observable markEdge( final Edge edge ) { + public Observable markEdge( final Edge edge ) { GraphValidation.validateEdge( edge ); final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true ); - final Observable observable = Observable.just( markedEdge ).map( edge1 -> { + final Observable observable = Observable.just( markedEdge ).map( edge1 -> { final UUID timestamp = UUIDGenerator.newTimeUUID(); @@ -282,46 +282,46 @@ public Observable compactNode( final Id inputNode ) { @Override - public Observable loadEdgeVersions( final SearchByEdge searchByEdge ) { + public Observable loadEdgeVersions( final SearchByEdge searchByEdge ) { - final Observable edges = + final Observable edges = Observable.create( new ObservableIterator( "getEdgeTypesFromSource" ) { @Override protected Iterator getIterator() { return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) ); + .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesVersionsTimer ); } @Override - public Observable loadEdgesFromSource( final SearchByEdgeType search ) { - final Observable edges = - Observable.create( new ObservableIterator( "getEdgeTypesFromSource" ) { + public Observable loadEdgesFromSource( final SearchByEdgeType search ) { + final Observable edges = + Observable.create( new ObservableIterator( "loadEdgesFromSource" ) { @Override protected Iterator getIterator() { return storageEdgeSerialization.getEdgesFromSource( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesFromSourceTimer ); } @Override - public Observable loadEdgesToTarget( final SearchByEdgeType search ) { - final Observable edges = - Observable.create( new ObservableIterator( "getEdgeTypesFromSource" ) { + public Observable loadEdgesToTarget( final SearchByEdgeType search ) { + final Observable edges = + Observable.create( new ObservableIterator( "loadEdgesToTarget" ) { @Override protected Iterator getIterator() { return storageEdgeSerialization.getEdgesToTarget( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesToTargetTimer ); @@ -329,30 +329,30 @@ protected Iterator getIterator() { @Override - public Observable loadEdgesFromSourceByType( final SearchByIdType search ) { - final Observable edges = - Observable.create( new ObservableIterator( "getEdgeTypesFromSource" ) { + public Observable loadEdgesFromSourceByType( final SearchByIdType search ) { + final Observable edges = + Observable.create( new ObservableIterator( "loadEdgesFromSourceByType" ) { @Override protected Iterator getIterator() { return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer ); } @Override - public Observable loadEdgesToTargetByType( final SearchByIdType search ) { - final Observable edges = - Observable.create( new ObservableIterator( "getEdgeTypesFromSource" ) { + public Observable loadEdgesToTargetByType( final SearchByIdType search ) { + final Observable edges = + Observable.create( new ObservableIterator( "loadEdgesToTargetByType" ) { @Override protected Iterator getIterator() { return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search ); } } ).buffer( graphFig.getScanPageSize() ) - .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ); + .compose( new EdgeBufferFilter( search.filterMarked() ) ); return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer ); } @@ -420,12 +420,10 @@ private class EdgeBufferFilter implements Observable.Transformer, MarkedEdge> {//implements Func1, // Observable> { - private final long maxVersion; private final boolean filterMarked; - private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) { - this.maxVersion = maxVersion; + private EdgeBufferFilter( final boolean filterMarked ) { this.filterMarked = filterMarked; } @@ -444,23 +442,16 @@ private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) { final Observable markedEdgeObservable = Observable.from( markedEdges ); - /** - * We aren't going to filter anything, return exactly what we're passed - */ - if(!filterMarked){ - return markedEdgeObservable; - } - //We need to filter, perform that filter final Map markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges ); - return markedEdgeObservable.filter( edge -> { - final long edgeTimestamp = edge.getTimestamp(); + return markedEdgeObservable.map( edge -> { - //our edge needs to not be deleted and have a version that's > max Version - if ( edge.isDeleted() ) { - return false; - } + /** + * Make sure we mark source and target deleted nodes as such + */ + + final long edgeTimestamp = edge.getTimestamp(); final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() ); @@ -468,22 +459,29 @@ private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) { //the source Id has been marked for deletion. It's version is <= to the marked version for // deletion, // so we need to discard it - if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) { - return false; - } + final boolean isSourceDeleted = ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ); final Long targetTimestamp = markedVersions.get( edge.getTargetNode() ); //the target Id has been marked for deletion. It's version is <= to the marked version for // deletion, // so we need to discard it - if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) { - return false; + final boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ); + + //one has been marked for deletion, return it + if(isSourceDeleted || isTargetDeleted){ + return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted ); } + return edge; + } ).filter( simpleMarkedEdge -> { + if(!filterMarked){ + return true; + } - return true; - } ); + //if any one of these is true, we filter it + return !simpleMarkedEdge.isDeleted() && !simpleMarkedEdge.isSourceNodeDelete() && !simpleMarkedEdge.isTargetNodeDeleted(); + }); } ); } } diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java index 29d90eb1f8..9c35e2e173 100644 --- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java +++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java @@ -29,29 +29,63 @@ /** * Simple bean to represent our edge + * * @author tnine */ -public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { +public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge { + + private boolean isDeleted; + private boolean isSourceNodeDeleted; + private boolean isTargetNodeDeleted; - private final boolean deleted; + /** + * Unused but required for Jackson + */ + public SimpleMarkedEdge() { + } - public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) { - super(sourceNode, type, targetNode, timestamp); - this.deleted = deleted; + public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, + final boolean isDeleted ) { + + this( sourceNode, type, targetNode, timestamp, isDeleted, false, false ); } - public SimpleMarkedEdge(final Edge edge, final boolean deleted){ - this(edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), deleted); + public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, + final boolean isDeleted, final boolean isSourceNodeDeleted, + final boolean isTargetNodeDeleted ) { + super( sourceNode, type, targetNode, timestamp ); + this.isDeleted = isDeleted; + this.isSourceNodeDeleted = isSourceNodeDeleted; + this.isTargetNodeDeleted = isTargetNodeDeleted; + } + + + public SimpleMarkedEdge( final Edge edge, final boolean isDeleted ) { + this( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), isDeleted ); } @Override @JsonIgnore public boolean isDeleted() { - return deleted; + return isDeleted; + } + + + @Override + @JsonIgnore + public boolean isSourceNodeDelete() { + return isSourceNodeDeleted; + } + + + @Override + @JsonIgnore + public boolean isTargetNodeDeleted() { + return isTargetNodeDeleted; } @@ -69,7 +103,7 @@ public boolean equals( final Object o ) { final SimpleMarkedEdge that = ( SimpleMarkedEdge ) o; - if ( deleted != that.deleted ) { + if ( isDeleted != that.isDeleted ) { return false; } @@ -80,7 +114,9 @@ public boolean equals( final Object o ) { @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + ( deleted ? 1 : 0 ); + result = 31 * result + ( isDeleted ? 1 : 0 ); + result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 ); + result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 ); return result; } @@ -88,8 +124,10 @@ public int hashCode() { @Override public String toString() { return "SimpleMarkedEdge{" + - "deleted=" + deleted + - "} " + super.toString(); + "deleted=" + isDeleted + + ", isSourceNodeDeleted=" + isSourceNodeDeleted + + ", isTargetNodeDeleted=" + isTargetNodeDeleted + + "} " + super.toString(); } } diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java index eda3a02015..9a95c2689c 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java @@ -95,7 +95,7 @@ public void testWriteReadEdgeTypeSource() throws TimeoutException, InterruptedEx SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null ); - Observable edges = gm.loadEdgesFromSource( search ); + Observable edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().last(); @@ -127,7 +127,7 @@ public void testWriteReadEdgeTypeTarget() throws TimeoutException, InterruptedEx SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null ); - Observable edges = gm.loadEdgesToTarget( search ); + Observable edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -161,7 +161,7 @@ public void testWriteReadEdgeTypeVersionSource() throws TimeoutException, Interr SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null ); - Observable edges = gm.loadEdgesFromSource( search ); + Observable edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -196,7 +196,7 @@ public void testWriteReadEdgeTypeVersionTarget() throws TimeoutException, Interr SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null ); - Observable edges = gm.loadEdgesToTarget( search ); + Observable edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -248,10 +248,10 @@ public void testWriteReadEdgeTypeVersionSourceDistinct() throws TimeoutException SearchByEdgeType search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable edges = gm.loadEdgesFromSource( search ); + Observable edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator returned = edges.toBlocking().getIterator(); + Iterator returned = edges.toBlocking().getIterator(); assertEquals( "Correct edge returned", edge3, returned.next() ); assertEquals( "Correct edge returned", edge2, returned.next() ); @@ -321,10 +321,10 @@ public void testWriteReadEdgeTypeVersionTargetDistinct() throws TimeoutException SearchByEdgeType search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable edges = gm.loadEdgesToTarget( search ); + Observable edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator returned = edges.toBlocking().getIterator(); + Iterator returned = edges.toBlocking().getIterator(); assertEquals( "Correct edge returned", edge3, returned.next() ); assertEquals( "Correct edge returned", edge2, returned.next() ); @@ -387,10 +387,10 @@ public void testWriteReadEdgeTypePagingSource() throws TimeoutException, Interru SearchByEdgeType search = createSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable edges = gm.loadEdgesFromSource( search ); + Observable edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator returned = edges.toBlocking().getIterator(); + Iterator returned = edges.toBlocking().getIterator(); //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first @@ -443,10 +443,10 @@ public void testWriteReadEdgeTypePagingTarget() { SearchByEdgeType search = createSearchByEdge( edge1.getTargetNode(), edge1.getType(), edge3.getTimestamp(), null ); - Observable edges = gm.loadEdgesToTarget( search ); + Observable edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" - Iterator returned = edges.toBlocking().getIterator(); + Iterator returned = edges.toBlocking().getIterator(); //we have 3 edges, but we specified our first edge as the max, we shouldn't get any more results than the first @@ -487,7 +487,7 @@ public void testWriteReadEdgeTypeTargetTypeSource() { SearchByIdType search = createSearchByEdgeAndId( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), edge.getTargetNode().getType(), null ); - Observable edges = gm.loadEdgesFromSourceByType( search ); + Observable edges = gm.loadEdgesFromSourceByType( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -524,7 +524,7 @@ public void testWriteReadEdgeTypeTargetTypeTarget() { SearchByIdType search = createSearchByEdgeAndId( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), edge.getSourceNode().getType(), null ); - Observable edges = gm.loadEdgesToTargetByType( search ); + Observable edges = gm.loadEdgesToTargetByType( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -560,7 +560,7 @@ public void testWriteReadEdgeDeleteSource() { SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTimestamp(), null ); - Observable edges = gm.loadEdgesFromSource( search ); + Observable edges = gm.loadEdgesFromSource( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -629,7 +629,7 @@ public void testWriteReadEdgeDeleteTarget() { SearchByEdgeType search = createSearchByEdge( edge.getTargetNode(), edge.getType(), edge.getTimestamp(), null ); - Observable edges = gm.loadEdgesToTarget( search ); + Observable edges = gm.loadEdgesToTarget( search ); //implicitly blows up if more than 1 is returned from "single" Edge returned = edges.toBlocking().single(); @@ -988,11 +988,11 @@ public void testMarkSourceEdges() throws InterruptedException { //get our 2 edges - Observable edges = + Observable edges = gm.loadEdgesFromSource( createSearchByEdge( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); - Iterator results = edges.toBlocking().getIterator(); + Iterator results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2, results.next() ); @@ -1060,11 +1060,11 @@ public void testMarkTargetEdges() { //get our 2 edges - Observable edges = + Observable edges = gm.loadEdgesToTarget( createSearchByEdge( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); - Iterator results = edges.toBlocking().getIterator(); + Iterator results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2, results.next() ); @@ -1140,11 +1140,11 @@ public void testMarkCompactSourceEdges() throws InterruptedException { //get our 2 edges - Observable edges = gm.loadEdgesFromSource( + Observable edges = gm.loadEdgesFromSource( createSearchByEdgeUnfiltered( edge1.getSourceNode(), edge1.getType(), maxVersion, null ) ); - Iterator results = edges.toBlocking().getIterator(); + Iterator results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2.getTargetNode(), results.next().getTargetNode() ); @@ -1211,11 +1211,11 @@ public void testMarkCompactTargetEdges() { gm.markEdge( edge1 ).toBlocking().last(); //get our 2 edges - Observable edges = gm.loadEdgesToTarget( + Observable edges = gm.loadEdgesToTarget( createSearchByEdgeUnfiltered( edge1.getTargetNode(), edge1.getType(), maxVersion, null ) ); - Iterator results = edges.toBlocking().getIterator(); + Iterator results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge2.getSourceNode(), results.next().getSourceNode() ); @@ -1282,11 +1282,11 @@ public void testMarkSourceEdgesType() { //get our 2 edges - Observable edges = gm.loadEdgesFromSourceByType( + Observable edges = gm.loadEdgesFromSourceByType( createSearchByEdgeAndId( sourceId, edge1.getType(), maxVersion, targetId1.getType(), null ) ); - Iterator results = edges.toBlocking().getIterator(); + Iterator results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge1, results.next() ); @@ -1359,11 +1359,11 @@ public void testMarkTargetEdgesType() { final long maxVersion = System.currentTimeMillis(); //get our 2 edges - Observable edges = gm.loadEdgesToTargetByType( + Observable edges = gm.loadEdgesToTargetByType( createSearchByEdgeAndId( targetId, edge1.getType(), maxVersion, sourceId1.getType(), null ) ); - Iterator results = edges.toBlocking().getIterator(); + Iterator results = edges.toBlocking().getIterator(); assertEquals( "Edges correct", edge1, results.next() ); @@ -1435,7 +1435,7 @@ public void markSourceNode() { final long maxVersion = System.currentTimeMillis(); - Iterator results = + Iterator results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1547,7 +1547,7 @@ public void markCreateSourceNode() { final long maxVersion = System.currentTimeMillis(); - Iterator results = + Iterator results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1627,7 +1627,7 @@ public void markTargetNode() { final long maxVersion = System.currentTimeMillis(); - Iterator results = + Iterator results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1764,7 +1764,7 @@ public void markCreateTargetNode() { final long maxVersion = System.currentTimeMillis(); - Iterator results = + Iterator results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1842,7 +1842,7 @@ public void markDeleteSourceNode() { final long maxVersion = System.currentTimeMillis(); - Iterator results = + Iterator results = gm.loadEdgesFromSource( createSearchByEdge( sourceId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -1983,7 +1983,7 @@ public void markDeleteTargetNode() { final long maxVersion = System.currentTimeMillis(); - Iterator results = + Iterator results = gm.loadEdgesToTarget( createSearchByEdge( targetId, edge1.getType(), maxVersion, null ) ).toBlocking() .getIterator(); @@ -2356,10 +2356,10 @@ public void testReadMultipleVersionOrder() { new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ); - final Observable edgesDescending = gm.loadEdgeVersions( searchDescending ); + final Observable edgesDescending = gm.loadEdgeVersions( searchDescending ); //search descending - final List descending = edgesDescending.toList().toBlocking().single(); + final List descending = edgesDescending.toList().toBlocking().single(); assertEquals( "Correct size returned", 3, descending.size() ); @@ -2376,9 +2376,9 @@ public void testReadMultipleVersionOrder() { new SimpleSearchByEdge( edge1.getSourceNode(), edge1.getType(), edge1.getTargetNode(), 0, SearchByEdgeType.Order.ASCENDING, Optional.absent() ); - Observable edgesAscending = gm.loadEdgeVersions( searchAscending ); + Observable edgesAscending = gm.loadEdgeVersions( searchAscending ); - List ascending = edgesAscending.toList().toBlocking().single(); + List ascending = edgesAscending.toList().toBlocking().single(); assertEquals( "Correct size returned", 3, ascending.size() ); diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java index b922e7cb86..22683f658b 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java @@ -113,7 +113,7 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional .absent()) ); } @@ -141,7 +141,7 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); } }; @@ -220,7 +220,7 @@ public Boolean call() throws Exception { final CountDownLatch latch = new CountDownLatch( 1 ); - generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber>() { + generator.doSearch( manager ).take( readCount ).buffer( 1000 ).subscribe( new Subscriber>() { @Override public void onCompleted() { timer.stop(); @@ -235,7 +235,7 @@ public void onError( final Throwable throwable ) { @Override - public void onNext( final List edges ) { + public void onNext( final List edges ) { log.info("Read {} edges", edges.size()); } } ); @@ -263,6 +263,6 @@ private interface EdgeGenerator { * @param manager * @return */ - public Observable doSearch( final GraphManager manager ); + public Observable doSearch( final GraphManager manager ); } } diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java index c1917bb9a3..6aad2898bd 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java @@ -176,7 +176,7 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); @@ -409,7 +409,7 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent(), false ) ); @@ -729,7 +729,7 @@ private interface EdgeGenerator { /** * Perform the search returning an observable edge */ - public Observable doSearch( final GraphManager manager ); + public Observable doSearch( final GraphManager manager ); } } diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java index 28896847a2..6a2efc9557 100644 --- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java +++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java @@ -107,25 +107,25 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { final long timestamp = System.currentTimeMillis(); - return Observable.create( new Observable.OnSubscribe() { + return Observable.create( new Observable.OnSubscribe() { @Override - public void call( final Subscriber subscriber ) { + public void call( final Subscriber subscriber ) { try { for ( Id sourceId : sourceIds ) { - final Iterable edges = manager.loadEdgesFromSource( + final Iterable edges = manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", timestamp, SearchByEdgeType.Order.DESCENDING, Optional .absent() ) ) .toBlocking().toIterable(); - for ( Edge edge : edges ) { + for ( MarkedEdge edge : edges ) { log.debug( "Firing on next for edge {}", edge ); subscriber.onNext( edge ); @@ -195,7 +195,7 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); } }; @@ -222,7 +222,7 @@ public Edge newEdge() { @Override - public Observable doSearch( final GraphManager manager ) { + public Observable doSearch( final GraphManager manager ) { return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, Optional.absent() ) ); } @@ -307,6 +307,6 @@ private interface EdgeGenerator { */ public Edge newEdge(); - public Observable doSearch( final GraphManager manager ); + public Observable doSearch( final GraphManager manager ); } } diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml index 4b47bc0e51..4e4648e3fd 100644 --- a/stack/corepersistence/pom.xml +++ b/stack/corepersistence/pom.xml @@ -70,7 +70,7 @@ limitations under the License. 4.11 0.26 1.2.17 - 1.0.12 + 1.0.14 1.7.2 2.16 1.10.6 diff --git a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java index d9d3d0d8e1..6e846013fc 100644 --- a/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java +++ b/stack/services/src/main/java/org/apache/usergrid/management/AppInfoMigrationPlugin.java @@ -47,6 +47,7 @@ import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.GraphManager; import org.apache.usergrid.persistence.graph.GraphManagerFactory; +import org.apache.usergrid.persistence.graph.MarkedEdge; import org.apache.usergrid.persistence.graph.SearchByEdgeType; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge; import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType; @@ -239,7 +240,7 @@ private org.apache.usergrid.persistence.Entity getApplicationInfo( final UUID ap final EntityCollectionManager managementCollectionManager = entityCollectionManagerFactory.createCollectionManager( managementAppScope ); - Observable edgesObservable = getApplicationInfoEdges( appId ); + Observable edgesObservable = getApplicationInfoEdges( appId ); //get the graph for all app infos Observable entityObs = edgesObservable.flatMap( edge -> { final Id appInfoId = edge.getTargetNode(); @@ -299,7 +300,7 @@ public Observable getOldApp } - public Observable getApplicationInfoEdges( final UUID applicationId ) { + public Observable getApplicationInfoEdges( final UUID applicationId ) { final ApplicationScope managementAppScope = getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID ); final GraphManager gm = graphManagerFactory.createEdgeManager( managementAppScope );