Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Api changes to support asset modified cleanup #841

Merged
merged 2 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,22 @@ public void Create_SetsCorrectFields()
notification.Before.Should().BeNull();
}

[Fact]
public void Update_SetsCorrectFields()
[Theory]
[InlineData(true)]
[InlineData(false)]
public void Update_SetsCorrectFields(bool engineNotified)
{
// Arrange
var before = new Asset { Id = new AssetId(1, 2, "foo") };
var after = new Asset { Id = new AssetId(1, 2, "foo"), MaxUnauthorised = 10 };

// Act
var notification = AssetModificationRecord.Update(before, after);
var notification = AssetModificationRecord.Update(before, after, engineNotified);

// Assert
notification.ChangeType.Should().Be(ChangeType.Update);
notification.Before.Should().Be(before);
notification.After.Should().Be(after);
notification.EngineNotified.Should().Be(engineNotified);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using API.Infrastructure.Messaging;
using DLCS.AWS.SNS;
using DLCS.Core.Types;
Expand Down Expand Up @@ -33,7 +32,7 @@ public async Task SendAssetModifiedMessage_Single_SendsNotification_IfUpdate()
{
// Arrange
var assetModifiedRecord =
AssetModificationRecord.Update(new Asset(new AssetId(1, 2, "foo")), new Asset(new AssetId(1, 2, "bar")));
AssetModificationRecord.Update(new Asset(new AssetId(1, 2, "foo")), new Asset(new AssetId(1, 2, "bar")), true);

// Act
await sut.SendAssetModifiedMessage(assetModifiedRecord, CancellationToken.None);
Expand Down Expand Up @@ -76,7 +75,7 @@ public async Task SendAssetModifiedMessage_Single_SendsNotification_IfDelete()
A.CallTo(() =>
topicPublisher.PublishToAssetModifiedTopic(
A<IReadOnlyList<AssetModifiedNotification>>.That.Matches(n =>
n.Single().ChangeType == ChangeType.Delete && n.Single().MessageContents.Contains(customerName)),
n.Single().Attributes.Values.Contains(ChangeType.Delete.ToString()) && n.Single().MessageContents.Contains(customerName)),
A<CancellationToken>._)).MustHaveHappened();
}

Expand All @@ -102,7 +101,7 @@ public async Task SendAssetModifiedMessage_Multiple_SendsNotification_IfDelete()
topicPublisher.PublishToAssetModifiedTopic(
A<IReadOnlyList<AssetModifiedNotification>>.That.Matches(n =>
n.Count == 2 && n.All(m =>
m.ChangeType == ChangeType.Delete && m.MessageContents.Contains(customerName))),
n.First().Attributes.Values.Contains(ChangeType.Delete.ToString()) && m.MessageContents.Contains(customerName))),
A<CancellationToken>._)).MustHaveHappened();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async Task<ModifyEntityResult<Asset>> Handle(CreateOrUpdateImage request,

var assetModificationRecord = existingAsset == null
? AssetModificationRecord.Create(assetAfterSave)
: AssetModificationRecord.Update(existingAsset, assetAfterSave);
: AssetModificationRecord.Update(existingAsset, assetAfterSave, processAssetResult.RequiresEngineNotification);

await assetNotificationSender.SendAssetModifiedMessage(assetModificationRecord, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task<ModifyEntityResult<Asset>> Handle(ReingestAsset request, Cance

var asset = await MarkAssetAsIngesting(cancellationToken, existingAsset!);

await assetNotificationSender.SendAssetModifiedMessage(AssetModificationRecord.Update(existingAsset!, asset),
await assetNotificationSender.SendAssetModifiedMessage(AssetModificationRecord.Update(existingAsset!, asset, true),
cancellationToken);
var statusCode = await ingestNotificationSender.SendImmediateIngestAssetRequest(asset, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Data;
using API.Features.Image;
using API.Features.Image.Ingest;
using API.Infrastructure.Messaging;
using API.Infrastructure.Requests;
using DLCS.Core;
using DLCS.Model.Assets;
Expand Down Expand Up @@ -37,19 +38,22 @@ public class CreateBatchOfImagesHandler : IRequestHandler<CreateBatchOfImages, M
private readonly IBatchRepository batchRepository;
private readonly AssetProcessor assetProcessor;
private readonly IIngestNotificationSender ingestNotificationSender;
private readonly IAssetNotificationSender assetNotificationSender;
private readonly ILogger<CreateBatchOfImagesHandler> logger;

public CreateBatchOfImagesHandler(
DlcsContext dlcsContext,
IBatchRepository batchRepository,
AssetProcessor assetProcessor,
IIngestNotificationSender ingestNotificationSender,
IAssetNotificationSender assetNotificationSender,
ILogger<CreateBatchOfImagesHandler> logger)
{
this.dlcsContext = dlcsContext;
this.batchRepository = batchRepository;
this.assetProcessor = assetProcessor;
this.ingestNotificationSender = ingestNotificationSender;
this.assetNotificationSender = assetNotificationSender;
this.logger = logger;
}

Expand Down Expand Up @@ -84,7 +88,9 @@ public async Task<ModifyEntityResult<Batch>> Handle(CreateBatchOfImages request,

var batch = await batchRepository.CreateBatch(request.CustomerId, request.AssetsBeforeProcessing.Select(a => a.Asset).ToList(), cancellationToken);

var assetNotificationList = new List<Asset>(request.AssetsBeforeProcessing.Count);
var engineNotificationList = new List<Asset>(request.AssetsBeforeProcessing.Count);
var assetModifiedNotificationList = new List<AssetModificationRecord>();

try
{
using var logScope = logger.BeginScope("Processing batch {BatchId}", batch.Id);
Expand All @@ -106,9 +112,15 @@ await assetProcessor.Process(assetBeforeProcessing, false, true, true,

var savedAsset = processAssetResult.Result.Entity!;

var existingAsset = processAssetResult.ExistingAsset;
var assetModificationRecord = existingAsset == null
? AssetModificationRecord.Create(savedAsset)
: AssetModificationRecord.Update(existingAsset, savedAsset, processAssetResult.RequiresEngineNotification);
assetModifiedNotificationList.Add(assetModificationRecord);

if (processAssetResult.RequiresEngineNotification)
{
assetNotificationList.Add(savedAsset);
engineNotificationList.Add(savedAsset);
}
else
{
Expand All @@ -119,6 +131,8 @@ await assetProcessor.Process(assetBeforeProcessing, false, true, true,
}
}

await assetNotificationSender.SendAssetModifiedMessage(assetModifiedNotificationList, cancellationToken);

if (batch.Completed > 0)
{
await dlcsContext.SaveChangesAsync(cancellationToken);
Expand Down Expand Up @@ -151,7 +165,7 @@ await assetProcessor.Process(assetBeforeProcessing, false, true, true,
{
// Raise notifications
logger.LogDebug("Batch {BatchId} created - sending engine notifications", batch.Id);
await ingestNotificationSender.SendIngestAssetsRequest(assetNotificationList, request.IsPriority,
await ingestNotificationSender.SendIngestAssetsRequest(engineNotificationList, request.IsPriority,
cancellationToken);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,25 @@ public class AssetModificationRecord
public Asset? Before { get; }
public Asset? After { get; }

public bool EngineNotified { get; }

public ImageCacheType? DeleteFrom { get; }

private AssetModificationRecord(ChangeType changeType, Asset? before, Asset? after, ImageCacheType? deleteFrom)
private AssetModificationRecord(ChangeType changeType, Asset? before, Asset? after, ImageCacheType? deleteFrom, bool assetModifiedEngineNotified)
{
ChangeType = changeType;
Before = before;
After = after;
DeleteFrom = deleteFrom;
EngineNotified = assetModifiedEngineNotified;
}

public static AssetModificationRecord Delete(Asset before, ImageCacheType deleteFrom)
=> new(ChangeType.Delete, before.ThrowIfNull(nameof(before)), null, deleteFrom.ThrowIfNull(nameof(deleteFrom)));
=> new(ChangeType.Delete, before.ThrowIfNull(nameof(before)), null, deleteFrom.ThrowIfNull(nameof(deleteFrom)), false);

public static AssetModificationRecord Update(Asset before, Asset after)
=> new(ChangeType.Update, before.ThrowIfNull(nameof(before)), after.ThrowIfNull(nameof(after)), null);
public static AssetModificationRecord Update(Asset before, Asset after, bool assetModifiedEngineNotified)
=> new(ChangeType.Update, before.ThrowIfNull(nameof(before)), after.ThrowIfNull(nameof(after)), null, assetModifiedEngineNotified);

public static AssetModificationRecord Create(Asset after)
=> new(ChangeType.Create, null, after.ThrowIfNull(nameof(after)), null);
=> new(ChangeType.Create, null, after.ThrowIfNull(nameof(after)), null, false);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;
using Amazon.Runtime.Internal.Transform;
using DLCS.AWS.SNS;
using DLCS.Core.Collections;
using DLCS.Core.Strings;
Expand Down Expand Up @@ -40,28 +41,31 @@ public Task SendAssetModifiedMessage(AssetModificationRecord notification,
CancellationToken cancellationToken = default)
=> SendAssetModifiedMessage(notification.AsList(), cancellationToken);

public async Task SendAssetModifiedMessage(IReadOnlyCollection<AssetModificationRecord> notifications,
public async Task SendAssetModifiedMessage(IReadOnlyCollection<AssetModificationRecord> notifications,
CancellationToken cancellationToken = default)
{
// Iterate through AssetModifiedMessage objects and build list(s) of changes
var changes = new Dictionary<ChangeType, List<string>>()
{
[ChangeType.Create] = new(),
[ChangeType.Update] = new(),
[ChangeType.Delete] = new(),
};

var changes = new List<AssetModifiedNotification>();

foreach (var notification in notifications)
{
var serialisedNotification = await GetSerialisedNotification(notification);
if (serialisedNotification.HasText())
{
changes[notification.ChangeType].Add(serialisedNotification);
var attributes = new Dictionary<string, string>()
{
{ "messageType", notification.ChangeType.ToString() }
};
if (notification.EngineNotified)
{
attributes.Add("engineNotified", "True");
}

changes.Add(new AssetModifiedNotification(serialisedNotification!, attributes));
}
}

// Send notifications generated in above method
await SendAssetModifiedRequest(changes, cancellationToken);
await topicPublisher.PublishToAssetModifiedTopic(changes, cancellationToken);
}

private async Task<string?> GetSerialisedNotification(AssetModificationRecord notification)
Expand Down Expand Up @@ -131,16 +135,4 @@ private async Task<CustomerPathElement> GetCustomerPathElement(int customer)
customerPathElements[customer] = customerPathElement;
return customerPathElement;
}

private async Task<bool> SendAssetModifiedRequest(Dictionary<ChangeType, List<string>> change, CancellationToken cancellationToken)
{
if (change.IsNullOrEmpty()) return true;

var toSend = change
.SelectMany(kvp => kvp.Value
.Select(v => new AssetModifiedNotification(v, kvp.Key)))
.ToList();

return await topicPublisher.PublishToAssetModifiedTopic(toSend, cancellationToken);
}
}
68 changes: 61 additions & 7 deletions src/protagonist/DLCS.AWS.Tests/SNS/TopicPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public TopicPublisherTests()
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleMessage_IfSingleItemInBatch()
{
// Arrange
var notification = new AssetModifiedNotification("message", ChangeType.Delete);
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification });
Expand All @@ -52,7 +52,7 @@ public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleMe
public async Task PublishToAssetModifiedTopicBatch_SingleItemInBatch_ReturnsSuccessDependentOnStatusCode(HttpStatusCode statusCode, bool expected)
{
// Arrange
var notification = new AssetModifiedNotification("message", ChangeType.Delete);
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));
A.CallTo(() => snsClient.PublishAsync(A<PublishRequest>._, A<CancellationToken>._))
.Returns(new PublishResponse { HttpStatusCode = statusCode });

Expand All @@ -67,8 +67,8 @@ public async Task PublishToAssetModifiedTopicBatch_SingleItemInBatch_ReturnsSucc
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleBatch()
{
// Arrange
var notification = new AssetModifiedNotification("message", ChangeType.Delete);
var notification2 = new AssetModifiedNotification("message", ChangeType.Delete);
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));
var notification2 = new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification, notification2 });
Expand All @@ -91,7 +91,7 @@ public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesMultiple
var notifications = new List<AssetModifiedNotification>(15);
for (int x = 0; x < 15; x++)
{
notifications.Add(new AssetModifiedNotification(x < 10 ? "message" : "next", ChangeType.Delete));
notifications.Add(new AssetModifiedNotification(x < 10 ? "message" : "next", GetAttributes(ChangeType.Delete, false)));
}

// Act
Expand Down Expand Up @@ -123,7 +123,7 @@ public async Task PublishToAssetModifiedTopicBatch_ReturnsTrue_IfAllBatchesSucce
var notifications = new List<AssetModifiedNotification>(15);
for (int x = 0; x < 15; x++)
{
notifications.Add(new AssetModifiedNotification("message", ChangeType.Delete));
notifications.Add(new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false)));
}

A.CallTo(() => snsClient.PublishBatchAsync(A<PublishBatchRequest>._, A<CancellationToken>._))
Expand All @@ -143,7 +143,7 @@ public async Task PublishToAssetModifiedTopicBatch_ReturnsFalse_IfAnyBatchFails(
var notifications = new List<AssetModifiedNotification>(15);
for (int x = 0; x < 15; x++)
{
notifications.Add(new AssetModifiedNotification("message", ChangeType.Delete));
notifications.Add(new AssetModifiedNotification("message", GetAttributes(ChangeType.Delete, false)));
}

A.CallTo(() => snsClient.PublishBatchAsync(A<PublishBatchRequest>._, A<CancellationToken>._))
Expand All @@ -157,4 +157,58 @@ public async Task PublishToAssetModifiedTopicBatch_ReturnsFalse_IfAnyBatchFails(
// Assert
response.Should().BeFalse();
}

[Fact]
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleMessageWithEngineNotified_IfEngineNotifiedTrue()
{
// Arrange
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Update, true));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification });

// Assert
A.CallTo(() =>
snsClient.PublishAsync(
A<PublishRequest>.That.Matches(r =>
r.Message == "message" && r.MessageAttributes["messageType"].StringValue == "Update" &&
r.MessageAttributes["engineNotified"].StringValue == "True"),
A<CancellationToken>._)).MustHaveHappened();
}

[Fact]
public async Task PublishToAssetModifiedTopicBatch_SuccessfullyPublishesSingleBatchWithEngineNotified()
{
// Arrange
var notification = new AssetModifiedNotification("message", GetAttributes(ChangeType.Update, true));
var notification2 = new AssetModifiedNotification("message", GetAttributes(ChangeType.Update, true));

// Act
await sut.PublishToAssetModifiedTopic(new[] { notification, notification2 });

// Assert
A.CallTo(() =>
snsClient.PublishBatchAsync(
A<PublishBatchRequest>.That.Matches(b => b.PublishBatchRequestEntries.All(r =>
r.Message == "message" &&
r.MessageAttributes["messageType"].StringValue ==
"Update"&&
r.MessageAttributes["engineNotified"].StringValue == "True") &&
b.PublishBatchRequestEntries.Count == 2),
A<CancellationToken>._)).MustHaveHappened();
}

private Dictionary<string, string> GetAttributes(ChangeType changeType, bool engineNotified)
{
var attributes = new Dictionary<string, string>()
{
{ "messageType", changeType.ToString() }
};
if (engineNotified)
{
attributes.Add("engineNotified", "True");
}

return attributes;
}
}
4 changes: 2 additions & 2 deletions src/protagonist/DLCS.AWS/SNS/ITopicPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ public interface ITopicPublisher
/// <param name="messages">A collection of notifications to send</param>
/// <param name="cancellationToken">Current cancellation token</param>
/// <returns>Boolean representing the overall success/failure status of all requests</returns>
public Task<bool> PublishToAssetModifiedTopic(IReadOnlyList<AssetModifiedNotification> messages,
public Task<bool> PublishToAssetModifiedTopic(IReadOnlyList<AssetModifiedNotification> messages,
CancellationToken cancellationToken);
}

/// <summary>
/// Represents the contents + type of change for Asset modified notification
/// </summary>
public record AssetModifiedNotification(string MessageContents, ChangeType ChangeType);
public record AssetModifiedNotification(string MessageContents, Dictionary<string, string> Attributes);
Loading
Loading