Permalink
Browse files

Allow PollingDuration SQS to be configurable

  • Loading branch information...
mythz committed Nov 28, 2018
1 parent c1b106e commit c6594215b15d9f962a797d5a7bd380b478ca55a4
Showing with 10 additions and 5 deletions.
  1. +6 −3 src/ServiceStack.Aws/Sqs/SqsMqServer.cs
  2. +4 −2 src/ServiceStack.Aws/Sqs/SqsMqWorker.cs
@@ -13,6 +13,8 @@ public class SqsMqServer : BaseMqServer<SqsMqWorker>
private readonly ISqsMqMessageFactory sqsMqMessageFactory;
public TimeSpan PollingDuration { get; set; } = TimeSpan.FromMilliseconds(1000);
public SqsMqServer() : this(new SqsConnectionFactory()) { }
public SqsMqServer(string awsAccessKey, string awsSecretKey, RegionEndpoint region)
@@ -201,8 +203,8 @@ protected override void Init()
// Base in q and workers
sqsMqMessageFactory.QueueManager.CreateQueue(info.QueueNames.In, info, dlqDefinition.QueueArn);
info.ThreadCount.Times(i => workers.Add(new SqsMqWorker(sqsMqMessageFactory, info,
info.QueueNames.In, WorkerErrorHandler)));
info.ThreadCount.Times(i => workers.Add(
new SqsMqWorker(sqsMqMessageFactory, info, info.QueueNames.In, WorkerErrorHandler) { PollingDuration = PollingDuration }));
// Need an outq?
if (PublishResponsesWhitelist == null || PublishResponsesWhitelist.Any(x => x == msgType.Name))
@@ -215,7 +217,8 @@ protected override void Init()
{ // Need priority queue and workers
sqsMqMessageFactory.QueueManager.CreateQueue(info.QueueNames.Priority, info, dlqDefinition.QueueArn);
info.ThreadCount.Times(i => workers.Add(new SqsMqWorker(sqsMqMessageFactory, info, info.QueueNames.Priority, WorkerErrorHandler)));
info.ThreadCount.Times(i => workers.Add(
new SqsMqWorker(sqsMqMessageFactory, info, info.QueueNames.Priority, WorkerErrorHandler) { PollingDuration = PollingDuration }));
}
}
}
@@ -21,6 +21,8 @@ public class SqsMqWorker : IMqWorker<SqsMqWorker>
private Thread bgThread;
private int status;
private int totalMessagesProcessed;
public TimeSpan PollingDuration { get; set; } = TimeSpan.FromMilliseconds(1000);
public SqsMqWorker(ISqsMqMessageFactory mqFactory,
SqsMqWorkerInfo queueWorkerInfo,
@@ -169,7 +171,7 @@ private void StartPolling()
totalMessagesProcessed += msgsProcessedThisTime;
Monitor.Wait(_msgLock, millisecondsTimeout: 1000);
Monitor.Wait(_msgLock, timeout: PollingDuration);
retryCount = 0;
}
@@ -180,7 +182,7 @@ private void StartPolling()
catch (Exception ex)
{
if (Interlocked.CompareExchange(ref status, 0, 0) != WorkerStatus.Started)
{ // No longer suppossed to be running...
{ // No longer supposed to be running...
return;
}

0 comments on commit c659421

Please sign in to comment.