From 802fc55ac03fb585835f88c2d2f31eabd3df8ebf Mon Sep 17 00:00:00 2001 From: Neil South Date: Wed, 25 Oct 2023 17:32:08 +0100 Subject: [PATCH 1/2] added code to register outputs from ArtifactRecievedd Signed-off-by: Neil South --- .../Services/WorkflowExecuterService.cs | 15 ++++++++ .../ArtifactReceivedEventStepDefinitions.cs | 4 +++ .../Support/MinioDataSeeding.cs | 7 ++++ .../Services/WorkflowExecuterServiceTests.cs | 34 +++++++++++++++++++ 4 files changed, 60 insertions(+) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index c61205147..a52df669f 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -210,6 +210,8 @@ public async Task ProcessArtifactReceivedAsync(ArtifactsReceivedEvent mess return false; } + await ProcessArtifactReceivedOutputs(message, workflowInstance, taskTemplate, taskId); + var previouslyReceivedArtifactsFromRepo = await _artifactsRepository.GetAllAsync(workflowInstanceId, taskId).ConfigureAwait(false); if (previouslyReceivedArtifactsFromRepo is null || previouslyReceivedArtifactsFromRepo.Count == 0) { @@ -246,6 +248,19 @@ await _artifactsRepository return true; } + private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId) + { + + var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, message.Artifacts.Select(a => a.Path).ToList(), default)) ?? new Dictionary(); + var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList(); + + var validArtifacts = new Dictionary(); + messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path)); + + await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + + } + private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate) { diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs index c5c77c0f4..e6e5da63e 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs @@ -21,6 +21,7 @@ using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Support; using Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.Support; using MongoDB.Driver; +using NUnit.Framework; using Polly; using Polly.Retry; using TechTalk.SpecFlow.Infrastructure; @@ -77,6 +78,7 @@ public async Task GivenIHaveAClinicalWorkflowIHaveAWorkflowInstance(string clini _outputHelper.WriteLine("Seeding minio with workflow input artifacts"); await MinioDataSeeding.SeedWorkflowInputArtifacts(workflowInstance.PayloadId); + await MinioDataSeeding.SeedArtifactRecieviedArtifact(workflowInstance.PayloadId); _outputHelper.WriteLine($"Retrieving workflow instance with name={wfiName}"); await MongoClient.CreateWorkflowInstanceDocumentAsync(workflowInstance); @@ -120,7 +122,9 @@ public void ThenICanSeeXArtifactReceivedItemIsCreated(int count) { throw new Exception("Failing Test"); } + var wfitest = MongoClient.GetWorkflowInstanceById(artifactsReceivedItems.FirstOrDefault().WorkflowInstanceId); Assertions.AssertArtifactsReceivedItemMatchesExpectedWorkflow(artifactsReceivedItem, workflow, wfi); + Assert.AreEqual(wfitest.Tasks[1].OutputArtifacts.First().Value, "path"); // this was passed in the message } } }); diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs index 8b84ef1e8..a8a9624aa 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs @@ -64,6 +64,13 @@ public async Task SeedWorkflowInputArtifacts(string payloadId, string? folderNam OutputHelper.WriteLine($"Objects seeded"); } + public async Task SeedArtifactRecieviedArtifact(string payloadId) + { + var localPath = Path.Combine(GetDirectory() ?? "", "DICOMs", "full_patient_metadata", "dcm"); + + await MinioClient.AddFileToStorage(localPath, $"path"); + } + public async Task SeedTaskOutputArtifacts(string payloadId, string workflowInstanceId, string executionId, string? folderName = null) { string localPath; diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index d443c8663..036bcd84c 100644 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -3161,6 +3161,40 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() Assert.True(result); } + [Fact] + public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() + { + //incoming artifacts + var message = new ArtifactsReceivedEvent + { + WorkflowInstanceId = "123", TaskId = "456", + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = "some path here" } } + }; + var workflowInstance = new WorkflowInstance + { + WorkflowId = "789", Tasks = new List() + { new TaskExecution() { TaskId = "not456" } } + }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync(workflowInstance); + //expected artifacts + var templateArtifacts = new OutputArtifact[] + { + new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"}, + }; + var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } }; + var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } }; + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! + .ReturnsAsync(workflowTemplate); + + //previously received artifacts + _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) + .ReturnsAsync((List?)null); + + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + + _workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny(), It.IsAny(), It.IsAny>()), Times.Once()); + } } #pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. } From 23234640b3ce4b0626283f78c83c5400118ea1c5 Mon Sep 17 00:00:00 2001 From: Neil South Date: Wed, 25 Oct 2023 17:58:23 +0100 Subject: [PATCH 2/2] fixup for tests Signed-off-by: Neil South --- .../Services/WorkflowExecuterService.cs | 12 +++++++----- .../Services/WorkflowExecuterServiceTests.cs | 5 ++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index a52df669f..aafc45bd4 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -252,13 +252,15 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message { var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, message.Artifacts.Select(a => a.Path).ToList(), default)) ?? new Dictionary(); - var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList(); - - var validArtifacts = new Dictionary(); - messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path)); + if (artifactsInStorage.Any()) + { + var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList(); - await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + var validArtifacts = new Dictionary(); + messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path)); + await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + } } private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 036bcd84c..d944c3888 100644 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -3164,11 +3164,12 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() [Fact] public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() { + var artifactPath = "some path here"; //incoming artifacts var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456", - Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = "some path here" } } + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = artifactPath } } }; var workflowInstance = new WorkflowInstance { @@ -3187,6 +3188,8 @@ public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_Updat _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! .ReturnsAsync(workflowTemplate); + _storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny(), It.IsAny>(), It.IsAny())).ReturnsAsync(new Dictionary { { artifactPath, true } }); + //previously received artifacts _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) .ReturnsAsync((List?)null);