Skip to content

Commit

Permalink
Standardized the topology for partition key across service bus, event…
Browse files Browse the repository at this point in the history
… hub, and the SQL transport
  • Loading branch information
phatboyg committed Apr 25, 2024
1 parent af7a812 commit e7c59ea
Show file tree
Hide file tree
Showing 18 changed files with 103 additions and 127 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
namespace MassTransit
namespace MassTransit
{
using System;
using AzureServiceBusTransport;
using AzureServiceBusTransport.Configuration;
using Configuration;
using Transports;


public static class ServiceBusPartitionKeyConventionExtensions
public static class PartitionKeyConventionExtensions
{
public static void UsePartitionKeyFormatter<T>(this IMessageSendTopologyConfigurator<T> configurator, IMessagePartitionKeyFormatter<T> formatter)
where T : class
{
configurator.UpdateConvention<IPartitionKeyMessageSendTopologyConvention<T>>(
update =>
{
update.SetFormatter(formatter);
configurator.UpdateConvention<IPartitionKeyMessageSendTopologyConvention<T>>(update =>
{
update.SetFormatter(formatter);
return update;
});
return update;
});
}

/// <summary>
Expand Down
11 changes: 5 additions & 6 deletions src/MassTransit/Configuration/RoutingKeyConventionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ public static class RoutingKeyConventionExtensions
public static void UseRoutingKeyFormatter<T>(this IMessageSendTopologyConfigurator<T> configurator, IMessageRoutingKeyFormatter<T> formatter)
where T : class
{
configurator.UpdateConvention<IRoutingKeyMessageSendTopologyConvention<T>>(
update =>
{
update.SetFormatter(formatter);
configurator.UpdateConvention<IRoutingKeyMessageSendTopologyConvention<T>>(update =>
{
update.SetFormatter(formatter);
return update;
});
return update;
});
}

/// <summary>
Expand Down
33 changes: 33 additions & 0 deletions src/MassTransit/Middleware/SetPartitionKeyFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace MassTransit.Middleware
{
using System.Threading.Tasks;
using Transports;


public class SetPartitionKeyFilter<TMessage> :
IFilter<SendContext<TMessage>>
where TMessage : class
{
readonly IMessagePartitionKeyFormatter<TMessage> _routingKeyFormatter;

public SetPartitionKeyFilter(IMessagePartitionKeyFormatter<TMessage> routingKeyFormatter)
{
_routingKeyFormatter = routingKeyFormatter;
}

public Task Send(SendContext<TMessage> context, IPipe<SendContext<TMessage>> next)
{
var routingKey = _routingKeyFormatter.FormatPartitionKey(context);

if (context.TryGetPayload(out PartitionKeySendContext routingKeySendContext))
routingKeySendContext.PartitionKey = routingKey;

return next.Send(context);
}

public void Probe(ProbeContext context)
{
context.CreateFilterScope("setPartitionKey");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MassTransit.AzureServiceBusTransport.Configuration
namespace MassTransit.Configuration
{
using MassTransit.Configuration;
using Transports;


public interface IPartitionKeyMessageSendTopologyConvention<TMessage> :
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace MassTransit.Configuration
{
public interface IPartitionKeySendTopologyConvention :
ISendTopologyConvention
{
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace MassTransit.AzureServiceBusTransport.Configuration
namespace MassTransit.Configuration
{
using MassTransit.Configuration;
using Topology;
using Transports;


public class PartitionKeyMessageSendTopologyConvention<TMessage> :
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
namespace MassTransit.AzureServiceBusTransport.Configuration
namespace MassTransit.Configuration
{
using MassTransit.Configuration;


public class PartitionKeySendTopologyConvention :
IPartitionKeySendTopologyConvention
{
readonly ITopologyConventionCache<IMessageSendTopologyConvention> _cache;

public PartitionKeySendTopologyConvention()
{
DefaultFormatter = new EmptyPartitionKeyFormatter();

_cache = new TopologyConventionCache<IMessageSendTopologyConvention>(typeof(IPartitionKeyMessageSendTopologyConvention<>), new Factory());
}

Expand All @@ -20,8 +15,6 @@ bool IMessageSendTopologyConvention.TryGetMessageSendTopologyConvention<T>(out I
return _cache.GetOrAdd<T, IMessageSendTopologyConvention<T>>().TryGetMessageSendTopologyConvention(out convention);
}

public IPartitionKeyFormatter DefaultFormatter { get; set; }


class Factory :
IConventionTypeFactory<IMessageSendTopologyConvention>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace MassTransit.Configuration;

using System;
using Middleware;
using Transports;


public class SetPartitionKeyMessageSendTopology<TMessage> :
IMessageSendTopology<TMessage>
where TMessage : class
{
readonly IFilter<SendContext<TMessage>> _filter;

public SetPartitionKeyMessageSendTopology(IMessagePartitionKeyFormatter<TMessage> partitionKeyFormatter)
{
if (partitionKeyFormatter == null)
throw new ArgumentNullException(nameof(partitionKeyFormatter));

_filter = new SetPartitionKeyFilter<TMessage>(partitionKeyFormatter);
}

public void Apply(ITopologyPipeBuilder<SendContext<TMessage>> builder)
{
builder.AddFilter(_filter);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MassTransit.AzureServiceBusTransport
namespace MassTransit.Transports
{
using System;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MassTransit.AzureServiceBusTransport
namespace MassTransit.Transports
{
public interface IMessagePartitionKeyFormatter<in TMessage>
where TMessage : class
Expand Down
13 changes: 13 additions & 0 deletions src/MassTransit/Transports/IPartitionKeyFormatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace MassTransit.Transports;

public interface IPartitionKeyFormatter
{
/// <summary>
/// Format the partition key to be used by the transport, if supported
/// </summary>
/// <typeparam name="T">The message type</typeparam>
/// <param name="context">The message send context</param>
/// <returns>The routing key to specify in the transport</returns>
string FormatPartitionKey<T>(SendContext<T> context)
where T : class;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace MassTransit.AzureServiceBusTransport
namespace MassTransit.Transports
{
public class MessagePartitionKeyFormatter<TMessage> :
IMessagePartitionKeyFormatter<TMessage>
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
namespace MassTransit
{
public interface EventHubSendContext :
SendContext
SendContext,
PartitionKeySendContext
{
string PartitionId { get; set; }
string PartitionKey { get; set; }
}


Expand Down

0 comments on commit e7c59ea

Please sign in to comment.