Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Make the intial write path of indexing require less I/O.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Russo committed Apr 13, 2016
1 parent a462244 commit 10ff27c
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 28 deletions.
Expand Up @@ -310,42 +310,54 @@ private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messag
logger.debug("Processing event with type {}", event.getClass().getSimpleName()); logger.debug("Processing event with type {}", event.getClass().getSimpleName());
} }


IndexOperationMessage indexOperationMessage = null;
try { try {


IndexOperationMessage single = new IndexOperationMessage();

// normal indexing event for an entity
if ( event instanceof EntityIndexEvent ){

single = handleEntityIndexUpdate( message );

}
// normal indexing event for an edge
else if ( event instanceof EdgeIndexEvent ){

single = handleEdgeIndex( message );

}
// deletes are 2-part, actual IO to delete data, then queue up a de-index // deletes are 2-part, actual IO to delete data, then queue up a de-index
if ( event instanceof EdgeDeleteEvent ) { else if ( event instanceof EdgeDeleteEvent ) {


handleEdgeDelete( message ); single = handleEdgeDelete( message );
} }
// deletes are 2-part, actual IO to delete data, then queue up a de-index // deletes are 2-part, actual IO to delete data, then queue up a de-index
else if ( event instanceof EntityDeleteEvent ) { else if ( event instanceof EntityDeleteEvent ) {


handleEntityDelete( message ); single = handleEntityDelete( message );
} }
// application initialization has special logic, therefore a special event type // initialization has special logic, therefore a special event type and no index operation message
else if ( event instanceof InitializeApplicationIndexEvent ) { else if ( event instanceof InitializeApplicationIndexEvent ) {


handleInitializeApplicationIndex(event, message); handleInitializeApplicationIndex(event, message);
} }
// this is the main event that pulls the index doc from map persistence and hands to the index producer // this is the main event that pulls the index doc from map persistence and hands to the index producer
else if (event instanceof ElasticsearchIndexEvent) { else if (event instanceof ElasticsearchIndexEvent) {


indexOperationMessage = handleIndexOperation((ElasticsearchIndexEvent) event); handleIndexOperation((ElasticsearchIndexEvent) event);


} else if (event instanceof DeIndexOldVersionsEvent) { } else if (event instanceof DeIndexOldVersionsEvent) {


indexOperationMessage = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event); single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);


} else { } else {


throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim()); throw new Exception("Unknown EventType for message: "+ message.getStringBody().trim());
} }




// returning indexOperationMessage will send that indexOperationMessage to the index producer
// if no exception happens and the QueueMessage is returned in these results, it will get ack'd // if no exception happens and the QueueMessage is returned in these results, it will get ack'd
return new IndexEventResult(Optional.fromNullable(indexOperationMessage), Optional.of(message), thisEvent.getCreationTime()); return new IndexEventResult(Optional.of(single), Optional.of(message), thisEvent.getCreationTime());


} catch (IndexDocNotFoundException e){ } catch (IndexDocNotFoundException e){


Expand Down Expand Up @@ -382,6 +394,8 @@ public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
final Entity entity, long updatedAfter) { final Entity entity, long updatedAfter) {




offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0));

final EntityIndexOperation entityIndexOperation = final EntityIndexOperation entityIndexOperation =
new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter); new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter);


Expand All @@ -392,19 +406,56 @@ public void queueEntityIndexUpdate(final ApplicationScope applicationScope,


} }


private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) {

Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );

final AsyncEvent event = ( AsyncEvent ) message.getBody();

Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));

final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;


//process the entity immediately
//only process the same version, otherwise ignore
final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
final Id entityId = entityIdScope.getId();
final long updatedAfter = entityIndexEvent.getUpdatedAfter();

final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);

return eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
}



@Override @Override
public void queueNewEdge(final ApplicationScope applicationScope, public void queueNewEdge(final ApplicationScope applicationScope,
final Entity entity, final Entity entity,
final Edge newEdge) { final Edge newEdge) {


final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge ));


final IndexOperationMessage indexMessage = ecm.load( entity.getId() ) }
.flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) )
.toBlocking().lastOrDefault(null);


queueIndexOperationMessage( indexMessage ); private IndexOperationMessage handleEdgeIndex(final QueueMessage message) {

Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );

final AsyncEvent event = (AsyncEvent) message.getBody();

Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));

final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;

final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( edgeIndexEvent.getApplicationScope() );

return ecm.load( edgeIndexEvent.getEntityId() )
.flatMap( loadedEntity -> eventBuilder.buildNewEdge(edgeIndexEvent.getApplicationScope(), loadedEntity, edgeIndexEvent.getEdge()) )
.toBlocking().lastOrDefault(null);


} }


Expand All @@ -417,7 +468,7 @@ public void queueDeleteEdge(final ApplicationScope applicationScope,
offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
} }


public void handleEdgeDelete(final QueueMessage message) { private IndexOperationMessage handleEdgeDelete(final QueueMessage message) {


Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );


Expand All @@ -436,10 +487,7 @@ public void handleEdgeDelete(final QueueMessage message) {
logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
} }


IndexOperationMessage indexMessage = return eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);

queueIndexOperationMessage(indexMessage);


} }


Expand Down Expand Up @@ -478,7 +526,7 @@ public void queueIndexOperationMessage( final IndexOperationMessage indexOperati
offerTopic( elasticsearchIndexEvent ); offerTopic( elasticsearchIndexEvent );
} }


public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){ private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException {


Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );


Expand Down Expand Up @@ -529,7 +577,8 @@ public IndexOperationMessage handleIndexOperation(final ElasticsearchIndexEvent
// always do a check to ensure the indexes are initialized for the index requests // always do a check to ensure the indexes are initialized for the index requests
initializeEntityIndexes(indexOperationMessage); initializeEntityIndexes(indexOperationMessage);


return indexOperationMessage; // send it to to be indexed
indexProducer.put(indexOperationMessage).toBlocking().last();


} }


Expand Down Expand Up @@ -599,7 +648,7 @@ public void queueEntityDelete(final ApplicationScope applicationScope, final Id
offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
} }


public void handleEntityDelete(final QueueMessage message) { private IndexOperationMessage handleEntityDelete(final QueueMessage message) {


Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");


Expand All @@ -625,14 +674,12 @@ public void handleEntityDelete(final QueueMessage message) {


entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);


IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null); return entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);

queueIndexOperationMessage(indexMessage);


} }




public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) { private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));


Expand Down Expand Up @@ -793,8 +840,7 @@ private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResult
// collect into a list of QueueMessages that can be ack'd later // collect into a list of QueueMessages that can be ack'd later
.collect(Collectors.toList()); .collect(Collectors.toList());


// sumbit the requests to Elasticsearch queueIndexOperationMessage(combined);
indexProducer.put(combined).toBlocking().last();


return queueMessages; return queueMessages;
} }
Expand Down
Expand Up @@ -38,6 +38,8 @@
@JsonIgnoreProperties( ignoreUnknown = true ) @JsonIgnoreProperties( ignoreUnknown = true )
@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" ) @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
@JsonSubTypes( { @JsonSubTypes( {
@JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
@JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
@JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ), @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
@JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
@JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ), @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
Expand Down
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.asyncevents.model;


import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.model.entity.Id;


public final class EdgeIndexEvent
extends AsyncEvent {


@JsonProperty
protected ApplicationScope applicationScope;

@JsonProperty
protected Id entityId;

@JsonProperty
protected Edge edge;

/**
* Needed by jackson
*/
public EdgeIndexEvent() {
super();
}

public EdgeIndexEvent(String sourceRegion, ApplicationScope applicationScope, Id entityId, Edge edge) {
super(sourceRegion);
this.applicationScope = applicationScope;
this.entityId = entityId;
this.edge = edge;
}


public ApplicationScope getApplicationScope() {
return applicationScope;
}


public Edge getEdge() {
return edge;
}


public Id getEntityId() {
return entityId;
}
}
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.asyncevents.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;


public final class EntityIndexEvent extends AsyncEvent {


@JsonProperty
protected EntityIdScope entityIdScope;

@JsonProperty
private long updatedAfter;

public EntityIndexEvent() {
super();
}

public EntityIndexEvent(String sourceRegion, EntityIdScope entityIdScope, final long updatedAfter ) {
super(sourceRegion);
this.entityIdScope = entityIdScope;
this.updatedAfter = updatedAfter;
}


public long getUpdatedAfter() {
return updatedAfter;
}


public EntityIdScope getEntityIdScope() {
return entityIdScope;
}
}

0 comments on commit 10ff27c

Please sign in to comment.