Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
filter some ops out, check for queue overflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Oct 5, 2015
1 parent 5f20ece commit f35d01c
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 14 deletions.
Expand Up @@ -239,7 +239,7 @@ private Observable<IndexEventResult> handleMessages( final List<QueueMessage> me


if (event == null) { if (event == null) {
logger.error("AsyncEvent type or event is null!"); logger.error("AsyncEvent type or event is null!");
return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false)); return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
} }
try { try {
//merge each operation to a master observable; //merge each operation to a master observable;
Expand All @@ -254,10 +254,10 @@ private Observable<IndexEventResult> handleMessages( final List<QueueMessage> me
} else if (event instanceof InitializeApplicationIndexEvent) { } else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable //does not return observable
handleInitializeApplicationIndex(message); handleInitializeApplicationIndex(message);
return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true)); return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
} else { } else {
logger.error("Unknown EventType: {}", event); logger.error("Unknown EventType: {}", event);
return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false)); return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
} }
}catch (Exception e){ }catch (Exception e){
logger.error("Failed to index entity", e,message); logger.error("Failed to index entity", e,message);
Expand All @@ -270,20 +270,25 @@ private Observable<IndexEventResult> handleMessages( final List<QueueMessage> me


return masterObservable return masterObservable
//remove unsuccessful //remove unsuccessful
.filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage() .filter( indexEventResult -> indexEventResult.shouldProcess() )
.isPresent() )
//take the max //take the max
.buffer( MAX_TAKE ) .buffer( MAX_TAKE )
//map them to index results and return them //map them to index results and return them
.flatMap( indexEventResults -> { .flatMap( indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage(); IndexOperationMessage combined = new IndexOperationMessage();
indexEventResults.stream().forEach( indexEventResults.stream().forEach(
indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get() ) ); indexEventResult ->{
if(indexEventResult.getIndexOperationMessage().isPresent()) {
combined.ingest(indexEventResult.getIndexOperationMessage().get());
}
} );



//ack after successful completion of the operation. //ack after successful completion of the operation.
return indexProducer.put( combined ).flatMap( operationResult -> Observable.from( indexEventResults ) ) return indexProducer.put( combined )
.flatMap( operationResult -> Observable.from( indexEventResults ) )
//ack each message, but only if we didn't error. If we did, we'll want to log it and //ack each message, but only if we didn't error. If we did, we'll want to log it and
.map( indexEventResult -> { .map( indexEventResult -> {
ack( indexEventResult.queueMessage ); ack( indexEventResult.queueMessage );
return indexEventResult; return indexEventResult;
} ); } );
Expand Down Expand Up @@ -562,21 +567,21 @@ public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
public class IndexEventResult{ public class IndexEventResult{
private final QueueMessage queueMessage; private final QueueMessage queueMessage;
private final Optional<IndexOperationMessage> indexOperationMessage; private final Optional<IndexOperationMessage> indexOperationMessage;
private final boolean success; private final boolean shouldProcess;


public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean success){ public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean shouldProcess){


this.queueMessage = queueMessage; this.queueMessage = queueMessage;
this.indexOperationMessage = indexOperationMessage; this.indexOperationMessage = indexOperationMessage;
this.success = success; this.shouldProcess = shouldProcess;
} }


public QueueMessage getQueueMessage() { public QueueMessage getQueueMessage() {
return queueMessage; return queueMessage;
} }


public boolean success() { public boolean shouldProcess() {
return success; return shouldProcess;
} }


public Optional<IndexOperationMessage> getIndexOperationMessage() { public Optional<IndexOperationMessage> getIndexOperationMessage() {
Expand Down
Expand Up @@ -196,4 +196,7 @@ public interface IndexFig extends GuicyFig {
long getWriteTimeout(); long getWriteTimeout();




@Default("1000")
@Key( "elasticsearch_queue_error_sleep_ms" )
long getSleepTimeForQueueError();
} }
Expand Up @@ -205,6 +205,13 @@ private void sendRequest( BulkRequestBuilder bulkRequest ) {
} }


if ( error ) { if ( error ) {
if(errorString.lastIndexOf("rejected execution (queue capacity")>=0){
try{
Thread.sleep(indexFig.getSleepTimeForQueueError());
}catch (InterruptedException ie){
//move on
}
}
throw new RuntimeException( throw new RuntimeException(
"Error during processing of bulk index operations one of the responses failed. \n" + errorString); "Error during processing of bulk index operations one of the responses failed. \n" + errorString);
} }
Expand Down
Expand Up @@ -47,7 +47,7 @@ public QueueManagerFactoryImpl(final QueueFig queueFig, final QueueManagerIntern
} }
@Override @Override
public QueueManager getQueueManager(QueueScope scope) { public QueueManager getQueueManager(QueueScope scope) {
if(queueFig.overrideQueueForDefault()){ if(true==false){
QueueManager manager = defaultManager.get(scope.getName()); QueueManager manager = defaultManager.get(scope.getName());
if(manager==null){ if(manager==null){
manager = new DefaultQueueManager(); manager = new DefaultQueueManager();
Expand Down

0 comments on commit f35d01c

Please sign in to comment.