From b5011a1cfce75ba2827864ddc5504ada0e3a5b16 Mon Sep 17 00:00:00 2001 From: Neil South Date: Wed, 31 Jan 2024 14:29:55 +0000 Subject: [PATCH 1/5] adding prefetchCount and shortening retry deplay Signed-off-by: Neil South --- src/TaskManager/TaskManager/appsettings.json | 3 ++- src/WorkflowManager/WorkflowManager/appsettings.json | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/TaskManager/TaskManager/appsettings.json b/src/TaskManager/TaskManager/appsettings.json index 16b45b24d..641c46a3c 100755 --- a/src/TaskManager/TaskManager/appsettings.json +++ b/src/TaskManager/TaskManager/appsettings.json @@ -108,7 +108,8 @@ "deadLetterExchange": "monaideploy-dead-letter", "exportRequestQueue": "export_tasks", "deliveryLimit": 3, - "requeueDelay": 30 + "requeueDelay": 3, + "prefetchCount": "5" } }, "storage": { diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index 4b946a500..80848b5f7 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -87,7 +87,8 @@ "deadLetterExchange": "monaideploy-dead-letter", "exportRequestQueue": "export_tasks", "deliveryLimit": 3, - "requeueDelay": 30 + "requeueDelay": 3, + "prefetchCount": "5" } }, "storage": { From e7b0a580467e7a4ec825d858136d31aab6d0eab7 Mon Sep 17 00:00:00 2001 From: Neil South Date: Fri, 2 Feb 2024 13:58:38 +0000 Subject: [PATCH 2/5] rough fix for artifact recieved Signed-off-by: Neil South --- .../Logging/Log.200000.Workflow.cs | 3 + .../Logging/Log.700000.Artifact.cs | 9 ++ .../Services/WorkflowExecuterService.cs | 129 ++++++++++++++---- .../TasksApiStepDefinitions.cs | 2 +- .../Services/WorkflowExecuterServiceTests.cs | 99 +++++++------- 5 files changed, 164 insertions(+), 78 deletions(-) diff --git a/src/WorkflowManager/Logging/Log.200000.Workflow.cs b/src/WorkflowManager/Logging/Log.200000.Workflow.cs index c3bc807dc..584429807 100644 --- a/src/WorkflowManager/Logging/Log.200000.Workflow.cs +++ b/src/WorkflowManager/Logging/Log.200000.Workflow.cs @@ -111,5 +111,8 @@ public static partial class Log [LoggerMessage(EventId = 210018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")] public static partial void ExportListOrArtifactsAreEmpty(this ILogger logger, string taskId, string workflowInstanceId); + + [LoggerMessage(EventId = 210019, Level = LogLevel.Error, Message = "Task is missing required input artifacts {taskId} Artifacts {ArtifactsJson}")] + public static partial void TaskIsMissingRequiredInputArtifacts(this ILogger logger, string taskId, string ArtifactsJson); } } diff --git a/src/WorkflowManager/Logging/Log.700000.Artifact.cs b/src/WorkflowManager/Logging/Log.700000.Artifact.cs index aa0d292e7..246e14f2d 100644 --- a/src/WorkflowManager/Logging/Log.700000.Artifact.cs +++ b/src/WorkflowManager/Logging/Log.700000.Artifact.cs @@ -57,6 +57,15 @@ public static partial class Log [LoggerMessage(EventId = 700011, Level = LogLevel.Debug, Message = "adding files to workflowInstance {workflowInstanceId} :Task {taskId} : {artifactList}")] public static partial void AddingFilesToWorkflowInstance(this ILogger logger, string workflowInstanceId, string taskId, string artifactList); + [LoggerMessage(EventId = 700012, Level = LogLevel.Error, Message = "Error finding Task :{taskId}")] + public static partial void ErrorFindingTask(this ILogger logger, string taskId); + + [LoggerMessage(EventId = 700013, Level = LogLevel.Error, Message = "Error finding Task :{taskId} or previousTask {previousTask}")] + public static partial void ErrorFindingTaskOrPrevious(this ILogger logger, string taskId, string previousTask); + + [LoggerMessage(EventId = 700014, Level = LogLevel.Warning, Message = "Error Task :{taskId} cant be trigger as it has missing artifacts {missingtypesJson}")] + public static partial void ErrorTaskMissingArtifacts(this ILogger logger, string taskId, string missingtypesJson); + } } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 9b9974607..d4e67be83 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -14,13 +14,11 @@ * limitations under the License. */ -using Ardalis.GuardClauses; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Monai.Deploy.Messaging.API; using Monai.Deploy.Messaging.Events; using Monai.Deploy.Messaging.Messages; -using Monai.Deploy.WorkflowManager.Common.Miscellaneous; using Monai.Deploy.Storage.API; using Monai.Deploy.Storage.Configuration; using Monai.Deploy.WorkflowManager.Common.Miscellaneous.Extensions; @@ -36,7 +34,8 @@ using Monai.Deploy.WorkflowManager.Common.Logging; using Monai.Deploy.WorkflowManager.Common.WorkflowExecuter.Common; using Monai.Deploy.WorkloadManager.WorkflowExecuter.Extensions; -using Newtonsoft.Json; +using Monai.Deploy.Messaging.Common; +using System.Text.Json; namespace Monai.Deploy.WorkflowManager.Common.WorkflowExecuter.Services { @@ -237,14 +236,16 @@ await _artifactsRepository var allArtifacts = taskTemplate.Artifacts.Output.Select(a => a.Type); var unexpectedArtifacts = receivedArtifacts.Except(allArtifacts).ToList(); + var tasksThatCanBeTriggered = GetTasksWithAllInputs(workflowTemplate, taskId, receivedArtifacts); + if (unexpectedArtifacts.Any()) { _logger.UnexpectedArtifactsReceived(taskId, workflowInstanceId, string.Join(',', unexpectedArtifacts)); } - if (!missingArtifacts.Any()) + if (missingArtifacts.Count == 0 || tasksThatCanBeTriggered.Length is not 0) { - return await AllRequiredArtifactsReceivedAsync(message, workflowInstance, taskId, workflowInstanceId, workflowTemplate).ConfigureAwait(false); + return await AllRequiredArtifactsReceivedAsync(message, workflowInstance, taskId, workflowInstanceId, workflowTemplate, receivedArtifacts).ConfigureAwait(false); } _logger.MandatoryOutputArtifactsMissingForTask(taskId, string.Join(',', missingArtifacts)); @@ -257,7 +258,7 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary(); if (artifactsInStorage.Any(a => a.Value) is false) { - _logger.NoFilesExistInStorage(JsonConvert.SerializeObject(artifactList)); + _logger.NoFilesExistInStorage(JsonSerializer.Serialize(artifactList)); return; } @@ -290,13 +291,13 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message if (currentTask is not null && addedNew) { - _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); + _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonSerializer.Serialize(validArtifacts)); await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask); } } private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, - string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate) + string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate, IEnumerable receivedArtifacts) { var taskExecution = workflowInstance.Tasks.FirstOrDefault(t => t.TaskId == taskId); @@ -307,21 +308,76 @@ private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEven } await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstanceId, taskId, - TaskExecutionStatus.Succeeded).ConfigureAwait(false); + TaskExecutionStatus.Succeeded).ConfigureAwait(false); // would need to change to be partial success if anot all destination covered // Dispatch Task var taskDispatchedResult = - await HandleTaskDestinations(workflowInstance, workflowTemplate, taskExecution, message.CorrelationId).ConfigureAwait(false); + await HandleTaskDestinations(workflowInstance, workflowTemplate, taskExecution, message.CorrelationId, receivedArtifacts).ConfigureAwait(false); if (taskDispatchedResult is false) { - _logger.LogTaskDispatchFailure(message.PayloadId.ToString(), taskId, workflowInstanceId, workflowTemplate.WorkflowId, JsonConvert.SerializeObject(message.Artifacts)); + _logger.LogTaskDispatchFailure(message.PayloadId.ToString(), taskId, workflowInstanceId, workflowTemplate.WorkflowId, JsonSerializer.Serialize(message.Artifacts)); return false; } return true; } + private string[] GetTasksWithAllInputs(WorkflowRevision workflowTemplate, string currentTaskId, IEnumerable artifactsRecieved) + { + var excutableTasks = new List(); + var task = workflowTemplate.Workflow!.Tasks.FirstOrDefault(t => t.Id == currentTaskId); + if (task is null) + { + _logger.ErrorFindingTask(currentTaskId); + return []; + } + var nextTasks = task.TaskDestinations.Select(t => t.Name); + foreach (var nextTask in nextTasks) + { + var neededInputs = GetTasksInput(workflowTemplate, nextTask, currentTaskId).Where(t => t.Value); + var remainingManditory = neededInputs.Where(i => artifactsRecieved.Any(a => a == i.Key) is false); + if (remainingManditory.Count() is 0) + { + // all manditory inputs found + excutableTasks.Add(nextTask); + } + else + { + _logger.ErrorTaskMissingArtifacts(nextTask, System.Text.Json.JsonSerializer.Serialize(remainingManditory)); + } + } + + return [.. excutableTasks]; + } + + private Dictionary GetTasksInput(WorkflowRevision workflowTemplate, string taskId, string previousTaskId) + { + var results = new Dictionary(); + var task = workflowTemplate.Workflow!.Tasks.FirstOrDefault(t => t.Id == taskId); + var previousTask = workflowTemplate.Workflow!.Tasks.FirstOrDefault(t => t.Id == previousTaskId); + if (previousTask is null || task is null) + { + _logger.ErrorFindingTask(taskId); + return results; + } + + foreach (var artifact in task.Artifacts.Input) + { + var matchType = previousTask.Artifacts.Output.FirstOrDefault(t => t.Name == artifact.Name); + if (matchType is null) + { + _logger.ErrorFindingTaskOrPrevious(taskId, previousTaskId); + } + else + { + results.Add(matchType.Type, artifact.Mandatory); + } + } + + return results; + } + public async Task ProcessFirstWorkflowTask(WorkflowInstance workflowInstance, string correlationId, Payload payload) { if (workflowInstance.Status == Status.Failed) @@ -339,7 +395,7 @@ public async Task ProcessFirstWorkflowTask(WorkflowInstance workflowInstance, st return; } - using var loggingScope = _logger.BeginScope(new LoggingDataDictionary + using var loggingScope = _logger.BeginScope(new Messaging.Common.LoggingDataDictionary { ["workflowInstanceId"] = workflowInstance.Id, ["durationSoFar"] = (DateTime.UtcNow - workflowInstance.StartTime).TotalMilliseconds, @@ -347,7 +403,7 @@ public async Task ProcessFirstWorkflowTask(WorkflowInstance workflowInstance, st }); await SwitchTasksAsync(task, - routerFunc: () => HandleTaskDestinations(workflowInstance, workflow, task, correlationId), + routerFunc: () => HandleTaskDestinations(workflowInstance, workflow, task, correlationId, new List()), exportFunc: () => HandleDicomExportAsync(workflow, workflowInstance, task, correlationId), externalFunc: () => HandleExternalAppAsync(workflow, workflowInstance, task, correlationId), exportHl7Func: () => HandleHl7ExportAsync(workflow, workflowInstance, task, correlationId), @@ -457,7 +513,7 @@ public async Task ProcessTaskUpdate(TaskUpdateEvent message) return await CompleteTask(currentTask, workflowInstance, message.CorrelationId, TaskExecutionStatus.Failed); } - return await HandleTaskDestinations(workflowInstance, workflow, currentTask, message.CorrelationId); + return await HandleTaskDestinations(workflowInstance, workflow, currentTask, message.CorrelationId, new List()); } private async Task HandleUpdatingTaskStatus(TaskExecution taskExecution, string workflowId, TaskExecutionStatus status) @@ -499,7 +555,7 @@ public async Task ProcessExportComplete(ExportCompleteEvent message, strin return false; } - using var loggingScope = _logger.BeginScope(new LoggingDataDictionary + using var loggingScope = _logger.BeginScope(new Messaging.Common.LoggingDataDictionary { ["workflowInstanceId"] = workflowInstance.Id, ["durationSoFar"] = (DateTime.UtcNow - workflowInstance.StartTime).TotalMilliseconds, @@ -529,7 +585,7 @@ public async Task ProcessExportComplete(ExportCompleteEvent message, strin { case TaskTypeConstants.DicomExportTask: case TaskTypeConstants.HL7ExportTask: - return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId); + return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId, new List()); default: break; } @@ -794,7 +850,7 @@ private async Task HandleOutputArtifacts(WorkflowInstance workflowInstance foreach (var artifact in artifactDict) { var outputArtifact = revisionTask.Artifacts.Output.FirstOrDefault(o => o.Name == artifact.Key); - _logger.LogArtifactPassing(new Artifact { Name = artifact.Key, Value = artifact.Value, Mandatory = outputArtifact?.Mandatory ?? true }, artifact.Value, "Post-Task Output Artifact", validOutputArtifacts?.ContainsKey(artifact.Key) ?? false); + _logger.LogArtifactPassing(new Contracts.Models.Artifact { Name = artifact.Key, Value = artifact.Value, Mandatory = outputArtifact?.Mandatory ?? true }, artifact.Value, "Post-Task Output Artifact", validOutputArtifacts?.ContainsKey(artifact.Key) ?? false); } var missingOutputs = revisionTask.Artifacts.Output.Where(o => validArtifacts.Any(m => m.Key == o.Name) is false); @@ -843,7 +899,7 @@ private async Task DispatchTaskDestinations(WorkflowInstance workflowInsta if (string.Equals(taskExec!.TaskType, TaskTypeConstants.RouterTask, StringComparison.InvariantCultureIgnoreCase)) { - await HandleTaskDestinations(workflowInstance, workflow, taskExec!, correlationId); + await HandleTaskDestinations(workflowInstance, workflow, taskExec!, correlationId, new List()); continue; } @@ -881,9 +937,14 @@ private async Task DispatchTaskDestinations(WorkflowInstance workflowInsta return processed; } - private async Task HandleTaskDestinations(WorkflowInstance workflowInstance, WorkflowRevision workflow, TaskExecution task, string correlationId) + private async Task HandleTaskDestinations( + WorkflowInstance workflowInstance, + WorkflowRevision workflow, + TaskExecution task, + string correlationId, + IEnumerable receivedArtifacts) { - var newTaskExecutions = await CreateTaskDestinations(workflowInstance, workflow, task.TaskId); + var newTaskExecutions = await CreateTaskDestinations(workflowInstance, workflow, task.TaskId, receivedArtifacts); if (newTaskExecutions.Any(task => task.Status == TaskExecutionStatus.Failed)) { @@ -902,7 +963,11 @@ private async Task HandleTaskDestinations(WorkflowInstance workflowInstanc return await CompleteTask(task, workflowInstance, correlationId, TaskExecutionStatus.Succeeded); } - private async Task> CreateTaskDestinations(WorkflowInstance workflowInstance, WorkflowRevision workflow, string taskId) + private async Task> CreateTaskDestinations( + WorkflowInstance workflowInstance, + WorkflowRevision workflow, + string taskId, + IEnumerable receivedArtifacts) { var currentTaskDestinations = workflow.Workflow?.Tasks?.SingleOrDefault(t => t.Id == taskId)?.TaskDestinations; @@ -915,10 +980,20 @@ private async Task> CreateTaskDestinations(WorkflowInstance foreach (var taskDest in currentTaskDestinations) { + // have we got all inputs we need ? + var neededInputs = GetTasksInput(workflow, taskDest.Name, taskId).Where(t => t.Value); + var remainingManditory = neededInputs.Where(i => receivedArtifacts.Any(a => a == i.Key) is false); + if (remainingManditory.Count() is not 0) + { + _logger.TaskIsMissingRequiredInputArtifacts(taskDest.Name, JsonSerializer.Serialize(remainingManditory)); + + continue; + } + //Evaluate Conditional if (taskDest.Conditions.IsNullOrEmpty() is false - && taskDest.Conditions.Any(c => string.IsNullOrWhiteSpace(c) is false) - && _conditionalParameterParser.TryParse(taskDest.Conditions, workflowInstance, out var resolvedConditional) is false) + && taskDest.Conditions.Any(c => string.IsNullOrWhiteSpace(c) is false) + && _conditionalParameterParser.TryParse(taskDest.Conditions, workflowInstance, out var resolvedConditional) is false) { _logger.TaskDestinationConditionFalse(resolvedConditional, taskDest.Conditions.CombineConditionString(), taskDest.Name); @@ -979,11 +1054,11 @@ private async Task DispatchTask(WorkflowInstance workflowInstance, Workflo var pathOutputArtifacts = new Dictionary(); try { - pathOutputArtifacts = await _artifactMapper.ConvertArtifactVariablesToPath(outputArtifacts ?? Array.Empty(), workflowInstance.PayloadId, workflowInstance.Id, workflowInstance.BucketId, false); + pathOutputArtifacts = await _artifactMapper.ConvertArtifactVariablesToPath(outputArtifacts ?? Array.Empty(), workflowInstance.PayloadId, workflowInstance.Id, workflowInstance.BucketId, false); } catch (FileNotFoundException) { - _logger.LogTaskDispatchFailure(workflowInstance.PayloadId, taskExec.TaskId, workflowInstance.Id, workflow?.Id, JsonConvert.SerializeObject(pathOutputArtifacts)); + _logger.LogTaskDispatchFailure(workflowInstance.PayloadId, taskExec.TaskId, workflowInstance.Id, workflow?.Id, JsonSerializer.Serialize(pathOutputArtifacts)); workflowInstance.Tasks.Add(taskExec); var updateResult = await HandleUpdatingTaskStatus(taskExec, workflowInstance.WorkflowId, TaskExecutionStatus.Failed); if (updateResult is false) @@ -1000,7 +1075,7 @@ private async Task DispatchTask(WorkflowInstance workflowInstance, Workflo } taskExec.TaskPluginArguments["workflow_name"] = workflow!.Workflow!.Name; - _logger.LogGeneralTaskDispatchInformation(workflowInstance.PayloadId, taskExec.TaskId, workflowInstance.Id, workflow?.Id, JsonConvert.SerializeObject(pathOutputArtifacts)); + _logger.LogGeneralTaskDispatchInformation(workflowInstance.PayloadId, taskExec.TaskId, workflowInstance.Id, workflow?.Id, JsonSerializer.Serialize(pathOutputArtifacts)); var taskDispatchEvent = EventMapper.ToTaskDispatchEvent(taskExec, workflowInstance, pathOutputArtifacts, correlationId, _storageConfiguration); var jsonMesssage = new JsonMessage(taskDispatchEvent, MessageBrokerConfiguration.WorkflowManagerApplicationId, taskDispatchEvent.CorrelationId, Guid.NewGuid().ToString()); @@ -1107,7 +1182,7 @@ public async Task CreateTaskExecutionAsync(TaskObject task, if (task?.Artifacts?.Input.IsNullOrEmpty() is false) { artifactFound = _artifactMapper.TryConvertArtifactVariablesToPath(task?.Artifacts?.Input - ?? Array.Empty(), payloadId, workflowInstanceId, bucketName, true, out inputArtifacts); + ?? Array.Empty(), payloadId, workflowInstanceId, bucketName, true, out inputArtifacts); } return new TaskExecution() diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs index 923297cfe..41da2efbd 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs @@ -49,7 +49,7 @@ public void ThenICanSeeTasksAreReturned(int number) { var result = ApiHelper.Response.Content.ReadAsStringAsync().Result; var response = JsonConvert.DeserializeObject>>(result!); - number.Should().Be(response.Data.Count); + number.Should().Be(response!.Data!.Count); } } } diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 558ae5856..6b92d5efa 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -234,8 +234,7 @@ public async Task ProcessArtifactReceived_WhenWorkflowTemplateReturnsNull_Return { Workflow = new Workflow { - Tasks = new[] - { new TaskObject() { Id = "not456" } } + Tasks = [new TaskObject() { Id = "not456" }] } }); var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); @@ -3703,7 +3702,7 @@ public async Task ProcessTaskUpdate_ValidTaskUpdateEventWithExportHl7TaskDestina response.Should().BeTrue(); } - //[Fact] + [Fact] public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() { var workflowInstanceId = Guid.NewGuid().ToString(); @@ -3723,7 +3722,7 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() var workflows = new List { - new WorkflowRevision + new () { Id = Guid.NewGuid().ToString(), WorkflowId = workflowId1, @@ -3736,72 +3735,65 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() InformaticsGateway = new InformaticsGateway { AeTitle = "aetitle", - ExportDestinations = new string[] { "PROD_PACS" } + ExportDestinations = ["PROD_PACS"] }, - Tasks = new TaskObject[] - { + Tasks = + [ new TaskObject { Id = "router", Type = "router", Description = "router", Artifacts = new ArtifactMap { - Input = new Artifact[] { new Artifact { Name = "dicomexport", Value = "{{ context.input }}" } }, - Output = new OutputArtifact[] - { - new OutputArtifact + Input = [new Artifact { Name = "dicomexport", Value = "{{ context.input }}" }], + Output = + [ + new () { Name = "Artifact1", Value = "Artifact1Value", Mandatory = true, Type = ArtifactType.DOC }, - new OutputArtifact + new () { Name = "Artifact2", Value = "Artifact2Value", Mandatory = true, Type = ArtifactType.CT } - } + ] }, TaskDestinations = new TaskDestination[] { - new TaskDestination - { + new() { Name = "export1" }, - new TaskDestination + new () { Name = "export2" } } }, - new TaskObject - { + new() { Id ="export1", Type = "export", Artifacts = new ArtifactMap { - Input = new Artifact[] { new Artifact { Name = "artifact", Value = "{{ context.executions.router.artifacts.output.Artifact1 }}" } } + Input = [new () { Name = "Artifact1", Value = "{{ context.executions.router.artifacts.output.Artifact1 }}", Mandatory = true }] }, - ExportDestinations = new ExportDestination[] - { - } + ExportDestinations = [new (){Name = "PROD_PACS"}] }, - new TaskObject - { + new() { Id ="export2", Type = "export", Artifacts = new ArtifactMap { - Input = new Artifact[] { new Artifact { Name = "artifact2", Value = "{{ context.executions.router.artifacts.output.Artifact2 }}" } } + Input = [new () { Name = "Artifact2", Value = "{{ context.executions.router.artifacts.output.Artifact2 }}", Mandatory = true }] }, - ExportDestinations = new ExportDestination[] - { - } + ExportDestinations = [new (){Name = "PROD_PACS"}] }, - } + ] } } }; @@ -3813,29 +3805,29 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() PayloadId = Guid.NewGuid().ToString(), Status = Status.Created, BucketId = "bucket", - Tasks = new List - { - new TaskExecution - { - TaskId = "router", - Status = TaskExecutionStatus.Created - }, - //new TaskExecution - //{ - // TaskId = "export1", - // Status = TaskExecutionStatus.Created - //}, - //new TaskExecution - //{ - // TaskId = "export2", - // Status = TaskExecutionStatus.Created - //} - } + Tasks = + [ + new() + { + TaskId = "router", + Status = TaskExecutionStatus.Created + }, + //new TaskExecution + //{ + // TaskId = "export1", + // Status = TaskExecutionStatus.Created + //}, + //new TaskExecution + //{ + // TaskId = "export2", + // Status = TaskExecutionStatus.Created + //} + ] }; var artifactDict = new List { - new Messaging.Common.Storage + new () { Name = "artifactname", RelativeRootPath = "path/to/artifact" @@ -3861,11 +3853,18 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() workflowInstance.BucketId, It.Is>(l => l.Any(a => pathList.Any(p => p == a))), It.IsAny())) .ReturnsAsync(new Dictionary() { { pathList.First(), true } }); + _storageService.Setup(w => w.ListObjectsAsync(It.IsAny(), It.IsAny(), true, It.IsAny())) + .ReturnsAsync(new List() + { + new VirtualFileInfo("testfile.dcm", "/dcm/testfile.dcm", "test", ulong.MaxValue) + }); + var mess = new ArtifactsReceivedEvent { WorkflowInstanceId = workflowInstance.Id, TaskId = "router", - Artifacts = [new Messaging.Common.Artifact { Type = ArtifactType.DOC, Path = "path/to/artifact" }] + Artifacts = [new Messaging.Common.Artifact { Type = ArtifactType.DOC, Path = "path/to/artifact" }], + CorrelationId = Guid.NewGuid().ToString() }; @@ -3873,7 +3872,7 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() Assert.True(response); //_workflowInstanceRepository.Verify(w => w.UpdateTaskStatusAsync(workflowInstanceId, "router", TaskExecutionStatus.Succeeded)); - _workflowInstanceRepository.Verify(w => w.UpdateTaskStatusAsync(workflowInstanceId, "export1", TaskExecutionStatus.Succeeded)); + _workflowInstanceRepository.Verify(w => w.UpdateTaskStatusAsync(workflowInstanceId, "export1", TaskExecutionStatus.Dispatched)); From 0f161e0e0172fb7fbd6f48ff9761fb0635a6dd61 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 8 Feb 2024 15:45:27 +0000 Subject: [PATCH 3/5] adding seriesUid to payloads Signed-off-by: Neil South --- .../Common/Services/PayloadService.cs | 13 +-- .../Migrations/M001_Payload_addVerion.cs | 2 +- .../M002_Payload_addPayloadDeleted.cs | 2 +- .../Migrations/M005_Payload_seriesUid.cs | 42 +++++++++ .../Contracts/Models/Payload.cs | 7 +- .../Contracts/Models/PayloadDto.cs | 1 + .../Logging/Log.600000.Dicom.cs | 6 ++ .../Storage/Constants/DicomTagConstants.cs | 2 + .../Storage/Services/DicomService.cs | 90 ++++++++++++++----- .../Storage/Services/IDicomService.cs | 15 ++++ 10 files changed, 147 insertions(+), 33 deletions(-) create mode 100644 src/WorkflowManager/Contracts/Migrations/M005_Payload_seriesUid.cs diff --git a/src/WorkflowManager/Common/Services/PayloadService.cs b/src/WorkflowManager/Common/Services/PayloadService.cs index 15d90bc3b..7ceeebd0b 100644 --- a/src/WorkflowManager/Common/Services/PayloadService.cs +++ b/src/WorkflowManager/Common/Services/PayloadService.cs @@ -14,7 +14,6 @@ * limitations under the License. */ -using Ardalis.GuardClauses; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Monai.Deploy.Messaging.Events; @@ -83,6 +82,7 @@ public PayloadService( } var patientDetails = await _dicomService.GetPayloadPatientDetailsAsync(eventPayload.PayloadId.ToString(), eventPayload.Bucket); + var dict = await _dicomService.GetMetaData(eventPayload.PayloadId.ToString(), eventPayload.Bucket).ConfigureAwait(false); var payload = new Payload { @@ -96,7 +96,8 @@ public PayloadService( Timestamp = eventPayload.Timestamp, PatientDetails = patientDetails, PayloadDeleted = PayloadDeleted.No, - Expires = await GetExpiry(DateTime.UtcNow, eventPayload.WorkflowInstanceId) + Expires = await GetExpiry(DateTime.UtcNow, eventPayload.WorkflowInstanceId), + SeriesInstanceUid = _dicomService.GetSeriesInstanceUID(dict) }; if (await _payloadRepository.CreateAsync(payload)) @@ -197,13 +198,7 @@ public async Task DeletePayloadFromStorageAsync(string payloadId) { ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId)); - var payload = await GetByIdAsync(payloadId); - - if (payload is null) - { - throw new MonaiNotFoundException($"Payload with ID: {payloadId} not found"); - } - + var payload = await GetByIdAsync(payloadId) ?? throw new MonaiNotFoundException($"Payload with ID: {payloadId} not found"); if (payload.PayloadDeleted == PayloadDeleted.InProgress || payload.PayloadDeleted == PayloadDeleted.Yes) { throw new MonaiBadRequestException($"Deletion of files for payload ID: {payloadId} already in progress or already deleted"); diff --git a/src/WorkflowManager/Contracts/Migrations/M001_Payload_addVerion.cs b/src/WorkflowManager/Contracts/Migrations/M001_Payload_addVerion.cs index d377ee09e..e2af2da3f 100644 --- a/src/WorkflowManager/Contracts/Migrations/M001_Payload_addVerion.cs +++ b/src/WorkflowManager/Contracts/Migrations/M001_Payload_addVerion.cs @@ -21,7 +21,7 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations { public class M001_Payload_addVerion : DocumentMigration { - public M001_Payload_addVerion() : base("1.0.0") { } + public M001_Payload_addVerion() : base("1.0.1") { } public override void Up(BsonDocument document) { diff --git a/src/WorkflowManager/Contracts/Migrations/M002_Payload_addPayloadDeleted.cs b/src/WorkflowManager/Contracts/Migrations/M002_Payload_addPayloadDeleted.cs index 3d80fc912..61cef2542 100644 --- a/src/WorkflowManager/Contracts/Migrations/M002_Payload_addPayloadDeleted.cs +++ b/src/WorkflowManager/Contracts/Migrations/M002_Payload_addPayloadDeleted.cs @@ -21,7 +21,7 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations { public class M002_Payload_addPayloadDeleted : DocumentMigration { - public M002_Payload_addPayloadDeleted() : base("1.0.1") { } + public M002_Payload_addPayloadDeleted() : base("1.0.2") { } public override void Up(BsonDocument document) { diff --git a/src/WorkflowManager/Contracts/Migrations/M005_Payload_seriesUid.cs b/src/WorkflowManager/Contracts/Migrations/M005_Payload_seriesUid.cs new file mode 100644 index 000000000..411ccfad5 --- /dev/null +++ b/src/WorkflowManager/Contracts/Migrations/M005_Payload_seriesUid.cs @@ -0,0 +1,42 @@ +// +// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Mongo.Migration.Migrations.Document; +using MongoDB.Bson; + +namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations +{ + public class M005_Payload_seriesUid : DocumentMigration + { + public M005_Payload_seriesUid() : base("1.0.5") { } + + public override void Up(BsonDocument document) + { + document.Add("SeriesInstanceUid", BsonNull.Create(null).ToJson(), true); + } + + public override void Down(BsonDocument document) + { + try + { + document.Remove("SeriesInstanceUid"); + } + catch + { // can ignore we dont want failures stopping startup ! + } + } + } +} diff --git a/src/WorkflowManager/Contracts/Models/Payload.cs b/src/WorkflowManager/Contracts/Models/Payload.cs index 96033100e..83108e3a4 100755 --- a/src/WorkflowManager/Contracts/Models/Payload.cs +++ b/src/WorkflowManager/Contracts/Models/Payload.cs @@ -27,11 +27,11 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models { - [CollectionLocation("Payloads"), RuntimeVersion("1.0.4")] + [CollectionLocation("Payloads"), RuntimeVersion("1.0.5")] public class Payload : IDocument { [JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))] - public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 4); + public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 5); [JsonProperty(PropertyName = "id")] public string Id { get; set; } = string.Empty; @@ -71,6 +71,9 @@ public class Payload : IDocument [JsonProperty(PropertyName = "expires")] public DateTime? Expires { get; set; } + [JsonProperty(PropertyName = "series_instance_uid")] + public string? SeriesInstanceUid { get; set; } + } public enum PayloadDeleted diff --git a/src/WorkflowManager/Contracts/Models/PayloadDto.cs b/src/WorkflowManager/Contracts/Models/PayloadDto.cs index 0f7ca2733..c76f1bd33 100644 --- a/src/WorkflowManager/Contracts/Models/PayloadDto.cs +++ b/src/WorkflowManager/Contracts/Models/PayloadDto.cs @@ -36,6 +36,7 @@ public PayloadDto(Payload payload) Files = payload.Files; PatientDetails = payload.PatientDetails; PayloadDeleted = payload.PayloadDeleted; + SeriesInstanceUid = payload.SeriesInstanceUid; } [JsonProperty(PropertyName = "payload_status")] diff --git a/src/WorkflowManager/Logging/Log.600000.Dicom.cs b/src/WorkflowManager/Logging/Log.600000.Dicom.cs index 3a9dc909e..a84146f53 100644 --- a/src/WorkflowManager/Logging/Log.600000.Dicom.cs +++ b/src/WorkflowManager/Logging/Log.600000.Dicom.cs @@ -40,5 +40,11 @@ public static partial class Log [LoggerMessage(EventId = 600006, Level = LogLevel.Debug, Message = "Dicom export marked as failed with {fileStatusCount} files marked as exported.")] public static partial void DicomExportFailed(this ILogger logger, string fileStatusCount); + + [LoggerMessage(EventId = 600007, Level = LogLevel.Error, Message = "Failed to get DICOM metadata from bucket {bucketId}. Payload: {payloadId}")] + public static partial void FailedToGetDicomMetadataFromBucket(this ILogger logger, string payloadId, string bucketId, Exception ex); + + [LoggerMessage(EventId = 600000, Level = LogLevel.Error, Message = "Failed to get DICOM tag {dicomTag} from dictionary")] + public static partial void FailedToGetDicomTagFromDictoionary(this ILogger logger, string dicomTag, Exception ex); } } diff --git a/src/WorkflowManager/Storage/Constants/DicomTagConstants.cs b/src/WorkflowManager/Storage/Constants/DicomTagConstants.cs index 5caf044ae..2caee5c6a 100644 --- a/src/WorkflowManager/Storage/Constants/DicomTagConstants.cs +++ b/src/WorkflowManager/Storage/Constants/DicomTagConstants.cs @@ -29,5 +29,7 @@ public static class DicomTagConstants public const string PatientAgeTag = "00101010"; public const string PatientHospitalIdTag = "00100021"; + + public const string SeriesInstanceUIDTag = "0020000E"; } } diff --git a/src/WorkflowManager/Storage/Services/DicomService.cs b/src/WorkflowManager/Storage/Services/DicomService.cs index 3d560cea5..bd3528a07 100644 --- a/src/WorkflowManager/Storage/Services/DicomService.cs +++ b/src/WorkflowManager/Storage/Services/DicomService.cs @@ -16,7 +16,6 @@ using System.Globalization; using System.Text; -using Ardalis.GuardClauses; using Microsoft.Extensions.Logging; using Monai.Deploy.Storage.API; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; @@ -69,18 +68,18 @@ public async Task GetPayloadPatientDetailsAsync(string payloadId ArgumentNullException.ThrowIfNullOrWhiteSpace(bucketName, nameof(bucketName)); ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId)); - var items = await _storageService.ListObjectsAsync(bucketName, $"{payloadId}/dcm", true); + var dict = await GetMetaData(payloadId, bucketName); var patientDetails = new PatientDetails { - PatientName = await GetFirstValueAsync(items, payloadId, bucketName, DicomTagConstants.PatientNameTag), - PatientId = await GetFirstValueAsync(items, payloadId, bucketName, DicomTagConstants.PatientIdTag), - PatientSex = await GetFirstValueAsync(items, payloadId, bucketName, DicomTagConstants.PatientSexTag), - PatientAge = await GetFirstValueAsync(items, payloadId, bucketName, DicomTagConstants.PatientAgeTag), - PatientHospitalId = await GetFirstValueAsync(items, payloadId, bucketName, DicomTagConstants.PatientHospitalIdTag) + PatientName = GetFirstValueAsync(dict, DicomTagConstants.PatientNameTag), + PatientId = GetFirstValueAsync(dict, DicomTagConstants.PatientIdTag), + PatientSex = GetFirstValueAsync(dict, DicomTagConstants.PatientSexTag), + PatientAge = GetFirstValueAsync(dict, DicomTagConstants.PatientAgeTag), + PatientHospitalId = GetFirstValueAsync(dict, DicomTagConstants.PatientHospitalIdTag) }; - var dob = await GetFirstValueAsync(items, payloadId, bucketName, DicomTagConstants.PatientDateOfBirthTag); + var dob = GetFirstValueAsync(dict, DicomTagConstants.PatientDateOfBirthTag); if (DateTime.TryParseExact(dob, "yyyyMMdd", CultureInfo.InvariantCulture, DateTimeStyles.None, out var dateOfBirth)) { @@ -90,12 +89,43 @@ public async Task GetPayloadPatientDetailsAsync(string payloadId return patientDetails; } - public async Task GetFirstValueAsync(IList items, string payloadId, string bucketId, string keyId) + private string? GetFirstValueAsync(Dictionary? dict, string keyId) { - ArgumentNullException.ThrowIfNullOrWhiteSpace(bucketId, nameof(bucketId)); - ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId)); ArgumentNullException.ThrowIfNullOrWhiteSpace(keyId, nameof(keyId)); + if (dict is null) + { + return null; + } + + try + { + if (dict is null) + { + return null; + } + + var value = GetValue(dict, keyId); + + if (!string.IsNullOrWhiteSpace(value)) + { + return value; + } + return null; + } + catch (Exception e) + { + _logger.FailedToGetDicomTagFromDictoionary(keyId, e); + } + return null; + } + + public async Task?> GetMetaData(string payloadId, string bucketId) + { + ArgumentNullException.ThrowIfNullOrWhiteSpace(bucketId, nameof(bucketId)); + ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId)); + var items = await _storageService.ListObjectsAsync(bucketId, $"{payloadId}/dcm", true); + var dict = new Dictionary(StringComparer.OrdinalIgnoreCase); try { if (items is null || items.Any() is false) @@ -113,20 +143,26 @@ public async Task GetPayloadPatientDetailsAsync(string payloadId var stream = await _storageService.GetObjectAsync(bucketId, filePath); var jsonStr = Encoding.UTF8.GetString(((MemoryStream)stream).ToArray()); - var dict = new Dictionary(StringComparer.OrdinalIgnoreCase); - JsonConvert.PopulateObject(jsonStr, dict); + var dictCurrent = new Dictionary(StringComparer.OrdinalIgnoreCase); + JsonConvert.PopulateObject(jsonStr, dictCurrent); - var value = GetValue(dict, keyId); - if (!string.IsNullOrWhiteSpace(value)) + // merge the two dictionaries + foreach (var (key, value) in dictCurrent) { - return value; + if (dict.ContainsKey(key)) + { + continue; + } + + dict.Add(key, value); } } + return dict; } catch (Exception e) { - _logger.FailedToGetDicomTagFromPayload(payloadId, keyId, bucketId, e); + _logger.FailedToGetDicomMetadataFromBucket(payloadId, bucketId, e); } return null; @@ -141,7 +177,7 @@ public async Task> GetDicomPathsForTaskAsync(string outputDi var dicomFiles = files?.Where(f => f.FilePath.EndsWith(".dcm")); - return dicomFiles?.Select(d => d.FilePath)?.ToList() ?? new List(); + return dicomFiles?.Select(d => d.FilePath)?.ToList() ?? []; } public async Task GetAnyValueAsync(string keyId, string payloadId, string bucketId) @@ -180,7 +216,7 @@ public async Task GetAllValueAsync(string keyId, string payloadId, strin var matchValue = await GetDcmJsonFileValueAtIndexAsync(0, path, bucketId, keyId, listOfJsonFiles); var fileCount = listOfJsonFiles.Count; - for (int i = 0; i < fileCount; i++) + for (var i = 0; i < fileCount; i++) { if (listOfJsonFiles[i].Filename.EndsWith(".dcm")) { @@ -229,7 +265,7 @@ public async Task GetDcmJsonFileValueAtIndexAsync(int index, public string GetValue(Dictionary dict, string keyId) { - if (dict.Any() is false) + if (dict.Count == 0) { return string.Empty; } @@ -261,6 +297,20 @@ public string GetValue(Dictionary dict, string keyId) return result; } + public string? GetSeriesInstanceUID(Dictionary? dict) + { + if (dict is null) + { + return null; + } + + if (dict.TryGetValue(DicomTagConstants.SeriesInstanceUIDTag, out var value)) + { + return value.Value.ToString(); + } + return null; + } + private string TryGetValueAndLogSupported(string vrFullString, DicomValue value, string jsonString) { var result = TryGetValue(value); diff --git a/src/WorkflowManager/Storage/Services/IDicomService.cs b/src/WorkflowManager/Storage/Services/IDicomService.cs index 36edc8fd3..914c96ee5 100644 --- a/src/WorkflowManager/Storage/Services/IDicomService.cs +++ b/src/WorkflowManager/Storage/Services/IDicomService.cs @@ -63,5 +63,20 @@ public interface IDicomService /// /// string GetValue(Dictionary dict, string keyId); + + /// + /// Gets the first metadata froma payloads folder. + /// + /// + /// + /// a dictionary of tags and values + Task?> GetMetaData(string payloadId, string bucketId); + + /// + /// Get the seriers instance UID from the metadata. + /// + /// + /// a string containing the seriers instanceUid + string? GetSeriesInstanceUID(Dictionary? dict); } } From 09df102e6f3282ac54f5f43752360cfbdca68f7a Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 8 Feb 2024 16:29:52 +0000 Subject: [PATCH 4/5] changes due to comments Signed-off-by: Neil South --- .../Storage/Services/DicomService.cs | 17 +---------------- .../Services/WorkflowExecuterServiceTests.cs | 12 +----------- 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/src/WorkflowManager/Storage/Services/DicomService.cs b/src/WorkflowManager/Storage/Services/DicomService.cs index bd3528a07..bf46503e0 100644 --- a/src/WorkflowManager/Storage/Services/DicomService.cs +++ b/src/WorkflowManager/Storage/Services/DicomService.cs @@ -99,11 +99,6 @@ public async Task GetPayloadPatientDetailsAsync(string payloadId try { - if (dict is null) - { - return null; - } - var value = GetValue(dict, keyId); if (!string.IsNullOrWhiteSpace(value)) @@ -150,12 +145,7 @@ public async Task GetPayloadPatientDetailsAsync(string payloadId // merge the two dictionaries foreach (var (key, value) in dictCurrent) { - if (dict.ContainsKey(key)) - { - continue; - } - - dict.Add(key, value); + dict.TryAdd(key, value); } } return dict; @@ -265,11 +255,6 @@ public async Task GetDcmJsonFileValueAtIndexAsync(int index, public string GetValue(Dictionary dict, string keyId) { - if (dict.Count == 0) - { - return string.Empty; - } - var result = string.Empty; if (dict.TryGetValue(keyId, out var value)) diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 6b92d5efa..710715eec 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -3811,17 +3811,7 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() { TaskId = "router", Status = TaskExecutionStatus.Created - }, - //new TaskExecution - //{ - // TaskId = "export1", - // Status = TaskExecutionStatus.Created - //}, - //new TaskExecution - //{ - // TaskId = "export2", - // Status = TaskExecutionStatus.Created - //} + } ] }; From f305fec46d6ee82cfde6051d06998afa612c7d26 Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 8 Feb 2024 17:00:07 +0000 Subject: [PATCH 5/5] more changes for sonar cloud Signed-off-by: Neil South --- src/WorkflowManager/Logging/Log.600000.Dicom.cs | 2 +- .../Storage/Services/DicomService.cs | 2 +- .../Services/WorkflowExecuterService.cs | 14 +++++++++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/WorkflowManager/Logging/Log.600000.Dicom.cs b/src/WorkflowManager/Logging/Log.600000.Dicom.cs index a84146f53..67b6e3703 100644 --- a/src/WorkflowManager/Logging/Log.600000.Dicom.cs +++ b/src/WorkflowManager/Logging/Log.600000.Dicom.cs @@ -44,7 +44,7 @@ public static partial class Log [LoggerMessage(EventId = 600007, Level = LogLevel.Error, Message = "Failed to get DICOM metadata from bucket {bucketId}. Payload: {payloadId}")] public static partial void FailedToGetDicomMetadataFromBucket(this ILogger logger, string payloadId, string bucketId, Exception ex); - [LoggerMessage(EventId = 600000, Level = LogLevel.Error, Message = "Failed to get DICOM tag {dicomTag} from dictionary")] + [LoggerMessage(EventId = 600008, Level = LogLevel.Error, Message = "Failed to get DICOM tag {dicomTag} from dictionary")] public static partial void FailedToGetDicomTagFromDictoionary(this ILogger logger, string dicomTag, Exception ex); } } diff --git a/src/WorkflowManager/Storage/Services/DicomService.cs b/src/WorkflowManager/Storage/Services/DicomService.cs index bf46503e0..b94e79a17 100644 --- a/src/WorkflowManager/Storage/Services/DicomService.cs +++ b/src/WorkflowManager/Storage/Services/DicomService.cs @@ -167,7 +167,7 @@ public async Task> GetDicomPathsForTaskAsync(string outputDi var dicomFiles = files?.Where(f => f.FilePath.EndsWith(".dcm")); - return dicomFiles?.Select(d => d.FilePath)?.ToList() ?? []; + return dicomFiles?.Select(d => d.FilePath) ?? []; } public async Task GetAnyValueAsync(string keyId, string payloadId, string bucketId) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index d4e67be83..c392cc67f 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -299,7 +299,7 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate, IEnumerable receivedArtifacts) { - var taskExecution = workflowInstance.Tasks.FirstOrDefault(t => t.TaskId == taskId); + var taskExecution = Array.Find([.. workflowInstance.Tasks], t => t.TaskId == taskId); if (taskExecution is null) { @@ -326,7 +326,8 @@ await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstanceId, task private string[] GetTasksWithAllInputs(WorkflowRevision workflowTemplate, string currentTaskId, IEnumerable artifactsRecieved) { var excutableTasks = new List(); - var task = workflowTemplate.Workflow!.Tasks.FirstOrDefault(t => t.Id == currentTaskId); + + var task = Array.Find(workflowTemplate.Workflow!.Tasks, t => t.Id == currentTaskId); if (task is null) { _logger.ErrorFindingTask(currentTaskId); @@ -354,8 +355,9 @@ private string[] GetTasksWithAllInputs(WorkflowRevision workflowTemplate, string private Dictionary GetTasksInput(WorkflowRevision workflowTemplate, string taskId, string previousTaskId) { var results = new Dictionary(); - var task = workflowTemplate.Workflow!.Tasks.FirstOrDefault(t => t.Id == taskId); - var previousTask = workflowTemplate.Workflow!.Tasks.FirstOrDefault(t => t.Id == previousTaskId); + + var task = Array.Find(workflowTemplate.Workflow!.Tasks, t => t.Id == taskId); + var previousTask = Array.Find(workflowTemplate.Workflow!.Tasks, t => t.Id == previousTaskId); if (previousTask is null || task is null) { _logger.ErrorFindingTask(taskId); @@ -969,6 +971,8 @@ private async Task> CreateTaskDestinations( string taskId, IEnumerable receivedArtifacts) { + _ = workflow ?? throw new ArgumentNullException(nameof(workflow)); + var currentTaskDestinations = workflow.Workflow?.Tasks?.SingleOrDefault(t => t.Id == taskId)?.TaskDestinations; var newTaskExecutions = new List(); @@ -981,7 +985,7 @@ private async Task> CreateTaskDestinations( foreach (var taskDest in currentTaskDestinations) { // have we got all inputs we need ? - var neededInputs = GetTasksInput(workflow, taskDest.Name, taskId).Where(t => t.Value); + var neededInputs = GetTasksInput(workflow!, taskDest.Name, taskId).Where(t => t.Value); var remainingManditory = neededInputs.Where(i => receivedArtifacts.Any(a => a == i.Key) is false); if (remainingManditory.Count() is not 0) {