Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Reduce SQS hop for entity write/update indexing events.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelarusso committed Feb 19, 2016
1 parent d4c7a3c commit b4634dc
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 198 deletions.
Expand Up @@ -590,7 +590,7 @@ public void update( Entity entity ) throws Exception {


// update in all containing collections and connection indexes // update in all containing collections and connection indexes


indexService.queueEntityIndexUpdate( applicationScope, cpEntity ); indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0);
} }




Expand Down Expand Up @@ -1107,7 +1107,7 @@ public void deleteProperty( EntityRef entityRef, String propertyName ) throws Ex


//Adding graphite metrics //Adding graphite metrics


indexService.queueEntityIndexUpdate(applicationScope, cpEntity); indexService.queueEntityIndexUpdate(applicationScope, cpEntity, 0);
} }




Expand Down
Expand Up @@ -22,9 +22,7 @@


import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge; import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.Id;
Expand All @@ -48,8 +46,9 @@ public interface AsyncEventService extends ReIndexAction {
* After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
* @param applicationScope * @param applicationScope
* @param entity The entity to index. Should be fired when an entity is updated * @param entity The entity to index. Should be fired when an entity is updated
* @param updatedAfter
*/ */
void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter);




/** /**
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -104,14 +104,14 @@ private AsyncEventService getIndexService() {


switch (impl) { switch (impl) {
case LOCAL: case LOCAL:
AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler); entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
eventService.MAX_TAKE = 1000; eventService.MAX_TAKE = 1000;
return eventService; return eventService;
case SQS: case SQS:
throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region"); throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
case SNS: case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
default: default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
Expand Down
Expand Up @@ -38,14 +38,6 @@
*/ */
public interface EventBuilder { public interface EventBuilder {


/**
* Return the cold observable of entity index update operations
* @param applicationScope
* @param entity
* @return
*/
Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );

/** /**
* Return the cold observable of the new edge operation * Return the cold observable of the new edge operation
* @param applicationScope * @param applicationScope
Expand All @@ -69,7 +61,9 @@ public interface EventBuilder {
* @param entityId * @param entityId
* @return * @return
*/ */
EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId ); EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );




/** /**
* Re-index an entity in the scope provided * Re-index an entity in the scope provided
Expand Down
Expand Up @@ -73,19 +73,6 @@ public EventBuilderImpl( final IndexService indexService,
} }




@Override
public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
final Entity entity ) {
//process the entity immediately
//only process the same version, otherwise ignore

if (logger.isDebugEnabled()) {
logger.debug("Indexing in app scope {} entity {}", entity, applicationScope);
}

return indexService.indexEntity( applicationScope, entity );
}



@Override @Override
public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity, public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
Expand Down Expand Up @@ -118,7 +105,7 @@ public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?


@Override @Override
public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
} }
Expand Down
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.usergrid.corepersistence.asyncevents;


import java.util.UUID;

public class IndexDocNotFoundException extends RuntimeException {

final UUID batchId;

public IndexDocNotFoundException(final UUID batchId){

super("Index batch ID "+batchId.toString()+" not found in map persistence");
this.batchId = batchId;


}

}
Expand Up @@ -22,6 +22,7 @@


import java.util.Iterator; import java.util.Iterator;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;


import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.utils.UUIDUtils; import org.apache.usergrid.utils.UUIDUtils;
Expand Down Expand Up @@ -104,7 +105,8 @@ public Observable<IndexOperationMessage> indexEntity( final ApplicationScope app


//do our observable for batching //do our observable for batching
//try to send a whole batch if we can //try to send a whole batch if we can
final Observable<IndexOperationMessage> batches = sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() ) final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
.buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS )


//map into batches based on our buffer size //map into batches based on our buffer size
.flatMap( buffer -> Observable.from( buffer ) .flatMap( buffer -> Observable.from( buffer )
Expand Down
Expand Up @@ -230,10 +230,16 @@ private void validate( final FilterResult<Candidate> filterResult ) {




//entity is newer than ES version, could be an update or the entity is marked as deleted //entity is newer than ES version, could be an update or the entity is marked as deleted
if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) { if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ||
!entity.getEntity().isPresent() ||
entity.getStatus() == MvccEntity.Status.DELETED ) {


logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", // when updating entities, we don't delete previous versions from ES so this action is expected
if(logger.isDebugEnabled()){
logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}",
searchEdge, entityId, entityVersion); searchEdge, entityId, entityVersion);
}

batch.deindex( searchEdge, entityId, candidateVersion ); batch.deindex( searchEdge, entityId, candidateVersion );
return; return;
} }
Expand Down
Expand Up @@ -29,7 +29,7 @@


import org.apache.usergrid.corepersistence.TestIndexModule; import org.apache.usergrid.corepersistence.TestIndexModule;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.AmazonAsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl;
import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
Expand All @@ -50,7 +50,7 @@
@RunWith( EsRunner.class ) @RunWith( EsRunner.class )
@UseModules( { TestIndexModule.class } ) @UseModules( { TestIndexModule.class } )
@NotThreadSafe @NotThreadSafe
public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { public class AsyncEventServiceImplTest extends AsyncIndexServiceTest {






Expand Down Expand Up @@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {


@Override @Override
protected AsyncEventService getAsyncEventService() { protected AsyncEventService getAsyncEventService() {
return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
} }




Expand Down
Expand Up @@ -55,7 +55,6 @@
import net.jcip.annotations.NotThreadSafe; import net.jcip.annotations.NotThreadSafe;


import rx.Observable; import rx.Observable;
import rx.schedulers.Schedulers;


import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -145,7 +144,7 @@ public void testMessageIndexing() throws InterruptedException {




//queue up processing //queue up processing
asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity ); asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity, 0);




final EntityIndex EntityIndex = final EntityIndex EntityIndex =
Expand Down
Expand Up @@ -19,6 +19,7 @@




import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;


import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
Expand Down Expand Up @@ -130,7 +131,9 @@ private Observable<IndexOperationMessage> processBatch( final IndexOperationMess
final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex); final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);


//buffer into the max size we can send ES and fire them all off until we're completed //buffer into the max size we can send ES and fire them all off until we're completed
final Observable<BulkRequestBuilder> requests = batchOps.buffer(indexFig.getIndexBatchSize()) final Observable<BulkRequestBuilder> requests = batchOps
.buffer(indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS)

//flatten the buffer into a single batch execution //flatten the buffer into a single batch execution
.flatMap(individualOps -> Observable.from(individualOps) .flatMap(individualOps -> Observable.from(individualOps)
//collect them //collect them
Expand Down
Expand Up @@ -82,4 +82,8 @@ public interface QueueFig extends GuicyFig {
@Key( "usergrid.queue.visibilityTimeout" ) @Key( "usergrid.queue.visibilityTimeout" )
@Default("5000") // 5 seconds @Default("5000") // 5 seconds
int getVisibilityTimeout(); int getVisibilityTimeout();

@Key( "usergrid.queue.localquorum.timeout")
@Default("3000") // 3 seconds
int getLocalQuorumTimeout();
} }
Expand Up @@ -292,7 +292,10 @@ private static Class<Service> findClass( String classname ) {
} }
} }
catch ( ClassNotFoundException e1 ) { catch ( ClassNotFoundException e1 ) {
logger.error("Could not load class", e1); if(logger.isTraceEnabled()){
logger.trace("Could not find class", e1);
}

} }
return null; return null;
} }
Expand Down

0 comments on commit b4634dc

Please sign in to comment.