diff --git a/dotnet/DotNetStandardClasses.sln b/dotnet/DotNetStandardClasses.sln index 635476adf..9acaf7c71 100644 --- a/dotnet/DotNetStandardClasses.sln +++ b/dotnet/DotNetStandardClasses.sln @@ -212,6 +212,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GXAmazonSQS", "src\dotnetco EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "apiattractions", "src\extensions\Azure\test\apiattractions\apiattractions.csproj", "{E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GXMessageBroker", "src\dotnetcore\Providers\Messaging\GXMessageBroker\GXMessageBroker.csproj", "{3B1B5706-E896-4CEB-A551-E30226303BDB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GXAzureServiceBus", "src\dotnetcore\Providers\Messaging\GXAzureServiceBus\GXAzureServiceBus.csproj", "{F8ABEA82-F823-4E9C-96FA-26AF24C932E0}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ProjectHealthTest", "test\ProjectHealthTest\ProjectHealthTest.csproj", "{65048104-212A-4819-AECF-89CA9C08C83F}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetRedisTest", "test\DotNetRedisTest\DotNetRedisTest.csproj", "{48430E50-043A-47A2-8278-B86A4420758A}" @@ -524,6 +528,14 @@ Global {48430E50-043A-47A2-8278-B86A4420758A}.Debug|Any CPU.Build.0 = Debug|Any CPU {48430E50-043A-47A2-8278-B86A4420758A}.Release|Any CPU.ActiveCfg = Release|Any CPU {48430E50-043A-47A2-8278-B86A4420758A}.Release|Any CPU.Build.0 = Release|Any CPU + {3B1B5706-E896-4CEB-A551-E30226303BDB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3B1B5706-E896-4CEB-A551-E30226303BDB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3B1B5706-E896-4CEB-A551-E30226303BDB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3B1B5706-E896-4CEB-A551-E30226303BDB}.Release|Any CPU.Build.0 = Release|Any CPU + {F8ABEA82-F823-4E9C-96FA-26AF24C932E0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F8ABEA82-F823-4E9C-96FA-26AF24C932E0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F8ABEA82-F823-4E9C-96FA-26AF24C932E0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F8ABEA82-F823-4E9C-96FA-26AF24C932E0}.Release|Any CPU.Build.0 = Release|Any CPU {531863CA-93A0-42AA-AB5C-FA0E672C03B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {531863CA-93A0-42AA-AB5C-FA0E672C03B8}.Debug|Any CPU.Build.0 = Debug|Any CPU {531863CA-93A0-42AA-AB5C-FA0E672C03B8}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -632,6 +644,8 @@ Global {E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3} = {7BA5A2CE-7992-4F87-9D84-91AE4D046F5A} {65048104-212A-4819-AECF-89CA9C08C83F} = {1D6F1776-FF4B-46C2-9B3D-BC46CCF049DC} {48430E50-043A-47A2-8278-B86A4420758A} = {1D6F1776-FF4B-46C2-9B3D-BC46CCF049DC} + {3B1B5706-E896-4CEB-A551-E30226303BDB} = {4C43F2DA-59E5-46F5-B691-195449498555} + {F8ABEA82-F823-4E9C-96FA-26AF24C932E0} = {30159B0F-BE61-4DB7-AC02-02851426BE4B} {531863CA-93A0-42AA-AB5C-FA0E672C03B8} = {1D6F1776-FF4B-46C2-9B3D-BC46CCF049DC} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution diff --git a/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs b/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs index ba48bbac5..2109ab09c 100644 --- a/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs +++ b/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs @@ -6,6 +6,7 @@ [assembly: InternalsVisibleTo("GeneXus.Deploy.AzureFunctions.Handlers")] [assembly: InternalsVisibleTo("AzureFunctionsTest")] [assembly: InternalsVisibleTo("GXQueue")] +[assembly: InternalsVisibleTo("GXMessageBroker")] [assembly: InternalsVisibleTo("DotNetCoreUnitTest")] [assembly: InternalsVisibleTo("DotNetCoreWebUnitTest")] [assembly: InternalsVisibleTo("GeneXus.Deploy.AzureFunctions.Handlers")] diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs new file mode 100644 index 000000000..e00d3c3c4 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/AzureServiceBus.cs @@ -0,0 +1,1095 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Reflection; +using System.Runtime.Serialization; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using GeneXus.Messaging.Common; +using GeneXus.Services; +using GeneXus.Utils; + +namespace GeneXus.Messaging.GXAzureServiceBus +{ + public class AzureServiceBus : MessageBrokerBase, IMessageBroker + { + private const int MAX_MESSAGES_DEFAULT = 10; + private const short LOCK_DURATION = 5; + public static String Name = "AZURESB"; + + private ConcurrentDictionary> m_messages = new ConcurrentDictionary>(); + ServiceBusClient _serviceBusClient { get; set; } + private string _queueOrTopicName { get; set; } + private string _connectionString { get; set; } + private string _subscriptionName { get; set; } + private ServiceBusSender _sender { get; set; } + private ServiceBusReceiver _receiver { get; set; } + private ServiceBusSessionReceiver _sessionReceiver { get; set; } + private ServiceBusReceiverOptions _serviceBusReceiverOptions { get; set; } + private bool _sessionEnabled { get; set; } + ServiceBusReceiveMode _sessionEnabledQueueReceiveMode { get; set; } + public AzureServiceBus() : this(null) + { + } + public AzureServiceBus(GXService providerService) : base(providerService) + { + Initialize(providerService); + } + private void Initialize(GXService providerService) + { + ServiceSettings serviceSettings = new(PropertyConstants.MESSAGE_BROKER, Name, providerService); + _queueOrTopicName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUE_NAME); + _connectionString = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.QUEUE_CONNECTION_STRING); + _subscriptionName = serviceSettings.GetEncryptedPropertyValue(PropertyConstants.TOPIC_SUBSCRIPTION); + + string sessionEnabled = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.SESSION_ENABLED); + + if (!string.IsNullOrEmpty(sessionEnabled)) + _sessionEnabled = Convert.ToBoolean(sessionEnabled); + else + _sessionEnabled = false; + + string senderIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.SENDER_IDENTIFIER); + + ServiceBusSenderOptions serviceBusSenderOptions = new ServiceBusSenderOptions(); + if (!string.IsNullOrEmpty(senderIdentifier)) + serviceBusSenderOptions.Identifier = senderIdentifier; + + //TO DO Consider connection options here + //https://docs.microsoft.com/en-us/javascript/api/@azure/service-bus/servicebusclientoptions?view=azure-node-latest#@azure-service-bus-servicebusclientoptions-websocketoptions + + _serviceBusClient = new ServiceBusClient(_connectionString); + if (_serviceBusClient != null) + { + _sender = _serviceBusClient.CreateSender(_queueOrTopicName, serviceBusSenderOptions); + if (!_sessionEnabled && _sender != null) + { + _serviceBusReceiverOptions = new ServiceBusReceiverOptions(); + + string receiveMode = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVE_MODE); + string prefetchCount = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.PREFETCH_COUNT); + string receiverIdentifier = serviceSettings.GetEncryptedOptPropertyValue(PropertyConstants.RECEIVER_IDENTIFIER); + + if (!string.IsNullOrEmpty(receiveMode)) + _serviceBusReceiverOptions.ReceiveMode = (ServiceBusReceiveMode)Convert.ToInt16(receiveMode); + + if (!string.IsNullOrEmpty(prefetchCount)) + { + int prefetchcnt = Convert.ToInt32(prefetchCount); + if (prefetchcnt != 0) + _serviceBusReceiverOptions.PrefetchCount = prefetchcnt; + + } + if (!string.IsNullOrEmpty(receiverIdentifier)) + _serviceBusReceiverOptions.Identifier = receiverIdentifier; + else + _serviceBusReceiverOptions.Identifier = String.Empty; + + if (string.IsNullOrEmpty(_subscriptionName)) + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _serviceBusReceiverOptions); + else + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _subscriptionName, _serviceBusReceiverOptions); + } + } + } + public override string GetName() + { + return Name; + } + + #region Async methods + + private async Task CreateReceiverAsync() + { + //Release resources of previous receiver + if (_receiver != null) + await _receiver.CloseAsync().ConfigureAwait(false); + + if (string.IsNullOrEmpty(_subscriptionName)) + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _serviceBusReceiverOptions); + else + _receiver = _serviceBusClient.CreateReceiver(_queueOrTopicName, _subscriptionName, _serviceBusReceiverOptions); + } + private async Task AcceptSessionAsync(string sessionId, ServiceBusSessionReceiverOptions sessionReceiverOptions) + { + if (_sessionEnabled && (!string.IsNullOrEmpty(sessionId))) + { + //Create new session receiver + ServiceBusSessionReceiver sessionReceiver; + if (string.IsNullOrEmpty(_subscriptionName)) + sessionReceiver = await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, sessionId, sessionReceiverOptions).ConfigureAwait(false); + else + sessionReceiver = await _serviceBusClient.AcceptSessionAsync(_queueOrTopicName, _subscriptionName, sessionId, sessionReceiverOptions).ConfigureAwait(false); + return sessionReceiver; + } + return null; + } + private async Task ServiceClientDisposeAsync() + { + await _serviceBusClient.DisposeAsync().ConfigureAwait(false); + if (_receiver != null) + await _receiver.DisposeAsync().ConfigureAwait(false); + await _sender.DisposeAsync().ConfigureAwait(false); + } + private async Task sendAsync(ServiceBusMessage serviceBusMessage, string options) + { + try + { + await _sender.SendMessageAsync(serviceBusMessage).ConfigureAwait(false); + return true; + } + catch (Exception ex) + { + throw ex; + } + } + private async Task CancelScheduleAsync(long sequenceNumber) + { + try + { + await _sender.CancelScheduledMessageAsync(sequenceNumber).ConfigureAwait(false); + return true; + } + catch (Exception ex) + { + throw ex; + } + } + private async Task ScheduleMessageAsync(ServiceBusMessage serviceBusMessage, string options) + { + ScheduleMessageOptions scheduleOptions = JSONHelper.Deserialize(options); + if ((serviceBusMessage != null) && (scheduleOptions != null) && (!string.IsNullOrEmpty(scheduleOptions.ScheduledEnqueueTime))) + { + try + { + return (await _sender.ScheduleMessageAsync(serviceBusMessage, DateTime.Parse(scheduleOptions.ScheduledEnqueueTime)).ConfigureAwait(false)); + } + catch (Exception ex) + { + throw ex; + } + } + return 0; + } + private async Task SendMessagesBatchAsync(IList brokerMessages, string options) + { + bool success = false; + if (_sender == null) + { + throw new Exception("There was an error at the Message Broker initialization."); + } + else + { + ServiceBusMessage serviceBusMessage; + IList serviceBusMessages = new List(); + foreach (BrokerMessage brokerMessage in brokerMessages) + { + serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); + serviceBusMessages.Add(serviceBusMessage); + } + try + { + await _sender.SendMessagesAsync(serviceBusMessages).ConfigureAwait(false); + success = true; + } + catch (Exception ex) + { + throw ex; + } + } + return success; + } + private async Task InitializeReceiversForReceiveMthAsync(BrokerReceiverOpts brokerReceiverOptions) + { + + if (_sessionEnabled && (brokerReceiverOptions == null)) + { + throw new Exception("SessionId cannot be null for session-enabled queue or topic."); + } + else if (_sessionEnabled && (brokerReceiverOptions != null)) + { + ServiceBusSessionReceiverOptions serviceBusSessionReceiverOptions = BrokerRecOptsToServiceBusSessionRecOpts(brokerReceiverOptions); + + if ((serviceBusSessionReceiverOptions != null) && (!string.IsNullOrEmpty(brokerReceiverOptions.SessionId))) + { + _sessionEnabledQueueReceiveMode = serviceBusSessionReceiverOptions.ReceiveMode; + //Store receiver in case that the message has to be settled + if (_sessionEnabledQueueReceiveMode == ServiceBusReceiveMode.PeekLock) + { + if (_sessionReceiver != null) + await _sessionReceiver.CloseAsync().ConfigureAwait(false); + _sessionReceiver = await AcceptSessionAsync(brokerReceiverOptions.SessionId, serviceBusSessionReceiverOptions).ConfigureAwait(false); + } + else + _sessionReceiver = await AcceptSessionAsync(brokerReceiverOptions.SessionId, serviceBusSessionReceiverOptions).ConfigureAwait(false); + } + else + { + throw new Exception("SessionId cannot be null for session-enabled queue or topic."); + } + } + else if (!_sessionEnabled && (brokerReceiverOptions != null)) + { + //Check if a new receiver must be defined using new settings + if ((_serviceBusReceiverOptions.ReceiveMode != brokerReceiverOptions.ReceiveMode) || (_serviceBusReceiverOptions.Identifier != brokerReceiverOptions.Identifier) || (_serviceBusReceiverOptions.PrefetchCount != brokerReceiverOptions.PrefetchCount)) + { + _serviceBusReceiverOptions.ReceiveMode = brokerReceiverOptions.ReceiveMode; + _serviceBusReceiverOptions.Identifier = brokerReceiverOptions.Identifier; + _serviceBusReceiverOptions.PrefetchCount = brokerReceiverOptions.PrefetchCount; + + await CreateReceiverAsync().ConfigureAwait(false); + } + } + } + private async Task> ReceiveMessagesNotSessionAsync(ReceiveMessageOptions receiveOptions) + { + IReadOnlyList receivedMessages; + int maxMessagesReceive = MAX_MESSAGES_DEFAULT; + if ((receiveOptions != null) && (receiveOptions.MaxMessages != 0)) + maxMessagesReceive = receiveOptions.MaxMessages; + + TimeSpan maxWait = TimeSpan.Zero; + if ((receiveOptions != null) && receiveOptions.MaxWaitTime != 0) + maxWait = TimeSpan.FromSeconds(receiveOptions.MaxWaitTime); + + if (_receiver != null) + { + if ((receiveOptions != null) && (receiveOptions.ReceiveDeferredSequenceNumbers != null) && (receiveOptions.ReceiveDeferredSequenceNumbers.Count > 0)) + { + receivedMessages = await _receiver.ReceiveDeferredMessagesAsync(receiveOptions.ReceiveDeferredSequenceNumbers).ConfigureAwait(false); + } + else + { + if (maxWait == TimeSpan.Zero) + if ((receiveOptions != null) && (receiveOptions.PeekReceive != null) && (receiveOptions.PeekReceive.Peek)) + if (receiveOptions.PeekReceive.PeekFromSequenceNumber != 0) + receivedMessages = await _receiver.PeekMessagesAsync(maxMessages: maxMessagesReceive, receiveOptions.PeekReceive.PeekFromSequenceNumber).ConfigureAwait(false); + else + receivedMessages = await _receiver.PeekMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); + else + receivedMessages = await _receiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); + else + receivedMessages = await _receiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive, maxWaitTime: maxWait).ConfigureAwait(false); + } + } + else + { + throw new Exception("Invalid Operation. No valid receiver defined."); + } + return receivedMessages; + } + private async Task> ReceiveMessagesSessionAsync(ReceiveMessageOptions receiveOptions) + { + IReadOnlyList receivedMessages; + int maxMessagesReceive = MAX_MESSAGES_DEFAULT; + if ((receiveOptions != null) && (receiveOptions.MaxMessages != 0)) + maxMessagesReceive = receiveOptions.MaxMessages; + + TimeSpan maxWait = TimeSpan.Zero; + if ((receiveOptions != null) && receiveOptions.MaxWaitTime != 0) + maxWait = TimeSpan.FromSeconds(receiveOptions.MaxWaitTime); + + if (_sessionReceiver != null) + { + if ((receiveOptions != null) && (receiveOptions.ReceiveDeferredSequenceNumbers != null) && (receiveOptions.ReceiveDeferredSequenceNumbers.Count > 0)) + { + receivedMessages = await _sessionReceiver.ReceiveDeferredMessagesAsync(receiveOptions.ReceiveDeferredSequenceNumbers).ConfigureAwait(false); + } + else + { + if (maxWait == TimeSpan.Zero) + if ((receiveOptions != null) && (receiveOptions.PeekReceive != null) && (receiveOptions.PeekReceive.Peek)) + if (receiveOptions.PeekReceive.PeekFromSequenceNumber != 0) + receivedMessages = await _sessionReceiver.PeekMessagesAsync(maxMessages: maxMessagesReceive, receiveOptions.PeekReceive.PeekFromSequenceNumber).ConfigureAwait(false); + else + receivedMessages = await _sessionReceiver.PeekMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); + else + receivedMessages = await _sessionReceiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive).ConfigureAwait(false); + else + receivedMessages = await _sessionReceiver.ReceiveMessagesAsync(maxMessages: maxMessagesReceive, maxWaitTime: maxWait).ConfigureAwait(false); + + //Release session lock + if (_sessionEnabledQueueReceiveMode != ServiceBusReceiveMode.PeekLock) + await _sessionReceiver.CloseAsync().ConfigureAwait(false); + } + } + else + { + throw new Exception("Invalid Operation. No valid Session receiver defined."); + } + return receivedMessages; + } + private async Task> ReceiveMessagesAsync(string options) + { + ReceiveMessageOptions receiveOptions = JSONHelper.Deserialize(options); + + IReadOnlyList receivedMessages; + try + { + await InitializeReceiversForReceiveMthAsync(receiveOptions.BrokerReceiverOptions).ConfigureAwait(false); + if (_sessionEnabled) + receivedMessages = await ReceiveMessagesSessionAsync(receiveOptions).ConfigureAwait(false); + else + receivedMessages = await ReceiveMessagesNotSessionAsync(receiveOptions).ConfigureAwait(false); + + return receivedMessages; + } + catch (Exception ex) + { + throw ex; + } + } + + private async Task ReceiveMessageSessionAsync(ReceiveMessageOptions receiveOptions) + { + TimeSpan maxWait = TimeSpan.Zero; + ServiceBusReceivedMessage receivedMessage; + if (_sessionReceiver != null) + { + if ((receiveOptions != null) && receiveOptions.MaxWaitTime != 0) + maxWait = TimeSpan.FromSeconds(receiveOptions.MaxWaitTime); + + if ((receiveOptions != null) && (receiveOptions.ReceiveDeferredSequenceNumbers != null) && (receiveOptions.ReceiveDeferredSequenceNumbers.Count > 0)) + receivedMessage = await _sessionReceiver.ReceiveDeferredMessageAsync(receiveOptions.ReceiveDeferredSequenceNumbers[0]).ConfigureAwait(false); + + else + { + if (maxWait != TimeSpan.Zero) + receivedMessage = await _sessionReceiver.ReceiveMessageAsync(maxWaitTime: maxWait).ConfigureAwait(false); + else + if ((receiveOptions != null) && (receiveOptions.PeekReceive != null) && (receiveOptions.PeekReceive.Peek)) + if (receiveOptions.PeekReceive.PeekFromSequenceNumber != 0) + receivedMessage = await _sessionReceiver.PeekMessageAsync(receiveOptions.PeekReceive.PeekFromSequenceNumber).ConfigureAwait(false); + else + receivedMessage = await _sessionReceiver.PeekMessageAsync().ConfigureAwait(false); + else + receivedMessage = await _sessionReceiver.ReceiveMessageAsync().ConfigureAwait(false); + + //Release session lock + if (_sessionEnabledQueueReceiveMode != ServiceBusReceiveMode.PeekLock) + await _sessionReceiver.CloseAsync().ConfigureAwait(false); + } + return receivedMessage; + } + else + { + throw new Exception("Invalid Operation. No valid Session receiver defined."); + } + } + + private async Task ReceiveMessageNotSessionAsync(ReceiveMessageOptions receiveOptions) + { + TimeSpan maxWait = TimeSpan.Zero; + ServiceBusReceivedMessage receivedMessage; + if (_receiver != null) + { + if ((receiveOptions != null) && receiveOptions.MaxWaitTime != 0) + maxWait = TimeSpan.FromSeconds(receiveOptions.MaxWaitTime); + + if ((receiveOptions != null) && (receiveOptions.ReceiveDeferredSequenceNumbers != null) && (receiveOptions.ReceiveDeferredSequenceNumbers.Count > 0)) + { + receivedMessage = await _receiver.ReceiveDeferredMessageAsync(receiveOptions.ReceiveDeferredSequenceNumbers[0]).ConfigureAwait(false); + } + else + { + if (maxWait != TimeSpan.Zero) + receivedMessage = await _receiver.ReceiveMessageAsync(maxWaitTime: maxWait).ConfigureAwait(false); + else + if ((receiveOptions != null) && (receiveOptions.PeekReceive != null) && (receiveOptions.PeekReceive.Peek)) + if (receiveOptions.PeekReceive.PeekFromSequenceNumber != 0) + receivedMessage = await _receiver.PeekMessageAsync(receiveOptions.PeekReceive.PeekFromSequenceNumber).ConfigureAwait(false); + else + receivedMessage = await _receiver.PeekMessageAsync().ConfigureAwait(false); + else + receivedMessage = await _receiver.ReceiveMessageAsync().ConfigureAwait(false); + + } + return receivedMessage; + } + else + { + throw new Exception("Invalid Operation. No valid receiver defined."); + } + } + private async Task ReceiveMessageAsync(string options) + { + ReceiveMessageOptions receiveOptions = JSONHelper.Deserialize(options); + try + { + + await InitializeReceiversForReceiveMthAsync(receiveOptions.BrokerReceiverOptions).ConfigureAwait(false); + + if (_sessionEnabled) + return await ReceiveMessageSessionAsync(receiveOptions).ConfigureAwait(false); + else + return await ReceiveMessageNotSessionAsync(receiveOptions).ConfigureAwait(false); + } + catch (Exception ex) + { + throw ex; + } + } + private async Task CompleteMessageAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + try + { + await receiver.CompleteMessageAsync(serviceBusReceivedMessage).ConfigureAwait(false); + return true; + } + catch (ServiceBusException sbex) + { + throw sbex; + } + } + private async Task AbandonMessageAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + try + { + await receiver.AbandonMessageAsync(serviceBusReceivedMessage).ConfigureAwait(false); + return true; + } + catch (ServiceBusException sbex) + { + throw sbex; + } + } + private async Task DeadLetterMessageAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + try + { + await receiver.DeadLetterMessageAsync(serviceBusReceivedMessage).ConfigureAwait(false); + return true; + } + catch (ServiceBusException sbex) + { + throw sbex; + } + } + private async Task DeferMessageAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + try + { + await receiver.DeferMessageAsync(serviceBusReceivedMessage).ConfigureAwait(false); + return true; + } + catch (ServiceBusException sbex) + { + throw sbex; + } + } + private async Task RenewMessageLockAsync(ServiceBusReceiver receiver, ServiceBusReceivedMessage serviceBusReceivedMessage) + { + try + { + await receiver.RenewMessageLockAsync(serviceBusReceivedMessage).ConfigureAwait(false); + return true; + } + catch (ServiceBusException sbex) + { + throw sbex; + } + } + #endregion + + #region API Methods + public bool SendMessage(BrokerMessage brokerMessage, string options) + { + bool success = false; + ServiceBusMessage serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); + try + { + Task task; + if (_sender != null) + { + task = Task.Run(async () => await sendAsync(serviceBusMessage, options).ConfigureAwait(false)); + success = task.Result; + ClearServiceBusAuxiliaryStorage(); + } + else + { + throw new Exception("There was an error at the Message Broker initialization."); + } + } + catch (AggregateException ae) + { + throw ae; + } + return success; + } + bool IMessageBroker.SendMessages(IList brokerMessages, string options) + { + bool success = false; + try + { + Task task = Task.Run(async () => await SendMessagesBatchAsync(brokerMessages, options).ConfigureAwait(false)); + success = task.Result; + ClearServiceBusAuxiliaryStorage(); + } + catch (AggregateException ae) + { + throw ae; + } + return success; + } + IList IMessageBroker.GetMessages(string options, out bool success) + { + IList brokerMessages = new List(); + success = false; + try + { + Task> receivedMessages = Task>.Run(async () => await ReceiveMessagesAsync(options).ConfigureAwait(false)); + if (receivedMessages != null && receivedMessages.Result != null) + { + ClearServiceBusAuxiliaryStorage(); + foreach (ServiceBusReceivedMessage serviceBusReceivedMessage in receivedMessages.Result) + { + if (serviceBusReceivedMessage != null) + brokerMessages.Add(SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); + + //If receive Mode = Peek Lock, save the messages to be retrieved later + if (GetReceiveMode() != null && (GetReceiveMode() == ServiceBusReceiveMode.PeekLock)) + { + if (!AddOrUpdateStoredServiceReceivedMessage(serviceBusReceivedMessage)) + { + throw new Exception("Invalid operation. Try retrieving the message again."); + } + } + } + success = true; + } + } + catch (AggregateException ae) + { + throw ae; + } + return brokerMessages; + } + + /// + /// Settling a message + /// + public bool ConsumeMessage(BrokerMessage brokerMessage, string options) + { + ConsumeMessageOptions consumeOptions = JSONHelper.Deserialize(options); + if (consumeOptions != null) + { + ServiceBusReceiver receiver; + if (_sessionEnabled && _sessionReceiver != null) + { + receiver = _sessionReceiver; + } + else + if (!_sessionEnabled) + receiver = _receiver; + else + throw new Exception("Invalid operation. Try retrieving the message again."); + + ClearServiceBusAuxiliaryStorage(); + ServiceBusReceivedMessage serviceBusReceivedMessage = GetStoredServiceBusReceivedMessage(brokerMessage); + if (serviceBusReceivedMessage != null) + { + try + { + Task taskB; + switch (consumeOptions.ConsumeMode) + { + case ConsumeMessageOptions.ConsumeModeOpts.Complete: + { + taskB = Task.Run(async () => await CompleteMessageAsync(receiver,serviceBusReceivedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + return taskB.Result; + } + case ConsumeMessageOptions.ConsumeModeOpts.Abandon: + { + taskB = Task.Run(async () => await AbandonMessageAsync(receiver,serviceBusReceivedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + return taskB.Result; + } + case ConsumeMessageOptions.ConsumeModeOpts.DeadLetter: + { + taskB = Task.Run(async () => await DeadLetterMessageAsync(receiver,serviceBusReceivedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + return taskB.Result; + } + case ConsumeMessageOptions.ConsumeModeOpts.Defer: + { + taskB = Task.Run(async () => await DeferMessageAsync(receiver,serviceBusReceivedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + return taskB.Result; + } + case ConsumeMessageOptions.ConsumeModeOpts.RenewMessageLock: + { + taskB = Task.Run(async () => await RenewMessageLockAsync(receiver,serviceBusReceivedMessage).ConfigureAwait(false)); + RemoveStoredServiceBusReceivedMessage(brokerMessage); + return taskB.Result; + } + } + return true; + } + catch (AggregateException ae) + { + throw ae; + } + } + else + { + throw new Exception("Invalid operation. Try retrieving the message again."); + } + } + return false; + } + + public BrokerMessage GetMessage(string options, out bool success) + { + BrokerMessage brokerMessage = new BrokerMessage(); + success = false; + try + { + ClearServiceBusAuxiliaryStorage(); + Task receivedMessage = Task.Run(async () => await ReceiveMessageAsync(options).ConfigureAwait(false)); + if (receivedMessage != null && receivedMessage.Result != null) + { + ServiceBusReceivedMessage serviceBusReceivedMessage = receivedMessage.Result; + + //If receive Mode = Peek Lock, save the message to be settled later + if (GetReceiveMode() != null && (GetReceiveMode() == ServiceBusReceiveMode.PeekLock)) + { + if (!AddOrUpdateStoredServiceReceivedMessage(serviceBusReceivedMessage)) + throw new Exception("Invalid operation. Try retrieving the message again."); + } + success = true; + return (SBReceivedMessageToBrokerMessage(serviceBusReceivedMessage)); + } + } + catch (AggregateException ae) + { + throw ae; + } + return brokerMessage; + } + public void Dispose() + { + Task task = Task.Run(async () => await ServiceClientDisposeAsync().ConfigureAwait(false)); + m_messages.Clear(); + } + public long ScheduleMessage(BrokerMessage brokerMessage, string options) + { + long sequenceNumber = 0; + ServiceBusMessage serviceBusMessage = BrokerMessageToServiceBusMessage(brokerMessage); + try + { + Task task; + if (_sender != null) + { + task = Task.Run(async () => await ScheduleMessageAsync(serviceBusMessage, options).ConfigureAwait(false)); + sequenceNumber = task.Result; + ClearServiceBusAuxiliaryStorage(); + } + else + { + throw new Exception("There was an error at the Message Broker initialization."); + } + } + catch (AggregateException ae) + { + throw ae; + } + return sequenceNumber; + } + public bool CancelSchedule(long sequenceNumber) + { + bool success = false; + try + { + Task task; + if (_sender != null) + { + task = Task.Run(async () => await CancelScheduleAsync(sequenceNumber).ConfigureAwait(false)); + success = task.Result; + ClearServiceBusAuxiliaryStorage(); + } + else + { + throw new Exception("There was an error at the Message Broker initialization."); + } + } + catch (AggregateException ae) + { + throw ae; + } + return false; + } + public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) + { + try + { + ServiceBusException az_ex = (ServiceBusException)ex; + msg.gxTpr_Id = az_ex.Reason.ToString(); + msg.gxTpr_Description = az_ex.Message; + return true; + } + catch (Exception) + { + return false; + } + } + + #endregion + + #region Transformation Methods + private ServiceBusSessionReceiverOptions BrokerRecOptsToServiceBusSessionRecOpts(BrokerReceiverOpts brokerReceiverOptions) + { + ServiceBusSessionReceiverOptions serviceBusSessionReceiverOptions = new ServiceBusSessionReceiverOptions(); + if (brokerReceiverOptions != null) + { + serviceBusSessionReceiverOptions.PrefetchCount = brokerReceiverOptions.PrefetchCount; + serviceBusSessionReceiverOptions.Identifier = brokerReceiverOptions.Identifier; + serviceBusSessionReceiverOptions.ReceiveMode = brokerReceiverOptions.ReceiveMode; + return serviceBusSessionReceiverOptions; + } + return null; + + } + private ServiceBusMessage BrokerMessageToServiceBusMessage(BrokerMessage brokerMessage) + { + if (brokerMessage != null) + { + ServiceBusMessage serviceBusMessage = new ServiceBusMessage(brokerMessage.MessageBody); + serviceBusMessage.MessageId = brokerMessage.MessageId; + + GXProperties messageAttributes = brokerMessage.MessageAttributes; + if (messageAttributes != null) + LoadMessageProperties(messageAttributes, ref serviceBusMessage); + + return serviceBusMessage; + } + return null; + } + private BrokerMessage SBReceivedMessageToBrokerMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) + { + BrokerMessage brokerMessage = new BrokerMessage(); + brokerMessage.MessageId = serviceBusReceivedMessage.MessageId; + brokerMessage.MessageBody = serviceBusReceivedMessage.Body.ToString(); + + LoadReceivedMessageProperties(serviceBusReceivedMessage, ref brokerMessage); + return brokerMessage; + } + + #endregion + + #region Data + [DataContract()] + internal class ScheduleMessageOptions + { + long _cancelSequenceNumber; + string _scheduledEnqueueTime; + + [DataMember()] + internal string ScheduledEnqueueTime { get => _scheduledEnqueueTime; set => _scheduledEnqueueTime = value; } + + [DataMember()] + internal long CancelSequenceNumber { get => _cancelSequenceNumber; set => _cancelSequenceNumber = value; } + } + + [DataContract()] + internal class ReceiveMessageOptions + { + int _maxmessages; + int _maxwaittime; + + PeekReceiveOpts _peekreceiveopts; + IList _receivedeferredsequencenumbers; + BrokerReceiverOpts _brokerreceiveroptions; + + [DataMember()] + internal int MaxMessages { get => _maxmessages; set => _maxmessages = value; } + + [DataMember()] + internal int MaxWaitTime { get => _maxwaittime; set => _maxwaittime = value; } + + [DataMember()] + internal PeekReceiveOpts PeekReceive { get => _peekreceiveopts; set => _peekreceiveopts = value; } + + [DataMember()] + internal IList ReceiveDeferredSequenceNumbers { get => _receivedeferredsequencenumbers; set => _receivedeferredsequencenumbers = value; } + + [DataMember()] + internal BrokerReceiverOpts BrokerReceiverOptions { get => _brokerreceiveroptions; set => _brokerreceiveroptions = value; } + } + + [DataContract()] + public class BrokerReceiverOpts + { + ServiceBusReceiveMode _receiveMode; + int _prefetchCount; + string _identifier; + string _sessionId; + + [DataMember()] + internal int PrefetchCount { get => _prefetchCount; set => _prefetchCount = value; } + + [DataMember()] + internal string Identifier { get => _identifier; set => _identifier = value; } + + [DataMember()] + internal string SessionId { get => _sessionId; set => _sessionId = value; } + + [DataMember()] + internal ServiceBusReceiveMode ReceiveMode { get => _receiveMode; set => _receiveMode = value; } + + } + + [DataContract()] + public class PeekReceiveOpts + { + bool _peek; + long _peekfromsequencenumber; + + [DataMember()] + internal bool Peek { get => _peek; set => _peek = value; } + + [DataMember()] + internal long PeekFromSequenceNumber { get => _peekfromsequencenumber ; set => _peekfromsequencenumber = value; } + } + + [DataContract] + internal class ConsumeMessageOptions + { + + BrokerReceiverOpts _brokerreceiveroptions; + + [DataMember] + internal ConsumeModeOpts ConsumeMode { get; set; } + + [DataMember()] + internal BrokerReceiverOpts BrokerReceiverOptions { get => _brokerreceiveroptions; set => _brokerreceiveroptions = value; } + + internal enum ConsumeModeOpts + { + Complete, + Abandon, + DeadLetter, + Defer, + RenewMessageLock + } + } + + #endregion + + #region Helper methods + + private ServiceBusReceiveMode? GetReceiveMode() + { + if (_sessionEnabled) + return _sessionEnabledQueueReceiveMode; + else + if (_serviceBusReceiverOptions != null) + return _serviceBusReceiverOptions.ReceiveMode; + return null; + } + private ServiceBusReceivedMessage GetStoredServiceBusReceivedMessage(BrokerMessage message) + { + string messageIdentifier = GetMessageIdentifier(message); + if (m_messages.TryGetValue(messageIdentifier, out Tuple messageStored)) + { + return messageStored.Item2; + } + else + return null; + } + private void RemoveStoredServiceBusReceivedMessage(BrokerMessage message) + { + string messageIdentifier = GetMessageIdentifier(message); + m_messages.TryRemove(messageIdentifier, out _); + } + private bool AddOrUpdateStoredServiceReceivedMessage(ServiceBusReceivedMessage serviceBusReceivedMessage) + { + string messageIdentifier = GetMessageIdentifierFromServiceBus(serviceBusReceivedMessage); + if (!string.IsNullOrEmpty(messageIdentifier)) + { + Tuple messageStored = new Tuple(DateTime.UtcNow, serviceBusReceivedMessage); + m_messages[messageIdentifier] = messageStored; + return true; + } + return false; + } + + private void ClearServiceBusAuxiliaryStorage() + { + //Clear all messages older than 5 minutes + //When a consumer locks a message, the broker temporarily hides it from other consumers (LockDuration). + //However, the lock on the message has a timeout, which is 5 mins maximum + + foreach (KeyValuePair> entry in m_messages) + { + if (entry.Value.Item1.AddMinutes(LOCK_DURATION) < DateTime.UtcNow) + { + m_messages.TryRemove(entry.Key, out _); + } + } + } + + private string GetMessageIdentifier(BrokerMessage message) + { + //The sequence number is a unique 64-bit integer assigned to a message as it is accepted and stored by the broker and functions as its true identifier. + //For partitioned entities, the sequence number is issued relative to the partition. + //https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sequencing + //Follow this to identify the message + //https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#using-a-partition-key + + + string messageSequenceNumber = GetMessageSequenceNumber(message); + string messageIdentifier = string.Empty; + + //Get SessionId of the message + string messageSessionId = GetMessageSessionId(message); + if (!string.IsNullOrEmpty(messageSessionId)) + messageIdentifier = $"{messageSequenceNumber}_{messageSessionId}"; + else + { + //Get PartitionKey of the message + string messagePartitionKey = GetMessagePartitionKey(message); + if (!string.IsNullOrEmpty(messagePartitionKey)) + messageIdentifier = $"{messageSequenceNumber}_{messagePartitionKey}"; + else + messageIdentifier = $"{messageSequenceNumber}_{message.MessageId}"; + } + return messageIdentifier; + } + + private string GetMessageIdentifierFromServiceBus(ServiceBusReceivedMessage message) + { + string messageSequenceNumber = message.SequenceNumber.ToString(); + string messageIdentifier = string.Empty; + //Get SessionId of the message + string messageSessionId = message.SessionId; + if (!string.IsNullOrEmpty(messageSessionId)) + messageIdentifier = $"{messageSequenceNumber}_{messageSessionId}"; + else + { + //Get PartitionKey of the message + string messagePartitionKey = message.PartitionKey; + if (!string.IsNullOrEmpty(messagePartitionKey)) + messageIdentifier = $"{messageSequenceNumber}_{messagePartitionKey}"; + else + messageIdentifier = $"{messageSequenceNumber}_{message.MessageId}"; + } + return messageIdentifier; + } + private string GetMessageSequenceNumber(BrokerMessage message) + { + string sequenceNumberValue = string.Empty; + if (message != null) + { + if (message.MessageAttributes.ContainsKey("SequenceNumber")) + sequenceNumberValue = message.MessageAttributes.Get("SequenceNumber"); + } + return sequenceNumberValue; + } + private string GetMessageSessionId(BrokerMessage message) + { + string messageSessionId = string.Empty; + if (message != null) + { + if (message.MessageAttributes.ContainsKey("SessionId")) + messageSessionId = message.MessageAttributes.Get("SessionId"); + } + return messageSessionId; + } + private string GetMessagePartitionKey(BrokerMessage message) + { + string messagePartitionKey = string.Empty; + if (message != null) + { + if (message.MessageAttributes.ContainsKey("PartitionKey")) + messagePartitionKey = message.MessageAttributes.Get("PartitionKey"); + } + return messagePartitionKey; + } + private void LoadReceivedMessageProperties(ServiceBusReceivedMessage serviceBusReceivedMessage, ref BrokerMessage brokerMessage) + { + GXProperties properties = new GXProperties(); + + if (serviceBusReceivedMessage != null) + { + brokerMessage.MessageAttributes = new GXProperties(); + Type t = serviceBusReceivedMessage.GetType(); + PropertyInfo[] props = t.GetProperties(); + foreach (PropertyInfo prop in props) + { + object value; + if (prop.Name != "ApplicationProperties") + { + if (prop.GetIndexParameters().Length == 0 && serviceBusReceivedMessage != null) + { + value = prop.GetValue(serviceBusReceivedMessage); + + if (value != null) + brokerMessage.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + } + //Application Properties + foreach (KeyValuePair o in serviceBusReceivedMessage.ApplicationProperties) + { + brokerMessage.MessageAttributes.Add(o.Key, o.Value.ToString()); + } + } + } + private void LoadMessageProperties(GXProperties properties, ref ServiceBusMessage serviceBusMessage) + { + if (properties != null) + { + GxKeyValuePair messageAttribute = new GxKeyValuePair(); + messageAttribute = properties.GetFirst(); + while (!properties.Eof()) + { + switch (messageAttribute.Key.ToLower()) + { + case "timetolive": + serviceBusMessage.TimeToLive = System.TimeSpan.Parse(messageAttribute.Value); + break; + case "to": + serviceBusMessage.To = messageAttribute.Value; + break; + case "subject": + serviceBusMessage.Subject = messageAttribute.Value; + break; + case "partitionkey": + serviceBusMessage.PartitionKey = messageAttribute.Value; + break; + case "transactionpartitionkey": + serviceBusMessage.TransactionPartitionKey = messageAttribute.Value; + break; + case "contenttype": + serviceBusMessage.ContentType = messageAttribute.Value; + break; + case "correlationid": + serviceBusMessage.CorrelationId = messageAttribute.Value; + break; + case "replyto": + serviceBusMessage.ReplyTo = messageAttribute.Value; + break; + case "replytosessionid": + serviceBusMessage.ReplyToSessionId = messageAttribute.Value; + break; + case "sessionid": + serviceBusMessage.SessionId = messageAttribute.Value; + break; + case "scheduledenqueuetime": + serviceBusMessage.ScheduledEnqueueTime = System.DateTimeOffset.Parse(messageAttribute.Value); + break; + default: + serviceBusMessage.ApplicationProperties.Add(messageAttribute.Key, messageAttribute.Value); + break; + } + messageAttribute = properties.GetNext(); + } + } + } + #endregion + } +} + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj new file mode 100644 index 000000000..ac8137a4a --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/GXAzureServiceBus.csproj @@ -0,0 +1,17 @@ + + + + net6.0 + GeneXus.Azure.ServiceBus + Azure ServiceBus Messaging + + + + + + + + + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs new file mode 100644 index 000000000..036c89652 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureServiceBus/ServiceBusMessageBrokerProvider.cs @@ -0,0 +1,130 @@ +using System; +using System.Collections; +using GeneXus.Messaging.Common; +using GeneXus.Utils; + +namespace GeneXus.Messaging.GXAzureServiceBus +{ + public class ServiceBusMessageBrokerProvider + { + public MessageQueue Connect(string queueName, string connectionString, out GXBaseCollection errorMessages, out bool success) + { + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + GXProperties properties = new GXProperties(); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + + public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, out GXBaseCollection errorMessages, out bool success) + { + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + GXProperties properties = new GXProperties(); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, topicName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + public MessageQueue Connect(string queueName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection errorMessages, out bool success) + { + + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions); + + GXProperties properties = new GXProperties(); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, queueName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString()); + properties.Add(PropertyConstants.RECEIVE_MODE, options.ReceiveMode.ToString()); + properties.Add(PropertyConstants.PREFETCH_COUNT, options.PrefetchCount.ToString()); + properties.Add(PropertyConstants.RECEIVER_IDENTIFIER, options.Identifier); + properties.Add(PropertyConstants.RECEIVER_SESSIONID, options.SessionId); + properties.Add(PropertyConstants.SENDER_IDENTIFIER, senderIdentifier); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + + public MessageQueue Connect(string topicName, string subcriptionName, string connectionString, bool sessionEnabled, GxUserType receiverOptions, string senderIdentifier, out GXBaseCollection errorMessages, out bool success) + { + MessageBrokerProvider messageBrokerProvider = new MessageBrokerProvider(); + GXProperties properties = new GXProperties(); + ReceiverOptions options = TransformGXUserTypeToReceiverOptions(receiverOptions); + + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME, topicName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME, subcriptionName); + properties.Add(PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING, connectionString); + properties.Add(PropertyConstants.SESSION_ENABLED, sessionEnabled.ToString()); + properties.Add(PropertyConstants.RECEIVE_MODE, options.ReceiveMode.ToString()); + properties.Add(PropertyConstants.PREFETCH_COUNT, options.PrefetchCount.ToString()); + properties.Add(PropertyConstants.RECEIVER_IDENTIFIER, options.Identifier); + properties.Add(PropertyConstants.RECEIVER_SESSIONID, options.SessionId); + properties.Add(PropertyConstants.SENDER_IDENTIFIER, senderIdentifier); + + MessageQueue messageQueue = messageBrokerProvider.Connect(PropertyConstants.AZURESERVICEBUS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return messageQueue; + } + + #region Transformation methods + private ReceiverOptions TransformGXUserTypeToReceiverOptions(GxUserType options) + { + ReceiverOptions receiverOptions = new ReceiverOptions(); + if (options != null) + { + receiverOptions.ReceiveMode = options.GetPropertyValue("Receivemode"); + receiverOptions.Identifier = options.GetPropertyValue("Identifier"); + receiverOptions.PrefetchCount = options.GetPropertyValue("Prefetchcount"); + receiverOptions.SessionId = options.GetPropertyValue("Sessionid"); + } + return receiverOptions; + } + #endregion + public class ReceiverOptions : GxUserType + { + public short ReceiveMode { get; set; } + public short PrefetchCount { get; set; } + public string Identifier { get; set; } + public string SessionId { get; set; } + + #region Json + private static Hashtable mapper; + public override String JsonMap(String value) + { + if (mapper == null) + { + mapper = new Hashtable(); + } + return (String)mapper[value]; ; + } + + public override void ToJSON() + { + ToJSON(true); + return; + } + + public override void ToJSON(bool includeState) + { + AddObjectProperty("ReceiveMode", ReceiveMode, false); + AddObjectProperty("PrefetchCount", PrefetchCount, false); + AddObjectProperty("Identifier", Identifier, false); + AddObjectProperty("SessionId", Identifier, false); + return; + } + + #endregion + } + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/GXMessageBroker.csproj b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/GXMessageBroker.csproj new file mode 100644 index 000000000..a1fc0121d --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/GXMessageBroker.csproj @@ -0,0 +1,14 @@ + + + + net6.0 + GeneXus.Message.MessageBroker + Broker Messaging + TRACE;DEBUG;NETCORE + + + + + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs new file mode 100644 index 000000000..c30bebbcf --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBroker.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using GeneXus.Utils; + +namespace GeneXus.Messaging.Common +{ + public interface IMessageBroker + { + bool SendMessage(BrokerMessage brokerMessage, string options); + bool SendMessages(IList brokerMessages, string options); + IList GetMessages(string options, out bool success); + BrokerMessage GetMessage(string options, out bool success); + bool ConsumeMessage(BrokerMessage brokerMessage, string options); + long ScheduleMessage(BrokerMessage brokerMessage, string options); + bool CancelSchedule(long handleId); + void Dispose(); + bool GetMessageFromException(Exception ex, SdtMessages_Message msg); + } + public class BrokerMessage : GxUserType + { + public string MessageId { get; set; } + public string MessageBody { get; set; } + public GXProperties MessageAttributes { get; set; } + public string MessageHandleId { get; set; } + + #region Json + private static Hashtable mapper; + public override String JsonMap(String value) + { + if (mapper == null) + { + mapper = new Hashtable(); + } + return (String)mapper[value]; ; + } + + public override void ToJSON() + { + ToJSON(true); + return; + } + + public override void ToJSON(bool includeState) + { + AddObjectProperty("MessageId", MessageId, false); + AddObjectProperty("MessageBody", MessageBody, false); + AddObjectProperty("MessageHandleId", MessageHandleId, false); + return; + } + + #endregion + + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerBase.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerBase.cs new file mode 100644 index 000000000..da044c4ef --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerBase.cs @@ -0,0 +1,33 @@ +using System; +using GeneXus.Services; +using log4net; + +namespace GeneXus.Messaging.Common +{ + public abstract class MessageBrokerBase + { + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(MessageBrokerBase)); + internal GXService service; + public MessageBrokerBase() + { + } + + public MessageBrokerBase(GXService s) + { + if (s == null) + { + try + { + s = ServiceFactory.GetGXServices().Get(GXServices.MESSAGEBROKER_SERVICE); + } + catch (Exception) + { + GXLogging.Warn(logger, "MESSAGEBROKER_SERVICE is not activated"); + } + } + + service = s; + } + public abstract String GetName(); + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs new file mode 100644 index 000000000..c3acfb7d8 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageBrokerProvider.cs @@ -0,0 +1,117 @@ +using System; +using GeneXus.Attributes; +using GeneXus.Encryption; +using GeneXus.Services; +using GeneXus.Utils; +using GxClasses.Helpers; +using log4net; + +namespace GeneXus.Messaging.Common +{ + [GXApi] + public class MessageBrokerProvider : MessageQueue + { + static readonly ILog logger = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + private static GXService providerService; + public MessageBrokerProvider() + { + + } + + public MessageQueue Connect(string providerTypeName, GXProperties properties, out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + MessageQueue messageQueue = new MessageQueue(); + if (string.IsNullOrEmpty(providerTypeName)) + { + GXUtil.ErrorToMessages("GXMessageBroker", "Message Broker provider cannot be empty", errorMessages); + GXLogging.Error(logger, "(GXMessageBroker)Failed to Connect to a Message Broker : Provider cannot be empty."); + success = false; + return messageQueue; + } + try + { + if (providerService == null || !string.Equals(providerService.Name, providerTypeName, StringComparison.OrdinalIgnoreCase)) + { + providerService = new GXService(); + providerService.Type = GXServices.MESSAGEBROKER_SERVICE; + providerService.Name = providerTypeName; + providerService.AllowMultiple = false; + providerService.Properties = new GXProperties(); + } + Preprocess(providerTypeName, properties); + + GxKeyValuePair prop = properties.GetFirst(); + while (!properties.Eof()) + { + providerService.Properties.Set(prop.Key, prop.Value); + prop = properties.GetNext(); + } + + string typeFullName = providerService.ClassName; + GXLogging.Debug(logger, "Loading Message Broker provider: " + typeFullName); + Type type = AssemblyLoader.GetType(typeFullName); + messageQueue.messageBroker = (IMessageBroker)Activator.CreateInstance(type, new object[] { providerService }); + + } + catch (Exception ex) + { + GXLogging.Error(logger, "(GXMessageBroker)Couldn't connect to Message Broker provider: " + ExceptionExtensions.GetInnermostException(ex)); + GXUtil.ErrorToMessages("GXMessageBroker", ex, errorMessages); + success = false; + return messageQueue; + } + success = true; + return (messageQueue); + } + private static void Preprocess(String name, GXProperties properties) + { + string className; + + switch (name) + { + case Providers.AzureServiceBus: + className = PropertyConstants.AZURE_SB_CLASSNAME; + SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_QUEUENAME); + SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME); + SetEncryptedProperty(properties, PropertyConstants.MESSAGEBROKER_AZURESB_CONNECTIONSTRING); + if (string.IsNullOrEmpty(providerService.ClassName) || !providerService.ClassName.Contains(className)) + { + providerService.ClassName = PropertyConstants.AZURE_SB_PROVIDER_CLASSNAME; + } + break; + default: + throw new SystemException(string.Format("Provider {0} is not supported.", name)); + } + } + private static void SetEncryptedProperty(GXProperties properties, String prop) + { + String value = properties.Get(prop); + if (string.IsNullOrEmpty(value)) + value = String.Empty; + value = CryptoImpl.Encrypt(value); + properties.Set(prop, value); + } + + } + public static class ExceptionExtensions + { + public static string GetInnermostException(Exception e) + { + Exception ex = e; + if (ex != null) + { + while (ex.InnerException != null) + { + ex = ex.InnerException; + } + + } + return ex.Message; + } + } + static class Providers + { + public const string AzureServiceBus = "AZURESERVICEBUS"; + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs new file mode 100644 index 000000000..0aa749ba0 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/MessageQueue.cs @@ -0,0 +1,393 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.IO; +using System.Reflection; +using System.Text; +using GeneXus.Services; +using GeneXus.Utils; +using GxClasses.Helpers; +using log4net; + +namespace GeneXus.Messaging.Common +{ + public class MessageQueue + { + internal IMessageBroker messageBroker = null; + public static Assembly assembly; + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(MessageQueue)); + private const string SDT_MESSAGE_CLASS_NAME = @"SdtMessage"; + private const string SDT_MESSAGEPROPERTY_CLASS_NAME = @"SdtMessageProperty"; + private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingmessagebroker"; + private const string MODULE_DLL = @"GeneXusMessagingMessageBroker"; + + public MessageQueue() + { + } + public MessageQueue(MessageQueue other) + { + messageBroker = other.messageBroker; + } + void ValidQueue() + { + if (messageBroker == null) + { + GXLogging.Error(logger, "Message Broker was not instantiated."); + throw new Exception("Message Broker was not instantiated."); + } + } + private static Assembly LoadAssembly(string fileName) + { + if (File.Exists(fileName)) + { + Assembly assemblyLoaded = Assembly.LoadFrom(fileName); + return assemblyLoaded; + } + else + return null; + } + + private static void LoadAssemblyIfRequired() + { + if (assembly == null) + { + assembly = AssemblyLoader.LoadAssembly(new AssemblyName(MODULE_DLL)); + } + } + + public void Dispose() + { + try + { + ValidQueue(); + messageBroker.Dispose(); + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + + } + } + + public long ScheduleMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + GxUserType result = new GxUserType(); + try + { + BrokerMessage brokerQueueMessage = TransformGXUserTypeToBrokerMessage(messageQueue); + LoadAssemblyIfRequired(); + try + { + ValidQueue(); + return messageBroker.ScheduleMessage(brokerQueueMessage, options); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + throw ex; + } + return 0; + } + public bool CancelSchedule(long handleId, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + try + { + ValidQueue(); + return messageBroker.CancelSchedule(handleId); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + return false; + } + } + + public bool ConsumeMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) + { + bool success = false; + errorMessages = new GXBaseCollection(); + GxUserType result = new GxUserType(); + try + { + BrokerMessage brokerQueueMessage = TransformGXUserTypeToBrokerMessage(messageQueue); + LoadAssemblyIfRequired(); + try + { + ValidQueue(); + return (messageBroker.ConsumeMessage(brokerQueueMessage, options)); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + success = false; + GXLogging.Error(logger, ex); + throw ex; + } + return success; + } + public GxUserType GetMessage(string options, out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + success = false; + try + { + ValidQueue(); + BrokerMessage brokerMessage = messageBroker.GetMessage(options, out success); + LoadAssemblyIfRequired(); + + if (TransformBrokerMessage(brokerMessage) is GxUserType result) + { + success = true; + return result; + } + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + success = false; + } + return TransformBrokerMessage(new BrokerMessage()); + } + public IList GetMessages(string options, out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + IList resultMessages = new List(); + success = false; + try + { + ValidQueue(); + IList brokerMessages = messageBroker.GetMessages(options, out success); + LoadAssemblyIfRequired(); + foreach (BrokerMessage brokerMessage in brokerMessages) + { + if (TransformBrokerMessage(brokerMessage) is GxUserType result) + resultMessages.Add(result); + } + success = true; + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + success = false; + } + return resultMessages; + } + public bool SendMessage(GxUserType messageQueue, string options, out GXBaseCollection errorMessages) + { + bool success = false; + errorMessages = new GXBaseCollection(); + try + { + BrokerMessage brokerQueueMessage = TransformGXUserTypeToBrokerMessage(messageQueue); + LoadAssemblyIfRequired(); + try + { + ValidQueue(); + if (messageBroker != null) + return(messageBroker.SendMessage(brokerQueueMessage, options)); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + success = false; + GXLogging.Error(logger,ex); + throw ex; + } + return success; + } + + public bool SendMessages(IList queueMessages, string options, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + bool success = false; + try + { + IList brokerMessagesList = new List(); + foreach (GxUserType queueMessage in queueMessages) + { + if (TransformGXUserTypeToBrokerMessage(queueMessage) is BrokerMessage brokerMessage) + brokerMessagesList.Add(brokerMessage); + } + try + { + ValidQueue(); + success = messageBroker.SendMessages(brokerMessagesList, options); + LoadAssemblyIfRequired(); + + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + throw ex; + } + return success; + } + + protected void QueueErrorMessagesSetup(Exception ex, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + bool foundGeneralException = false; + if (errorMessages != null && ex != null) + { + SdtMessages_Message msg = new SdtMessages_Message(); + if (messageBroker != null) + { + while (ex.InnerException != null) + { + if (messageBroker.GetMessageFromException(ex.InnerException, msg)) + { + msg.gxTpr_Type = 1; + errorMessages.Add(msg); + } + else + { + foundGeneralException = true; + break; + } + ex = ex.InnerException; + } + if (foundGeneralException) + GXUtil.ErrorToMessages("GXServiceBus1002", ex, errorMessages); + } + else + { + GXUtil.ErrorToMessages("GXServiceBus1002", ex, errorMessages); + } + } + } + #region Transform operations + + private GxUserType TransformBrokerMessage(BrokerMessage brokerMessage) + { + Type classType = assembly.GetType(NAMESPACE + "." + SDT_MESSAGE_CLASS_NAME, false, ignoreCase: true); + Type propertyClassType = assembly.GetType(NAMESPACE + "." + SDT_MESSAGEPROPERTY_CLASS_NAME, false, ignoreCase: true); + + if (classType != null && Activator.CreateInstance(classType) is GxUserType messageSDT) + { + messageSDT.SetPropertyValue("Messageid", brokerMessage.MessageId); + messageSDT.SetPropertyValue("Messagebody", brokerMessage.MessageBody); + messageSDT.SetPropertyValue("Messagehandleid", brokerMessage.MessageHandleId); + + IList messageResultSDTAttributes = (IList)Activator.CreateInstance(classType.GetProperty("gxTpr_Messageattributes").PropertyType, new object[] { messageSDT.context, "MessageProperty", string.Empty }); + + if ((brokerMessage != null) && (brokerMessage.MessageAttributes != null)) + { + GxKeyValuePair prop = brokerMessage.MessageAttributes.GetFirst(); + while (!brokerMessage.MessageAttributes.Eof()) + { + if (propertyClassType != null && Activator.CreateInstance(propertyClassType) is GxUserType propertyClassTypeSDT) + { + propertyClassTypeSDT.SetPropertyValue("Propertykey", prop.Key); + propertyClassTypeSDT.SetPropertyValue("Propertyvalue", prop.Value); + messageResultSDTAttributes.Add(propertyClassTypeSDT); + prop = brokerMessage.MessageAttributes.GetNext(); + } + } + messageSDT.SetPropertyValue("Messageattributes", messageResultSDTAttributes); + } + return messageSDT; + } + return null; + } + private BrokerMessage TransformGXUserTypeToBrokerMessage(GxUserType queueMessage) + { + if (queueMessage != null) + { + BrokerMessage brokerQueueMessage = new BrokerMessage(); + brokerQueueMessage.MessageId = queueMessage.GetPropertyValue("Messageid"); + brokerQueueMessage.MessageBody = queueMessage.GetPropertyValue("Messagebody"); + brokerQueueMessage.MessageHandleId = queueMessage.GetPropertyValue("Messagehandleid"); + IList messageAttributes = queueMessage.GetPropertyValue("Messageattributes_GXBaseCollection"); + brokerQueueMessage.MessageAttributes = new GXProperties(); + foreach (GxUserType messageAttribute in messageAttributes) + { + string messagePropKey = messageAttribute.GetPropertyValue("Propertykey"); + string messagePropValue = messageAttribute.GetPropertyValue("Propertyvalue"); + brokerQueueMessage.MessageAttributes.Add(messagePropKey, messagePropValue); + } + return brokerQueueMessage; + } + return null; + } + #endregion + + } + internal class ServiceFactory + { + private static IMessageBroker messageBroker; + private static readonly ILog log = log4net.LogManager.GetLogger(typeof(GeneXus.Services.ServiceFactory)); + + public static GXServices GetGXServices() + { + return GXServices.Instance; + } + + public static IMessageBroker GetMessageBroker() + { + if (messageBroker == null) + { + messageBroker = GetMessageBrokerImpl(GXServices.MESSAGEBROKER_SERVICE); + } + return messageBroker; + } + + public static IMessageBroker GetMessageBrokerImpl(string service) + { + IMessageBroker messageBrokerImpl = null; + if (GetGXServices() != null) + { + GXService providerService = GetGXServices().Get(service); + if (providerService != null) + { + try + { + string typeFullName = providerService.ClassName; + GXLogging.Debug(log, "Loading Message Broker settings:", typeFullName); + Type type = AssemblyLoader.GetType(typeFullName); + messageBrokerImpl = (IMessageBroker)Activator.CreateInstance(type); + } + catch (Exception e) + { + GXLogging.Error(log, "Couldn't connect to the Message Broker.", e.Message, e); + throw e; + } + } + } + return messageBrokerImpl; + } + } + +} + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs new file mode 100644 index 000000000..9c5ed2930 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/PropertyConstants.cs @@ -0,0 +1,25 @@ +namespace GeneXus.Messaging.Common +{ + public static class PropertyConstants + { + //Azure Service Bus + internal const string AZURE_SB_CLASSNAME = "GeneXus.Messaging.GXAzureServiceBus.AzureServiceBus"; + internal const string AZURE_SB_PROVIDER_CLASSNAME = "GeneXus.Messaging.GXAzureServiceBus.AzureServiceBus, GXAzureServiceBus, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"; + public const string AZURESERVICEBUS = "AZURESERVICEBUS"; + public const string RECEIVE_MODE = "RECEIVE_MODE"; + public const string PREFETCH_COUNT = "PREFETCH_COUNT"; + public const string RECEIVER_IDENTIFIER = "RECEIVER_IDENTIFIER"; + public const string RECEIVER_SESSIONID = "RECEIVER_SESSIONID"; + public const string SENDER_IDENTIFIER = "SENDER_IDENTIFIER"; + + public const string MESSAGEBROKER_AZURESB_QUEUENAME = "MESSAGEBROKER_AZURESB_QUEUENAME"; + public const string MESSAGEBROKER_AZURESB_TOPICNAME = "MESSAGEBROKER_AZURESB_TOPICNAME"; + public const string MESSAGEBROKER_AZURESB_SUBSCRIPTION_NAME = "MESSAGEBROKER_AZURESB_SUBSCRIPTION"; + public const string MESSAGEBROKER_AZURESB_CONNECTIONSTRING = "MESSAGEBROKER_AZURESB_QUEUECONNECTION"; + public const string QUEUE_NAME = "QUEUENAME"; + public const string QUEUE_CONNECTION_STRING = "QUEUECONNECTION"; + public const string TOPIC_SUBSCRIPTION = "SUBSCRIPTION"; + public const string MESSAGE_BROKER = "MESSAGEBROKER"; + public const string SESSION_ENABLED = "SESSION_ENABLED"; + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs new file mode 100644 index 000000000..2d920710c --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXMessageBroker/ServiceSettings.cs @@ -0,0 +1,106 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using GeneXus.Encryption; +using GeneXus.Messaging.Common; +using GeneXus.Services; +using log4net; + +namespace GeneXus.Messaging.Common +{ + public class ServiceSettings + { + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(ServiceSettings)); + + internal GXService service; + public string serviceNameResolver { get; } + public string name { get; } + + public ServiceSettings(string serviceNameResolver, string name, GXService gXService) + { + this.serviceNameResolver = serviceNameResolver; + this.name = name; + this.service = gXService; + } + + public string GetEncryptedOptPropertyValue(string propertyName, string alternativePropertyName = null) + { + String value = GetEncryptedPropertyValue(propertyName, alternativePropertyName, null); + return value; + } + public string GetEncryptedPropertyValue(string propertyName, string alternativePropertyName = null) + { + String value = GetEncryptedPropertyValue(propertyName, alternativePropertyName, null); + if (value == null) + { + String errorMessage = String.Format($"Service configuration error - Property name {ResolvePropertyName(propertyName)} must be defined"); + logger.Fatal(errorMessage); + throw new Exception(errorMessage); + } + return value; + } + public string GetEncryptedPropertyValue(string propertyName, string alternativePropertyName, string defaultValue) + { + String value = GetPropertyValue(propertyName, alternativePropertyName, defaultValue); + if (!String.IsNullOrEmpty(value)) + { + try + { + string ret = String.Empty; + if (CryptoImpl.Decrypt(ref ret, value)) + { + value = ret; + } + } + catch (Exception) + { + logger.Warn($"Could not decrypt property name: {ResolvePropertyName(propertyName)}"); + } + } + return value; + } + + internal string GetPropertyValue(string propertyName, string alternativePropertyName = null) + { + String value = GetPropertyValue(propertyName, alternativePropertyName, null); + if (value == null) + { + String errorMessage = String.Format($"Service configuration error - Property name {ResolvePropertyName(propertyName)} must be defined"); + logger.Fatal(errorMessage); + throw new Exception(errorMessage); + } + return value; + } + + internal string GetPropertyValue(string propertyName, string alternativePropertyName, string defaultValue) + { + String value = null; + value = string.IsNullOrEmpty(value) ? GetPropertyValueImpl(ResolvePropertyName(propertyName)) : value; + value = string.IsNullOrEmpty(value) ? GetPropertyValueImpl(propertyName) : value; + value = string.IsNullOrEmpty(value) ? GetPropertyValueImpl(alternativePropertyName) : value; + value = string.IsNullOrEmpty(value) ? defaultValue : value; + return value; + } + + internal string GetPropertyValueImpl(string propertyName) + { + String value = null; + if (!string.IsNullOrEmpty(propertyName)) + { + value = Environment.GetEnvironmentVariable(propertyName); + if (service != null && value == null) + { + value = service.Properties.Get(propertyName); + } + } + return value; + } + + internal string ResolvePropertyName(string propertyName) + { + return $"{serviceNameResolver}_{name}_{propertyName}"; + } + } +} diff --git a/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs b/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs index d8a74c28c..14534fa97 100644 --- a/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs +++ b/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs @@ -23,6 +23,7 @@ public class GXServices public static string SESSION_SERVICE = "Session"; public static string WEBNOTIFICATIONS_SERVICE = "WebNotifications"; public static string QUEUE_SERVICE = "QueueService"; + public static string MESSAGEBROKER_SERVICE = "MessageBrokerService"; private static string[] SERVICES_FILE = new string[] { "CloudServices.dev.config", "CloudServices.config" }; [System.Diagnostics.CodeAnalysis.SuppressMessage("GxFxCopRules", "CR1000:EnforceThreadSafeType")] private Dictionary services = new Dictionary(); diff --git a/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj b/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj index 44264c9ec..8ae3c4323 100644 --- a/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj +++ b/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj @@ -76,6 +76,7 @@ + diff --git a/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs b/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs new file mode 100644 index 000000000..620ca26f7 --- /dev/null +++ b/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureMessageBrokerTest.cs @@ -0,0 +1,72 @@ +using System; +using System.Collections.Generic; +using GeneXus.Messaging.Common; +using Xunit; + + +#pragma warning disable CA1031 // Do not catch general exception types +namespace UnitTesting +{ + [Collection("Sequential")] + public abstract class AzureMessageBrokerTest + { + + private IMessageBroker messageBroker; + + public AzureMessageBrokerTest(string queueName, Type queueType) + { + bool testEnabled = Environment.GetEnvironmentVariable("AZURESB_TEST_ENABLED") == "true"; + Skip.IfNot(testEnabled, "Environment variables not set"); + + if (queueName == GeneXus.Messaging.GXAzureServiceBus.AzureServiceBus.Name) + { + //Environment variables needed here + Environment.SetEnvironmentVariable("MESSAGEBROKER_AZURESB_QUEUENAME", ""); + Environment.SetEnvironmentVariable("MESSAGEBROKER_AZURESB_QUEUECONNECTION", ""); + + messageBroker = (IMessageBroker)Activator.CreateInstance(queueType); + Assert.NotNull(messageBroker); + } + } + + [SkippableFact] + public void TestSendOneMessageMethod() + { + BrokerMessage brokerMessage = new BrokerMessage(); + brokerMessage.MessageId = "TestMsgId"; + brokerMessage.MessageBody = "This is the message body"; + + bool success = messageBroker.SendMessage(brokerMessage,String.Empty); + + Assert.True(success); + + } + + [SkippableFact] + public void TestSendBatchMessagesMethod() + { + BrokerMessage brokerMessage1 = new BrokerMessage(); + brokerMessage1.MessageId = "TestMsgId1"; + brokerMessage1.MessageBody = "This is the message body 1"; + + BrokerMessage brokerMessage2 = new BrokerMessage(); + brokerMessage2.MessageId = "TestMsgId2"; + brokerMessage2.MessageBody = "This is the message body 2"; + + IList messages = new List(); + messages.Add(brokerMessage1); + messages.Add(brokerMessage2); + + bool success = messageBroker.SendMessages(messages, String.Empty); + + Assert.True(success); + } + [SkippableFact] + public void TestGetBatchMessagesMethod() + { + IList messages = messageBroker.GetMessages(String.Empty, out bool success); + Assert.True(success); + } + } +} +#pragma warning restore CA1031 // Do not catch general exception types \ No newline at end of file diff --git a/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureServiceBusTest.cs b/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureServiceBusTest.cs new file mode 100644 index 000000000..fa400edaa --- /dev/null +++ b/dotnet/test/DotNetCoreUnitTest/MessageBroker/AzureServiceBusTest.cs @@ -0,0 +1,13 @@ +using GeneXus.Messaging.GXAzureServiceBus; +using UnitTesting; + +namespace DotNetUnitTest +{ + public class AzureServiceBusTest : AzureMessageBrokerTest + { + public AzureServiceBusTest() : base(AzureServiceBus.Name, typeof(AzureServiceBus)) + { + } + + } +} \ No newline at end of file