
Commit

Ensure indexBatch works with the new model for indexing.
michaelarusso committed Feb 20, 2016
1 parent b4634dc commit 3c399e7
Showing 4 changed files with 49 additions and 145 deletions.
@@ -36,7 +36,6 @@
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -298,12 +297,12 @@ private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messag

} catch (ClassCastException cce) {
logger.error("Failed to deserialize message body", cce);
-return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
}

if (event == null) {
logger.error("AsyncEvent type or event is null!");
-return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
}

final AsyncEvent thisEvent = event;
@@ -312,6 +311,7 @@ private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messag
logger.debug("Processing {} event", event);
}

+IndexOperationMessage indexOperationMessage = null;
try {

// deletes are 2-part, actual IO to delete data, then queue up a de-index
Expand All @@ -332,7 +332,7 @@ else if ( event instanceof InitializeApplicationIndexEvent ) {
// this is the main event that pulls the index doc from map persistence and hands to the index producer
else if (event instanceof ElasticsearchIndexEvent) {

-handleIndexOperation((ElasticsearchIndexEvent) event);
+indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event);

} else {

@@ -341,20 +341,20 @@ else if (event instanceof ElasticsearchIndexEvent) {


//return type that can be indexed and ack'd later
-return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime());
+return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());

} catch (IndexDocNotFoundException e){

// this exception is throw when we wait before trying quorum read on map persistence.
// return empty event result so the event's message doesn't get ack'd
logger.info(e.getMessage());
-return new IndexEventResult(Optional.absent(), event.getCreationTime());
+return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());

} catch (Exception e) {

// if the event fails to process, log the message and return empty event result so it doesn't get ack'd
logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
-return new IndexEventResult(Optional.absent(), event.getCreationTime());
+return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
}
});
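Read together, these changes widen IndexEventResult from a single Optional<QueueMessage> to a pair of Optionals: the handler now also hands back the IndexOperationMessage it built, because the actual Elasticsearch submission is deferred to the caller. A short sketch of the two outcomes above, using only the constructors shown in this hunk:

    // Success: carry both the built batch (may be null, hence fromNullable) and the queue message,
    // so the message can be ack'd once the combined batch has been submitted.
    new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime());

    // Failure or not-yet-readable index doc: nothing present, so the message is never ack'd
    // and the queue redelivers it for another attempt.
    new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());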

@@ -407,6 +407,7 @@ public void queueNewEdge(final ApplicationScope applicationScope,
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {

+// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
}

@@ -471,7 +472,7 @@ public void queueIndexOperationMessage( final IndexOperationMessage indexOperati
offerTopic( elasticsearchIndexEvent );
}

-public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );

final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
@@ -525,7 +526,7 @@ public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchInde


//now execute it
-indexProducer.put(indexOperationMessage).toBlocking().last();
+return indexOperationMessage;

}
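Taken with the earlier callEventHandlers hunk, this is the heart of the new model: handleIndexOperation no longer performs the blocking submit itself but returns the deserialized batch to its caller. A minimal sketch of the old versus new submission pattern, using only calls that appear in this diff (the surrounding loop is illustrative, not actual Usergrid code):

    // Before: each queue message ended in its own blocking Elasticsearch submission.
    for (ElasticsearchIndexEvent event : events) {
        handleIndexOperation(event); // internally finished with indexProducer.put(...).toBlocking().last()
    }

    // After: handlers only build; the caller merges every batch and submits once.
    IndexOperationMessage combined = new IndexOperationMessage();
    for (ElasticsearchIndexEvent event : events) {
        IndexOperationMessage msg = handleIndexOperation(event);
        if (msg != null) {
            combined.ingest(msg); // fold this batch's index/de-index requests into the combined batch
        }
    }
    indexProducer.put(combined).toBlocking().last(); // one round trip for the whole set of messages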

@@ -568,6 +569,7 @@ public long getQueueDepth() {
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {

+// sent in region (not offerTopic) as the delete IO happens in-region, then queues a multi-region de-index op
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
}

@@ -699,7 +701,7 @@ public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {

try {
List<IndexEventResult> indexEventResults = callEventHandlers( messages );
-List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );

if ( messagesToAck == null || messagesToAck.size() == 0 ) {
logger.error(
@@ -738,52 +740,80 @@ public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
* @param indexEventResults
* @return
*/
-private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
//if nothing came back then return null
if(indexEventResults==null){
return null;
}
+IndexOperationMessage combined = new IndexOperationMessage();

// stream the messages to record the cycle time
-return indexEventResults.stream()
+List<QueueMessage> queueMessages = indexEventResults.stream()
.map(indexEventResult -> {
//record the cycle time
messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+if(indexEventResult.getIndexOperationMessage().isPresent()){
+combined.ingest(indexEventResult.getIndexOperationMessage().get());
+}
return indexEventResult;
})
// filter out messages that are not present, they were not processed and put into the results
.filter( result -> result.getQueueMessage().isPresent() )
.map(result -> result.getQueueMessage().get())
// collect
.collect(Collectors.toList());

+// sumbit the requests to Elasticsearch
+indexProducer.put(combined).toBlocking().last();
+
+return queueMessages;
}
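The rename from ackMessages to submitToIndex matches the new ordering: the combined batch is pushed to the index producer here, before any queue message is acknowledged. A rough sketch of the surrounding consumer loop; the ack helper below is assumed, since the actual ack call sits outside the hunks shown:

    List<IndexEventResult> results = callEventHandlers(messages);
    // submitToIndex ingests every present IndexOperationMessage into one combined batch,
    // blocks on indexProducer.put(combined), and returns only the queue messages that were processed.
    List<QueueMessage> messagesToAck = submitToIndex(results);
    // Hypothetical ack step (not in this diff): anything missing from messagesToAck stays
    // un-acked and is redelivered by the queue.
    ack(messagesToAck);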

public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
-//change to id scope to avoid serialization issues
-offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );

+EntityIndexOperation entityIndexOperation =
+new EntityIndexOperation( applicationScope, id, updatedSince);
+
+queueIndexOperationMessage(eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null));
}

public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {

-List batch = new ArrayList<EdgeScope>();
+IndexOperationMessage batch = new IndexOperationMessage();

for ( EdgeScope e : edges){
-//change to id scope to avoid serialization issues
-batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));

+EntityIndexOperation entityIndexOperation =
+new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
+
+IndexOperationMessage indexOperationMessage =
+eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+
+if (indexOperationMessage != null){
+batch.ingest(indexOperationMessage);
+}

}
-offerBatch( batch );

+queueIndexOperationMessage(batch);
}
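With these two rewrites, index() and indexBatch() no longer enqueue per-entity EntityIndexEvents at all; the IndexOperationMessage is built in-process through eventBuilder and handed to queueIndexOperationMessage, which (per the earlier hunk) offers a single ElasticsearchIndexEvent to the topic. A hedged usage sketch; the service variable and page source below are assumed, not taken from this commit:

    // Hypothetical caller, e.g. a re-index task that has gathered a page of edges to refresh.
    List<EdgeScope> page = gatherEdgesToReindex();        // assumed helper, not in this diff
    long updatedSince = 0L;                               // 0 = no modified-since cutoff (assumed semantics)
    asyncEventService.indexBatch(page, updatedSince);     // one queued ElasticsearchIndexEvent covers the whole page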


public class IndexEventResult{
+private final Optional<IndexOperationMessage> indexOperationMessage;
private final Optional<QueueMessage> queueMessage;
private final long creationTime;

-public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){
+public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){

this.queueMessage = queueMessage;
this.creationTime = creationTime;
+this.indexOperationMessage = indexOperationMessage;
}

+public Optional<IndexOperationMessage> getIndexOperationMessage() {
+return indexOperationMessage;
+}

public Optional<QueueMessage> getQueueMessage() {
return queueMessage;
@@ -40,9 +40,7 @@
@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
@JsonSubTypes( {
@JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
-@JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
-@JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
@JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
} )
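Because EdgeIndexEvent and EntityIndexEvent are deleted by this commit, their registrations have to leave this list as well: with WRAPPER_OBJECT typing the registered name becomes the JSON wrapper key, so only registered subtypes can be serialized and resolved. A self-contained illustration of that wrapping; the classes below are hypothetical stand-ins, not Usergrid types:

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class WrapperObjectDemo {

        @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
        @JsonSubTypes( { @JsonSubTypes.Type( value = Ping.class, name = "ping" ) } )
        interface Event { }

        static class Ping implements Event {
            public String region = "us-east-1";
        }

        // Declared as the base type so Jackson applies the polymorphic wrapper on serialization.
        static class Envelope {
            public Event event = new Ping();
        }

        public static void main(String[] args) throws Exception {
            // Prints {"event":{"ping":{"region":"us-east-1"}}}; the registered name is the wrapper key,
            // which is why a subtype removed from @JsonSubTypes can no longer be round-tripped.
            System.out.println(new ObjectMapper().writeValueAsString(new Envelope()));
        }
    }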

This file was deleted.

This file was deleted.
