Skip to content

Commit

Permalink
Implement underscore prefixes for AMQP
Browse files Browse the repository at this point in the history
The [binding
specification](https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/amqp-protocol-binding.md)
has changed to prefer `cloudEvents_` over `cloudEvents:`. Previously
`cloudEvents_` wouldn't even have been valid.

With this change, users can either:

- Stick with the default prefix, which doesn't change immediately,
  but which will change in the first release on or after March 1st 2023
- Explicitly use one or other prefix using the explicitly-named
  methods

Other options considered:

- Changing the default now: that's too much of a breaking change. (I
  don't want to take a major version bump for this, and with enough
  time for the change, I think that's okay.)
- Adding a char or string parameter: that would invite using non-standard prefixes
- Adding a Boolean parameter: that would become problematic if we
  ever end up with a third prefix. (Let's hope we don't, but still...)
- Adding an enum and then a parameter for it: feels like overkill

Signed-off-by: Jon Skeet <jonskeet@google.com>
  • Loading branch information
ericdotnet committed Sep 7, 2022
1 parent d9c2662 commit 3940efd
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 66 deletions.
60 changes: 48 additions & 12 deletions src/CloudNative.CloudEvents.Amqp/AmqpExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ namespace CloudNative.CloudEvents.Amqp
/// </summary>
public static class AmqpExtensions
{
internal const string AmqpHeaderPrefix = "cloudEvents:";
// This is internal in CloudEventsSpecVersion.
private const string SpecVersionAttributeName = "specversion";

internal const string SpecVersionAmqpHeader = AmqpHeaderPrefix + "specversion";
internal const string AmqpHeaderUnderscorePrefix = "cloudEvents_";
internal const string AmqpHeaderColonPrefix = "cloudEvents:";

internal const string SpecVersionAmqpHeaderWithUnderscore = AmqpHeaderUnderscorePrefix + SpecVersionAttributeName;
internal const string SpecVersionAmqpHeaderWithColon = AmqpHeaderColonPrefix + SpecVersionAttributeName;

/// <summary>
/// Indicates whether this <see cref="Message"/> holds a single CloudEvent.
Expand All @@ -32,7 +37,8 @@ public static class AmqpExtensions
/// <returns>true, if the request is a CloudEvent</returns>
public static bool IsCloudEvent(this Message message) =>
HasCloudEventsContentType(Validation.CheckNotNull(message, nameof(message)), out _) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader);
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeaderWithUnderscore) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeaderWithColon);

/// <summary>
/// Converts this AMQP message into a CloudEvent object.
Expand Down Expand Up @@ -69,7 +75,8 @@ public static class AmqpExtensions
else
{
var propertyMap = message.ApplicationProperties.Map;
if (!propertyMap.TryGetValue(SpecVersionAmqpHeader, out var versionId))
if (!propertyMap.TryGetValue(SpecVersionAmqpHeaderWithUnderscore, out var versionId) &&
!propertyMap.TryGetValue(SpecVersionAmqpHeaderWithColon, out versionId))
{
throw new ArgumentException("Request is not a CloudEvent");
}
Expand All @@ -84,11 +91,14 @@ public static class AmqpExtensions

foreach (var property in propertyMap)
{
if (!(property.Key is string key && key.StartsWith(AmqpHeaderPrefix)))
if (!(property.Key is string key &&
(key.StartsWith(AmqpHeaderColonPrefix) || key.StartsWith(AmqpHeaderUnderscorePrefix))))
{
continue;
}
string attributeName = key.Substring(AmqpHeaderPrefix.Length).ToLowerInvariant();
// Note: both prefixes have the same length. If we ever need any prefixes with a different length, we'll need to know which
// prefix we're looking at.
string attributeName = key.Substring(AmqpHeaderUnderscorePrefix.Length).ToLowerInvariant();

// We've already dealt with the spec version.
if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name)
Expand Down Expand Up @@ -142,17 +152,43 @@ private static bool HasCloudEventsContentType(Message message, out string? conte
}

/// <summary>
/// Converts a CloudEvent to <see cref="Message"/>.
/// Converts a CloudEvent to <see cref="Message"/> using the default property prefix. Versions released prior to March 2023
/// use a default property prefix of "cloudEvents:". Versions released from March 2023 onwards use a property prefix of "cloudEvents_".
/// Code wishing to express the prefix explicitly should use <see cref="ToAmqpMessageWithColonPrefix(CloudEvent, ContentMode, CloudEventFormatter)"/> or
/// <see cref="ToAmqpMessageWithUnderscorePrefix(CloudEvent, ContentMode, CloudEventFormatter)"/>.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) =>
ToAmqpMessage(cloudEvent, contentMode, formatter, AmqpHeaderColonPrefix);

/// <summary>
/// Converts a CloudEvent to <see cref="Message"/> using a property prefix of "cloudEvents_". This prefix was introduced as the preferred
/// prefix for the AMQP binding in August 2022.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
public static Message ToAmqpMessageWithUnderscorePrefix(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) =>
ToAmqpMessage(cloudEvent, contentMode, formatter, AmqpHeaderUnderscorePrefix);

/// <summary>
/// Converts a CloudEvent to <see cref="Message"/> using a property prefix of "cloudEvents:". This prefix
/// is a legacy retained only for compatibility purposes; it can't be used by JMS due to constraints in JMS property names.
/// </summary>
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
/// <param name="contentMode">Content mode. Structured or binary.</param>
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
public static Message ToAmqpMessageWithColonPrefix(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter) =>
ToAmqpMessage(cloudEvent, contentMode, formatter, AmqpHeaderColonPrefix);

private static Message ToAmqpMessage(CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, string prefix)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
Validation.CheckNotNull(formatter, nameof(formatter));

var applicationProperties = MapHeaders(cloudEvent);
var applicationProperties = MapHeaders(cloudEvent, prefix);
RestrictedDescribed bodySection;
Properties properties;

Expand Down Expand Up @@ -181,11 +217,11 @@ public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode cont
};
}

private static ApplicationProperties MapHeaders(CloudEvent cloudEvent)
private static ApplicationProperties MapHeaders(CloudEvent cloudEvent, string prefix)
{
var applicationProperties = new ApplicationProperties();
var properties = applicationProperties.Map;
properties.Add(SpecVersionAmqpHeader, cloudEvent.SpecVersion.VersionId);
properties.Add(prefix + SpecVersionAttributeName, cloudEvent.SpecVersion.VersionId);

foreach (var pair in cloudEvent.GetPopulatedAttributes())
{
Expand All @@ -197,7 +233,7 @@ private static ApplicationProperties MapHeaders(CloudEvent cloudEvent)
continue;
}

string propKey = AmqpHeaderPrefix + attribute.Name;
string propKey = prefix + attribute.Name;

// TODO: Check that AMQP can handle byte[], bool and int values
object propValue = pair.Value switch
Expand Down
112 changes: 58 additions & 54 deletions test/CloudNative.CloudEvents.UnitTests/Amqp/AmqpTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Amqp;
using Amqp.Framing;
using CloudNative.CloudEvents.NewtonsoftJson;
using Newtonsoft.Json.Linq;
using System;
using System.Net.Mime;
using System.Text;
Expand All @@ -20,71 +19,26 @@ public class AmqpTest
public void AmqpStructuredMessageTest()
{
// The AMQPNetLite library is factored such that we don't need to do a wire test here.
var cloudEvent = new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull"),
Subject = "123",
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};

var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessage(ContentMode.Structured, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());
var encodedAmqpMessage = message.Encode();

var message1 = Message.Decode(encodedAmqpMessage);
Assert.True(message1.IsCloudEvent());
var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull"), receivedCloudEvent.Source);
Assert.Equal("123", receivedCloudEvent.Subject);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);

Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
AssertDecodeThenEqual(cloudEvent, message);
}

[Fact]
public void AmqpBinaryMessageTest()
{
// The AMQPNetLite library is factored such that we don't need to do a wire test here.
var cloudEvent = new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Subject = "123",
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};

var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
Assert.True(message.IsCloudEvent());
var encodedAmqpMessage = message.Encode();

var message1 = Message.Decode(encodedAmqpMessage);
Assert.True(message1.IsCloudEvent());
var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());

Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time!.Value);
Assert.Equal(MediaTypeNames.Text.Xml, receivedCloudEvent.DataContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);

Assert.Equal("value", (string?)receivedCloudEvent["comexampleextension1"]);
AssertCloudEventsEqual(cloudEvent, receivedCloudEvent);
}

[Fact]
Expand All @@ -108,9 +62,7 @@ public void AmqpNormalizesTimestampsToUtc()
Source = new Uri("https://github.com/cloudevents/spec/pull/123"),
Id = "A234-1234-1234",
// 2018-04-05T18:31:00+01:00 => 2018-04-05T17:31:00Z
Time = new DateTimeOffset(2018, 4, 5, 18, 31, 0, TimeSpan.FromHours(1)),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>"
Time = new DateTimeOffset(2018, 4, 5, 18, 31, 0, TimeSpan.FromHours(1))
};

var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
Expand All @@ -134,5 +86,57 @@ public void EncodeTextDataInBinaryMode_PopulatesDataProperty()
var text = Encoding.UTF8.GetString(body.Binary);
Assert.Equal("some text", text);
}

[Fact]
public void DefaultPrefix()
{
var cloudEvent = CreateSampleCloudEvent();

var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
Assert.Equal(cloudEvent.Id, message.ApplicationProperties["cloudEvents:id"]);
AssertDecodeThenEqual(cloudEvent, message);
}

[Fact]
public void UnderscorePrefix()
{
var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessageWithUnderscorePrefix(ContentMode.Binary, new JsonEventFormatter());
Assert.Equal(cloudEvent.Id, message.ApplicationProperties["cloudEvents_id"]);
AssertDecodeThenEqual(cloudEvent, message);
}

[Fact]
public void ColonPrefix()
{
var cloudEvent = CreateSampleCloudEvent();
var message = cloudEvent.ToAmqpMessageWithColonPrefix(ContentMode.Binary, new JsonEventFormatter());
Assert.Equal(cloudEvent.Id, message.ApplicationProperties["cloudEvents:id"]);
AssertDecodeThenEqual(cloudEvent, message);
}

private void AssertDecodeThenEqual(CloudEvent cloudEvent, Message message)
{
var encodedAmqpMessage = message.Encode();

var message1 = Message.Decode(encodedAmqpMessage);
var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());
AssertCloudEventsEqual(cloudEvent, receivedCloudEvent);
}

/// <summary>
/// Returns a CloudEvent with XML data and an extension.
/// </summary>
private static CloudEvent CreateSampleCloudEvent() => new CloudEvent
{
Type = "com.github.pull.create",
Source = new Uri("https://github.com/cloudevents/spec/pull"),
Subject = "123",
Id = "A234-1234-1234",
Time = new DateTimeOffset(2018, 4, 5, 17, 31, 0, TimeSpan.Zero),
DataContentType = MediaTypeNames.Text.Xml,
Data = "<much wow=\"xml\"/>",
["comexampleextension1"] = "value"
};
}
}

0 comments on commit 3940efd

Please sign in to comment.