diff --git a/src/TaskManager/API/ITaskDispatchEventService.cs b/src/TaskManager/API/ITaskDispatchEventService.cs index 3aa8c1ca8..51d724327 100644 --- a/src/TaskManager/API/ITaskDispatchEventService.cs +++ b/src/TaskManager/API/ITaskDispatchEventService.cs @@ -27,6 +27,13 @@ public interface ITaskDispatchEventService /// Returns the created TaskDispatchEventInfo. Task CreateAsync(TaskDispatchEventInfo taskDispatchEvent); + /// + /// Updates user accounts of a task dispatch event in the database. + /// + /// A TaskDispatchEvent to update. + /// Returns the created TaskDispatchEventInfo. + Task UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEventInfo); + /// /// Retrieves a task dispatch event by the task execution ID. /// diff --git a/src/TaskManager/Database/ITaskDispatchEventRepository.cs b/src/TaskManager/Database/ITaskDispatchEventRepository.cs index 65dc68690..a25b39607 100644 --- a/src/TaskManager/Database/ITaskDispatchEventRepository.cs +++ b/src/TaskManager/Database/ITaskDispatchEventRepository.cs @@ -27,6 +27,13 @@ public interface ITaskDispatchEventRepository /// Returns the created TaskDispatchEventInfo. Task CreateAsync(TaskDispatchEventInfo taskDispatchEventInfo); + /// + /// Updates user accounts of a task dispatch event in the database. + /// + /// A TaskDispatchEvent to update. + /// Returns the created TaskDispatchEventInfo. + Task UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEventInfo); + /// /// Retrieves a task dispatch event by the task execution ID. /// diff --git a/src/TaskManager/Database/TaskDispatchEventRepository.cs b/src/TaskManager/Database/TaskDispatchEventRepository.cs index f49d3f1ff..1111963dc 100644 --- a/src/TaskManager/Database/TaskDispatchEventRepository.cs +++ b/src/TaskManager/Database/TaskDispatchEventRepository.cs @@ -60,6 +60,22 @@ public TaskDispatchEventRepository( } } + public async Task UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEventInfo) + { + Guard.Against.Null(taskDispatchEventInfo, nameof(taskDispatchEventInfo)); + + try + { + await _taskDispatchEventCollection.FindOneAndUpdateAsync(i => i.Id == taskDispatchEventInfo.Id, Builders.Update.Set(p => p.UserAccounts, taskDispatchEventInfo.UserAccounts)).ConfigureAwait(false); + return await GetByTaskExecutionIdAsync(taskDispatchEventInfo.Event.ExecutionId).ConfigureAwait(false); + } + catch (Exception e) + { + _logger.DatabaseException(nameof(UpdateUserAccountsAsync), e); + return default; + } + } + public async Task GetByTaskExecutionIdAsync(string taskExecutionId) { Guard.Against.NullOrWhiteSpace(taskExecutionId, nameof(taskExecutionId)); diff --git a/src/TaskManager/TaskManager/Logging/Log.cs b/src/TaskManager/TaskManager/Logging/Log.cs index e98568f27..427fa44df 100644 --- a/src/TaskManager/TaskManager/Logging/Log.cs +++ b/src/TaskManager/TaskManager/Logging/Log.cs @@ -116,8 +116,5 @@ public static partial class Log [LoggerMessage(EventId = 118, Level = LogLevel.Error, Message = "Error removing storage user account {username}.")] public static partial void ErrorRemovingStorageUserAccount(this ILogger logger, string username, Exception exception); - - [LoggerMessage(EventId = 119, Level = LogLevel.Error, Message = "Error removing dispatch event {executionId} from the database.")] - public static partial void ErrorRemovingDispatchEventFromDatabase(this ILogger logger, string executionId, Exception exception); } } diff --git a/src/TaskManager/TaskManager/Services/TaskDispatchEventService.cs b/src/TaskManager/TaskManager/Services/TaskDispatchEventService.cs index c6c20ae22..3de7b74bd 100644 --- a/src/TaskManager/TaskManager/Services/TaskDispatchEventService.cs +++ b/src/TaskManager/TaskManager/Services/TaskDispatchEventService.cs @@ -49,6 +49,20 @@ public TaskDispatchEventService(ITaskDispatchEventRepository taskDispatchEventRe } } + public async Task UpdateUserAccountsAsync(TaskDispatchEventInfo taskDispatchEvent) + { + Guard.Against.Null(taskDispatchEvent, nameof(taskDispatchEvent)); + + try + { + return await _taskDispatchEventRepository.CreateAsync(taskDispatchEvent).ConfigureAwait(false); + } + finally + { + _logger.TaskDispatchEventSaved(taskDispatchEvent.Event.ExecutionId); + } + } + public async Task GetByTaskExecutionIdAsync(string taskExecutionId) { Guard.Against.NullOrWhiteSpace(taskExecutionId, nameof(taskExecutionId)); diff --git a/src/TaskManager/TaskManager/TaskManager.cs b/src/TaskManager/TaskManager/TaskManager.cs index 7d0f7a437..a7441dfe2 100644 --- a/src/TaskManager/TaskManager/TaskManager.cs +++ b/src/TaskManager/TaskManager/TaskManager.cs @@ -317,8 +317,10 @@ private async Task HandleDispatchTask(JsonMessage message) Guard.Against.Null(message, nameof(message)); var pluginAssembly = string.Empty; + var eventInfo = new API.Models.TaskDispatchEventInfo(message.Body); try { + await _taskDispatchEventService.CreateAsync(eventInfo).ConfigureAwait(false); message.Body.Validate(); pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[message.Body.TaskPluginType]; } @@ -337,7 +339,6 @@ private async Task HandleDispatchTask(JsonMessage message) return; } - var eventInfo = new API.Models.TaskDispatchEventInfo(message.Body); try { if (string.Equals(message.Body.TaskPluginType, @@ -354,6 +355,8 @@ await Task.WhenAll( PopulateTemporaryStorageCredentials(message.Body.Outputs.ToArray()) ).ConfigureAwait(true); } + + await _taskDispatchEventService.UpdateUserAccountsAsync(eventInfo).ConfigureAwait(false); } catch (Exception ex) { @@ -378,7 +381,6 @@ await Task.WhenAll( try { var executionStatus = await taskRunner.ExecuteTask(_cancellationTokenSource.Token).ConfigureAwait(false); - await _taskDispatchEventService.CreateAsync(eventInfo).ConfigureAwait(false); var updateMessage = GenerateUpdateEventMessage(message, message.Body.ExecutionId, message.Body.WorkflowInstanceId, message.Body.TaskId, executionStatus); await SendUpdateEvent(updateMessage).ConfigureAwait(false); AcknowledgeMessage(message); diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature b/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature index e4cd31dcc..6c020ed8d 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Features/TaskUpdate.feature @@ -46,7 +46,7 @@ Scenario: TaskUpdateEvent is published with status Successful after receiving a And A Task Callback event is published Task_Callback_Basic And A Task Update event with status Succeeded is published with Task Callback details -@TaskDispatch_Persistance @ignore # Currently failing due to https://github.com/Project-MONAI/monai-deploy-workflow-manager/issues/328 +@TaskDispatch_Persistance Scenario: TaskDispatchEvent with different permutations is published and matching TaskDispatchEvent is saved in Mongo When A Task Dispatch event is published Then The Task Dispatch event is saved in mongo diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/StepDefinitions/TaskUpdateStepDefinitions.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/StepDefinitions/TaskUpdateStepDefinitions.cs index 77781ef57..91a1621bc 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/StepDefinitions/TaskUpdateStepDefinitions.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/StepDefinitions/TaskUpdateStepDefinitions.cs @@ -16,7 +16,9 @@ using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.Support; - +using Polly; +using Polly.Retry; + namespace Monai.Deploy.WorkflowManager.TaskManager.IntegrationTests.StepDefinitions { [Binding] @@ -27,11 +29,13 @@ public TaskUpdateStepDefinitions(ObjectContainer objectContainer, ISpecFlowOutpu _outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper)); DataHelper = objectContainer.Resolve() ?? throw new ArgumentNullException(nameof(DataHelper)); MongoClient = objectContainer.Resolve(); + RetryPolicy = Policy.Handle().WaitAndRetry(retryCount: 10, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); Assertions = new Assertions(_outputHelper); } private readonly ISpecFlowOutputHelper _outputHelper; private MongoClientUtil MongoClient { get; } + private RetryPolicy RetryPolicy { get; set; } public DataHelper DataHelper { get; } public Assertions Assertions { get; } @@ -59,8 +63,13 @@ public void ATaskUpdateEventIsPublishedWithTaskDispatchDetails(string status) [Then(@"The Task Dispatch event is saved in mongo")] public void TheTaskDispatchEventIsSavedInMongo() { - var storedTaskDispatchEvent = MongoClient.GetTaskDispatchEventInfoByExecutionId(DataHelper.TaskDispatchEvent.ExecutionId); - Assertions.AssertTaskDispatchEventStoredInMongo(storedTaskDispatchEvent, DataHelper.TaskDispatchEvent); + RetryPolicy.Execute(() => + { + _outputHelper.WriteLine($"Retrieving task dispatch by id={DataHelper.TaskDispatchEvent.ExecutionId}"); + var storedTaskDispatchEvent = MongoClient.GetTaskDispatchEventInfoByExecutionId(DataHelper.TaskDispatchEvent.ExecutionId); + _outputHelper.WriteLine("Retrieved task dispatch"); + Assertions.AssertTaskDispatchEventStoredInMongo(storedTaskDispatchEvent, DataHelper.TaskDispatchEvent); + }); } [Then(@"A Task Update event with status (.*) is published with Task Callback details")] diff --git a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/Assertions.cs b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/Assertions.cs index b78ae6253..65dd99ca4 100644 --- a/tests/IntegrationTests/TaskManager.IntegrationTests/Support/Assertions.cs +++ b/tests/IntegrationTests/TaskManager.IntegrationTests/Support/Assertions.cs @@ -64,13 +64,19 @@ public void AssertTaskDispatchEventStoredInMongo(List sto // Remove AccessKey, AccessToken & SessionToken as they are modified by Task Manager storedTaskDispatchEvent.ForEach(e => { - e.Event.Inputs.ForEach(i => i.Credentials = null); - e.Event.Outputs.ForEach(i => i.Credentials = null); - e.Event.IntermediateStorage.Credentials = null; + e.Event.Inputs?.ForEach(i => i.Credentials = null); + e.Event.Outputs?.ForEach(i => i.Credentials = null); + if (e.Event.IntermediateStorage is not null) + { + e.Event.IntermediateStorage.Credentials = null; + } }); taskDispatchEvent.Inputs.ForEach(i => i.Credentials = null); taskDispatchEvent.Outputs.ForEach(i => i.Credentials = null); - taskDispatchEvent.IntermediateStorage.Credentials = null; + if (taskDispatchEvent.IntermediateStorage is not null) + { + taskDispatchEvent.IntermediateStorage.Credentials = null; + } storedTaskDispatchEvent[0].Event.Should().BeEquivalentTo(taskDispatchEvent); Output.WriteLine("Details of TaskDispatchEvent stored matches original TaskDispatchEvent");