Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Adds strong consistency read to maps. Persists ES batches into Cassan…
Browse files Browse the repository at this point in the history
…dra for multi region execution.

A bug in wiring JSON to SQS still exists, it's incorrectly escaping some message subtypes.
  • Loading branch information
Todd Nine committed Oct 17, 2015
1 parent 2b22c61 commit 94a9078
Show file tree
Hide file tree
Showing 21 changed files with 666 additions and 250 deletions.
Expand Up @@ -21,13 +21,20 @@




import java.io.IOException; import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;


import com.google.common.base.Optional; import com.google.common.base.Optional;

import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
import org.apache.usergrid.exception.NotImplementedException;
import org.apache.usergrid.persistence.index.impl.IndexProducer; import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -54,8 +61,13 @@
import org.apache.usergrid.persistence.index.EntityIndexFactory; import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.QueueManager; import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory; import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueMessage; import org.apache.usergrid.persistence.queue.QueueMessage;
Expand All @@ -82,12 +94,13 @@ public class AmazonAsyncEventService implements AsyncEventService {


private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);


private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( );

// SQS maximum receive messages is 10 // SQS maximum receive messages is 10
private static final int MAX_TAKE = 10; private static final int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars


private final QueueManager queue; private final QueueManager queue;
private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig; private final IndexProcessorFig indexProcessorFig;
private final IndexProducer indexProducer; private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory; private final EntityCollectionManagerFactory entityCollectionManagerFactory;
Expand All @@ -109,6 +122,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final AtomicLong counter = new AtomicLong(); private final AtomicLong counter = new AtomicLong();
private final AtomicLong inFlight = new AtomicLong(); private final AtomicLong inFlight = new AtomicLong();
private final Histogram messageCycle; private final Histogram messageCycle;
private final MapManager esMapPersistence;


//the actively running subscription //the actively running subscription
private List<Subscription> subscriptions = new ArrayList<>(); private List<Subscription> subscriptions = new ArrayList<>();
Expand All @@ -123,17 +137,24 @@ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
final IndexLocationStrategyFactory indexLocationStrategyFactory, final IndexLocationStrategyFactory indexLocationStrategyFactory,
final EntityIndexFactory entityIndexFactory, final EntityIndexFactory entityIndexFactory,
final EventBuilder eventBuilder, final EventBuilder eventBuilder,
final MapManagerFactory mapManagerFactory,
final RxTaskScheduler rxTaskScheduler ) { final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer; this.indexProducer = indexProducer;


this.entityCollectionManagerFactory = entityCollectionManagerFactory; this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory; this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory; this.entityIndexFactory = entityIndexFactory;
this.eventBuilder = eventBuilder; this.eventBuilder = eventBuilder;

final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");

this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );

this.rxTaskScheduler = rxTaskScheduler; this.rxTaskScheduler = rxTaskScheduler;


this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL); QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope); this.queue = queueManagerFactory.getQueueManager(queueScope);

this.indexProcessorFig = indexProcessorFig; this.indexProcessorFig = indexProcessorFig;


this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
Expand All @@ -158,7 +179,7 @@ public Long getValue() {
/** /**
* Offer the EntityIdScope to SQS * Offer the EntityIdScope to SQS
*/ */
private void offer(final Object operation) { private void offer(final Serializable operation) {
final Timer.Context timer = this.writeTimer.time(); final Timer.Context timer = this.writeTimer.time();


try { try {
Expand Down Expand Up @@ -213,7 +234,7 @@ public void ack(final QueueMessage message) {
final Timer.Context timer = this.ackTimer.time(); final Timer.Context timer = this.ackTimer.time();


try{ try{
queue.commitMessage(message); queue.commitMessage( message );


//decrement our in-flight counter //decrement our in-flight counter
inFlight.decrementAndGet(); inFlight.decrementAndGet();
Expand All @@ -235,7 +256,7 @@ public void ack(final List<QueueMessage> messages) {
final Timer.Context timer = this.ackTimer.time(); final Timer.Context timer = this.ackTimer.time();


try{ try{
queue.commitMessages(messages); queue.commitMessages( messages );


//decrement our in-flight counter //decrement our in-flight counter
inFlight.decrementAndGet(); inFlight.decrementAndGet();
Expand Down Expand Up @@ -296,7 +317,13 @@ private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messag
handleInitializeApplicationIndex(event, message); handleInitializeApplicationIndex(event, message);
indexoperationObservable = Observable.just(new IndexOperationMessage()); indexoperationObservable = Observable.just(new IndexOperationMessage());
validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
} else { } else if (event instanceof ElasticsearchIndexEvent){
handleIndexOperation( (ElasticsearchIndexEvent)event );
indexoperationObservable = Observable.just( new IndexOperationMessage() );
validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
}

else {
throw new Exception("Unknown EventType");//TODO: print json instead throw new Exception("Unknown EventType");//TODO: print json instead
} }


Expand Down Expand Up @@ -434,6 +461,85 @@ public void queueEntityDelete(final ApplicationScope applicationScope, final Id
offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) ); offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
} }



/**
* Queue up an indexOperationMessage for multi region execution
* @param indexOperationMessage
*/
public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {

final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );

final UUID newMessageId = UUIDGenerator.newTimeUUID();

//write to the map in ES
esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );



//now queue up the index message

final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );

//send to the topic so all regions index the batch
try {
queue.sendMessageToTopic( elasticsearchIndexEvent );
}
catch ( IOException e ) {
throw new RuntimeException( "Unable to pulish to topic", e );
}
}

public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );

final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();

Preconditions.checkNotNull( messageId, "messageId must not be null" );


//load the entity

final String message = esMapPersistence.getString( messageId.toString() );

String highConsistency = null;

if(message == null){
logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );

highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );

}

//read the value from the string

final IndexOperationMessage indexOperationMessage;

//our original local read has it, parse it.
if(message != null){
indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
}
//we tried to read it at a higher consistency level and it works
else if (highConsistency != null){
indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
}

//we couldn't find it, bail
else{
logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );

throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
}



//now execute it
indexProducer.put(indexOperationMessage).toBlocking().last();

}



@Override @Override
public long getQueueDepth() { public long getQueueDepth() {
return queue.getQueueDepth(); return queue.getQueueDepth();
Expand Down Expand Up @@ -510,71 +616,75 @@ private void startWorker() {
synchronized (mutex) { synchronized (mutex) {


Observable<List<QueueMessage>> consumer = Observable<List<QueueMessage>> consumer =
Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() { Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
@Override @Override
public void call(final Subscriber<? super List<QueueMessage>> subscriber) { public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {


//name our thread so it's easy to see //name our thread so it's easy to see
Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet()); Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );


List<QueueMessage> drainList = null; List<QueueMessage> drainList = null;


do { do {
try { try {
drainList = take().toList().toBlocking().lastOrDefault(null); drainList = take().toList().toBlocking().lastOrDefault( null );
//emit our list in it's entity to hand off to a worker pool //emit our list in it's entity to hand off to a worker pool
subscriber.onNext(drainList); subscriber.onNext( drainList );


//take since we're in flight //take since we're in flight
inFlight.addAndGet(drainList.size()); inFlight.addAndGet( drainList.size() );
} catch (Throwable t) { }
catch ( Throwable t ) {
final long sleepTime = indexProcessorFig.getFailureRetryTime(); final long sleepTime = indexProcessorFig.getFailureRetryTime();


logger.error("Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t); logger.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );


if (drainList != null) { if ( drainList != null ) {
inFlight.addAndGet(-1 * drainList.size()); inFlight.addAndGet( -1 * drainList.size() );
} }




try { try {
Thread.sleep(sleepTime); Thread.sleep( sleepTime );
} catch (InterruptedException ie) { }
catch ( InterruptedException ie ) {
//swallow //swallow
} }


indexErrorCounter.inc(); indexErrorCounter.inc();
} }
} }
while (true); while ( true );
} }
}) } )
//this won't block our read loop, just reads and proceeds //this won't block our read loop, just reads and proceeds
.map(messages -> .map( messages -> {
{ if ( messages == null || messages.size() == 0 ) {
if (messages == null || messages.size() == 0) {
return null; return null;
} }


try { try {
List<IndexEventResult> indexEventResults = callEventHandlers(messages); List<IndexEventResult> indexEventResults = callEventHandlers( messages );
List<QueueMessage> messagesToAck = submitToIndex(indexEventResults); List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
if (messagesToAck == null || messagesToAck.size() == 0) { if ( messagesToAck == null || messagesToAck.size() == 0 ) {
logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages); logger.error( "No messages came back from the queue operation should have seen "
+ messages.size(), messages );
return messagesToAck; return messagesToAck;
} }
if(messagesToAck.size()<messages.size()){ if ( messagesToAck.size() < messages.size() ) {
logger.error("Missing messages from queue post operation",messages,messagesToAck); logger.error( "Missing messages from queue post operation", messages,
messagesToAck );
} }
//ack each message, but only if we didn't error. //ack each message, but only if we didn't error.
ack(messagesToAck); ack( messagesToAck );
return messagesToAck; return messagesToAck;
} catch (Exception e) { }
logger.error("failed to ack messages to sqs", e); catch ( Exception e ) {
logger.error( "failed to ack messages to sqs", e );
return null; return null;
//do not rethrow so we can process all of them //do not rethrow so we can process all of them
} }
}); } );


//start in the background //start in the background


Expand Down Expand Up @@ -619,12 +729,8 @@ private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResul


//send the batch //send the batch
//TODO: should retry? //TODO: should retry?
try { queueIndexOperationMessage( combined );
indexProducer.put(combined).toBlocking().lastOrDefault(null);
}catch (Exception e){
logger.error("Failed to submit to index producer",e);
throw e;
}
return messagesToAck; return messagesToAck;
} }


Expand Down Expand Up @@ -671,4 +777,6 @@ public long getCreationTime() {
return creationTime; return creationTime;
} }
} }


} }
Expand Up @@ -25,6 +25,7 @@
import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.Id;


Expand Down

0 comments on commit 94a9078

Please sign in to comment.