Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
}
else
{
item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList();
item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList();
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
await _artifactReceivedItemsCollection
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ await _artifactsRepository
return true;
}

private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId)
{
var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList();
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary<string, bool>();
Expand All @@ -263,22 +263,36 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message

var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList();

var addedNew = false;
var validArtifacts = new Dictionary<string, string>();
foreach (var artifact in messageArtifactsInStorage)
{
var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
if (match is not null && validArtifacts.ContainsKey(match!.Name) is false)
{
validArtifacts.Add(match.Name, $"{artifact.Path}");

}
}

var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId);

currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes

_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
foreach (var artifact in validArtifacts)
{
if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false)
{
// adding the actual paths here, the parent function does the saving of the changes
currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value);
addedNew = true;
}
}

if (currentTask is not null && addedNew)
{
_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask);
}
}

private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
Expand Down Expand Up @@ -511,9 +525,13 @@ public async Task<bool> ProcessExportComplete(ExportCompleteEvent message, strin
return false;
}

if (string.Compare(task.TaskType, ValidationConstants.ExportTaskType, true) == 0)
switch (task.TaskType)
{
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
case TaskTypeConstants.DicomExportTask:
case TaskTypeConstants.HL7ExportTask:
return await HandleTaskDestinations(workflowInstance, workflow, task, correlationId);
default:
break;
}
}

Expand Down Expand Up @@ -612,7 +630,12 @@ private async Task<bool> ExternalAppRequest(ExternalAppRequestEvent externalAppR
return true;
}

private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List<string>? plugins = null)
private async Task HandleDicomExportAsync(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
List<string>? plugins = null)
{
plugins ??= new List<string>();
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
Expand All @@ -629,15 +652,20 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched);
}

private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
bool enforceDcmOnly = true)
{
var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray();
if (exportList is null || !exportList.Any())
{
exportList = null;
}

var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId);
var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly);

if (artifactValues.IsNullOrEmpty())
{
Expand All @@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
return (exportList, artifactValues);
}

private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId)
private async Task<string[]> GetArtifactValues(
WorkflowRevision workflow, WorkflowInstance workflowInstance,
TaskExecution task,
string[]? exportList,
string correlationId,
bool enforceDcmOnly = true)
{
var artifactValues = GetDicomExports(workflow, task, exportList);

Expand All @@ -660,7 +693,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl
artifact,
true);

var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList();
var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList();

if (dcmFiles?.IsNullOrEmpty() is false)
{
Expand All @@ -681,7 +714,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl

private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
{
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false);

if (exportList is null || artifactValues is null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO;
#pragma warning disable CS8602 // Dereference of a possibly null reference.
Expand Down Expand Up @@ -209,7 +210,7 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName)
ExecutionId = Guid.NewGuid().ToString(),
TaskId = "7d7c8b83-6628-413c-9912-a89314e5e2d5",
OutputDirectory = "payloadId/workflows/workflowInstanceId/executionId/",
TaskType = "Export",
TaskType = TaskTypeConstants.DicomExportTask,
Status = TaskExecutionStatus.Dispatched
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.Messaging.Common;
using Monai.Deploy.WorkflowManager.Common.Contracts.Constants;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact;
// ReSharper disable ArrangeObjectCreationWhenTypeEvident
Expand Down Expand Up @@ -2174,7 +2175,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2235,7 +2236,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2296,7 +2297,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2358,7 +2359,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand All @@ -2375,7 +2376,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_2",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 3",
ExportDestinations = new ExportDestination[]
{
Expand Down Expand Up @@ -2437,7 +2438,7 @@ public static class WorkflowRevisionsTestData
new TaskObject
{
Id = "export_task_1",
Type = "Export",
Type = TaskTypeConstants.DicomExportTask,
Description = "Export Workflow 1 Task 2",
ExportDestinations = new ExportDestination[]
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()

Assert.True(result);
}
[Fact]
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
{
var artifactPath = "some path here";
//incoming artifacts
var message = new ArtifactsReceivedEvent
{
WorkflowInstanceId = "123", TaskId = "456",
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } }
};
var workflowInstance = new WorkflowInstance
{
WorkflowId = "789", Tasks = new List<TaskExecution>()
{ new TaskExecution() { TaskId = "456" } }
};
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
.ReturnsAsync(workflowInstance);
//expected artifacts
var templateArtifacts = new OutputArtifact[]
{
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
};
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
.ReturnsAsync(workflowTemplate);

_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Dictionary<string, bool> { { $"{message.PayloadId}/{artifactPath}", true } });

//previously received artifacts
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
.ReturnsAsync((List<ArtifactReceivedItems>?)null);

var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);

_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
}
[Fact]
public async Task ProcessPayload_WithExportTask_NoExportsFails()
{
Expand Down