diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 6c2ef0bdca..b677f7904c 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -590,7 +590,7 @@ public void update( Entity entity ) throws Exception { // update in all containing collections and connection indexes - indexService.queueEntityIndexUpdate( applicationScope, cpEntity ); + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0); } @@ -1107,7 +1107,7 @@ public void deleteProperty( EntityRef entityRef, String propertyName ) throws Ex //Adding graphite metrics - indexService.queueEntityIndexUpdate(applicationScope, cpEntity); + indexService.queueEntityIndexUpdate(applicationScope, cpEntity, 0); } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index dbf8996758..288fb12d1d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -22,9 +22,7 @@ import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.entities.Application; import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.IndexLocationStrategy; import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; @@ -48,8 +46,9 @@ public interface AsyncEventService extends ReIndexAction { * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly. * @param applicationScope * @param entity The entity to index. Should be fired when an entity is updated + * @param updatedAfter */ - void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity); + void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter); /** diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java similarity index 73% rename from stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java rename to stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 00dc69a62f..e101761c09 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -29,13 +29,11 @@ import java.util.stream.Stream; import org.apache.usergrid.persistence.index.impl.*; -import org.elasticsearch.action.index.IndexRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; -import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent; import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; @@ -102,10 +100,10 @@ * */ @Singleton -public class AmazonAsyncEventService implements AsyncEventService { +public class AsyncEventServiceImpl implements AsyncEventService { - private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class); + private static final Logger logger = LoggerFactory.getLogger(AsyncEventServiceImpl.class); // SQS maximum receive messages is 10 public int MAX_TAKE = 10; @@ -141,17 +139,17 @@ public class AmazonAsyncEventService implements AsyncEventService { @Inject - public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, - final IndexProcessorFig indexProcessorFig, - final IndexProducer indexProducer, - final MetricsFactory metricsFactory, - final EntityCollectionManagerFactory entityCollectionManagerFactory, - final IndexLocationStrategyFactory indexLocationStrategyFactory, - final EntityIndexFactory entityIndexFactory, - final EventBuilder eventBuilder, - final MapManagerFactory mapManagerFactory, - final QueueFig queueFig, - @EventExecutionScheduler + public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory, + final IndexProcessorFig indexProcessorFig, + final IndexProducer indexProducer, + final MetricsFactory metricsFactory, + final EntityCollectionManagerFactory entityCollectionManagerFactory, + final IndexLocationStrategyFactory indexLocationStrategyFactory, + final EntityIndexFactory entityIndexFactory, + final EventBuilder eventBuilder, + final MapManagerFactory mapManagerFactory, + final QueueFig queueFig, + @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler ) { this.indexProducer = indexProducer; @@ -172,15 +170,15 @@ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, this.indexProcessorFig = indexProcessorFig; this.queueFig = queueFig; - this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write"); - this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read"); - this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack"); - this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error"); - this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle"); + this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write"); + this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read"); + this.ackTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.ack"); + this.indexErrorCounter = metricsFactory.getCounter(AsyncEventServiceImpl.class, "async_event.error"); + this.messageCycle = metricsFactory.getHistogram(AsyncEventServiceImpl.class, "async_event.message_cycle"); //wire up the gauge of inflight message - metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge() { + metricsFactory.addGauge(AsyncEventServiceImpl.class, "async-event.inflight", new Gauge() { @Override public Long getValue() { return inFlight.longValue(); @@ -291,18 +289,21 @@ private List callEventHandlers(final List messag logger.debug("callEventHandlers with {} message", messages.size()); } - Stream indexEventResults = messages.stream().map(message -> { + Stream indexEventResults = messages.parallelStream().map(message -> + + { AsyncEvent event = null; try { event = (AsyncEvent) message.getBody(); + } catch (ClassCastException cce) { logger.error("Failed to deserialize message body", cce); + return new IndexEventResult(Optional.absent(), System.currentTimeMillis()); } if (event == null) { logger.error("AsyncEvent type or event is null!"); - return new IndexEventResult(Optional.fromNullable(message), Optional.absent(), - System.currentTimeMillis()); + return new IndexEventResult(Optional.absent(), System.currentTimeMillis()); } final AsyncEvent thisEvent = event; @@ -312,55 +313,48 @@ private List callEventHandlers(final List messag } try { - //check for empty sets if this is true - boolean validateEmptySets = true; - Observable indexoperationObservable; - //merge each operation to a master observable; + + // deletes are 2-part, actual IO to delete data, then queue up a de-index if ( event instanceof EdgeDeleteEvent ) { - indexoperationObservable = handleEdgeDelete( message ); - } - else if ( event instanceof EdgeIndexEvent ) { - indexoperationObservable = handleEdgeIndex( message ); + + handleEdgeDelete( message ); } + // deletes are 2-part, actual IO to delete data, then queue up a de-index else if ( event instanceof EntityDeleteEvent ) { - indexoperationObservable = handleEntityDelete( message ); - validateEmptySets = false; // do not check this one for an empty set b/c it can be empty + handleEntityDelete( message ); } - else if ( event instanceof EntityIndexEvent ) { - indexoperationObservable = handleEntityIndexUpdate( message ); - } + // application initialization has special logic, therefore a special event type else if ( event instanceof InitializeApplicationIndexEvent ) { - //does not return observable + handleInitializeApplicationIndex(event, message); - indexoperationObservable = Observable.just(new IndexOperationMessage()); - validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. - } else if (event instanceof ElasticsearchIndexEvent) { - handleIndexOperation((ElasticsearchIndexEvent) event); - indexoperationObservable = Observable.just(new IndexOperationMessage()); - validateEmptySets = false; //do not check this one for an empty set b/c it will be empty. - } else { - throw new Exception("Unknown EventType");//TODO: print json instead } + // this is the main event that pulls the index doc from map persistence and hands to the index producer + else if (event instanceof ElasticsearchIndexEvent) { - //collect all of the - IndexOperationMessage indexOperationMessage = indexoperationObservable - .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single)) - .toBlocking().lastOrDefault(null); + handleIndexOperation((ElasticsearchIndexEvent) event); + + } else { - if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) { - logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(), - message.getStringBody()); - throw new Exception("Received empty index sequence."); + throw new Exception("Unknown EventType for message: "+ message.getStringBody()); } + //return type that can be indexed and ack'd later - return new IndexEventResult(Optional.fromNullable(message), - Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime()); + return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime()); + + } catch (IndexDocNotFoundException e){ + + // this exception is throw when we wait before trying quorum read on map persistence. + // return empty event result so the event's message doesn't get ack'd + logger.info(e.getMessage()); + return new IndexEventResult(Optional.absent(), event.getCreationTime()); + } catch (Exception e) { - logger.error("Failed to index message: {} {}", message.getMessageId(), message.getStringBody(), e); - return new IndexEventResult(Optional.absent(), Optional.absent(), - event.getCreationTime()); + + // if the event fails to process, log the message and return empty event result so it doesn't get ack'd + logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e); + return new IndexEventResult(Optional.absent(), event.getCreationTime()); } }); @@ -379,35 +373,17 @@ public void queueInitializeApplicationIndex( final ApplicationScope applicationS @Override public void queueEntityIndexUpdate(final ApplicationScope applicationScope, - final Entity entity) { + final Entity entity, long updatedAfter) { - offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0)); - } + final EntityIndexOperation entityIndexOperation = + new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter); - public Observable handleEntityIndexUpdate(final QueueMessage message) { + final IndexOperationMessage indexMessage = + eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null); - Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); + queueIndexOperationMessage( indexMessage ); - final AsyncEvent event = ( AsyncEvent ) message.getBody(); - - Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); - Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); - - final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event; - - - //process the entity immediately - //only process the same version, otherwise ignore - final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope(); - final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); - final Id entityId = entityIdScope.getId(); - final long updatedAfter = entityIndexEvent.getUpdatedAfter(); - - final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); - - final Observable observable = eventBuilder.buildEntityIndex( entityIndexOperation ); - return observable; } @@ -416,34 +392,17 @@ public void queueNewEdge(final ApplicationScope applicationScope, final Entity entity, final Edge newEdge) { - EdgeIndexEvent operation = new EdgeIndexEvent(queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge); - - offer( operation ); - } - - public Observable handleEdgeIndex(final QueueMessage message) { - - Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" ); - - final AsyncEvent event = (AsyncEvent) message.getBody(); - - Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" ); - Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); - - final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; - - final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope(); - final Edge edge = edgeIndexEvent.getEdge(); - + final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + final IndexOperationMessage indexMessage = ecm.load( entity.getId() ) + .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) ) + .toBlocking().lastOrDefault(null); - final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); + queueIndexOperationMessage( indexMessage ); - final Observable edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap( - entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge)); - return edgeIndexObservable; } + @Override public void queueDeleteEdge(final ApplicationScope applicationScope, final Edge edge) { @@ -451,7 +410,7 @@ public void queueDeleteEdge(final ApplicationScope applicationScope, offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) ); } - public Observable handleEdgeDelete(final QueueMessage message) { + public void handleEdgeDelete(final QueueMessage message) { Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" ); @@ -470,17 +429,15 @@ public Observable handleEdgeDelete(final QueueMessage mes logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); } - return eventBuilder.buildDeleteEdge(applicationScope, edge); - } + IndexOperationMessage indexMessage = + eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null); + queueIndexOperationMessage(indexMessage); - @Override - public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { - - offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); } + /** * Queue up an indexOperationMessage for multi region execution * @param indexOperationMessage @@ -488,7 +445,7 @@ public void queueEntityDelete(final ApplicationScope applicationScope, final Id public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) { // don't try to produce something with nothing - if(indexOperationMessage.isEmpty()){ + if(indexOperationMessage == null || indexOperationMessage.isEmpty()){ return; } @@ -528,20 +485,29 @@ public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchInde final IndexOperationMessage indexOperationMessage; - if(message == null){ - logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level", - messageId); + if(message == null) { - final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() ); + if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) { - if(highConsistency == null){ - logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level", - messageId); + logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level", + messageId); - throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId ); - } + final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString()); - indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class ); + if (highConsistency == null) { + logger.error("Unable to find the ES batch with id {} to process at a higher consistency level", + messageId); + + throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId); + } + + indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class); + + } else{ + + throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId()); + + } } else{ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class ); @@ -599,7 +565,13 @@ public long getQueueDepth() { return queue.getQueueDepth(); } - public Observable handleEntityDelete(final QueueMessage message) { + @Override + public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) { + + offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) ); + } + + public void handleEntityDelete(final QueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); @@ -625,7 +597,10 @@ public Observable handleEntityDelete(final QueueMessage m entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null); - return entityDeleteResults.getIndexObservable(); + IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null); + + queueIndexOperationMessage(indexMessage); + } @@ -723,16 +698,16 @@ public void call( final Subscriber> subscriber ) { } try { - List indexEventResults = - callEventHandlers( messages ); - List messagesToAck = - submitToIndex( indexEventResults ); + List indexEventResults = callEventHandlers( messages ); + List messagesToAck = ackMessages( indexEventResults ); + if ( messagesToAck == null || messagesToAck.size() == 0 ) { logger.error( "No messages came back from the queue operation, should have seen {} messages", messages.size() ); return messagesToAck; } + if ( messagesToAck.size() < messages.size() ) { logger.error( "Missing messages from queue post operation", messages, messagesToAck ); @@ -763,37 +738,24 @@ public void call( final Subscriber> subscriber ) { * @param indexEventResults * @return */ - private List submitToIndex( List indexEventResults) { + private List ackMessages(List indexEventResults) { //if nothing came back then return null if(indexEventResults==null){ return null; } - final IndexOperationMessage combined = new IndexOperationMessage(); - - //stream and filer the messages - List messagesToAck = indexEventResults.stream() - .map(indexEventResult -> { - //collect into the index submission - if (indexEventResult.getIndexOperationMessage().isPresent()) { - combined.ingest(indexEventResult.getIndexOperationMessage().get()); - } - return indexEventResult; - }) - //filter out the ones that need to be ack'd - .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent()) + // stream the messages to record the cycle time + return indexEventResults.stream() .map(indexEventResult -> { //record the cycle time messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime()); return indexEventResult; }) - //ack after successful completion of the operation. + // filter out messages that are not present, they were not processed and put into the results + .filter( result -> result.getQueueMessage().isPresent() ) .map(result -> result.getQueueMessage().get()) + // collect .collect(Collectors.toList()); - - queueIndexOperationMessage( combined ); - - return messagesToAck; } public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) { @@ -814,15 +776,11 @@ public void indexBatch(final List edges, final long updatedSince) { public class IndexEventResult{ private final Optional queueMessage; - private final Optional indexOperationMessage; private final long creationTime; - - public IndexEventResult(Optional queueMessage, Optional indexOperationMessage, long creationTime){ + public IndexEventResult(Optional queueMessage, long creationTime){ this.queueMessage = queueMessage; - this.indexOperationMessage = indexOperationMessage; - this.creationTime = creationTime; } @@ -831,10 +789,6 @@ public Optional getQueueMessage() { return queueMessage; } - public Optional getIndexOperationMessage() { - return indexOperationMessage; - } - public long getCreationTime() { return creationTime; } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 96da2df307..abd4ce1342 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -104,14 +104,14 @@ private AsyncEventService getIndexService() { switch (impl) { case LOCAL: - AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, + AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler); eventService.MAX_TAKE = 1000; return eventService; case SQS: throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region"); case SNS: - return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, + return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); default: throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed"); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java index 480756f53b..a47ec77430 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java @@ -38,14 +38,6 @@ */ public interface EventBuilder { - /** - * Return the cold observable of entity index update operations - * @param applicationScope - * @param entity - * @return - */ - Observable buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity ); - /** * Return the cold observable of the new edge operation * @param applicationScope @@ -69,7 +61,9 @@ public interface EventBuilder { * @param entityId * @return */ - EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId ); + EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId ); + + /** * Re-index an entity in the scope provided diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 4e476dbc8e..2edc668395 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -73,19 +73,6 @@ public EventBuilderImpl( final IndexService indexService, } - @Override - public Observable buildEntityIndexUpdate( final ApplicationScope applicationScope, - final Entity entity ) { - //process the entity immediately - //only process the same version, otherwise ignore - - if (logger.isDebugEnabled()) { - logger.debug("Indexing in app scope {} entity {}", entity, applicationScope); - } - - return indexService.indexEntity( applicationScope, entity ); - } - @Override public Observable buildNewEdge( final ApplicationScope applicationScope, final Entity entity, @@ -118,7 +105,7 @@ public Observable buildDeleteEdge( final ApplicationScope //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter? @Override - public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) { + public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) { if (logger.isDebugEnabled()) { logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java new file mode 100644 index 0000000000..c0e022f5c2 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.usergrid.corepersistence.asyncevents; + + +import java.util.UUID; + +public class IndexDocNotFoundException extends RuntimeException { + + final UUID batchId; + + public IndexDocNotFoundException(final UUID batchId){ + + super("Index batch ID "+batchId.toString()+" not found in map persistence"); + this.batchId = batchId; + + + } + +} diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java index 68c398f9f7..7512c90c05 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.usergrid.persistence.index.*; import org.apache.usergrid.utils.UUIDUtils; @@ -104,7 +105,8 @@ public Observable indexEntity( final ApplicationScope app //do our observable for batching //try to send a whole batch if we can - final Observable batches = sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() ) + final Observable batches = sourceEdgesToIndex + .buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS ) //map into batches based on our buffer size .flatMap( buffer -> Observable.from( buffer ) diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java index bf444b5dd1..d47e96c2de 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java @@ -230,10 +230,16 @@ private void validate( final FilterResult filterResult ) { //entity is newer than ES version, could be an update or the entity is marked as deleted - if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) { + if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || + !entity.getEntity().isPresent() || + entity.getStatus() == MvccEntity.Status.DELETED ) { - logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}", + // when updating entities, we don't delete previous versions from ES so this action is expected + if(logger.isDebugEnabled()){ + logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}", searchEdge, entityId, entityVersion); + } + batch.deindex( searchEdge, entityId, candidateVersion ); return; } diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java similarity index 87% rename from stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java rename to stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java index 625a8fdcd3..c91546486b 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java @@ -29,7 +29,7 @@ import org.apache.usergrid.corepersistence.TestIndexModule; import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; -import org.apache.usergrid.corepersistence.asyncevents.AmazonAsyncEventService; +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl; import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; @@ -50,7 +50,7 @@ @RunWith( EsRunner.class ) @UseModules( { TestIndexModule.class } ) @NotThreadSafe -public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { +public class AsyncEventServiceImplTest extends AsyncIndexServiceTest { @@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Override protected AsyncEventService getAsyncEventService() { - return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); + return new AsyncEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler ); } diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 74f9ce0ab3..12a33cf92d 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -55,7 +55,6 @@ import net.jcip.annotations.NotThreadSafe; import rx.Observable; -import rx.schedulers.Schedulers; import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; import static org.junit.Assert.assertEquals; @@ -145,7 +144,7 @@ public void testMessageIndexing() throws InterruptedException { //queue up processing - asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity ); + asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity, 0); final EntityIndex EntityIndex = diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java index 4782beabc3..62102b45b3 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java @@ -19,6 +19,7 @@ import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.codahale.metrics.Histogram; @@ -130,7 +131,9 @@ private Observable processBatch( final IndexOperationMess final Observable batchOps = Observable.merge(index, deIndex); //buffer into the max size we can send ES and fire them all off until we're completed - final Observable requests = batchOps.buffer(indexFig.getIndexBatchSize()) + final Observable requests = batchOps + .buffer(indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS) + //flatten the buffer into a single batch execution .flatMap(individualOps -> Observable.from(individualOps) //collect them diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java index cdab3e0d24..88ad3ff148 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java @@ -82,4 +82,8 @@ public interface QueueFig extends GuicyFig { @Key( "usergrid.queue.visibilityTimeout" ) @Default("5000") // 5 seconds int getVisibilityTimeout(); + + @Key( "usergrid.queue.localquorum.timeout") + @Default("3000") // 3 seconds + int getLocalQuorumTimeout(); } diff --git a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java index c439b4998d..0507818b7f 100644 --- a/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java +++ b/stack/services/src/main/java/org/apache/usergrid/services/ServiceManager.java @@ -292,7 +292,10 @@ private static Class findClass( String classname ) { } } catch ( ClassNotFoundException e1 ) { - logger.error("Could not load class", e1); + if(logger.isTraceEnabled()){ + logger.trace("Could not find class", e1); + } + } return null; }