diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index c61205147..aafc45bd4 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -210,6 +210,8 @@ public async Task ProcessArtifactReceivedAsync(ArtifactsReceivedEvent mess return false; } + await ProcessArtifactReceivedOutputs(message, workflowInstance, taskTemplate, taskId); + var previouslyReceivedArtifactsFromRepo = await _artifactsRepository.GetAllAsync(workflowInstanceId, taskId).ConfigureAwait(false); if (previouslyReceivedArtifactsFromRepo is null || previouslyReceivedArtifactsFromRepo.Count == 0) { @@ -246,6 +248,21 @@ await _artifactsRepository return true; } + private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId) + { + + var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, message.Artifacts.Select(a => a.Path).ToList(), default)) ?? new Dictionary(); + if (artifactsInStorage.Any()) + { + var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList(); + + var validArtifacts = new Dictionary(); + messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path)); + + await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + } + } + private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate) { diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs index c5c77c0f4..e6e5da63e 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs @@ -21,6 +21,7 @@ using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Support; using Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.Support; using MongoDB.Driver; +using NUnit.Framework; using Polly; using Polly.Retry; using TechTalk.SpecFlow.Infrastructure; @@ -77,6 +78,7 @@ public async Task GivenIHaveAClinicalWorkflowIHaveAWorkflowInstance(string clini _outputHelper.WriteLine("Seeding minio with workflow input artifacts"); await MinioDataSeeding.SeedWorkflowInputArtifacts(workflowInstance.PayloadId); + await MinioDataSeeding.SeedArtifactRecieviedArtifact(workflowInstance.PayloadId); _outputHelper.WriteLine($"Retrieving workflow instance with name={wfiName}"); await MongoClient.CreateWorkflowInstanceDocumentAsync(workflowInstance); @@ -120,7 +122,9 @@ public void ThenICanSeeXArtifactReceivedItemIsCreated(int count) { throw new Exception("Failing Test"); } + var wfitest = MongoClient.GetWorkflowInstanceById(artifactsReceivedItems.FirstOrDefault().WorkflowInstanceId); Assertions.AssertArtifactsReceivedItemMatchesExpectedWorkflow(artifactsReceivedItem, workflow, wfi); + Assert.AreEqual(wfitest.Tasks[1].OutputArtifacts.First().Value, "path"); // this was passed in the message } } }); diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs index 8b84ef1e8..a8a9624aa 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs @@ -64,6 +64,13 @@ public async Task SeedWorkflowInputArtifacts(string payloadId, string? folderNam OutputHelper.WriteLine($"Objects seeded"); } + public async Task SeedArtifactRecieviedArtifact(string payloadId) + { + var localPath = Path.Combine(GetDirectory() ?? "", "DICOMs", "full_patient_metadata", "dcm"); + + await MinioClient.AddFileToStorage(localPath, $"path"); + } + public async Task SeedTaskOutputArtifacts(string payloadId, string workflowInstanceId, string executionId, string? folderName = null) { string localPath; diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index d443c8663..d944c3888 100644 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -3161,6 +3161,43 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() Assert.True(result); } + [Fact] + public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() + { + var artifactPath = "some path here"; + //incoming artifacts + var message = new ArtifactsReceivedEvent + { + WorkflowInstanceId = "123", TaskId = "456", + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = artifactPath } } + }; + var workflowInstance = new WorkflowInstance + { + WorkflowId = "789", Tasks = new List() + { new TaskExecution() { TaskId = "not456" } } + }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync(workflowInstance); + //expected artifacts + var templateArtifacts = new OutputArtifact[] + { + new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"}, + }; + var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } }; + var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } }; + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! + .ReturnsAsync(workflowTemplate); + + _storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny(), It.IsAny>(), It.IsAny())).ReturnsAsync(new Dictionary { { artifactPath, true } }); + + //previously received artifacts + _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) + .ReturnsAsync((List?)null); + + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + + _workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny(), It.IsAny(), It.IsAny>()), Times.Once()); + } } #pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. }