Reworking Listener creation into a Factory pattern. Resolves two items in Azure#44
brandonh-msft committed Apr 2, 2019
1 parent e71c90f commit bc62a2d
Showing 7 changed files with 260 additions and 106 deletions.
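As the diff below shows, deserializer selection moves out of KafkaListener&lt;TKey, TValue&gt; and into two new subclasses, KafkaListenerAvro and KafkaListenerProtoBuf, with a static KafkaListenerFactory choosing the listener type based on TValue. The following sketch is a hypothetical call site, not part of this commit: the executor, options, logger, MyAvroRecord type, and myAvroSchemaJson string are all illustrative assumptions.

// Hypothetical call site (not part of this commit): the trigger binding asks the
// factory for a listener instead of constructing KafkaListener<TKey, TValue> directly.
// executor, options, logger, MyAvroRecord, and myAvroSchemaJson are illustrative.
IListener listener = KafkaListenerFactory.CreateFor<string, MyAvroRecord>(
    executor: executor,
    singleDispatch: true,
    options: options,
    brokerList: "localhost:9092",
    topic: "page-views",
    consumerGroup: "my-function",
    eventHubConnectionString: null,
    logger: logger,
    avroSchema: myAvroSchemaJson);   // required here because MyAvroRecord implements ISpecificRecord

await listener.StartAsync(CancellationToken.None);

One effect of the split is that the Confluent.SchemaRegistry.Serdes dependency leaves the base listener (its using directive is removed below) and is confined to the Avro subclass.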
@@ -8,7 +8,6 @@
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Extensions.Logging;
@@ -35,7 +34,6 @@ internal class KafkaListener<TKey, TValue> : IListener
private readonly string topic;
private readonly string consumerGroup;
private readonly string eventHubConnectionString;
private readonly string avroSchema;
private FunctionExecutorBase<TKey, TValue> functionExecutor;
private IConsumer<TKey, TValue> consumer;
private bool disposed;
@@ -50,7 +48,6 @@ internal class KafkaListener<TKey, TValue> : IListener
string topic,
string consumerGroup,
string eventHubConnectionString,
string avroSchema,
ILogger logger)
{
this.executor = executor;
@@ -61,8 +58,7 @@ internal class KafkaListener<TKey, TValue> : IListener
this.topic = topic;
this.consumerGroup = consumerGroup;
this.eventHubConnectionString = eventHubConnectionString;
this.avroSchema = avroSchema;
this.cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource = new CancellationTokenSource();
}

public void Cancel()
@@ -106,59 +102,46 @@ public void Cancel()
return builder.Build();
}

public Task StartAsync(CancellationToken cancellationToken)
public virtual Task StartAsync(CancellationToken cancellationToken)
{
IAsyncDeserializer<TValue> asyncValueDeserializer = null;
IDeserializer<TValue> valueDeserializer = null;
IAsyncDeserializer<TKey> keyDeserializer = null;
SetConsumerAndExecutor(null, null, null);

if (!string.IsNullOrEmpty(this.avroSchema))
{
var schemaRegistry = new LocalSchemaRegistry(this.avroSchema);
asyncValueDeserializer = new AvroDeserializer<TValue>(schemaRegistry);
}
else
{
if (typeof(Google.Protobuf.IMessage).IsAssignableFrom(typeof(TValue)))
{
// protobuf: need to create using reflection due to generic requirements in ProtobufDeserializer
valueDeserializer = (IDeserializer<TValue>)Activator.CreateInstance(typeof(ProtobufDeserializer<>).MakeGenericType(typeof(TValue)));
}
}
return Task.CompletedTask;
}

this.consumer = this.CreateConsumer(
config: this.GetConsumerConfiguration(),
errorHandler: (_, e) =>
{
this.logger.LogError(e.Reason);
},
partitionsAssignedHandler: (_, e) =>
{
this.logger.LogInformation($"Assigned partitions: [{string.Join(", ", e)}]");
},
partitionsRevokedHandler: (_, e) =>
{
this.logger.LogInformation($"Revoked partitions: [{string.Join(", ", e)}]");
},
asyncValueDeserializer: asyncValueDeserializer,
valueDeserializer: valueDeserializer,
keyDeserializer: keyDeserializer);
protected void SetConsumerAndExecutor(IAsyncDeserializer<TValue> asyncValueDeserializer, IDeserializer<TValue> valueDeserializer, IAsyncDeserializer<TKey> keyDeserializer)
{
consumer = CreateConsumer(
config: GetConsumerConfiguration(),
errorHandler: (_, e) =>
{
logger.LogError(e.Reason);
},
partitionsAssignedHandler: (_, e) =>
{
logger.LogInformation($"Assigned partitions: [{string.Join(", ", e)}]");
},
partitionsRevokedHandler: (_, e) =>
{
logger.LogInformation($"Revoked partitions: [{string.Join(", ", e)}]");
},
asyncValueDeserializer: asyncValueDeserializer,
valueDeserializer: valueDeserializer,
keyDeserializer: keyDeserializer);

this.functionExecutor = singleDispatch ?
(FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(this.executor, this.consumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, this.logger) :
new MultipleItemFunctionExecutor<TKey, TValue>(this.executor, this.consumer, this.options.ExecutorChannelCapacity, this.options.ChannelFullRetryIntervalInMs, this.logger);
functionExecutor = singleDispatch ?
(FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(executor, consumer, options.ExecutorChannelCapacity, options.ChannelFullRetryIntervalInMs, logger) :
new MultipleItemFunctionExecutor<TKey, TValue>(executor, consumer, options.ExecutorChannelCapacity, options.ChannelFullRetryIntervalInMs, logger);

this.consumer.Subscribe(this.topic);
consumer.Subscribe(topic);

// Using a thread as opposed to a task since this will be long running
// https://github.com/davidfowl/AspNetCoreDiagnosticScenarios/blob/master/AsyncGuidance.md#avoid-using-taskrun-for-long-running-work-that-blocks-the-thread
var thread = new Thread(ProcessSubscription)
{
IsBackground = true,
};
thread.Start(this.cancellationTokenSource.Token);

return Task.CompletedTask;
thread.Start(cancellationTokenSource.Token);
}

private ConsumerConfig GetConsumerConfiguration()
@@ -0,0 +1,44 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.


using System.Threading;
using System.Threading.Tasks;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
/// <summary>
/// Kafka listener.
/// Connects a Kafka trigger function with a Kafka Consumer
/// </summary>
internal class KafkaListenerAvro<TKey, TValue> : KafkaListener<TKey, TValue>
{
private readonly string avroSchema;

public KafkaListenerAvro(
ITriggeredFunctionExecutor executor,
bool singleDispatch,
KafkaOptions options,
string brokerList,
string topic,
string consumerGroup,
string eventHubConnectionString,
string avroSchema,
ILogger logger) : base(executor, singleDispatch, options, brokerList, topic, consumerGroup, eventHubConnectionString, logger)
{
this.avroSchema = avroSchema;
}

public override Task StartAsync(CancellationToken cancellationToken)
{
var schemaRegistry = new LocalSchemaRegistry(avroSchema);
AvroDeserializer<TValue> avroDeserializer = new AvroDeserializer<TValue>(schemaRegistry);
SetConsumerAndExecutor(avroDeserializer, null, null);

return Task.CompletedTask;
}
}
}
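The factory shown further below routes TValue types implementing Avro.Specific.ISpecificRecord to this Avro listener. As a rough illustration of what such a TValue looks like, here is a minimal hand-written record; real types are normally generated from an .avsc schema by avrogen, and the PageViewEvent name and fields are assumptions, not part of the repository.

// Illustrative only: the kind of TValue this listener deserializes. Real types are
// usually generated by avrogen; this hand-written version just shows the
// ISpecificRecord contract that the factory below checks for.
using Avro;
using Avro.Specific;

public class PageViewEvent : ISpecificRecord
{
    public static readonly Schema _SCHEMA = Schema.Parse(
        "{\"type\":\"record\",\"name\":\"PageViewEvent\",\"fields\":[" +
        "{\"name\":\"Url\",\"type\":\"string\"},{\"name\":\"Timestamp\",\"type\":\"long\"}]}");

    public string Url { get; set; }
    public long Timestamp { get; set; }

    public Schema Schema => _SCHEMA;

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Url;
            case 1: return Timestamp;
            default: throw new AvroRuntimeException("Bad field index: " + fieldPos);
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Url = (string)fieldValue; return;
            case 1: Timestamp = (long)fieldValue; return;
            default: throw new AvroRuntimeException("Bad field index: " + fieldPos);
        }
    }
}

The JSON string handed to the factory as avroSchema would typically be this same schema, for example PageViewEvent._SCHEMA.ToString().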
@@ -0,0 +1,89 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using Avro.Specific;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka.Listeners
{
internal static class KafkaListenerFactory
{
public static KafkaListener<TKey, TValue> CreateFor<TKey, TValue>(
ITriggeredFunctionExecutor executor,
bool singleDispatch,
KafkaOptions options,
string brokerList,
string topic,
string consumerGroup,
string eventHubConnectionString,
ILogger logger,
string avroSchema = null)
{
if (typeof(ISpecificRecord).IsAssignableFrom(typeof(TValue)))
{
if (string.IsNullOrWhiteSpace(avroSchema))
{
throw new ArgumentNullException(nameof(avroSchema), $@"parameter is required when creating an Avro-based Listener");
}

return new KafkaListenerAvro<TKey, TValue>(executor,
singleDispatch,
options,
brokerList,
topic,
consumerGroup,
eventHubConnectionString,
avroSchema,
logger);
}

if (typeof(Google.Protobuf.IMessage).IsAssignableFrom(typeof(TValue)))
{
return new KafkaListenerProtoBuf<TKey, TValue>(executor,
singleDispatch,
options,
brokerList,
topic,
consumerGroup,
eventHubConnectionString,
logger);
}

return new KafkaListener<TKey, TValue>(executor,
singleDispatch,
options,
brokerList,
topic,
consumerGroup,
eventHubConnectionString,
logger);
}

public static IListener CreateFor(Type keyType, Type valueType,
ITriggeredFunctionExecutor executor,
bool singleDispatch,
KafkaOptions options,
string brokerList,
string topic,
string consumerGroup,
string eventHubConnectionString,
ILogger logger, string avroSchema = null) => (IListener)typeof(KafkaListenerFactory)
.GetMethods().Single(m => m.Name == nameof(CreateFor) && m.IsGenericMethod)
.MakeGenericMethod(keyType, valueType)
.Invoke(null, new object[]
{
executor,
singleDispatch,
options,
brokerList,
topic,
consumerGroup,
eventHubConnectionString,
logger, avroSchema
});
}
}
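The second overload above closes CreateFor&lt;TKey, TValue&gt; over runtime Type values via reflection, which is useful when the key and value types are only known at binding time. A minimal sketch of such a call, assuming the caller already holds the executor, options, and logger; the connection values and the choice of string key and value types are illustrative.

// Hypothetical call with only runtime System.Type values in hand (for example,
// resolved from the trigger parameter at binding time).
Type keyType = typeof(string);
Type valueType = typeof(string);

IListener listener = KafkaListenerFactory.CreateFor(
    keyType,
    valueType,
    executor,
    singleDispatch: false,
    options: options,
    brokerList: "localhost:9092",
    topic: "raw-events",
    consumerGroup: "my-function",
    eventHubConnectionString: null,
    logger: logger,
    avroSchema: null);

Because the closed generic method is invoked through MethodInfo.Invoke, any exception thrown inside CreateFor&lt;TKey, TValue&gt; (such as the ArgumentNullException for a missing avroSchema) surfaces wrapped in a TargetInvocationException, so callers may want to unwrap InnerException.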
@@ -0,0 +1,39 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.


using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
/// <summary>
/// Kafka listener.
/// Connects a Kafka trigger function with a Kafka Consumer
/// </summary>
internal class KafkaListenerProtoBuf<TKey, TValue> : KafkaListener<TKey, TValue>
{
public KafkaListenerProtoBuf(
ITriggeredFunctionExecutor executor,
bool singleDispatch,
KafkaOptions options,
string brokerList,
string topic,
string consumerGroup,
string eventHubConnectionString,
ILogger logger) : base(executor, singleDispatch, options, brokerList, topic, consumerGroup, eventHubConnectionString, logger) { }

public override Task StartAsync(CancellationToken cancellationToken)
{
// protobuf: need to create using reflection due to generic requirements in ProtobufDeserializer
var valueDeserializer = (IDeserializer<TValue>)Activator.CreateInstance(typeof(ProtobufDeserializer<>).MakeGenericType(typeof(TValue)));
SetConsumerAndExecutor(null, valueDeserializer, null);

return Task.CompletedTask;
}
}
}
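For the protobuf path, the factory selects this listener whenever TValue implements Google.Protobuf.IMessage, typically a protoc-generated class. A hedged sketch follows; OrderPlaced is a stand-in for such a generated type and is not part of this commit.

// Hypothetical: OrderPlaced stands in for a protoc-generated class implementing
// Google.Protobuf.IMessage. The factory routes such a TValue here, and the
// avroSchema argument is not needed.
IListener listener = KafkaListenerFactory.CreateFor<string, OrderPlaced>(
    executor: executor,
    singleDispatch: true,
    options: options,
    brokerList: "localhost:9092",
    topic: "orders",
    consumerGroup: "my-function",
    eventHubConnectionString: null,
    logger: logger);    // avroSchema defaults to null for the protobuf path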