From 131b074f502ba79588e89dc824bb35f2e415129b Mon Sep 17 00:00:00 2001 From: Joe Batt Date: Tue, 26 Jul 2022 10:18:36 +0100 Subject: [PATCH 1/3] Added a step to purge rabbit messages Signed-off-by: Joe Batt --- .../TaskManager.IntegrationTests/Hooks.cs | 20 +++++++++++++------ .../Support/RabbitConnectionFactory.cs | 13 ++++++++++-- .../Support/RabbitConsumer.cs | 5 ++--- .../Support/RabbitPublisher.cs | 5 ++--- .../Hooks.cs | 19 +++++++++++++----- .../Support/RabbitConnectionFactory.cs | 13 ++++++++++-- .../Support/RabbitConsumer.cs | 5 ++--- .../Support/RabbitPublisher.cs | 5 ++--- 8 files changed, 58 insertions(+), 27 deletions(-) diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs index 022a071e3..d0d962303 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs @@ -14,8 +14,6 @@ * limitations under the License. */ -using System.Diagnostics; -using BoDi; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Monai.Deploy.WorkflowManager.IntegrationTests.Support; @@ -107,10 +105,20 @@ await RetryPolicy.ExecuteAsync(async () => } }); - TaskDispatchPublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue); - TaskCallbackPublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskCallbackQueue); - TaskUpdateConsumer = new RabbitConsumer(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue); - ClinicalReviewConsumer = new RabbitConsumer(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.ClinicalReviewQueue); + TaskDispatchPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue); + TaskCallbackPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskCallbackQueue); + TaskUpdateConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue); + ClinicalReviewConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.ClinicalReviewQueue); + } + + [AfterScenario] + [BeforeScenario] + public void PurgeRabbitMessages() + { + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskDispatchQueue); + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskUpdateQueue); + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskCallbackQueue); + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.ClinicalReviewQueue); } /// diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConnectionFactory.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConnectionFactory.cs index 644bc92a8..fe6b0aafe 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConnectionFactory.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConnectionFactory.cs @@ -20,7 +20,9 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests { public static class RabbitConnectionFactory { - public static ConnectionFactory GetConnectionFactory() + private static IModel? Channel { get; set; } + + public static IModel GetRabbitConnection() { var connectionFactory = new ConnectionFactory { @@ -30,7 +32,14 @@ public static ConnectionFactory GetConnectionFactory() VirtualHost = TestExecutionConfig.RabbitConfig.VirtualHost }; - return connectionFactory; + Channel = connectionFactory.CreateConnection().CreateModel(); + + return Channel; + } + + public static void PurgeQueue(string queueName) + { + Channel?.QueuePurge(queueName); } } } diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs index 1e8a1452f..e918ee50c 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs @@ -22,12 +22,11 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests { public class RabbitConsumer { - public RabbitConsumer(ConnectionFactory connectionFactory, string exchange, string routingKey) + public RabbitConsumer(IModel channel, string exchange, string routingKey) { Exchange = exchange; RoutingKey = routingKey; - var connection = connectionFactory.CreateConnection(); - Channel = connection.CreateModel(); + Channel = channel; Queue = Channel.QueueDeclare(queue: string.Empty, durable: true, exclusive: false, autoDelete: false); Channel.QueueBind(Queue.QueueName, Exchange, RoutingKey); Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true); diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitPublisher.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitPublisher.cs index 8ebdf2741..4821d1235 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitPublisher.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitPublisher.cs @@ -21,12 +21,11 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests { public class RabbitPublisher { - public RabbitPublisher(ConnectionFactory connectionFactory, string exchange, string routingKey) + public RabbitPublisher(IModel channel, string exchange, string routingKey) { Exchange = exchange; RoutingKey = routingKey; - var connection = connectionFactory.CreateConnection(); - Channel = connection.CreateModel(); + Channel = channel; } private string Exchange { get; set; } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs index 4e5bb1509..bfd7d5f08 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs @@ -41,7 +41,7 @@ public Hooks(IObjectContainer objectContainer) } private static HttpClient? HttpClient { get; set; } - public static AsyncRetryPolicy RetryPolicy { get; private set; } + public static AsyncRetryPolicy? RetryPolicy { get; private set; } private static RabbitPublisher? WorkflowPublisher { get; set; } private static RabbitConsumer? TaskDispatchConsumer { get; set; } private static RabbitPublisher? TaskUpdatePublisher { get; set; } @@ -117,9 +117,9 @@ await RetryPolicy.ExecuteAsync(async () => } }); - WorkflowPublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.WorkflowRequestQueue); - TaskDispatchConsumer = new RabbitConsumer(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue); - TaskUpdatePublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue); + WorkflowPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.WorkflowRequestQueue); + TaskDispatchConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue); + TaskUpdatePublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue); } /// @@ -183,11 +183,20 @@ public void DeleteTestData() foreach (var workflowRevision in dataHelper.WorkflowRevisions) { - MongoClient.DeleteWorkflowRevisionDocumentByWorkflowId(workflowRevision.WorkflowId); + MongoClient?.DeleteWorkflowRevisionDocumentByWorkflowId(workflowRevision.WorkflowId); } } } + [AfterScenario] + [BeforeScenario] + public void PurgeRabbitMessages() + { + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskDispatchQueue); + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskUpdateQueue); + RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.WorkflowRequestQueue); + } + /// /// Runs after all tests to closes Rabbit connections. /// diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs index 98212d040..cdb69944f 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs @@ -21,7 +21,9 @@ namespace Monai.Deploy.WorkflowManager.IntegrationTests.Support { public static class RabbitConnectionFactory { - public static ConnectionFactory GetConnectionFactory() + private static IModel? Channel { get; set; } + + public static IModel GetRabbitConnection() { var connectionFactory = new ConnectionFactory { @@ -31,7 +33,14 @@ public static ConnectionFactory GetConnectionFactory() VirtualHost = TestExecutionConfig.RabbitConfig.VirtualHost }; - return connectionFactory; + Channel = connectionFactory.CreateConnection().CreateModel(); + + return Channel; + } + + public static void PurgeQueue(string queueName) + { + Channel?.QueuePurge(queueName); } } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs index 71dda2018..fd04a3fcf 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs @@ -22,12 +22,11 @@ namespace Monai.Deploy.WorkflowManager.IntegrationTests.Support { public class RabbitConsumer { - public RabbitConsumer(ConnectionFactory connectionFactory, string exchange, string routingKey) + public RabbitConsumer(IModel channel, string exchange, string routingKey) { Exchange = exchange; RoutingKey = routingKey; - var connection = connectionFactory.CreateConnection(); - Channel = connection.CreateModel(); + Channel = channel; Queue = Channel.QueueDeclare(queue: string.Empty, durable: true, exclusive: false, autoDelete: false); Channel.QueueBind(Queue.QueueName, Exchange, RoutingKey); Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true); diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitPublisher.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitPublisher.cs index bb3d497c1..d64dbe13f 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitPublisher.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitPublisher.cs @@ -21,12 +21,11 @@ namespace Monai.Deploy.WorkflowManager.IntegrationTests.Support { public class RabbitPublisher { - public RabbitPublisher(ConnectionFactory connectionFactory, string exchange, string routingKey) + public RabbitPublisher(IModel channel, string exchange, string routingKey) { Exchange = exchange; RoutingKey = routingKey; - var connection = connectionFactory.CreateConnection(); - Channel = connection.CreateModel(); + Channel = channel; Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true); } From 4f74ef05ad159b6b353365235d5202d2689ad9cf Mon Sep 17 00:00:00 2001 From: Joe Batt Date: Tue, 26 Jul 2022 10:36:43 +0100 Subject: [PATCH 2/3] Updated hooks Signed-off-by: Joe Batt --- tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs | 1 - .../TaskManager.IntegrationTests/Support/RabbitConsumer.cs | 2 +- .../IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs | 1 - .../WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs | 2 +- 4 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs index d0d962303..614662fb9 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs @@ -112,7 +112,6 @@ await RetryPolicy.ExecuteAsync(async () => } [AfterScenario] - [BeforeScenario] public void PurgeRabbitMessages() { RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskDispatchQueue); diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs index e918ee50c..d4b909041 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/RabbitConsumer.cs @@ -27,7 +27,7 @@ public RabbitConsumer(IModel channel, string exchange, string routingKey) Exchange = exchange; RoutingKey = routingKey; Channel = channel; - Queue = Channel.QueueDeclare(queue: string.Empty, durable: true, exclusive: false, autoDelete: false); + Queue = Channel.QueueDeclare(queue: routingKey, durable: true, exclusive: false, autoDelete: false); Channel.QueueBind(Queue.QueueName, Exchange, RoutingKey); Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true); } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs index bfd7d5f08..94ecc90dc 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs @@ -189,7 +189,6 @@ public void DeleteTestData() } [AfterScenario] - [BeforeScenario] public void PurgeRabbitMessages() { RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskDispatchQueue); diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs index fd04a3fcf..cf48036b4 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConsumer.cs @@ -27,7 +27,7 @@ public RabbitConsumer(IModel channel, string exchange, string routingKey) Exchange = exchange; RoutingKey = routingKey; Channel = channel; - Queue = Channel.QueueDeclare(queue: string.Empty, durable: true, exclusive: false, autoDelete: false); + Queue = Channel.QueueDeclare(queue: routingKey, durable: true, exclusive: false, autoDelete: false); Channel.QueueBind(Queue.QueueName, Exchange, RoutingKey); Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true); } From 0779db7826d9da3dc076633ae80457df29ce74e7 Mon Sep 17 00:00:00 2001 From: Joe Batt Date: Tue, 26 Jul 2022 15:13:10 +0100 Subject: [PATCH 3/3] Updated Test Data Signed-off-by: Joe Batt --- .../Features/TaskUpdate.feature | 2 +- .../TestData/TaskDispatchTestData.cs | 48 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature b/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature index 7c8607781..1b1ca5e2c 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature @@ -19,7 +19,7 @@ Integration tests for testing TaskUpdateEvents from TaskManager @TaskDispatch_TaskUpdate Scenario: TaskUpdateEvent is published with status Accepted after receiving a valid TaskDispatchEvent Given I have a bucket in MinIO bucket1 - When A Task Dispatch event is published Task_Dispatch_Clinical_Review_Full_Patient_Details + When A Task Dispatch event is published Task_Dispatch_Accepted Then A Task Update event with status Accepted is published with Task Dispatch details @TaskDispatch_TaskUpdate diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/TestData/TaskDispatchTestData.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/TestData/TaskDispatchTestData.cs index de9e2e23b..b3e29e7e6 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/TestData/TaskDispatchTestData.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/TestData/TaskDispatchTestData.cs @@ -599,6 +599,54 @@ public static class TaskDispatchesTestData } } }, + new TaskDispatchTestData + { + Name = "Task_Dispatch_Accepted", + TaskDispatchEvent = new TaskDispatchEvent() + { + PayloadId = Guid.NewGuid().ToString(), + CorrelationId = Guid.NewGuid().ToString(), + ExecutionId = Guid.NewGuid().ToString(), + WorkflowInstanceId = Guid.NewGuid().ToString(), + TaskId = Guid.NewGuid().ToString(), + Status = TaskExecutionStatus.Dispatched, + TaskPluginType = "aide_clinical_review", + Inputs = new List() + { + new Messaging.Common.Storage + { + Name = "input", + Endpoint = "//test", + Credentials = new Messaging.Common.Credentials() + { + AccessKey = "test", + AccessToken = "test", + }, + Bucket = "bucket1", + RelativeRootPath = "//dcm" + } + }, + IntermediateStorage = new Messaging.Common.Storage + { + Name = "input", + Endpoint = "//test", + Credentials = new Messaging.Common.Credentials() + { + AccessKey = "test1", + AccessToken = "test", + }, + Bucket = "bucket1", + RelativeRootPath = "//dcm" + }, + TaskPluginArguments = new Dictionary() + { + { "workflow_name", "Workflow_1" }, + { "reviewed_task_details", "Reviewed_Task" }, + { "patient_id", "100001" }, + { "queue_name", "aide.clinical_review.request" }, + } + } + }, }; } }