From 1fb4c543ae6fe8951513c0ed0286a8802fe825d4 Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Wed, 14 Feb 2024 15:39:02 +0000 Subject: [PATCH] Add support for Azure Service Bus entities partitioning (#3959) (#3961) Co-authored-by: Mauro Servienti Co-authored-by: Poornima Nayar --- .../ConnectionStringParserTests.cs | 7 +++++-- .../ASBSTransportCustomization.cs | 5 +++++ .../ConnectionSettings.cs | 7 +++++-- .../ConnectionStringParser.cs | 12 ++++++++++++ .../Instances/ServiceControlCoreTransports.cs | 2 +- 5 files changed, 28 insertions(+), 5 deletions(-) diff --git a/src/ServiceControl.Transports.ASBS.Tests/ConnectionStringParserTests.cs b/src/ServiceControl.Transports.ASBS.Tests/ConnectionStringParserTests.cs index 2b17dd6a53..d87e18a1ed 100644 --- a/src/ServiceControl.Transports.ASBS.Tests/ConnectionStringParserTests.cs +++ b/src/ServiceControl.Transports.ASBS.Tests/ConnectionStringParserTests.cs @@ -34,6 +34,9 @@ public static IEnumerable SupportedConnectionStrings //Custom query delay interval yield return new TestCaseData("Endpoint=sb://some.endpoint.name/;QueueLengthQueryDelayInterval=15000", new ConnectionSettings(new SharedAccessSignatureAuthentication("Endpoint=sb://some.endpoint.name/;QueueLengthQueryDelayInterval=15000"), queryDelayInterval: TimeSpan.FromSeconds(15))); + //EnablePartitioning + yield return new TestCaseData("Endpoint=sb://some.endpoint.name/;EnablePartitioning=True", + new ConnectionSettings(new SharedAccessSignatureAuthentication("Endpoint=sb://some.endpoint.name/;EnablePartitioning=True"), enablePartitioning: true)); } } @@ -61,13 +64,13 @@ public void VerifySupported(string connectionString, ConnectionSettings expected Assert.AreEqual(JsonSerializer.Serialize(expected), JsonSerializer.Serialize(actual)); - //needed since System..Text.Json doesn't handle polymorphic properties + //needed since System.Text.Json doesn't handle polymorphic properties Assert.AreEqual( JsonSerializer.Serialize(expected.AuthenticationMethod, expected.AuthenticationMethod.GetType()), JsonSerializer.Serialize(actual.AuthenticationMethod, actual.AuthenticationMethod.GetType())); - //needed since System..Text.Json doesn't handle polymorphic properties + //needed since System.Text.Json doesn't handle polymorphic properties if (expected.AuthenticationMethod is TokenCredentialAuthentication expectedAuthentication) { var actualAuthentication = actual.AuthenticationMethod as TokenCredentialAuthentication; diff --git a/src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs b/src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs index 9c5c35819d..af261ab024 100644 --- a/src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs +++ b/src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs @@ -69,6 +69,11 @@ public override IProvideQueueLength CreateQueueLengthProvider() transport.UseWebSockets(); } + if (connectionSettings.EnablePartitioning) + { + transport.EnablePartitioning(); + } + transport.ConfigureNameShorteners(); transport.Transactions(transportTransactionMode); diff --git a/src/ServiceControl.Transports.ASBS/ConnectionSettings.cs b/src/ServiceControl.Transports.ASBS/ConnectionSettings.cs index 13aa28d635..202d15b17b 100644 --- a/src/ServiceControl.Transports.ASBS/ConnectionSettings.cs +++ b/src/ServiceControl.Transports.ASBS/ConnectionSettings.cs @@ -4,15 +4,16 @@ public class ConnectionSettings { - public ConnectionSettings( - AuthenticationMethod authenticationSettings, + public ConnectionSettings(AuthenticationMethod authenticationSettings, string topicName = default, bool useWebSockets = default, + bool enablePartitioning = default, TimeSpan? queryDelayInterval = default) { AuthenticationMethod = authenticationSettings; TopicName = topicName; UseWebSockets = useWebSockets; + EnablePartitioning = enablePartitioning; QueryDelayInterval = queryDelayInterval; } @@ -23,5 +24,7 @@ public class ConnectionSettings public string TopicName { get; } public bool UseWebSockets { get; } + + public bool EnablePartitioning { get; } } } \ No newline at end of file diff --git a/src/ServiceControl.Transports.ASBS/ConnectionStringParser.cs b/src/ServiceControl.Transports.ASBS/ConnectionStringParser.cs index f6d199ab03..8f23b0f1da 100644 --- a/src/ServiceControl.Transports.ASBS/ConnectionStringParser.cs +++ b/src/ServiceControl.Transports.ASBS/ConnectionStringParser.cs @@ -62,6 +62,16 @@ public static ConnectionSettings Parse(string connectionString) clientIdString = (string)clientId; } + var enablePartitioning = false; + if (builder.TryGetValue("EnablePartitioning", out var enablePartitioningString)) + { + if (!bool.TryParse((string)enablePartitioningString, out enablePartitioning)) + { + throw new Exception( + $"Cannot enable partitioning, the specified value '{enablePartitioningString}' cannot be converted to a bool."); + } + } + var shouldUseManagedIdentity = builder.TryGetValue("Authentication", out var authType) && (string)authType == "Managed Identity"; if (shouldUseManagedIdentity) @@ -72,6 +82,7 @@ public static ConnectionSettings Parse(string connectionString) new TokenCredentialAuthentication(fullyQualifiedNamespace, clientIdString), topicNameString, useWebSockets, + enablePartitioning, queryDelayInterval); } @@ -84,6 +95,7 @@ public static ConnectionSettings Parse(string connectionString) new SharedAccessSignatureAuthentication(connectionString), topicNameString, useWebSockets, + enablePartitioning, queryDelayInterval); } } diff --git a/src/ServiceControlInstaller.Engine/Instances/ServiceControlCoreTransports.cs b/src/ServiceControlInstaller.Engine/Instances/ServiceControlCoreTransports.cs index 2cbcd5d24a..7281e87d7b 100644 --- a/src/ServiceControlInstaller.Engine/Instances/ServiceControlCoreTransports.cs +++ b/src/ServiceControlInstaller.Engine/Instances/ServiceControlCoreTransports.cs @@ -31,7 +31,7 @@ public class ServiceControlCoreTransports Name = TransportNames.AzureServiceBus, TypeName = "ServiceControl.Transports.ASBS.ASBSTransportCustomization, ServiceControl.Transports.ASBS", ZipName = "NetStandardAzureServiceBus", - SampleConnectionString = "Endpoint=sb://[namespace].servicebus.windows.net; SharedSecretIssuer=;SharedSecretValue=;QueueLengthQueryDelayInterval=;TopicName=", + SampleConnectionString = "Endpoint=sb://[namespace].servicebus.windows.net; SharedSecretIssuer=;SharedSecretValue=;QueueLengthQueryDelayInterval=;TopicName=;EnablePartitioning=", AvailableInSCMU = true, Matches = name => name.Equals(TransportNames.AzureServiceBus, StringComparison.OrdinalIgnoreCase) || name.Equals("ServiceControl.Transports.ASBS.ASBSTransportCustomization, ServiceControl.Transports.ASBS", StringComparison.OrdinalIgnoreCase)