Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Use Amazon Async SNS client, but with a customer executor that has a …
Browse files Browse the repository at this point in the history
…configurable bounded queue.
  • Loading branch information
michaelarusso committed Sep 1, 2015
1 parent b1d762e commit 7ffe7f1
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 83 deletions.
7 changes: 7 additions & 0 deletions stack/config/src/main/resources/usergrid-default.properties
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -405,6 +405,13 @@ usergrid.queue.lock.timeout=5
# #
#usergrid.queue.deliveryLimit=5 #usergrid.queue.deliveryLimit=5


# Set the number of async workers used to publish messages to SNS
#
#usergrid.queue.publish.threads=100

# Set the queue size for the number of messages that can be queued during async publishing to SNS
#
#usergrid.queue.publish.queuesize=850000






Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
*/ */
public class TaskExecutorFactory { public class TaskExecutorFactory {



public enum RejectionAction {
ABORT,
CALLERRUNS
}
/** /**
* Create a task executor * Create a task executor
* @param schedulerName * @param schedulerName
Expand All @@ -42,18 +45,27 @@ public class TaskExecutorFactory {
* @return * @return
*/ */
public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount, public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
final int maxQueueSize ) { final int maxQueueSize, RejectionAction rejectionAction ) {




final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize ); final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );




final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount ); if(rejectionAction.equals(RejectionAction.ABORT)){


return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );


return threadPool; }
} else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){

return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );

}else{
//default to the thread pool with ABORT policy
return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
}


}


/** /**
* Create a thread pool that will reject work if our audit tasks become overwhelmed * Create a thread pool that will reject work if our audit tasks become overwhelmed
Expand All @@ -65,6 +77,17 @@ public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String pool
} }
} }


/**
* Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
*/
private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {

public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
new CountingThreadFactory( poolName ), new ThreadPoolExecutor.CallerRunsPolicy() );
}
}



/** /**
* Thread factory that will name and count threads for easier debugging * Thread factory that will name and count threads for easier debugging
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig g


this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
.createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(), .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
graphFig.getShardAuditWorkerQueueSize() ) ); graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
} }




Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,23 +20,23 @@ public interface QueueFig extends GuicyFig {
*/ */
@Key( "usergrid.queue.region" ) @Key( "usergrid.queue.region" )
@Default("us-east-1") @Default("us-east-1")
public String getRegion(); String getRegion();


/** /**
* Flag to determine if Usergrid should use a multi-region Amazon queue * Flag to determine if Usergrid should use a multi-region Amazon queue
* implementation. * implementation.
*/ */
@Key( "usergrid.queue.multiregion" ) @Key( "usergrid.queue.multiregion" )
@Default("false") @Default("false")
public boolean isMultiRegion(); boolean isMultiRegion();


/** /**
* Comma-separated list of one or more Amazon regions to use if multiregion * Comma-separated list of one or more Amazon regions to use if multiregion
* is set to true. * is set to true.
*/ */
@Key( "usergrid.queue.regionList" ) @Key( "usergrid.queue.regionList" )
@Default("us-east-1") @Default("us-east-1")
public String getRegionList(); String getRegionList();




/** /**
Expand All @@ -45,24 +45,33 @@ public interface QueueFig extends GuicyFig {
*/ */
@Key( "usergrid.queue.retention" ) @Key( "usergrid.queue.retention" )
@Default("1209600") @Default("1209600")
public String getRetentionPeriod(); String getRetentionPeriod();


/** /**
* Set the amount of time (in minutes) to retain messages in a dead letter queue. * Set the amount of time (in minutes) to retain messages in a dead letter queue.
* 1209600 = 14 days (maximum retention period) * 1209600 = 14 days (maximum retention period)
*/ */
@Key( "usergrid.queue.deadletter.retention" ) @Key( "usergrid.queue.deadletter.retention" )
@Default("1209600") @Default("1209600")
public String getDeadletterRetentionPeriod(); String getDeadletterRetentionPeriod();


/** /**
* The maximum number of messages to deliver to a dead letter queue. * The maximum number of messages to deliver to a dead letter queue.
*/ */
@Key( "usergrid.queue.deliveryLimit" ) @Key( "usergrid.queue.deliveryLimit" )
@Default("5") @Default("5")
public String getQueueDeliveryLimit(); String getQueueDeliveryLimit();


@Key("usergrid.use.default.queue") @Key("usergrid.use.default.queue")
@Default("false") @Default("false")
public boolean overrideQueueForDefault(); boolean overrideQueueForDefault();

@Key("usergrid.queue.publish.threads")
@Default("100")
int getAsyncMaxThreads();

// current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
@Key("usergrid.queue.publish.queuesize")
@Default("850000")
int getAsyncQueueSize();
} }
Loading

0 comments on commit 7ffe7f1

Please sign in to comment.