Skip to content

Commit

Permalink
gh-347 Remove incomplete payloads on timeout (#348)
Browse files Browse the repository at this point in the history
* gh-347 Remove incomplete payloads on timeout

Signed-off-by: Victor Chang <vicchang@nvidia.com>

* gh-347  Update unit test

Signed-off-by: Victor Chang <vicchang@nvidia.com>

* gh-347  Update integration tests

Signed-off-by: Victor Chang <vicchang@nvidia.com>

* gh-347  Fix unit test

Signed-off-by: Victor Chang <vicchang@nvidia.com>

* gh-347  Fix integration test

Signed-off-by: Victor Chang <vicchang@nvidia.com>

---------

Signed-off-by: Victor Chang <vicchang@nvidia.com>
  • Loading branch information
mocsharp committed Mar 11, 2023
1 parent 25bca2c commit 7d43ce6
Show file tree
Hide file tree
Showing 18 changed files with 71 additions and 51 deletions.
3 changes: 1 addition & 2 deletions src/Api/MonaiApplicationEntity.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
* Copyright 2019-2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -15,7 +15,6 @@
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
Expand Down
2 changes: 1 addition & 1 deletion src/Api/Storage/Payload.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
5 changes: 3 additions & 2 deletions src/Configuration/DicomWebConfiguration.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -55,7 +55,8 @@ public class DicomWebConfiguration
/// single POST request, therefore, the timeout value may be insignificant unless the load of the
/// network affects the upload speed.
/// </summary>
public uint Timeout { get; set; } = 2;
[ConfigurationKeyName("timeout")]
public uint Timeout { get; set; } = 10;

public DicomWebConfiguration()
{
Expand Down
4 changes: 2 additions & 2 deletions src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static partial class Log
[LoggerMessage(EventId = 3012, Level = LogLevel.Information, Message = "Bucket {key} created with timeout {timeout}s.")]
public static partial void BucketCreated(this ILogger logger, string key, uint timeout);

[LoggerMessage(EventId = 3014, Level = LogLevel.Error, Message = "Payload deleted due to upload failure(s) {key}.")]
public static partial void PayloadRemovedWithFailureUploads(this ILogger logger, string key);
[LoggerMessage(EventId = 3014, Level = LogLevel.Error, Message = "Payload ({key}) with {totalNumberOfFiles} files deleted due to {failures} upload failure(s).")]
public static partial void PayloadRemovedWithFailureUploads(this ILogger logger, string key, int totalNumberOfFiles, int failures);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
Expand Down Expand Up @@ -149,10 +150,10 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
_logger.BucketRemoveError(key);
}
}
else if (payload.AnyUploadFailures())
else if (payload.IsUploadCompletedWithFailures())
{
_payloads.TryRemove(key, out _);
_logger.PayloadRemovedWithFailureUploads(key);
_logger.PayloadRemovedWithFailureUploads(key, payload.Count, payload.Files.Count(p => p.IsUploadFailed));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,9 +28,9 @@ public static bool IsUploadCompleted(this Payload payload)
return payload.Files.All(p => p.IsUploaded);
}

public static bool AnyUploadFailures(this Payload payload)
public static bool IsUploadCompletedWithFailures(this Payload payload)
{
return payload.Files.Any(p => p.IsUploadFailed);
return payload.Files.Count(p => p.IsUploadFailed) + payload.Files.Count(p => p.IsUploaded) == payload.Count; ;
}

public static bool IsMoveCompleted(this Payload payload)
Expand Down
3 changes: 1 addition & 2 deletions src/InformaticsGateway/Services/Http/InferenceController.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
* Copyright 2019-2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -90,7 +90,6 @@ public async Task<ActionResult> NewInferenceRequest([FromBody] InferenceRequest
using var _ = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "TransactionId", request.TransactionId } });
try
{

if (await _inferenceRequestRepository.ExistsAsync(request.TransactionId, HttpContext.RequestAborted).ConfigureAwait(false))
{
return Problem(title: "Conflict", statusCode: (int)HttpStatusCode.Conflict, detail: "An existing request with same transaction ID already exists.");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 MONAI Consortium
* Copyright 2021-2023 MONAI Consortium
* Copyright 2019-2021 NVIDIA Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -103,7 +103,7 @@ private async Task StartWorker(int thread, CancellationToken cancellationToken)
{
try
{
var item = await _uplaodQueue.Dequeue(cancellationToken);
var item = await _uplaodQueue.Dequeue(cancellationToken).ConfigureAwait(false);
await ProcessObject(item).ConfigureAwait(false);
}
catch (OperationCanceledException ex)
Expand Down Expand Up @@ -164,6 +164,7 @@ private async Task ProcessObject(FileStorageMetadata blob)
}
catch (Exception ex)
{
blob.SetFailed();
_logger.FailedToUploadFile(blob.Id, ex);
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,23 @@ public async Task GivenAPayloadAssembler_WhenDisposed_ExpectResourceToBeCleanedU
}

[RetryFact(10, 200)]
public async Task GivenAPayloadThatHasNotCompleteUploads_WhenProcessedByTimedEvent_ExpectToBeAddedToQueue()
public async Task GivenAPayloadThatHasNotCompleteUploads_WhenProcessedByTimedEvent_ExpectToBeRemovedFromQueue()
{
var payloadAssembler = new PayloadAssembler(_options, _logger.Object, _serviceScopeFactory.Object);

var file = new TestStorageInfo(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "file1", ".txt");
file.File.SetUploaded("bucket");
var file1 = new TestStorageInfo(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "file1", ".txt");
var file2 = new TestStorageInfo(Guid.NewGuid().ToString(), Guid.NewGuid().ToString(), "file1", ".txt");

await payloadAssembler.Queue("A", file, 1);
await payloadAssembler.Queue("A", file1, 1);
await payloadAssembler.Queue("A", file2, 1);

file1.SetFailed();
file2.SetUploaded();
await Task.Delay(1001);
payloadAssembler.Dispose();

_repository.Verify(p => p.UpdateAsync(It.Is<Payload>(p => p.State == Payload.PayloadState.Move), It.IsAny<CancellationToken>()), Times.Once());
_repository.Verify(p => p.UpdateAsync(It.Is<Payload>(p => p.State == Payload.PayloadState.Move), It.IsAny<CancellationToken>()), Times.Never());
_logger.VerifyLoggingMessageBeginsWith($"Payload (A) with 2 files deleted due to 1 upload failure(s).", LogLevel.Error, Times.Once());
}

[RetryFact(10, 200)]
Expand Down
2 changes: 1 addition & 1 deletion src/InformaticsGateway/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,4 @@
"InformaticsGatewayServerEndpoint": "http://localhost:5000",
"DockerImagePrefix": "ghcr.io/project-monai/monai-deploy-informatics-gateway"
}
}
}
6 changes: 6 additions & 0 deletions src/Shared/Test/TestStorageInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Monai.Deploy.InformaticsGateway.Api.Storage;
using Monai.Deploy.Messaging;

namespace Monai.Deploy.InformaticsGateway.SharedTest;

Expand All @@ -33,4 +34,9 @@ public TestStorageInfo(string correlationsId, string identifier, string filePath
public override string DataTypeDirectoryName => "dir";

public override StorageObjectMetadata File { get; set; }

public void SetUploaded()
{
File.SetUploaded("test");
}
}
7 changes: 6 additions & 1 deletion tests/Integration.Test/Common/Hl7DataSink.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@
* limitations under the License.
*/

using System.Diagnostics;
using System.Net.Sockets;
using System.Text;
using Ardalis.GuardClauses;
Expand Down Expand Up @@ -95,6 +96,8 @@ private async Task SendOneAsync(DataProvider dataProvider, params object[] args)

private async Task SendBatchAsync(DataProvider dataProvider, params object[] args)
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var messages = new List<byte>();
foreach (var file in dataProvider.HL7Specs.Files.Keys)
{
Expand Down Expand Up @@ -134,6 +137,8 @@ private async Task SendBatchAsync(DataProvider dataProvider, params object[] arg
}
} while (true);
tcpClient.Close();
stopwatch.Stop();
_outputHelper.WriteLine($"Took {stopwatch.Elapsed.TotalSeconds}s to send {messages.Count} messages.");
}
}
}
34 changes: 17 additions & 17 deletions tests/Integration.Test/Features/DicomDimseScp.feature
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 MONAI Consortium
# Copyright 2022-2023 MONAI Consortium
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,47 +37,47 @@ Feature: DICOM DIMSE SCP Services

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 3 seconds
Given a called AE Title named '<aet>' that groups by '0020,000D' for <timeout> seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <count> <modality> studies
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET '<aet>' from 'TEST-RUNNER'
Then a successful response should be received
And <count> workflow requests sent to message broker
And studies are uploaded to storage service

Examples:
| modality | count |
| MR | 1 |
| CT | 1 |
| MG | 2 |
| US | 1 |
| modality | count | aet | timeout |
| MR | 1 | C-STORE-STUDY30 | 3 |
| CT | 1 | C-STORE-STUDY30 | 3 |
| MG | 2 | C-STORE-STUDY10 | 3 |
| US | 1 | C-STORE-STUDY10 | 3 |

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Series Instance UID
Given a called AE Title named 'C-STORE-SERIES' that groups by '0020,000E' for 3 seconds
Given a called AE Title named '<aet>' that groups by '0020,000E' for <timeout> seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over 1 associations and wait 0 between each association
And <study_count> <modality> studies with <series_count> series per study
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET 'C-STORE-SERIES' from 'TEST-RUNNER'
When a C-STORE-RQ is sent to 'Informatics Gateway' with AET '<aet>' from 'TEST-RUNNER'
Then a successful response should be received
And <series_count> workflow requests sent to message broker
And studies are uploaded to storage service

Examples:
| modality | study_count | series_count |
| MR | 1 | 2 |
| CT | 1 | 2 |
| MG | 1 | 3 |
| US | 1 | 2 |
| modality | study_count | series_count | aet | timeout |
| MR | 1 | 2 | C-STORE-SER30 | 3 |
| CT | 1 | 2 | C-STORE-SER30 | 3 |
| MG | 1 | 3 | C-STORE-SER10 | 3 |
| US | 1 | 2 | C-STORE-SER10 | 3 |

@messaging_workflow_request @messaging
Scenario Outline: Respond to C-STORE-RQ and group data by Study Instance UID over multiple associations
Given a called AE Title named 'C-STORE-STUDY' that groups by '0020,000D' for 5 seconds
Given a called AE Title named 'C-STORE-MA' that groups by '0020,000D' for 5 seconds
And a DICOM client configured with 300 seconds timeout
And a DICOM client configured to send data over <series_count> associations and wait <seconds> between each association
And <study_count> <modality> studies with <series_count> series per study
When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-STUDY' from 'TEST-RUNNER'
When C-STORE-RQ are sent to 'Informatics Gateway' with AET 'C-STORE-MA' from 'TEST-RUNNER'
Then a successful response should be received
And <workflow_requests> workflow requests sent to message broker
And studies are uploaded to storage service
Expand Down
6 changes: 3 additions & 3 deletions tests/Integration.Test/Features/DicomWebStow.feature
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 MONAI Consortium
# Copyright 2022-2023 MONAI Consortium
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,8 +39,8 @@ Feature: DICOMweb STOW-RS Service
And studies are uploaded to storage service
Examples:
| modality | count |
| CT | 2 |
| US | 1 |
| MR | 1 |
| MG | 2 |

@messaging_workflow_request @messaging
Scenario: Triggers a new workflow via DICOMWeb STOW-RS
Expand Down
4 changes: 2 additions & 2 deletions tests/Integration.Test/StepDefinitions/FhirDefinitions.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@ public class FhirDefinitions
internal enum FileFormat
{ Xml, Json };

internal static readonly TimeSpan WaitTimeSpan = TimeSpan.FromMinutes(2);
internal static readonly TimeSpan WaitTimeSpan = TimeSpan.FromMinutes(3);
private readonly InformaticsGatewayConfiguration _informaticsGatewayConfiguration;
private readonly RabbitMqConsumer _receivedMessages;
private readonly DataProvider _dataProvider;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,7 @@ public void ThenAcknowledgementAreReceived()
[Then(@"a workflow requests sent to message broker")]
public async Task ThenAWorkflowRequestIsSentToMessageBrokerAsync()
{
(await _receivedMessages.WaitforAsync(_dataProvider.HL7Specs.Files.Count, WaitTimeSpan)).Should().BeTrue();
(await _receivedMessages.WaitforAsync(1, WaitTimeSpan)).Should().BeTrue();
}

[Then(@"messages are uploaded to storage service")]
Expand Down
5 changes: 4 additions & 1 deletion tests/Integration.Test/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
"logDataPDUs": false
}
},
"dicomWeb": {
"timeout": 10
},
"messaging": {
"publisherServiceAssemblyName": "Monai.Deploy.Messaging.RabbitMQ.RabbitMQMessagePublisherService, Monai.Deploy.Messaging.RabbitMQ",
"publisherSettings": {
Expand Down Expand Up @@ -83,4 +86,4 @@
"InformaticsGatewayServerEndpoint": "http://127.0.0.1:5000",
"DockerImagePrefix": "ghcr.io/project-monai/monai-deploy-informatics-gateway"
}
}
}
4 changes: 2 additions & 2 deletions tests/Integration.Test/study.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
"CT": {
"SeriesMin": 1,
"SeriesMax": 2,
"InstanceMin": 60,
"InstanceMax": 1000,
"InstanceMin": 50,
"InstanceMax": 300,
"SizeMin": 0.5,
"SizeMax": 1
},
Expand Down

0 comments on commit 7d43ce6

Please sign in to comment.