Support setting max retries for Kafka pubsub processing #4

Open · wants to merge 1 commit into base: main
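This PR replaces the internal KafkaRedeliverProducer with an ICloudEventRepublisher that tracks a per-event retry counter (carried as a "retry" CloudEvent extension attribute) and drops an event once the counter reaches KafkaSubscribeOptions.MaxRetries. A minimal configuration sketch, assuming the options object is constructed directly (the library's actual registration API may wrap this):

    using Confluent.Kafka;

    // Hypothetical wiring sketch: MaxRetries comes from this PR; how the
    // options instance reaches the consumer may differ in practice.
    var options = new KafkaSubscribeOptions
    {
        ConsumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "my-consumer-group",
        },
        MaxRetries = 5, // republish a failed CloudEvent at most 5 times, then drop it
    };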
2 changes: 1 addition & 1 deletion src/CloudEventDotNet.Kafka/KafkaAtLeastOnceConsumer.cs
@@ -39,7 +39,7 @@ public KafkaAtLeastOnceConsumer(
.SetLogHandler((_, log) => _telemetry.OnConsumerLog(log))
.Build();

-_workItemContext = new KafkaWorkItemContext(registry, new(options, _telemetry));
+_workItemContext = new KafkaWorkItemContext(registry, new KafkaCloudEventRepublisher(options, _telemetry));
_topics = registry.GetSubscribedTopics(pubSubName).ToArray();
_telemetry.Logger.LogDebug("KafkaAtLeastOnceConsumer created");
}
2 changes: 1 addition & 1 deletion src/CloudEventDotNet.Kafka/KafkaAtMostOnceConsumer.cs
@@ -34,7 +34,7 @@ public KafkaAtMostOnceConsumer(
Acks = Acks.Leader,
LingerMs = 10
};
-_workItemContext = new KafkaWorkItemContext(registry, new(options, _telemetry));
+_workItemContext = new KafkaWorkItemContext(registry, new KafkaCloudEventRepublisher(options, _telemetry));
_topics = registry.GetSubscribedTopics(pubSubName).ToArray();

var channelContext = new KafkaMessageChannelContext(
4 changes: 4 additions & 0 deletions src/CloudEventDotNet.Kafka/KafkaCloudEventPublisher.cs
@@ -9,9 +9,11 @@ internal sealed class KafkaCloudEventPublisher : ICloudEventPublisher
{
private readonly IProducer<byte[], byte[]> _producer;
private readonly KafkaProducerTelemetry _telemetry;
+private readonly ILogger _logger;

public KafkaCloudEventPublisher(
string pubSubName,
+ILogger<KafkaCloudEventPublisher> logger,
KafkaPublishOptions options,
ILoggerFactory loggerFactory)
{
@@ -20,6 +22,7 @@ public KafkaCloudEventPublisher(
.SetErrorHandler((_, e) => _telemetry.OnProducerError(e))
.SetLogHandler((_, log) => _telemetry.OnProducerLog(log))
.Build();
+_logger = logger;
}

public async Task PublishAsync<TData>(string topic, CloudEvent<TData> cloudEvent)
@@ -32,4 +35,5 @@ public async Task PublishAsync<TData>(string topic, CloudEvent<TData> cloudEvent
DeliveryResult<byte[], byte[]> result = await _producer.ProduceAsync(topic, message).ConfigureAwait(false);
_telemetry.OnMessageProduced(result, _producer.Name);
}

}
65 changes: 65 additions & 0 deletions src/CloudEventDotNet.Kafka/KafkaCloudEventRepublisher.cs
@@ -0,0 +1,65 @@

using Confluent.Kafka;
using Microsoft.Extensions.Logging;

namespace CloudEventDotNet.Kafka;

internal class KafkaCloudEventRepublisher : ICloudEventRepublisher
{
private readonly IProducer<byte[], byte[]> _producer;
private readonly KafkaSubscribeOptions _options;
private readonly KafkaConsumerTelemetry _telemetry;

public KafkaCloudEventRepublisher(
KafkaSubscribeOptions options,
KafkaConsumerTelemetry telemetry
)
{
var producerConfig = new ProducerConfig()
{
BootstrapServers = options.ConsumerConfig.BootstrapServers,
Acks = Acks.Leader,
LingerMs = 10,

// producer retries sending messages by default
};

_producer = new ProducerBuilder<byte[], byte[]>(producerConfig)
.SetErrorHandler((_, e) => telemetry.OnProducerError(e))
.SetLogHandler((_, log) => telemetry.OnProducerLog(log))
.Build();
_options = options;
_telemetry = telemetry;
}

public async Task RepublishAsync(string topic, CloudEvent cloudEvent)
{
int retry = cloudEvent.Retry;

if (retry >= _options.MaxRetries)
{
CloudEventProcessingTelemetry.LogOnCloudEventDropped(_telemetry.Logger, cloudEvent.Id);
return;
}

cloudEvent.Retry = retry + 1;

var message = new Message<byte[], byte[]>
{
Value = JSON.SerializeToUtf8Bytes(cloudEvent)
};
DeliveryResult<byte[], byte[]> result;
try
{
result = await _producer.ProduceAsync(topic, message).ConfigureAwait(false);
}
catch (Exception ex)
{
_telemetry.Logger.LogError(ex, "Failed to republish cloud event {id}", cloudEvent.Id);
CloudEventProcessingTelemetry.LogOnCloudEventDropped(_telemetry.Logger, cloudEvent.Id);
return;
}
_telemetry.Logger.LogInformation("Republished cloud event {id} at {offset}", cloudEvent.Id, result.TopicPartitionOffset);

}
}
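A note on the accounting above: Retry counts republishes so far, so with MaxRetries = N a persistently failing event is processed N + 1 times in total (the original delivery plus N republishes) before being dropped. A standalone sketch of that bookkeeping, simulating repeated failures with an illustrative limit:

    // Mirrors the RepublishAsync counter logic under repeated failures.
    int maxRetries = 3;
    for (int retry = 0; ; retry++) // retry plays the role of cloudEvent.Retry
    {
        if (retry >= maxRetries)
        {
            System.Console.WriteLine($"dropped after {retry} republishes");
            break;
        }
        System.Console.WriteLine($"republish with Retry = {retry + 1}");
    }
    // Prints "republish with Retry = 1", "= 2", "= 3", then the drop message.

Also note that the republished Message<byte[], byte[]> only sets Value, so the original Kafka message key is not carried over and a republished event may land on a different partition.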
12 changes: 8 additions & 4 deletions src/CloudEventDotNet.Kafka/KafkaMessageWorkItem.cs
@@ -1,4 +1,4 @@
-using System.Text.Json;
+using System.Diagnostics;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

@@ -49,6 +49,7 @@ public ValueTask WaitToCompleteAsync()

internal async Task ExecuteAsync()
{
+Activity? activity = null;
try
{
var cloudEvent = JSON.Deserialize<CloudEvent>(_message.Message.Value)!;
@@ -58,12 +59,14 @@ internal async Task ExecuteAsync()
CloudEventProcessingTelemetry.OnHandlerNotFound(_telemetry.Logger, metadata);
return;
}
-bool succeed = await handler.ProcessAsync(cloudEvent, _cancellationTokenSource.Token).ConfigureAwait(false);
-KafkaConsumerTelemetry.OnConsumed(_channelContext.ConsumerName, _channelContext.ConsumerGroup);

+// processing
+activity = handler.StartProcessing(cloudEvent);
+KafkaConsumerTelemetry.OnConsuming(activity, _channelContext.ConsumerName, _channelContext.ConsumerGroup);
+bool succeed = await handler.ProcessAsync(cloudEvent, _cancellationTokenSource.Token).ConfigureAwait(false);
if (!succeed)
{
-await _context.Producer.ReproduceAsync(_message).ConfigureAwait(false);
+await _context.Republisher.RepublishAsync(_message.Topic, cloudEvent).ConfigureAwait(false);
}
}
catch (Exception ex)
@@ -73,6 +76,7 @@ internal async Task ExecuteAsync()
finally
{
_waiter.SetResult();
+activity?.Dispose();
}
}
}
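With this change the work item owns the processing span explicitly: the Activity returned by handler.StartProcessing is created before ProcessAsync and disposed in the finally block, so the span also covers the republish path on failure. A generic sketch of the pattern, using a hypothetical ActivitySource since StartProcessing's internals are not part of this diff:

    using System.Diagnostics;

    // Hypothetical source name, standing in for whatever StartProcessing uses.
    var source = new ActivitySource("CloudEventDotNet.Example");

    async Task RunWorkItemAsync(Func<Task<bool>> process, Func<Task> republish)
    {
        Activity? activity = source.StartActivity("process cloudevent");
        try
        {
            bool succeed = await process().ConfigureAwait(false);
            if (!succeed)
            {
                await republish().ConfigureAwait(false); // still inside the span
            }
        }
        finally
        {
            activity?.Dispose(); // ends the span even when the handler throws
        }
    }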
5 changes: 5 additions & 0 deletions src/CloudEventDotNet.Kafka/KafkaPubSubOptions.cs
@@ -32,6 +32,11 @@ public class KafkaSubscribeOptions
/// The limit of unprocessed CloudEvents in local process queue.
/// </summary>
public int RunningWorkItemLimit { get; set; } = 1024;

+/// <summary>
+/// Max retries for CloudEvents that fail processing; set to 0 to disable retries. Defaults to int.MaxValue.
+/// </summary>
+public int MaxRetries { get; set; } = int.MaxValue;
}

public enum DeliveryGuarantee
33 changes: 0 additions & 33 deletions src/CloudEventDotNet.Kafka/KafkaRedeliverProducer.cs

This file was deleted.

5 changes: 3 additions & 2 deletions src/CloudEventDotNet.Kafka/KafkaTelemetry.Consumer.cs
@@ -138,16 +138,17 @@ public void OnProducerLog(LogMessage log)
)]
public partial void OnCommitLoopError(Exception exception);

-public static void OnConsumed(
+public static void OnConsuming(
+Activity? activity,
string consumerName,
string consumerGroup)
{
-var activity = Activity.Current;
if (activity is not null)
{
// activity.SetTag("messaging.kafka.message_key", message.Key);
activity.SetTag("messaging.kafka.client_id", consumerName);
activity.SetTag("messaging.kafka.consumer_group", consumerGroup);
}
}

}
9 changes: 6 additions & 3 deletions src/CloudEventDotNet.Kafka/KafkaTelemetry.Producer.cs
@@ -1,4 +1,5 @@
using System.Diagnostics;
+using System.Net.Http.Headers;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

@@ -19,18 +20,20 @@ public KafkaProducerTelemetry(string pubSubName, ILoggerFactory loggerFactory)
Message = "Produced message {topic}:{partition}:{offset}"
)]
partial void LogOnMessageProduced(string topic, int partition, long offset);
+public void LogOnMessageProduced(DeliveryResult<byte[], byte[]> result)
+=> LogOnMessageProduced(result.Topic, result.Partition.Value, result.Offset.Value);
public void OnMessageProduced(DeliveryResult<byte[], byte[]> result, string clientId)
{
-var activity = Activity.Current;
-if (activity is not null)
+if (Activity.Current is { } activity)
{
activity.SetTag("messaging.kafka.client_id", clientId);
activity.SetTag("messaging.kafka.partition", result.Partition.Value);
}
-LogOnMessageProduced(result.Topic, result.Partition.Value, result.Offset.Value);
+LogOnMessageProduced(result);
}



[LoggerMessage(
Level = LogLevel.Error,
Message = "Producer error: {error}"
2 changes: 1 addition & 1 deletion src/CloudEventDotNet.Kafka/KafkaWorkItemContext.cs
@@ -4,4 +4,4 @@ namespace CloudEventDotNet.Kafka;

internal record KafkaWorkItemContext(
Registry Registry,
-KafkaRedeliverProducer Producer);
+ICloudEventRepublisher Republisher);
9 changes: 8 additions & 1 deletion src/CloudEventDotNet.Redis/RedisMessageWorkItem.cs
@@ -1,3 +1,4 @@
+using System.Diagnostics;
using System.Text.Json;
using CloudEventDotNet.Redis.Instruments;
using StackExchange.Redis;
@@ -45,6 +46,8 @@ public ValueTask WaitToCompleteAsync()

internal async Task ExecuteAsync()
{
+Activity? activity = null;

try
{
var cloudEvent = JSON.Deserialize<CloudEvent>((byte[])Message["data"]!)!;
@@ -53,6 +56,11 @@ internal async Task ExecuteAsync()
{
return;
}


+// processing
+activity = handler.StartProcessing(cloudEvent);
+RedisTelemetry.OnMessageProcessing(activity, ChannelContext.ConsumerGroup, ChannelContext.ConsumerName);
bool succeed = await handler.ProcessAsync(cloudEvent, _cancellationTokenSource.Token).ConfigureAwait(false);
if (succeed)
{
@@ -62,7 +70,6 @@ await _context.Redis.StreamAcknowledgeAsync(
Message.Id).ConfigureAwait(false);
_context.RedisTelemetry.OnMessageAcknowledged(Message.Id.ToString());
}
-RedisTelemetry.OnMessageProcessed(ChannelContext.ConsumerGroup, ChannelContext.ConsumerName);
}
catch (Exception ex)
{
4 changes: 2 additions & 2 deletions src/CloudEventDotNet.Redis/RedisTelemetry.cs
@@ -25,12 +25,12 @@ public static void OnMessageProduced(ILogger logger, IConnectionMultiplexer mult
activity?.SetTag("messaging.redis.client_name", multiplexer.ClientName);
}

-public static void OnMessageProcessed(
+public static void OnMessageProcessing(
+Activity? activity,
string consumerGroup,
string consumerName
)
{
-Activity? activity = Activity.Current;
if (activity is not null)
{
activity.SetTag("messaging.redis.client_id", consumerName);
13 changes: 13 additions & 0 deletions src/CloudEventDotNet/CloudEvent.cs
@@ -28,6 +28,19 @@ internal record CloudEvent(

[JsonExtensionData]
public Dictionary<string, JsonElement> Extensions { get; set; } = new();

+[JsonIgnore]
+public int Retry
+{
+    get
+    {
+        // Default to 0 when the "retry" extension is absent (first delivery),
+        // instead of throwing KeyNotFoundException.
+        return Extensions.TryGetValue("retry", out var value) ? value.GetInt32() : 0;
+    }
+    set
+    {
+        Extensions["retry"] = JsonSerializer.SerializeToElement(value);
+    }
+}
}

/// <summary>
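Because Extensions is marked [JsonExtensionData], the retry counter serializes as a top-level "retry" property on the CloudEvent JSON and survives the republish round-trip. A self-contained sketch of the mechanism (the Envelope type is hypothetical, for illustration only):

    using System.Collections.Generic;
    using System.Text.Json;
    using System.Text.Json.Serialization;

    internal record Envelope
    {
        [JsonExtensionData]
        public Dictionary<string, JsonElement> Extensions { get; set; } = new();

        [JsonIgnore]
        public int Retry
        {
            get => Extensions.TryGetValue("retry", out var v) ? v.GetInt32() : 0;
            set => Extensions["retry"] = JsonSerializer.SerializeToElement(value);
        }
    }

    // JsonSerializer.Serialize(new Envelope { Retry = 2 }) yields {"retry":2};
    // deserializing that JSON restores Retry == 2 through the extension dictionary.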