
Commit

Merge pull request #13 from idoroshenko/feature/improve_logger_usage
Align the logging approach with Microsoft and Rebus guidelines.
glazkovalex committed Jun 7, 2024
2 parents b33393e + 2ed7444 commit 1603550
Showing 3 changed files with 52 additions and 32 deletions.
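
The diff below replaces interpolated log strings with structured message templates whose values are passed as separate arguments. The following is a minimal sketch of that before/after pattern, assuming Rebus.Logging's ILog and IRebusLoggerFactory API; the ConsumeLoggingExample class and its members are illustrative only and not part of this commit.

using System;
using Rebus.Logging;

// Sketch (not part of the diff) of the logging change applied throughout this PR.
class ConsumeLoggingExample
{
    readonly ILog _log;

    public ConsumeLoggingExample(IRebusLoggerFactory rebusLoggerFactory)
    {
        _log = rebusLoggerFactory.GetLogger<ConsumeLoggingExample>();
    }

    public void ReportConsumeWarning(Exception e)
    {
        // Before (removed): the exception text is baked into the string before the logger sees it.
        _log.Warn($"Consume warning: {e.Message}");

        // After (added): template and value travel separately, so structured sinks can
        // record "exception" as a named property instead of a pre-rendered string.
        _log.Warn("Consume warning: {exception}", e);
    }
}
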
40 changes: 25 additions & 15 deletions Rebus.Kafka/Core/KafkaSubscriptionStorage.cs
@@ -40,13 +40,13 @@ internal Task<TransportMessage> Receive(ITransactionContext context, Cancellatio
}
catch (OperationCanceledException e)
{
_log?.Warn($"Consume warning: {e.Message}");
_log?.Warn("Consume warning: {exception}", e);
resume = false;
consumeResult = null;
}
catch (ConsumeException e)
{
_log?.Error($"Consume error: {e.Error}");
_log?.Error("Consume error: {error}", e.Error);
}
} while (resume);

@@ -178,18 +178,20 @@ internal void CreateTopics(params string[] topics)
}
else
{
_log.Warn($"The consumer configuration specifies \"AllowAutoCreateTopics = true\", so topics were automatically created: {string.Join(",", missingTopics)}!\nIt is better that the topics are not created by the bus.");
_log.Warn("The consumer configuration specifies \"AllowAutoCreateTopics = true\", so topics were automatically created: {topics}!\nIt is better that the topics are not created by the bus.",
string.Join(",", missingTopics));
}
}
catch (CreateTopicsException e)
{
_log.Error($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
_log.Error("An error occured creating topic {topic}: {error}", e.Results[0].Topic, e.Results[0].Error);
throw;
}
}
else
{
_log.Warn($"There are not enough topics: {string.Join(",", missingTopics)}. Create them using the built-in tools. If you enable \"Allow Auto Create Topics = true\" in the consumer configuration, then the bus transport will create these topics automatically, but this is NOT recommended in production!");
_log.Warn("There are not enough topics: {topics}. Create them using the built-in tools. If you enable \"Allow Auto Create Topics = true\" in the consumer configuration, then the bus transport will create these topics automatically, but this is NOT recommended in production!",
string.Join(",", missingTopics));
}
}
}
@@ -242,16 +244,19 @@ Result CommitIncrementedOffset(IReadOnlyList<TopicPartitionOffset> tpos)
private void ConsumerOnLogHandler(IConsumer<string, byte[]> sender, LogMessage logMessage)
{
if (!logMessage.Message.Contains("MessageSet size 0, error \"Success\""))// So as not to see messages about empty reads
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Consuming from Kafka. Client: '{logMessage.Name}', message: '{logMessage.Message}'.");
_log.Debug("Thread #{threadId} Consuming from Kafka. Client: '{clientName}', message: '{message}'.",
Thread.CurrentThread.ManagedThreadId, logMessage.Name, logMessage.Message);
}

private void ConsumerOnStatisticsHandler(IConsumer<string, byte[]> sender, string json)
=> _log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} Consumer statistics: {json}");
=> _log.Info("Thread #{threadId} Consumer statistics: {json}",
Thread.CurrentThread.ManagedThreadId, json);

private void ConsumerOnErrorHandler(IConsumer<string, byte[]> sender, Error error)
{
if (!error.IsFatal)
_log.Warn($"Thread #{Thread.CurrentThread.ManagedThreadId} Consumer error: {error}. No action required.");
_log.Warn("Thread #{threadId} Consumer error: {error}. No action required.",
Thread.CurrentThread.ManagedThreadId, error);
else
{
var values = sender.Assignment;
@@ -268,15 +273,16 @@ private void ConsumerOnErrorHandler(IConsumer<string, byte[]> sender, Error erro

private void ConsumerOnPartitionsAssignedHandler(IConsumer<string, byte[]> sender, List<TopicPartition> partitions)
{
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Assigned partitions: \n\t{string.Join("\n\t", partitions.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}"))}]");
_log.Debug("Thread #{threadId} Assigned partitions: \n\t{partitions}]",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", partitions.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}")));
if (_waitAssigned.Count > 0)
{
var topics = partitions.Select(p => p.Topic).Distinct();
var keys = _waitAssigned.Keys.Where(k => !k.Except(topics).Any());
foreach (var key in keys)
{
_waitAssigned.TryRemove(key, out var task);
_log.Info($"Subscribe on \"{task.Key}\"");
_log.Info("Subscribe on \"{topic}\"", task.Key);
Task.Run(() => task.Value.SetResult(true));
}
}
@@ -286,15 +292,16 @@ private void ConsumerOnPartitionsAssignedHandler(IConsumer<string, byte[]> sende

private void ConsumerOnPartitionsRevokedHandler(IConsumer<string, byte[]> sender, List<TopicPartitionOffset> partitionOffsets)
{
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Revoked partitions: \n\t{string.Join("\n\t", partitionOffsets.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}"))}]");
_log.Debug("Thread #{threadId} Revoked partitions: \n\t{partitions}]",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", partitionOffsets.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}")));
if (_waitRevoked.Count > 0)
{
var topics = partitionOffsets.Select(p => p.Topic).Distinct();
var keys = _waitRevoked.Keys.Where(k => !k.Except(topics).Any());
foreach (var key in keys)
{
_waitRevoked.TryRemove(key, out var task);
_log.Info($"Unsubscribe from \"{task.Key}\"");
_log.Info("Unsubscribe from \"{topic}\"", task.Key);
Task.Run(() => task.Value.SetResult(true));
}
}
@@ -304,13 +311,15 @@ private void ConsumerOnPartitionsRevokedHandler(IConsumer<string, byte[]> sender
private void ConsumerOnPartitionsLostHandler(IConsumer<string, byte[]> consumer, List<TopicPartitionOffset> topicPartitionOffsets)
{
var tpoView = topicPartitionOffsets.Select(t => $"Topic: {t.Topic}, Partition: {t.Partition}, Offset: {t.Offset}");
_log.Warn($"Thread #{Thread.CurrentThread.ManagedThreadId} Partitions lost: \n\t{string.Join("\n\t", tpoView)}");
_log.Warn("Thread #{threadId} Partitions lost: \n\t{partitions}",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", tpoView));
}

private void ConsumerOnOffsetsCommittedHandler(IConsumer<string, byte[]> consumer, CommittedOffsets committedOffsets)
{
var tpoView = committedOffsets.Offsets.Select(t => $"Topic: {t.Topic}, Partition: {t.Partition}, Offset: {t.Offset}");
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Offsets committed: \n\t{string.Join("\n\t", tpoView)}");
_log.Debug("Thread #{threadId} Offsets committed: \n\t{partitions}",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", tpoView));
}

#endregion
@@ -442,7 +451,8 @@ protected virtual void Dispose(bool disposing)
}
catch (Exception) { /* ignored */ }
_consumer?.Close();
_log.Info($"Closed consumer BootstrapServers:{_config.BootstrapServers}, gropId: {_config.GroupId}.");
_log.Info("Closed consumer BootstrapServers:{bootstrapServers}, gropId: {groupId}.",
_config.BootstrapServers, _config.GroupId);
_consumer?.Dispose();
}
isDisposed = true;
22 changes: 13 additions & 9 deletions Rebus.Kafka/Dispatcher/CommitDispatcher.cs
@@ -27,13 +27,14 @@ internal Result AppendMessage(TransportMessage message, TopicPartitionOffset top
if (_messageInfos.TryAdd(messageId, new ProcessedMessage(topicPartitionOffset, MessageProcessingStatuses.Processing)))
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} AppendMessage message: {messageId}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} AppendMessage message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, messageId, DispatcherStateToStrting());
#endif
return Result.Ok();
}
else
{
return Result.Fail($"Already exist {messageId} in {DicpatcherStateToStrting()}");
return Result.Fail($"Already exist {messageId} in {DispatcherStateToStrting()}");
}
}

@@ -45,7 +46,8 @@ internal Result Completing(TransportMessage message)
if (_messageInfos.TryUpdate(messageId, new ProcessedMessage(oldProcessedMessage.TopicPartitionOffset, MessageProcessingStatuses.Completed), oldProcessedMessage))
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Completing message: {messageId}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} Completing message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, messageId, DispatcherStateToStrting());
#endif
if (_messageInfos.Count >= _behaviorConfig.CommitPeriod && TryGetOffsetsThatCanBeCommit(out var tpos))
{
@@ -74,12 +76,13 @@ internal Result Reprocessing(TransportMessage message)
if (_messageInfos.TryUpdate(messageId, newProcessedMessage, oldProcessedMessage))
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Reprocessing message: {messageId}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} Reprocessing message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, messageId, DispatcherStateToStrting());
#endif
return Result.Ok();
}
}
return Result.Fail($"No such message: {message.ToReadableText()}.{DicpatcherStateToStrting()}");
return Result.Fail($"No such message: {message.ToReadableText()}.{DispatcherStateToStrting()}");
}

internal bool TryConsumeMessageToRestarted(out TransportMessage reprocessMessage)
@@ -94,7 +97,8 @@ internal bool TryConsumeMessageToRestarted(out TransportMessage reprocessMessage
{
reprocessMessage = reprocessMessageInfo.Value.Message;
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} TryConsumeMessageToRestarted message: {reprocessMessageInfo.Key}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} TryConsumeMessageToRestarted message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, reprocessMessageInfo.Key, DispatcherStateToStrting());
#endif
return true;
}
@@ -134,14 +138,14 @@ internal bool TryGetOffsetsThatCanBeCommit(out List<TopicPartitionOffset> tpos)
if (tpos.Count > 0)
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} TryCommitLastBlock offsets:\n\t{string.Join(",\n\t", tpos.Select(tpo => $"Topic:{tpo.Topic}, Partition:{tpo.Partition.Value}, Offset:{tpo.Offset.Value}"))}.{DicpatcherStateToStrting()}");
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} TryCommitLastBlock offsets:\n\t{string.Join(",\n\t", tpos.Select(tpo => $"Topic:{tpo.Topic}, Partition:{tpo.Partition.Value}, Offset:{tpo.Offset.Value}"))}.{DispatcherStateToStrting()}");
#endif
return true;
}
else
{
//#if DEBUG
// _log.Debug($"CommitDispatcher.TryCommitLastBlock there is nothing to commit.{DicpatcherStateToStrting()}");
// _log.Debug($"CommitDispatcher.TryCommitLastBlock there is nothing to commit.{dispatcherStateToStrting()}");
//#endif
return false;
}
@@ -172,7 +176,7 @@ internal CommitDispatcher(IRebusLoggerFactory rebusLoggerFactory, ConsumerBehavi
_behaviorConfig = behaviorConfig;
}

private string DicpatcherStateToStrting()
private string DispatcherStateToStrting()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine("\nMessage infos:");
22 changes: 14 additions & 8 deletions Rebus.Kafka/KafkaTransport.cs
@@ -49,7 +49,8 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
{
const int waitSecont = 300; //5 minutes
int count = waitSecont * 10;
_log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} Start waiting for the initialization to complete for {count / 600:N0} minutes...");
_log.Info($"Thread #{{threadId}} Start waiting for the initialization to complete for {count / 600:N0} minutes...",
Thread.CurrentThread.ManagedThreadId);
while (_queueSubscriptionStorage?.IsInitialized == false)
{
Thread.Sleep(100);
@@ -60,7 +61,8 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
+ " Try pausing before sending the first message, or handling this exception in a"
+ " loop to wait for the consumer's subscription to your queue to complete.");
}
_log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} The transport initialization is complete.");
_log.Info("Thread #{threadId} The transport initialization is complete.",
Thread.CurrentThread.ManagedThreadId);
}
}
await Task.WhenAll(outgoingMessages.GroupBy(m => new { m.DestinationAddress }).Select(async group =>
Expand All @@ -82,7 +84,8 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
throw new InvalidOperationException($"The message could not be sent. Try to resend the message: {outgoingMessage.TransportMessage.ToReadableText()}");
}
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} the following message was sent to the topic \"{outgoingMessage.DestinationAddress}\" in t: {outgoingMessage.TransportMessage.ToReadableText()}");
_log.Debug("Thread #{threadId} the following message was sent to the topic \"{topic}\" in t: {message}",
Thread.CurrentThread.ManagedThreadId, outgoingMessage.DestinationAddress, outgoingMessage.TransportMessage.ToReadableText());
#endif
}
catch (Exception ex)
@@ -110,15 +113,17 @@ public override async Task<TransportMessage> Receive(ITransactionContext context
context.OnAck(tc =>
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} context.OnAck : {receivedMessage.ToReadableText()}");
_log.Debug("Thread #{threadId} context.OnAck : {message}",
Thread.CurrentThread.ManagedThreadId, receivedMessage.ToReadableText());
#endif
_queueSubscriptionStorage.Ack(receivedMessage);
return Task.CompletedTask;
});
context.OnNack(tc =>
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} context.OnNack : {receivedMessage.ToReadableText()}");
_log.Debug("Thread #{threadId} context.OnNack : {message}",
Thread.CurrentThread.ManagedThreadId, receivedMessage.ToReadableText());
#endif
_queueSubscriptionStorage.Nack(receivedMessage);
return Task.CompletedTask;
@@ -161,7 +166,8 @@ public async Task UnregisterSubscriber(string topic, string subscriberAddress)
/// <summary>Initializes the transport by ensuring that the input queue has been created</summary>
public void Initialize()
{
_log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} Initializing Kafka transport with queue \"{Address}\"");
_log.Info("Thread #{threadId} Initializing Kafka transport with queue \"{queue}\"",
Thread.CurrentThread.ManagedThreadId, Address);
var builder = new ProducerBuilder<string, byte[]>(_producerConfig)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.ByteArray)
@@ -178,9 +184,9 @@ public void Initialize()
{
string directory = System.IO.Path.GetDirectoryName(System.Reflection.Assembly.GetEntryAssembly().GetName().CodeBase.Substring(8));
var pathToLibrd = System.IO.Path.Combine(directory, $"librdkafka\\{(Environment.Is64BitOperatingSystem ? "x64" : "x86")}\\librdkafka.dll");
_log.Info($"librdkafka is not loaded. Trying to load {pathToLibrd}");
_log.Info("librdkafka is not loaded. Trying to load {pathToLibrd}", pathToLibrd);
Library.Load(pathToLibrd);
_log.Info($"Using librdkafka version: {Library.Version}");
_log.Info("Using librdkafka version: {version}", Library.Version);
}
_producer = builder.Build();
}

