Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.index.EntityIndex;
Expand Down Expand Up @@ -477,9 +478,9 @@ private Map<String, UUID> getApplications(final String edgeType) throws Exceptio
logger.debug("getApplications(): Loading edges of edgeType {} from {}:{}",
new Object[]{edgeType, managementId.getType(), managementId.getUuid()});

Observable<Edge> edges = gm.loadEdgesFromSource( new SimpleSearchByEdgeType(
managementId, edgeType, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ));
Observable<MarkedEdge> edges = gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( managementId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent() ) );

final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( appScope );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
Expand Down Expand Up @@ -227,14 +228,9 @@ Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeTyp

Observable<Edge> edges =
gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
.flatMap( new Func1<String, Observable<Edge>>() {
@Override
public Observable<Edge> call( final String edgeType ) {
return gm.loadEdgesToTarget(
new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
}
} );
.flatMap( edgeType1 -> gm.loadEdgesToTarget(
new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType1, Long.MAX_VALUE,
SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) ) );

//if our limit is set, take them. Note this logic is still borked, we can't possibly fit everything in memmory
if ( limit > -1 ) {
Expand Down Expand Up @@ -268,7 +264,7 @@ public boolean isConnectionMember( String connectionType, EntityRef entity ) thr
} );

GraphManager gm = managerCache.getGraphManager( applicationScope );
Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
.createEdgeFromConnectionType( new SimpleId( headEntity.getUuid(), headEntity.getType() ), connectionType,
entityId ) );

Expand All @@ -288,7 +284,7 @@ public boolean isCollectionMember( String collectionName, EntityRef entity ) thr
} );

GraphManager gm = managerCache.getGraphManager( applicationScope );
Observable<Edge> edges = gm.loadEdgeVersions( CpNamingUtils
Observable<MarkedEdge> edges = gm.loadEdgeVersions( CpNamingUtils
.createEdgeFromCollectionName( new SimpleId( headEntity.getUuid(), headEntity.getType() ), collectionName,
entityId ) );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ else if ( event instanceof EdgeIndexEvent ) {
}
else if ( event instanceof EntityDeleteEvent ) {
indexoperationObservable = handleEntityDelete( message );
validateEmptySets = false; // do not check this one for an empty set b/c it can be empty

}
else if ( event instanceof EntityIndexEvent ) {
indexoperationObservable = handleEntityIndexUpdate( message );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,11 @@ public Optional<EdgePath> getPath() {
}


@Override
public String toString() {
return "FilterResult{" +
"path=" + path +
", value=" + value +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
Expand All @@ -42,28 +45,33 @@
/**
* Command for reading graph edges
*/
public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, MarkedEdge> {

private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );

private final GraphManagerFactory graphManagerFactory;
private final AsyncEventService asyncEventService;


/**
* Create a new instance of our command
*/
public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory,
final AsyncEventService asyncEventService ) {
this.graphManagerFactory = graphManagerFactory;
this.asyncEventService = asyncEventService;
}


@Override
public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {


final ApplicationScope applicationScope = pipelineContext.getApplicationScope();

//get the graph manager
final GraphManager graphManager =
graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
graphManagerFactory.createEdgeManager( applicationScope );


final String edgeName = getEdgeTypeName();
Expand All @@ -74,18 +82,60 @@ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> pre
return previousIds.flatMap( previousFilterValue -> {

//set our our constant state
final Optional<Edge> startFromCursor = getSeekValue();
final Optional<MarkedEdge> startFromCursor = getSeekValue();
final Id id = previousFilterValue.getValue();


final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());

/**
* We do not want to filter. This is intentional DO NOT REMOVE!!!
*
* We want to fire events on these edges if they exist, the delete was missed.
*/
final SimpleSearchByEdgeType search =
new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
startFromCursor );
typeWrapper, false );

/**
* TODO, pass a message with pointers to our cursor values to be generated later
*/
return graphManager.loadEdgesFromSource( search )
return graphManager.loadEdgesFromSource( search ).filter(markedEdge -> {

final boolean isDeleted = markedEdge.isDeleted();
final boolean isSourceNodeDeleted = markedEdge.isSourceNodeDelete();
final boolean isTargetNodeDelete = markedEdge.isTargetNodeDeleted();



if(isDeleted){
logger.trace( "Edge {} is deleted, queueing the delete event", markedEdge );
asyncEventService.queueDeleteEdge( applicationScope, markedEdge );
}

if(isSourceNodeDeleted){
final Id sourceNodeId = markedEdge.getSourceNode();

logger.trace( "Edge {} has a deleted source node, queueing the delete entity event for id {}", markedEdge, sourceNodeId );

asyncEventService.queueEntityDelete( applicationScope, sourceNodeId );
}

if(isTargetNodeDelete){

final Id targetNodeId = markedEdge.getTargetNode();

logger.trace( "Edge {} has a deleted target node, queueing the delete entity event for id {}", markedEdge, targetNodeId );

asyncEventService.queueEntityDelete( applicationScope, targetNodeId );
}


//filter if any of them are marked
return !isDeleted && !isSourceNodeDeleted && !isTargetNodeDelete;


})
//set the edge state for cursors
.doOnNext( edge -> {
logger.trace( "Seeking over edge {}", edge );
Expand All @@ -100,7 +150,7 @@ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> pre


@Override
protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
protected FilterResult<Id> createFilterResult( final Id emit, final MarkedEdge cursorValue,
final Optional<EdgePath> parent ) {

//if it's our first pass, there's no cursor to generate
Expand All @@ -113,7 +163,7 @@ protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorV


@Override
protected CursorSerializer<Edge> getCursorSerializer() {
protected CursorSerializer<MarkedEdge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}

Expand All @@ -131,14 +181,14 @@ protected CursorSerializer<Edge> getCursorSerializer() {
*/
private final class EdgeState {

private Edge cursorEdge = null;
private Edge currentEdge = null;
private MarkedEdge cursorEdge = null;
private MarkedEdge currentEdge = null;


/**
* Update the pointers
*/
private void update( final Edge newEdge ) {
private void update( final MarkedEdge newEdge ) {
cursorEdge = currentEdge;
currentEdge = newEdge;
}
Expand All @@ -147,7 +197,7 @@ private void update( final Edge newEdge ) {
/**
* Get the edge to use in cursors for resume
*/
private Edge getCursorEdge() {
private MarkedEdge getCursorEdge() {
return cursorEdge;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@

import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;


/**
* Edge cursor serializer
*/
public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
public class EdgeCursorSerializer extends AbstractCursorSerializer<MarkedEdge> {


public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();

@Override
protected Class<SimpleEdge> getType() {
return SimpleEdge.class;
protected Class<SimpleMarkedEdge> getType() {
return SimpleMarkedEdge.class;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;


import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;

import com.google.inject.Inject;
Expand All @@ -40,8 +41,8 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
super( graphManagerFactory );
public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String collectionName ) {
super( graphManagerFactory, asyncEventService );
this.collectionName = collectionName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
import org.apache.usergrid.persistence.model.entity.Id;
Expand All @@ -42,7 +43,7 @@
/**
* Command for reading graph edges on a connection
*/
public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge>{
public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, MarkedEdge>{

private final GraphManagerFactory graphManagerFactory;
private final String connectionName;
Expand Down Expand Up @@ -77,12 +78,14 @@ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> fil
return filterResultObservable.flatMap( idFilterResult -> {

//set our our constant state
final Optional<Edge> startFromCursor = getSeekValue();
final Optional<MarkedEdge> startFromCursor = getSeekValue();
final Id id = idFilterResult.getValue();

final Optional<Edge> typeWrapper = Optional.fromNullable(startFromCursor.orNull());

final SimpleSearchByIdType search =
new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
entityType, startFromCursor );
entityType, typeWrapper );

return graphManager.loadEdgesFromSourceByType( search ).map(
edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
Expand All @@ -91,7 +94,7 @@ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> fil


@Override
protected CursorSerializer<Edge> getCursorSerializer() {
protected CursorSerializer<MarkedEdge> getCursorSerializer() {
return EdgeCursorSerializer.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.pipeline.read.traverse;


import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;

import com.google.inject.Inject;
Expand All @@ -40,8 +41,8 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
* Create a new instance of our command
*/
@Inject
public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
super( graphManagerFactory );
public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, final AsyncEventService asyncEventService, @Assisted final String connectionName ) {
super( graphManagerFactory, asyncEventService );
this.connectionName = connectionName;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.UUID;

import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.index.*;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -135,7 +136,7 @@ public void testMessageIndexing() throws InterruptedException {
*/


final List<Edge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
final List<MarkedEdge> connectionSearchEdges = Observable.range( 0, 500 ).flatMap(integer -> {
final Id connectingId = createId("connecting");
final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());

Expand Down
Loading