Skip to content

Commit

Permalink
task 3 in #44 doesn't require any additional work; added sample handl…
Browse files Browse the repository at this point in the history
…ers to demonstrate
  • Loading branch information
brandonh-msft committed Apr 3, 2019
1 parent a9ea661 commit 29a5bc6
Showing 1 changed file with 21 additions and 12 deletions.
33 changes: 21 additions & 12 deletions samples/dotnet/KafkaFunctionSample/AvroSpecificTriggers.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Confluent.SchemaRegistry.Serdes;

namespace KafkaFunctionSample
{
Expand Down Expand Up @@ -41,7 +36,7 @@ public static class AvroSpecificTriggers
}
}

static AvroDeserializer<UserRecord> myCustomDeserialiser = new AvroDeserializer<UserRecord>(new LocalSchemaRegistry(UserRecord.SchemaText));
private static AvroDeserializer<UserRecord> myCustomDeserialiser = new AvroDeserializer<UserRecord>(new LocalSchemaRegistry(UserRecord.SchemaText));

/// <summary>
/// This function shows how to implement a custom deserialiser in the function method
Expand All @@ -51,17 +46,31 @@ public static class AvroSpecificTriggers
/// <param name="logger">Logger.</param>
[FunctionName(nameof(UserAsBytes))]
public static async Task UserAsBytes(
[KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_bytes")] KafkaEventData[] kafkaEvents,
[KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_bytes")] byte[][] kafkaEvents,
ILogger logger)
{
foreach (var kafkaEvent in kafkaEvents)
{
var desUserRecord = await myCustomDeserialiser.DeserializeAsync(((byte[])kafkaEvent.Value), false, Confluent.Kafka.SerializationContext.Empty);

logger.LogInformation($"Custom deserialised user: {JsonConvert.SerializeObject(desUserRecord)}");
var desUserRecord = await myCustomDeserialiser.DeserializeAsync(kafkaEvent, false, Confluent.Kafka.SerializationContext.Empty);
logger.LogInformation($"Custom deserialised user from batch: {JsonConvert.SerializeObject(desUserRecord)}");
}
}

/// <summary>
/// This function shows how to implement a custom deserialiser in the function method
/// </summary>
/// <returns>The as bytes.</returns>
/// <param name="kafkaEvents">Kafka events.</param>
/// <param name="logger">Logger.</param>
[FunctionName(nameof(UserAsByte))]
public static async Task UserAsByte(
[KafkaTrigger("LocalBroker", "users", ValueType = typeof(byte[]), ConsumerGroup = "azfunc_byte")] byte[] kafkaEvent,
ILogger logger)
{
var desUserRecord = await myCustomDeserialiser.DeserializeAsync(kafkaEvent, false, Confluent.Kafka.SerializationContext.Empty);
logger.LogInformation($"Custom deserialised user: {JsonConvert.SerializeObject(desUserRecord)}");
}

[FunctionName(nameof(PageViewsFemale))]
public static void PageViewsFemale(
[KafkaTrigger("LocalBroker", "PAGEVIEWS_FEMALE", ValueType = typeof(PageViewsFemale), ConsumerGroup = "azfunc")] KafkaEventData[] kafkaEvents,
Expand Down

0 comments on commit 29a5bc6

Please sign in to comment.