From 51e63466ecbe69fe4551aa755348467cd8a1a9f3 Mon Sep 17 00:00:00 2001 From: Jack Schofield Date: Wed, 10 Aug 2022 15:51:48 +0100 Subject: [PATCH 1/5] fix running tasks Signed-off-by: Jack Schofield --- .../Common/Interfaces/ITasksService.cs | 2 + .../Common/Services/TasksService.cs | 10 +- .../Database/Interfaces/ITasksRepository.cs | 10 +- .../Database/Repositories/TasksRepository.cs | 50 +++----- .../Controllers/TasksController.cs | 7 +- .../Features/TasksApi.feature | 13 +- .../TasksApiStepDefinitions.cs | 10 ++ .../TestData/WorkflowInstanceTestData.cs | 112 ++++++++++++++++++ .../Services/TasksServiceTests.cs | 5 +- .../Controllers/TasksControllerTests.cs | 3 +- 10 files changed, 160 insertions(+), 62 deletions(-) diff --git a/src/WorkflowManager/Common/Interfaces/ITasksService.cs b/src/WorkflowManager/Common/Interfaces/ITasksService.cs index 6f7cc4913..fff838fbc 100644 --- a/src/WorkflowManager/Common/Interfaces/ITasksService.cs +++ b/src/WorkflowManager/Common/Interfaces/ITasksService.cs @@ -20,6 +20,8 @@ namespace Monai.Deploy.WorkflowManager.Common.Interfaces { public interface ITasksService : IPaginatedApi { + Task<(IList, long)> GetAllAsync(int? skip = null, int? limit = null); + Task GetTaskAsync(string workflowInstanceId, string taskId, string executionId); } } diff --git a/src/WorkflowManager/Common/Services/TasksService.cs b/src/WorkflowManager/Common/Services/TasksService.cs index 47b7e3d7f..2f19d4043 100644 --- a/src/WorkflowManager/Common/Services/TasksService.cs +++ b/src/WorkflowManager/Common/Services/TasksService.cs @@ -30,15 +30,7 @@ public TasksService(ITasksRepository workflowInstanceRepository) } /// - /// Not Required For Tasks Service is - /// - /// - /// - public async Task CountAsync() - => await _tasksRepository.CountAsync(); - - /// - /// Gets all running tasks + /// Gets all running tasks and a total running task count. /// /// /// diff --git a/src/WorkflowManager/Database/Interfaces/ITasksRepository.cs b/src/WorkflowManager/Database/Interfaces/ITasksRepository.cs index 49fb2eb8c..e30b90df2 100644 --- a/src/WorkflowManager/Database/Interfaces/ITasksRepository.cs +++ b/src/WorkflowManager/Database/Interfaces/ITasksRepository.cs @@ -23,18 +23,12 @@ namespace Monai.Deploy.WorkflowManager.Database.Interfaces public interface ITasksRepository { /// - /// Gets all running tasks. + /// Gets all running tasks and total number of running tasks. /// /// skip. /// limit. /// - Task> GetAllAsync(int? skip, int? limit); - - /// - /// Gets count of Tasks. - /// - /// - Task CountAsync(); + Task<(IList, long)> GetAllAsync(int? skip, int? limit); /// /// Gets Task Execution given workflowInstanceId, taskId and executionId. diff --git a/src/WorkflowManager/Database/Repositories/TasksRepository.cs b/src/WorkflowManager/Database/Repositories/TasksRepository.cs index a1ccd9984..f25f9df58 100644 --- a/src/WorkflowManager/Database/Repositories/TasksRepository.cs +++ b/src/WorkflowManager/Database/Repositories/TasksRepository.cs @@ -80,17 +80,24 @@ public async Task> GetAllAsync(int? ski { try { - var builder = Builders.Filter; - - var filter = builder.Eq("Tasks.Status", TaskExecutionStatus.Accepted); - filter &= builder.Ne("Tasks.Status", TaskExecutionStatus.Dispatched); + var filter = new FilterDefinitionBuilder().Where(x => + x.Status != Status.Succeeded && x.Status != Status.Failed); - return await _workflowInstanceCollection.Aggregate() - .Match(filter) - .Unwind(wf => wf.Tasks) - .Skip(skip ?? 0) - .Limit(limit ?? 500) + var result = await _workflowInstanceCollection + .Find(filter) .ToListAsync(); + + if (result is null || result.Count == 0) + { + return (new List(), 0); + } + + var tasks = result.SelectMany(r => r.Tasks.Where(t => + t.Status != TaskExecutionStatus.Succeeded && + t.Status != TaskExecutionStatus.Failed && + t.Status != TaskExecutionStatus.Canceled)); + + return (tasks.Skip(skip ?? 0).Take(limit ?? 500).ToList(), tasks.Count()); } catch (Exception e) { @@ -121,30 +128,5 @@ public async Task> GetAllAsync(int? ski return default; } } - - public async Task CountAsync() - { - try - { - var builder = Builders.Filter; - - var filter = builder.Eq("Tasks.Status", TaskExecutionStatus.Accepted); - filter &= builder.Ne("Tasks.Status", TaskExecutionStatus.Dispatched); - - var result = await _workflowInstanceCollection.Aggregate() - .Match(filter).ToListAsync(); - if (result is null || result.Count == 0) - { - return 0; - } - return result.Select(r => r.Tasks).Count(); - } - catch (Exception e) - { - _logger.DbCallFailed(nameof(GetAllAsync), e); - - return default; - } - } } } diff --git a/src/WorkflowManager/WorkflowManager/Controllers/TasksController.cs b/src/WorkflowManager/WorkflowManager/Controllers/TasksController.cs index d875417c1..bb3159595 100644 --- a/src/WorkflowManager/WorkflowManager/Controllers/TasksController.cs +++ b/src/WorkflowManager/WorkflowManager/Controllers/TasksController.cs @@ -79,14 +79,11 @@ public async Task GetListAsync([FromQuery] PaginationFilter filte var pageSize = filter.PageSize ?? _options.Value.EndpointSettings.DefaultPageSize; var validFilter = new PaginationFilter(filter.PageNumber, pageSize, _options.Value.EndpointSettings.MaxPageSize); - var result = await _tasksService.GetAllAsync( + var pagedData = await _tasksService.GetAllAsync( (validFilter.PageNumber - 1) * validFilter.PageSize, validFilter.PageSize); - var pagedData = result.IsNullOrEmpty() ? new List() : result.Select(r => r.Tasks).ToList(); - - var dataTotal = await _tasksService.CountAsync(); - var pagedReponse = CreatePagedReponse(pagedData.ToList(), validFilter, dataTotal, _uriService, route); + var pagedReponse = CreatePagedReponse(pagedData.Item1.ToList(), validFilter, pagedData.Item2, _uriService, route); return Ok(pagedReponse); } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature index 2348fa599..bfdb35dd3 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature @@ -55,10 +55,19 @@ Scenario Outline: Get details of a Task with non-existent id payload | Non_Existent_ExecutionID_Task_Details_2 | | Non_Existent_TaskID_Task_Details_3 | -@Task_Api @ignore #/tasks/running endpoint needs some dev work to ensure the filter works correctly. Ticket https://github.com/Project-MONAI/monai-deploy-workflow-manager/issues/308 +@Task_Api Scenario: Get details of all Tasks Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10 And I have a Workflow Instance WorkflowInstance_TaskApi_1 with no artifacts When I send a GET request Then I will get a 200 response - And I can see task payload is returned + And I can see 3 tasks are returned + +@Task_Api +Scenario: Get details of all Tasks From Multiple Workflows + Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10 + And I have a Workflow Instance WorkflowInstance_TaskApi_1 with no artifacts + And I have a Workflow Instance WorkflowInstance_TaskApi_2 with no artifacts + When I send a GET request + Then I will get a 200 response + And I can see 6 tasks are returned diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs index a1d074147..859797864 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/TasksApiStepDefinitions.cs @@ -17,7 +17,9 @@ using BoDi; using Monai.Deploy.WorkflowManager.Contracts.Models; using Monai.Deploy.WorkflowManager.IntegrationTests.Support; +using Monai.Deploy.WorkflowManager.Wrappers; using Newtonsoft.Json; +using NUnit.Framework; namespace Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.StepDefinitions { @@ -41,5 +43,13 @@ public void ThenICanSeeAnIndividualTaskIsReturned() var response = JsonConvert.DeserializeObject(ApiHelper.Response.Content.ReadAsStringAsync().Result); Assertions.AssertTaskPayload(DataHelper.WorkflowInstances, response); } + + [Then(@"I can see (.*) tasks are returned")] + public void ThenICanSeeTasksAreReturned(int number) + { + var result = ApiHelper.Response.Content.ReadAsStringAsync().Result; + var response = JsonConvert.DeserializeObject>>(result); + Assert.AreEqual(number, response?.Data.Count); + } } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs index 8c397db99..2c46321cc 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs @@ -1312,6 +1312,118 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName) } } }, + new WorkflowInstanceTestData() + { + Name = "WorkflowInstance_TaskApi_2", + WorkflowInstance = new WorkflowInstance() + { + Id = "44a63094-9e36-4ba4-9fea-8e9b76aa875b2", + AeTitle = "Ae_test", + WorkflowId = "a971f5f8-68fa-4cd0-ad34-f20b66675d214", + PayloadId = "e908ff53-d808-4c9b-82b6-698b8c60e8111", + StartTime = DateTime.Now, + Status = Status.Created, + BucketId = TestExecutionConfig.MinioConfig.Bucket, + InputMetaData = new Dictionary() + { + { "", "" } + }, + Tasks = new List + { + new TaskExecution() + { + ExecutionId = "8ff3ea90-0113-4071-9b92-5068f956daeff", + TaskId = "7b8ea05b-8abe-4848-928d-d55f5eef1bc3", + TaskType = "router", + Status = TaskExecutionStatus.Accepted, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "a1cd5b89-85e8-4d32-b9aa-bdbc0ff4bbba5", + TaskId = "953c0236-5292-4186-80ee-ef7d4073220b", + TaskType = "export", + Status = TaskExecutionStatus.Succeeded, + InputArtifacts = new Dictionary() + { + {"key_1", "value_1" } + }, + OutputArtifacts = new Dictionary() + { + {"key_1", "value_1" } + }, + OutputDirectory = "payload_id/dcm", + Reason = FailureReason.None, + PreviousTaskId = "PreviousTask", + ExecutionStats = new Dictionary() + { + {"key_1", "value_1" } + }, + ResultMetadata = new Dictionary() + { + {"key_1", "value_1" }, + {"key_2", 1 } + }, + InputParameters = new Dictionary() + { + {"key_1", "value_1" }, + {"key_2", 1 } + }, + TaskPluginArguments = new Dictionary() + { + {"key_1", "value_1" } + }, + TaskStartTime = DateTime.UtcNow + }, + new TaskExecution() + { + ExecutionId = "d1e0a3b7-3026-42cf7-ba04-71c1f50d98f6", + TaskId = "b3b537ae-79e8-4d13-9154-982ee4743595", + TaskType = "argo", + Status = TaskExecutionStatus.Created, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "666faff9-c702-481a9-ae37-5d92e1f6b324", + TaskId = "aad3762a-5c49-499b-a368-e5f9b98408e4", + TaskType = "export", + Status = TaskExecutionStatus.Exported, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "3b30b992-a87b-48865-9176-824b751f076e", + TaskId = "1ecdcc90-a999-48fa-a507-99d4ea7c9cb0", + TaskType = "argo", + Status = TaskExecutionStatus.Canceled, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "d9b54e70-d016-476d-b9c3-86ff8b17ef786", + TaskId = "a08920eb-0895-4272-ad81-54f2046d8438", + TaskType = "argo", + Status = TaskExecutionStatus.Succeeded, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "0c01f526-e574-40df-b21b-99cd16f5c305", + TaskId = "93d8e1c5-39b1-43f0-9bac-372c438b91c0", + TaskType = "argo", + Status = TaskExecutionStatus.Failed, + InputArtifacts = null, + OutputArtifacts = null, + }, + } + } + } }; } } diff --git a/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs b/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs index 204e8e2a5..3f0a96174 100644 --- a/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs +++ b/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs @@ -58,12 +58,13 @@ public async Task GetListAsync_TasksExist_ReturnsList() }; _tasksRepository.Setup(tr => tr.GetAllAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(() => taskExecution); + .ReturnsAsync(() => (taskExecution, 1)); var result = await TasksService.GetAllAsync(); result.Should().NotBeNull(); - result.Count.Should().Be(1); + result.Item1.Count.Should().Be(1); + result.Item2.Should().Be(1); var objectResult = Assert.IsType(result[0].Tasks); diff --git a/tests/UnitTests/WorkflowManager.Tests/Controllers/TasksControllerTests.cs b/tests/UnitTests/WorkflowManager.Tests/Controllers/TasksControllerTests.cs index 63a0c8330..94a42f93c 100644 --- a/tests/UnitTests/WorkflowManager.Tests/Controllers/TasksControllerTests.cs +++ b/tests/UnitTests/WorkflowManager.Tests/Controllers/TasksControllerTests.cs @@ -77,8 +77,7 @@ public async Task GetListAsync_TasksExist_ReturnsList() } } }; - _tasksService.Setup(w => w.CountAsync()).ReturnsAsync(() => 1); - _tasksService.Setup(w => w.GetAllAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => new List { new WorkflowInstanceTasksUnwindResult { Tasks = taskExecution } }); + _tasksService.Setup(w => w.GetAllAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => (new List { taskExecution }, 1)); _uriService.Setup(s => s.GetPageUriString(It.IsAny(), It.IsAny())).Returns(() => "unitTest"); var result = await TasksController.GetListAsync(new Filter.PaginationFilter()); From a71dc7695be3c10e39401729badca4c080cb40ee Mon Sep 17 00:00:00 2001 From: Jack Schofield Date: Thu, 11 Aug 2022 11:33:30 +0100 Subject: [PATCH 2/5] add no task test Signed-off-by: Jack Schofield --- .../Features/TasksApi.feature | 9 ++ .../TestData/WorkflowInstanceTestData.cs | 112 ++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature index bfdb35dd3..77975b77b 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature @@ -71,3 +71,12 @@ Scenario: Get details of all Tasks From Multiple Workflows When I send a GET request Then I will get a 200 response And I can see 6 tasks are returned + +@Task_Api +Scenario: Get details of all Tasks From Multiple Workflows returns no tasks + Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10 + And I have a Workflow Instance WorkflowInstance_TaskApi_1 with no artifacts + And I have a Workflow Instance WorkflowInstance_TaskApi_2 with no artifacts + When I send a GET request + Then I will get a 200 response + And I can see 0 tasks are returned diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs index 2c46321cc..f805f8c29 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs @@ -1423,6 +1423,118 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName) }, } } + }, + new WorkflowInstanceTestData() + { + Name = "WorkflowInstance_TaskApi_3", + WorkflowInstance = new WorkflowInstance() + { + Id = "44a63094-9e36-4ba4-9fea-8e9b76aa875b2", + AeTitle = "Ae_test", + WorkflowId = "a971f5f8-68fa-4cd0-ad34-f20b66675d214", + PayloadId = "e908ff53-d808-4c9b-82b6-698b8c60e8111", + StartTime = DateTime.Now, + Status = Status.Created, + BucketId = TestExecutionConfig.MinioConfig.Bucket, + InputMetaData = new Dictionary() + { + { "", "" } + }, + Tasks = new List + { + new TaskExecution() + { + ExecutionId = "8ff3ea90-0113-4071-9b92-5068f956daeff", + TaskId = "7b8ea05b-8abe-4848-928d-d55f5eef1bc3", + TaskType = "router", + Status = TaskExecutionStatus.Succeeded, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "a1cd5b89-85e8-4d32-b9aa-bdbc0ff4bbba5", + TaskId = "953c0236-5292-4186-80ee-ef7d4073220b", + TaskType = "export", + Status = TaskExecutionStatus.Succeeded, + InputArtifacts = new Dictionary() + { + {"key_1", "value_1" } + }, + OutputArtifacts = new Dictionary() + { + {"key_1", "value_1" } + }, + OutputDirectory = "payload_id/dcm", + Reason = FailureReason.None, + PreviousTaskId = "PreviousTask", + ExecutionStats = new Dictionary() + { + {"key_1", "value_1" } + }, + ResultMetadata = new Dictionary() + { + {"key_1", "value_1" }, + {"key_2", 1 } + }, + InputParameters = new Dictionary() + { + {"key_1", "value_1" }, + {"key_2", 1 } + }, + TaskPluginArguments = new Dictionary() + { + {"key_1", "value_1" } + }, + TaskStartTime = DateTime.UtcNow + }, + new TaskExecution() + { + ExecutionId = "d1e0a3b7-3026-42cf7-ba04-71c1f50d98f6", + TaskId = "b3b537ae-79e8-4d13-9154-982ee4743595", + TaskType = "argo", + Status = TaskExecutionStatus.Succeeded, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "666faff9-c702-481a9-ae37-5d92e1f6b324", + TaskId = "aad3762a-5c49-499b-a368-e5f9b98408e4", + TaskType = "export", + Status = TaskExecutionStatus.Succeeded, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "3b30b992-a87b-48865-9176-824b751f076e", + TaskId = "1ecdcc90-a999-48fa-a507-99d4ea7c9cb0", + TaskType = "argo", + Status = TaskExecutionStatus.Canceled, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "d9b54e70-d016-476d-b9c3-86ff8b17ef786", + TaskId = "a08920eb-0895-4272-ad81-54f2046d8438", + TaskType = "argo", + Status = TaskExecutionStatus.Failed, + InputArtifacts = null, + OutputArtifacts = null, + }, + new TaskExecution() + { + ExecutionId = "0c01f526-e574-40df-b21b-99cd16f5c305", + TaskId = "93d8e1c5-39b1-43f0-9bac-372c438b91c0", + TaskType = "argo", + Status = TaskExecutionStatus.Failed, + InputArtifacts = null, + OutputArtifacts = null, + }, + } + } } }; } From 5ea4df50e926111d171be5238cf7afad6b96fd2a Mon Sep 17 00:00:00 2001 From: Jack Schofield Date: Thu, 11 Aug 2022 13:28:27 +0100 Subject: [PATCH 3/5] fix test Signed-off-by: Jack Schofield --- .../Features/TasksApi.feature | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature index 77975b77b..559ed3db7 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/TasksApi.feature @@ -75,8 +75,7 @@ Scenario: Get details of all Tasks From Multiple Workflows @Task_Api Scenario: Get details of all Tasks From Multiple Workflows returns no tasks Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10 - And I have a Workflow Instance WorkflowInstance_TaskApi_1 with no artifacts - And I have a Workflow Instance WorkflowInstance_TaskApi_2 with no artifacts + And I have a Workflow Instance WorkflowInstance_TaskApi_3 with no artifacts When I send a GET request Then I will get a 200 response And I can see 0 tasks are returned From 689969c9f463f88a9d6768436fd425fdefc37572 Mon Sep 17 00:00:00 2001 From: Jack Schofield Date: Thu, 11 Aug 2022 13:33:21 +0100 Subject: [PATCH 4/5] remove unused file Signed-off-by: Jack Schofield --- .../WorkflowInstanceTasksUnwindResult.cs | 52 ------------------- 1 file changed, 52 deletions(-) delete mode 100644 src/WorkflowManager/Contracts/Models/WorkflowInstanceTasksUnwindResult.cs diff --git a/src/WorkflowManager/Contracts/Models/WorkflowInstanceTasksUnwindResult.cs b/src/WorkflowManager/Contracts/Models/WorkflowInstanceTasksUnwindResult.cs deleted file mode 100644 index 9bb7a0401..000000000 --- a/src/WorkflowManager/Contracts/Models/WorkflowInstanceTasksUnwindResult.cs +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2021-2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using Newtonsoft.Json; -using System; -using System.Collections.Generic; - -namespace Monai.Deploy.WorkflowManager.Contracts.Models -{ - public class WorkflowInstanceTasksUnwindResult - { - [JsonProperty(PropertyName = "id")] - public string Id { get; set; } = string.Empty; - - [JsonProperty(PropertyName = "ae_title")] - public string AeTitle { get; set; } = string.Empty; - - [JsonProperty(PropertyName = "workflow_id")] - public string WorkflowId { get; set; } = string.Empty; - - [JsonProperty(PropertyName = "payload_id")] - public string PayloadId { get; set; } = string.Empty; - - [JsonProperty(PropertyName = "start_time")] - public DateTime StartTime { get; set; } - - [JsonProperty(PropertyName = "status")] - public Status Status { get; set; } - - [JsonProperty(PropertyName = "bucket_id")] - public string BucketId { get; set; } = string.Empty; - - [JsonProperty(PropertyName = "input_metadata")] - public Dictionary InputMetaData { get; set; } = new Dictionary(); - - [JsonProperty(PropertyName = "tasks")] - public TaskExecution Tasks { get; set; } - } -} From bfd16587e54a9ab56e98e7d8c8067a31affe038a Mon Sep 17 00:00:00 2001 From: Jack Schofield Date: Fri, 12 Aug 2022 15:44:59 +0100 Subject: [PATCH 5/5] fix rebase issues Signed-off-by: Jack Schofield --- .../Common/Interfaces/ITasksService.cs | 2 +- .../Common/Services/TasksService.cs | 2 +- .../Contracts/Models/TaskExecution.cs | 3 ++ .../Database/Repositories/TasksRepository.cs | 4 +-- .../MonaiBackgroundService/Worker.cs | 10 +++---- .../Services/WorkflowExecuterService.cs | 1 + .../Services/TasksServiceTests.cs | 16 ++++------ .../WorkerTests.cs | 30 ++++++++----------- 8 files changed, 31 insertions(+), 37 deletions(-) diff --git a/src/WorkflowManager/Common/Interfaces/ITasksService.cs b/src/WorkflowManager/Common/Interfaces/ITasksService.cs index fff838fbc..93254d907 100644 --- a/src/WorkflowManager/Common/Interfaces/ITasksService.cs +++ b/src/WorkflowManager/Common/Interfaces/ITasksService.cs @@ -18,7 +18,7 @@ namespace Monai.Deploy.WorkflowManager.Common.Interfaces { - public interface ITasksService : IPaginatedApi + public interface ITasksService { Task<(IList, long)> GetAllAsync(int? skip = null, int? limit = null); diff --git a/src/WorkflowManager/Common/Services/TasksService.cs b/src/WorkflowManager/Common/Services/TasksService.cs index 2f19d4043..743e82368 100644 --- a/src/WorkflowManager/Common/Services/TasksService.cs +++ b/src/WorkflowManager/Common/Services/TasksService.cs @@ -35,7 +35,7 @@ public TasksService(ITasksRepository workflowInstanceRepository) /// /// /// - public async Task> GetAllAsync(int? skip = null, int? limit = null) + public async Task<(IList, long)> GetAllAsync(int? skip = null, int? limit = null) => await _tasksRepository.GetAllAsync(skip, limit); /// diff --git a/src/WorkflowManager/Contracts/Models/TaskExecution.cs b/src/WorkflowManager/Contracts/Models/TaskExecution.cs index c08f2ebd9..3d2fd9636 100644 --- a/src/WorkflowManager/Contracts/Models/TaskExecution.cs +++ b/src/WorkflowManager/Contracts/Models/TaskExecution.cs @@ -26,6 +26,9 @@ public class TaskExecution [JsonProperty(PropertyName = "execution_id")] public string ExecutionId { get; set; } = string.Empty; + [JsonProperty(PropertyName = "workflow_instance_id")] + public string WorkflowInstanceId { get; set; } = string.Empty; + [JsonProperty(PropertyName = "task_type")] public string TaskType { get; set; } = string.Empty; diff --git a/src/WorkflowManager/Database/Repositories/TasksRepository.cs b/src/WorkflowManager/Database/Repositories/TasksRepository.cs index f25f9df58..44f7f8005 100644 --- a/src/WorkflowManager/Database/Repositories/TasksRepository.cs +++ b/src/WorkflowManager/Database/Repositories/TasksRepository.cs @@ -76,7 +76,7 @@ private static async Task EnsureIndex(IMongoCollection workflo } } - public async Task> GetAllAsync(int? skip, int? limit) + public async Task<(IList, long)> GetAllAsync(int? skip, int? limit) { try { @@ -103,7 +103,7 @@ public async Task> GetAllAsync(int? ski { _logger.DbCallFailed(nameof(GetAllAsync), e); - return new List(); + return (new List(), 0); } } diff --git a/src/WorkflowManager/MonaiBackgroundService/Worker.cs b/src/WorkflowManager/MonaiBackgroundService/Worker.cs index c5c21c407..c63fd2a4d 100644 --- a/src/WorkflowManager/MonaiBackgroundService/Worker.cs +++ b/src/WorkflowManager/MonaiBackgroundService/Worker.cs @@ -67,17 +67,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) public async Task DoWork() { var runningTasks = await _tasksService.GetAllAsync(); - foreach (var workflow in runningTasks.Where(t => t.Tasks.TimeoutInterval != 0 && t.Tasks.Timeout < DateTime.UtcNow)) + foreach (var task in runningTasks.Item1.Where(t => t.TimeoutInterval != 0 && t.Timeout < DateTime.UtcNow)) { - var task = workflow.Tasks; - task.ExecutionStats.TryGetValue(IdentityKey, out var identity); - var workflowInstanceId = workflow.WorkflowId; + var correlationId = Guid.NewGuid().ToString(); - await PublishTimeoutUpdateEvent(task, correlationId, workflowInstanceId).ConfigureAwait(false); // -> task manager + await PublishTimeoutUpdateEvent(task, correlationId, task.WorkflowInstanceId).ConfigureAwait(false); // -> task manager - await PublishCancellationEvent(task, correlationId, identity ?? String.Empty, workflowInstanceId).ConfigureAwait(false); // -> workflow executor + await PublishCancellationEvent(task, correlationId, identity ?? String.Empty, task.WorkflowInstanceId).ConfigureAwait(false); // -> workflow executor } } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 40c4d328d..84e3007fa 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -614,6 +614,7 @@ private async Task CreateTaskExecutionAsync(TaskObject task, TaskPluginArguments = newTaskArgs, TaskStartTime = DateTime.UtcNow, TaskId = task.Id, + WorkflowInstanceId = workflowInstanceId, Status = TaskExecutionStatus.Created, Reason = FailureReason.None, InputArtifacts = await _artifactMapper.ConvertArtifactVariablesToPath(task?.Artifacts?.Input ?? Array.Empty(), payloadId, workflowInstanceId, bucketName), diff --git a/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs b/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs index 3f0a96174..d961a68fb 100644 --- a/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs +++ b/tests/UnitTests/Common.Tests/Services/TasksServiceTests.cs @@ -44,16 +44,12 @@ public async Task GetListAsync_TasksExist_ReturnsList() var expectedTaskId = Guid.NewGuid().ToString(); var expectedExecutionId = Guid.NewGuid().ToString(); - var taskExecution = new List { - new WorkflowInstanceTasksUnwindResult + var taskExecution = new List { + new TaskExecution { - Tasks = - new TaskExecution - { - ExecutionId = expectedExecutionId, - TaskId = expectedTaskId, - Status = TaskExecutionStatus.Dispatched - } + ExecutionId = expectedExecutionId, + TaskId = expectedTaskId, + Status = TaskExecutionStatus.Dispatched } }; @@ -66,7 +62,7 @@ public async Task GetListAsync_TasksExist_ReturnsList() result.Item1.Count.Should().Be(1); result.Item2.Should().Be(1); - var objectResult = Assert.IsType(result[0].Tasks); + var objectResult = Assert.IsType(result.Item1.First()); objectResult.Should().NotBeNull(); objectResult.ExecutionId.Should().Be(expectedExecutionId); diff --git a/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs b/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs index f9626e2b3..c4ba9885c 100644 --- a/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs +++ b/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs @@ -59,28 +59,24 @@ public async Task MonaiBackgroundService_DoWork_ShouldPublishMessages() var expectedExecutionId = Guid.NewGuid().ToString(); var workflowInstanceId = Guid.NewGuid().ToString(); - var taskExecution = new List { - new WorkflowInstanceTasksUnwindResult + var taskExecution = new List { + new TaskExecution { - WorkflowId = workflowInstanceId, - Tasks = - new TaskExecution - { - ExecutionId = expectedExecutionId, - TaskId = expectedTaskId, - Status = TaskExecutionStatus.Dispatched, - TimeoutInterval = -2, - TaskStartTime = DateTime.UtcNow, - ExecutionStats = new Dictionary() - { - { IdentityKey, Guid.NewGuid().ToString() } - } - } + ExecutionId = expectedExecutionId, + TaskId = expectedTaskId, + WorkflowInstanceId = workflowInstanceId, + Status = TaskExecutionStatus.Dispatched, + TimeoutInterval = -2, + TaskStartTime = DateTime.UtcNow, + ExecutionStats = new Dictionary() + { + { IdentityKey, Guid.NewGuid().ToString() } + } } }; _pubService.Setup(p => p.Publish(It.IsAny(), It.IsAny())).Returns(Task.CompletedTask); - _repo.Setup(r => r.GetAllAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => taskExecution); + _repo.Setup(r => r.GetAllAsync(It.IsAny(), It.IsAny())).ReturnsAsync(() => (taskExecution, 1)); var tokenSource = new CancellationTokenSource(); await _service.DoWork();