Shrohilla/kafka header bindings #308

Merged: 11 commits, Apr 6, 2022
@@ -2,9 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;

[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests")]
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal class KafkaProducerAsyncCollector<T> : IAsyncCollector<T>
@@ -32,14 +36,60 @@ public Task AddAsync(T item, CancellationToken cancellationToken)

object messageToSend = item;

if (item.GetType() == typeof(string) || item.GetType() == typeof(byte[]))
if (item.GetType() == typeof(string))
{
messageToSend = ConvertToKafkaEventData(item);
}

if (item.GetType() == typeof(byte[]))
{
messageToSend = new KafkaEventData<T>(item);
}

return entity.SendAndCreateEntityIfNotExistsAsync(messageToSend, functionInstanceId, cancellationToken);
}

private object ConvertToKafkaEventData(T item)
{
try
{
return BuildKafkaDataEvent(item);
}
catch (Exception)
{
return new KafkaEventData<T>(item);
}
}

private object BuildKafkaDataEvent(T item)
{
JObject dataObj = JObject.Parse(item.ToString());
if (dataObj == null)
{
return new KafkaEventData<T>(item);
}

if (dataObj.ContainsKey("Offset") && dataObj.ContainsKey("Partition") && dataObj.ContainsKey("Topic")
&& dataObj.ContainsKey("Timestamp") && dataObj.ContainsKey("Value") && dataObj.ContainsKey("Headers"))
{
return BuildKafkaEventData(dataObj);
}

return new KafkaEventData<T>(item);
}

private KafkaEventData<string> BuildKafkaEventData(JObject dataObj)
{
KafkaEventData<string> messageToSend = new KafkaEventData<string>((string)dataObj["Value"]);
messageToSend.Timestamp = (DateTime)dataObj["Timestamp"];
messageToSend.Partition = (int)dataObj["Partition"];
JArray headerList = (JArray)dataObj["Headers"];
foreach (JObject header in headerList) {
messageToSend.Headers.Add((string)header["Key"], Encoding.Unicode.GetBytes((string)header["Value"]));
}
return messageToSend;
}
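
For reference, a minimal sketch of the JSON shape that BuildKafkaDataEvent treats as a serialized event (the key names mirror the ContainsKey checks above; the concrete values are illustrative only, drawn from the unit-test fixture later in this diff):

string serializedEvent = @"{
    ""Offset"": 0,
    ""Partition"": 0,
    ""Topic"": """",
    ""Timestamp"": ""Wed, 30 Mar 2022 05:56:20 GMT"",
    ""Value"": ""sample payload"",
    ""Headers"": [ { ""Key"": ""test"", ""Value"": ""1"" } ]
}";
// A string item matching this shape is turned into a KafkaEventData<string>
// with Value, Timestamp, Partition and Headers populated; anything else
// falls back to new KafkaEventData<T>(item).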

public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
// Batching not supported.
@@ -30,7 +30,7 @@ public async Task Java8_Smoke_Test_For_Output_And_SingleTrigger()

var result = consumer.Consume(10 * 1000);

Assert.Equal(inputMessage, result.Message.Value.ToKafkaEventData().Value);
Assert.Equal(inputMessage, result.Message.Value);
}

[Fact]
@@ -44,7 +44,7 @@ public async Task Python38_Smoke_Test_For_Output_And_SingleTrigger()
Assert.True(response.IsSuccessStatusCode);

var result = consumer.Consume(10 * 1000);
Assert.Equal(inputMessage, result.Message.Value.ToKafkaEventData().Value);
Assert.Equal(inputMessage, result.Message.Value);
}
}
}
@@ -0,0 +1,82 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Text;
using Xunit;
using Moq;
using System.Threading.Tasks;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests.output
{
public class KafkaProducerAsyncCollectorTest
{
private readonly string testValue = "test";
private Mock<IKafkaProducerFactory> kafkaProducerFactory = new Mock<IKafkaProducerFactory>();
private KafkaProducerEntity kafkaProducerEntity = new KafkaProducerEntity();
private Mock<IKafkaProducer> kafkaProducer = new Mock<IKafkaProducer>();
private readonly string jsonStringValueHeader = "{\r\n \"Offset\": 0,\r\n \"Partition\": 0,\r\n \"Topic\": \"\",\r\n \"Timestamp\": \"Wed, 30 Mar 2022 05:56:20 GMT\",\r\n \"Value\": \"shiv shambhu\",\r\n \"Headers\": [\r\n {\r\n \"Key\": \"test\",\r\n \"Value\": \"1\"\r\n },\r\n {\r\n \"Key\": \"test1\",\r\n \"Value\": \"2\"\r\n }\r\n ]\r\n}";
private readonly string jsonStringValue = "{\r\n \"Offset\": 0,\r\n \"Partition\": 0,\r\n \"Topic\": \"\",\r\n \"Timestamp\": \"Wed, 30 Mar 2022 05:56:20 GMT\",\r\n \"Value\": \"shiv shambhu\",\r\n \"Headers\": []\r\n}";

[Fact]
public async Task AddAsync_Item_Is_NullAsync()
{
var kafkaProducerEntity = new Mock<KafkaProducerEntity>();
IAsyncCollector<string> asyncCollector = new KafkaProducerAsyncCollector<string>(
kafkaProducerEntity.Object, Guid.NewGuid());
await Assert.ThrowsAsync<InvalidOperationException>(() => asyncCollector.AddAsync(null, default));
}

[Fact]
public void AddAsync_Item_Is_Of_Bytes_Types()
{
BuildMockData();
IAsyncCollector<byte[]> asyncCollector = new KafkaProducerAsyncCollector<byte[]>(
kafkaProducerEntity, Guid.NewGuid());
KafkaEventData<byte[]> kafkaEventData = new KafkaEventData<byte[]>(Encoding.UTF8.GetBytes(testValue));

Task task = asyncCollector.AddAsync(Encoding.UTF8.GetBytes(testValue), default);
Assert.True(task.IsCompleted);
}

private void BuildMockData()
{
kafkaProducerEntity.KafkaProducerFactory = kafkaProducerFactory.Object;
kafkaProducerFactory.Setup(e => e.Create(It.IsAny<KafkaProducerEntity>())).Returns(kafkaProducer.Object);
kafkaProducer.Setup(prod => prod.ProduceAsync(It.IsAny<string>(), It.IsAny<Object>()));
}

[Fact]
public void AddAsync_Item_Is_Of_String_Value_Types()
{
BuildMockData();
IAsyncCollector<string> asyncCollector = new KafkaProducerAsyncCollector<string>(
kafkaProducerEntity, Guid.NewGuid());
KafkaEventData<string> kafkaEventData = new KafkaEventData<string>(testValue);
Task task = asyncCollector.AddAsync(testValue, default);
Assert.True(task.IsCompleted);
}

[Fact]
public void AddAsync_Item_Is_Of_KafkaEventData_Json_String_Types()
{
BuildMockData();

IAsyncCollector<string> asyncCollector = new KafkaProducerAsyncCollector<string>(
kafkaProducerEntity, Guid.NewGuid());
Task task = asyncCollector.AddAsync(jsonStringValue, default);
Assert.True(task.IsCompleted);
}

[Fact]
public void AddAsync_Item_Is_Of_KafkaEventData_Json_String_Header_Types()
{
BuildMockData();

IAsyncCollector<string> asyncCollector = new KafkaProducerAsyncCollector<string>(
kafkaProducerEntity, Guid.NewGuid());
Task task = asyncCollector.AddAsync(jsonStringValueHeader, default);
Assert.True(task.IsCompleted);
}
}
}
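
A note on the header encoding: the collector above stores each header value with Encoding.Unicode.GetBytes, so a consumer that wants the original string back must decode with the same encoding. A minimal sketch (the consumer-side variable names are hypothetical, not part of this change):

// Hypothetical consumer-side decode of a header written by the collector.
byte[] headerBytes = Encoding.Unicode.GetBytes("1");        // what the collector stores for header value "1"
string headerValue = Encoding.Unicode.GetString(headerBytes);
Assert.Equal("1", headerValue);                             // decoding must also use Encoding.Unicode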