Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer and consumer optimizations #73

Merged
merged 1 commit into from
Apr 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/dotnet/KafkaFunctionSample/ProduceStringTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static class ProduceStringTopic
[FunctionName("ProduceStringTopic")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[Kafka("stringTopicTenPartitions", BrokerList = "LocalBroker")] IAsyncCollector<KafkaEventData> events,
[Kafka("LocalBroker", "stringTopicTenPartitions", ValueType = typeof(string))] IAsyncCollector<KafkaEventData> events,
ILogger log)
{
try
Expand Down
2 changes: 1 addition & 1 deletion samples/dotnet/KafkaFunctionSample/RawTypeTriggers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static class RawTypeTriggers
{
//[FunctionName(nameof(StringTopic))]
//public static void StringTopic(
// [KafkaTrigger("LocalBroker", "stringTopic", ConsumerGroup = "azfunc")] KafkaEventData kafkaEvent,
// [KafkaTrigger("LocalBroker", "stringTopic", ConsumerGroup = "azfunc", ValueType = typeof(string))] KafkaEventData kafkaEvent,
// ILogger logger)
//{
// logger.LogInformation(kafkaEvent.Value.ToString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Text;
using Avro.Generic;
using Avro.Specific;
using Microsoft.Azure.WebJobs.Description;
Expand All @@ -26,7 +27,7 @@ public class KafkaExtensionConfigProvider : IExtensionConfigProvider
private readonly IConverterManager converterManager;
private readonly INameResolver nameResolver;
private readonly IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration;
private readonly IKafkaProducerProvider kafkaProducerManager;
private readonly IKafkaProducerFactory kafkaProducerManager;
private readonly ILogger logger;

public KafkaExtensionConfigProvider(
Expand All @@ -36,7 +37,7 @@ public class KafkaExtensionConfigProvider : IExtensionConfigProvider
IConverterManager converterManager,
INameResolver nameResolver,
IWebJobsExtensionConfiguration<KafkaExtensionConfigProvider> configuration,
IKafkaProducerProvider kafkaProducerManager)
IKafkaProducerFactory kafkaProducerManager)
{
this.config = config;
this.options = options;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void Initialize(ExtensionConfigContext context)

private IAsyncCollector<KafkaEventData> BuildCollectorFromAttribute(KafkaAttribute attribute)
{
return new KafkaAsyncCollector(attribute.Topic, kafkaProducerManager.Get(attribute));
return new KafkaAsyncCollector(attribute.Topic, kafkaProducerManager.Create(attribute));
}

private ISpecificRecord ConvertKafkaEventData2AvroSpecific(KafkaEventData kafkaEventData)
Expand All @@ -93,6 +94,17 @@ private string ConvertKafkaEventData2String(KafkaEventData kafkaEventData)
{
return GenericRecord2String(genericRecord);
}
else if (kafkaEventData.Value is byte[] binaryContent)
{
if (binaryContent != null)
{
return Encoding.UTF8.GetString(binaryContent);
}
else
{
return string.Empty;
}
}
else
{
return JsonConvert.SerializeObject(kafkaEventData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action<Kafk
configure(options);
});

builder.Services.AddSingleton<IKafkaProducerProvider, KafkaProducerProvider>();
builder.Services.AddSingleton<IKafkaProducerFactory, KafkaProducerFactory>();

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ internal class KafkaListener<TKey, TValue> : IListener
private CancellationTokenSource cancellationTokenSource;
private SemaphoreSlim subscriberFinished;


readonly object valueDeserializer;
/// <summary>
/// Gets the value deserializer
/// </summary>
/// <value>The value deserializer.</value>
internal object ValueDeserializer => valueDeserializer;

public KafkaListener(
ITriggeredFunctionExecutor executor,
bool singleDispatch,
Expand All @@ -48,8 +56,10 @@ internal class KafkaListener<TKey, TValue> : IListener
string topic,
string consumerGroup,
string eventHubConnectionString,
object valueDeserializer,
ILogger logger)
{
this.valueDeserializer = valueDeserializer;
this.executor = executor;
this.singleDispatch = singleDispatch;
this.options = options;
Expand All @@ -66,72 +76,46 @@ public void Cancel()
this.SafeCloseConsumerAsync().GetAwaiter().GetResult();
}

/// <summary>
/// Need to return a <see cref="IConsumer{TKey, TValue}"/> for unit tests.
/// Unfortunately <see cref="ConsumerBuilder{TKey, TValue}"/> returns <see cref="Consumer{TKey, TValue}"/>
/// </summary>
protected virtual IConsumer<TKey, TValue> CreateConsumer(
ConsumerConfig config,
Action<Consumer<TKey, TValue>, Error> errorHandler,
Action<IConsumer<TKey, TValue>, List<TopicPartition>> partitionsAssignedHandler,
Action<IConsumer<TKey, TValue>, List<TopicPartitionOffset>> partitionsRevokedHandler,
IAsyncDeserializer<TValue> asyncValueDeserializer = null,
IDeserializer<TValue> valueDeserializer = null,
IAsyncDeserializer<TKey> keyDeserializer = null
)
public Task StartAsync(CancellationToken cancellationToken)
{
var builder = new ConsumerBuilder<TKey, TValue>(config)
.SetErrorHandler(errorHandler)
.SetPartitionsAssignedHandler(partitionsAssignedHandler)
.SetPartitionsRevokedHandler(partitionsRevokedHandler);
var builder = this.CreateConsumerBuilder(GetConsumerConfiguration());

if (keyDeserializer != null)
builder.SetErrorHandler((_, e) =>
{
builder.SetKeyDeserializer(keyDeserializer);
}

if (asyncValueDeserializer != null)
logger.LogError(e.Reason);
})
.SetPartitionsAssignedHandler((_, e) =>
{
builder.SetValueDeserializer(asyncValueDeserializer);
}
else if (valueDeserializer != null)
logger.LogInformation($"Assigned partitions: [{string.Join(", ", e)}]");
})
.SetPartitionsRevokedHandler((_, e) =>
{
builder.SetValueDeserializer(valueDeserializer);
}

return builder.Build();
}
logger.LogInformation($"Revoked partitions: [{string.Join(", ", e)}]");
});

public virtual Task StartAsync(CancellationToken cancellationToken)
{
SetConsumerAndExecutor(null, null, null);
if (valueDeserializer != null)
{
if (valueDeserializer is IAsyncDeserializer<TValue> asyncValueDeserializer)
{
builder.SetValueDeserializer(asyncValueDeserializer);
}
else if (valueDeserializer is IDeserializer<TValue> syncValueDeserializer)
{
builder.SetValueDeserializer(syncValueDeserializer);
}
else
{
throw new InvalidOperationException($"Value deserializer must implement either IAsyncDeserializer or IDeserializer. Type {valueDeserializer.GetType().Name} does not");
}
}

return Task.CompletedTask;
}
this.consumer = builder.Build();

protected void SetConsumerAndExecutor(IAsyncDeserializer<TValue> asyncValueDeserializer, IDeserializer<TValue> valueDeserializer, IAsyncDeserializer<TKey> keyDeserializer)
{
consumer = CreateConsumer(
config: GetConsumerConfiguration(),
errorHandler: (_, e) =>
{
logger.LogError(e.Reason);
},
partitionsAssignedHandler: (_, e) =>
{
logger.LogInformation($"Assigned partitions: [{string.Join(", ", e)}]");
},
partitionsRevokedHandler: (_, e) =>
{
logger.LogInformation($"Revoked partitions: [{string.Join(", ", e)}]");
},
asyncValueDeserializer: asyncValueDeserializer,
valueDeserializer: valueDeserializer,
keyDeserializer: keyDeserializer);
var commitStrategy = new AsyncCommitStrategy<TKey, TValue>(consumer, this.logger);

functionExecutor = singleDispatch ?
(FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(executor, consumer, options.ExecutorChannelCapacity, options.ChannelFullRetryIntervalInMs, logger) :
new MultipleItemFunctionExecutor<TKey, TValue>(executor, consumer, options.ExecutorChannelCapacity, options.ChannelFullRetryIntervalInMs, logger);
(FunctionExecutorBase<TKey, TValue>)new SingleItemFunctionExecutor<TKey, TValue>(executor, consumer, options.ExecutorChannelCapacity, options.ChannelFullRetryIntervalInMs, commitStrategy, logger) :
new MultipleItemFunctionExecutor<TKey, TValue>(executor, consumer, options.ExecutorChannelCapacity, options.ChannelFullRetryIntervalInMs, commitStrategy, logger);

consumer.Subscribe(topic);

Expand All @@ -142,13 +126,22 @@ protected void SetConsumerAndExecutor(IAsyncDeserializer<TValue> asyncValueDeser
IsBackground = true,
};
thread.Start(cancellationTokenSource.Token);

return Task.CompletedTask;
}

/// <summary>
/// Creates the ConsumerBuilder. Overriding in unit tests
/// </summary>
protected virtual ConsumerBuilder<TKey, TValue> CreateConsumerBuilder(ConsumerConfig config) => new ConsumerBuilder<TKey, TValue>(config);

private ConsumerConfig GetConsumerConfiguration()
{
ConsumerConfig conf = new ConsumerConfig()
{
EnableAutoCommit = false, // we will commit manually
EnableAutoCommit = true, // enabled auto-commit
EnableAutoOffsetStore = false, // disable auto storing read offsets since we need to store them after calling the trigger function
AutoCommitIntervalMs = 200, // every 200ms auto commits what has been stored in memory
AutoOffsetReset = AutoOffsetReset.Earliest, // start from earliest if no checkpoint has been committed
};

Expand Down

This file was deleted.

Loading