Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/WorkflowManager/Common/Interfaces/ITasksService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

namespace Monai.Deploy.WorkflowManager.Common.Interfaces
{
public interface ITasksService : IPaginatedApi<WorkflowInstanceTasksUnwindResult>
public interface ITasksService
{
Task<(IList<TaskExecution>, long)> GetAllAsync(int? skip = null, int? limit = null);

Task<TaskExecution?> GetTaskAsync(string workflowInstanceId, string taskId, string executionId);
}
}
12 changes: 2 additions & 10 deletions src/WorkflowManager/Common/Services/TasksService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,12 @@ public TasksService(ITasksRepository workflowInstanceRepository)
}

/// <summary>
/// Not Required For Tasks Service is
/// </summary>
/// <returns></returns>
/// <exception cref="NotImplementedException"></exception>
public async Task<long> CountAsync()
=> await _tasksRepository.CountAsync();

/// <summary>
/// Gets all running tasks
/// Gets all running tasks and a total running task count.
/// </summary>
/// <param name="skip"></param>
/// <param name="limit"></param>
/// <returns></returns>
public async Task<IList<WorkflowInstanceTasksUnwindResult>> GetAllAsync(int? skip = null, int? limit = null)
public async Task<(IList<TaskExecution>, long)> GetAllAsync(int? skip = null, int? limit = null)
=> await _tasksRepository.GetAllAsync(skip, limit);

/// <summary>
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/Contracts/Models/TaskExecution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public class TaskExecution
[JsonProperty(PropertyName = "execution_id")]
public string ExecutionId { get; set; } = string.Empty;

[JsonProperty(PropertyName = "workflow_instance_id")]
public string WorkflowInstanceId { get; set; } = string.Empty;

[JsonProperty(PropertyName = "task_type")]
public string TaskType { get; set; } = string.Empty;

Expand Down

This file was deleted.

10 changes: 2 additions & 8 deletions src/WorkflowManager/Database/Interfaces/ITasksRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,12 @@ namespace Monai.Deploy.WorkflowManager.Database.Interfaces
public interface ITasksRepository
{
/// <summary>
/// Gets all running tasks.
/// Gets all running tasks and total number of running tasks.
/// </summary>
/// <param name="skip">skip.</param>
/// <param name="limit">limit.</param>
/// <returns></returns>
Task<IList<WorkflowInstanceTasksUnwindResult>> GetAllAsync(int? skip, int? limit);

/// <summary>
/// Gets count of Tasks.
/// </summary>
/// <returns></returns>
Task<long> CountAsync();
Task<(IList<TaskExecution>, long)> GetAllAsync(int? skip, int? limit);

/// <summary>
/// Gets Task Execution given workflowInstanceId, taskId and executionId.
Expand Down
54 changes: 18 additions & 36 deletions src/WorkflowManager/Database/Repositories/TasksRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,34 @@ private static async Task EnsureIndex(IMongoCollection<WorkflowInstance> workflo
}
}

public async Task<IList<WorkflowInstanceTasksUnwindResult>> GetAllAsync(int? skip, int? limit)
public async Task<(IList<TaskExecution>, long)> GetAllAsync(int? skip, int? limit)
{
try
{
var builder = Builders<WorkflowInstance>.Filter;

var filter = builder.Eq("Tasks.Status", TaskExecutionStatus.Accepted);
filter &= builder.Ne("Tasks.Status", TaskExecutionStatus.Dispatched);
var filter = new FilterDefinitionBuilder<WorkflowInstance>().Where(x =>
x.Status != Status.Succeeded && x.Status != Status.Failed);

return await _workflowInstanceCollection.Aggregate()
.Match(filter)
.Unwind<WorkflowInstance, WorkflowInstanceTasksUnwindResult>(wf => wf.Tasks)
.Skip(skip ?? 0)
.Limit(limit ?? 500)
var result = await _workflowInstanceCollection
.Find(filter)
.ToListAsync();

if (result is null || result.Count == 0)
{
return (new List<TaskExecution>(), 0);
}

var tasks = result.SelectMany(r => r.Tasks.Where(t =>
t.Status != TaskExecutionStatus.Succeeded &&
t.Status != TaskExecutionStatus.Failed &&
t.Status != TaskExecutionStatus.Canceled));

return (tasks.Skip(skip ?? 0).Take(limit ?? 500).ToList(), tasks.Count());
}
catch (Exception e)
{
_logger.DbCallFailed(nameof(GetAllAsync), e);

return new List<WorkflowInstanceTasksUnwindResult>();
return (new List<TaskExecution>(), 0);
}
}

Expand All @@ -121,30 +128,5 @@ public async Task<IList<WorkflowInstanceTasksUnwindResult>> GetAllAsync(int? ski
return default;
}
}

public async Task<long> CountAsync()
{
try
{
var builder = Builders<WorkflowInstance>.Filter;

var filter = builder.Eq("Tasks.Status", TaskExecutionStatus.Accepted);
filter &= builder.Ne("Tasks.Status", TaskExecutionStatus.Dispatched);

var result = await _workflowInstanceCollection.Aggregate()
.Match(filter).ToListAsync();
if (result is null || result.Count == 0)
{
return 0;
}
return result.Select(r => r.Tasks).Count();
}
catch (Exception e)
{
_logger.DbCallFailed(nameof(GetAllAsync), e);

return default;
}
}
}
}
10 changes: 4 additions & 6 deletions src/WorkflowManager/MonaiBackgroundService/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
public async Task DoWork()
{
var runningTasks = await _tasksService.GetAllAsync();
foreach (var workflow in runningTasks.Where(t => t.Tasks.TimeoutInterval != 0 && t.Tasks.Timeout < DateTime.UtcNow))
foreach (var task in runningTasks.Item1.Where(t => t.TimeoutInterval != 0 && t.Timeout < DateTime.UtcNow))
{
var task = workflow.Tasks;

task.ExecutionStats.TryGetValue(IdentityKey, out var identity);
var workflowInstanceId = workflow.WorkflowId;

var correlationId = Guid.NewGuid().ToString();

await PublishTimeoutUpdateEvent(task, correlationId, workflowInstanceId).ConfigureAwait(false); // -> task manager
await PublishTimeoutUpdateEvent(task, correlationId, task.WorkflowInstanceId).ConfigureAwait(false); // -> task manager

await PublishCancellationEvent(task, correlationId, identity ?? String.Empty, workflowInstanceId).ConfigureAwait(false); // -> workflow executor
await PublishCancellationEvent(task, correlationId, identity ?? String.Empty, task.WorkflowInstanceId).ConfigureAwait(false); // -> workflow executor
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ private async Task<TaskExecution> CreateTaskExecutionAsync(TaskObject task,
TaskPluginArguments = newTaskArgs,
TaskStartTime = DateTime.UtcNow,
TaskId = task.Id,
WorkflowInstanceId = workflowInstanceId,
Status = TaskExecutionStatus.Created,
Reason = FailureReason.None,
InputArtifacts = await _artifactMapper.ConvertArtifactVariablesToPath(task?.Artifacts?.Input ?? Array.Empty<Artifact>(), payloadId, workflowInstanceId, bucketName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,11 @@ public async Task<IActionResult> GetListAsync([FromQuery] PaginationFilter filte
var pageSize = filter.PageSize ?? _options.Value.EndpointSettings.DefaultPageSize;
var validFilter = new PaginationFilter(filter.PageNumber, pageSize, _options.Value.EndpointSettings.MaxPageSize);

var result = await _tasksService.GetAllAsync(
var pagedData = await _tasksService.GetAllAsync(
(validFilter.PageNumber - 1) * validFilter.PageSize,
validFilter.PageSize);

var pagedData = result.IsNullOrEmpty() ? new List<TaskExecution>() : result.Select(r => r.Tasks).ToList();

var dataTotal = await _tasksService.CountAsync();
var pagedReponse = CreatePagedReponse(pagedData.ToList(), validFilter, dataTotal, _uriService, route);
var pagedReponse = CreatePagedReponse(pagedData.Item1.ToList(), validFilter, pagedData.Item2, _uriService, route);

return Ok(pagedReponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,27 @@ Scenario Outline: Get details of a Task with non-existent id payload
| Non_Existent_ExecutionID_Task_Details_2 |
| Non_Existent_TaskID_Task_Details_3 |

@Task_Api @ignore #/tasks/running endpoint needs some dev work to ensure the filter works correctly. Ticket https://github.com/Project-MONAI/monai-deploy-workflow-manager/issues/308
@Task_Api
Scenario: Get details of all Tasks
Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10
And I have a Workflow Instance WorkflowInstance_TaskApi_1 with no artifacts
When I send a GET request
Then I will get a 200 response
And I can see task payload is returned
And I can see 3 tasks are returned

@Task_Api
Scenario: Get details of all Tasks From Multiple Workflows
Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10
And I have a Workflow Instance WorkflowInstance_TaskApi_1 with no artifacts
And I have a Workflow Instance WorkflowInstance_TaskApi_2 with no artifacts
When I send a GET request
Then I will get a 200 response
And I can see 6 tasks are returned

@Task_Api
Scenario: Get details of all Tasks From Multiple Workflows returns no tasks
Given I have an endpoint /tasks/running?pageNumber=1&pageSize=10
And I have a Workflow Instance WorkflowInstance_TaskApi_3 with no artifacts
When I send a GET request
Then I will get a 200 response
And I can see 0 tasks are returned
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
using BoDi;
using Monai.Deploy.WorkflowManager.Contracts.Models;
using Monai.Deploy.WorkflowManager.IntegrationTests.Support;
using Monai.Deploy.WorkflowManager.Wrappers;
using Newtonsoft.Json;
using NUnit.Framework;

namespace Monai.Deploy.WorkflowManager.WorkflowExecutor.IntegrationTests.StepDefinitions
{
Expand All @@ -41,5 +43,13 @@ public void ThenICanSeeAnIndividualTaskIsReturned()
var response = JsonConvert.DeserializeObject<TaskExecution>(ApiHelper.Response.Content.ReadAsStringAsync().Result);
Assertions.AssertTaskPayload(DataHelper.WorkflowInstances, response);
}

[Then(@"I can see (.*) tasks are returned")]
public void ThenICanSeeTasksAreReturned(int number)
{
var result = ApiHelper.Response.Content.ReadAsStringAsync().Result;
var response = JsonConvert.DeserializeObject<PagedResponse<IList<TaskExecution>>>(result);
Assert.AreEqual(number, response?.Data.Count);
}
}
}
Loading