Skip to content

Commit

Permalink
SB tweak, update of Redis client
Browse files Browse the repository at this point in the history
  • Loading branch information
phatboyg committed Jan 17, 2019
1 parent 1f6b042 commit 35f7652
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
namespace MassTransit.Azure.ServiceBus.Core.Transport
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Contexts;
Expand Down Expand Up @@ -61,7 +59,8 @@ Task ISendTransport.Send<T>(T message, IPipe<SendContext<T>> pipe, CancellationT
await pipe.Send(context).ConfigureAwait(false);
if (message is CancelScheduledMessage cancelScheduledMessage
&& (context.TryGetScheduledMessageId(out var sequenceNumber) || context.TryGetSequencyNumber(cancelScheduledMessage.TokenId, out sequenceNumber)))
&& (context.TryGetScheduledMessageId(out var sequenceNumber)
|| context.TryGetSequencyNumber(cancelScheduledMessage.TokenId, out sequenceNumber)))
{
try
{
Expand All @@ -81,37 +80,24 @@ Task ISendTransport.Send<T>(T message, IPipe<SendContext<T>> pipe, CancellationT
await _observers.PreSend(context).ConfigureAwait(false);
var brokeredMessage = new Message(context.Body)
{
ContentType = context.ContentType.MediaType
};
brokeredMessage.UserProperties.SetTextHeaders(context.Headers, (_, text) => text);
if (context.TimeToLive.HasValue)
brokeredMessage.TimeToLive = context.TimeToLive.Value;
if (context.MessageId.HasValue)
brokeredMessage.MessageId = context.MessageId.Value.ToString("N");
if (context.CorrelationId.HasValue)
brokeredMessage.CorrelationId = context.CorrelationId.Value.ToString("N");
CopyIncomingIdentifiersIfPresent(context);
if (context.PartitionKey != null)
brokeredMessage.PartitionKey = context.PartitionKey;
var sessionId = string.IsNullOrWhiteSpace(context.SessionId) ? context.ConversationId?.ToString("N") : context.SessionId;
if (!string.IsNullOrWhiteSpace(sessionId))
{
brokeredMessage.SessionId = sessionId;
var brokeredMessage = CreateBrokeredMessage(context);
if (context.ReplyToSessionId == null)
brokeredMessage.ReplyToSessionId = sessionId;
}
if (context.ReplyToSessionId != null)
brokeredMessage.ReplyToSessionId = context.ReplyToSessionId;
if (context.ScheduledEnqueueTimeUtc.HasValue)
if (context.ScheduledEnqueueTimeUtc.HasValue && context.ScheduledEnqueueTimeUtc.Value < DateTime.UtcNow)
{
var enqueueTimeUtc = context.ScheduledEnqueueTimeUtc.Value;
sequenceNumber = await clientContext.ScheduleSend(brokeredMessage, enqueueTimeUtc).ConfigureAwait(false);
try
{
sequenceNumber = await clientContext.ScheduleSend(brokeredMessage, enqueueTimeUtc).ConfigureAwait(false);
}
catch (ArgumentOutOfRangeException exception)
{
brokeredMessage = CreateBrokeredMessage(context);
await clientContext.Send(brokeredMessage).ConfigureAwait(false);
sequenceNumber = 0;
}
context.SetScheduledMessageId(sequenceNumber);
Expand All @@ -137,6 +123,44 @@ Task ISendTransport.Send<T>(T message, IPipe<SendContext<T>> pipe, CancellationT
return _source.Send(clientPipe, cancellationToken);
}

Message CreateBrokeredMessage<T>(AzureServiceBusSendContext<T> context)
where T : class
{
var brokeredMessage = new Message(context.Body)
{
ContentType = context.ContentType.MediaType
};

brokeredMessage.UserProperties.SetTextHeaders(context.Headers, (_, text) => text);

if (context.TimeToLive.HasValue)
brokeredMessage.TimeToLive = context.TimeToLive.Value;

if (context.MessageId.HasValue)
brokeredMessage.MessageId = context.MessageId.Value.ToString("N");

if (context.CorrelationId.HasValue)
brokeredMessage.CorrelationId = context.CorrelationId.Value.ToString("N");

CopyIncomingIdentifiersIfPresent(context);
if (context.PartitionKey != null)
brokeredMessage.PartitionKey = context.PartitionKey;

var sessionId = string.IsNullOrWhiteSpace(context.SessionId) ? context.ConversationId?.ToString("N") : context.SessionId;
if (!string.IsNullOrWhiteSpace(sessionId))
{
brokeredMessage.SessionId = sessionId;

if (context.ReplyToSessionId == null)
brokeredMessage.ReplyToSessionId = sessionId;
}

if (context.ReplyToSessionId != null)
brokeredMessage.ReplyToSessionId = context.ReplyToSessionId;

return brokeredMessage;
}

public ConnectHandle ConnectSendObserver(ISendObserver observer)
{
return _observers.Connect(observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
<PackageReference Include="NUnit" Version="3.11.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.12.0" />
<PackageReference Include="Shouldly" Version="3.0.0" />
<PackageReference Include="StackExchange.Redis" Version="1.2.6" />
<PackageReference Include="StackExchange.Redis" Version="1.2.6" Condition="'$(TargetFramework)' == 'net452'"/>
<PackageReference Include="StackExchange.Redis" Version="2.0.519" Condition="'$(TargetFramework)' == 'netcoreapp2.0'"/>
<ProjectReference Include="..\..\MassTransit.TestFramework\MassTransit.TestFramework.csproj" />
<ProjectReference Include="..\..\MassTransit\MassTransit.csproj" />
<ProjectReference Include="..\MassTransit.RedisIntegration\MassTransit.RedisIntegration.csproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net452</TargetFrameworks>
<LangVersion>latest</LangVersion>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
</PropertyGroup>
<PropertyGroup>
<PackageId>MassTransit.Redis</PackageId>
<Title>MassTransit.Redis</Title>
<Description>MassTransit Redis support; $(Description)</Description>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\MassTransit.RedisIntegration.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="GreenPipes" Version="2.1.3"/>
<PackageReference Include="NewId" Version="3.0.1"/>
<PackageReference Include="Newtonsoft.Json" Version="11.0.2"/>
<PackageReference Include="StackExchange.Redis" Version="1.2.6"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-*" PrivateAssets="All"/>
<ProjectReference Include="..\..\MassTransit\MassTransit.csproj"/>
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net452' ">
<Reference Include="System"/>
<Reference Include="System.Core"/>
<Reference Include="System.Data"/>
</ItemGroup>
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net452;net461</TargetFrameworks>
<LangVersion>latest</LangVersion>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
</PropertyGroup>
<PropertyGroup>
<PackageId>MassTransit.Redis</PackageId>
<Title>MassTransit.Redis</Title>
<Description>MassTransit Redis support; $(Description)</Description>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\MassTransit.RedisIntegration.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="GreenPipes" Version="2.1.3"/>
<PackageReference Include="NewId" Version="3.0.1"/>
<PackageReference Include="Newtonsoft.Json" Version="11.0.2"/>
<PackageReference Include="StackExchange.Redis" Version="1.2.6" Condition="'$(TargetFramework)' == 'net452'"/>
<PackageReference Include="StackExchange.Redis" Version="2.0.519" Condition="'$(TargetFramework)' != 'net452'"/>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-*" PrivateAssets="All"/>
<ProjectReference Include="..\..\MassTransit\MassTransit.csproj"/>
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net452' ">
<Reference Include="System"/>
<Reference Include="System.Core"/>
<Reference Include="System.Data"/>
</ItemGroup>
</Project>

0 comments on commit 35f7652

Please sign in to comment.