Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ public async Task<bool> ProcessArtifactReceivedAsync(ArtifactsReceivedEvent mess
return false;
}

await ProcessArtifactReceivedOutputs(message, workflowInstance, taskTemplate, taskId);

var previouslyReceivedArtifactsFromRepo = await _artifactsRepository.GetAllAsync(workflowInstanceId, taskId).ConfigureAwait(false);
if (previouslyReceivedArtifactsFromRepo is null || previouslyReceivedArtifactsFromRepo.Count == 0)
{
Expand Down Expand Up @@ -246,6 +248,21 @@ await _artifactsRepository
return true;
}

private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
{

var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, message.Artifacts.Select(a => a.Path).ToList(), default)) ?? new Dictionary<string, bool>();
if (artifactsInStorage.Any())
{
var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Key == m.Path).Value).ToList();

var validArtifacts = new Dictionary<string, string>();
messageArtifactsInStorage.ForEach(m => validArtifacts.Add(task.Artifacts.Output.First(t => t.Type == m.Type).Name, m.Path));

await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
}
}

private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Support;
using Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.Support;
using MongoDB.Driver;
using NUnit.Framework;
using Polly;
using Polly.Retry;
using TechTalk.SpecFlow.Infrastructure;
Expand Down Expand Up @@ -77,6 +78,7 @@ public async Task GivenIHaveAClinicalWorkflowIHaveAWorkflowInstance(string clini

_outputHelper.WriteLine("Seeding minio with workflow input artifacts");
await MinioDataSeeding.SeedWorkflowInputArtifacts(workflowInstance.PayloadId);
await MinioDataSeeding.SeedArtifactRecieviedArtifact(workflowInstance.PayloadId);

_outputHelper.WriteLine($"Retrieving workflow instance with name={wfiName}");
await MongoClient.CreateWorkflowInstanceDocumentAsync(workflowInstance);
Expand Down Expand Up @@ -120,7 +122,9 @@ public void ThenICanSeeXArtifactReceivedItemIsCreated(int count)
{
throw new Exception("Failing Test");
}
var wfitest = MongoClient.GetWorkflowInstanceById(artifactsReceivedItems.FirstOrDefault().WorkflowInstanceId);
Assertions.AssertArtifactsReceivedItemMatchesExpectedWorkflow(artifactsReceivedItem, workflow, wfi);
Assert.AreEqual(wfitest.Tasks[1].OutputArtifacts.First().Value, "path"); // this was passed in the message
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public async Task SeedWorkflowInputArtifacts(string payloadId, string? folderNam
OutputHelper.WriteLine($"Objects seeded");
}

public async Task SeedArtifactRecieviedArtifact(string payloadId)
{
var localPath = Path.Combine(GetDirectory() ?? "", "DICOMs", "full_patient_metadata", "dcm");

await MinioClient.AddFileToStorage(localPath, $"path");
}

public async Task SeedTaskOutputArtifacts(string payloadId, string workflowInstanceId, string executionId, string? folderName = null)
{
string localPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3161,6 +3161,43 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()

Assert.True(result);
}
[Fact]
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
{
var artifactPath = "some path here";
//incoming artifacts
var message = new ArtifactsReceivedEvent
{
WorkflowInstanceId = "123", TaskId = "456",
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = artifactPath } }
};
var workflowInstance = new WorkflowInstance
{
WorkflowId = "789", Tasks = new List<TaskExecution>()
{ new TaskExecution() { TaskId = "not456" } }
};
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
.ReturnsAsync(workflowInstance);
//expected artifacts
var templateArtifacts = new OutputArtifact[]
{
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
};
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
.ReturnsAsync(workflowTemplate);

_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>())).ReturnsAsync(new Dictionary<string, bool> { { artifactPath, true } });

//previously received artifacts
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
.ReturnsAsync((List<ArtifactReceivedItems>?)null);

var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);

_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
}
}
#pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type.
}