Skip to content

Commit

Permalink
Make consumer reading message header case insensitive (#2537)
Browse files Browse the repository at this point in the history
  • Loading branch information
honkuan86 committed Feb 28, 2023
1 parent 1e70c06 commit c9dc28e
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 30 deletions.
25 changes: 25 additions & 0 deletions src/Paramore.Brighter.MessagingGateway.Kafka/KafkaHeadersTools.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Confluent.Kafka;

namespace Paramore.Brighter.MessagingGateway.Kafka
{
internal static class KafkaHeadersTools
{
public static bool TryGetLastBytesIgnoreCase(this Headers headers, string key, out byte[] lastHeader)
{
var header = headers.FirstOrDefault(h => h.Key.Equals(key, StringComparison.OrdinalIgnoreCase));

if (header == null)
{
lastHeader = null;
return false;
}

lastHeader = header.GetValueBytes();
return true;
}
}
}
71 changes: 44 additions & 27 deletions src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,39 +93,58 @@ private HeaderResult<string> ReadContentType(Headers headers)

private HeaderResult<Guid> ReadCorrelationId(Headers headers)
{
if (headers.TryGetLastBytes(HeaderNames.CORRELATION_ID, out byte[] lastHeader))
{
var correlationValue = Encoding.UTF8.GetString(lastHeader);
if (Guid.TryParse(correlationValue, out Guid correlationId))
{
return new HeaderResult<Guid>(correlationId, true);
}
else
return ReadHeader(headers, HeaderNames.CORRELATION_ID)
.Map(s =>
{
s_logger.LogDebug("Could not parse message correlation id: {CorrelationValue}", correlationValue);
return new HeaderResult<Guid>(Guid.Empty, false);
}
}
if (string.IsNullOrEmpty(s))
{
s_logger.LogDebug("No correlation id found in message");
return new HeaderResult<Guid>(Guid.Empty, true);
}
return new HeaderResult<Guid>(Guid.Empty, false);
if (Guid.TryParse(s, out Guid correlationId))
{
return new HeaderResult<Guid>(correlationId, true);
}
s_logger.LogDebug("Could not parse message correlation id: {CorrelationValue}", s);
return new HeaderResult<Guid>(Guid.Empty, false);
});
}

private HeaderResult<string> ReadReplyTo(Headers headers)
{
if (headers.TryGetLastBytes(HeaderNames.REPLY_TO, out byte[] lastHeader))
{
var replyToValue = Encoding.UTF8.GetString(lastHeader);
return new HeaderResult<string>(replyToValue, true);
}

return new HeaderResult<string>(string.Empty, false);
return ReadHeader(headers, HeaderNames.REPLY_TO)
.Map(s =>
{
if (string.IsNullOrEmpty(s))
{
s_logger.LogDebug("No reply to found in message");
return new HeaderResult<string>(string.Empty, true);
}
return new HeaderResult<string>(s, true);
});
}

private HeaderResult<DateTime> ReadTimeStamp(Headers headers)
{
if (headers.TryGetLastBytes(HeaderNames.TIMESTAMP, out byte[] lastHeader))
if (headers.TryGetLastBytesIgnoreCase(HeaderNames.TIMESTAMP, out byte[] lastHeader))
{
return new HeaderResult<DateTime>(UnixTimestamp.DateTimeFromUnixTimestampSeconds(BitConverter.ToInt64(lastHeader, 0)), true);
//Additional testing for a non unixtimestamp string
if (DateTime.TryParse(lastHeader.FromByteArray(), out DateTime timestamp))
{
return new HeaderResult<DateTime>(timestamp, true);
}

try
{
return new HeaderResult<DateTime>(UnixTimestamp.DateTimeFromUnixTimestampSeconds(BitConverter.ToInt64(lastHeader, 0)), true);
}
catch (Exception)
{
return new HeaderResult<DateTime>(DateTime.UtcNow, true);
}
}

return new HeaderResult<DateTime>(DateTime.UtcNow, true);
Expand Down Expand Up @@ -197,7 +216,7 @@ private static object ParseHeaderValue(object value)

private HeaderResult<string> ReadHeader(Headers headers, string key, bool dieOnMissing = false)
{
if (headers.TryGetLastBytes(key, out byte[] lastHeader))
if (headers.TryGetLastBytesIgnoreCase(key, out byte[] lastHeader))
{
try
{
Expand All @@ -211,10 +230,8 @@ private HeaderResult<string> ReadHeader(Headers headers, string key, bool dieOnM
return new HeaderResult<string>(null, false);
}
}
else
{
return new HeaderResult<string>(string.Empty, !dieOnMissing);
}

return new HeaderResult<string>(string.Empty, !dieOnMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private void PublishResults(PersistenceStatus status, Headers headers)
{
if (status == PersistenceStatus.Persisted)
{
if (headers.TryGetLastBytes(HeaderNames.MESSAGE_ID, out byte[] messageIdBytes))
if (headers.TryGetLastBytesIgnoreCase(HeaderNames.MESSAGE_ID, out byte[] messageIdBytes))
{
var val = messageIdBytes.FromByteArray();
if (!string.IsNullOrEmpty(val) && (Guid.TryParse(val, out Guid messageId)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Paramore.Brighter.MessagingGateway.Kafka
{
static class StringTools
public static class StringTools
{
public static byte[] ToByteArray(this string str)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Paramore.Brighter/MessageHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class MessageHeader : IEquatable<MessageHeader>
/// name from UpperCase to camelCase
/// </summary>
/// <value>The bag.</value>
public Dictionary<string, object> Bag { get; set; } = new Dictionary<string, object>();
public Dictionary<string, object> Bag { get; set; } = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);

/// <summary>
/// Gets the number of times this message has been seen
Expand Down

0 comments on commit c9dc28e

Please sign in to comment.