diff --git a/src/Common/Configuration/ConfigurationValidator.cs b/src/Common/Configuration/ConfigurationValidator.cs index b1ba52a89..f052bd8ad 100644 --- a/src/Common/Configuration/ConfigurationValidator.cs +++ b/src/Common/Configuration/ConfigurationValidator.cs @@ -58,6 +58,9 @@ public bool IsTopicsValid(MessageBrokerConfigurationKeys configurationKeys) valid &= IsStringValueNotNull(nameof(configurationKeys.WorkflowRequest), configurationKeys.WorkflowRequest); valid &= IsStringValueNotNull(nameof(configurationKeys.ExportRequestPrefix), configurationKeys.ExportRequestPrefix); valid &= IsStringValueNotNull(nameof(configurationKeys.TaskDispatchRequest), configurationKeys.TaskDispatchRequest); + valid &= IsStringValueNotNull(nameof(configurationKeys.ExportHL7), configurationKeys.ExportHL7); + valid &= IsStringValueNotNull(nameof(configurationKeys.ExportHL7Complete), configurationKeys.ExportHL7Complete); + return valid; } diff --git a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs index bd8e48bf6..900261209 100644 --- a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs +++ b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs @@ -34,6 +34,13 @@ public class MessageBrokerConfigurationKeys [ConfigurationKeyName("exportComplete")] public string ExportComplete { get; set; } = "md.export.complete"; + /// + /// Gets or sets the topic for publishing workflow requests. + /// Defaults to `md.export.complete`. + /// + [ConfigurationKeyName("exportHL7Complete")] + public string ExportHL7Complete { get; set; } = "md.export.hl7complete"; + /// /// Gets or sets the topic for publishing workflow requests. /// Defaults to `md.export.request`. @@ -98,5 +105,20 @@ public class MessageBrokerConfigurationKeys /// [ConfigurationKeyName("externalAppRequest")] public string ExternalAppRequest { get; set; } = "md.externalapp.request"; + + /// + /// Gets or sets the topic for publishing workflow requests. + /// Defaults to `md.export.request`. + /// + [ConfigurationKeyName("exportHl7")] + public string ExportHL7 { get; set; } = "md.export.hl7"; + + + /// + /// Gets or sets the topic for publishing export complete requests. + /// Defaults to `md_export_complete`. + /// + [ConfigurationKeyName("exportHl7Complete")] + public string ExportHl7Complete { get; set; } = "md.export.hl7complete"; } } diff --git a/src/Common/Miscellaneous/ValidationConstants.cs b/src/Common/Miscellaneous/ValidationConstants.cs index 44e323d13..eb6fabc04 100644 --- a/src/Common/Miscellaneous/ValidationConstants.cs +++ b/src/Common/Miscellaneous/ValidationConstants.cs @@ -118,6 +118,11 @@ public enum NotificationValues /// public const string ExternalAppTaskType = "remote_app_execution"; + /// + /// Key for Hl7 export task type. + /// + public const string HL7ExportTask = "export_hl7"; + /// /// Key for the export task type. /// @@ -141,7 +146,8 @@ public enum NotificationValues ExportTaskType, DockerTaskType, Email, - ExternalAppTaskType + ExternalAppTaskType, + HL7ExportTask }; } } diff --git a/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj b/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj index fe98f400f..00762e71f 100644 --- a/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj +++ b/src/TaskManager/API/Monai.Deploy.WorkflowManager.TaskManager.API.csproj @@ -43,4 +43,6 @@ + + diff --git a/src/TaskManager/API/packages.lock.json b/src/TaskManager/API/packages.lock.json old mode 100755 new mode 100644 diff --git a/src/TaskManager/Database/packages.lock.json b/src/TaskManager/Database/packages.lock.json old mode 100755 new mode 100644 diff --git a/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj b/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj index a9c1b71bf..921cd5060 100644 --- a/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj +++ b/src/TaskManager/TaskManager/Monai.Deploy.WorkflowManager.TaskManager.csproj @@ -78,7 +78,6 @@ - @@ -86,7 +85,7 @@ - + diff --git a/src/TaskManager/TaskManager/appsettings.json b/src/TaskManager/TaskManager/appsettings.json index efb322e3e..16b45b24d 100755 --- a/src/TaskManager/TaskManager/appsettings.json +++ b/src/TaskManager/TaskManager/appsettings.json @@ -77,6 +77,7 @@ "aideClinicalReviewCancelation": "aide.clinical_review.cancellation", "notificationEmailRequest": "aide.notification_email.request", "notificationEmailCancelation": "aide.notification_email.cancellation", + "exportHl7": "md.export.hl7" }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", diff --git a/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs b/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs index 1496763da..91595ad88 100644 --- a/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs +++ b/src/WorkflowManager/Contracts/Constants/TaskTypeConstants.cs @@ -20,10 +20,10 @@ public static class TaskTypeConstants { public const string RouterTask = "router"; - public const string ExportTask = "export"; + public const string DicomExportTask = "export"; public const string ExternalAppTask = "remote_app_execution"; - public const string ExportHl7Task = "export_hl7"; + public const string HL7ExportTask = "export_hl7"; } } diff --git a/src/WorkflowManager/Database/packages.lock.json b/src/WorkflowManager/Database/packages.lock.json old mode 100755 new mode 100644 diff --git a/src/WorkflowManager/Logging/Log.200000.Workflow.cs b/src/WorkflowManager/Logging/Log.200000.Workflow.cs index e69836004..fff85817a 100644 --- a/src/WorkflowManager/Logging/Log.200000.Workflow.cs +++ b/src/WorkflowManager/Logging/Log.200000.Workflow.cs @@ -108,5 +108,8 @@ public static partial class Log [LoggerMessage(EventId = 210007, Level = LogLevel.Information, Message = "Exporting to MIG task Id {taskid}, export destination {destination} number of files {fileCount} Mig data plugins {plugins}.")] public static partial void LogMigExport(this ILogger logger, string taskid, string destination, int fileCount, string plugins); + + [LoggerMessage(EventId = 200018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")] + public static partial void ExportListOrArtifactsAreEmpty(this ILogger logger, string taskId, string workflowInstanceId); } } diff --git a/src/WorkflowManager/Logging/Log.700000.Artifact.cs b/src/WorkflowManager/Logging/Log.700000.Artifact.cs index 215a043be..aa0d292e7 100644 --- a/src/WorkflowManager/Logging/Log.700000.Artifact.cs +++ b/src/WorkflowManager/Logging/Log.700000.Artifact.cs @@ -51,5 +51,12 @@ public static partial class Log [LoggerMessage(EventId = 700009, Level = LogLevel.Debug, Message = "Mandatory output artifacts for task {taskId} are missing. waiting for remaining artifacts... {missingArtifacts}")] public static partial void MandatoryOutputArtifactsMissingForTask(this ILogger logger, string taskId, string missingArtifacts); + [LoggerMessage(EventId = 700010, Level = LogLevel.Debug, Message = "no files exsist in storage {artifactList}")] + public static partial void NoFilesExistInStorage(this ILogger logger, string artifactList); + + [LoggerMessage(EventId = 700011, Level = LogLevel.Debug, Message = "adding files to workflowInstance {workflowInstanceId} :Task {taskId} : {artifactList}")] + public static partial void AddingFilesToWorkflowInstance(this ILogger logger, string workflowInstanceId, string taskId, string artifactList); + + } } diff --git a/src/WorkflowManager/Logging/packages.lock.json b/src/WorkflowManager/Logging/packages.lock.json old mode 100755 new mode 100644 diff --git a/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs b/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs index 02b4497c8..96162aa55 100644 --- a/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs +++ b/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs @@ -43,6 +43,7 @@ public class PayloadListenerService : IHostedService, IMonaiService, IDisposable public string TaskStatusUpdateRoutingKey { get; set; } public string ExportCompleteRoutingKey { get; set; } public string ArtifactRecievedRoutingKey { get; set; } + public string ExportHL7CompleteRoutingKey { get; set; } protected int Concurrency { get; set; } public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; public string ServiceName => "Payload Listener Service"; @@ -67,6 +68,7 @@ public PayloadListenerService( WorkflowRequestRoutingKey = configuration.Value.Messaging.Topics.WorkflowRequest; ExportCompleteRoutingKey = configuration.Value.Messaging.Topics.ExportComplete; ArtifactRecievedRoutingKey = configuration.Value.Messaging.Topics.ArtifactRecieved; + ExportHL7CompleteRoutingKey = configuration.Value.Messaging.Topics.ExportHL7Complete; Concurrency = 2; @@ -110,6 +112,9 @@ private void SetupPolling() _messageSubscriber.SubscribeAsync(ArtifactRecievedRoutingKey, ArtifactRecievedRoutingKey, OnArtifactReceivedtReceivedCallbackAsync); _logger.EventSubscription(ServiceName, ArtifactRecievedRoutingKey); + + _messageSubscriber.SubscribeAsync(ExportHL7CompleteRoutingKey, ExportHL7CompleteRoutingKey, OnExportHL7CompleteReceivedCallback); + _logger.EventSubscription(ServiceName, ExportHL7CompleteRoutingKey); } private async Task OnWorkflowRequestReceivedCallbackAsync(MessageReceivedEventArgs eventArgs) @@ -156,6 +161,21 @@ private async Task OnExportCompleteReceivedCallback(MessageReceivedEventArgs eve } + private async Task OnExportHL7CompleteReceivedCallback(MessageReceivedEventArgs eventArgs) + { + using var loggerScope = _logger.BeginScope(new Common.Miscellaneous.LoggingDataDictionary + { + ["correlationId"] = eventArgs.Message.CorrelationId, + ["source"] = eventArgs.Message.ApplicationId, + ["messageId"] = eventArgs.Message.MessageId, + ["messageDescription"] = eventArgs.Message.MessageDescription, + }); + + _logger.ExportCompleteReceived(); + await _eventPayloadListenerService.ExportCompletePayload(eventArgs); + + } + private async Task OnArtifactReceivedtReceivedCallbackAsync(MessageReceivedEventArgs eventArgs) { diff --git a/src/WorkflowManager/PayloadListener/packages.lock.json b/src/WorkflowManager/PayloadListener/packages.lock.json index 6bd2e364b..c24a10ae6 100644 --- a/src/WorkflowManager/PayloadListener/packages.lock.json +++ b/src/WorkflowManager/PayloadListener/packages.lock.json @@ -833,6 +833,7 @@ "monai.deploy.workloadmanager.workflowexecuter": { "type": "Project", "dependencies": { + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.WorkflowManager.Common": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Miscellaneous": "[1.0.0, )", diff --git a/src/WorkflowManager/Storage/packages.lock.json b/src/WorkflowManager/Storage/packages.lock.json old mode 100755 new mode 100644 diff --git a/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs b/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs index 524922a99..e7cf03f5d 100755 --- a/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs +++ b/src/WorkflowManager/WorkflowExecuter/Common/ArtifactMapper.cs @@ -181,7 +181,7 @@ private async Task> ConvertVariableStringToPath(Art var artifactName = variableWords[4]; var outputArtifact = task.OutputArtifacts?.FirstOrDefault(a => a.Key == artifactName); - if (!outputArtifact.HasValue) + if (!outputArtifact.HasValue || string.IsNullOrEmpty(outputArtifact.Value.Value)) { return default; } @@ -203,7 +203,7 @@ private async Task> VerifyExists(KeyValuePair); + artifact = await _storageService.VerifyObjectExistsAsync(bucketId, artifact.Value) ? artifact : default; } return artifact; diff --git a/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj b/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj index 9ef0454c9..0e3789995 100644 --- a/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj +++ b/src/WorkflowManager/WorkflowExecuter/Monai.Deploy.WorkloadManager.WorkflowExecuter.csproj @@ -37,6 +37,10 @@ + + + + @@ -46,6 +50,7 @@ + diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 276815405..f7fc03b5c 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -56,11 +56,12 @@ public class WorkflowExecuterService : IWorkflowExecuterService private readonly IPayloadService _payloadService; private readonly StorageServiceConfiguration _storageConfiguration; private readonly double _defaultTaskTimeoutMinutes; - private readonly Dictionary _defaultPerTaskTypeTimeoutMinutes = new Dictionary(); + private readonly Dictionary _defaultPerTaskTypeTimeoutMinutes = new(); private string TaskDispatchRoutingKey { get; } private string ExportRequestRoutingKey { get; } private string ExternalAppRoutingKey { get; } + private string ExportHL7RoutingKey { get; } private string ClinicalReviewTimeoutRoutingKey { get; } public WorkflowExecuterService( @@ -96,6 +97,7 @@ public WorkflowExecuterService( _migExternalAppPlugins = configuration.Value.MigExternalAppPlugins.Select(p => p.Trim()).Where(p => p.Length > 0).ToList(); ExportRequestRoutingKey = $"{configuration.Value.Messaging.Topics.ExportRequestPrefix}.{configuration.Value.Messaging.DicomAgents.ScuAgentName}"; ExternalAppRoutingKey = configuration.Value.Messaging.Topics.ExternalAppRequest; + ExportHL7RoutingKey = configuration.Value.Messaging.Topics.ExportHL7; _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _workflowRepository = workflowRepository ?? throw new ArgumentNullException(nameof(workflowRepository)); _workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository)); @@ -255,7 +257,7 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary(); if (artifactsInStorage.Any(a => a.Value) is false) { - _logger.LogDebug($"no files exsist in storage {JsonConvert.SerializeObject(artifactList)}"); + _logger.NoFilesExistInStorage(JsonConvert.SerializeObject(artifactList)); return; } @@ -273,9 +275,9 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId); - currentTask!.OutputArtifacts = validArtifacts; // added here are the parent function saves the object ! + currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes - _logger.LogDebug($"adding files to workflowInstance {workflowInstance.Id} :Task {taskId} : {JsonConvert.SerializeObject(validArtifacts)}"); + _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); } @@ -348,15 +350,15 @@ private static Task SwitchTasksAsync(TaskExecution task, Func routerFunc, Func exportFunc, Func externalFunc, - Func notCreatedStatusFunc, Func exportHl7Func, + Func notCreatedStatusFunc, Func defaultFunc) => task switch { { TaskType: TaskTypeConstants.RouterTask } => routerFunc(), - { TaskType: TaskTypeConstants.ExportTask } => exportFunc(), + { TaskType: TaskTypeConstants.DicomExportTask } => exportFunc(), { TaskType: TaskTypeConstants.ExternalAppTask } => externalFunc(), - { TaskType: TaskTypeConstants.ExportHl7Task } => exportHl7Func(), + { TaskType: TaskTypeConstants.HL7ExportTask } => exportHl7Func(), { Status: var s } when s != TaskExecutionStatus.Created => notCreatedStatusFunc(), _ => defaultFunc() }; @@ -493,7 +495,7 @@ public async Task ProcessExportComplete(ExportCompleteEvent message, strin await _workflowInstanceService.UpdateExportCompleteMetadataAsync(workflowInstance.Id, task.ExecutionId, message.FileStatuses); var succeededFileCount = message.FileStatuses.Count(f => f.Value == FileExportStatus.Success); - var totalFileCount = message.FileStatuses.Count(); + var totalFileCount = message.FileStatuses.Count; if (message.Status.Equals(ExportStatus.Success) && TaskExecutionStatus.Succeeded.IsTaskExecutionStatusUpdateValid(task.Status)) @@ -613,15 +615,35 @@ private async Task ExternalAppRequest(ExternalAppRequestEvent externalAppR private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List? plugins = null) { plugins ??= new List(); + var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); + + if (exportList is null || artifactValues is null) + { + return; + } + + var exportRequestEvent = GetExportRequestEvent(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); + + await _messageBrokerPublisherService.Publish(ExportRequestRoutingKey, jsonMesssage.ToMessage()); + await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); + } + + private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) + { var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray(); + if (exportList is null || !exportList.Any()) + { + exportList = null; + } var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId); if (artifactValues.IsNullOrEmpty()) { - return; + artifactValues = null; } - await DispatchDicomExport(workflowInstance, task, exportList, artifactValues, correlationId, plugins); + return (exportList, artifactValues); } private async Task GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId) @@ -659,11 +681,37 @@ private async Task GetArtifactValues(WorkflowRevision workflow, Workfl private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) { - // create message. send + var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); + + if (exportList is null || artifactValues is null) + { + _logger.ExportListOrArtifactsAreEmpty(task.TaskId, workflowInstance.Id); + return; + } + var exportRequestEvent = GetExportRequestEvent(workflowInstance, task, exportList, artifactValues, correlationId, new List()); + exportRequestEvent.Target!.DataService = DataService.HL7; + var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); + + await _messageBrokerPublisherService.Publish(ExportHL7RoutingKey, jsonMesssage.ToMessage()); + await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); } - private string[] GetDicomExports(WorkflowRevision workflow, TaskExecution task, string[]? exportDestinations) + private ExportRequestEvent GetExportRequestEvent( + WorkflowInstance workflowInstance, + TaskExecution task, + string[] exportDestinations, + string[] artifactValues, + string correlationId, + List plugins) + { + _logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins)); + var exportRequestEvent = EventMapper.ToExportRequestEvent(artifactValues, exportDestinations, task.TaskId, workflowInstance.Id, correlationId, plugins); + exportRequestEvent.PayloadId = workflowInstance.PayloadId; + return exportRequestEvent; + } + + private static string[] GetDicomExports(WorkflowRevision workflow, TaskExecution task, string[]? exportDestinations) { var validExportDestinations = workflow.Workflow?.InformaticsGateway?.ExportDestinations; @@ -691,17 +739,7 @@ private string[] GetDicomExports(WorkflowRevision workflow, TaskExecution task, return new List(task.InputArtifacts.Values).ToArray(); } - private async Task DispatchDicomExport(WorkflowInstance workflowInstance, TaskExecution task, string[]? exportDestinations, string[] artifactValues, string correlationId, List plugins) - { - if (exportDestinations is null || !exportDestinations.Any()) - { - return false; - } - _logger.LogMigExport(task.TaskId, string.Join(",", exportDestinations), artifactValues.Length, string.Join(",", plugins)); - await ExportRequest(workflowInstance, task, exportDestinations, artifactValues, correlationId, plugins); - return await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); - } private async Task HandleOutputArtifacts(WorkflowInstance workflowInstance, List outputs, TaskExecution task, WorkflowRevision workflowRevision) { @@ -777,7 +815,7 @@ private async Task DispatchTaskDestinations(WorkflowInstance workflowInsta continue; } - if (string.Equals(taskExec!.TaskType, TaskTypeConstants.ExportTask, StringComparison.InvariantCultureIgnoreCase)) + if (string.Equals(taskExec!.TaskType, TaskTypeConstants.DicomExportTask, StringComparison.InvariantCultureIgnoreCase)) { await HandleDicomExportAsync(workflow, workflowInstance, taskExec!, correlationId); @@ -791,7 +829,7 @@ private async Task DispatchTaskDestinations(WorkflowInstance workflowInsta continue; } - if (string.Equals(taskExec!.TaskType, TaskTypeConstants.ExportHl7Task, StringComparison.InvariantCultureIgnoreCase)) + if (string.Equals(taskExec!.TaskType, TaskTypeConstants.HL7ExportTask, StringComparison.InvariantCultureIgnoreCase)) { await HandleHl7ExportAsync(workflow, workflowInstance, taskExec!, correlationId); @@ -941,15 +979,7 @@ private async Task DispatchTask(WorkflowInstance workflowInstance, Workflo } } - private async Task ExportRequest(WorkflowInstance workflowInstance, TaskExecution taskExec, string[] exportDestinations, IList dicomImages, string correlationId, List plugins) - { - var exportRequestEvent = EventMapper.ToExportRequestEvent(dicomImages, exportDestinations, taskExec.TaskId, workflowInstance.Id, correlationId, plugins); - exportRequestEvent.PayloadId = workflowInstance.PayloadId; - var jsonMesssage = new JsonMessage(exportRequestEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, exportRequestEvent.CorrelationId, Guid.NewGuid().ToString()); - await _messageBrokerPublisherService.Publish(ExportRequestRoutingKey, jsonMesssage.ToMessage()); - return true; - } private async Task ClinicalReviewTimeOutEvent(WorkflowInstance workflowInstance, TaskExecution taskExec, string correlationId) { diff --git a/src/WorkflowManager/WorkflowExecuter/packages.lock.json b/src/WorkflowManager/WorkflowExecuter/packages.lock.json index fc30e4702..0266cb757 100644 --- a/src/WorkflowManager/WorkflowExecuter/packages.lock.json +++ b/src/WorkflowManager/WorkflowExecuter/packages.lock.json @@ -2,6 +2,18 @@ "version": 1, "dependencies": { "net6.0": { + "Monai.Deploy.Messaging": { + "type": "Direct", + "requested": "[1.0.5, )", + "resolved": "1.0.5", + "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", + "dependencies": { + "Ardalis.GuardClauses": "4.1.1", + "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", + "Newtonsoft.Json": "13.0.3", + "System.IO.Abstractions": "17.2.3" + } + }, "Ardalis.GuardClauses": { "type": "Transitive", "resolved": "4.1.1", @@ -268,17 +280,6 @@ "System.Security.Principal.Windows": "5.0.0" } }, - "Monai.Deploy.Messaging": { - "type": "Transitive", - "resolved": "1.0.5", - "contentHash": "J8Lskfy8PSVQLDE2uLqh53uaPpqpRJuSGVHpR2jrw+GYnTTDv21j/2gxwG8Hq2NgNOkWLNVi+fFnyWd6WFiUTA==", - "dependencies": { - "Ardalis.GuardClauses": "4.1.1", - "Microsoft.Extensions.Diagnostics.HealthChecks": "6.0.21", - "Newtonsoft.Json": "13.0.3", - "System.IO.Abstractions": "17.2.3" - } - }, "Monai.Deploy.Storage": { "type": "Transitive", "resolved": "0.2.18", diff --git a/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs b/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs index 580bdac9f..99bf78a6c 100644 --- a/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs +++ b/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs @@ -346,6 +346,9 @@ private void TaskTypeSpecificValidation(Workflow workflow, TaskObject currentTas case Email: ValidateEmailTask(currentTask); break; + case HL7ExportTask: + ValidateHL7ExportTask(workflow, currentTask); + break; } } @@ -616,6 +619,23 @@ private void ValidateExportTask(Workflow workflow, TaskObject currentTask) ValidateInputs(currentTask); } + private void ValidateHL7ExportTask(Workflow workflow, TaskObject currentTask) + { + if (currentTask.ExportDestinations.Any() is false) + { + Errors.Add($"Task: '{currentTask.Id}' does not contain a destination."); + } + + CheckDestinationInMigDestinations(currentTask, workflow.InformaticsGateway); + + if (currentTask.ExportDestinations.Length != currentTask.ExportDestinations.Select(t => t.Name).Distinct().Count()) + { + Errors.Add($"Task: '{currentTask.Id}' contains duplicate destinations."); + } + + ValidateInputs(currentTask); + } + private void ValidateExternalAppTask(Workflow workflow, TaskObject currentTask) { if (currentTask.ExportDestinations.Any() is false) diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index b04fff02f..502dabf97 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -58,7 +58,9 @@ "exportComplete": "md.export.complete", "exportRequestPrefix": "md.export.request", "callbackRequest": "md.tasks.callback", - "aideClinicalReviewRequest": "aide.clinical_review.request" + "aideClinicalReviewRequest": "aide.clinical_review.request", + "exportHl7": "md.export.hl7", + "exportHl7Complete": "md.export.hl7complete" }, "dicomAgents": { "dicomWebAgentName": "monaidicomweb", diff --git a/src/WorkflowManager/WorkflowManager/packages.lock.json b/src/WorkflowManager/WorkflowManager/packages.lock.json old mode 100755 new mode 100644 index 8cd76e773..426336806 --- a/src/WorkflowManager/WorkflowManager/packages.lock.json +++ b/src/WorkflowManager/WorkflowManager/packages.lock.json @@ -1138,6 +1138,7 @@ "monai.deploy.workloadmanager.workflowexecuter": { "type": "Project", "dependencies": { + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.WorkflowManager.Common": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Miscellaneous": "[1.0.0, )", diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj index 8f12dac09..f871f144e 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.csproj @@ -48,6 +48,7 @@ + diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 7ad63a1e8..c849fb622 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -90,7 +90,7 @@ public WorkflowExecuterServiceTests() PerTaskTypeTimeoutMinutes = new Dictionary { { "taskType", _timeoutForTypeTask } }, Messaging = new MessageBrokerConfiguration { - Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request" }, + Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch", ExportRequestPrefix = "md.export.request", ExportHL7 = "md.export.hl7" }, DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } }, MigExternalAppPlugins = new List { { "examplePlugin" } }.ToArray() @@ -3184,7 +3184,6 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() Assert.True(result); } - [Fact] public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() { @@ -3193,7 +3192,7 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456", - Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = artifactPath } } + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } } }; var workflowInstance = new WorkflowInstance { @@ -3213,7 +3212,7 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat .ReturnsAsync(workflowTemplate); _storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .ReturnsAsync(new Dictionary { { $"{artifactPath}", true } }); + .ReturnsAsync(new Dictionary { { $"{message.PayloadId}/{artifactPath}", true } }); //previously received artifacts _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) @@ -3223,6 +3222,525 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat _workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny(), It.IsAny(), It.IsAny>()), Times.Once()); } + [Fact] + public async Task ProcessPayload_WithExportTask_NoExportsFails() + { + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + var dcmInfo = new Dictionary() { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out dcmInfo)).Returns(true); + + _messageBrokerPublisherService.Setup(m => m.Publish(It.IsAny(), It.IsAny())); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish($"{_configuration.Value.Messaging.Topics.ExportRequestPrefix}.{_configuration.Value.Messaging.DicomAgents.ScuAgentName}", It.IsAny()), Times.Exactly(0)); + + Assert.True(result); + +#pragma warning restore CS8604 // Possible null reference argument. + } + + [Fact] + public async Task ProcessPayload_WithHl7ExportTask_DispatchesExportHl7() + { + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + new ExportDestination + { + Name = "PROD_PACS" + } + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + var dcmInfo = new Dictionary() { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out dcmInfo)).Returns(true); + _storageService.Setup(w => w.ListObjectsAsync(workflowRequest.Bucket, "/dcm", true, It.IsAny())) + .ReturnsAsync(new List() + { + new VirtualFileInfo("testfile.dcm", "/dcm/testfile.dcm", "test", ulong.MaxValue) + }); + + Message? messageSent = null; + _messageBrokerPublisherService.Setup(m => m.Publish(It.IsAny(), It.IsAny())) + .Callback((string topic, Message m) => messageSent = m); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(1)); + + Assert.True(result); + Assert.NotNull(messageSent); +#pragma warning disable CS8604 // Possible null reference argument. + var body = Encoding.UTF8.GetString(messageSent?.Body); + var exportMessageBody = JsonConvert.DeserializeObject(body); + Assert.Empty(exportMessageBody!.PluginAssemblies); + + var exportEventMessage = messageSent.ConvertTo(); + Assert.NotNull(exportEventMessage.Target); + Assert.Equal(DataService.HL7, exportEventMessage.Target.DataService); + +#pragma warning restore CS8604 // Possible null reference argument. + } + + [Fact] + public async Task ProcessPayload_WithHl7ExportTask_NoExportsFails() + { + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + var dcmInfo = new Dictionary() { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out dcmInfo)).Returns(true); + + _messageBrokerPublisherService.Setup(m => m.Publish(It.IsAny(), It.IsAny())); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(0)); + + Assert.True(result); + +#pragma warning restore CS8604 // Possible null reference argument. + } + + [Fact] + public async Task ProcessPayload_WithInvalidHl7ExportTask_DoesNotDispatchExportHl7() + { + // because this test has no _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath + // it returns no matching artifacts. hence the failure ! + var workflowId1 = Guid.NewGuid().ToString(); + var workflowId2 = Guid.NewGuid().ToString(); + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + Workflows = new List + { + workflowId1.ToString() + } + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId1, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname1", + Description = "Workflowdesc1", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + new ExportDestination + { + Name = "PROD_PINS" + } + } + } + } + } + } + }; + + _workflowRepository.Setup(w => w.GetByWorkflowsIdsAsync(new List { workflowId1.ToString() })).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowId1.ToString())).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _artifactMapper.Setup(a => a.ConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(new Dictionary() { { "dicomexport", "/dcm" } }); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(0)); + Assert.True(result); + } + + [Fact] + public async Task ProcessTaskUpdate_ValidTaskUpdateEventWithExportHl7TaskDestination_ReturnsTrue() + { + var workflowInstanceId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var updateEvent = new TaskUpdateEvent + { + WorkflowInstanceId = workflowInstanceId, + TaskId = "pizza", + ExecutionId = Guid.NewGuid().ToString(), + Status = TaskExecutionStatus.Succeeded, + Reason = FailureReason.None, + Message = "This is a message", + Metadata = new Dictionary(), + CorrelationId = Guid.NewGuid().ToString() + }; + + var workflowId = Guid.NewGuid().ToString(); + + var workflow = new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname2", + Description = "Workflowdesc2", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle", + ExportDestinations = new string[] { "PROD_PACS" } + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = "pizza", + Type = "type", + Description = "taskdesc", + TaskDestinations = new TaskDestination[] + { + new TaskDestination + { + Name = "exporttaskid" + }, + } + }, + new TaskObject { + Id = "exporttaskid", + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty(), + ExportDestinations = new ExportDestination[] + { + new ExportDestination + { + Name = "PROD_PACS" + } + } + } + } + } + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + WorkflowName = workflow.Workflow.Name, + PayloadId = Guid.NewGuid().ToString(), + Status = Status.Created, + BucketId = "bucket", + Tasks = new List + { + new TaskExecution + { + TaskId = "pizza", + Status = TaskExecutionStatus.Dispatched + } + } + }; + + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(workflowInstance.Id, It.IsAny>())).ReturnsAsync(true); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowInstance.WorkflowId)).ReturnsAsync(workflow); + _payloadService.Setup(p => p.GetByIdAsync(It.IsAny())).ReturnsAsync(new Payload { PatientDetails = new PatientDetails { } }); + var expectedDcmValue = new Dictionary { { "dicomexport", "/dcm" } }; + _artifactMapper.Setup(a => a.TryConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), out expectedDcmValue)).Returns(true); + _storageService.Setup(w => w.ListObjectsAsync(It.IsAny(), "/dcm", true, It.IsAny())) + .ReturnsAsync(new List() + { + new VirtualFileInfo("testfile.dcm", "/dcm/testfile.dcm", "test", ulong.MaxValue) + }); + + var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(1)); + + response.Should().BeTrue(); + } + + [Fact] + public async Task ProcessTaskUpdate_ValidTaskUpdateEventWithExportHl7TaskDestination_NoExportDestinations_DoesNotDispatchExport() + { + var workflowInstanceId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var updateEvent = new TaskUpdateEvent + { + WorkflowInstanceId = workflowInstanceId, + TaskId = "pizza", + ExecutionId = Guid.NewGuid().ToString(), + Status = TaskExecutionStatus.Succeeded, + Reason = FailureReason.None, + Message = "This is a message", + Metadata = new Dictionary(), + CorrelationId = Guid.NewGuid().ToString() + }; + + var workflowId = Guid.NewGuid().ToString(); + + var workflow = new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname2", + Description = "Workflowdesc2", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle" + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = "pizza", + Type = "type", + Description = "taskdesc", + TaskDestinations = new TaskDestination[] + { + new TaskDestination + { + Name = "exporttaskid" + }, + } + }, + new TaskObject { + Id = "exporttaskid", + Type = "export_hl7", + Description = "taskdesc", + Artifacts = new ArtifactMap + { + Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } } + }, + TaskDestinations = Array.Empty() + } + } + } + }; + + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + WorkflowId = workflowId, + WorkflowName = workflow.Workflow.Name, + PayloadId = Guid.NewGuid().ToString(), + Status = Status.Created, + BucketId = "bucket", + Tasks = new List + { + new TaskExecution + { + TaskId = "pizza", + Status = TaskExecutionStatus.Dispatched + } + } + }; + + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(workflowInstance.Id, It.IsAny>())).ReturnsAsync(true); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowInstance.WorkflowId)).ReturnsAsync(workflow); + _payloadService.Setup(p => p.GetByIdAsync(It.IsAny())).ReturnsAsync(new Payload { PatientDetails = new PatientDetails { } }); + _artifactMapper.Setup(a => a.ConvertArtifactVariablesToPath(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(new Dictionary { { "dicomexport", "/dcm" } }); + + var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.ExportHL7, It.IsAny()), Times.Exactly(0)); + + response.Should().BeTrue(); + } + } + #pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. } diff --git a/tests/UnitTests/WorkflowManager.Tests/packages.lock.json b/tests/UnitTests/WorkflowManager.Tests/packages.lock.json old mode 100755 new mode 100644 index 341932962..bcb898dff --- a/tests/UnitTests/WorkflowManager.Tests/packages.lock.json +++ b/tests/UnitTests/WorkflowManager.Tests/packages.lock.json @@ -1971,6 +1971,7 @@ "monai.deploy.workloadmanager.workflowexecuter": { "type": "Project", "dependencies": { + "Monai.Deploy.Messaging": "[1.0.5, )", "Monai.Deploy.WorkflowManager.Common": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Common.Miscellaneous": "[1.0.0, )",