Skip to content

Commit

Permalink
windows server servicebus now xproj
Browse files Browse the repository at this point in the history
  • Loading branch information
dpvreony committed Sep 1, 2016
1 parent 26f17e3 commit fb2de58
Show file tree
Hide file tree
Showing 19 changed files with 454 additions and 307 deletions.
10 changes: 5 additions & 5 deletions Foundatio.sln
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Foundatio.RabbitMQPublishCo
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Foundatio.RabbitMQSubscribeConsole", "samples\Foundatio.RabbitMQSubscribeConsole\Foundatio.RabbitMQSubscribeConsole.xproj", "{5250856E-2587-4ED3-AA1A-DD673EC46EF0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Foundatio.WindowsServerServiceBus", "src\Foundatio.WindowsServerServiceBus\Foundatio.WindowsServerServiceBus.csproj", "{5CACEF55-E509-4043-B9C8-19C28D9154E3}"
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Foundatio.WindowsServerServiceBus", "src\Foundatio.WindowsServerServiceBus\Foundatio.WindowsServerServiceBus.xproj", "{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -142,10 +142,10 @@ Global
{5250856E-2587-4ED3-AA1A-DD673EC46EF0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5250856E-2587-4ED3-AA1A-DD673EC46EF0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5250856E-2587-4ED3-AA1A-DD673EC46EF0}.Release|Any CPU.Build.0 = Release|Any CPU
{5CACEF55-E509-4043-B9C8-19C28D9154E3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5CACEF55-E509-4043-B9C8-19C28D9154E3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5CACEF55-E509-4043-B9C8-19C28D9154E3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5CACEF55-E509-4043-B9C8-19C28D9154E3}.Release|Any CPU.Build.0 = Release|Any CPU
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2AE275A8-1541-4D97-B26D-5BEFB07B5DE7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
18 changes: 18 additions & 0 deletions src/Foundatio.WindowsServerServiceBus/Extensions/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Foundatio.Extensions {
internal static class TaskExtensions {
[DebuggerStepThrough]
public static ConfiguredTaskAwaitable<TResult> AnyContext<TResult>(this Task<TResult> task) {
return task.ConfigureAwait(continueOnCapturedContext: false);
}

[DebuggerStepThrough]
public static ConfiguredTaskAwaitable AnyContext(this Task task) {
return task.ConfigureAwait(continueOnCapturedContext: false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0.25123" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0.25123</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>2ae275a8-1541-4d97-b26d-5befb07b5de7</ProjectGuid>
<RootNamespace>Foundatio.WindowsServerServiceBus</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,50 @@
using Foundatio.Serializer;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Nito.AsyncEx;

namespace Foundatio.Queues
{
public class WindowsServerServiceBusQueue<T> : QueueBase<T> where T : class
{
private readonly string _queueName;
private readonly string _connectionString;
private readonly NamespaceManager _namespaceManager;
private readonly QueueClient _queueClient;
private QueueClient _queueClient;
private QueueDescription _queueDescription;
private long _enqueuedCount;
private long _dequeuedCount;
private long _completedCount;
private long _abandonedCount;
private long _workerErrorCount;
private readonly int _retries;
private readonly RetryPolicy _retryPolicy;
private readonly TimeSpan _workItemTimeout = TimeSpan.FromMinutes(5);

public WindowsServerServiceBusQueue(string connectionString, string queueName = null, int retries = 2, TimeSpan? workItemTimeout = null, bool shouldRecreate = false, RetryPolicy retryPolicy = null, ISerializer serializer = null, IEnumerable<IQueueBehavior<T>> behaviors = null, ILoggerFactory loggerFactory = null) : base(serializer, behaviors, loggerFactory)
{
_queueName = queueName ?? typeof(T).Name;
private readonly TimeSpan _autoDeleteOnIdle = TimeSpan.MaxValue;
private readonly TimeSpan _defaultMessageTimeToLive = TimeSpan.MaxValue;
private readonly AsyncLock _lock = new AsyncLock();

public WindowsServerServiceBusQueue(string connectionString, string queueName = null, int retries = 2, TimeSpan? workItemTimeout = null, RetryPolicy retryPolicy = null, ISerializer serializer = null, IEnumerable<IQueueBehavior<T>> behaviors = null, ILoggerFactory loggerFactory = null, TimeSpan? autoDeleteOnIdle = null, TimeSpan? defaultMessageTimeToLive = null) : base(serializer, behaviors, loggerFactory) {
_connectionString = connectionString;
if (!String.IsNullOrEmpty(queueName))
_queueName = queueName;
_namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
_retries = retries;
_retryPolicy = retryPolicy;

if (workItemTimeout.HasValue && workItemTimeout.Value < TimeSpan.FromMinutes(5))
{
_workItemTimeout = workItemTimeout.Value;
}

if (_namespaceManager.QueueExists(_queueName) && shouldRecreate)
_namespaceManager.DeleteQueue(_queueName);

if (!_namespaceManager.QueueExists(_queueName))
if (autoDeleteOnIdle.HasValue && autoDeleteOnIdle.Value >= TimeSpan.FromMinutes(5))
{
_queueDescription = new QueueDescription(_queueName)
{
MaxDeliveryCount = retries + 1,
LockDuration = _workItemTimeout
};
_namespaceManager.CreateQueue(_queueDescription);
_autoDeleteOnIdle = autoDeleteOnIdle.Value;
}
else {
_queueDescription = _namespaceManager.GetQueue(_queueName);

bool changes = false;

int newMaxDeliveryCount = retries + 1;
if (_queueDescription.MaxDeliveryCount != newMaxDeliveryCount)
{
_queueDescription.MaxDeliveryCount = newMaxDeliveryCount;
changes = true;
}

if (_queueDescription.LockDuration != _workItemTimeout)
{
_queueDescription.LockDuration = _workItemTimeout;
changes = true;
}

if (changes)
{
_namespaceManager.UpdateQueue(_queueDescription);
}
if (defaultMessageTimeToLive.HasValue && defaultMessageTimeToLive.Value > TimeSpan.Zero)
{
_defaultMessageTimeToLive = defaultMessageTimeToLive.Value;
}

_queueClient = QueueClient.CreateFromConnectionString(connectionString, _queueDescription.Path);
if (retryPolicy != null)
_queueClient.RetryPolicy = retryPolicy;
}

public override async Task DeleteQueueAsync()
Expand All @@ -93,7 +73,86 @@ public override async Task DeleteQueueAsync()
_workerErrorCount = 0;
}

public override async Task<QueueStats> GetQueueStatsAsync()
protected override async Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = new CancellationToken())
{
if (_queueClient != null)
{
return;
}

using (await _lock.LockAsync(cancellationToken))
{
if (_queueClient != null)
{
return;
}

QueueDescription queueDescription;

if (!await _namespaceManager.QueueExistsAsync(_queueName).AnyContext())
{
try
{
queueDescription = await _namespaceManager.CreateQueueAsync(new QueueDescription(_queueName)
{
MaxDeliveryCount = _retries + 1,
LockDuration = _workItemTimeout,
AutoDeleteOnIdle = _autoDeleteOnIdle,
DefaultMessageTimeToLive = _defaultMessageTimeToLive
}).AnyContext();
}
catch (MessagingException)
{
queueDescription = await _namespaceManager.GetQueueAsync(_queueName).AnyContext();
}
}
else
{
queueDescription = await _namespaceManager.GetQueueAsync(_queueName).AnyContext();

bool changes = false;

int newMaxDeliveryCount = _retries + 1;
if (queueDescription.MaxDeliveryCount != newMaxDeliveryCount)
{
queueDescription.MaxDeliveryCount = newMaxDeliveryCount;
changes = true;
}

if (queueDescription.LockDuration != _workItemTimeout)
{
queueDescription.LockDuration = _workItemTimeout;
changes = true;
}

if (queueDescription.AutoDeleteOnIdle != _autoDeleteOnIdle)
{
queueDescription.AutoDeleteOnIdle = _autoDeleteOnIdle;
changes = true;
}

if (queueDescription.DefaultMessageTimeToLive != _defaultMessageTimeToLive)
{
queueDescription.DefaultMessageTimeToLive = _defaultMessageTimeToLive;
changes = true;
}

if (changes)
{
await _namespaceManager.UpdateQueueAsync(queueDescription).AnyContext();
}
}

_queueClient = QueueClient.CreateFromConnectionString(_connectionString, queueDescription.Path);

if (_retryPolicy != null)
{
_queueClient.RetryPolicy = _retryPolicy;
}
}
}

protected override async Task<QueueStats> GetQueueStatsImplAsync()
{
var q = await _namespaceManager.GetQueueAsync(_queueName).AnyContext();
return new QueueStats
Expand All @@ -110,12 +169,12 @@ public override async Task<QueueStats> GetQueueStatsAsync()
};
}

public override Task<IEnumerable<T>> GetDeadletterItemsAsync(CancellationToken cancellationToken = default(CancellationToken))
protected override Task<IEnumerable<T>> GetDeadletterItemsImplAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

public override async Task<string> EnqueueAsync(T data)
protected override async Task<string> EnqueueImplAsync(T data)
{
if (!await OnEnqueuingAsync(data).AnyContext())
return null;
Expand All @@ -130,7 +189,7 @@ public override async Task<string> EnqueueAsync(T data)
return message.MessageId;
}

public override void StartWorking(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete = false, CancellationToken cancellationToken = default(CancellationToken))
protected override void StartWorkingImpl(Func<IQueueEntry<T>, CancellationToken, Task> handler, bool autoComplete, CancellationToken cancellationToken)
{
if (handler == null)
throw new ArgumentNullException(nameof(handler));
Expand Down Expand Up @@ -163,7 +222,7 @@ public override async Task<IQueueEntry<T>> DequeueAsync(TimeSpan? timeout = null
}
}

public override Task<IQueueEntry<T>> DequeueAsync(CancellationToken cancellationToken)
protected override Task<IQueueEntry<T>> DequeueImplAsync(CancellationToken cancellationToken)
{
_logger.Warn("Azure Service Bus does not support CancellationTokens - use TimeSpan overload instead. Using default 30 second timeout.");

Expand Down
Loading

0 comments on commit fb2de58

Please sign in to comment.