diff --git a/dotnet/DotNetStandardClasses.sln b/dotnet/DotNetStandardClasses.sln index 94c5499b3..be80e4fa5 100644 --- a/dotnet/DotNetStandardClasses.sln +++ b/dotnet/DotNetStandardClasses.sln @@ -188,6 +188,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GxXsl", "src\dotnetcore\GxXsl\GxXsl.csproj", "{30E7E437-F9B0-42B8-9144-A8E8F972B462}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GXAzureQueue", "src\dotnetcore\Providers\Messaging\GXAzureQueue\GXAzureQueue.csproj", "{0CED9D4D-EE7C-4E19-9FC6-D4BBCB04DA97}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "messaging", "messaging", "{30159B0F-BE61-4DB7-AC02-02851426BE4B}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "common", "common", "{4C43F2DA-59E5-46F5-B691-195449498555}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GXQueue", "src\dotnetcore\Providers\Messaging\GXQueue\GXQueue.csproj", "{C1AC62A2-0FAC-4762-9B1A-47BB9FE5BF82}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "mocking", "mocking", "{5045873B-E7CF-4317-94C1-0EF8623D23FA}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{8E5A25F9-2D64-4742-8227-2A3C5816AFEC}" @@ -200,6 +208,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestMockDBAccess", "src\ext EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GeneXus.Programs.Common", "src\extensions\Azure\test\GeneXus.Programs.Common\GeneXus.Programs.Common.csproj", "{DCEC0B38-93B6-4003-81E6-9FBC2BB4F163}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GXAmazonSQS", "src\dotnetcore\Providers\Messaging\GXAmazonSQS\GXAmazonSQS.csproj", "{F8BA0D65-267D-491F-BFAB-33F5E5B61AD7}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "apiattractions", "src\extensions\Azure\test\apiattractions\apiattractions.csproj", "{E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetRedisTest", "test\DotNetRedisTest\DotNetRedisTest.csproj", "{48430E50-043A-47A2-8278-B86A4420758A}" @@ -478,6 +487,14 @@ Global {30E7E437-F9B0-42B8-9144-A8E8F972B462}.Debug|Any CPU.Build.0 = Debug|Any CPU {30E7E437-F9B0-42B8-9144-A8E8F972B462}.Release|Any CPU.ActiveCfg = Release|Any CPU {30E7E437-F9B0-42B8-9144-A8E8F972B462}.Release|Any CPU.Build.0 = Release|Any CPU + {0CED9D4D-EE7C-4E19-9FC6-D4BBCB04DA97}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0CED9D4D-EE7C-4E19-9FC6-D4BBCB04DA97}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0CED9D4D-EE7C-4E19-9FC6-D4BBCB04DA97}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0CED9D4D-EE7C-4E19-9FC6-D4BBCB04DA97}.Release|Any CPU.Build.0 = Release|Any CPU + {C1AC62A2-0FAC-4762-9B1A-47BB9FE5BF82}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C1AC62A2-0FAC-4762-9B1A-47BB9FE5BF82}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C1AC62A2-0FAC-4762-9B1A-47BB9FE5BF82}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C1AC62A2-0FAC-4762-9B1A-47BB9FE5BF82}.Release|Any CPU.Build.0 = Release|Any CPU {8D05D621-3DB3-459F-8665-BEA4574F4EFF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {8D05D621-3DB3-459F-8665-BEA4574F4EFF}.Debug|Any CPU.Build.0 = Debug|Any CPU {8D05D621-3DB3-459F-8665-BEA4574F4EFF}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -490,6 +507,10 @@ Global {DCEC0B38-93B6-4003-81E6-9FBC2BB4F163}.Debug|Any CPU.Build.0 = Debug|Any CPU {DCEC0B38-93B6-4003-81E6-9FBC2BB4F163}.Release|Any CPU.ActiveCfg = Release|Any CPU {DCEC0B38-93B6-4003-81E6-9FBC2BB4F163}.Release|Any CPU.Build.0 = Release|Any CPU + {F8BA0D65-267D-491F-BFAB-33F5E5B61AD7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F8BA0D65-267D-491F-BFAB-33F5E5B61AD7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F8BA0D65-267D-491F-BFAB-33F5E5B61AD7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F8BA0D65-267D-491F-BFAB-33F5E5B61AD7}.Release|Any CPU.Build.0 = Release|Any CPU {E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3}.Debug|Any CPU.Build.0 = Debug|Any CPU {E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -588,12 +609,17 @@ Global {B59F76D8-FDB2-4F51-89DB-F15E9BDFF1DC} = {420E8A4A-11D9-42E9-BFB7-4325EA7330B8} {D97E17A4-C945-4BF3-957E-F73142C4C6D0} = {947E032A-C385-4586-96E3-FC7D2767F082} {30E7E437-F9B0-42B8-9144-A8E8F972B462} = {2261B65E-3757-4E5B-9DCD-EAE8D1E236A3} + {0CED9D4D-EE7C-4E19-9FC6-D4BBCB04DA97} = {30159B0F-BE61-4DB7-AC02-02851426BE4B} + {30159B0F-BE61-4DB7-AC02-02851426BE4B} = {2261B65E-3757-4E5B-9DCD-EAE8D1E236A3} + {4C43F2DA-59E5-46F5-B691-195449498555} = {30159B0F-BE61-4DB7-AC02-02851426BE4B} + {C1AC62A2-0FAC-4762-9B1A-47BB9FE5BF82} = {4C43F2DA-59E5-46F5-B691-195449498555} {5045873B-E7CF-4317-94C1-0EF8623D23FA} = {C6AFB6A3-FF0B-4970-B1F1-10BCD3D932B2} {8E5A25F9-2D64-4742-8227-2A3C5816AFEC} = {5045873B-E7CF-4317-94C1-0EF8623D23FA} {8D05D621-3DB3-459F-8665-BEA4574F4EFF} = {8E5A25F9-2D64-4742-8227-2A3C5816AFEC} {C16BD5A9-4412-4B91-BB70-5C88B7AAE675} = {5045873B-E7CF-4317-94C1-0EF8623D23FA} {B01A243D-C012-4BEB-BAA9-E1D9AC1468C8} = {C16BD5A9-4412-4B91-BB70-5C88B7AAE675} {DCEC0B38-93B6-4003-81E6-9FBC2BB4F163} = {7BA5A2CE-7992-4F87-9D84-91AE4D046F5A} + {F8BA0D65-267D-491F-BFAB-33F5E5B61AD7} = {30159B0F-BE61-4DB7-AC02-02851426BE4B} {E85FDB0F-FA81-4CDD-8BF3-865269CE2DB3} = {7BA5A2CE-7992-4F87-9D84-91AE4D046F5A} {48430E50-043A-47A2-8278-B86A4420758A} = {1D6F1776-FF4B-46C2-9B3D-BC46CCF049DC} EndGlobalSection diff --git a/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs b/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs index 7b646928c..1d7f35bd7 100644 --- a/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs +++ b/dotnet/src/dotnetcore/GxClasses/Properties/AssemblyInfo.cs @@ -4,4 +4,4 @@ [assembly: InternalsVisibleTo("GxClasses.Web")] [assembly: InternalsVisibleTo("GxSearch")] [assembly: InternalsVisibleTo("GxNetCoreStartup")] - +[assembly: InternalsVisibleTo("GXQueue")] diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSMessageQueueProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSMessageQueueProvider.cs new file mode 100644 index 000000000..ebf20e338 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSMessageQueueProvider.cs @@ -0,0 +1,41 @@ +using GeneXus.Messaging.Common; +using GeneXus.Utils; + +namespace GeneXus.Messaging.Queue +{ + public class AWSMessageQueueProvider + { + private const string AWS_SQS = "AWS_SQS"; + public SimpleMessageQueue Connect(GxUserType awsCredentials, string queueURL, out GXBaseCollection errorMessages, out bool success) + + { + MessageQueueProvider messageQueueProvider = new MessageQueueProvider(); + GXProperties properties = TransformAWSCredentials(awsCredentials); + properties.Add("QUEUE_AWSSQS_QUEUE_URL", queueURL); + SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(AWS_SQS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return simpleMessageQueue; + } + public SimpleMessageQueue Connect(string queueURL, out GXBaseCollection errorMessages, out bool success) + + { + MessageQueueProvider messageQueueProvider = new MessageQueueProvider(); + GXProperties properties = new GXProperties(); + properties.Add("QUEUE_AWSSQS_QUEUE_URL", queueURL); + SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(AWS_SQS, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return simpleMessageQueue; + } + + public GXProperties TransformAWSCredentials(GxUserType awsCredentials) + { + GXProperties properties = new GXProperties(); + properties.Add("QUEUE_AWSSQS_ACCESS_KEY", awsCredentials.GetPropertyValue("Accesskey")); + properties.Add("QUEUE_AWSSQS_SECRET_KEY", awsCredentials.GetPropertyValue("Secretkey")); + properties.Add("QUEUE_AWSSQS_REGION", awsCredentials.GetPropertyValue("Region")); + return properties; + } + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSQueue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSQueue.cs new file mode 100644 index 000000000..5fd3829c3 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/AWSQueue.cs @@ -0,0 +1,634 @@ +using System; +using System.Collections.Generic; +using Amazon; +using Amazon.Runtime; +using Amazon.SQS; +using Amazon.SQS.Model; +using GeneXus.Messaging.Common; +using GeneXus.Services; +using GeneXus.Utils; +using log4net; +using System.Threading.Tasks; +using System.Reflection; + +namespace GeneXus.Messaging.Queue +{ + public class AWSQueue : QueueBase, IQueue + { + + public static String Name = "AWSSQS"; + const string ACCESS_KEY = "ACCESS_KEY"; + const string SECRET_ACCESS_KEY = "SECRET_KEY"; + const string REGION = "REGION"; + const string QUEUE_URL = "QUEUE_URL"; + + AmazonSQSClient _sqsClient; + private string _accessKey; + private string _secret; + private string _awsregion; + private string _queueURL; + private bool _isFIFO; + public const string MESSSAGE_GROUP_ID = "MessageGroupId"; + public const string MESSSAGE_DEDUPLICATION_ID = "MessageDeduplicationId"; + + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(AWSQueue)); + + public AWSQueue() : this(null) + { + } + + public AWSQueue(GXService providerService) : base(providerService) + { + Initialize(providerService); + BasicAWSCredentials basicCredentials; + RegionEndpoint region = RegionEndpoint.GetBySystemName(_awsregion); + + if ((_accessKey != null) && (_secret != null)) + { + basicCredentials = new BasicAWSCredentials(_accessKey, _secret); + _sqsClient = new AmazonSQSClient(basicCredentials, region); + } + else //Use IAM Role + { + _sqsClient = new AmazonSQSClient(region); + } + } + + private void Initialize(GXService providerService) + { + ServiceSettings serviceSettings = new("QUEUE", Name, providerService); + + _queueURL = serviceSettings.GetEncryptedPropertyValue(QUEUE_URL); + _accessKey = serviceSettings.GetEncryptedPropertyValue(ACCESS_KEY); + _secret = serviceSettings.GetEncryptedPropertyValue(SECRET_ACCESS_KEY); + _awsregion = serviceSettings.GetEncryptedPropertyValue(REGION); + + _isFIFO = _queueURL.EndsWith(".fifo"); + + } + public void Clear(out bool success) + { + try + { + Task task = Task.Run(async () => await PurgeQueueAsync()); + PurgeQueueResponse response = task.Result; + success = response != null; + + } + catch (AggregateException ae) + { + throw ae; + } + } + + public MessageQueueResult DeleteMessage(string messageHandleId, out bool success) + { + success = false; + MessageQueueResult messageQueueResult = new MessageQueueResult(); + + List messageHandleIdToDelete = new List { messageHandleId }; + IList messageQueueResults = RemoveMessages(messageHandleIdToDelete, out bool operationOK); + if ((operationOK) && (messageQueueResults != null)) + { + messageQueueResult = messageQueueResults[0]; + success = true; + } + return messageQueueResult; + } + + public IList DeleteMessages(List messageHandleId, out bool success) + { + return RemoveMessages(messageHandleId, out success); + } + private IList RemoveMessages(List messageHandleId, out bool success) + { + IList messageQueueResults = new List(); + success = false; + try + { + Task task = Task.Run(async () => await DeleteQueueMessageBatchAsync(messageHandleId)); + + DeleteMessageBatchResponse deleteMessageBatchResponse = task.Result; + if (deleteMessageBatchResponse != null) + success = (deleteMessageBatchResponse.Failed.Count == 0); + + foreach (BatchResultErrorEntry entry in deleteMessageBatchResponse.Failed) + { + MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry); + messageQueueResults.Add(messageQueueResult); + } + + foreach (DeleteMessageBatchResultEntry entry in deleteMessageBatchResponse.Successful) + { + MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry); + messageQueueResults.Add(messageQueueResult); + } + + } + catch (AggregateException ae) + { + throw ae; + } + return messageQueueResults; + } + public IList DeleteMessages(IList simpleQueueMessages, out bool success) + { + return RemoveMessages(simpleQueueMessages, out success); + } + private IList RemoveMessages(IList simpleQueueMessages, out bool success) + { + IList messageQueueResults = new List(); + List messageHandleIds = new List(); + success = false; + try + { + foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages) + { + messageHandleIds.Add(simpleQueueMessage.MessageHandleId); + } + Task task = Task.Run(async () => await DeleteQueueMessageBatchAsync(messageHandleIds)); + + DeleteMessageBatchResponse deleteMessageBatchResponse = task.Result; + if (deleteMessageBatchResponse != null) + success = (deleteMessageBatchResponse.Failed.Count == 0); + + foreach (BatchResultErrorEntry entry in deleteMessageBatchResponse.Failed) + { + MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry); + messageQueueResults.Add(messageQueueResult); + } + + foreach (DeleteMessageBatchResultEntry entry in deleteMessageBatchResponse.Successful) + { + MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry); + messageQueueResults.Add(messageQueueResult); + } + + } + catch (AggregateException ae) + { + throw ae; + } + return messageQueueResults; + } + + public IList GetMessages(out bool success) + { + return RetrieveMessages(success : out success); + } + + public IList GetMessages(MessageQueueOptions messageQueueOptions, out bool success) + { + return RetrieveMessages(out success, messageQueueOptions); + } + private IList RetrieveMessages(out bool success, MessageQueueOptions messageQueueOptions = null) + { + success = false; + IList simpleQueueMessages = new List(); + try + { + Task task = Task.Run(async () => await GetMessageAsync(messageQueueOptions)); + + ReceiveMessageResponse response = task.Result; + success = response != null; + if (success) + { + List messagesList = response.Messages; + + foreach (Message message in messagesList) + { + SimpleQueueMessage simpleQueueMessage = SetupSimpleQueueMessage(message); + simpleQueueMessages.Add(simpleQueueMessage); + } + } + } + catch (AggregateException ae) + { + throw ae; + } + return simpleQueueMessages; + } + + public int GetQueueLength(out bool success) + { + int approxNumberMessages = 0; + success = false; + try + { + List attributes = new List { "ApproximateNumberOfMessages" }; + Task task = Task.Run(async () => await GetQueueAttributeAsync(attributes).ConfigureAwait(false)); + GetQueueAttributesResponse response = task.Result; + success = response != null; + if (success) + { + return (response.ApproximateNumberOfMessages); + } + + } + catch (Exception ex) + { + throw ex; + } + return approxNumberMessages; + } + + public MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, out bool success) + { + success = false; + MessageQueueResult messageQueueResult = new MessageQueueResult(); + List simpleQueueMessages = new List() { simpleQueueMessage }; + try + { + Task task = Task.Run(async () => await SendMessageBatchAsync(simpleQueueMessages)); + SendMessageBatchResponse sendMessageBatchResponse = task.Result; + if (sendMessageBatchResponse != null) + success = (sendMessageBatchResponse.Failed.Count == 0); + + foreach (BatchResultErrorEntry entry in sendMessageBatchResponse.Failed) + { + messageQueueResult = SetupMessageQueueResult(entry); + } + + foreach (SendMessageBatchResultEntry entry in sendMessageBatchResponse.Successful) + { + messageQueueResult = SetupMessageQueueResult(entry); + } + + } + catch (AggregateException ae) + { + throw ae; + } + return messageQueueResult; + } + + protected MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, MessageQueueOptions messageQueueOptions, out bool success) + { + success = false; + MessageQueueResult messageQueueResult = new MessageQueueResult(); + List simpleQueueMessages = new List() { simpleQueueMessage }; + try + { + Task task = Task.Run(async () => await SendMessageBatchAsync(simpleQueueMessages, messageQueueOptions)); + SendMessageBatchResponse sendMessageBatchResponse = task.Result; + if (sendMessageBatchResponse != null) + success = (sendMessageBatchResponse.Failed.Count == 0); + + foreach (BatchResultErrorEntry entry in sendMessageBatchResponse.Failed) + { + messageQueueResult = SetupMessageQueueResult(entry); + } + + foreach (SendMessageBatchResultEntry entry in sendMessageBatchResponse.Successful) + { + messageQueueResult = SetupMessageQueueResult(entry); + } + + } + catch (AggregateException ae) + { + throw ae; + } + return messageQueueResult; + } + + public IList SendMessages(IList simpleQueueMessages, MessageQueueOptions messageQueueOptions, out bool success) + { + success = false; + IList messageQueueResults = new List(); + try + { + Task task = Task.Run(async () => await SendMessageBatchAsync(simpleQueueMessages, messageQueueOptions)); + SendMessageBatchResponse sendMessageBatchResponse = task.Result; + if (sendMessageBatchResponse != null) + success = (sendMessageBatchResponse.Failed.Count == 0); + + foreach (BatchResultErrorEntry entry in sendMessageBatchResponse.Failed) + { + MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry); + messageQueueResults.Add(messageQueueResult); + } + + foreach (SendMessageBatchResultEntry entry in sendMessageBatchResponse.Successful) + { + MessageQueueResult messageQueueResult = SetupMessageQueueResult(entry); + messageQueueResults.Add(messageQueueResult); + } + + } + catch (AggregateException ae) + { + throw ae; + } + return messageQueueResults; + } + + public override string GetName() + { + return Name; + } + + public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) + { + try + { + AmazonSQSException sqs_ex = (AmazonSQSException)ex; + msg.gxTpr_Id = sqs_ex.ErrorCode; + msg.gxTpr_Description = sqs_ex.Message; + return true; + } + catch (Exception) + { + return false; + } + } + + private MessageQueueResult SetupMessageQueueResult(SendMessageResponse response) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + messageQueueResult.MessageId = response.MessageId; + messageQueueResult.MessageStatus = MessageQueueResultStatus.Sent; + + messageQueueResult.MessageAttributes = new GXProperties(); + + messageQueueResult.MessageAttributes.Add("MD5OfMessageSystemAttributes", response.MD5OfMessageSystemAttributes); + messageQueueResult.MessageAttributes.Add("MD5OfMessageAttributes", response.MD5OfMessageAttributes); + messageQueueResult.MessageAttributes.Add("ContentLength", response.ContentLength.ToString()); + messageQueueResult.MessageAttributes.Add("MD5OfMessageBody", response.MD5OfMessageBody); + messageQueueResult.MessageAttributes.Add("SequenceNumber", response.SequenceNumber); + + Type t = response.ResponseMetadata.GetType(); + PropertyInfo[] props = t.GetProperties(); + + foreach (PropertyInfo prop in props) + { + object value; + if (prop.GetIndexParameters().Length == 0 && response.ResponseMetadata != null) + { + value = prop.GetValue(response.ResponseMetadata); + if (value != null) + messageQueueResult.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + return messageQueueResult; + } + private MessageQueueResult SetupMessageQueueResult(SendMessageBatchResultEntry response) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + messageQueueResult.MessageId = response.Id; + messageQueueResult.MessageStatus = MessageQueueResultStatus.Sent; + + messageQueueResult.MessageAttributes = new GXProperties(); + + messageQueueResult.MessageAttributes.Add("MD5OfMessageSystemAttributes", response.MD5OfMessageSystemAttributes); + messageQueueResult.MessageAttributes.Add("MD5OfMessageAttributes", response.MD5OfMessageAttributes); + messageQueueResult.MessageAttributes.Add("MD5OfMessageBody", response.MD5OfMessageBody); + messageQueueResult.MessageAttributes.Add("SequenceNumber", response.SequenceNumber); + return messageQueueResult; + } + + private MessageQueueResult SetupMessageQueueResult(DeleteMessageBatchResultEntry response) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + messageQueueResult.MessageStatus = MessageQueueResultStatus.Deleted; + messageQueueResult.MessageId = response.Id; + return messageQueueResult; + } + + private MessageQueueResult SetupMessageQueueResult(BatchResultErrorEntry response) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + messageQueueResult.MessageStatus = MessageQueueResultStatus.Failed; + messageQueueResult.MessageId = response.Id; + + //Write error codes to log in debug mode + GXLogging.Debug(logger, $"Error processing SQS. Message: {response.Id}. Error: {response.Message}({response.Code})"); + + return messageQueueResult; + } + + private SimpleQueueMessage SetupSimpleQueueMessage(Message response) + { + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + simpleQueueMessage.MessageId = response.MessageId; + simpleQueueMessage.MessageBody = response.Body; + simpleQueueMessage.MessageHandleId = response.ReceiptHandle; + + simpleQueueMessage.MessageAttributes = new GXProperties(); + + simpleQueueMessage.MessageAttributes.Add("MD5OfMessageAttributes", response.MD5OfMessageAttributes); + simpleQueueMessage.MessageAttributes.Add("MD5OfBody", response.MD5OfBody); + + foreach (var messageAttribute in response.MessageAttributes) + { + MessageAttributeValue messageAttributeValue = messageAttribute.Value; + simpleQueueMessage.MessageAttributes.Add(messageAttribute.Key, messageAttribute.Value.StringValue); + } + + foreach (var attribute in response.Attributes) + { + simpleQueueMessage.MessageAttributes.Add(attribute.Key, attribute.Value); + } + + return simpleQueueMessage; + } + + private async Task SendMessageAsync(SimpleQueueMessage simpleQueueMessage, MessageQueueOptions messageQueueOptions = null) + { + SendMessageResponse sendMessageResponse = new SendMessageResponse(); + + if (simpleQueueMessage != null) + { + GXProperties messageAttributes = simpleQueueMessage.MessageAttributes; + Dictionary properties = new Dictionary(); + + GxKeyValuePair messageAttribute = new GxKeyValuePair(); + if (messageAttributes != null) + messageAttribute = messageAttributes.GetFirst(); + while (!messageAttributes.Eof()) + { + properties.Add(messageAttribute.Key, new MessageAttributeValue() { DataType = "String", StringValue = messageAttribute.Value }); + messageAttribute = messageAttributes.GetNext(); + } + + SendMessageRequest sendMessageRequest = new SendMessageRequest + { + QueueUrl = _queueURL, + MessageBody = simpleQueueMessage.MessageBody, + MessageAttributes = properties + }; + + if (messageQueueOptions != null && messageQueueOptions.DelaySeconds != 0) + sendMessageRequest.DelaySeconds = messageQueueOptions.DelaySeconds; + + if ((messageAttributes != null) && (_isFIFO)) + { + string mesageGroupId = messageAttributes.Get(MESSSAGE_GROUP_ID); + string messageDeduplicationId = messageAttributes.Get(MESSSAGE_DEDUPLICATION_ID); + + if ((mesageGroupId != null) && (messageDeduplicationId != null)) + { + sendMessageRequest.MessageGroupId = mesageGroupId; + sendMessageRequest.MessageDeduplicationId = messageDeduplicationId; + + } + } + sendMessageResponse = await _sqsClient.SendMessageAsync(sendMessageRequest).ConfigureAwait(false); + } + return sendMessageResponse; + } + + private async Task SendMessageBatchAsync(IList simpleQueueMessages, MessageQueueOptions messageQueueOptions=null) + { + List messageBatchRequestEntries = new List(); + SendMessageBatchResponse responseSendBatch = new SendMessageBatchResponse(); + + foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages) + { + SendMessageBatchRequestEntry requestEntry = new SendMessageBatchRequestEntry(); + GXProperties messageAttributes = simpleQueueMessage.MessageAttributes; + Dictionary properties = new Dictionary(); + + GxKeyValuePair messageAttribute = new GxKeyValuePair(); + if (messageAttributes != null) + messageAttribute = messageAttributes.GetFirst(); + while (!messageAttributes.Eof()) + { + properties.Add(messageAttribute.Key, new MessageAttributeValue() { DataType = "String", StringValue = messageAttribute.Value }); + messageAttribute = messageAttributes.GetNext(); + } + + requestEntry.MessageBody = simpleQueueMessage.MessageBody; + requestEntry.Id = simpleQueueMessage.MessageId; + requestEntry.MessageAttributes = properties; + if ((messageQueueOptions != null) && (messageQueueOptions.DelaySeconds != 0)) + requestEntry.DelaySeconds = messageQueueOptions.DelaySeconds; + + if ((messageAttributes != null) && (_isFIFO)) + { + string mesageGroupId = messageAttributes.Get(MESSSAGE_GROUP_ID); + string messageDeduplicationId = messageAttributes.Get(MESSSAGE_DEDUPLICATION_ID); + + if ((mesageGroupId != null) && (mesageGroupId != null)) + { + requestEntry.MessageGroupId = mesageGroupId; + requestEntry.MessageDeduplicationId = messageDeduplicationId; + } + } + + messageBatchRequestEntries.Add(requestEntry); + } + if (messageBatchRequestEntries.Count > 0) + responseSendBatch = await _sqsClient.SendMessageBatchAsync(_queueURL, messageBatchRequestEntries).ConfigureAwait(false); + + return (responseSendBatch); + + } + + private async Task GetMessageAsync(MessageQueueOptions messageQueueOptions = null) + { + ReceiveMessageResponse receiveMessageResponse = new ReceiveMessageResponse(); + try + { + + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(); + receiveMessageRequest.QueueUrl = _queueURL; + if (messageQueueOptions != null) + { + if (messageQueueOptions.MaxNumberOfMessages != 0) + receiveMessageRequest.MaxNumberOfMessages = messageQueueOptions.MaxNumberOfMessages; + if (messageQueueOptions.VisibilityTimeout != 0) + receiveMessageRequest.VisibilityTimeout = messageQueueOptions.VisibilityTimeout; + if (messageQueueOptions.WaitTimeout != 0) + receiveMessageRequest.WaitTimeSeconds = messageQueueOptions.WaitTimeout; + if (! string.IsNullOrEmpty(messageQueueOptions.ReceiveRequestAttemptId)) + receiveMessageRequest.ReceiveRequestAttemptId = messageQueueOptions.ReceiveRequestAttemptId; + // TO DO : Check only for specific attrributes in the list + + if (messageQueueOptions.ReceiveMessageAttributes) + { + receiveMessageRequest.AttributeNames = new List { "All" }; + receiveMessageRequest.MessageAttributeNames = new List() { "All" }; + } + } + + return await _sqsClient.ReceiveMessageAsync(receiveMessageRequest).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + GXLogging.Debug(logger, $"Get Message Operation cancelled for SQS {_queueURL}."); + } + catch (Exception ex) + { + throw (ex); + } + return receiveMessageResponse; + } + + private async Task DeleteQueueMessageBatchAsync(List messageHandleId) + { + DeleteMessageBatchResponse deleteMessageBatchResponse = new DeleteMessageBatchResponse(); + try + { + + List deleteMessageBatchRequestEntries = new List(); + + foreach (string handleId in messageHandleId) + { + DeleteMessageBatchRequestEntry deleteMessageBatchRequestEntry = new DeleteMessageBatchRequestEntry(); + deleteMessageBatchRequestEntry.ReceiptHandle = handleId; + deleteMessageBatchRequestEntry.Id = Guid.NewGuid().ToString(); + deleteMessageBatchRequestEntries.Add(deleteMessageBatchRequestEntry); + } + + deleteMessageBatchResponse = await _sqsClient.DeleteMessageBatchAsync(_queueURL, deleteMessageBatchRequestEntries).ConfigureAwait(false); + } + catch (Exception ex) + { + throw (ex); + } + return deleteMessageBatchResponse; + } + + private async Task PurgeQueueAsync() + { + PurgeQueueResponse purgeQueueResponse = new PurgeQueueResponse(); + try + { + PurgeQueueRequest purgeQueueRequest = new PurgeQueueRequest(); + purgeQueueRequest.QueueUrl = _queueURL; + + purgeQueueResponse= await _sqsClient.PurgeQueueAsync(purgeQueueRequest).ConfigureAwait(false); + } + catch (Exception ex) + { + throw (ex); + } + return purgeQueueResponse; + } + + private async Task GetQueueAttributeAsync(List attributesName) + { + GetQueueAttributesResponse getQueueAttributesResponse = new GetQueueAttributesResponse(); + GetQueueAttributesRequest getQueueAttributesRequest = new GetQueueAttributesRequest(); + try + { + getQueueAttributesRequest.QueueUrl= _queueURL; + if (attributesName == null) + getQueueAttributesRequest.AttributeNames.Add("All"); + foreach (string name in attributesName) + getQueueAttributesRequest.AttributeNames.Add(name); + getQueueAttributesResponse = await _sqsClient.GetQueueAttributesAsync(getQueueAttributesRequest).ConfigureAwait(false); + } + catch (Exception ex) + { + throw (ex); + } + return getQueueAttributesResponse; + + } + } + +} + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/GXAmazonSQS.csproj b/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/GXAmazonSQS.csproj new file mode 100644 index 000000000..b9f71c2aa --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAmazonSQS/GXAmazonSQS.csproj @@ -0,0 +1,17 @@ + + + + net6.0 + + + + + + + + + + + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/AzureMessageQueueProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/AzureMessageQueueProvider.cs new file mode 100644 index 000000000..9f6857498 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/AzureMessageQueueProvider.cs @@ -0,0 +1,22 @@ +using GeneXus.Messaging.Common; +using GeneXus.Utils; + +namespace GeneXus.Messaging.Queue +{ + public class AzureMessageQueueProvider + { + private const string AZUREQUEUE = "AZUREQUEUE"; + public SimpleMessageQueue Connect(string queueName, string queueURL, out GXBaseCollection errorMessages, out bool success) + + { + MessageQueueProvider messageQueueProvider = new MessageQueueProvider(); + GXProperties properties = new GXProperties(); + properties.Add("QUEUE_AZUREQUEUE_QUEUENAME", queueName); + properties.Add("QUEUE_AZUREQUEUE_CONNECTIONSTRING", queueURL); + SimpleMessageQueue simpleMessageQueue = messageQueueProvider.Connect(AZUREQUEUE, properties, out GXBaseCollection errorMessagesConnect, out bool successConnect); + errorMessages = errorMessagesConnect; + success = successConnect; + return simpleMessageQueue; + } + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/AzureQueue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/AzureQueue.cs new file mode 100644 index 000000000..e18c42e7d --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/AzureQueue.cs @@ -0,0 +1,459 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using GeneXus.Messaging.Common; +using GeneXus.Services; +using GeneXus.Utils; + +namespace GeneXus.Messaging.Queue +{ + public class AzureQueue : QueueBase, IQueue + { + + public static String Name = "AZUREQUEUE"; + const string QUEUE_NAME = "QUEUENAME"; + const string QUEUE_CONNECTION_STRING = "CONNECTIONSTRING"; + + QueueClient _queueClient { get; set; } + private string _queueName { get; set; } + private string _connectionString { get; set; } + + public AzureQueue() : this(null) + { + } + + public AzureQueue(GXService providerService) : base(providerService) + { + Initialize(providerService); + } + + private void Initialize(GXService providerService) + { + ServiceSettings serviceSettings = new("QUEUE", Name, providerService); + _queueName = serviceSettings.GetEncryptedPropertyValue(QUEUE_NAME); + _connectionString = serviceSettings.GetEncryptedPropertyValue(QUEUE_CONNECTION_STRING); + + QueueClientOptions queueClientOptions = new QueueClientOptions() + { + MessageEncoding = QueueMessageEncoding.Base64 + }; + + _queueClient = new QueueClient(_connectionString, _queueName, queueClientOptions); + } + + QueueClient QueueClient + { + get + { + if (_queueClient == null) + _queueClient = new QueueClient(_connectionString, _queueName); + return _queueClient; + } + } + + public AzureQueue(string connectionString, string queueName) + { + _queueName = queueName; + _connectionString = connectionString; + } + + //public AzureQueue(Uri uri, TokenCredential tokenCredential) + //{ + //_queueClient = new QueueClient(uri, tokenCredential); + //} + + public bool GetMessageFromException(Exception ex, SdtMessages_Message msg) + { + try + { + Azure.RequestFailedException az_ex = (Azure.RequestFailedException)ex; + msg.gxTpr_Id = az_ex.ErrorCode; + msg.gxTpr_Description = az_ex.Message; + return true; + } + catch (Exception) + { + return false; + } + } + + /// + /// Get the approximate number of messages in the queue + /// + /// + /// + public int GetQueueLength(out bool success) + { + int cachedMessagesCount = 0; + success = false; + if (_queueClient is QueueClient && _queueClient.Exists()) + { + QueueProperties properties = _queueClient.GetProperties(); + + // Retrieve the cached approximate message count. + cachedMessagesCount = properties.ApproximateMessagesCount; + success = true; + } + return cachedMessagesCount; + } + + /// + /// Deletes all messages from a queue. + /// + public void Clear(out bool success) + { + success = false; + if (_queueClient is QueueClient && _queueClient.Exists()) + { + Azure.Response result = _queueClient.ClearMessages(); + success = !result.IsError; + } + } + + public MessageQueueResult DeleteMessage(string messageHandleId, out bool success) + { + //This method should receive messageHandleId + popReceipt + success = false; + MessageQueueResult messageQueueResult = new MessageQueueResult(); + if (_queueClient is QueueClient && _queueClient.Exists()) + { + Azure.Response receivedMessage = _queueClient.ReceiveMessage(); + + if ((receivedMessage != null) && (!receivedMessage.GetRawResponse().IsError) && (receivedMessage.Value != null) && (receivedMessage.Value.MessageId == messageHandleId)) + { + Azure.Response deleteResult = _queueClient.DeleteMessage(receivedMessage.Value.MessageId, receivedMessage.Value.PopReceipt); + + success = !deleteResult.IsError; + if (success) + { + return (AzQueueMessageToMessageQueueResult(receivedMessage.Value, MessageQueueResultStatus.Deleted)); + } + } + } + return messageQueueResult; + } + + /// + /// Deletes permanently the messages given on the list. + /// + + public IList DeleteMessages(List messageHandleId, out bool success) + { + success = false; + IList messageQueueResults = new List(); + if (_queueClient is QueueClient && _queueClient.Exists()) + { + QueueMessage[] receivedMessages = _queueClient.ReceiveMessages(); + Azure.Response deleteResult; + foreach (QueueMessage message in receivedMessages) + { + if (messageHandleId.Contains(message.MessageId)) + { + deleteResult = _queueClient.DeleteMessage(message?.MessageId, message?.PopReceipt); + if ((deleteResult != null) && (!deleteResult.IsError) && message is QueueMessage) + messageQueueResults.Add(AzQueueMessageToMessageQueueResult(queueMessage: message, status: MessageQueueResultStatus.Deleted)); + } + } + success = true; + } + return messageQueueResults; + } + + /// + /// Deletes permanently the messages given on the list. + /// + + public IList DeleteMessages(IList simpleQueueMessages, out bool success) + { + success = false; + IList messageQueueResults = new List(); + if (_queueClient is QueueClient && _queueClient.Exists()) + { + Azure.Response deleteResult; + foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages) + { + deleteResult = _queueClient.DeleteMessage(simpleQueueMessage?.MessageId, simpleQueueMessage?.MessageHandleId); + + if (deleteResult != null) + { + if (!deleteResult.IsError) + { + messageQueueResults.Add(SimpleQueueMessageToMessageQueueResult(simpleQueueMessage, MessageQueueResultStatus.Deleted)); + success = true; + } + else + { + messageQueueResults.Add(SimpleQueueMessageToMessageQueueResult(simpleQueueMessage, MessageQueueResultStatus.Failed)); + success = false; + } + } + } + } + return messageQueueResults; + } + + /// + /// Retrieves all the messages from the queue. + /// + public IList GetMessages(out bool success) + { + success = false; + IList simpleQueueMessages = new List(); + if (_queueClient is QueueClient && _queueClient.Exists()) + { + Azure.Response messageResponse; + + messageResponse = _queueClient.ReceiveMessages(); + + if (messageResponse.Value != null) + { + foreach (QueueMessage message in messageResponse.Value) + { + simpleQueueMessages.Add(AzQueueMessageToSimpleQueueMessage(message)); + + } + } + success = true; + } + return simpleQueueMessages; + } + + /// + /// Retrieves mesages from the queue, given some options. + /// + public IList GetMessages(MessageQueueOptions messageQueueOptions, out bool success) + { + success = false; + bool deleteSuccess = true; + IList simpleQueueMessages = new List(); + if (_queueClient is QueueClient && _queueClient.Exists()) + { + Azure.Response messageResponse; + if (messageQueueOptions.MaxNumberOfMessages != 0) + + messageResponse = _queueClient.ReceiveMessages(messageQueueOptions.MaxNumberOfMessages, TimeSpan.FromSeconds(messageQueueOptions.VisibilityTimeout)); + else + messageResponse = _queueClient.ReceiveMessages(visibilityTimeout: TimeSpan.FromSeconds(messageQueueOptions.VisibilityTimeout)); + + if (messageResponse.Value != null) + { + foreach (QueueMessage message in messageResponse.Value) + { + simpleQueueMessages.Add(AzQueueMessageToSimpleQueueMessage(message)); + if (messageQueueOptions.DeleteConsumedMessages) + { + Azure.Response deleteResponse = _queueClient.DeleteMessage(message.MessageId, message.PopReceipt); + if (deleteResponse != null && deleteResponse.IsError) + deleteSuccess = false; + } + } + } + success = deleteSuccess; + } + return simpleQueueMessages; + } + private MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, MessageQueueOptions messageQueueOptions, out bool success) + { + success = false; + Azure.Response sendReceipt; + MessageQueueResult queueMessageResult = new MessageQueueResult(); + + if (_queueClient is QueueClient && _queueClient.Exists()) + { + + if (messageQueueOptions.TimetoLive != 0) + sendReceipt = _queueClient.SendMessage(simpleQueueMessage.MessageBody, TimeSpan.FromSeconds(messageQueueOptions.VisibilityTimeout), TimeSpan.FromSeconds(messageQueueOptions.TimetoLive)); + else + sendReceipt = _queueClient.SendMessage(simpleQueueMessage.MessageBody, TimeSpan.FromSeconds(messageQueueOptions.VisibilityTimeout)); + + if ((sendReceipt != null) && (sendReceipt.Value != null)) + { + MessageQueueResult result = new MessageQueueResult() + { + MessageId = sendReceipt.Value.MessageId, + ServerMessageId = sendReceipt.Value.MessageId, + MessageStatus = MessageQueueResultStatus.Sent, + MessageAttributes = new GXProperties() + + }; + Type t = sendReceipt.GetType(); + PropertyInfo[] props = t.GetProperties(); + foreach (PropertyInfo prop in props) + { + object value; + if (prop.GetIndexParameters().Length == 0 && sendReceipt != null) + { + value = prop.GetValue(sendReceipt); + if (value != null) + result.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + success = true; + return result; + } + } + return queueMessageResult; + + } + public MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, out bool success) + { + success = false; + Azure.Response sendReceipt; + MessageQueueResult queueMessageResult = new MessageQueueResult(); + if (_queueClient is QueueClient && _queueClient.Exists()) + { + sendReceipt = _queueClient.SendMessage(simpleQueueMessage.MessageBody); + if ((sendReceipt != null) && (sendReceipt.Value != null)) + { + MessageQueueResult result = new MessageQueueResult() + { + MessageId = simpleQueueMessage.MessageId, + ServerMessageId = sendReceipt.Value.MessageId, + MessageStatus = MessageQueueResultStatus.Sent, + MessageAttributes = new GXProperties() + + }; + Type t = sendReceipt.Value.GetType(); + PropertyInfo[] props = t.GetProperties(); + foreach (PropertyInfo prop in props) + { + object value; + if (prop.GetIndexParameters().Length == 0 && sendReceipt.Value != null) + { + value = prop.GetValue(sendReceipt.Value); + if (value != null) + result.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + success = true; + return result; + } + } + return queueMessageResult; + + } + + /// + /// Send messages using Queue options + /// + + public IList SendMessages(IList simpleQueueMessages, MessageQueueOptions messageQueueOptions, out bool success) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + IList messageQueueResults = new List(); + + bool sendError = false; + bool successSend = false; + foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages) + { + messageQueueResult = SendMessage(simpleQueueMessage, messageQueueOptions, out successSend); + if (successSend) + messageQueueResults.Add(messageQueueResult); + else + { + sendError = true; + } + } + success = !sendError; + return messageQueueResults; + } + public override string GetName() + { + return Name; + } + + #region Transform Methods + + private MessageQueueResult SimpleQueueMessageToMessageQueueResult(SimpleQueueMessage simpleQueueMessage, string messageStatus) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + messageQueueResult.MessageId = simpleQueueMessage.MessageId; + messageQueueResult.MessageStatus = messageStatus; + messageQueueResult.ServerMessageId = simpleQueueMessage.MessageId; + messageQueueResult.MessageHandleId = simpleQueueMessage.MessageHandleId; + return messageQueueResult; + + } + private MessageQueueResult AzQueueMessageToMessageQueueResult(QueueMessage queueMessage, string status) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + if (queueMessage != null) + { + messageQueueResult.MessageId = queueMessage.MessageId; + messageQueueResult.ServerMessageId = queueMessage.MessageId; + messageQueueResult.MessageStatus = status; + messageQueueResult.MessageAttributes = new GXProperties(); + + + Type t = queueMessage.GetType(); + PropertyInfo[] props = t.GetProperties(); + foreach (PropertyInfo prop in props) + { + object value; + if (prop.GetIndexParameters().Length == 0 && queueMessage != null) + { + value = prop.GetValue(queueMessage); + if (value != null) + messageQueueResult.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + } + return messageQueueResult; + } + + private SimpleQueueMessage AzPeekedMessageToSimpleQueueMessage(PeekedMessage peekedMessage) + { + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + if (peekedMessage != null) + { + simpleQueueMessage.MessageId = peekedMessage.MessageId; + simpleQueueMessage.MessageBody = peekedMessage.Body.ToString(); + simpleQueueMessage.MessageAttributes = new GXProperties(); + + Type t = peekedMessage.GetType(); + PropertyInfo[] props = t.GetProperties(); + foreach (PropertyInfo prop in props) + { + object value; + if (prop.GetIndexParameters().Length == 0 && peekedMessage != null) + { + value = prop.GetValue(peekedMessage); + if (value != null) + simpleQueueMessage.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + } + return simpleQueueMessage; + } + + private SimpleQueueMessage AzQueueMessageToSimpleQueueMessage(QueueMessage queueMessage) + { + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + if (queueMessage != null) + { + simpleQueueMessage.MessageId = queueMessage.MessageId; + simpleQueueMessage.MessageHandleId = queueMessage.PopReceipt; + simpleQueueMessage.MessageBody = queueMessage.Body.ToString(); + simpleQueueMessage.MessageAttributes = new GXProperties(); + + Type t = queueMessage.GetType(); + PropertyInfo[] props = t.GetProperties(); + foreach (PropertyInfo prop in props) + { + object value; + if (prop.GetIndexParameters().Length == 0 && queueMessage != null) + { + value = prop.GetValue(queueMessage); + if (value != null) + simpleQueueMessage.MessageAttributes.Add(prop.Name, value.ToString()); + } + } + } + return simpleQueueMessage; + } + #endregion + } + +} \ No newline at end of file diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/GXAzureQueue.csproj b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/GXAzureQueue.csproj new file mode 100644 index 000000000..30e505229 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXAzureQueue/GXAzureQueue.csproj @@ -0,0 +1,16 @@ + + + + net6.0 + + + + + + + + + + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/GXQueue.csproj b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/GXQueue.csproj new file mode 100644 index 000000000..4e7afff8d --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/GXQueue.csproj @@ -0,0 +1,12 @@ + + + + net6.0 + TRACE;DEBUG;NETCORE + + + + + + + diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/MessageQueueProvider.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/MessageQueueProvider.cs new file mode 100644 index 000000000..90d951738 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/MessageQueueProvider.cs @@ -0,0 +1,129 @@ +using System; +using GeneXus.Attributes; +using GeneXus.Encryption; +using GeneXus.Services; +using GeneXus.Utils; +using GxClasses.Helpers; +using log4net; + +namespace GeneXus.Messaging.Common +{ + [GXApi] + public class MessageQueueProvider : SimpleMessageQueue + { + static readonly ILog logger = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + private static GXService providerService; + public MessageQueueProvider() + { + + } + + public SimpleMessageQueue Connect(string providerTypeName, GXProperties properties, out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + SimpleMessageQueue simpleMessageQueue = new SimpleMessageQueue(); + if (string.IsNullOrEmpty(providerTypeName)) + { + GXUtil.ErrorToMessages("GXQueue1000", "Queue provider cannot be empty", errorMessages); + GXLogging.Error(logger, "(GXQueue1000)Failed to Connect to a queue : Queue provider cannot be empty."); + success = false; + return simpleMessageQueue; + } + try + { + if (providerService == null || !string.Equals(providerService.Name, providerTypeName, StringComparison.OrdinalIgnoreCase)) + { + providerService = new GXService(); + providerService.Type = GXServices.QUEUE_SERVICE; + providerService.Name = providerTypeName; + providerService.AllowMultiple = false; + providerService.Properties = new GXProperties(); + } + Preprocess(providerTypeName, properties); + + GxKeyValuePair prop = properties.GetFirst(); + while (!properties.Eof()) + { + providerService.Properties.Set(prop.Key, prop.Value); + prop = properties.GetNext(); + } + + string typeFullName = providerService.ClassName; + GXLogging.Debug(logger, "Loading Queue provider: " + typeFullName); +#if !NETCORE + Type type = Type.GetType(typeFullName, true, true); +#else + Type type = AssemblyLoader.GetType(typeFullName); +#endif + simpleMessageQueue.queue = (IQueue)Activator.CreateInstance(type, new object[] { providerService }); + + } + catch (Exception ex) + { + GXLogging.Error(logger, "(GXQueue1001)Couldn't connect to Queue provider: " + ExceptionExtensions.GetInnermostException(ex)); + GXUtil.ErrorToMessages("GXQueue1001", ex, errorMessages); + success = false; + return simpleMessageQueue; + } + success = true; + return (simpleMessageQueue); + } + + private static void Preprocess(String name, GXProperties properties) + { + string className; + + switch (name) + { + case "AZUREQUEUE": + className = "GeneXus.Messaging.Queue.AzureQueue"; + SetEncryptedProperty(properties, "QUEUE_AZUREQUEUE_QUEUENAME"); + SetEncryptedProperty(properties, "QUEUE_AZUREQUEUE_CONNECTIONSTRING"); + if (string.IsNullOrEmpty(providerService.ClassName) || !providerService.ClassName.Contains(className)) + { + providerService.ClassName = "GeneXus.Messaging.Queue.AzureQueue, GXAzureQueue, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"; + } + break; + case "AWS_SQS": + className = "GeneXus.Messaging.Queue.AWSQueue"; + SetEncryptedProperty(properties, "QUEUE_AWSSQS_QUEUE_URL"); + SetEncryptedProperty(properties, "QUEUE_AWSSQS_ACCESS_KEY"); + SetEncryptedProperty(properties, "QUEUE_AWSSQS_SECRET_KEY"); + SetEncryptedProperty(properties, "QUEUE_AWSSQS_REGION"); + if (string.IsNullOrEmpty(providerService.ClassName) || !providerService.ClassName.Contains(className)) + { + providerService.ClassName = "GeneXus.Messaging.Queue.AWSQueue, GXAmazonSQS, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"; + } + break; + + default: + throw new SystemException(string.Format("Provider {0} is not supported.", name)); + } + } + private static void SetEncryptedProperty(GXProperties properties, String prop) + { + String value = properties.Get(prop); + if (string.IsNullOrEmpty(value)) + value = String.Empty; + value = CryptoImpl.Encrypt(value); + properties.Set(prop, value); + } + + } + public static class ExceptionExtensions + { + public static string GetInnermostException(Exception e) + { + Exception ex = e; + if (ex != null) + { + while (ex.InnerException != null) + { + ex = ex.InnerException; + } + + } + return ex.Message; + } + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/Queue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/Queue.cs new file mode 100644 index 000000000..c63399590 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/Queue.cs @@ -0,0 +1,120 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using GeneXus.Utils; + +namespace GeneXus.Messaging.Common +{ + public interface IQueue + { + public const int MAX_NUMBER_MESSAGES = 10; + public const bool DELETE_CONSUMED_MESSAGES = false; + public const int WAIT_TIMEOUT = 10; + public const int VISIBILITY_TIMEOUT = 60; + + int GetQueueLength(out bool success); + MessageQueueResult SendMessage(SimpleQueueMessage simpleQueueMessage, out bool success); + IList SendMessages(IList simpleQueueMessages, MessageQueueOptions messageQueueOptions, out bool success); + IList GetMessages(out bool success); + IList GetMessages(MessageQueueOptions messageQueueOptions, out bool success); + MessageQueueResult DeleteMessage(string messageHandleId, out bool success); + IList DeleteMessages(List messageHandleId, out bool success); + IList DeleteMessages(IList simpleQueueMessages, out bool success); + void Clear(out bool success); + bool GetMessageFromException(Exception ex, SdtMessages_Message msg); + } + public class SimpleQueueMessage : GxUserType + { + public string MessageId { get; set; } + public string MessageBody { get; set; } + public GXProperties MessageAttributes { get; set; } + public string MessageHandleId { get; set; } + + #region Json + private static Hashtable mapper; + public override String JsonMap(String value) + { + if (mapper == null) + { + mapper = new Hashtable(); + } + return (String)mapper[value]; ; + } + + public override void ToJSON() + { + ToJSON(true); + return; + } + + public override void ToJSON(bool includeState) + { + AddObjectProperty("MessageId", MessageId, false); + AddObjectProperty("MessageBody", MessageBody, false); + AddObjectProperty("MessageHandleId", MessageHandleId, false); + return; + } + + #endregion + + } + + public class MessageQueueResult : GxUserType + { + public string MessageId { get; set; } + public string ServerMessageId { get; set; } + public GXProperties MessageAttributes { get; set; } + public string MessageHandleId { get; set; } + public string MessageStatus { get; set; } = "Unknown"; + + #region Json + private static Hashtable mapper; + public override String JsonMap(String value) + { + if (mapper == null) + { + mapper = new Hashtable(); + } + return (String)mapper[value]; ; + } + + public override void ToJSON() + { + ToJSON(true); + return; + } + + public override void ToJSON(bool includeState) + { + AddObjectProperty("MessageId", MessageId, false); + AddObjectProperty("ServerMessageId", ServerMessageId, false); + AddObjectProperty("MessageHandleId", MessageHandleId, false); + AddObjectProperty("MessageStatus", MessageStatus, false); + + return; + } + + #endregion + } + + public class MessageQueueOptions : GxUserType + { + public short MaxNumberOfMessages { get; set; } + public bool DeleteConsumedMessages { get; set; } + public int WaitTimeout { get; set; } + public int VisibilityTimeout { get; set; } + public int TimetoLive { get; set; } + public int DelaySeconds { get; set; } + public string ReceiveRequestAttemptId { get; set; } + public bool ReceiveMessageAttributes { get; set; } + + } + + public static class MessageQueueResultStatus + { + public const string Unknown = "Unknown"; + public const string Sent = "Sent"; + public const string Deleted = "Deleted"; + public const string Failed = "Failed"; + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/QueueBase.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/QueueBase.cs new file mode 100644 index 000000000..14f523305 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/QueueBase.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using GeneXus.Services; +using log4net; + +namespace GeneXus.Messaging.Common +{ + public abstract class QueueBase + { + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(QueueBase)); + internal GXService service; + public QueueBase() + { + } + + public QueueBase(GXService s) + { + if (s == null) + { + try + { + s = ServiceFactory.GetGXServices().Get(GXServices.QUEUE_SERVICE); + } + catch (Exception) + { + GXLogging.Warn(logger, "QUEUE_SERVICE is not activated"); + } + } + + service = s; + } + public abstract String GetName(); + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/ServiceSettings.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/ServiceSettings.cs new file mode 100644 index 000000000..443f5223e --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/ServiceSettings.cs @@ -0,0 +1,101 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using GeneXus.Encryption; +using GeneXus.Messaging.Common; +using GeneXus.Services; +using log4net; + +namespace GeneXus.Messaging.Queue +{ + public class ServiceSettings + { + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(ServiceSettings)); + + internal GXService service; + public string serviceNameResolver { get; } + public string name { get; } + + public ServiceSettings(string serviceNameResolver, string name, GXService gXService) + { + this.serviceNameResolver = serviceNameResolver; + this.name = name; + this.service = gXService; + } + + public string GetEncryptedPropertyValue(string propertyName, string alternativePropertyName = null) + { + String value = GetEncryptedPropertyValue(propertyName, alternativePropertyName, null); + if (value == null) + { + String errorMessage = String.Format($"Service configuration error - Property name {ResolvePropertyName(propertyName)} must be defined"); + logger.Fatal(errorMessage); + throw new Exception(errorMessage); + } + return value; + } + public string GetEncryptedPropertyValue(string propertyName, string alternativePropertyName, string defaultValue) + { + String value = GetPropertyValue(propertyName, alternativePropertyName, defaultValue); + if (!String.IsNullOrEmpty(value)) + { + try + { + string ret = String.Empty; + if (CryptoImpl.Decrypt(ref ret, value)) + { + value = ret; + } + } + catch (Exception) + { + logger.Warn($"Could not decrypt property name: {ResolvePropertyName(propertyName)}"); + } + } + return value; + } + + internal string GetPropertyValue(string propertyName, string alternativePropertyName = null) + { + String value = GetPropertyValue(propertyName, alternativePropertyName, null); + if (value == null) + { + String errorMessage = String.Format($"Service configuration error - Property name {ResolvePropertyName(propertyName)} must be defined"); + logger.Fatal(errorMessage); + throw new Exception(errorMessage); + } + return value; + } + + internal string GetPropertyValue(string propertyName, string alternativePropertyName, string defaultValue) + { + String value = null; + value = string.IsNullOrEmpty(value) ? GetPropertyValueImpl(ResolvePropertyName(propertyName)) : value; + value = string.IsNullOrEmpty(value) ? GetPropertyValueImpl(propertyName) : value; + value = string.IsNullOrEmpty(value) ? GetPropertyValueImpl(alternativePropertyName) : value; + value = string.IsNullOrEmpty(value) ? defaultValue : value; + return value; + } + + internal string GetPropertyValueImpl(string propertyName) + { + String value = null; + if (!string.IsNullOrEmpty(propertyName)) + { + value = Environment.GetEnvironmentVariable(propertyName); + if (service != null && value == null) + { + value = service.Properties.Get(propertyName); + } + } + return value; + } + + internal string ResolvePropertyName(string propertyName) + { + return $"{serviceNameResolver}_{name}_{propertyName}"; + } + } +} diff --git a/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/SimpleMessageQueue.cs b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/SimpleMessageQueue.cs new file mode 100644 index 000000000..c432357c0 --- /dev/null +++ b/dotnet/src/dotnetcore/Providers/Messaging/GXQueue/SimpleMessageQueue.cs @@ -0,0 +1,534 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.IO; +using System.Reflection; +using System.Text; +using GeneXus.Application; +using GeneXus.Services; +using GeneXus.Utils; +using GxClasses.Helpers; +using log4net; + +namespace GeneXus.Messaging.Common +{ + public class SimpleMessageQueue + { + internal IQueue queue = null; + public static Assembly assembly; + static readonly ILog logger = log4net.LogManager.GetLogger(typeof(SimpleMessageQueue)); + private const string SDT_MESSAGE_CLASS_NAME = @"SdtMessage"; + private const string SDT_MESSAGEPROPERTY_CLASS_NAME = @"SdtMessageProperty"; + private const string SDT_MESSAGERESULT_CLASS_NAME = @"SdtMessageResult"; + private const string NAMESPACE = @"GeneXus.Programs.genexusmessagingqueue.simplequeue"; + private const string GENEXUS_COMMON_DLL = @"GeneXus.Programs.Common.dll"; + + public SimpleMessageQueue() + { + } + public SimpleMessageQueue(SimpleMessageQueue other) + { + queue = other.queue; + } + void ValidQueue() + { + if (queue == null) + { + GXLogging.Error(logger, "Queue was not instantiated."); + throw new Exception("Queue was not instantiated."); + } + } + + public void Clear(out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + try + { + ValidQueue(); + queue.Clear(out success); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + } + } + + public int GetQueueLength(out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + int queueLength = 0; + try + { + ValidQueue(); + queueLength = queue.GetQueueLength(out success); + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + } + + return queueLength; + } + public GxUserType DeleteMessage(string messageHandleId, out GXBaseCollection errorMessages, out bool success) + { + MessageQueueResult messageQueueResult = new MessageQueueResult(); + GxUserType messageResult = new GxUserType(); + errorMessages = new GXBaseCollection(); + try + { + ValidQueue(); + messageQueueResult = queue.DeleteMessage(messageHandleId, out success); + LoadAssemblyIfRequired(); + try + { + if (messageQueueResult != null && TransformMessageQueueResult(messageQueueResult) is GxUserType result) + return result; + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + success = false; + throw ex; + } + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + success = false; + } + return TransformMessageQueueResult(messageQueueResult); + } + + public IList DeleteMessages(List messageHandleId, out GXBaseCollection errorMessages, out bool success) + { + IList messageQueueResults = new List(); + errorMessages = new GXBaseCollection(); + IList messageResults = new List(); + success = false; + try + { + try + { + ValidQueue(); + messageQueueResults = queue.DeleteMessages(messageHandleId, out success); + LoadAssemblyIfRequired(); + foreach (MessageQueueResult messageResult in messageQueueResults) + { + if (TransformMessageQueueResult(messageResult) is GxUserType result) + messageResults.Add(result); + } + success = true; + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + success = false; + throw ex; + } + + return messageResults; + } + + public IList DeleteMessages(IList simpleQueueMessages, out GXBaseCollection errorMessages, out bool success) + { + IList messageQueueResults = new List(); + errorMessages = new GXBaseCollection(); + IList messageResults = new List(); + success = false; + + IList simpleQueueMessagesList = new List(); + foreach (GxUserType simpleQueueMessage in simpleQueueMessages) + { + if (TransformGXUserTypeToSimpleQueueMessage(simpleQueueMessage) is SimpleQueueMessage queueMessage) + simpleQueueMessagesList.Add(queueMessage); + } + try + { + try + { + ValidQueue(); + messageQueueResults = queue.DeleteMessages(simpleQueueMessagesList, out success); + LoadAssemblyIfRequired(); + foreach (MessageQueueResult messageResult in messageQueueResults) + { + if (TransformMessageQueueResult(messageResult) is GxUserType result) + messageResults.Add(result); + } + success = true; + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + success = false; + throw ex; + } + + return messageResults; + } + + public IList GetMessages(out GXBaseCollection errorMessages, out bool success) + { + + errorMessages = new GXBaseCollection(); + IList resultMessages = new List(); + success = false; + try + { + ValidQueue(); + IList simpleQueueMessages = queue.GetMessages(out success); + + try + { + LoadAssemblyIfRequired(); + + foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages) + { + if (TransformSimpleQueueMessage(simpleQueueMessage) is GxUserType result) + resultMessages.Add(result); + } + success = true; + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + success = false; + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + } + return resultMessages; + } + + public IList GetMessages(GxUserType messageQueueOptions, out GXBaseCollection errorMessages, out bool success) + { + + errorMessages = new GXBaseCollection(); + IList resultMessages = new List(); + success = false; + try + { + MessageQueueOptions options = TransformOptions(messageQueueOptions); + + try + { + ValidQueue(); + IList simpleQueueMessages = queue.GetMessages(options, out success); + LoadAssemblyIfRequired(); + foreach (SimpleQueueMessage simpleQueueMessage in simpleQueueMessages) + { + if (TransformSimpleQueueMessage(simpleQueueMessage) is GxUserType result) + resultMessages.Add(result); + } + success = true; + + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + GXLogging.Error(logger, ex); + success = false; + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + success = false; + throw ex; + } + + return resultMessages; + } + + private static void LoadAssemblyIfRequired() + { + if (assembly == null) + { + assembly = LoadAssembly(Path.Combine(GxContext.StaticPhysicalPath(), GENEXUS_COMMON_DLL)); + if (assembly == null) + assembly = LoadAssembly(Path.Combine(GxContext.StaticPhysicalPath(), "bin", GENEXUS_COMMON_DLL)); + } + } + + private MessageQueueOptions TransformOptions(GxUserType messageQueueOptions) + { + MessageQueueOptions options = new MessageQueueOptions(); + options.MaxNumberOfMessages = messageQueueOptions.GetPropertyValue("Maxnumberofmessages"); + options.DeleteConsumedMessages = messageQueueOptions.GetPropertyValue("Deleteconsumedmessages"); + options.WaitTimeout = messageQueueOptions.GetPropertyValue("Waittimeout"); + options.VisibilityTimeout = messageQueueOptions.GetPropertyValue("Visibilitytimeout"); + options.TimetoLive = messageQueueOptions.GetPropertyValue("Timetolive"); + return options; + } + + private SimpleQueueMessage TransformGXUserTypeToSimpleQueueMessage(GxUserType simpleQueueMessage) + { + SimpleQueueMessage queueMessage = new SimpleQueueMessage(); + queueMessage.MessageId = simpleQueueMessage.GetPropertyValue("Messageid"); + queueMessage.MessageBody = simpleQueueMessage.GetPropertyValue("Messagebody"); + queueMessage.MessageHandleId = simpleQueueMessage.GetPropertyValue("Messagehandleid"); + IList messageAttributes = simpleQueueMessage.GetPropertyValue("Messageattributes_GXBaseCollection"); + queueMessage.MessageAttributes = new GXProperties(); + foreach (GxUserType messageAttribute in messageAttributes) + { + string messagePropKey = messageAttribute.GetPropertyValue("Propertykey"); + string messagePropValue = messageAttribute.GetPropertyValue("Propertyvalue"); + queueMessage.MessageAttributes.Add(messagePropKey, messagePropValue); + } + return queueMessage; + } + private GxUserType TransformSimpleQueueMessage(SimpleQueueMessage simpleQueueMessage) + { + Type classType = assembly.GetType(NAMESPACE + "." + SDT_MESSAGE_CLASS_NAME, false, ignoreCase: true); + Type propertyClassType = assembly.GetType(NAMESPACE + "." + SDT_MESSAGEPROPERTY_CLASS_NAME, false, ignoreCase: true); + + if (classType != null && Activator.CreateInstance(classType) is GxUserType simpleMessageSDT) + { + simpleMessageSDT.SetPropertyValue("Messageid", simpleQueueMessage.MessageId); + simpleMessageSDT.SetPropertyValue("Messagebody", simpleQueueMessage.MessageBody); + simpleMessageSDT.SetPropertyValue("Messagehandleid", simpleQueueMessage.MessageHandleId); + + IList messageResultSDTAttributes = (IList)Activator.CreateInstance(classType.GetProperty("gxTpr_Messageattributes").PropertyType, new object[] { simpleMessageSDT.context, "MessageProperty", string.Empty }); + + if ((simpleQueueMessage != null) && (simpleQueueMessage.MessageAttributes != null)) + { + GxKeyValuePair prop = simpleQueueMessage.MessageAttributes.GetFirst(); + while (!simpleQueueMessage.MessageAttributes.Eof()) + { + if (propertyClassType != null && Activator.CreateInstance(propertyClassType) is GxUserType propertyClassTypeSDT) + { + propertyClassTypeSDT.SetPropertyValue("Propertykey", prop.Key); + propertyClassTypeSDT.SetPropertyValue("Propertyvalue", prop.Value); + messageResultSDTAttributes.Add(propertyClassTypeSDT); + prop = simpleQueueMessage.MessageAttributes.GetNext(); + } + } + simpleMessageSDT.SetPropertyValue("Messageattributes", messageResultSDTAttributes); + } + return simpleMessageSDT; + } + return null; + } + private GxUserType TransformMessageQueueResult(MessageQueueResult messageQueueResult) + { + Type classType = assembly.GetType(NAMESPACE + "." + SDT_MESSAGERESULT_CLASS_NAME, false, ignoreCase: true); + Type propertyClassType = assembly.GetType(NAMESPACE + "." + SDT_MESSAGEPROPERTY_CLASS_NAME, false, ignoreCase: true); + + if (classType != null && Activator.CreateInstance(classType) is GxUserType messageResultSDT) + { + messageResultSDT.SetPropertyValue("Messageid", messageQueueResult.MessageId); + messageResultSDT.SetPropertyValue("Servermessageid", messageQueueResult.ServerMessageId); + messageResultSDT.SetPropertyValue("Messagehandleid", messageQueueResult.MessageHandleId); + messageResultSDT.SetPropertyValue("Messagestatus", messageQueueResult.MessageStatus); + + IList messageResultSDTAttributes = (IList)Activator.CreateInstance(classType.GetProperty("gxTpr_Messageattributes").PropertyType, new object[] { messageResultSDT.context, "MessageProperty", string.Empty }); + GxKeyValuePair prop; + if ((messageQueueResult != null) && (messageQueueResult.MessageAttributes != null)) + { + prop = messageQueueResult.MessageAttributes.GetFirst(); + while (!messageQueueResult.MessageAttributes.Eof()) + { + if (propertyClassType != null && Activator.CreateInstance(propertyClassType) is GxUserType propertyClassTypeSDT) + { + propertyClassTypeSDT.SetPropertyValue("Propertykey", prop.Key); + propertyClassTypeSDT.SetPropertyValue("Propertyvalue", prop.Value); + + messageResultSDTAttributes.Add(propertyClassTypeSDT); + prop = messageQueueResult.MessageAttributes.GetNext(); + } + } + messageResultSDT.SetPropertyValue("Messageattributes", messageResultSDTAttributes); + } + return messageResultSDT; + } + return null; + } + private static Assembly LoadAssembly(string fileName) + { + if (File.Exists(fileName)) + { + Assembly assemblyLoaded = Assembly.LoadFrom(fileName); + return assemblyLoaded; + } + else + return null; + } + public GxUserType SendMessage(GxUserType simpleQueueMessage, out GXBaseCollection errorMessages, out bool success) + { + success = false; + MessageQueueResult messageQueueResult = new MessageQueueResult(); + errorMessages = new GXBaseCollection(); + GxUserType result = new GxUserType(); + try + { + SimpleQueueMessage queueMessage = TransformGXUserTypeToSimpleQueueMessage(simpleQueueMessage); + LoadAssemblyIfRequired(); + try + { + ValidQueue(); + messageQueueResult = queue.SendMessage(queueMessage, out success); + + if (TransformMessageQueueResult(messageQueueResult) is GxUserType messageResult) + return messageResult; + success = true; + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + success = false; + GXLogging.Error(logger,ex); + throw ex; + } + + return TransformMessageQueueResult(messageQueueResult); + } + + public IList SendMessages(IList simpleQueueMessages, GxUserType messageQueueOptions, out GXBaseCollection errorMessages, out bool success) + { + errorMessages = new GXBaseCollection(); + List messageResults = new List(); + try + { + // Load Message Queue Options// + MessageQueueOptions options = TransformOptions(messageQueueOptions); + + IList simpleQueueMessagesList = new List(); + foreach (GxUserType simpleQueueMessage in simpleQueueMessages) + { + if (TransformGXUserTypeToSimpleQueueMessage(simpleQueueMessage) is SimpleQueueMessage queueMessage) + simpleQueueMessagesList.Add(queueMessage); + } + try + { + ValidQueue(); + IList messageQueueResults = queue.SendMessages(simpleQueueMessagesList, options, out success); + LoadAssemblyIfRequired(); + foreach (MessageQueueResult messageResult in messageQueueResults) + { + if (TransformMessageQueueResult(messageResult) is GxUserType result) + messageResults.Add(result); + } + success = true; + + } + catch (Exception ex) + { + QueueErrorMessagesSetup(ex, out errorMessages); + success = false; + GXLogging.Error(logger, ex); + } + } + catch (Exception ex) + { + GXLogging.Error(logger, ex); + throw ex; + } + return messageResults; + } + + protected void QueueErrorMessagesSetup(Exception ex, out GXBaseCollection errorMessages) + { + errorMessages = new GXBaseCollection(); + if (errorMessages != null && ex != null) + { + SdtMessages_Message msg = new SdtMessages_Message(); + if (queue != null && queue.GetMessageFromException(ex, msg)) + { + msg.gxTpr_Type = 1; + StringBuilder str = new StringBuilder(); + str.Append(ex.Message); + while (ex.InnerException != null) + { + str.Append(ex.InnerException.Message); + ex = ex.InnerException; + } + msg.gxTpr_Description = str.ToString(); + errorMessages.Add(msg); + } + else + { + GXLogging.Error(logger, "(GXQueue1002)Queue Error", ex); + GXUtil.ErrorToMessages("GXQueue1002", ex, errorMessages); + } + } + } + } + internal class ServiceFactory + { + private static IQueue queue; + private static readonly ILog log = log4net.LogManager.GetLogger(typeof(GeneXus.Services.ServiceFactory)); + + public static GXServices GetGXServices() + { + return GXServices.Instance; + } + + public static IQueue GetQueue() + { + if (queue == null) + { + queue = GetQueueImpl(GXServices.QUEUE_SERVICE); + } + return queue; + } + + public static IQueue GetQueueImpl(string service) + { + IQueue queueImpl = null; + if (GetGXServices() != null) + { + GXService providerService = GetGXServices().Get(service); + if (providerService != null) + { + try + { + string typeFullName = providerService.ClassName; + GXLogging.Debug(log, "Loading Queue settings:", typeFullName); +#if !NETCORE + if (!string.IsNullOrEmpty(typeFullName)) + Type type = Type.GetType(typeFullName, true, true); +#else + Type type = AssemblyLoader.GetType(typeFullName); +#endif + queueImpl = (IQueue)Activator.CreateInstance(type); + } + catch (Exception e) + { + GXLogging.Error(log, "Couldn't connect to the Queue.", e.Message, e); + throw e; + } + } + } + return queueImpl; + } + } +} + + + diff --git a/dotnet/src/dotnetframework/GxClasses/Domain/GxCollections.cs b/dotnet/src/dotnetframework/GxClasses/Domain/GxCollections.cs index cde4a563c..c05addf7d 100644 --- a/dotnet/src/dotnetframework/GxClasses/Domain/GxCollections.cs +++ b/dotnet/src/dotnetframework/GxClasses/Domain/GxCollections.cs @@ -1982,7 +1982,14 @@ public bool FromXmlFile(GxFile file, GXBaseCollection Messa else return false; } - + public T GetPropertyValue(string propertyName) + { + return (T)GetType().GetProperty($"gxTpr_{propertyName}").GetValue(this); + } + public void SetPropertyValue(string propertyName, object propertyValue) + { + GetType().GetProperty($"gxTpr_{propertyName}").SetValue(this, propertyValue); + } } public interface IGxJSONAble diff --git a/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs b/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs index 9f21859ac..5da88678d 100644 --- a/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs +++ b/dotnet/src/dotnetframework/GxClasses/Services/Storage/GXServices.cs @@ -22,6 +22,7 @@ public class GXServices public static string DATA_ACCESS_SERVICE = "DataAccess"; public static string SESSION_SERVICE = "Session"; public static string WEBNOTIFICATIONS_SERVICE = "WebNotifications"; + public static string QUEUE_SERVICE = "QueueService"; private static string[] SERVICES_FILE = new string[] { "CloudServices.dev.config", "CloudServices.config" }; [System.Diagnostics.CodeAnalysis.SuppressMessage("GxFxCopRules", "CR1000:EnforceThreadSafeType")] private Dictionary services = new Dictionary(); diff --git a/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj b/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj index 0008acf29..4cf8ced49 100644 --- a/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj +++ b/dotnet/test/DotNetCoreUnitTest/DotNetCoreUnitTest.csproj @@ -75,6 +75,7 @@ + diff --git a/dotnet/test/DotNetCoreUnitTest/Queue/AzureQueueTest.cs b/dotnet/test/DotNetCoreUnitTest/Queue/AzureQueueTest.cs new file mode 100644 index 000000000..dd72a8261 --- /dev/null +++ b/dotnet/test/DotNetCoreUnitTest/Queue/AzureQueueTest.cs @@ -0,0 +1,13 @@ +using GeneXus.Messaging.Queue; +using UnitTesting; + +namespace DotNetUnitTest +{ + public class AzureQueueTest : QueueTest + { + public AzureQueueTest() : base(AzureQueue.Name, typeof(AzureQueue)) + { + } + + } +} \ No newline at end of file diff --git a/dotnet/test/DotNetCoreUnitTest/Queue/QueueTest.cs b/dotnet/test/DotNetCoreUnitTest/Queue/QueueTest.cs new file mode 100644 index 000000000..04af1e6ab --- /dev/null +++ b/dotnet/test/DotNetCoreUnitTest/Queue/QueueTest.cs @@ -0,0 +1,174 @@ +using System; +using System.Collections.Generic; +using GeneXus.Messaging.Common; +using Xunit; + + +#pragma warning disable CA1031 // Do not catch general exception types +namespace UnitTesting +{ + [Collection("Sequential")] + public abstract class QueueTest + { + + private IQueue queue; + + public QueueTest(string queueName, Type queueType) + { + bool testEnabled = Environment.GetEnvironmentVariable("AZUREQUEUE_TEST_ENABLED") == "true"; + Skip.IfNot(testEnabled, "Environment variables not set"); + + if (queueName == GeneXus.Messaging.Queue.AzureQueue.Name) + { + //testEnabled = true; + //Environment variables needed here + Environment.SetEnvironmentVariable("QUEUE_AZUREQUEUE_QUEUENAME", ""); + Environment.SetEnvironmentVariable("QUEUE_AZUREQUEUE_CONNECTIONSTRING", ""); + + queue = (IQueue)Activator.CreateInstance(queueType); + Assert.NotNull(queue); + } + } + + [SkippableFact] + //Clear the Queue + public void TestClearQueue() + { + bool success = false; + queue.Clear(out success); + Assert.True(success); + + } + + [SkippableFact] + public void TestSimpleSendOneMessageMethod() + { + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + simpleQueueMessage.MessageId = "TestMsgId"; + simpleQueueMessage.MessageBody = "This is the message body"; + + bool success= false; + MessageQueueResult messageQueueResult = queue.SendMessage(simpleQueueMessage, out success); + + Assert.True(success); + + Assert.Equal(simpleQueueMessage.MessageId, messageQueueResult.MessageId); + Assert.Equal(messageQueueResult.MessageStatus,MessageQueueResultStatus.Sent); + + } + [SkippableFact] + public void TestSimpleGetMessagesMethod() + { + bool success = false; + IList simpleQueueMessages = queue.GetMessages(out success); + + Assert.True(success); + Assert.True(simpleQueueMessages.Count == 1); + Assert.Equal("This is the message body", simpleQueueMessages[0].MessageBody); + + } + + [SkippableFact] + public void TestSendMessageOptionsMethod() + { + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + simpleQueueMessage.MessageId = "TestMsgId2"; + simpleQueueMessage.MessageBody = "This is the message body with options"; + + MessageQueueOptions options = new MessageQueueOptions(); + //options.VisibilityTimeout = TimeSpan.FromSeconds(1); + options.TimetoLive = 3600; + + bool success = false; + MessageQueueResult messageQueueResult = queue.SendMessage(simpleQueueMessage, out success); + + Assert.True(success); + Assert.Equal(simpleQueueMessage.MessageId, messageQueueResult.MessageId); + Assert.Equal(MessageQueueResultStatus.Sent,messageQueueResult.MessageStatus); + + } + [SkippableFact] + public void TestGetMessageOptionsMethod() + { + + bool success = false; + MessageQueueOptions options = new MessageQueueOptions(); + options.MaxNumberOfMessages = 10; + options.DeleteConsumedMessages = true; + options.VisibilityTimeout = 1; + IList simpleQueueMessages = queue.GetMessages(options, out success); + + Assert.True(success); + Assert.True(simpleQueueMessages.Count == 2); + + Assert.Equal("TestMsgId1", simpleQueueMessages[0].MessageId); + Assert.Equal("TestMsgId2", simpleQueueMessages[1].MessageId); + + } + + [SkippableFact] + public void TestSendMessagesOptionsMethod() + { + + IList simpleQueueMessages = new List(); + + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + simpleQueueMessage.MessageId = "TestMsgId3"; + simpleQueueMessage.MessageBody = "This is the message body 3"; + simpleQueueMessages.Add(simpleQueueMessage); + + SimpleQueueMessage simpleQueueMessage2 = new SimpleQueueMessage(); + simpleQueueMessage2.MessageId = "TestMsgId4"; + simpleQueueMessage2.MessageBody = "This is the message body 4"; + simpleQueueMessages.Add(simpleQueueMessage2); + + MessageQueueOptions options = new MessageQueueOptions(); + + //Azure VisibilityTimeout. If specified, the request must be made using an x-ms-version of 2011-08-18 or later. If not specified, the default value is 0. Specifies the new visibility timeout value, in seconds, relative to server time. The new value must be larger than or equal to 0, and cannot be larger than 7 days. The visibility timeout of a message cannot be set to a value later than the expiry time. visibilitytimeout should be set to a value smaller than the time-to-live value. + //options.VisibilityTimeout = TimeSpan.FromSeconds(1); + + //TimeToLive. Specifies the time-to-live interval for the message, in seconds + //Azure -1 means not expires + options.TimetoLive = 3600; + + bool success = false; + IList messageQueueResults = queue.SendMessages(simpleQueueMessages,options, out success); + + Assert.True(success); + Assert.True(simpleQueueMessages.Count == 2); + + System.Threading.Thread.Sleep(1000); + Assert.Equal("TestMsgId3", simpleQueueMessages[0].MessageId); + Assert.Equal("TestMsgId4", simpleQueueMessages[1].MessageId); + + } + public void TestDeleteMessagesMethod() + { + bool success = false; + IList simpleQueueMessages = new List(); + + SimpleQueueMessage simpleQueueMessage = new SimpleQueueMessage(); + simpleQueueMessage.MessageId = "TestMsgId3"; + simpleQueueMessage.MessageBody = "This is the message body 3"; + simpleQueueMessages.Add(simpleQueueMessage); + IList messageQueueResults = new List(); + messageQueueResults = queue.DeleteMessages(simpleQueueMessages, out success); + + Assert.True(success); + Assert.True(messageQueueResults.Count == 1); + } + + + [SkippableFact] + public void TestGetQueueLength() + { + bool success = false; + int length = queue.GetQueueLength(out success); + Assert.True(success); + Assert.Equal(0, length); + + } + + } +} +#pragma warning restore CA1031 // Do not catch general exception types \ No newline at end of file