Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/TaskManager/API/ITaskDispatchEventService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public interface ITaskDispatchEventService
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
Task<TaskDispatchEventInfo?> CreateAsync(TaskDispatchEventInfo taskDispatchEvent);

/// <summary>
/// Updates user accounts of a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
Task<TaskDispatchEventInfo?> UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEventInfo);

/// <summary>
/// Retrieves a task dispatch event by the task execution ID.
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions src/TaskManager/Database/ITaskDispatchEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public interface ITaskDispatchEventRepository
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
Task<TaskDispatchEventInfo?> CreateAsync(TaskDispatchEventInfo taskDispatchEventInfo);

/// <summary>
/// Updates user accounts of a task dispatch event in the database.
/// </summary>
/// <param name="taskDispatchEvent">A TaskDispatchEvent to update.</param>
/// <returns>Returns the created TaskDispatchEventInfo.</returns>
Task<TaskDispatchEventInfo?> UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEventInfo);

/// <summary>
/// Retrieves a task dispatch event by the task execution ID.
/// </summary>
Expand Down
16 changes: 16 additions & 0 deletions src/TaskManager/Database/TaskDispatchEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ public TaskDispatchEventRepository(
}
}

public async Task<TaskDispatchEventInfo?> UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEventInfo)
{
Guard.Against.Null(taskDispatchEventInfo, nameof(taskDispatchEventInfo));

try
{
await _taskDispatchEventCollection.FindOneAndUpdateAsync(i => i.Id == taskDispatchEventInfo.Id, Builders<TaskDispatchEventInfo>.Update.Set(p => p.UserAccounts, taskDispatchEventInfo.UserAccounts)).ConfigureAwait(false);
return await GetByTaskExecutionIdAsync(taskDispatchEventInfo.Event.ExecutionId).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.DatabaseException(nameof(UpdateUserAccountsAsync), e);
return default;
}
}

public async Task<TaskDispatchEventInfo?> GetByTaskExecutionIdAsync(string taskExecutionId)
{
Guard.Against.NullOrWhiteSpace(taskExecutionId, nameof(taskExecutionId));
Expand Down
3 changes: 0 additions & 3 deletions src/TaskManager/TaskManager/Logging/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,5 @@ public static partial class Log

[LoggerMessage(EventId = 118, Level = LogLevel.Error, Message = "Error removing storage user account {username}.")]
public static partial void ErrorRemovingStorageUserAccount(this ILogger logger, string username, Exception exception);

[LoggerMessage(EventId = 119, Level = LogLevel.Error, Message = "Error removing dispatch event {executionId} from the database.")]
public static partial void ErrorRemovingDispatchEventFromDatabase(this ILogger logger, string executionId, Exception exception);
}
}
14 changes: 14 additions & 0 deletions src/TaskManager/TaskManager/Services/TaskDispatchEventService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ public TaskDispatchEventService(ITaskDispatchEventRepository taskDispatchEventRe
}
}

public async Task<TaskDispatchEventInfo?> UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEvent)
{
Guard.Against.Null(taskDispatchEvent, nameof(taskDispatchEvent));

try
{
return await _taskDispatchEventRepository.CreateAsync(taskDispatchEvent).ConfigureAwait(false);
}
finally
{
_logger.TaskDispatchEventSaved(taskDispatchEvent.Event.ExecutionId);
}
}

public async Task<TaskDispatchEventInfo?> GetByTaskExecutionIdAsync(string taskExecutionId)
{
Guard.Against.NullOrWhiteSpace(taskExecutionId, nameof(taskExecutionId));
Expand Down
6 changes: 4 additions & 2 deletions src/TaskManager/TaskManager/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,10 @@ private async Task HandleDispatchTask(JsonMessage<TaskDispatchEvent> message)
Guard.Against.Null(message, nameof(message));

var pluginAssembly = string.Empty;
var eventInfo = new API.Models.TaskDispatchEventInfo(message.Body);
try
{
await _taskDispatchEventService.CreateAsync(eventInfo).ConfigureAwait(false);
message.Body.Validate();
pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[message.Body.TaskPluginType];
}
Expand All @@ -337,7 +339,6 @@ private async Task HandleDispatchTask(JsonMessage<TaskDispatchEvent> message)
return;
}

var eventInfo = new API.Models.TaskDispatchEventInfo(message.Body);
try
{
if (string.Equals(message.Body.TaskPluginType,
Expand All @@ -354,6 +355,8 @@ await Task.WhenAll(
PopulateTemporaryStorageCredentials(message.Body.Outputs.ToArray())
).ConfigureAwait(true);
}

await _taskDispatchEventService.UpdateUserAccountsAsync(eventInfo).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand All @@ -378,7 +381,6 @@ await Task.WhenAll(
try
{
var executionStatus = await taskRunner.ExecuteTask(_cancellationTokenSource.Token).ConfigureAwait(false);
await _taskDispatchEventService.CreateAsync(eventInfo).ConfigureAwait(false);
var updateMessage = GenerateUpdateEventMessage(message, message.Body.ExecutionId, message.Body.WorkflowInstanceId, message.Body.TaskId, executionStatus);
await SendUpdateEvent(updateMessage).ConfigureAwait(false);
AcknowledgeMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Scenario: TaskUpdateEvent is published with status Successful after receiving a
And A Task Callback event is published Task_Callback_Basic
And A Task Update event with status Succeeded is published with Task Callback details

@TaskDispatch_Persistance @ignore # Currently failing due to https://github.com/Project-MONAI/monai-deploy-workflow-manager/issues/328
@TaskDispatch_Persistance
Scenario: TaskDispatchEvent with different permutations is published and matching TaskDispatchEvent is saved in Mongo
When A Task Dispatch event is published <taskDispatchMessage>
Then The Task Dispatch event is saved in mongo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.Support;

using Polly;
using Polly.Retry;

namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.StepDefinitions
{
[Binding]
Expand All @@ -27,11 +29,13 @@ public TaskUpdateStepDefinitions(ObjectContainer objectContainer, ISpecFlowOutpu
_outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper));
DataHelper = objectContainer.Resolve<DataHelper>() ?? throw new ArgumentNullException(nameof(DataHelper));
MongoClient = objectContainer.Resolve<MongoClientUtil>();
RetryPolicy = Policy.Handle<Exception>().WaitAndRetry(retryCount: 10, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500));
Assertions = new Assertions(_outputHelper);
}

private readonly ISpecFlowOutputHelper _outputHelper;
private MongoClientUtil MongoClient { get; }
private RetryPolicy RetryPolicy { get; set; }
public DataHelper DataHelper { get; }
public Assertions Assertions { get; }

Expand Down Expand Up @@ -59,8 +63,13 @@ public void ATaskUpdateEventIsPublishedWithTaskDispatchDetails(string status)
[Then(@"The Task Dispatch event is saved in mongo")]
public void TheTaskDispatchEventIsSavedInMongo()
{
var storedTaskDispatchEvent = MongoClient.GetTaskDispatchEventInfoByExecutionId(DataHelper.TaskDispatchEvent.ExecutionId);
Assertions.AssertTaskDispatchEventStoredInMongo(storedTaskDispatchEvent, DataHelper.TaskDispatchEvent);
RetryPolicy.Execute(() =>
{
_outputHelper.WriteLine($"Retrieving task dispatch by id={DataHelper.TaskDispatchEvent.ExecutionId}");
var storedTaskDispatchEvent = MongoClient.GetTaskDispatchEventInfoByExecutionId(DataHelper.TaskDispatchEvent.ExecutionId);
_outputHelper.WriteLine("Retrieved task dispatch");
Assertions.AssertTaskDispatchEventStoredInMongo(storedTaskDispatchEvent, DataHelper.TaskDispatchEvent);
});
}

[Then(@"A Task Update event with status (.*) is published with Task Callback details")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,19 @@ public void AssertTaskDispatchEventStoredInMongo(List<TaskDispatchEventInfo> sto
// Remove AccessKey, AccessToken & SessionToken as they are modified by Task Manager
storedTaskDispatchEvent.ForEach(e =>
{
e.Event.Inputs.ForEach(i => i.Credentials = null);
e.Event.Outputs.ForEach(i => i.Credentials = null);
e.Event.IntermediateStorage.Credentials = null;
e.Event.Inputs?.ForEach(i => i.Credentials = null);
e.Event.Outputs?.ForEach(i => i.Credentials = null);
if (e.Event.IntermediateStorage is not null)
{
e.Event.IntermediateStorage.Credentials = null;
}
});
taskDispatchEvent.Inputs.ForEach(i => i.Credentials = null);
taskDispatchEvent.Outputs.ForEach(i => i.Credentials = null);
taskDispatchEvent.IntermediateStorage.Credentials = null;
if (taskDispatchEvent.IntermediateStorage is not null)
{
taskDispatchEvent.IntermediateStorage.Credentials = null;
}

storedTaskDispatchEvent[0].Event.Should().BeEquivalentTo(taskDispatchEvent);
Output.WriteLine("Details of TaskDispatchEvent stored matches original TaskDispatchEvent");
Expand Down