Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/Shared/Configuration/MessageBrokerConfigurationKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ public class MessageBrokerConfigurationKeys

/// <summary>
/// Gets or sets the topic for publishing task update events.
/// Defaults to `md.tasks.update`.
/// Defaults to `md.tasks.cancellation`.
/// </summary>
[ConfigurationKeyName("taskCancellation")]
public string TaskCancellationRequest { get; set; } = "md.tasks.cancellation";

/// <summary>
/// Gets or sets the topic for publishing clinical review request events.
/// Defaults to `md.tasks.update`.
/// Defaults to `aide.clinical_review.request`.
/// </summary>
[ConfigurationKeyName("aideClinicalReviewRequest")]
public string AideClinicalReviewRequest { get; set; } = "aide.clinical_review.request";

/// <summary>
/// Gets or sets the topic for publishing clinical review cancelation events.
/// Defaults to `aide.clinical_review.cancellation`.
/// </summary>
[ConfigurationKeyName("aideClinicalReviewCancelation")]
public string AideClinicalReviewCancelation { get; set; } = "aide.clinical_review.cancellation";
}
}
5 changes: 5 additions & 0 deletions src/Shared/Configuration/WorkflowManagerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

namespace Monai.Deploy.WorkflowManager.Configuration
Expand Down Expand Up @@ -47,6 +48,10 @@ public class WorkflowManagerOptions : PagedOptions
[ConfigurationKeyName("taskTimeoutMinutes")]
public double TaskTimeoutMinutes { get; set; } = 60;

[ConfigurationKeyName("perTaskTypeTimeoutMinutes")]
public Dictionary<string, double> PerTaskTypeTimeoutMinutes { get; set; }


public TimeSpan TaskTimeout { get => TimeSpan.FromMinutes(TaskTimeoutMinutes); }

/// <summary>
Expand Down
3 changes: 2 additions & 1 deletion src/TaskManager/TaskManager/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
"exportComplete": "md.export.complete",
"exportRequestPrefix": "md.export.request",
"taskCallback": "md.tasks.callback",
"aideClinicalReviewRequest": "aide.clinical_review.request"
"aideClinicalReviewRequest": "aide.clinical_review.request",
"aideClinicalReviewCancelation": "aide.clinical_review.cancellation"
},
"dicomAgents": {
"dicomWebAgentName": "monaidicomweb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System.Globalization;
using System.Linq;
using System.Runtime.CompilerServices;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand Down Expand Up @@ -52,9 +53,11 @@ public class WorkflowExecuterService : IWorkflowExecuterService
private readonly IPayloadService _payloadService;
private readonly StorageServiceConfiguration _storageConfiguration;
private readonly double _defaultTaskTimeoutMinutes;
private readonly Dictionary<string, double> _defaultPerTaskTypeTimeoutMinutes = new Dictionary<string, double>();

private string TaskDispatchRoutingKey { get; }
private string ExportRequestRoutingKey { get; }
private string TaskTimeoutRoutingKey { get; }

public WorkflowExecuterService(
ILogger<WorkflowExecuterService> logger,
Expand All @@ -81,7 +84,9 @@ public WorkflowExecuterService(

_storageConfiguration = storageConfiguration.Value;
_defaultTaskTimeoutMinutes = configuration.Value.TaskTimeoutMinutes;
_defaultPerTaskTypeTimeoutMinutes = configuration.Value.PerTaskTypeTimeoutMinutes;
TaskDispatchRoutingKey = configuration.Value.Messaging.Topics.TaskDispatchRequest;
TaskTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation;
ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}";

_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand Down Expand Up @@ -275,6 +280,7 @@ public async Task<bool> ProcessTaskUpdate(TaskUpdateEvent message)
if (message.Reason == FailureReason.TimedOut && currentTask.Status == TaskExecutionStatus.Failed)
{
_logger.TaskTimedOut(message.TaskId, message.WorkflowInstanceId, currentTask.Timeout);
await TimeOutEvent(workflowInstance, currentTask, message.CorrelationId);

return false;
}
Expand Down Expand Up @@ -750,6 +756,15 @@ private async Task<bool> ExportRequest(WorkflowInstance workflowInstance, TaskEx
return true;
}

private async Task<bool> TimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId)
{
var exportRequestEvent = EventMapper.GenerateTaskCancellationEvent("", taskExec.ExecutionId, workflowInstance.Id, taskExec.TaskId, FailureReason.TimedOut, "Timed out");
var jsonMesssage = new JsonMessage<TaskCancellationEvent>(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, correlationId, Guid.NewGuid().ToString());

await _messageBrokerPublisherService.Publish(TaskTimeoutRoutingKey, jsonMesssage.ToMessage());
return true;
}

private async Task<WorkflowInstance> CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow)
{
Guard.Against.Null(message, nameof(message));
Expand Down Expand Up @@ -822,6 +837,10 @@ public async Task<TaskExecution> CreateTaskExecutionAsync(TaskObject task,
{
task.TimeoutMinutes = _defaultTaskTimeoutMinutes;
}
if (_defaultPerTaskTypeTimeoutMinutes is not null && _defaultPerTaskTypeTimeoutMinutes.ContainsKey(task.Type))
{
task.TimeoutMinutes = _defaultPerTaskTypeTimeoutMinutes[task.Type];
}

var inputArtifacts = new Dictionary<string, string>();
var artifactFound = true;
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/WorkflowManager/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
"TaskManager": {
"concurrency": 1,
"taskTimeoutMinutes": 60,
"perTaskTypeTimeoutMinutes": {
"aide_clinical_review": 5760
},
"plug-ins": {
"argo": "Monai.Deploy.WorkflowManager.TaskManager.Argo.ArgoPlugin, Monai.Deploy.WorkflowManager.TaskManager.Argo",
"aide_clinical_review": "Monai.Deploy.WorkflowManager.TaskManager.AideClinicalReview.AideClinicalReviewPlugin, Monai.Deploy.WorkflowManager.TaskManager.AideClinicalReview",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public class WorkflowExecuterServiceTests
private readonly Mock<IWorkflowService> _workflowService;
private readonly IOptions<WorkflowManagerOptions> _configuration;
private readonly IOptions<StorageServiceConfiguration> _storageConfiguration;
private readonly int _timeoutForTypeTask = 999;
private readonly int _timeoutForDefault = 966;

public WorkflowExecuterServiceTests()
{
Expand All @@ -71,7 +73,7 @@ public WorkflowExecuterServiceTests()
_payloadService = new Mock<IPayloadService>();
_workflowService = new Mock<IWorkflowService>();

_configuration = Options.Create(new WorkflowManagerOptions() { Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } } });
_configuration = Options.Create(new WorkflowManagerOptions() { TaskTimeoutMinutes = _timeoutForDefault, PerTaskTypeTimeoutMinutes = new Dictionary<string, double> { { "taskType", _timeoutForTypeTask } }, Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } } });
_storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary<string, string> { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } });

var dicom = new Mock<IDicomService>();
Expand Down Expand Up @@ -1906,6 +1908,65 @@ public async Task ProcessTaskUpdate_ValidTaskUpdateEventWorkflowDoesNotExist_Ret
response.Should().BeTrue();
}

[Fact]
public async Task ProcessTaskUpdate_Timout_Sends_Message_To_TaskTimeoutRoutingKey()
{
var workflowInstanceId = Guid.NewGuid().ToString();

var metadata = new Dictionary<string, object>();
metadata.Add("a", "b");
metadata.Add("c", "d");

var updateEvent = new TaskUpdateEvent
{
WorkflowInstanceId = workflowInstanceId,
TaskId = "pizza",
ExecutionId = Guid.NewGuid().ToString(),
Status = TaskExecutionStatus.Failed,
Reason = FailureReason.TimedOut,
Message = "This is a message",
Metadata = metadata,
CorrelationId = Guid.NewGuid().ToString()
};

var workflowId = Guid.NewGuid().ToString();

var workflow = new WorkflowRevision
{
Id = Guid.NewGuid().ToString(),
WorkflowId = workflowId,
Revision = 1,
Workflow = new Workflow
{
Name = "Workflowname2",
Description = "Workflowdesc2",
Version = "1",
}
};

var workflowInstance = new WorkflowInstance
{
Id = workflowInstanceId,
WorkflowId = workflowId,
WorkflowName = workflow.Workflow.Name,
PayloadId = Guid.NewGuid().ToString(),
Status = Status.Created,
BucketId = "bucket",
Tasks = new List<TaskExecution>
{
new TaskExecution
{
TaskId = "pizza",
Status = TaskExecutionStatus.Failed
}
}
};

_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance);
var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent);
_messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.AideClinicalReviewCancelation, It.IsAny<Message>()), Times.Exactly(1));
}

[Fact]
public async Task ProcessExportComplete_ValidExportCompleteEventMultipleTaskDestinationsDispatched_ReturnsTrue()
{
Expand Down Expand Up @@ -2259,5 +2320,57 @@ public void AttachPatientMetaData_AtachesDataToTaskExec_TaskExecShouldHavePatien
taskExec.TaskPluginArguments[PatientKeys.PatientHospitalId].Should().BeSameAs(patientDetails.PatientHospitalId);
taskExec.TaskPluginArguments[PatientKeys.PatientName].Should().BeSameAs(patientDetails.PatientName);
}

[Fact]
public async Task TaskExecShouldHaveCorrectTimeout()
{
var workflowId = Guid.NewGuid().ToString();
var payloadId = Guid.NewGuid().ToString();
var workflowInstanceId = Guid.NewGuid().ToString();

var pizzaTask = new TaskObject
{
Id = "pizza",
Type = "taskType",
Description = "taskdesc",
};

var workflowInstance = new WorkflowInstance
{
Id = workflowInstanceId,
WorkflowId = workflowId,
};
var bucket = "bucket";

var newPizza = await WorkflowExecuterService.CreateTaskExecutionAsync(pizzaTask, workflowInstance, bucket, payloadId);
Assert.Equal(_timeoutForTypeTask, newPizza.TimeoutInterval);

}

[Fact]
public async Task TaskExecShouldPickUpConfiguredDefaultTimeout()
{
var workflowId = Guid.NewGuid().ToString();
var payloadId = Guid.NewGuid().ToString();
var workflowInstanceId = Guid.NewGuid().ToString();

var pizzaTask = new TaskObject
{
Id = "pizza",
Type = "someothertype",
Description = "taskdesc",
};

var workflowInstance = new WorkflowInstance
{
Id = workflowInstanceId,
WorkflowId = workflowId,
};
var bucket = "bucket";

var newPizza = await WorkflowExecuterService.CreateTaskExecutionAsync(pizzaTask, workflowInstance, bucket, payloadId);
Assert.Equal(_timeoutForDefault, newPizza.TimeoutInterval);

}
}
}