Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Now supports elasticsearch.queue_impl=MULTIREGION setting instead of …
Browse files Browse the repository at this point in the history
…SNS, also more/better DEBUG logging
  • Loading branch information
snoopdave committed Sep 23, 2016
1 parent 00eb139 commit 2cd8ecb
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 42 deletions.
Expand Up @@ -103,16 +103,44 @@ private AsyncEventService getIndexService() {
final Implementations impl = Implementations.valueOf(value);

switch (impl) {

case LOCAL:
AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
AsyncEventServiceImpl eventService =
new AsyncEventServiceImpl(scope -> new LocalQueueManager(),
indexProcessorFig,
indexProducer,
metricsFactory,
entityCollectionManagerFactory,
indexLocationStrategyFactory,
entityIndexFactory,
eventBuilder,
mapManagerFactory,
queueFig,rxTaskScheduler);
eventService.MAX_TAKE = 1000;
return eventService;

case SQS:
throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
throw new IllegalArgumentException(
"Configuration value of SQS is no longer allowed. Use SNS instead with only a single region.");

case SNS:
return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
throw new IllegalArgumentException(
"Configuration value of SNS is no longer allowed. Use MULTIREGION instead. ");

case MULTIREGION:
return new AsyncEventServiceImpl(
queueManagerFactory,
indexProcessorFig,
indexProducer,
metricsFactory,
entityCollectionManagerFactory,
indexLocationStrategyFactory,
entityIndexFactory,
eventBuilder,
mapManagerFactory,
queueFig,
rxTaskScheduler );

default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
Expand All @@ -135,12 +163,12 @@ private String getErrorValues() {
/**
* Different implementations
*/
public static enum Implementations { //TODO see about removing SNS and SQS and use AMZN? - michaelarusso
public static enum Implementations {
TEST,
LOCAL,
SQS,
SNS;

SQS, // deprecated
SNS, // deprecated
MULTIREGION; // built-in Akka-based queue

public String asString() {
return toString();
Expand Down
Expand Up @@ -37,11 +37,10 @@
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


public class QueueActor extends UntypedActor {
Expand All @@ -62,6 +61,10 @@ public class QueueActor extends UntypedActor {
private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();

private final AtomicLong runCount = new AtomicLong(0);
private final AtomicLong messageCount = new AtomicLong(0);
private final Set<String> queuesSeen = new HashSet<>();


public QueueActor() {

Expand All @@ -81,6 +84,8 @@ public void onReceive(Object message) {
if ( message instanceof QueueInitRequest) {
QueueInitRequest request = (QueueInitRequest)message;

queuesSeen.add( request.getQueueName() );

if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) {
Cancellable scheduler = getContext().system().scheduler().schedule(
Duration.create( 0, TimeUnit.MILLISECONDS),
Expand Down Expand Up @@ -120,6 +125,8 @@ public void onReceive(Object message) {
} else if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest)message;

queuesSeen.add( request.getQueueName() );

if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {

if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
Expand All @@ -135,6 +142,8 @@ public void onReceive(Object message) {
} else if ( message instanceof QueueTimeoutRequest ) {
QueueTimeoutRequest request = (QueueTimeoutRequest)message;

queuesSeen.add( request.getQueueName() );

if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
ActorRef readerRef = getContext().actorOf( Props.create(
QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
Expand All @@ -148,6 +157,8 @@ public void onReceive(Object message) {
} else if ( message instanceof ShardCheckRequest ) {
ShardCheckRequest request = (ShardCheckRequest)message;

queuesSeen.add( request.getQueueName() );

if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
ActorRef readerRef = getContext().actorOf( Props.create(
ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
Expand All @@ -164,6 +175,8 @@ public void onReceive(Object message) {
try {
QueueGetRequest queueGetRequest = (QueueGetRequest) message;

queuesSeen.add( queueGetRequest.getQueueName() );

Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();

while (queueMessages.size() < queueGetRequest.getNumRequested()) {
Expand All @@ -189,6 +202,36 @@ public void onReceive(Object message) {
getSender().tell( new QueueGetResponse(
DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );

long runs = runCount.incrementAndGet();
long messagesReturned = messageCount.addAndGet( queueMessages.size() );

if ( logger.isDebugEnabled() && runs % 100 == 0 ) {

final DecimalFormat format = new DecimalFormat("##.###");
final long nano = 1000000000;
Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );

logger.debug("QueueActor get stats (queues {}):\n" +
" Num runs={}\n" +
" Messages={}\n" +
" Mean={}\n" +
" One min rate={}\n" +
" Five min rate={}\n" +
" Snapshot mean={}\n" +
" Snapshot min={}\n" +
" Snapshot max={}",
queuesSeen.toArray(),
t.getCount(),
messagesReturned,
format.format( t.getMeanRate() ),
format.format( t.getOneMinuteRate() ),
format.format( t.getFiveMinuteRate() ),
format.format( t.getSnapshot().getMean() / nano ),
format.format( (double) t.getSnapshot().getMin() / nano ),
format.format( (double) t.getSnapshot().getMax() / nano ) );
}


} finally {
timer.close();
}
Expand All @@ -201,6 +244,8 @@ public void onReceive(Object message) {

QueueAckRequest queueAckRequest = (QueueAckRequest) message;

queuesSeen.add( queueAckRequest.getQueueName() );

DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
queueAckRequest.getQueueName(),
queueAckRequest.getQueueMessageId() );
Expand Down
Expand Up @@ -39,28 +39,31 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;


public class QueueRefresher extends UntypedActor {
private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );

private final String queueName;

private final QueueMessageSerialization serialization;
private final InMemoryQueue inMemoryQueue;
private final QakkaFig qakkaFig;
private final ActorSystemFig actorSystemFig;
private final MetricsService metricsService;
private final String queueName;
private final InMemoryQueue inMemoryQueue;
private final QakkaFig qakkaFig;
private final ActorSystemFig actorSystemFig;
private final MetricsService metricsService;
private final CassandraClient cassandraClient;

private final AtomicLong runCount = new AtomicLong(0);
private final AtomicLong totalRead = new AtomicLong(0);


public QueueRefresher(String queueName ) {
this.queueName = queueName;

Injector injector = App.INJECTOR;

serialization = injector.getInstance( QueueMessageSerialization.class );
inMemoryQueue = injector.getInstance( InMemoryQueue.class );
qakkaFig = injector.getInstance( QakkaFig.class );
actorSystemFig = injector.getInstance( ActorSystemFig.class );
Expand All @@ -76,7 +79,7 @@ public void onReceive(Object message) {

QueueRefreshRequest request = (QueueRefreshRequest) message;

logger.debug( "running for queue {}", queueName );
//logger.debug( "running for queue {}", queueName );

if (!request.getQueueName().equals( queueName )) {
throw new QakkaRuntimeException(
Expand Down Expand Up @@ -109,10 +112,39 @@ public void onReceive(Object message) {
count++;
}

if ( count > 0 ) {
logger.debug( "Added {} in-memory for queue {}, new size = {}",
count, queueName, inMemoryQueue.size( queueName ) );
long runs = runCount.incrementAndGet();
long readCount = totalRead.addAndGet( count );

if ( logger.isDebugEnabled() && runs % 100 == 0 ) {

final DecimalFormat format = new DecimalFormat("##.###");
final long nano = 1000000000;
Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );

logger.debug("QueueRefresher for queue '{}' stats:\n" +
" Num runs={}\n" +
" Read count={}\n" +
" Mean={}\n" +
" One min rate={}\n" +
" Five min rate={}\n" +
" Snapshot mean={}\n" +
" Snapshot min={}\n" +
" Snapshot max={}",
queueName,
t.getCount(),
readCount,
format.format( t.getMeanRate() ),
format.format( t.getOneMinuteRate() ),
format.format( t.getFiveMinuteRate() ),
format.format( t.getSnapshot().getMean() / nano ),
format.format( (double) t.getSnapshot().getMin() / nano ),
format.format( (double) t.getSnapshot().getMax() / nano ) );
}

// if ( count > 0 ) {
// logger.debug( "Added {} in-memory for queue {}, new size = {}",
// count, queueName, inMemoryQueue.size( queueName ) );
// }
}

} finally {
Expand Down
Expand Up @@ -40,8 +40,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;


public class QueueTimeouter extends UntypedActor {
Expand All @@ -54,9 +56,11 @@ public class QueueTimeouter extends UntypedActor {
private final ActorSystemFig actorSystemFig;
private final QakkaFig qakkaFig;
private final CassandraClient cassandraClient;

private final MessageCounterSerialization messageCounterSerialization;

private final AtomicLong runCount = new AtomicLong(0);
private final AtomicLong totalTimedout = new AtomicLong(0);


public QueueTimeouter(String queueName ) {
this.queueName = queueName;
Expand Down Expand Up @@ -137,13 +141,43 @@ public void onReceive(Object message) {
}
}

if (count > 0) {
logger.debug( "Timed out {} messages for queue {}", count, queueName );

messageCounterSerialization.decrementCounter(
queueName, DatabaseQueueMessage.Type.DEFAULT, count);
long runs = runCount.incrementAndGet();
long timeoutCount = totalTimedout.addAndGet( count );

if ( logger.isDebugEnabled() && runs % 100 == 0 ) {

final DecimalFormat format = new DecimalFormat("##.###");
final long nano = 1000000000;
Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME );

logger.debug("QueueTimeouter for queue '{}' stats:\n" +
" Num runs={}\n" +
" Timeout count={}\n" +
" Mean={}\n" +
" One min rate={}\n" +
" Five min rate={}\n" +
" Snapshot mean={}\n" +
" Snapshot min={}\n" +
" Snapshot max={}",
queueName,
t.getCount(),
timeoutCount,
format.format( t.getMeanRate() ),
format.format( t.getOneMinuteRate() ),
format.format( t.getFiveMinuteRate() ),
format.format( t.getSnapshot().getMean() / nano ),
format.format( (double) t.getSnapshot().getMin() / nano ),
format.format( (double) t.getSnapshot().getMax() / nano ) );
}

// if (count > 0) {
// logger.debug( "Timed out {} messages for queue {}", count, queueName );
//
// messageCounterSerialization.decrementCounter(
// queueName, DatabaseQueueMessage.Type.DEFAULT, count);
// }

} finally {
timer.close();
}
Expand Down
Expand Up @@ -39,10 +39,7 @@
import scala.concurrent.Await;
import scala.concurrent.Future;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.TimeUnit;


Expand Down Expand Up @@ -168,7 +165,7 @@ public DistributedQueueService.Status sendMessageToRegion(
logger.debug("SUCCESS after {} retries", retries );
}

logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
// send refresh-queue-if-empty message
QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
clientActor.tell( qrr, null );

Expand Down Expand Up @@ -221,6 +218,11 @@ public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueNam
throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
}

if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
logger.error("Akka Actor System is not ready yet for requests.");
return Collections.EMPTY_LIST;
}

int maxRetries = qakkaFig.getMaxGetRetries();
int retries = 0;

Expand Down

0 comments on commit 2cd8ecb

Please sign in to comment.