Skip to content

Commit

Permalink
Add task artifacts within task executer (#148)
Browse files Browse the repository at this point in the history
* Add task artifacts within task executer

Signed-off-by: Jack Schofield <jack.schofield@answerdigital.com>

* Update to use new UpdateStatusEvent

Signed-off-by: Jack Schofield <jack.schofield@answerdigital.com>
  • Loading branch information
jackschofield23 committed Jun 9, 2022
1 parent d0fe7cc commit 9b7a8cd
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/Monai.Deploy.WorkflowManager.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.WorkflowManage
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.WorkflowManager.ConditionsResolver.Tests", "..\tests\UnitTests\ConditionsResolver.Tests\Monai.Deploy.WorkflowManager.ConditionsResolver.Tests.csproj", "{918E4DE3-A7BF-4B7F-9B5A-5C36FEFA3C30}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Monai.Deploy.WorkflowManager.Common.Tests", "..\tests\UnitTests\WorkflowManager.Common.Tests\Monai.Deploy.WorkflowManager.Common.Tests.csproj", "{A44F975E-70CA-49D6-8513-78F2D5210EAF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Monai.Deploy.WorkflowManager.Common.Tests", "..\tests\UnitTests\WorkflowManager.Common.Tests\Monai.Deploy.WorkflowManager.Common.Tests.csproj", "{A44F975E-70CA-49D6-8513-78F2D5210EAF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
3 changes: 2 additions & 1 deletion src/TaskManager/Plug-ins/Argo/ExitHookTemplate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ private TaskCallbackEvent GenerateTaskCallbackEvent() =>
TaskId = _taskDispatchEvent.TaskId,
ExecutionId = _taskDispatchEvent.ExecutionId,
CorrelationId = _taskDispatchEvent.CorrelationId,
Identity = "{{workflow.name}}"
Identity = "{{workflow.name}}",
Outputs = _taskDispatchEvent.Outputs ?? new List<Messaging.Common.Storage>()
};

private object GenerateTaskCallbackMessage() =>
Expand Down
3 changes: 2 additions & 1 deletion src/TaskManager/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void AcknowledgeMessage<T>(JsonMessage<T> message)
}
}

private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(JsonMessage<T> message, string executionId, string WorkflowInstanceId, string taskId, ExecutionStatus executionStatus)
private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(JsonMessage<T> message, string executionId, string WorkflowInstanceId, string taskId, ExecutionStatus executionStatus, List<Messaging.Common.Storage> outputs = null)
{
Guard.Against.Null(message, nameof(message));
Guard.Against.Null(executionStatus, nameof(executionStatus));
Expand All @@ -326,6 +326,7 @@ private static JsonMessage<TaskUpdateEvent> GenerateUpdateEventMessage<T>(JsonMe
WorkflowInstanceId = WorkflowInstanceId,
TaskId = taskId,
Message = executionStatus.Errors,
Outputs = outputs ?? new List<Messaging.Common.Storage>(),
}, TaskManagerApplicationId, message.CorrelationId);
}

Expand Down
11 changes: 10 additions & 1 deletion src/WorkflowExecuter/Common/EventMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ public static TaskDispatchEvent ToTaskDispatchEvent(TaskExecution task, string w
Status = TaskExecutionStatus.Created,
TaskPluginArguments = task.TaskPluginArguments,
Inputs = inputs,
Metadata = task.Metadata
TaskPluginType = task.TaskType,
Metadata = task.Metadata,
IntermediateStorage = new Messaging.Common.Storage
{
Bucket = configuration.Settings["bucket"],
RelativeRootPath = $"{task.OutputDirectory}/tmp",
Endpoint = configuration.Settings["endpoint"],
Name = task.TaskId,
SecuredConnection = bool.Parse(configuration.Settings["securedConnection"])
}
};
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowManager/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
},
"storage": {
"serviceAssemblyName": "Monai.Deploy.Storage.MinIo.MinIoStorageService, Monai.Deploy.Storage",
"bucketName": "test-bucket",
"settings": {
"endpoint": "localhost:9000",
"credentialServiceUrl": "http://localhost:9000",
"accessKey": "minioadmin",
"accessToken": "minioadmin",
"bucket": "test-bucket",
"region": "eu-west-2",
"securedConnection": false
}
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowManager/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@
},
"storage": {
"serviceAssemblyName": "Monai.Deploy.Storage.MinIo.MinIoStorageService, Monai.Deploy.Storage",
"bucketName": "test-bucket",
"settings": {
"endpoint": "localhost:9000",
"credentialServiceUrl": "http://localhost:9000",
"accessKey": "minioadmin",
"accessToken": "minioadmin",
"region": "eu-west-2",
"bucket": "test-bucket",
"securedConnection": false
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void ToTaskDispatchEvent_ValidAeTitleWorkflowRequest_ReturnesTrue()
ExecutionId = task.ExecutionId.ToString(),
CorrelationId = correlationId,
Status = TaskExecutionStatus.Created,
TaskPluginType = task.TaskType,
Inputs = new List<Messaging.Common.Storage>
{
new Messaging.Common.Storage
Expand All @@ -70,6 +71,14 @@ public void ToTaskDispatchEvent_ValidAeTitleWorkflowRequest_ReturnesTrue()
TaskPluginArguments = new Dictionary<string, string>
{
{ "key", "value" }
},
IntermediateStorage = new Messaging.Common.Storage
{
Bucket = configuration.Settings["bucket"],
Endpoint = configuration.Settings["endpoint"],
Name = task.TaskId,
RelativeRootPath = "minio/workflowid/taskid/tmp",
SecuredConnection = bool.Parse(configuration.Settings["securedConnection"])
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public WorkflowExecuterServiceTests()
_messageBrokerPublisherService = new Mock<IMessageBrokerPublisherService>();
_storageService = new Mock<IStorageService>();
_configuration = Options.Create(new WorkflowManagerOptions() { Messaging = new MessageBrokerConfiguration { Topics = new MessageBrokerConfigurationKeys { TaskDispatchRequest = "md.task.dispatch" } } });
_storageConfiguration = Options.Create(new StorageServiceConfiguration());
_storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary<string, string> { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } });

WorkflowExecuterService = new WorkflowExecuterService(_logger.Object, _configuration, _storageConfiguration, _workflowRepository.Object, _workflowInstanceRepository.Object, _messageBrokerPublisherService.Object, _artifactMapper.Object, _storageService.Object);
}
Expand Down

0 comments on commit 9b7a8cd

Please sign in to comment.