Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
index will merge all batches
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Sep 28, 2015
1 parent 4c263b8 commit 3ed0848
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 49 deletions.
Expand Up @@ -25,6 +25,8 @@
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;


import com.google.common.base.Optional;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -69,6 +71,7 @@
import rx.Observable; import rx.Observable;
import rx.Subscriber; import rx.Subscriber;
import rx.Subscription; import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers; import rx.schedulers.Schedulers;




Expand All @@ -85,6 +88,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final QueueManager queue; private final QueueManager queue;
private final QueueScope queueScope; private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig; private final IndexProcessorFig indexProcessorFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory; private final EntityIndexFactory entityIndexFactory;
Expand All @@ -110,11 +114,16 @@ public class AmazonAsyncEventService implements AsyncEventService {




@Inject @Inject
public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig, public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory, final IndexProcessorFig indexProcessorFig,
final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
final MetricsFactory metricsFactory,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder, final EventBuilder eventBuilder,
final RxTaskScheduler rxTaskScheduler ) { final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;


this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory;
Expand Down Expand Up @@ -219,43 +228,60 @@ public void ack(final QueueMessage message) {




private void handleMessages( final List<QueueMessage> messages ) { private void handleMessages( final List<QueueMessage> messages ) {
if ( logger.isDebugEnabled() ) { if (logger.isDebugEnabled()) {
logger.debug( "handleMessages with {} message", messages.size() ); logger.debug("handleMessages with {} message", messages.size());
} }


for ( QueueMessage message : messages ) { Observable<IndexEventResult> merged = Observable.empty();
final AsyncEvent event = ( AsyncEvent ) message.getBody(); for (QueueMessage message : messages) {
final AsyncEvent event = (AsyncEvent) message.getBody();


logger.debug( "Processing {} event", event ); logger.debug("Processing {} event", event);


if ( event == null ) { if (event == null) {
logger.error( "AsyncEvent type or event is null!" ); logger.error("AsyncEvent type or event is null!");
continue; continue;
} }




if ( event instanceof EdgeDeleteEvent ) { if (event instanceof EdgeDeleteEvent) {
handleEdgeDelete( message ); merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeDelete(queueMessage), message));
} } else if (event instanceof EdgeIndexEvent) {
else if ( event instanceof EdgeIndexEvent ) { merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEdgeIndex(queueMessage),message));
handleEdgeIndex( message ); } else if (event instanceof EntityDeleteEvent) {
merged = merged.mergeWith( callHandleIndex(queueMessage -> handleEntityDelete(queueMessage),message));
} else if (event instanceof EntityIndexEvent) {
merged = merged.mergeWith(callHandleIndex(queueMessage -> handleEntityIndexUpdate(queueMessage),message));
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
handleInitializeApplicationIndex(message);
} else {
logger.error("Unknown EventType: {}", event);
} }


else if ( event instanceof EntityDeleteEvent ) { messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
handleEntityDelete( message ); }
}
else if ( event instanceof EntityIndexEvent ) {
handleEntityIndexUpdate( message );
}


else if ( event instanceof InitializeApplicationIndexEvent ) { merged
handleInitializeApplicationIndex( message ); .filter(indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage().isPresent())
} .buffer(MAX_TAKE)
else { .flatMap(indexEventResults -> {
logger.error( "Unknown EventType: {}", event ); IndexOperationMessage combined = new IndexOperationMessage();
} Observable.from(indexEventResults)
.doOnNext(indexEventResult -> combined.injest(indexEventResult.getIndexOperationMessage().get())).subscribe();
indexProducer.put(combined).subscribe();
return Observable.from(indexEventResults);
})
.doOnNext(indexEventResult ->ack(indexEventResult.queueMessage));
}


messageCycle.update( System.currentTimeMillis() - event.getCreationTime() ); private Observable<IndexEventResult> callHandleIndex(Func1<QueueMessage,Observable<IndexOperationMessage>> toCall, QueueMessage message){
try{
IndexOperationMessage indexOperationMessage = toCall.call(message).toBlocking().lastOrDefault(null);
return Observable.just(new IndexEventResult(message,Optional.fromNullable(indexOperationMessage),true));
}catch (Exception e){
logger.error("failed to run index",e);
return Observable.just( new IndexEventResult(message, Optional.<IndexOperationMessage>absent(),false));
} }
} }


Expand All @@ -276,7 +302,7 @@ public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
} }




public void handleEntityIndexUpdate(final QueueMessage message) { public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {


Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );


Expand All @@ -298,8 +324,7 @@ public void handleEntityIndexUpdate(final QueueMessage message) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);


final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation ); final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );

return observable;
subscribeAndAck( observable, message );
} }




Expand All @@ -313,7 +338,7 @@ public void queueNewEdge(final ApplicationScope applicationScope,
offer( operation ); offer( operation );
} }


public void handleEdgeIndex(final QueueMessage message) { public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {


Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex"); Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");


Expand All @@ -333,8 +358,7 @@ public void handleEdgeIndex(final QueueMessage message) {


final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge( final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
applicationScope, entity, edge ) ); applicationScope, entity, edge ) );

return edgeIndexObservable;
subscribeAndAck( edgeIndexObservable, message );
} }


@Override @Override
Expand All @@ -344,7 +368,7 @@ public void queueDeleteEdge(final ApplicationScope applicationScope,
offer( new EdgeDeleteEvent( applicationScope, edge ) ); offer( new EdgeDeleteEvent( applicationScope, edge ) );
} }


public void handleEdgeDelete(final QueueMessage message) { public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {


Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete"); Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");


Expand All @@ -362,8 +386,7 @@ public void handleEdgeDelete(final QueueMessage message) {
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);


final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge ); final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );

return observable;
subscribeAndAck( observable, message );
} }




Expand All @@ -378,7 +401,7 @@ public long getQueueDepth() {
return queue.getQueueDepth(); return queue.getQueueDepth();
} }


public void handleEntityDelete(final QueueMessage message) { public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {


Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");


Expand All @@ -401,8 +424,7 @@ public void handleEntityDelete(final QueueMessage message) {


final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(), final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
entityDeleteResults.getIndexObservable() ); entityDeleteResults.getIndexObservable() );

return merged;
subscribeAndAck( merged, message );
} }




Expand Down Expand Up @@ -526,4 +548,28 @@ public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){ private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
} }
public static class IndexEventResult{
private final QueueMessage queueMessage;
private final Optional<IndexOperationMessage> indexOperationMessage;
private final boolean success;

public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean success){

this.queueMessage = queueMessage;
this.indexOperationMessage = indexOperationMessage;
this.success = success;
}

public QueueMessage getQueueMessage() {
return queueMessage;
}

public boolean success() {
return success;
}

public Optional<IndexOperationMessage> getIndexOperationMessage() {
return indexOperationMessage;
}
}
} }
Expand Up @@ -27,6 +27,7 @@
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.queue.QueueManagerFactory; import org.apache.usergrid.persistence.queue.QueueManagerFactory;


import com.google.inject.Inject; import com.google.inject.Inject;
Expand All @@ -49,6 +50,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final EventBuilder eventBuilder; private final EventBuilder eventBuilder;
private final IndexLocationStrategyFactory indexLocationStrategyFactory; private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory; private final EntityIndexFactory entityIndexFactory;
private final IndexProducer indexProducer;


private AsyncEventService asyncEventService; private AsyncEventService asyncEventService;


Expand All @@ -61,7 +63,8 @@ public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
final EntityCollectionManagerFactory entityCollectionManagerFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EventBuilder eventBuilder, final EventBuilder eventBuilder,
final IndexLocationStrategyFactory indexLocationStrategyFactory, final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory) { final EntityIndexFactory entityIndexFactory,
final IndexProducer indexProducer) {


this.indexProcessorFig = indexProcessorFig; this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory; this.queueManagerFactory = queueManagerFactory;
Expand All @@ -71,6 +74,7 @@ public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
this.eventBuilder = eventBuilder; this.eventBuilder = eventBuilder;
this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory; this.entityIndexFactory = entityIndexFactory;
this.indexProducer = indexProducer;
} }




Expand All @@ -92,12 +96,12 @@ private AsyncEventService getIndexService() {


switch (impl) { switch (impl) {
case LOCAL: case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously()); return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS: case SQS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
case SNS: case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler ); entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
default: default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
Expand Down
Expand Up @@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.asyncevents; package org.apache.usergrid.corepersistence.asyncevents;




import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down Expand Up @@ -48,14 +50,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {


private final EventBuilder eventBuilder; private final EventBuilder eventBuilder;
private final RxTaskScheduler rxTaskScheduler; private final RxTaskScheduler rxTaskScheduler;
private final IndexProducer indexProducer;
private final boolean resolveSynchronously; private final boolean resolveSynchronously;




@Inject @Inject
public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean public InMemoryAsyncEventService( final EventBuilder eventBuilder,
resolveSynchronously ) { final RxTaskScheduler rxTaskScheduler,
final IndexProducer indexProducer,
boolean resolveSynchronously
) {
this.eventBuilder = eventBuilder; this.eventBuilder = eventBuilder;
this.rxTaskScheduler = rxTaskScheduler; this.rxTaskScheduler = rxTaskScheduler;
this.indexProducer = indexProducer;
this.resolveSynchronously = resolveSynchronously; this.resolveSynchronously = resolveSynchronously;
} }


Expand Down Expand Up @@ -117,12 +124,13 @@ public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
} }


public void run( Observable<?> observable ) { public void run( Observable<?> observable ) {
Observable mapped = observable.map(message -> message instanceof IndexOperationMessage ? indexProducer.put((IndexOperationMessage)message) : Observable.just(message));
//start it in the background on an i/o thread //start it in the background on an i/o thread
if ( !resolveSynchronously ) { if ( !resolveSynchronously ) {
observable.subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); mapped.subscribeOn(rxTaskScheduler.getAsyncIOScheduler()).subscribe();
} }
else { else {
observable.toBlocking().lastOrDefault(null); mapped.subscribe();
} }
} }


Expand Down
Expand Up @@ -22,6 +22,7 @@


import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.junit.Rule; import org.junit.Rule;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;


Expand Down Expand Up @@ -72,6 +73,8 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Inject @Inject
public EventBuilder eventBuilder; public EventBuilder eventBuilder;


@Inject
public IndexProducer indexProducer;


@Inject @Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory; public IndexLocationStrategyFactory indexLocationStrategyFactory;
Expand All @@ -82,7 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {


@Override @Override
protected AsyncEventService getAsyncEventService() { protected AsyncEventService getAsyncEventService() {
return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler ); return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
} }




Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Set; import java.util.Set;


import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Optional;




/** /**
Expand Down Expand Up @@ -106,4 +107,8 @@ public int hashCode() {
public long getCreationTime() { public long getCreationTime() {
return creationTime; return creationTime;
} }

public void injest(IndexOperationMessage singleMessage) {
si
}
} }

0 comments on commit 3ed0848

Please sign in to comment.