Skip to content

Commit

Permalink
Merge pull request #841 from dlcs/feature/apiAssetModifiedCleanup
Browse files Browse the repository at this point in the history
Api changes to support asset modified cleanup
  • Loading branch information
JackLewis-digirati committed May 3, 2024
2 parents 9e1d84a + 9e38510 commit 96ff88c
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ public void Create_SetsCorrectFields()
notification.Before.Should().BeNull();
}

[Fact]
public void Update_SetsCorrectFields()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void Update_SetsCorrectFields(bool engineNotified)
{
// Arrange
var before = new Asset { Id = new AssetId(1, 2, "foo") };
var after = new Asset { Id = new AssetId(1, 2, "foo"), MaxUnauthorised = 10 };

// Act
var notification = AssetModificationRecord.Update(before, after);
var notification = AssetModificationRecord.Update(before, after, engineNotified);

// Assert
notification.ChangeType.Should().Be(ChangeType.Update);
notification.Before.Should().Be(before);
notification.After.Should().Be(after);
notification.EngineNotified.Should().Be(engineNotified);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using API.Infrastructure.Messaging;
using DLCS.AWS.SNS;
using DLCS.Core.Types;
Expand Down Expand Up @@ -33,7 +32,7 @@ public async Task SendAssetModifiedMessage_Single_SendsNotification_IfUpdate()
{
// Arrange
var assetModifiedRecord =
AssetModificationRecord.Update(new Asset(new AssetId(1, 2, "foo")), new Asset(new AssetId(1, 2, "bar")));
AssetModificationRecord.Update(new Asset(new AssetId(1, 2, "foo")), new Asset(new AssetId(1, 2, "bar")), true);

// Act
await sut.SendAssetModifiedMessage(assetModifiedRecord, CancellationToken.None);
Expand Down Expand Up @@ -76,7 +75,7 @@ public async Task SendAssetModifiedMessage_Single_SendsNotification_IfDelete()
A.CallTo(() =>
topicPublisher.PublishToAssetModifiedTopic(
A<IReadOnlyList<AssetModifiedNotification>>.That.Matches(n =>
n.Single().ChangeType == ChangeType.Delete && n.Single().MessageContents.Contains(customerName)),
n.Single().Attributes.Values.Contains(ChangeType.Delete.ToString()) && n.Single().MessageContents.Contains(customerName)),
A<CancellationToken>._)).MustHaveHappened();
}

Expand All @@ -102,7 +101,7 @@ public async Task SendAssetModifiedMessage_Multiple_SendsNotification_IfDelete()
topicPublisher.PublishToAssetModifiedTopic(
A<IReadOnlyList<AssetModifiedNotification>>.That.Matches(n =>
n.Count == 2 && n.All(m =>
m.ChangeType == ChangeType.Delete && m.MessageContents.Contains(customerName))),
n.First().Attributes.Values.Contains(ChangeType.Delete.ToString()) && m.MessageContents.Contains(customerName))),
A<CancellationToken>._)).MustHaveHappened();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async Task<ModifyEntityResult<Asset>> Handle(CreateOrUpdateImage request,

var assetModificationRecord = existingAsset == null
? AssetModificationRecord.Create(assetAfterSave)
: AssetModificationRecord.Update(existingAsset, assetAfterSave);
: AssetModificationRecord.Update(existingAsset, assetAfterSave, processAssetResult.RequiresEngineNotification);

await assetNotificationSender.SendAssetModifiedMessage(assetModificationRecord, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task<ModifyEntityResult<Asset>> Handle(ReingestAsset request, Cance

var asset = await MarkAssetAsIngesting(cancellationToken, existingAsset!);

await assetNotificationSender.SendAssetModifiedMessage(AssetModificationRecord.Update(existingAsset!, asset),
await assetNotificationSender.SendAssetModifiedMessage(AssetModificationRecord.Update(existingAsset!, asset, true),
cancellationToken);
var statusCode = await ingestNotificationSender.SendImmediateIngestAssetRequest(asset, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Data;
using API.Features.Image;
using API.Features.Image.Ingest;
using API.Infrastructure.Messaging;
using API.Infrastructure.Requests;
using DLCS.Core;
using DLCS.Model.Assets;
Expand Down Expand Up @@ -37,19 +38,22 @@ public class CreateBatchOfImagesHandler : IRequestHandler<CreateBatchOfImages, M
private readonly IBatchRepository batchRepository;
private readonly AssetProcessor assetProcessor;
private readonly IIngestNotificationSender ingestNotificationSender;
private readonly IAssetNotificationSender assetNotificationSender;
private readonly ILogger<CreateBatchOfImagesHandler> logger;

public CreateBatchOfImagesHandler(
DlcsContext dlcsContext,
IBatchRepository batchRepository,
AssetProcessor assetProcessor,
IIngestNotificationSender ingestNotificationSender,
IAssetNotificationSender assetNotificationSender,
ILogger<CreateBatchOfImagesHandler> logger)
{
this.dlcsContext = dlcsContext;
this.batchRepository = batchRepository;
this.assetProcessor = assetProcessor;
this.ingestNotificationSender = ingestNotificationSender;
this.assetNotificationSender = assetNotificationSender;
this.logger = logger;
}

Expand Down Expand Up @@ -84,7 +88,9 @@ public class CreateBatchOfImagesHandler : IRequestHandler<CreateBatchOfImages, M

var batch = await batchRepository.CreateBatch(request.CustomerId, request.AssetsBeforeProcessing.Select(a => a.Asset).ToList(), cancellationToken);

var assetNotificationList = new List<Asset>(request.AssetsBeforeProcessing.Count);
var engineNotificationList = new List<Asset>(request.AssetsBeforeProcessing.Count);
var assetModifiedNotificationList = new List<AssetModificationRecord>();

try
{
using var logScope = logger.BeginScope("Processing batch {BatchId}", batch.Id);
Expand All @@ -106,9 +112,15 @@ public class CreateBatchOfImagesHandler : IRequestHandler<CreateBatchOfImages, M

var savedAsset = processAssetResult.Result.Entity!;

var existingAsset = processAssetResult.ExistingAsset;
var assetModificationRecord = existingAsset == null
? AssetModificationRecord.Create(savedAsset)
: AssetModificationRecord.Update(existingAsset, savedAsset, processAssetResult.RequiresEngineNotification);
assetModifiedNotificationList.Add(assetModificationRecord);

if (processAssetResult.RequiresEngineNotification)
{
assetNotificationList.Add(savedAsset);
engineNotificationList.Add(savedAsset);
}
else
{
Expand All @@ -119,6 +131,8 @@ public class CreateBatchOfImagesHandler : IRequestHandler<CreateBatchOfImages, M
}
}

await assetNotificationSender.SendAssetModifiedMessage(assetModifiedNotificationList, cancellationToken);

if (batch.Completed > 0)
{
await dlcsContext.SaveChangesAsync(cancellationToken);
Expand Down Expand Up @@ -151,7 +165,7 @@ public class CreateBatchOfImagesHandler : IRequestHandler<CreateBatchOfImages, M
{
// Raise notifications
logger.LogDebug("Batch {BatchId} created - sending engine notifications", batch.Id);
await ingestNotificationSender.SendIngestAssetsRequest(assetNotificationList, request.IsPriority,
await ingestNotificationSender.SendIngestAssetsRequest(engineNotificationList, request.IsPriority,
cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,25 @@ public class AssetModificationRecord
public Asset? Before { get; }
public Asset? After { get; }

public bool EngineNotified { get; }

public ImageCacheType? DeleteFrom { get; }

private AssetModificationRecord(ChangeType changeType, Asset? before, Asset? after, ImageCacheType? deleteFrom)
private AssetModificationRecord(ChangeType changeType, Asset? before, Asset? after, ImageCacheType? deleteFrom, bool assetModifiedEngineNotified)
{
ChangeType = changeType;
Before = before;
After = after;
DeleteFrom = deleteFrom;
EngineNotified = assetModifiedEngineNotified;
}

public static AssetModificationRecord Delete(Asset before, ImageCacheType deleteFrom)
=> new(ChangeType.Delete, before.ThrowIfNull(nameof(before)), null, deleteFrom.ThrowIfNull(nameof(deleteFrom)));
=> new(ChangeType.Delete, before.ThrowIfNull(nameof(before)), null, deleteFrom.ThrowIfNull(nameof(deleteFrom)), false);

public static AssetModificationRecord Update(Asset before, Asset after)
=> new(ChangeType.Update, before.ThrowIfNull(nameof(before)), after.ThrowIfNull(nameof(after)), null);
public static AssetModificationRecord Update(Asset before, Asset after, bool assetModifiedEngineNotified)
=> new(ChangeType.Update, before.ThrowIfNull(nameof(before)), after.ThrowIfNull(nameof(after)), null, assetModifiedEngineNotified);

public static AssetModificationRecord Create(Asset after)
=> new(ChangeType.Create, null, after.ThrowIfNull(nameof(after)), null);
=> new(ChangeType.Create, null, after.ThrowIfNull(nameof(after)), null, false);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
using Amazon.Runtime.Internal.Transform;
using DLCS.AWS.SNS;
using DLCS.Core.Collections;
using DLCS.Core.Strings;
Expand Down Expand Up @@ -40,28 +41,31 @@ public class AssetNotificationSender : IAssetNotificationSender
CancellationToken cancellationToken = default)
=> SendAssetModifiedMessage(notification.AsList(), cancellationToken);

public async Task SendAssetModifiedMessage(IReadOnlyCollection<AssetModificationRecord> notifications,
public async Task SendAssetModifiedMessage(IReadOnlyCollection<AssetModificationRecord> notifications,
CancellationToken cancellationToken = default)
{
// Iterate through AssetModifiedMessage objects and build list(s) of changes
var changes = new Dictionary<ChangeType, List<string>>()
{
[ChangeType.Create] = new(),
[ChangeType.Update] = new(),
[ChangeType.Delete] = new(),
};

var changes = new List<AssetModifiedNotification>();

foreach (var notification in notifications)
{
var serialisedNotification = await GetSerialisedNotification(notification);
if (serialisedNotification.HasText())
{
changes[notification.ChangeType].Add(serialisedNotification);
var attributes = new Dictionary<string, string>()
{
{ "messageType", notification.ChangeType.ToString() }
};
if (notification.EngineNotified)
{
attributes.Add("engineNotified", "True");
}

changes.Add(new AssetModifiedNotification(serialisedNotification!, attributes));
}
}

// Send notifications generated in above method
await SendAssetModifiedRequest(changes, cancellationToken);
await topicPublisher.PublishToAssetModifiedTopic(changes, cancellationToken);
}

private async Task<string?> GetSerialisedNotification(AssetModificationRecord notification)
Expand Down Expand Up @@ -131,16 +135,4 @@ private async Task<CustomerPathElement> GetCustomerPathElement(int customer)
customerPathElements[customer] = customerPathElement;
return customerPathElement;
}

private async Task<bool> SendAssetModifiedRequest(Dictionary<ChangeType, List<string>> change, CancellationToken cancellationToken)
{
if (change.IsNullOrEmpty()) return true;

var toSend = change
.SelectMany(kvp => kvp.Value
.Select(v => new AssetModifiedNotification(v, kvp.Key)))
.ToList();

return await topicPublisher.PublishToAssetModifiedTopic(toSend, cancellationToken);
}
}
68 changes: 61 additions & 7 deletions src/protagonist/DLCS.AWS.Tests/SNS/TopicPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public TopicPublisherTests()
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleMessage_IfSingleItemInBatch()
{
// Arrange
var notification = new AssetModifiedNotification("message", ChangeType.Delete);
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification });
Expand All @@ -52,7 +52,7 @@ public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleMe
public async Task PublishToAssetModifiedTopicBatch_SingleItemInBatch_ReturnsSuccessDependentOnStatusCode(HttpStatusCode statusCode, bool expected)
{
// Arrange
var notification = new AssetModifiedNotification("message", ChangeType.Delete);
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));
A.CallTo(() => snsClient.PublishAsync(A<PublishRequest>._, A<CancellationToken>._))
.Returns(new PublishResponse { HttpStatusCode = statusCode });

Expand All @@ -67,8 +67,8 @@ public async Task PublishToAssetModifiedTopicBatch_SingleItemInBatch_ReturnsSucc
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleBatch()
{
// Arrange
var notification = new AssetModifiedNotification("message", ChangeType.Delete);
var notification2 = new AssetModifiedNotification("message", ChangeType.Delete);
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));
var notification2 = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification, notification2 });
Expand All @@ -91,7 +91,7 @@ public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesMultiple
var notifications = new List<AssetModifiedNotification>(15);
for (int x = 0; x < 15; x++)
{
notifications.Add(new AssetModifiedNotification(x < 10 ? "message" : "next", ChangeType.Delete));
notifications.Add(new AssetModifiedNotification(x < 10 ? "message" : "next", GetAttributes(ChangeType.Delete, false)));
}

// Act
Expand Down Expand Up @@ -123,7 +123,7 @@ public async Task PublishToAssetModifiedTopicBatch_ReturnsTrue_IfAllBatchesSucce
var notifications = new List<AssetModifiedNotification>(15);
for (int x = 0; x < 15; x++)
{
notifications.Add(new AssetModifiedNotification("message", ChangeType.Delete));
notifications.Add(new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false)));
}

A.CallTo(() => snsClient.PublishBatchAsync(A<PublishBatchRequest>._, A<CancellationToken>._))
Expand All @@ -143,7 +143,7 @@ public async Task PublishToAssetModifiedTopicBatch_ReturnsFalse_IfAnyBatchFails(
var notifications = new List<AssetModifiedNotification>(15);
for (int x = 0; x < 15; x++)
{
notifications.Add(new AssetModifiedNotification("message", ChangeType.Delete));
notifications.Add(new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false)));
}

A.CallTo(() => snsClient.PublishBatchAsync(A<PublishBatchRequest>._, A<CancellationToken>._))
Expand All @@ -157,4 +157,58 @@ public async Task PublishToAssetModifiedTopicBatch_ReturnsFalse_IfAnyBatchFails(
// Assert
response.Should().BeFalse();
}

[Fact]
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleMessageWithEngineNotified_IfEngineNotifiedTrue()
{
// Arrange
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Update, true));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification });

// Assert
A.CallTo(() =>
snsClient.PublishAsync(
A<PublishRequest>.That.Matches(r =>
r.Message == "message" && r.MessageAttributes["messageType"].StringValue == "Update" &&
r.MessageAttributes["engineNotified"].StringValue == "True"),
A<CancellationToken>._)).MustHaveHappened();
}

[Fact]
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleBatchWithEngineNotified()
{
// Arrange
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Update, true));
var notification2 = new AssetModifiedNotification("message", GetAttributes(ChangeType.Update, true));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification, notification2 });

// Assert
A.CallTo(() =>
snsClient.PublishBatchAsync(
A<PublishBatchRequest>.That.Matches(b => b.PublishBatchRequestEntries.All(r =>
r.Message == "message" &&
r.MessageAttributes["messageType"].StringValue ==
"Update"&&
r.MessageAttributes["engineNotified"].StringValue == "True") &&
b.PublishBatchRequestEntries.Count == 2),
A<CancellationToken>._)).MustHaveHappened();
}

private Dictionary<string, string> GetAttributes(ChangeType changeType, bool engineNotified)
{
var attributes = new Dictionary<string, string>()
{
{ "messageType", changeType.ToString() }
};
if (engineNotified)
{
attributes.Add("engineNotified", "True");
}

return attributes;
}
}
4 changes: 2 additions & 2 deletions src/protagonist/DLCS.AWS/SNS/ITopicPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ public interface ITopicPublisher
/// <param name="messages">A collection of notifications to send</param>
/// <param name="cancellationToken">Current cancellation token</param>
/// <returns>Boolean representing the overall success/failure status of all requests</returns>
public Task<bool> PublishToAssetModifiedTopic(IReadOnlyList<AssetModifiedNotification> messages,
public Task<bool> PublishToAssetModifiedTopic(IReadOnlyList<AssetModifiedNotification> messages,
CancellationToken cancellationToken);
}

/// <summary>
/// Represents the contents + type of change for Asset modified notification
/// </summary>
public record AssetModifiedNotification(string MessageContents, ChangeType ChangeType);
public record AssetModifiedNotification(string MessageContents, Dictionary<string, string> Attributes);
Loading

0 comments on commit 96ff88c

Please sign in to comment.