Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Integration tests for testing TaskUpdateEvents from TaskManager
@TaskDispatch_TaskUpdate
Scenario: TaskUpdateEvent is published with status Accepted after receiving a valid TaskDispatchEvent
Given I have a bucket in MinIO bucket1
When A Task Dispatch event is published Task_Dispatch_Clinical_Review_Full_Patient_Details
When A Task Dispatch event is published Task_Dispatch_Accepted
Then A Task Update event with status Accepted is published with Task Dispatch details

@TaskDispatch_TaskUpdate
Expand Down
19 changes: 13 additions & 6 deletions tests/IntegrationTests/TaskManager.IntegrationTests/Hooks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
* limitations under the License.
*/

using System.Diagnostics;
using BoDi;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Monai.Deploy.WorkflowManager.IntegrationTests.Support;
Expand Down Expand Up @@ -107,10 +105,19 @@ await RetryPolicy.ExecuteAsync(async () =>
}
});

TaskDispatchPublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue);
TaskCallbackPublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskCallbackQueue);
TaskUpdateConsumer = new RabbitConsumer(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue);
ClinicalReviewConsumer = new RabbitConsumer(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.ClinicalReviewQueue);
TaskDispatchPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue);
TaskCallbackPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskCallbackQueue);
TaskUpdateConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue);
ClinicalReviewConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.ClinicalReviewQueue);
}

[AfterScenario]
public void PurgeRabbitMessages()
{
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskDispatchQueue);
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskUpdateQueue);
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskCallbackQueue);
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.ClinicalReviewQueue);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests
{
public static class RabbitConnectionFactory
{
public static ConnectionFactory GetConnectionFactory()
private static IModel? Channel { get; set; }

public static IModel GetRabbitConnection()
{
var connectionFactory = new ConnectionFactory
{
Expand All @@ -30,7 +32,14 @@ public static ConnectionFactory GetConnectionFactory()
VirtualHost = TestExecutionConfig.RabbitConfig.VirtualHost
};

return connectionFactory;
Channel = connectionFactory.CreateConnection().CreateModel();

return Channel;
}

public static void PurgeQueue(string queueName)
{
Channel?.QueuePurge(queueName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests
{
public class RabbitConsumer
{
public RabbitConsumer(ConnectionFactory connectionFactory, string exchange, string routingKey)
public RabbitConsumer(IModel channel, string exchange, string routingKey)
{
Exchange = exchange;
RoutingKey = routingKey;
var connection = connectionFactory.CreateConnection();
Channel = connection.CreateModel();
Queue = Channel.QueueDeclare(queue: string.Empty, durable: true, exclusive: false, autoDelete: false);
Channel = channel;
Queue = Channel.QueueDeclare(queue: routingKey, durable: true, exclusive: false, autoDelete: false);
Channel.QueueBind(Queue.QueueName, Exchange, RoutingKey);
Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests
{
public class RabbitPublisher
{
public RabbitPublisher(ConnectionFactory connectionFactory, string exchange, string routingKey)
public RabbitPublisher(IModel channel, string exchange, string routingKey)
{
Exchange = exchange;
RoutingKey = routingKey;
var connection = connectionFactory.CreateConnection();
Channel = connection.CreateModel();
Channel = channel;
}

private string Exchange { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,54 @@ public static class TaskDispatchesTestData
}
}
},
new TaskDispatchTestData
{
Name = "Task_Dispatch_Accepted",
TaskDispatchEvent = new TaskDispatchEvent()
{
PayloadId = Guid.NewGuid().ToString(),
CorrelationId = Guid.NewGuid().ToString(),
ExecutionId = Guid.NewGuid().ToString(),
WorkflowInstanceId = Guid.NewGuid().ToString(),
TaskId = Guid.NewGuid().ToString(),
Status = TaskExecutionStatus.Dispatched,
TaskPluginType = "aide_clinical_review",
Inputs = new List<Messaging.Common.Storage>()
{
new Messaging.Common.Storage
{
Name = "input",
Endpoint = "//test",
Credentials = new Messaging.Common.Credentials()
{
AccessKey = "test",
AccessToken = "test",
},
Bucket = "bucket1",
RelativeRootPath = "//dcm"
}
},
IntermediateStorage = new Messaging.Common.Storage
{
Name = "input",
Endpoint = "//test",
Credentials = new Messaging.Common.Credentials()
{
AccessKey = "test1",
AccessToken = "test",
},
Bucket = "bucket1",
RelativeRootPath = "//dcm"
},
TaskPluginArguments = new Dictionary<string, string>()
{
{ "workflow_name", "Workflow_1" },
{ "reviewed_task_details", "Reviewed_Task" },
{ "patient_id", "100001" },
{ "queue_name", "aide.clinical_review.request" },
}
}
},
};
}
}
18 changes: 13 additions & 5 deletions tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Hooks(IObjectContainer objectContainer)
}

private static HttpClient? HttpClient { get; set; }
public static AsyncRetryPolicy RetryPolicy { get; private set; }
public static AsyncRetryPolicy? RetryPolicy { get; private set; }
private static RabbitPublisher? WorkflowPublisher { get; set; }
private static RabbitConsumer? TaskDispatchConsumer { get; set; }
private static RabbitPublisher? TaskUpdatePublisher { get; set; }
Expand Down Expand Up @@ -117,9 +117,9 @@ await RetryPolicy.ExecuteAsync(async () =>
}
});

WorkflowPublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.WorkflowRequestQueue);
TaskDispatchConsumer = new RabbitConsumer(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue);
TaskUpdatePublisher = new RabbitPublisher(RabbitConnectionFactory.GetConnectionFactory(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue);
WorkflowPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.WorkflowRequestQueue);
TaskDispatchConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue);
TaskUpdatePublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue);
}

/// <summary>
Expand Down Expand Up @@ -183,11 +183,19 @@ public void DeleteTestData()

foreach (var workflowRevision in dataHelper.WorkflowRevisions)
{
MongoClient.DeleteWorkflowRevisionDocumentByWorkflowId(workflowRevision.WorkflowId);
MongoClient?.DeleteWorkflowRevisionDocumentByWorkflowId(workflowRevision.WorkflowId);
}
}
}

[AfterScenario]
public void PurgeRabbitMessages()
{
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskDispatchQueue);
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.TaskUpdateQueue);
RabbitConnectionFactory.PurgeQueue(TestExecutionConfig.RabbitConfig.WorkflowRequestQueue);
}

/// <summary>
/// Runs after all tests to closes Rabbit connections.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ namespace Monai.Deploy.WorkflowManager.IntegrationTests.Support
{
public static class RabbitConnectionFactory
{
public static ConnectionFactory GetConnectionFactory()
private static IModel? Channel { get; set; }

public static IModel GetRabbitConnection()
{
var connectionFactory = new ConnectionFactory
{
Expand All @@ -31,7 +33,14 @@ public static ConnectionFactory GetConnectionFactory()
VirtualHost = TestExecutionConfig.RabbitConfig.VirtualHost
};

return connectionFactory;
Channel = connectionFactory.CreateConnection().CreateModel();

return Channel;
}

public static void PurgeQueue(string queueName)
{
Channel?.QueuePurge(queueName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ namespace Monai.Deploy.WorkflowManager.IntegrationTests.Support
{
public class RabbitConsumer
{
public RabbitConsumer(ConnectionFactory connectionFactory, string exchange, string routingKey)
public RabbitConsumer(IModel channel, string exchange, string routingKey)
{
Exchange = exchange;
RoutingKey = routingKey;
var connection = connectionFactory.CreateConnection();
Channel = connection.CreateModel();
Queue = Channel.QueueDeclare(queue: string.Empty, durable: true, exclusive: false, autoDelete: false);
Channel = channel;
Queue = Channel.QueueDeclare(queue: routingKey, durable: true, exclusive: false, autoDelete: false);
Channel.QueueBind(Queue.QueueName, Exchange, RoutingKey);
Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ namespace Monai.Deploy.WorkflowManager.IntegrationTests.Support
{
public class RabbitPublisher
{
public RabbitPublisher(ConnectionFactory connectionFactory, string exchange, string routingKey)
public RabbitPublisher(IModel channel, string exchange, string routingKey)
{
Exchange = exchange;
RoutingKey = routingKey;
var connection = connectionFactory.CreateConnection();
Channel = connection.CreateModel();
Channel = channel;
Channel.ExchangeDeclare(Exchange, ExchangeType.Topic, durable: true);
}

Expand Down