Adding sample cases for byte[] and byte[][] triggers (#52)
* Remove KafkaEventData(byte[]) ctor
* Add a custom-deserialisation example to the sample function
* Add a sample function binding to strings
* Add logging to the config provider during converter operations
* Task 3 in #44 requires no additional work; added sample handlers to demonstrate this
* Add tests proving part 3 of #44
brandonh-msft committed Apr 4, 2019
1 parent 7bcbdf7 commit b0ec76a
Showing 6 changed files with 207 additions and 47 deletions.
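For reference, the "binding to strings" mentioned in the commit message relies on the KafkaEventData-to-string converter registered in KafkaExtensionConfigProvider (see that file's diff below). A minimal sketch of what such a binding looks like; the function name, topic, and consumer group here are illustrative assumptions, not taken from this commit:

    [FunctionName("UserAsString")]
    public static void UserAsString(
        [KafkaTrigger("LocalBroker", "users", ConsumerGroup = "azfunc_string")] string[] kafkaEvents,
        ILogger logger)
    {
        foreach (var kafkaEvent in kafkaEvents)
        {
            // Each payload arrives already converted to a string
            // (UTF-8 text, or JSON for Avro generic records).
            logger.LogInformation($"Event as string: {kafkaEvent}");
        }
    }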
33 changes: 21 additions & 12 deletions samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs
@@ -1,14 +1,9 @@
-using System;
-using System.IO;
 using System.Threading.Tasks;
-using Microsoft.AspNetCore.Mvc;
+using Confluent.SchemaRegistry.Serdes;
 using Microsoft.Azure.WebJobs;
-using Microsoft.Azure.WebJobs.Extensions.Http;
-using Microsoft.AspNetCore.Http;
+using Microsoft.Azure.WebJobs.Extensions.Kafka;
 using Microsoft.Extensions.Logging;
 using Newtonsoft.Json;
-using Microsoft.Azure.WebJobs.Extensions.Kafka;
-using Confluent.SchemaRegistry.Serdes;
 
 namespace KafkaFunctionSample
 {
@@ -41,7 +36,7 @@ public static class AvroSpecificTriggers
         }
     }
 
-    static AvroDeserializer<UserRecord> myCustomDeserialiser = new AvroDeserializer<UserRecord>(new LocalSchemaRegistry(UserRecord.SchemaText));
+    private static AvroDeserializer<UserRecord> myCustomDeserialiser = new AvroDeserializer<UserRecord>(new LocalSchemaRegistry(UserRecord.SchemaText));
 
     /// <summary>
     /// This function shows how to implement a custom deserialiser in the function method
@@ -51,17 +46,31 @@ public static class AvroSpecificTriggers
     /// <param name="logger">Logger.</param>
     [FunctionName(nameof(UserAsBytes))]
     public static async Task UserAsBytes(
-        [KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_bytes")] KafkaEventData[] kafkaEvents,
+        [KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_bytes")] byte[][] kafkaEvents,
         ILogger logger)
     {
         foreach (var kafkaEvent in kafkaEvents)
         {
-            var desUserRecord = await myCustomDeserialiser.DeserializeAsync(((byte[])kafkaEvent.Value), false, Confluent.Kafka.SerializationContext.Empty);
-
-            logger.LogInformation($"Custom deserialised user: {JsonConvert.SerializeObject(desUserRecord)}");
+            var desUserRecord = await myCustomDeserialiser.DeserializeAsync(kafkaEvent, false, Confluent.Kafka.SerializationContext.Empty);
+            logger.LogInformation($"Custom deserialised user from batch: {JsonConvert.SerializeObject(desUserRecord)}");
         }
     }
 
+    /// <summary>
+    /// This function shows how to implement a custom deserialiser for a single event (single dispatch)
+    /// </summary>
+    /// <returns>A task representing the asynchronous operation.</returns>
+    /// <param name="kafkaEvent">Kafka event.</param>
+    /// <param name="logger">Logger.</param>
+    [FunctionName(nameof(UserAsByte))]
+    public static async Task UserAsByte(
+        [KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_byte")] byte[] kafkaEvent,
+        ILogger logger)
+    {
+        var desUserRecord = await myCustomDeserialiser.DeserializeAsync(kafkaEvent, false, Confluent.Kafka.SerializationContext.Empty);
+        logger.LogInformation($"Custom deserialised user: {JsonConvert.SerializeObject(desUserRecord)}");
+    }
+
     [FunctionName(nameof(PageViewsFemale))]
     public static void PageViewsFemale(
         [KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType = typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
KafkaExtensionConfigProvider.cs
@@ -1,20 +1,19 @@
 // Copyright (c) .NET Foundation. All rights reserved.
 // Licensed under the MIT License. See License.txt in the project root for license information.
 
+using System;
+using System.Collections.Generic;
 using Avro.Generic;
 using Avro.Specific;
 using Microsoft.Azure.WebJobs.Description;
 using Microsoft.Azure.WebJobs.Host.Bindings;
 using Microsoft.Azure.WebJobs.Host.Config;
 using Microsoft.Azure.WebJobs.Host.Configuration;
+using Microsoft.Azure.WebJobs.Logging;
 using Microsoft.Extensions.Configuration;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
 using Newtonsoft.Json;
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading.Tasks;
 
 namespace Microsoft.Azure.WebJobs.Extensions.Kafka
 {
@@ -28,6 +27,7 @@ public class KafkaExtensionConfigProvider
         private readonly INameResolver nameResolver;
         private readonly IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration;
         private readonly IKafkaProducerProvider kafkaProducerManager;
+        private readonly ILogger logger;
 
         public KafkaExtensionConfigProvider(
             IConfiguration config,
@@ -45,19 +45,20 @@ public class KafkaExtensionConfigProvider
             this.nameResolver = nameResolver;
             this.configuration = configuration;
             this.kafkaProducerManager = kafkaProducerManager;
+            this.logger = loggerFactory.CreateLogger(LogCategories.CreateTriggerCategory("Kafka"));
         }
 
         public void Initialize(ExtensionConfigContext context)
         {
-            this.configuration.ConfigurationSection.Bind(this.options);
+            configuration.ConfigurationSection.Bind(options);
 
             context
                 .AddConverter<KafkaEventData, string>(ConvertKafkaEventData2String)
                 .AddConverter<KafkaEventData, ISpecificRecord>(ConvertKafkaEventData2AvroSpecific)
                 .AddConverter<KafkaEventData, byte[]>(ConvertKafkaEventData2Bytes);
 
             // register our trigger binding provider
-            var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(this.config, this.options, this.converterManager, this.nameResolver, this.loggerFactory);
+            var triggerBindingProvider = new KafkaTriggerAttributeBindingProvider(config, options, converterManager, nameResolver, loggerFactory);
             context.AddBindingRule<KafkaTriggerAttribute>()
                 .BindToTrigger(triggerBindingProvider);
 
@@ -68,15 +69,23 @@ public void Initialize(ExtensionConfigContext context)
 
         private IAsyncCollector<KafkaEventData> BuildCollectorFromAttribute(KafkaAttribute attribute)
         {
-            return new KafkaAsyncCollector(attribute.Topic, this.kafkaProducerManager.Get(attribute));
+            return new KafkaAsyncCollector(attribute.Topic, kafkaProducerManager.Get(attribute));
         }
 
         private ISpecificRecord ConvertKafkaEventData2AvroSpecific(KafkaEventData kafkaEventData)
         {
-            return (ISpecificRecord)kafkaEventData.Value;
+            if (kafkaEventData.Value is ISpecificRecord specRecord)
+            {
+                return specRecord;
+            }
+            else
+            {
+                logger.LogWarning($@"Unable to convert incoming data to Avro format. Expected ISpecificRecord, got {kafkaEventData.Value.GetType()}. Returning [null]");
+                return null;
+            }
         }
 
-        private static string ConvertKafkaEventData2String(KafkaEventData kafkaEventData)
+        private string ConvertKafkaEventData2String(KafkaEventData kafkaEventData)
         {
             try
             {
@@ -91,13 +100,13 @@ private string ConvertKafkaEventData2String(KafkaEventData kafkaEventData)
             }
             catch (Exception ex)
             {
-                Console.WriteLine(ex.ToString());
+                logger.LogError(ex, $@"Unable to convert incoming data to string.");
                 throw;
             }
         }
 
 
-        private static string GenericRecord2String(GenericRecord record)
+        private string GenericRecord2String(GenericRecord record)
         {
             var props = new Dictionary<string, object>();
             foreach (var field in record.Schema.Fields)
@@ -111,13 +120,14 @@ private string GenericRecord2String(GenericRecord record)
             return JsonConvert.SerializeObject(props);
         }
 
-        private static byte[] ConvertKafkaEventData2Bytes(KafkaEventData input)
+        private byte[] ConvertKafkaEventData2Bytes(KafkaEventData input)
         {
             if (input.Value is byte[] bytes)
             {
                 return bytes;
             }
 
+            logger.LogWarning($@"Unable to convert incoming data to byte[] as underlying data stream was not byte[]. Returning [null]");
             return null;
         }
     }
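With these converter changes, a payload whose underlying value is not byte[] now converts to null (with a logged warning) instead of failing with an InvalidCastException. A deliberately contrived sketch of a function guarding against that, under the assumption that a byte[]-bound parameter can coexist with an Avro ValueType; the function name, topic, and consumer group are illustrative:

    [FunctionName("UserBytesNullGuard")]
    public static void UserBytesNullGuard(
        [KafkaTrigger("LocalBroker", "users", ValueType = typeof(UserRecord), ConsumerGroup = "azfunc_guard")] byte[] payload,
        ILogger logger)
    {
        // After this commit, a non-byte[] underlying value arrives as null
        // (plus a converter warning in the logs) rather than throwing.
        if (payload == null)
        {
            logger.LogWarning("Payload was not raw bytes; skipping event.");
            return;
        }
        logger.LogInformation($"Received {payload.Length} raw bytes.");
    }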
KafkaListenerFactory.cs

@@ -15,6 +15,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka.Listeners
     internal static class KafkaListenerFactory
     {
         public static IListener CreateFor(KafkaTriggerAttribute attribute,
+            Type parameterType,
             ITriggeredFunctionExecutor executor,
             bool singleDispatch,
             KafkaOptions options,
@@ -24,7 +25,7 @@ internal static class KafkaListenerFactory
             string eventHubConnectionString,
             ILogger logger)
         {
-            var valueType = GetValueType(attribute, out string avroSchema);
+            var valueType = GetValueType(attribute, parameterType, out string avroSchema);
             return (IListener)typeof(KafkaListenerFactory)
                 .GetMethod(nameof(CreateFor), BindingFlags.Static | BindingFlags.NonPublic | BindingFlags.InvokeMethod)
                 .MakeGenericMethod(attribute.KeyType ?? typeof(Ignore), valueType)
@@ -42,7 +43,7 @@ internal static class KafkaListenerFactory
                 });
         }
 
-        private static Type GetValueType(KafkaTriggerAttribute attribute, out string avroSchema)
+        private static Type GetValueType(KafkaTriggerAttribute attribute, Type parameterType, out string avroSchema)
         {
             avroSchema = null;
 
@@ -54,6 +55,11 @@ private static Type GetValueType(KafkaTriggerAttribute attribute, out string avroSchema)
                 avroSchema = attribute.AvroSchema;
                 return typeof(Avro.Generic.GenericRecord);
             }
+            else if (parameterType == typeof(byte[][])
+                || parameterType == typeof(byte[]))
+            {
+                return typeof(byte[]);
+            }
             else
             {
                 return typeof(string);
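The new parameterType argument lets GetValueType map byte[] and byte[][] function parameters to a raw-byte listener, after which CreateFor still closes its generic overload over the resolved key/value types via reflection. A standalone sketch of that reflection pattern; all names here are illustrative, not from the repo:

    using System;
    using System.Reflection;

    internal static class GenericDispatchDemo
    {
        // Stands in for the generic CreateFor<TKey, TValue> overload.
        private static string CreateFor<TKey, TValue>() =>
            $"listener for <{typeof(TKey).Name}, {typeof(TValue).Name}>";

        public static void Main()
        {
            Type valueType = typeof(byte[]); // e.g. resolved by GetValueType

            // Resolve the open generic method, close it over the runtime-known
            // key/value types, then invoke it (static, no arguments).
            var result = typeof(GenericDispatchDemo)
                .GetMethod(nameof(CreateFor), BindingFlags.Static | BindingFlags.NonPublic)
                .MakeGenericMethod(typeof(string), valueType)
                .Invoke(null, null);

            Console.WriteLine(result); // prints: listener for <String, Byte[]>
        }
    }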
KafkaTriggerAttributeBindingProvider.cs

@@ -70,6 +70,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
             Task<IListener> listenerCreator(ListenerFactoryContext factoryContext, bool singleDispatch)
             {
                 var listener = Listeners.KafkaListenerFactory.CreateFor(attribute,
+                    parameter.ParameterType,
                     factoryContext.Executor,
                     singleDispatch,
                     options.Value,