From 97bfa29a6294ee3acd342f9789b4d5104ff5a72d Mon Sep 17 00:00:00 2001 From: Neil South Date: Thu, 22 Feb 2024 11:49:16 +0000 Subject: [PATCH] adding conditions Signed-off-by: Neil South --- .../M002_WorkflowRevision_addVerion.cs | 41 +++++++ ...M003_WorkflowRevision_addDataRetension.cs} | 4 +- .../M004_WorkflowRevision_addConditions.cs | 45 +++++++ .../Contracts/Models/Workflow.cs | 3 + .../Contracts/Models/WorkflowRevision.cs | 2 +- .../Logging/Log.200000.Workflow.cs | 3 + .../Services/WorkflowExecuterService.cs | 57 ++++++--- .../Services/WorkflowExecuterServiceTests.cs | 114 +++++++++++++++++- 8 files changed, 249 insertions(+), 20 deletions(-) create mode 100644 src/WorkflowManager/Contracts/Migrations/M002_WorkflowRevision_addVerion.cs rename src/WorkflowManager/Contracts/Migrations/{M004_WorkflowRevision_addDataRetension.cs => M003_WorkflowRevision_addDataRetension.cs} (100%) create mode 100644 src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addConditions.cs diff --git a/src/WorkflowManager/Contracts/Migrations/M002_WorkflowRevision_addVerion.cs b/src/WorkflowManager/Contracts/Migrations/M002_WorkflowRevision_addVerion.cs new file mode 100644 index 000000000..c621d60d2 --- /dev/null +++ b/src/WorkflowManager/Contracts/Migrations/M002_WorkflowRevision_addVerion.cs @@ -0,0 +1,41 @@ +// +// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Mongo.Migration.Migrations.Document; +using MongoDB.Bson; + +namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations +{ + public class M002_WorkflowRevision_addVerion : DocumentMigration + { + public M002_WorkflowRevision_addVerion() : base("1.0.0") { } + + public override void Up(BsonDocument document) + { + // empty, but this will make all objects re-saved with a version + } + public override void Down(BsonDocument document) + { + try + { + document.Remove("Version"); + } + catch + { // can ignore we dont want failures stopping startup ! + } + } + } +} diff --git a/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs b/src/WorkflowManager/Contracts/Migrations/M003_WorkflowRevision_addDataRetension.cs similarity index 100% rename from src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs rename to src/WorkflowManager/Contracts/Migrations/M003_WorkflowRevision_addDataRetension.cs index 104a3a662..d652ba096 100644 --- a/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs +++ b/src/WorkflowManager/Contracts/Migrations/M003_WorkflowRevision_addDataRetension.cs @@ -20,9 +20,9 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations { - public class M004_WorkflowRevision_AddDataRetension : DocumentMigration + public class M003_WorkflowRevision_addDataRetension : DocumentMigration { - public M004_WorkflowRevision_AddDataRetension() : base("1.0.1") { } + public M003_WorkflowRevision_addDataRetension() : base("1.0.1") { } public override void Up(BsonDocument document) { diff --git a/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addConditions.cs b/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addConditions.cs new file mode 100644 index 000000000..03de8644e --- /dev/null +++ b/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addConditions.cs @@ -0,0 +1,45 @@ +// +// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Mongo.Migration.Migrations.Document; +using MongoDB.Bson; + + +namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations +{ + public class M004_WorkflowRevision_addConditions : DocumentMigration + { + public M004_WorkflowRevision_addConditions() : base("1.0.2") { } + + public override void Up(BsonDocument document) + { + var workflow = document["Workflow"].AsBsonDocument; + workflow.Add("Conditions", new BsonArray { }); + } + + public override void Down(BsonDocument document) + { + try + { + var workflow = document["Workflow"].AsBsonDocument; + workflow.Remove("Conditions"); + } + catch + { // can ignore we dont want failures stopping startup ! + } + } + } +} diff --git a/src/WorkflowManager/Contracts/Models/Workflow.cs b/src/WorkflowManager/Contracts/Models/Workflow.cs index 35aae2649..f09178c99 100755 --- a/src/WorkflowManager/Contracts/Models/Workflow.cs +++ b/src/WorkflowManager/Contracts/Models/Workflow.cs @@ -39,5 +39,8 @@ public class Workflow [JsonProperty(PropertyName = "dataRetentionDays")] public int? DataRetentionDays { get; set; } = 3;// note. -1 = never delete + [JsonProperty(PropertyName = "conditions")] + public string[] Conditions { get; set; } = []; + } } diff --git a/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs b/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs index e28abebee..cd43c9c7d 100755 --- a/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs +++ b/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs @@ -23,7 +23,7 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models { - [CollectionLocation("Workflows"), RuntimeVersion("1.0.1")] + [CollectionLocation("Workflows"), RuntimeVersion("1.0.2")] public class WorkflowRevision : ISoftDeleteable, IDocument { [BsonId] diff --git a/src/WorkflowManager/Logging/Log.200000.Workflow.cs b/src/WorkflowManager/Logging/Log.200000.Workflow.cs index 584429807..62527c0cc 100644 --- a/src/WorkflowManager/Logging/Log.200000.Workflow.cs +++ b/src/WorkflowManager/Logging/Log.200000.Workflow.cs @@ -114,5 +114,8 @@ public static partial class Log [LoggerMessage(EventId = 210019, Level = LogLevel.Error, Message = "Task is missing required input artifacts {taskId} Artifacts {ArtifactsJson}")] public static partial void TaskIsMissingRequiredInputArtifacts(this ILogger logger, string taskId, string ArtifactsJson); + + [LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "no workflow to execute for the given workflow request.")] + public static partial void DidntToCreateWorkflowInstances(this ILogger logger); } } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index c392cc67f..a8779ea1c 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -146,6 +146,13 @@ public async Task ProcessPayload(WorkflowRequestEvent message, Payload pay var tasks = workflows.Select(workflow => CreateWorkflowInstanceAsync(message, workflow)); var newInstances = await Task.WhenAll(tasks).ConfigureAwait(false); + + if (newInstances is null || newInstances.Length == 0 || newInstances[0] is null) // if null then it because it didnt meet the conditions needed to create a workflow instance + { + _logger.DidntToCreateWorkflowInstances(); + return false; + } + workflowInstances.AddRange(newInstances); var existingInstances = await _workflowInstanceRepository.GetByWorkflowsIdsAsync(workflowInstances.Select(w => w.WorkflowId).ToList()); @@ -1103,29 +1110,34 @@ private async Task ClinicalReviewTimeOutEvent(WorkflowInstance workflowIns return true; } - private async Task CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow) + private async Task CreateWorkflowInstanceAsync(WorkflowRequestEvent message, WorkflowRevision workflow) { ArgumentNullException.ThrowIfNull(message, nameof(message)); ArgumentNullException.ThrowIfNull(workflow, nameof(workflow)); ArgumentNullException.ThrowIfNull(workflow.Workflow, nameof(workflow.Workflow)); - var workflowInstanceId = Guid.NewGuid().ToString(); + var workflowInstance = MakeInstance(message, workflow); - var workflowInstance = new WorkflowInstance() + // check if the conditionals allow the workflow to be created + + if (workflow.Workflow.Conditions.Length != 0) { - Id = workflowInstanceId, - WorkflowId = workflow.WorkflowId, - WorkflowName = workflow.Workflow.Name, - PayloadId = message.PayloadId.ToString(), - StartTime = DateTime.UtcNow, - Status = Status.Created, - AeTitle = workflow.Workflow?.InformaticsGateway?.AeTitle, - BucketId = message.Bucket, - InputMetaData = { } //Functionality to be added later - }; + var conditionalMet = _conditionalParameterParser.TryParse(workflow.Workflow.Conditions, workflowInstance, out var resolvedConditional); + if (conditionalMet is false) + { + return null; + } + } + + await CreateTaskExecutionForFirstTask(message, workflow, workflowInstance); + return workflowInstance; + } + + private async Task CreateTaskExecutionForFirstTask(WorkflowRequestEvent message, WorkflowRevision workflow, WorkflowInstance workflowInstance) + { var tasks = new List(); - // part of this ticket just take the first task + if (workflow?.Workflow?.Tasks.Length > 0) { var firstTask = workflow.Workflow.Tasks.First(); @@ -1141,7 +1153,24 @@ private async Task CreateWorkflowInstanceAsync(WorkflowRequest } workflowInstance.Tasks = tasks; + } + private static WorkflowInstance MakeInstance(WorkflowRequestEvent message, WorkflowRevision workflow) + { + var workflowInstanceId = Guid.NewGuid().ToString(); + + var workflowInstance = new WorkflowInstance() + { + Id = workflowInstanceId, + WorkflowId = workflow.WorkflowId, + WorkflowName = workflow.Workflow.Name, + PayloadId = message.PayloadId.ToString(), + StartTime = DateTime.UtcNow, + Status = Status.Created, + AeTitle = workflow.Workflow?.InformaticsGateway?.AeTitle, + BucketId = message.Bucket, + InputMetaData = { } //Functionality to be added later + }; return workflowInstance; } diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index 710715eec..3312894b9 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -67,6 +67,7 @@ public class WorkflowExecuterServiceTests private readonly IOptions _configuration; private readonly IOptions _storageConfiguration; private readonly Mock _taskExecutionStatsRepository; + private readonly Mock _dicom = new Mock(); private readonly int _timeoutForTypeTask = 999; private readonly int _timeoutForDefault = 966; @@ -98,11 +99,10 @@ public WorkflowExecuterServiceTests() _storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } }); - var dicom = new Mock(); var logger = new Mock>(); var conditionalParser = new ConditionalParameterParser(logger.Object, - dicom.Object, + _dicom.Object, _workflowInstanceService.Object, _payloadService.Object, _workflowService.Object); @@ -3868,7 +3868,115 @@ public async Task ProcessPayload_With_Multiple_Taskdestinations_One_Has_Inputs() #pragma warning restore CS8604 // Possible null reference argument. } - } + [Fact] + public async Task ProcessPayload_With_Failing_Workflow_Conditional_Should_Not_Procced() + { + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow + }; + + var workflows = new List + { + new() { + Id = Guid.NewGuid().ToString(), + WorkflowId = Guid.NewGuid().ToString(), + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname", + Description = "Workflowdesc", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle" + }, + Tasks = + [ + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "type", + Description = "taskdesc" + } + ], + Conditions = ["{{ context.dicom.series.any('0010','0040') }} == 'lordge'"] + } + } + }; + + _workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny>())).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny(), It.IsAny())).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny()), Times.Never()); + + Assert.False(result); + } + + [Fact] + public async Task ProcessPayload_With_Passing_Workflow_Conditional_Should_Procced() + { + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow + }; + + var workflows = new List + { + new() { + Id = Guid.NewGuid().ToString(), + WorkflowId = Guid.NewGuid().ToString(), + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname", + Description = "Workflowdesc", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle" + }, + Tasks = + [ + new TaskObject { + Id = Guid.NewGuid().ToString(), + Type = "type", + Description = "taskdesc" + } + ], + Conditions = ["{{ context.dicom.series.any('0010','0040') }} == 'lordge'"] + } + } + }; + + _dicom.Setup(w => w.GetAnyValueAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(() => "lordge"); + + _workflowRepository.Setup(w => w.GetWorkflowsByAeTitleAsync(It.IsAny>())).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetWorkflowsForWorkflowRequestAsync(It.IsAny(), It.IsAny())).ReturnsAsync(workflows); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.CreateAsync(It.IsAny>())).ReturnsAsync(true); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowsIdsAsync(It.IsAny>())).ReturnsAsync(new List()); + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny()), Times.Once()); + + Assert.True(result); + } + } #pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. }