Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/56a61b2c-4302-4077-9dc6-3fc3bc60a998.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "AWS.Messaging",
"Type": "Minor",
"ChangelogMessages": [
"Update MessageSerializer to store data as actual json. Fixes #168"
]
}
]
}
6 changes: 6 additions & 0 deletions src/AWS.Messaging/MessageEnvelope.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public abstract class MessageEnvelope
[JsonPropertyName("time")]
public DateTimeOffset TimeStamp { get; set; } = DateTimeOffset.MinValue;

/// <summary>
/// The data content type.
/// </summary>
[JsonPropertyName("datacontenttype")]
public string? DataContentType { get; set; }

/// <summary>
/// This stores different metadata that is not modeled as a top-level property in MessageEnvelope class.
/// These entries will also be serialized as top-level properties when sending the message, which
Expand Down
54 changes: 49 additions & 5 deletions src/AWS.Messaging/Serialization/EnvelopeSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using Amazon.SQS.Model;
using AWS.Messaging.Configuration;
using AWS.Messaging.Internal;
using AWS.Messaging.Serialization.Helpers;
using AWS.Messaging.Internal;
using AWS.Messaging.Services;
using Microsoft.Extensions.Logging;
using AWS.Messaging.Serialization.Parsers;
Expand Down Expand Up @@ -103,10 +102,23 @@ public async ValueTask<string> SerializeAsync<T>(MessageEnvelope<T> envelope)
["source"] = envelope.Source?.ToString(),
["specversion"] = envelope.Version,
["type"] = envelope.MessageTypeIdentifier,
["time"] = envelope.TimeStamp,
["data"] = _messageSerializer.Serialize(message)
["time"] = envelope.TimeStamp
};

var messageSerializerResults = _messageSerializer.Serialize(message);

blob["datacontenttype"] = messageSerializerResults.ContentType;

if (IsJsonContentType(messageSerializerResults.ContentType))
{
blob["data"] = JsonNode.Parse(messageSerializerResults.Data);
}
else
{
blob["data"] = messageSerializerResults.Data;

}

// Write any Metadata as top-level keys
// This may be useful for any extensions defined in
// https://github.com/cloudevents/spec/tree/main/cloudevents/extensions
Expand Down Expand Up @@ -174,6 +186,34 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
}
}

private bool IsJsonContentType(string? dataContentType)
{
if (string.IsNullOrWhiteSpace(dataContentType))
{
// If dataContentType is not specified, it should be treated as "application/json"
return true;
}

// Remove any parameters from the content type
var mediaType = dataContentType.Split(';')[0].Trim().ToLower();

// Check if the media type is "application/json"
if (mediaType == "application/json")
{
return true;
}

// Check if the media subtype is "json" or ends with "+json"
var parts = mediaType.Split('/');
if (parts.Length == 2)
{
var subtype = parts[1];
return subtype == "json" || subtype.EndsWith("+json");
}

return false;
}

private (MessageEnvelope Envelope, SubscriberMapping Mapping) DeserializeEnvelope(string envelopeString)
{
using var document = JsonDocument.Parse(envelopeString);
Expand Down Expand Up @@ -204,6 +244,7 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
envelope.Version = JsonPropertyHelper.GetRequiredProperty(root, "specversion", element => element.GetString()!);
envelope.MessageTypeIdentifier = JsonPropertyHelper.GetRequiredProperty(root, "type", element => element.GetString()!);
envelope.TimeStamp = JsonPropertyHelper.GetRequiredProperty(root, "time", element => element.GetDateTimeOffset());
envelope.DataContentType = JsonPropertyHelper.GetStringProperty(root, "datacontenttype");

// Handle metadata - copy any properties that aren't standard envelope properties
foreach (var property in root.EnumerateObject())
Expand All @@ -215,7 +256,10 @@ public async ValueTask<ConvertToEnvelopeResult> ConvertToEnvelopeAsync(Message s
}

// Deserialize the message content using the custom serializer
var dataContent = JsonPropertyHelper.GetRequiredProperty(root, "data", element => element.GetString()!);
var dataContent = JsonPropertyHelper.GetRequiredProperty(root, "data", element =>
IsJsonContentType(envelope.DataContentType)
? element.GetRawText()
: element.GetString()!);
var message = _messageSerializer.Deserialize(dataContent, subscriberMapping.MessageType);
envelope.SetMessage(message);

Expand Down
5 changes: 3 additions & 2 deletions src/AWS.Messaging/Serialization/IMessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ namespace AWS.Messaging.Serialization;
public interface IMessageSerializer
{
/// <summary>
/// Serializes the .NET message object into a string.
/// Serializes the .NET message object into a string and specifies the content type of the serialized data.
/// </summary>
/// <param name="message">The .NET object that will be serialized.</param>
string Serialize(object message);
/// <returns>A <see cref="MessageSerializerResults"/> containing the serialized string and its content type.</returns>
MessageSerializerResults Serialize(object message);

/// <summary>
/// Deserializes the raw string message into the .NET type.
Expand Down
4 changes: 2 additions & 2 deletions src/AWS.Messaging/Serialization/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public object Deserialize(string message, Type deserializedType)
Justification = "Consumers relying on trimming would have been required to call the AddAWSMessageBus overload that takes in JsonSerializerContext that will be used here to avoid the call that requires unreferenced code.")]
[System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("ReflectionAnalysis", "IL3050",
Justification = "Consumers relying on trimming would have been required to call the AddAWSMessageBus overload that takes in JsonSerializerContext that will be used here to avoid the call that requires unreferenced code.")]
public string Serialize(object message)
public MessageSerializerResults Serialize(object message)
{
try
{
Expand All @@ -100,7 +100,7 @@ public string Serialize(object message)
_logger.LogTrace("Serialized the message object to a raw string with a content length of {ContentLength}.", jsonString.Length);
}

return jsonString;
return new MessageSerializerResults(jsonString, "application/json");
}
catch (JsonException) when (!_messageConfiguration.LogMessageContent)
{
Expand Down
34 changes: 34 additions & 0 deletions src/AWS.Messaging/Serialization/MessageSerializerResults.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Serialization;

/// <summary>
/// Represents the results of a message serialization operation, containing both the serialized data
/// and its corresponding content type.
/// </summary>
public class MessageSerializerResults
{
/// <summary>
/// Initializes a new instance of the MessageSerializerResults class.
/// </summary>
/// <param name="data">The serialized message data as a string.</param>
/// <param name="contentType">The MIME content type of the serialized data.</param>
public MessageSerializerResults(string data, string contentType)
{
Data = data;
ContentType = contentType;
}

/// <summary>
/// Gets or sets the MIME content type of the serialized data.
/// Common values include "application/json" or "application/xml".
/// </summary>
public string ContentType { get; }

/// <summary>
/// Gets or sets the serialized message data as a string.
/// This contains the actual serialized content of the message.
/// </summary>
public string Data { get; }
}
25 changes: 14 additions & 11 deletions test/AWS.Messaging.IntegrationTests/EventBridgePublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
using Amazon.SQS.Model;
using Amazon.SecurityToken;
using Amazon.SecurityToken.Model;
using AWS.Messaging.IntegrationTests.Handlers;
using AWS.Messaging.Serialization;

namespace AWS.Messaging.IntegrationTests;

Expand Down Expand Up @@ -105,6 +107,7 @@ await _eventBridgeClient.PutTargetsAsync(new PutTargetsRequest
{
builder.AddEventBridgePublisher<ChatMessage>(_eventBusArn);
builder.AddMessageSource("/aws/messaging");
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
_serviceProvider = serviceCollection.BuildServiceProvider();
}
Expand All @@ -126,23 +129,23 @@ await publisher.PublishAsync(new ChatMessage
var receiveMessageResponse = await _sqsClient.ReceiveMessageAsync(_sqsQueueUrl);
var message = Assert.Single(receiveMessageResponse.Messages);

// EventBridge adds an external envelope which we need to strip away
var eventBridgeEnvelope = JsonSerializer.Deserialize<EventBridgeEnvelope>(message.Body);
Assert.NotNull(eventBridgeEnvelope);
var envelopeSerializer = _serviceProvider.GetRequiredService<IEnvelopeSerializer>();

Assert.NotNull(eventBridgeEnvelope.Detail);
var envelope = eventBridgeEnvelope.Detail;
// Use the EnvelopeSerializer to convert the message
var result = await envelopeSerializer.ConvertToEnvelopeAsync(message);
var envelope = result.Envelope as MessageEnvelope<ChatMessage>;

Assert.NotNull(envelope);
Assert.False(string.IsNullOrEmpty(envelope.Id));
Assert.Equal("/aws/messaging", envelope.Source.ToString());
Assert.True(envelope.TimeStamp > publishStartTime);
Assert.True(envelope.TimeStamp < publishEndTime);
Assert.Equal(typeof(ChatMessage).ToString(), envelope.MessageTypeIdentifier);

var messageType = Type.GetType(eventBridgeEnvelope.Detail.MessageTypeIdentifier);
Assert.NotNull(messageType);

var chatMessageObject = JsonSerializer.Deserialize(eventBridgeEnvelope.Detail.Message, messageType);
var chatMessage = Assert.IsType<ChatMessage>(chatMessageObject);
Assert.Equal("Test1", chatMessage.MessageDescription);
var chatMessage = envelope.Message;
Assert.NotNull(chatMessage);
Assert.IsType<ChatMessage>(chatMessage);
Assert.Equal("Test1", chatMessage.MessageDescription);;
}

public async Task DisposeAsync()
Expand Down
20 changes: 14 additions & 6 deletions test/AWS.Messaging.IntegrationTests/SNSPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
using System.Text.Json;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using AWS.Messaging.IntegrationTests.Handlers;
using AWS.Messaging.Serialization;

namespace AWS.Messaging.IntegrationTests;

Expand Down Expand Up @@ -45,6 +47,7 @@ public async Task InitializeAsync()
{
builder.AddSNSPublisher<ChatMessage>(_snsTopicArn);
builder.AddMessageSource("/aws/messaging");
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
_serviceProvider = serviceCollection.BuildServiceProvider();
}
Expand All @@ -70,18 +73,23 @@ await publisher.PublishAsync(new ChatMessage
var snsEnvelope = JsonSerializer.Deserialize<SNSEnvelope>(message.Body);
Assert.NotNull(snsEnvelope);

var envelope = JsonSerializer.Deserialize<MessageEnvelope<string>>(snsEnvelope.Message);
// Get the EnvelopeSerializer from the service provider
var envelopeSerializer = _serviceProvider.GetRequiredService<IEnvelopeSerializer>();

// Use the EnvelopeSerializer to convert the message
var result = await envelopeSerializer.ConvertToEnvelopeAsync(message);
var envelope = result.Envelope as MessageEnvelope<ChatMessage>;

Assert.NotNull(envelope);
Assert.False(string.IsNullOrEmpty(envelope.Id));
Assert.Equal("/aws/messaging", envelope.Source.ToString());
Assert.True(envelope.TimeStamp > publishStartTime);
Assert.True(envelope.TimeStamp < publishEndTime);
Assert.Equal(typeof(ChatMessage).ToString(), envelope.MessageTypeIdentifier);

var messageType = Type.GetType(envelope.MessageTypeIdentifier);
Assert.NotNull(messageType);

var chatMessageObject = JsonSerializer.Deserialize(envelope.Message, messageType);
var chatMessage = Assert.IsType<ChatMessage>(chatMessageObject);
var chatMessage = envelope.Message;
Assert.NotNull(chatMessage);
Assert.IsType<ChatMessage>(chatMessage);
Assert.Equal("Test1", chatMessage.MessageDescription);
}

Expand Down
24 changes: 18 additions & 6 deletions test/AWS.Messaging.IntegrationTests/SQSPublisherTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using Microsoft.Extensions.DependencyInjection;
using AWS.Messaging.IntegrationTests.Models;
using System.Text.Json;
using AWS.Messaging.IntegrationTests.Handlers;
using AWS.Messaging.Serialization;

namespace AWS.Messaging.IntegrationTests;

Expand Down Expand Up @@ -33,6 +35,8 @@ public async Task InitializeAsync()
{
builder.AddSQSPublisher<ChatMessage>(_sqsQueueUrl);
builder.AddMessageSource("/aws/messaging");
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();

});
_serviceProvider = serviceCollection.BuildServiceProvider();
}
Expand All @@ -54,19 +58,27 @@ await publisher.PublishAsync(new ChatMessage
var receiveMessageResponse = await _sqsClient.ReceiveMessageAsync(_sqsQueueUrl);
var message = Assert.Single(receiveMessageResponse.Messages);

var envelope = JsonSerializer.Deserialize<MessageEnvelope<string>>(message.Body);
// Get the EnvelopeSerializer from the service provider
var envelopeSerializer = _serviceProvider.GetRequiredService<IEnvelopeSerializer>();

// Use the EnvelopeSerializer to convert the message
var result = await envelopeSerializer.ConvertToEnvelopeAsync(message);
var envelope = result.Envelope as MessageEnvelope<ChatMessage>;

Assert.NotNull(envelope);
Assert.False(string.IsNullOrEmpty(envelope.Id));
Assert.Equal("/aws/messaging", envelope.Source.ToString());
Assert.True(envelope.TimeStamp > publishStartTime);
Assert.True(envelope.TimeStamp > publishStartTime);
Assert.True(envelope.TimeStamp < publishEndTime);
var messageType = Type.GetType(envelope.MessageTypeIdentifier);
Assert.NotNull(messageType);
var chatMessageObject = JsonSerializer.Deserialize(envelope.Message, messageType);
var chatMessage = Assert.IsType<ChatMessage>(chatMessageObject);
Assert.Equal(typeof(ChatMessage).ToString(), envelope.MessageTypeIdentifier);

var chatMessage = envelope.Message;
Assert.NotNull(chatMessage);
Assert.IsType<ChatMessage>(chatMessage);
Assert.Equal("Test1", chatMessage.MessageDescription);
}


public async Task DisposeAsync()
{
try
Expand Down
10 changes: 10 additions & 0 deletions test/AWS.Messaging.UnitTests/MessageHandlers/Handlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messa
}
}

public class PlainTextHandler : IMessageHandler<string>
{
public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<string> messageEnvelope, CancellationToken token = default)
{
// Simple handler implementation for test purposes
return Task.FromResult(MessageProcessStatus.Success());
}
}


/// <summary>
/// Implements handling for mutiple message types
/// </summary>
Expand Down
Loading
Loading