diff --git a/src/Shared/Configuration/MessageBrokerConfigurationKeys.cs b/src/Shared/Configuration/MessageBrokerConfigurationKeys.cs index 18a221cd1..088f840eb 100644 --- a/src/Shared/Configuration/MessageBrokerConfigurationKeys.cs +++ b/src/Shared/Configuration/MessageBrokerConfigurationKeys.cs @@ -64,16 +64,23 @@ public class MessageBrokerConfigurationKeys /// /// Gets or sets the topic for publishing task update events. - /// Defaults to `md.tasks.update`. + /// Defaults to `md.tasks.cancellation`. /// [ConfigurationKeyName("taskCancellation")] public string TaskCancellationRequest { get; set; } = "md.tasks.cancellation"; /// /// Gets or sets the topic for publishing clinical review request events. - /// Defaults to `md.tasks.update`. + /// Defaults to `aide.clinical_review.request`. /// [ConfigurationKeyName("aideClinicalReviewRequest")] public string AideClinicalReviewRequest { get; set; } = "aide.clinical_review.request"; + + /// + /// Gets or sets the topic for publishing clinical review cancelation events. + /// Defaults to `aide.clinical_review.cancellation`. + /// + [ConfigurationKeyName("aideClinicalReviewCancelation")] + public string AideClinicalReviewCancelation { get; set; } = "aide.clinical_review.cancellation"; } } diff --git a/src/Shared/Configuration/WorkflowManagerOptions.cs b/src/Shared/Configuration/WorkflowManagerOptions.cs index 0359ae4c4..67fc18162 100755 --- a/src/Shared/Configuration/WorkflowManagerOptions.cs +++ b/src/Shared/Configuration/WorkflowManagerOptions.cs @@ -15,6 +15,7 @@ */ using System; +using System.Collections.Generic; using Microsoft.Extensions.Configuration; namespace Monai.Deploy.WorkflowManager.Configuration @@ -47,6 +48,10 @@ public class WorkflowManagerOptions : PagedOptions [ConfigurationKeyName("taskTimeoutMinutes")] public double TaskTimeoutMinutes { get; set; } = 60; + [ConfigurationKeyName("perTaskTypeTimeoutMinutes")] + public Dictionary PerTaskTypeTimeoutMinutes { get; set; } + + public TimeSpan TaskTimeout { get => TimeSpan.FromMinutes(TaskTimeoutMinutes); } /// diff --git a/src/TaskManager/TaskManager/appsettings.json b/src/TaskManager/TaskManager/appsettings.json index 60074c561..cd0e7c6ac 100755 --- a/src/TaskManager/TaskManager/appsettings.json +++ b/src/TaskManager/TaskManager/appsettings.json @@ -71,7 +71,8 @@ "exportComplete": "md.export.complete", "exportRequestPrefix": "md.export.request", "taskCallback": "md.tasks.callback", - "aideClinicalReviewRequest": "aide.clinical_review.request" + "aideClinicalReviewRequest": "aide.clinical_review.request", + "aideClinicalReviewCancelation": "aide.clinical_review.cancellation" }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 835a1555d..21d2892ed 100755 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -16,6 +16,7 @@ using System.Globalization; using System.Linq; +using System.Runtime.CompilerServices; using Ardalis.GuardClauses; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -52,9 +53,11 @@ public class WorkflowExecuterService : IWorkflowExecuterService private readonly IPayloadService _payloadService; private readonly StorageServiceConfiguration _storageConfiguration; private readonly double _defaultTaskTimeoutMinutes; + private readonly Dictionary _defaultPerTaskTypeTimeoutMinutes = new Dictionary(); private string TaskDispatchRoutingKey { get; } private string ExportRequestRoutingKey { get; } + private string TaskTimeoutRoutingKey { get; } public WorkflowExecuterService( ILogger logger, @@ -81,7 +84,9 @@ public WorkflowExecuterService( _storageConfiguration = storageConfiguration.Value; _defaultTaskTimeoutMinutes = configuration.Value.TaskTimeoutMinutes; + _defaultPerTaskTypeTimeoutMinutes = configuration.Value.PerTaskTypeTimeoutMinutes; TaskDispatchRoutingKey = configuration.Value.Messaging.Topics.TaskDispatchRequest; + TaskTimeoutRoutingKey = configuration.Value.Messaging.Topics.AideClinicalReviewCancelation; ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}"; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -275,6 +280,7 @@ public async Task ProcessTaskUpdate(TaskUpdateEvent message) if (message.Reason == FailureReason.TimedOut && currentTask.Status == TaskExecutionStatus.Failed) { _logger.TaskTimedOut(message.TaskId, message.WorkflowInstanceId, currentTask.Timeout); + await TimeOutEvent(workflowInstance, currentTask, message.CorrelationId); return false; } @@ -750,6 +756,15 @@ private async Task ExportRequest(WorkflowInstance workflowInstance, TaskEx return true; } + private async Task TimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId) + { + var exportRequestEvent = EventMapper.GenerateTaskCancellationEvent("", taskExec.ExecutionId, workflowInstance.Id, taskExec.TaskId, FailureReason.TimedOut, "Timed out"); + var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, correlationId, Guid.NewGuid().ToString()); + + await _messageBrokerPublisherService.Publish(TaskTimeoutRoutingKey, jsonMesssage.ToMessage()); + return true; + } + private async Task CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow) { Guard.Against.Null(message, nameof(message)); @@ -822,6 +837,10 @@ public async Task CreateTaskExecutionAsync(TaskObject task, { task.TimeoutMinutes = _defaultTaskTimeoutMinutes; } + if (_defaultPerTaskTypeTimeoutMinutes is not null && _defaultPerTaskTypeTimeoutMinutes.ContainsKey(task.Type)) + { + task.TimeoutMinutes = _defaultPerTaskTypeTimeoutMinutes[task.Type]; + } var inputArtifacts = new Dictionary(); var artifactFound = true; diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index f796d4c19..7fd29b5cb 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -35,6 +35,9 @@ "TaskManager": { "concurrency": 1, "taskTimeoutMinutes": 60, + "perTaskTypeTimeoutMinutes": { + "aide_clinical_review": 5760 + }, "plug-ins": { "argo": "Monai.Deploy.WorkflowManager.TaskManager.Argo.ArgoPlugin, Monai.Deploy.WorkflowManager.TaskManager.Argo", "aide_clinical_review": "Monai.Deploy.WorkflowManager.TaskManager.AideClinicalReview.AideClinicalReviewPlugin, Monai.Deploy.WorkflowManager.TaskManager.AideClinicalReview", diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 90e000de3..2bf5e4126 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -58,6 +58,8 @@ public class WorkflowExecuterServiceTests private readonly Mock _workflowService; private readonly IOptions _configuration; private readonly IOptions _storageConfiguration; + private readonly int _timeoutForTypeTask = 999; + private readonly int _timeoutForDefault = 966; public WorkflowExecuterServiceTests() { @@ -71,7 +73,7 @@ public WorkflowExecuterServiceTests() _payloadService = new Mock(); _workflowService = new Mock(); - _configuration = Options.Create(new WorkflowManagerOptions() { Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } } }); + _configuration = Options.Create(new WorkflowManagerOptions() { TaskTimeoutMinutes = _timeoutForDefault, PerTaskTypeTimeoutMinutes = new Dictionary { { "taskType", _timeoutForTypeTask } }, Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } } }); _storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } }); var dicom = new Mock(); @@ -1906,6 +1908,65 @@ public async Task ProcessTaskUpdate_ValidTaskUpdateEventWorkflowDoesNotExist_Ret response.Should().BeTrue(); } + [Fact] + public async Task ProcessTaskUpdate_Timout_Sends_Message_To_TaskTimeoutRoutingKey() + { + var workflowInstanceId = Guid.NewGuid().ToString(); + + var metadata = new Dictionary(); + metadata.Add("a", "b"); + metadata.Add("c", "d"); + + var updateEvent = new TaskUpdateEvent + { + WorkflowInstanceId = workflowInstanceId, + TaskId = "pizza", + ExecutionId = Guid.NewGuid().ToString(), + Status = TaskExecutionStatus.Failed, + Reason = FailureReason.TimedOut, + Message = "This is a message", + Metadata = metadata, + CorrelationId = Guid.NewGuid().ToString() + }; + + var workflowId = Guid.NewGuid().ToString(); + + var workflow = new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname2", + Description = "Workflowdesc2", + Version = "1", + } + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + WorkflowName = workflow.Workflow.Name, + PayloadId = Guid.NewGuid().ToString(), + Status = Status.Created, + BucketId = "bucket", + Tasks = new List + { + new TaskExecution + { + TaskId = "pizza", + Status = TaskExecutionStatus.Failed + } + } + }; + + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance); + var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent); + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.AideClinicalReviewCancelation, It.IsAny()), Times.Exactly(1)); + } + [Fact] public async Task ProcessExportComplete_ValidExportCompleteEventMultipleTaskDestinationsDispatched_ReturnsTrue() { @@ -2259,5 +2320,57 @@ public void AttachPatientMetaData_AtachesDataToTaskExec_TaskExecShouldHavePatien taskExec.TaskPluginArguments[PatientKeys.PatientHospitalId].Should().BeSameAs(patientDetails.PatientHospitalId); taskExec.TaskPluginArguments[PatientKeys.PatientName].Should().BeSameAs(patientDetails.PatientName); } + + [Fact] + public async Task TaskExecShouldHaveCorrectTimeout() + { + var workflowId = Guid.NewGuid().ToString(); + var payloadId = Guid.NewGuid().ToString(); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var pizzaTask = new TaskObject + { + Id = "pizza", + Type = "taskType", + Description = "taskdesc", + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + }; + var bucket = "bucket"; + + var newPizza = await WorkflowExecuterService.CreateTaskExecutionAsync(pizzaTask, workflowInstance, bucket, payloadId); + Assert.Equal(_timeoutForTypeTask, newPizza.TimeoutInterval); + + } + + [Fact] + public async Task TaskExecShouldPickUpConfiguredDefaultTimeout() + { + var workflowId = Guid.NewGuid().ToString(); + var payloadId = Guid.NewGuid().ToString(); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var pizzaTask = new TaskObject + { + Id = "pizza", + Type = "someothertype", + Description = "taskdesc", + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + }; + var bucket = "bucket"; + + var newPizza = await WorkflowExecuterService.CreateTaskExecutionAsync(pizzaTask, workflowInstance, bucket, payloadId); + Assert.Equal(_timeoutForDefault, newPizza.TimeoutInterval); + + } } }