diff --git a/src/Common/Configuration/WorkflowManagerOptions.cs b/src/Common/Configuration/WorkflowManagerOptions.cs index 4ed820a4a..dce87277c 100644 --- a/src/Common/Configuration/WorkflowManagerOptions.cs +++ b/src/Common/Configuration/WorkflowManagerOptions.cs @@ -74,6 +74,9 @@ public class WorkflowManagerOptions : PagedOptions [ConfigurationKeyName("migExternalAppPlugins")] public string[] MigExternalAppPlugins { get; set; } + [ConfigurationKeyName("dataRetentionDays")] + public int DataRetentionDays { get; set; } + public WorkflowManagerOptions() { Messaging = new MessageBrokerConfiguration(); diff --git a/src/WorkflowManager/Common/Interfaces/IPayloadService.cs b/src/WorkflowManager/Common/Interfaces/IPayloadService.cs index 5362ffca9..6c28f99a0 100644 --- a/src/WorkflowManager/Common/Interfaces/IPayloadService.cs +++ b/src/WorkflowManager/Common/Interfaces/IPayloadService.cs @@ -54,5 +54,13 @@ Task> GetAllAsync(int? skip = null, /// /// Task UpdateWorkflowInstanceIdsAsync(string payloadId, IEnumerable workflowInstances); + + /// + /// Gets the expiry date for a payload. + /// + /// + /// + /// date of expiry or null + Task GetExpiry(DateTime now, string? workflowInstanceId); } } diff --git a/src/WorkflowManager/Common/Monai.Deploy.WorkflowManager.Common.csproj b/src/WorkflowManager/Common/Monai.Deploy.WorkflowManager.Common.csproj index e7562cafd..55dfb1f71 100644 --- a/src/WorkflowManager/Common/Monai.Deploy.WorkflowManager.Common.csproj +++ b/src/WorkflowManager/Common/Monai.Deploy.WorkflowManager.Common.csproj @@ -31,6 +31,7 @@ + diff --git a/src/WorkflowManager/Common/Services/PayloadService.cs b/src/WorkflowManager/Common/Services/PayloadService.cs index f6131bf94..15d90bc3b 100644 --- a/src/WorkflowManager/Common/Services/PayloadService.cs +++ b/src/WorkflowManager/Common/Services/PayloadService.cs @@ -25,6 +25,8 @@ using Monai.Deploy.WorkflowManager.Common.Database.Interfaces; using Monai.Deploy.WorkflowManager.Common.Logging; using Monai.Deploy.WorkflowManager.Common.Storage.Services; +using Microsoft.Extensions.Options; +using Monai.Deploy.WorkflowManager.Common.Configuration; namespace Monai.Deploy.WorkflowManager.Common.Miscellaneous.Services { @@ -34,23 +36,31 @@ public class PayloadService : IPayloadService private readonly IWorkflowInstanceRepository _workflowInstanceRepository; + private readonly IWorkflowRepository _workflowRepository; + private readonly IDicomService _dicomService; private readonly IStorageService _storageService; + private readonly WorkflowManagerOptions _options; + private readonly ILogger _logger; public PayloadService( IPayloadRepository payloadRepository, IDicomService dicomService, IWorkflowInstanceRepository workflowInstanceRepository, + IWorkflowRepository workflowRepository, IServiceScopeFactory serviceScopeFactory, + IOptions options, ILogger logger) { _payloadRepository = payloadRepository ?? throw new ArgumentNullException(nameof(payloadRepository)); _workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository)); _dicomService = dicomService ?? throw new ArgumentNullException(nameof(dicomService)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _workflowRepository = workflowRepository ?? throw new ArgumentNullException(nameof(workflowRepository)); + _options = options?.Value ?? throw new ArgumentNullException(nameof(options)); var scopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory)); var scope = scopeFactory.CreateScope(); @@ -85,7 +95,8 @@ public PayloadService( DataTrigger = eventPayload.DataTrigger, Timestamp = eventPayload.Timestamp, PatientDetails = patientDetails, - PayloadDeleted = PayloadDeleted.No + PayloadDeleted = PayloadDeleted.No, + Expires = await GetExpiry(DateTime.UtcNow, eventPayload.WorkflowInstanceId) }; if (await _payloadRepository.CreateAsync(payload)) @@ -106,6 +117,27 @@ public PayloadService( return null; } + public async Task GetExpiry(DateTime now, string? workflowInstanceId) + { + var daysToKeep = await GetWorkflowDataExpiry(workflowInstanceId); + daysToKeep ??= _options.DataRetentionDays; + + if (daysToKeep == -1) { return null; } + + return now.AddDays(daysToKeep.Value); + } + + private async Task GetWorkflowDataExpiry(string? workflowInstanceId) + { + if (string.IsNullOrWhiteSpace(workflowInstanceId)) { return null; } + + var workflowInstance = await _workflowInstanceRepository.GetByWorkflowInstanceIdAsync(workflowInstanceId); + + if (workflowInstance is null) { return null; } + + return (await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId))?.Workflow?.DataRetentionDays ?? null; + } + public async Task GetByIdAsync(string payloadId) { ArgumentNullException.ThrowIfNullOrWhiteSpace(payloadId, nameof(payloadId)); diff --git a/src/WorkflowManager/Contracts/Migrations/M004_Payload_expires.cs b/src/WorkflowManager/Contracts/Migrations/M004_Payload_expires.cs new file mode 100644 index 000000000..44a52090e --- /dev/null +++ b/src/WorkflowManager/Contracts/Migrations/M004_Payload_expires.cs @@ -0,0 +1,42 @@ +// +// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Mongo.Migration.Migrations.Document; +using MongoDB.Bson; + +namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations +{ + public class M004_Payload_Expires : DocumentMigration + { + public M004_Payload_Expires() : base("1.0.4") { } + + public override void Up(BsonDocument document) + { + document.Add("Expires", BsonNull.Create(null).ToJson(), true); //null = never expires + } + + public override void Down(BsonDocument document) + { + try + { + document.Remove("DataTrigger"); + } + catch + { // can ignore we dont want failures stopping startup ! + } + } + } +} diff --git a/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs b/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs new file mode 100644 index 000000000..ca2aae938 --- /dev/null +++ b/src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs @@ -0,0 +1,46 @@ +// +// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Mongo.Migration.Migrations.Document; +using MongoDB.Bson; + + +namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations +{ + public class M004_WorkflowRevision_AddDataRetension : DocumentMigration + { + public M004_WorkflowRevision_AddDataRetension() : base("1.0.1") { } + + public override void Up(BsonDocument document) + { +// document.Add("Workflow.DataRetentionDays", BsonNull.Create(null).ToJson(), true); + var workflow = document["Workflow"].AsBsonDocument; + workflow.Add("DataRetentionDays", BsonNull.Create(null).ToJson(), true); + } + + public override void Down(BsonDocument document) + { + try + { + var workflow = document["Workflow"].AsBsonDocument; + workflow.Remove("DataRetentionDays"); + } + catch + { // can ignore we dont want failures stopping startup ! + } + } + } +} diff --git a/src/WorkflowManager/Contracts/Models/Payload.cs b/src/WorkflowManager/Contracts/Models/Payload.cs index 4164ed9a3..96033100e 100755 --- a/src/WorkflowManager/Contracts/Models/Payload.cs +++ b/src/WorkflowManager/Contracts/Models/Payload.cs @@ -27,11 +27,11 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models { - [CollectionLocation("Payloads"), RuntimeVersion("1.0.3")] + [CollectionLocation("Payloads"), RuntimeVersion("1.0.4")] public class Payload : IDocument { [JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))] - public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 3); + public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 4); [JsonProperty(PropertyName = "id")] public string Id { get; set; } = string.Empty; @@ -67,6 +67,10 @@ public class Payload : IDocument public PatientDetails PatientDetails { get; set; } = new PatientDetails(); public DataOrigin DataTrigger { get; set; } = new DataOrigin { DataService = DataService.DIMSE }; + + [JsonProperty(PropertyName = "expires")] + public DateTime? Expires { get; set; } + } public enum PayloadDeleted diff --git a/src/WorkflowManager/Contracts/Models/Workflow.cs b/src/WorkflowManager/Contracts/Models/Workflow.cs index da6435c49..35aae2649 100755 --- a/src/WorkflowManager/Contracts/Models/Workflow.cs +++ b/src/WorkflowManager/Contracts/Models/Workflow.cs @@ -36,5 +36,8 @@ public class Workflow [JsonProperty(PropertyName = "tasks")] public TaskObject[] Tasks { get; set; } = System.Array.Empty(); + [JsonProperty(PropertyName = "dataRetentionDays")] + public int? DataRetentionDays { get; set; } = 3;// note. -1 = never delete + } } diff --git a/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs b/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs index af34bdbd9..e28abebee 100755 --- a/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs +++ b/src/WorkflowManager/Contracts/Models/WorkflowRevision.cs @@ -23,7 +23,7 @@ namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models { - [CollectionLocation("Workflows"), RuntimeVersion("1.0.0")] + [CollectionLocation("Workflows"), RuntimeVersion("1.0.1")] public class WorkflowRevision : ISoftDeleteable, IDocument { [BsonId] @@ -31,7 +31,7 @@ public class WorkflowRevision : ISoftDeleteable, IDocument public string? Id { get; set; } [JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))] - public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 0); + public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 1); [JsonProperty(PropertyName = "workflow_id")] public string WorkflowId { get; set; } = string.Empty; diff --git a/src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs b/src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs index 61fed986d..92f55c37c 100644 --- a/src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs +++ b/src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs @@ -14,6 +14,7 @@ * limitations under the License. */ +using System; using System.Collections.Generic; using System.Threading.Tasks; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; @@ -52,11 +53,27 @@ public interface IPayloadRepository /// The updated payload. Task UpdateAsync(Payload payload); + /// /// Updates a payload in the database. /// /// /// /// Task UpdateAssociatedWorkflowInstancesAsync(string payloadId, IEnumerable workflowInstances); + + /// + /// Gets all the payloads that might need deleted + /// + /// the current datetime + /// + Task> GetPayloadsToDelete(DateTime now); + + /// + /// Marks a bunch of payloads as a new deleted state + /// + /// a list of payloadIds to mark in new status + /// the status to mark as + /// + Task MarkDeletedState(IList Ids, PayloadDeleted status); } } diff --git a/src/WorkflowManager/Database/Interfaces/IWorkflowRepository.cs b/src/WorkflowManager/Database/Interfaces/IWorkflowRepository.cs index b8b8107be..c821753be 100644 --- a/src/WorkflowManager/Database/Interfaces/IWorkflowRepository.cs +++ b/src/WorkflowManager/Database/Interfaces/IWorkflowRepository.cs @@ -17,7 +17,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; namespace Monai.Deploy.WorkflowManager.Common.Database.Interfaces diff --git a/src/WorkflowManager/Database/Repositories/PayloadRepository.cs b/src/WorkflowManager/Database/Repositories/PayloadRepository.cs index c6b5cba7d..a4fc8cee8 100644 --- a/src/WorkflowManager/Database/Repositories/PayloadRepository.cs +++ b/src/WorkflowManager/Database/Repositories/PayloadRepository.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using Ardalis.GuardClauses; using Microsoft.Extensions.Logging; @@ -47,6 +48,28 @@ public PayloadRepository( _logger = logger ?? throw new ArgumentNullException(nameof(logger)); var mongoDatabase = client.GetDatabase(databaseSettings.Value.DatabaseName); _payloadCollection = mongoDatabase.GetCollection("Payloads"); + EnsureIndex().GetAwaiter().GetResult(); + } + + private async Task EnsureIndex() + { + var indexName = "PayloadDeletedIndex"; + + var model = new CreateIndexModel( + Builders.IndexKeys.Ascending(s => s.PayloadDeleted), + new CreateIndexOptions { Name = indexName } + ); + + + var asyncCursor = (await _payloadCollection.Indexes.ListAsync()); + var bsonDocuments = (await asyncCursor.ToListAsync()); + var indexes = bsonDocuments.Select(_ => _.GetElement("name").Value.ToString()).ToList(); + + // If index not present create it else skip. + if (!indexes.Exists(i => i is not null && i.Equals(indexName))) + { + await _payloadCollection.Indexes.CreateOneAsync(model); + } } public Task CountAsync() => CountAsync(_payloadCollection, null); @@ -137,5 +160,37 @@ await _payloadCollection.FindOneAndUpdateAsync( return false; } } + + public async Task> GetPayloadsToDelete(DateTime now) + { + try + { + var filter = (Builders.Filter.Eq(p => p.PayloadDeleted, PayloadDeleted.No) | + Builders.Filter.Eq(p => p.PayloadDeleted, PayloadDeleted.Failed)) & + Builders.Filter.Lt(p => p.Expires, now); + + return await (await _payloadCollection.FindAsync(filter)).ToListAsync(); + + } + catch (Exception ex) + { + _logger.DbGetPayloadsToDeleteError(ex); + return new List(); + } + } + + public async Task MarkDeletedState(IList Ids, PayloadDeleted status) + { + try + { + var filter = Builders.Filter.In(p => p.PayloadId, Ids); + var update = Builders.Update.Set(p => p.PayloadDeleted, status); + await _payloadCollection.UpdateManyAsync(filter, update); + } + catch (Exception ex) + { + _logger.DbGetPayloadsToDeleteError(ex); + } + } } } diff --git a/src/WorkflowManager/Database/Repositories/WorkflowRepository.cs b/src/WorkflowManager/Database/Repositories/WorkflowRepository.cs index e5d11a49b..8ac32ec05 100755 --- a/src/WorkflowManager/Database/Repositories/WorkflowRepository.cs +++ b/src/WorkflowManager/Database/Repositories/WorkflowRepository.cs @@ -218,6 +218,7 @@ public async Task> GetWorkflowsForWorkflowRequestAsync(s x.Workflow.InformaticsGateway.DataOrigins.Any(d => d == callingAeTitle)) && x.Deleted == null) .ToListAsync(); + return wfs; } diff --git a/src/WorkflowManager/Logging/Log.200000.Workflow.cs b/src/WorkflowManager/Logging/Log.200000.Workflow.cs index fff85817a..c3bc807dc 100644 --- a/src/WorkflowManager/Logging/Log.200000.Workflow.cs +++ b/src/WorkflowManager/Logging/Log.200000.Workflow.cs @@ -109,7 +109,7 @@ public static partial class Log [LoggerMessage(EventId = 210007, Level = LogLevel.Information, Message = "Exporting to MIG task Id {taskid}, export destination {destination} number of files {fileCount} Mig data plugins {plugins}.")] public static partial void LogMigExport(this ILogger logger, string taskid, string destination, int fileCount, string plugins); - [LoggerMessage(EventId = 200018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")] + [LoggerMessage(EventId = 210018, Level = LogLevel.Error, Message = "ExportList or Artifacts are empty! workflowInstanceId {workflowInstanceId} TaskId {taskId}")] public static partial void ExportListOrArtifactsAreEmpty(this ILogger logger, string taskId, string workflowInstanceId); } } diff --git a/src/WorkflowManager/Logging/Log.800000.Database.cs b/src/WorkflowManager/Logging/Log.800000.Database.cs index fc2a9c854..e67a57249 100644 --- a/src/WorkflowManager/Logging/Log.800000.Database.cs +++ b/src/WorkflowManager/Logging/Log.800000.Database.cs @@ -63,5 +63,8 @@ public static partial class Log [LoggerMessage(EventId = 800014, Level = LogLevel.Error, Message = "Failed to update payload: '{payloadId}'.")] public static partial void DbUpdatePayloadError(this ILogger logger, string payloadId, Exception ex); + + [LoggerMessage(EventId = 800015, Level = LogLevel.Error, Message = "Failed to get payloads to delete.")] + public static partial void DbGetPayloadsToDeleteError(this ILogger logger, Exception ex); } } diff --git a/src/WorkflowManager/MonaiBackgroundService/Worker.cs b/src/WorkflowManager/MonaiBackgroundService/Worker.cs index cd9de5368..f68c2e715 100644 --- a/src/WorkflowManager/MonaiBackgroundService/Worker.cs +++ b/src/WorkflowManager/MonaiBackgroundService/Worker.cs @@ -23,6 +23,8 @@ using Monai.Deploy.WorkflowManager.Common.Logging; using Monai.Deploy.WorkflowManager.Common.WorkflowExecuter.Common; using Monai.Deploy.WorkflowManager.MonaiBackgroundService.Logging; +using Monai.Deploy.WorkflowManager.Common.Database.Interfaces; +using Monai.Deploy.Storage.API; namespace Monai.Deploy.WorkflowManager.Common.MonaiBackgroundService { @@ -33,18 +35,24 @@ public class Worker : BackgroundService private readonly ITasksService _tasksService; private readonly IMessageBrokerPublisherService _publisherService; private readonly IOptions _options; + private readonly IPayloadRepository _payloadRepository; + private readonly IStorageService _storageService; public bool IsRunning { get; set; } = false; public Worker( ILogger logger, ITasksService tasksService, IMessageBrokerPublisherService publisherService, + IPayloadRepository payloadRepository, + IStorageService storageService, IOptions options) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _tasksService = tasksService ?? throw new ArgumentNullException(nameof(tasksService)); _publisherService = publisherService ?? throw new ArgumentNullException(nameof(publisherService)); _options = options ?? throw new ArgumentNullException(nameof(options)); + _payloadRepository = payloadRepository ?? throw new ArgumentNullException(nameof(payloadRepository)); + _storageService = storageService ?? throw new ArgumentNullException(nameof(storageService)); } public static string ServiceName => "Monai Background Service"; @@ -68,6 +76,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } public async Task DoWork() + { + await ProcessTimedoutTasks().ConfigureAwait(false); + await ProcessExpiredPayloads().ConfigureAwait(false); + } + + private async Task ProcessTimedoutTasks() { try { @@ -89,6 +103,59 @@ public async Task DoWork() } } + private async Task ProcessExpiredPayloads() + { + var payloads = new List(); + try + { + payloads = (await _payloadRepository.GetPayloadsToDelete(DateTime.UtcNow).ConfigureAwait(false)).ToList(); + + if (payloads.Count != 0) + { + var ids = payloads.Select(p => p.PayloadId).ToList(); + + await _payloadRepository.MarkDeletedState(ids, PayloadDeleted.InProgress).ConfigureAwait(false); + } + + } + catch (Exception e) + { + _logger.WorkerException(e.Message); + } + + try + { + await RemoveStoredFiles(payloads.ToList()); + } + catch (Exception e) + { + + _logger.WorkerException(e.Message); + } + } + + private async Task RemoveStoredFiles(List payloads) + { + var tasks = new List(); + + foreach (var payload in payloads) + { + var filepaths = (payload.Files.Select(f => f.Path)).ToList(); + + var all = await _storageService.ListObjectsAsync(payload.Bucket, payload.PayloadId, true); + + filepaths.AddRange(all.Select(f => f.FilePath)); + + foreach (var filepath in filepaths) + { + await _storageService.RemoveObjectAsync(payload.Bucket, filepath); + } + + tasks.Add(_payloadRepository.MarkDeletedState(new List { payload.PayloadId }, PayloadDeleted.Yes)); + } + await Task.WhenAll(tasks); + } + private async Task PublishCancellationEvent(TaskExecution task, string correlationId, string identity, string workflowInstanceId) { _logger.TimingOutTaskCancellationEvent(identity, task.WorkflowInstanceId); diff --git a/src/WorkflowManager/PayloadListener/packages.lock.json b/src/WorkflowManager/PayloadListener/packages.lock.json index 0dff6209a..5faa09ad0 100644 --- a/src/WorkflowManager/PayloadListener/packages.lock.json +++ b/src/WorkflowManager/PayloadListener/packages.lock.json @@ -797,6 +797,7 @@ "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { + "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Contracts": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Database": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Storage": "[1.0.0, )" diff --git a/src/WorkflowManager/Services/packages.lock.json b/src/WorkflowManager/Services/packages.lock.json index ad7e2bef1..2d5fc421d 100644 --- a/src/WorkflowManager/Services/packages.lock.json +++ b/src/WorkflowManager/Services/packages.lock.json @@ -762,6 +762,7 @@ "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { + "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Contracts": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Database": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Storage": "[1.0.0, )" diff --git a/src/WorkflowManager/WorkflowExecuter/packages.lock.json b/src/WorkflowManager/WorkflowExecuter/packages.lock.json index 7ac4c1c04..978e97a87 100644 --- a/src/WorkflowManager/WorkflowExecuter/packages.lock.json +++ b/src/WorkflowManager/WorkflowExecuter/packages.lock.json @@ -798,6 +798,7 @@ "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { + "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Contracts": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Database": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Storage": "[1.0.0, )" diff --git a/src/WorkflowManager/WorkflowManager/appsettings.json b/src/WorkflowManager/WorkflowManager/appsettings.json index 502dabf97..4b946a500 100755 --- a/src/WorkflowManager/WorkflowManager/appsettings.json +++ b/src/WorkflowManager/WorkflowManager/appsettings.json @@ -104,7 +104,8 @@ } }, "dicomTagsDisallowed": "PatientName,PatientID,IssuerOfPatientID,TypeOfPatientID,IssuerOfPatientIDQualifiersSequence,SourcePatientGroupIdentificationSequence,GroupOfPatientsIdentificationSequence,SubjectRelativePositionInImage,PatientBirthDate,PatientBirthTime,PatientBirthDateInAlternativeCalendar,PatientDeathDateInAlternativeCalendar,PatientAlternativeCalendar,PatientSex,PatientInsurancePlanCodeSequence,PatientPrimaryLanguageCodeSequence,PatientPrimaryLanguageModifierCodeSequence,QualityControlSubject,QualityControlSubjectTypeCodeSequence,StrainDescription,StrainNomenclature,StrainStockNumber,StrainSourceRegistryCodeSequence,StrainStockSequence,StrainSource,StrainAdditionalInformation,StrainCodeSequence,GeneticModificationsSequence,GeneticModificationsDescription,GeneticModificationsNomenclature,GeneticModificationsCodeSequence,OtherPatientIDsRETIRED,OtherPatientNames,OtherPatientIDsSequence,PatientBirthName,PatientAge,PatientSize,PatientSizeCodeSequence,PatientBodyMassIndex,MeasuredAPDimension,MeasuredLateralDimension,PatientWeight,PatientAddress,InsurancePlanIdentificationRETIRED,PatientMotherBirthName,MilitaryRank,BranchOfService,MedicalRecordLocatorRETIRED,ReferencedPatientPhotoSequence,MedicalAlerts,Allergies,CountryOfResidence,RegionOfResidence,PatientTelephoneNumbers,PatientTelecomInformation,EthnicGroup,Occupation,SmokingStatus,AdditionalPatientHistory,PregnancyStatus,LastMenstrualDate,PatientReligiousPreference,PatientSpeciesDescription,PatientSpeciesCodeSequence,PatientSexNeutered,AnatomicalOrientationType,PatientBreedDescription,PatientBreedCodeSequence,BreedRegistrationSequence,BreedRegistrationNumber,BreedRegistryCodeSequence,ResponsiblePerson,ResponsiblePersonRole,ResponsibleOrganization,PatientComments,ExaminedBodyThickness", - "migExternalAppPlugins": [ "Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution.DicomDeidentifier, Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution, Version=0.0.0.0" ] + "migExternalAppPlugins": [ "Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution.DicomDeidentifier, Monai.Deploy.InformaticsGateway.PlugIns.RemoteAppExecution, Version=0.0.0.0" ], + "dataRetentionDays": 10 // note. -1 = never delete }, "InformaticsGateway": { "apiHost": "http://localhost:5010", diff --git a/src/WorkflowManager/WorkflowManager/packages.lock.json b/src/WorkflowManager/WorkflowManager/packages.lock.json index 5815664bb..39edde3bc 100644 --- a/src/WorkflowManager/WorkflowManager/packages.lock.json +++ b/src/WorkflowManager/WorkflowManager/packages.lock.json @@ -1083,6 +1083,7 @@ "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { + "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Contracts": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Database": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Storage": "[1.0.0, )" diff --git a/tests/UnitTests/Common.Tests/Services/PayloadServiceTests.cs b/tests/UnitTests/Common.Tests/Services/PayloadServiceTests.cs index dcc1ed626..6b3f9557d 100644 --- a/tests/UnitTests/Common.Tests/Services/PayloadServiceTests.cs +++ b/tests/UnitTests/Common.Tests/Services/PayloadServiceTests.cs @@ -27,6 +27,8 @@ using Monai.Deploy.WorkflowManager.Common.Storage.Services; using Moq; using Xunit; +using Microsoft.Extensions.Options; +using Monai.Deploy.WorkflowManager.Common.Configuration; namespace Monai.Deploy.WorkflowManager.Common.Miscellaneous.Tests.Services { @@ -36,6 +38,7 @@ public class PayloadServiceTests private readonly Mock _payloadRepository; private readonly Mock _workflowInstanceRepository; + private readonly Mock _workflowRepository; private readonly Mock _dicomService; private readonly Mock _serviceScopeFactory; private readonly Mock _serviceProvider; @@ -47,6 +50,7 @@ public PayloadServiceTests() { _payloadRepository = new Mock(); _workflowInstanceRepository = new Mock(); + _workflowRepository = new Mock(); _dicomService = new Mock(); _serviceProvider = new Mock(); _storageService = new Mock(); @@ -65,7 +69,16 @@ public PayloadServiceTests() .Setup(x => x.GetService(typeof(IStorageService))) .Returns(_storageService.Object); - PayloadService = new PayloadService(_payloadRepository.Object, _dicomService.Object, _workflowInstanceRepository.Object, _serviceScopeFactory.Object, _logger.Object); + var opts = Options.Create(new WorkflowManagerOptions { DataRetentionDays = 99 }); + + PayloadService = new PayloadService( + _payloadRepository.Object, + _dicomService.Object, + _workflowInstanceRepository.Object, + _workflowRepository.Object, + _serviceScopeFactory.Object, + opts, + _logger.Object); } [Fact] @@ -372,5 +385,108 @@ public async Task DeletePayloadFromStorageAsync_ThrowsMonaiBadRequestExceptionWh await Assert.ThrowsAsync(async () => await PayloadService.DeletePayloadFromStorageAsync(payloadId)); } + + + [Fact] + public async Task GetExpiry_Should_use_Config_if_not_set() + { + _workflowInstanceRepository.Setup(r => + r.GetByPayloadIdsAsync(It.IsAny>()) + ).ReturnsAsync(() => new List()); + var workflow = new WorkflowRevision { Workflow = new Workflow { DataRetentionDays = null } }; + + + _workflowRepository.Setup(r => + r.GetByWorkflowIdAsync(It.IsAny()) + ).ReturnsAsync(workflow); + + var now = new DateTime(2021, 1, 1); + var expires = await PayloadService.GetExpiry(now, "workflowInstanceId"); + Assert.Equal(now.AddDays(99), expires); + } + + [Fact] + public async Task GetExpiry_Should_return_null_if_minusOne() + { + _workflowInstanceRepository.Setup(r => + r.GetByWorkflowInstanceIdAsync(It.IsAny()) + ).ReturnsAsync(() => new WorkflowInstance()); + var workflow = new WorkflowRevision { Workflow = new Workflow { DataRetentionDays = -1 } }; + + + _workflowRepository.Setup(r => + r.GetByWorkflowIdAsync(It.IsAny()) + ).ReturnsAsync(workflow); + + var now = new DateTime(2021, 1, 1); + var expires = await PayloadService.GetExpiry(now, "workflowInstanceId"); + Assert.Null(expires); + } + + [Fact] + public async Task GetExpiry_Should_use_Workflow_Value_if_set() + { + _workflowInstanceRepository.Setup(r => + r.GetByWorkflowInstanceIdAsync(It.IsAny()) + ).ReturnsAsync(() => new WorkflowInstance()); + var workflow = new WorkflowRevision { Workflow = new Workflow { DataRetentionDays = 4 } }; + + _workflowRepository.Setup(r => + r.GetByWorkflowIdAsync(It.IsAny()) + ).ReturnsAsync(workflow); + var now = new DateTime(2021, 1, 1); + var expires = await PayloadService.GetExpiry(now, "workflowInstanceId"); + Assert.Equal(now.AddDays(4), expires); + } + + [Fact] + public void PayloadServiceCreate_Should_Throw_If_No_Options_Passed() + { + Assert.Throws(() => new PayloadService( + _payloadRepository.Object, + _dicomService.Object, + _workflowInstanceRepository.Object, + _workflowRepository.Object, + _serviceScopeFactory.Object, + null!, + _logger.Object)); + } + + [Fact] + public void PayloadServiceCreate_Should_Throw_If_No_workflowRepository_Passed() + { + var opts = Options.Create(new WorkflowManagerOptions { DataRetentionDays = 99 }); + + Assert.Throws(() => new PayloadService( + _payloadRepository.Object, + _dicomService.Object, + _workflowInstanceRepository.Object, + null!, + _serviceScopeFactory.Object, + opts, + _logger.Object)); + } + + [Fact] + public async Task PayloadServiceCreate_Should_Call_GetExpiry() + { + _payloadRepository.Setup(p => p.CreateAsync(It.IsAny())).ReturnsAsync(true); + + var payload = await PayloadService.CreateAsync(new WorkflowRequestEvent + { + Timestamp = DateTime.UtcNow, + Bucket = "bucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + PayloadId = Guid.NewGuid(), + Workflows = new List { Guid.NewGuid().ToString() }, + FileCount = 0 + }); + + var daysdiff = (payload!.Expires! - DateTime.UtcNow).Value.TotalDays + 0.5; + + Assert.Equal(99, (int)daysdiff); + } + } } diff --git a/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs b/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs index bd715e18c..7999b66df 100644 --- a/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs +++ b/tests/UnitTests/MonaiBackgroundService.Tests/WorkerTests.cs @@ -24,6 +24,7 @@ using Monai.Deploy.WorkflowManager.Common.Contracts.Models; using Monai.Deploy.WorkflowManager.Common.Database.Interfaces; using Moq; +using Monai.Deploy.Storage.API; namespace Monai.Deploy.WorkflowManager.Common.MonaiBackgroundService.Tests { @@ -33,6 +34,8 @@ public class WorkerTests private readonly Worker _service; private readonly Mock _pubService; private readonly IOptions _options; + private readonly Mock _storageService; + private readonly Mock _payloadRepository; private readonly Mock _repo; public WorkerTests() @@ -42,7 +45,9 @@ public WorkerTests() var taskService = new TasksService(_repo.Object); _pubService = new Mock(); _options = Options.Create(new WorkflowManagerOptions()); - _service = new Worker(logger.Object, taskService, _pubService.Object, _options); + _storageService = new Mock(); + _payloadRepository = new Mock(); + _service = new Worker(logger.Object, taskService, _pubService.Object, _payloadRepository.Object, _storageService.Object, _options); } [Fact] @@ -89,5 +94,22 @@ public async Task MonaiBackgroundService_DoWork_ShouldPublishMessages() Assert.False(_service.IsRunning); } + + [Fact] + public async Task MonaiBackgroundService_DoWork_Should_Delete_Expired_Payload_Files() + { + var payloadToRemove = new Payload { PayloadId = "removeMe " }; + + + _payloadRepository.Setup(p => p.GetPayloadsToDelete(It.IsAny())).ReturnsAsync(() => new List { payloadToRemove }); + _storageService.Setup(s => s.ListObjectsAsync(It.IsAny(), It.IsAny(), true, It.IsAny())) + .ReturnsAsync(() => new List { new VirtualFileInfo(payloadToRemove.PayloadId, payloadToRemove.PayloadId, "", 5) }); + + await _service.DoWork(); + + _storageService.Verify(s => s.RemoveObjectAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once()); + _payloadRepository.Verify(p => p.MarkDeletedState(It.IsAny>(), It.IsAny()), Times.Exactly(2)); //once for in-progress once for deleted + Assert.False(_service.IsRunning); + } } } diff --git a/tests/UnitTests/WorkflowManager.Tests/packages.lock.json b/tests/UnitTests/WorkflowManager.Tests/packages.lock.json index d30975a16..f1a71974a 100644 --- a/tests/UnitTests/WorkflowManager.Tests/packages.lock.json +++ b/tests/UnitTests/WorkflowManager.Tests/packages.lock.json @@ -1941,6 +1941,7 @@ "monai.deploy.workflowmanager.common": { "type": "Project", "dependencies": { + "Monai.Deploy.WorkflowManager.Common.Configuration": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Contracts": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Database": "[1.0.0, )", "Monai.Deploy.WorkflowManager.Storage": "[1.0.0, )"