Skip to content

Commit

Permalink
allow message id for enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
dpvreony committed Sep 1, 2016
1 parent fb2de58 commit 50f18ce
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 8 deletions.
7 changes: 7 additions & 0 deletions Foundatio.sln
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Foundatio.RabbitMQSubscribe
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Foundatio.WindowsServerServiceBus", "src\Foundatio.WindowsServerServiceBus\Foundatio.WindowsServerServiceBus.xproj", "{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Foundatio.WindowsServerServiceBus.Tests", "test\Foundatio.WindowsServerServiceBus.Tests\Foundatio.WindowsServerServiceBus.Tests.xproj", "{053AD861-48E8-48F4-9A39-D40A17C2DB4A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -146,6 +148,10 @@ Global
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Release|Any CPU.Build.0 = Release|Any CPU
{053AD861-48E8-48F4-9A39-D40A17C2DB4A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{053AD861-48E8-48F4-9A39-D40A17C2DB4A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{053AD861-48E8-48F4-9A39-D40A17C2DB4A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{053AD861-48E8-48F4-9A39-D40A17C2DB4A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -162,5 +168,6 @@ Global
{CE14E239-DC13-423A-BE23-2B806F2313FD} = {70515E66-DAF8-4D18-8F8F-8A2934171AA9}
{E3BE24B5-FC1F-41A0-AAE8-B31D84162D78} = {A1DFF80C-113F-4FEC-84BB-1E3790FB410F}
{5250856E-2587-4ED3-AA1A-DD673EC46EF0} = {A1DFF80C-113F-4FEC-84BB-1E3790FB410F}
{053AD861-48E8-48F4-9A39-D40A17C2DB4A} = {70515E66-DAF8-4D18-8F8F-8A2934171AA9}
EndGlobalSection
EndGlobal
17 changes: 16 additions & 1 deletion src/Foundatio.AzureServiceBus/Queues/AzureServiceBusQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,22 @@ public class AzureServiceBusQueue<T> : QueueBase<T> where T : class {

return message.MessageId;
}


protected override async Task<string> EnqueueImplAsync(string messageId, T data)
{
if (!await OnEnqueuingAsync(data).AnyContext())
return null;

Interlocked.Increment(ref _enqueuedCount);
var message = new BrokeredMessage(data) {MessageId = messageId};
await _queueClient.SendAsync(message).AnyContext();

var entry = new QueueEntry<T>(message.MessageId, data, this, SystemClock.UtcNow, 0);
await OnEnqueuedAsync(entry).AnyContext();

return message.MessageId;
}

protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken) {
if (handler == null)
throw new ArgumentNullException(nameof(handler));
Expand Down
15 changes: 15 additions & 0 deletions src/Foundatio.AzureStorage/Queues/AzureStorageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ public class AzureStorageQueue<T> : QueueBase<T> where T : class {
return message.Id;
}

protected override async Task<string> EnqueueImplAsync(string messageId, T data)
{
if (!await OnEnqueuingAsync(data).AnyContext())
return null;

Interlocked.Increment(ref _enqueuedCount);
var message = new CloudQueueMessage(messageId, null);
message.SetMessageContent(await _serializer.SerializeAsync(data));
await _queueReference.AddMessageAsync(message).AnyContext();

var entry = new QueueEntry<T>(message.Id, data, this, SystemClock.UtcNow, 0);
await OnEnqueuedAsync(entry).AnyContext();

return message.Id;
}
protected override async Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken cancellationToken) {
// TODO: Use cancellation token overloads
var linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_queueDisposedCancellationTokenSource.Token, cancellationToken).Token;
Expand Down
17 changes: 13 additions & 4 deletions src/Foundatio.Redis/Queues/RedisQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,18 @@ public class RedisQueue<T> : QueueBase<T> where T : class {
return String.Concat("q:", _queueName, ":in");
}

protected override async Task<string> EnqueueImplAsync(T data) {
protected override async Task<string> EnqueueImplAsync(T data)
{
string id = Guid.NewGuid().ToString("N");
return await EnqueueImplAsync(id, data);
}

protected override async Task<string> EnqueueImplAsync(string id, T data)
{
_logger.Debug("Queue {_queueName} enqueue item: {id}", _queueName, id);

if (!await OnEnqueuingAsync(data).AnyContext()) {
if (!await OnEnqueuingAsync(data).AnyContext())
{
_logger.Trace("Aborting enqueue item: {id}", id);
return null;
}
Expand All @@ -196,9 +203,11 @@ public class RedisQueue<T> : QueueBase<T> where T : class {
await Run.WithRetriesAsync(() => Database.ListLeftPushAsync(QueueListName, id), logger: _logger).AnyContext();

// This should pulse the monitor.
try {
try
{
await Run.WithRetriesAsync(() => _subscriber.PublishAsync(GetTopicName(), id), logger: _logger).AnyContext();
} catch { }
}
catch { }

Interlocked.Increment(ref _enqueuedCount);
var entry = new QueueEntry<T>(id, data, this, now, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,21 @@ protected override async Task<string> EnqueueImplAsync(T data)
return message.MessageId;
}

protected override async Task<string> EnqueueImplAsync(string messageId, T data)
{
if (!await OnEnqueuingAsync(data).AnyContext())
return null;

Interlocked.Increment(ref _enqueuedCount);
var message = new BrokeredMessage(data) {MessageId = messageId};
await _queueClient.SendAsync(message).AnyContext();

var entry = new QueueEntry<T>(message.MessageId, data, this, DateTime.UtcNow, 0);
await OnEnqueuedAsync(entry).AnyContext();

return message.MessageId;
}

protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken)
{
if (handler == null)
Expand Down
21 changes: 21 additions & 0 deletions src/Foundatio/Queues/InMemoryQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,27 @@ public class InMemoryQueue<T> : QueueBase<T> where T : class {
return id;
}

protected override async Task<string> EnqueueImplAsync(string messageId, T data)
{
_logger.Trace("Queue {0} enqueue item: {1}", _queueName, messageId);

if (!await OnEnqueuingAsync(data).AnyContext())
return null;

var entry = new QueueEntry<T>(messageId, data.Copy(), this, SystemClock.UtcNow, 0);
_queue.Enqueue(entry);
_logger.Trace("Enqueue: Set Event");

using (await _monitor.EnterAsync())
_monitor.Pulse();
Interlocked.Increment(ref _enqueuedCount);

await OnEnqueuedAsync(entry).AnyContext();
_logger.Trace("Enqueue done");

return messageId;
}

protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete = false, CancellationToken cancellationToken = default(CancellationToken)) {
if (handler == null)
throw new ArgumentNullException(nameof(handler));
Expand Down
8 changes: 8 additions & 0 deletions src/Foundatio/Queues/QueueBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ public abstract class QueueBase<T> : MaintenanceBase, IQueue<T> where T : class
protected abstract Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = default(CancellationToken));

protected abstract Task<string> EnqueueImplAsync(T data);
protected abstract Task<string> EnqueueImplAsync(string messageId, T data);

public async Task<string> EnqueueAsync(T data) {
await EnsureQueueCreatedAsync().AnyContext();
return await EnqueueImplAsync(data).AnyContext();
}

public async Task<string> EnqueueAsync(string messageId, T data)
{
await EnsureQueueCreatedAsync().AnyContext();
return await EnqueueImplAsync(messageId, data).AnyContext();
}

protected abstract Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken cancellationToken);
public async Task<IQueueEntry<T>> DequeueAsync(CancellationToken cancellationToken) {
await EnsureQueueCreatedAsync(cancellationToken).AnyContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>dfcfb06a-396d-4d80-adb7-0b4b32e5d1ce</ProjectGuid>
<ProjectGuid>053ad861-48e8-48f4-9a39-d40a17c2db4a</ProjectGuid>
<RootNamespace>Foundatio.Azure.Tests</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class AzureServiceBusMessageBusTests : MessageBusTestBase {
if (String.IsNullOrEmpty(Configuration.GetConnectionString("ServiceBusConnectionString")))
return null;

return new AzureServiceBusMessageBus(Configuration.GetConnectionString("ServiceBusConnectionString"), _topicName, loggerFactory: Log);
return new WindowsServerServiceBusMessageBus(Configuration.GetConnectionString("ServiceBusConnectionString"), _topicName, loggerFactory: Log);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class AzureServiceBusQueueTests : QueueTestBase {
var retryPolicy = new RetryExponential(retryDelay.Value, maxBackoff, retries + 1);

_logger.Debug("Queue Id: {queueId}", _queueName);
return new AzureServiceBusQueue<SimpleWorkItem>(Configuration.GetConnectionString("ServiceBusConnectionString"),
return new WindowsServerServiceBusQueue<SimpleWorkItem>(Configuration.GetConnectionString("ServiceBusConnectionString"),
_queueName, retries, workItemTimeout, retryPolicy, loggerFactory: Log);
}

Expand Down

0 comments on commit 50f18ce

Please sign in to comment.