diff --git a/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs b/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs index 38e5b9e8..ed2ebbaf 100644 --- a/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs +++ b/samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs @@ -1,14 +1,9 @@ -using System; -using System.IO; using System.Threading.Tasks; -using Microsoft.AspNetCore.Mvc; +using Confluent.SchemaRegistry.Serdes; using Microsoft.Azure.WebJobs; -using Microsoft.Azure.WebJobs.Extensions.Http; -using Microsoft.AspNetCore.Http; +using Microsoft.Azure.WebJobs.Extensions.Kafka; using Microsoft.Extensions.Logging; using Newtonsoft.Json; -using Microsoft.Azure.WebJobs.Extensions.Kafka; -using Confluent.SchemaRegistry.Serdes; namespace KafkaFunctionSample { @@ -41,7 +36,7 @@ public static class AvroSpecificTriggers } } - static AvroDeserializer myCustomDeserialiser = new AvroDeserializer(new LocalSchemaRegistry(UserRecord.SchemaText)); + private static AvroDeserializer myCustomDeserialiser = new AvroDeserializer(new LocalSchemaRegistry(UserRecord.SchemaText)); /// /// This function shows how to implement a custom deserialiser in the function method @@ -51,17 +46,31 @@ public static class AvroSpecificTriggers /// Logger. [FunctionName(nameof(UserAsBytes))] public static async Task UserAsBytes( - [KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_bytes")] KafkaEventData[] kafkaEvents, + [KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_bytes")] byte[][] kafkaEvents, ILogger logger) { foreach (var kafkaEvent in kafkaEvents) { - var desUserRecord = await myCustomDeserialiser.DeserializeAsync(((byte[])kafkaEvent.Value), false, Confluent.Kafka.SerializationContext.Empty); - - logger.LogInformation($"Custom deserialised user: {JsonConvert.SerializeObject(desUserRecord)}"); + var desUserRecord = await myCustomDeserialiser.DeserializeAsync(kafkaEvent, false, Confluent.Kafka.SerializationContext.Empty); + logger.LogInformation($"Custom deserialised user from batch: {JsonConvert.SerializeObject(desUserRecord)}"); } } + /// + /// This function shows how to implement a custom deserialiser in the function method + /// + /// The as bytes. + /// Kafka events. + /// Logger. + [FunctionName(nameof(UserAsByte))] + public static async Task UserAsByte( + [KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_byte")] byte[] kafkaEvent, + ILogger logger) + { + var desUserRecord = await myCustomDeserialiser.DeserializeAsync(kafkaEvent, false, Confluent.Kafka.SerializationContext.Empty); + logger.LogInformation($"Custom deserialised user: {JsonConvert.SerializeObject(desUserRecord)}"); + } + [FunctionName(nameof(PageViewsFemale))] public static void PageViewsFemale( [KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType = typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,