From 405559a5baa9b5cf06c40d426b4e40917a65f254 Mon Sep 17 00:00:00 2001 From: Michael Russo Date: Mon, 31 Aug 2015 12:59:02 -0700 Subject: [PATCH 1/2] Remove SNS Async client and use the synchronous client, putting the publish events on the system's rx async thread pool. --- .../queue/impl/SNSQueueManagerImpl.java | 74 ++++++++----------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 7e1e99ce45..8bfab8e452 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -19,10 +19,8 @@ import com.amazonaws.AmazonServiceException; -import com.amazonaws.handlers.AsyncHandler; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; -import com.amazonaws.services.sns.AmazonSNSAsyncClient; import com.amazonaws.services.sns.AmazonSNSClient; import com.amazonaws.services.sns.model.*; import com.amazonaws.services.sqs.AmazonSQSClient; @@ -41,12 +39,15 @@ import org.apache.usergrid.persistence.queue.*; import org.apache.usergrid.persistence.queue.Queue; import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; +import rx.Observable; public class SNSQueueManagerImpl implements QueueManager { @@ -58,7 +59,7 @@ public class SNSQueueManagerImpl implements QueueManager { private final CassandraFig cassandraFig; private final AmazonSQSClient sqs; private final AmazonSNSClient sns; - private final AmazonSNSAsyncClient snsAsync; + private final RxTaskScheduler rxTaskScheduler; private final JsonFactory JSON_FACTORY = new JsonFactory(); @@ -107,16 +108,16 @@ public Queue load(String queueName) throws Exception { @Inject - public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig) { + public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig, CassandraFig cassandraFig, final RxTaskScheduler rxTaskScheduler) { this.scope = scope; this.fig = fig; this.clusterFig = clusterFig; this.cassandraFig = cassandraFig; + this.rxTaskScheduler = rxTaskScheduler; try { sqs = createSQSClient(getRegion()); sns = createSNSClient(getRegion()); - snsAsync = createAsyncSNSClient(getRegion()); } catch (Exception e) { throw new RuntimeException("Error setting up mapper", e); @@ -265,27 +266,6 @@ private String setupTopics(final String queueName) return primaryTopicArn; } - /** - * The Asynchronous SNS client is used for publishing events to SNS. - * - */ - - private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) { - final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider(); - - /** - * The Async client will use default client configurations (default max conn: 50) - * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html - * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/constant-values.html#com.amazonaws.ClientConfiguration.DEFAULT_MAX_CONNECTIONS - */ - - final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials()); - - sns.setRegion(region); - - return sns; - } - /** * The Synchronous SNS client is used for creating topics and subscribing queues. * @@ -400,7 +380,7 @@ public long getQueueDepth() { @Override public void sendMessages(final List bodies) throws IOException { - if (snsAsync == null) { + if (sns == null) { logger.error("SNS client is null, perhaps it failed to initialize successfully"); return; } @@ -413,32 +393,38 @@ public void sendMessages(final List bodies) throws IOException { @Override public void sendMessage(final Object body) throws IOException { + Observable.just(body).doOnNext(message->{ - if (snsAsync == null) { - logger.error("SNS client is null, perhaps it failed to initialize successfully"); - return; - } + if (sns == null) { + logger.error("SNS client is null, perhaps it failed to initialize successfully"); + return; + } + + final String stringBody; + try { - final String stringBody = toString(body); + stringBody = toString(body); + String topicArn = getWriteTopicArn(); - String topicArn = getWriteTopicArn(); + if (logger.isDebugEnabled()){ + logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); + } - if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn); + PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); - PublishRequest publishRequest = new PublishRequest(topicArn, stringBody); + // publish message to SNS + PublishResult publishResult = sns.publish(publishRequest); - snsAsync.publishAsync(publishRequest, new AsyncHandler() { - @Override - public void onError(Exception e) { - logger.error("Error publishing message... {}", e); + if(logger.isDebugEnabled()){ + logger.debug("Successfully published... messageID=[{}], arn=[{}]", + publishResult.getMessageId(), publishRequest.getTopicArn()); } - @Override - public void onSuccess(PublishRequest request, PublishResult result) { - if (logger.isDebugEnabled()) logger.debug("Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(), request.getTopicArn()); + } catch (IOException e) { + logger.error("Unable to convert queue object to a string message body", e); + } - } - }); + }).subscribeOn(rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); } From c905ebebd366cc25b55ca6282d64111a2e8a9716 Mon Sep 17 00:00:00 2001 From: Michael Russo Date: Mon, 31 Aug 2015 13:10:04 -0700 Subject: [PATCH 2/2] Added error handling. --- .../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java index 8bfab8e452..bcf4499516 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java @@ -424,6 +424,8 @@ public void sendMessage(final Object body) throws IOException { logger.error("Unable to convert queue object to a string message body", e); } + }).doOnError(e ->{ + logger.error("Error while publishing SNS message: ", e); }).subscribeOn(rxTaskScheduler.getAsyncIOScheduler() ).subscribe(); }