Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shrohilla/kafka header bindings #308

Merged
merged 11 commits into from
Apr 6, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
Expand Down Expand Up @@ -32,14 +34,60 @@ public Task AddAsync(T item, CancellationToken cancellationToken)

object messageToSend = item;

if (item.GetType() == typeof(string) || item.GetType() == typeof(byte[]))
if (item.GetType() == typeof(string))
{
messageToSend = convertToKafkaEventData(item);
}

if (item.GetType() == typeof(byte[]))
{
messageToSend = new KafkaEventData<T>(item);
}

return entity.SendAndCreateEntityIfNotExistsAsync(messageToSend, functionInstanceId, cancellationToken);
}

private object convertToKafkaEventData(T item)
shrohilla marked this conversation as resolved.
Show resolved Hide resolved
{
try
{
return buildKafkaDataEvent(item);
}
catch (Exception)
{
return new KafkaEventData<T>(item);
}
}

private object buildKafkaDataEvent(T item)
{
JObject dataObj = JObject.Parse(item.ToString());
shrohilla marked this conversation as resolved.
Show resolved Hide resolved
if (dataObj == null)
{
return new KafkaEventData<T>(item);
}

if (dataObj.ContainsKey("Offset") && dataObj.ContainsKey("Partition") && dataObj.ContainsKey("Topic")
&& dataObj.ContainsKey("Timestamp") && dataObj.ContainsKey("Value") && dataObj.ContainsKey("Headers"))
{
return buildKafkaEventData(dataObj);
}

return new KafkaEventData<T>(item);
}

private KafkaEventData<string> buildKafkaEventData(JObject dataObj)
{
KafkaEventData<string> messageToSend = new KafkaEventData<string>((string)dataObj["Value"]);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach ( JObject header in headerList) {
shrohilla marked this conversation as resolved.
Show resolved Hide resolved
messageToSend.Headers.Add((string)header["Key"], Encoding.Unicode.GetBytes((string)header["Value"]));
}
return messageToSend;
}

public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
// Batching not supported.
Expand Down