Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add properties for producer configuration to Output Binding Attribute definition #57

Merged
merged 4 commits into from
Apr 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public KafkaAttribute()
/// </summary>
public Type KeyType { get; set; }

Type valueType;
private Type valueType;

/// <summary>
/// Gets or sets the Avro data type
Expand All @@ -75,13 +75,44 @@ public Type ValueType
/// </summary>
public string AvroSchema { get; set; }

bool IsValidValueType(Type value)
private bool IsValidValueType(Type value)
{
return
typeof(ISpecificRecord).IsAssignableFrom(value) ||
typeof(Google.Protobuf.IMessage).IsAssignableFrom(value) ||
value == typeof(byte[]) ||
value == typeof(string);
}

/// <summary>
/// Gets or sets the Maximum transmit message size. Default: 1MB
/// </summary>
public int? MaxMessageBytes { get; set; }

/// <summary>
/// Maximum number of messages batched in one MessageSet. default: 10000
/// </summary>
public int? BatchSize { get; set; }

/// <summary>
/// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false
/// </summary>
public bool? EnableIdempotence { get; set; }

/// <summary>
/// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
/// </summary>
public int? MessageTimeoutMs { get; set; }

/// <summary>
/// The ack timeout of the producer request in milliseconds. default: 5000
/// </summary>
public int? RequestTimeoutMs { get; set; }

/// <summary>
/// How many times to retry sending a failing Message. **Note:** default: 2
/// </summary>
/// <remarks>Retrying may cause reordering unless <c>EnableIdempotence</c> is set to <c>true</c>.</remarks>
public int? MaxRetries { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -140,7 +139,7 @@ private void DeliveryHandler(DeliveryReport<TKey, TValue> deliveredItem)
}
else
{
this.logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Error: {error}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset, deliveredItem.Error.ToString());
logger.LogError("Failed to delivery message to {topic} / {partition} / {offset}. Reason: {reason}. Full Error: {error}", deliveredItem.Topic, (int)deliveredItem.Partition, (long)deliveredItem.Offset, deliveredItem.Error.Reason, deliveredItem.Error.ToString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class KafkaProducerProvider : IKafkaProducerProvider
private readonly IConfiguration config;
private readonly INameResolver nameResolver;
private readonly ILoggerProvider loggerProvider;
ConcurrentDictionary<string, IKafkaProducer> producers = new ConcurrentDictionary<string, IKafkaProducer>();
private ConcurrentDictionary<string, IKafkaProducer> producers = new ConcurrentDictionary<string, IKafkaProducer>();

public KafkaProducerProvider(IConfiguration config, INameResolver nameResolver, ILoggerProvider loggerProvider)
{
Expand All @@ -33,8 +33,8 @@ public KafkaProducerProvider(IConfiguration config, INameResolver nameResolver,

public IKafkaProducer Get(KafkaAttribute attribute)
{
var resolvedBrokerList = this.nameResolver.ResolveWholeString(attribute.BrokerList);
var brokerListFromConfig = this.config.GetConnectionStringOrSetting(resolvedBrokerList);
var resolvedBrokerList = nameResolver.ResolveWholeString(attribute.BrokerList);
var brokerListFromConfig = config.GetConnectionStringOrSetting(resolvedBrokerList);
if (!string.IsNullOrEmpty(brokerListFromConfig))
{
resolvedBrokerList = brokerListFromConfig;
Expand All @@ -44,7 +44,7 @@ public IKafkaProducer Get(KafkaAttribute attribute)
var valueTypeName = attribute.ValueType == null ? string.Empty : attribute.ValueType.AssemblyQualifiedName;
var producerKey = $"{resolvedBrokerList}:keyTypeName:valueTypeName";

return producers.GetOrAdd(producerKey, (k) => this.Create(attribute, resolvedBrokerList));
return producers.GetOrAdd(producerKey, (k) => Create(attribute, resolvedBrokerList));
}

private IKafkaProducer Create(KafkaAttribute attribute, string brokerList)
Expand Down Expand Up @@ -75,17 +75,19 @@ private IKafkaProducer Create(KafkaAttribute attribute, string brokerList)

return (IKafkaProducer)Activator.CreateInstance(
typeof(KafkaProducer<,>).MakeGenericType(keyType, valueType),
this.GetProducerConfig(brokerList),
GetProducerConfig(attribute, brokerList),
avroSchema,
this.loggerProvider.CreateLogger(LogCategories.CreateTriggerCategory("Kafka")));
loggerProvider.CreateLogger(LogCategories.CreateTriggerCategory("Kafka")));
}

private ProducerConfig GetProducerConfig(string brokerList)
private ProducerConfig GetProducerConfig(KafkaAttribute attribute, string brokerList) => new ProducerConfig
{
return new ProducerConfig()
{
BootstrapServers = brokerList,
};
}
BootstrapServers = brokerList,
BatchNumMessages = attribute.BatchSize,
EnableIdempotence = attribute.EnableIdempotence,
MessageSendMaxRetries = attribute.MaxRetries,
MessageTimeoutMs = attribute.MessageTimeoutMs,
RequestTimeoutMs = attribute.RequestTimeoutMs,
};
}
}