Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align the logging approach with Microsoft and Rebus guidelines. #13

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions Rebus.Kafka/Core/KafkaSubscriptionStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ internal Task<TransportMessage> Receive(ITransactionContext context, Cancellatio
}
catch (OperationCanceledException e)
{
_log?.Warn($"Consume warning: {e.Message}");
_log?.Warn("Consume warning: {exception}", e);
resume = false;
consumeResult = null;
}
catch (ConsumeException e)
{
_log?.Error($"Consume error: {e.Error}");
_log?.Error("Consume error: {error}", e.Error);
}
} while (resume);

Expand Down Expand Up @@ -178,18 +178,20 @@ internal void CreateTopics(params string[] topics)
}
else
{
_log.Warn($"The consumer configuration specifies \"AllowAutoCreateTopics = true\", so topics were automatically created: {string.Join(",", missingTopics)}!\nIt is better that the topics are not created by the bus.");
_log.Warn("The consumer configuration specifies \"AllowAutoCreateTopics = true\", so topics were automatically created: {topics}!\nIt is better that the topics are not created by the bus.",
string.Join(",", missingTopics));
}
}
catch (CreateTopicsException e)
{
_log.Error($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
_log.Error("An error occured creating topic {topic}: {error}", e.Results[0].Topic, e.Results[0].Error);
throw;
}
}
else
{
_log.Warn($"There are not enough topics: {string.Join(",", missingTopics)}. Create them using the built-in tools. If you enable \"Allow Auto Create Topics = true\" in the consumer configuration, then the bus transport will create these topics automatically, but this is NOT recommended in production!");
_log.Warn("There are not enough topics: {topics}. Create them using the built-in tools. If you enable \"Allow Auto Create Topics = true\" in the consumer configuration, then the bus transport will create these topics automatically, but this is NOT recommended in production!",
string.Join(",", missingTopics));
}
}
}
Expand Down Expand Up @@ -242,16 +244,19 @@ Result CommitIncrementedOffset(IReadOnlyList<TopicPartitionOffset> tpos)
private void ConsumerOnLogHandler(IConsumer<string, byte[]> sender, LogMessage logMessage)
{
if (!logMessage.Message.Contains("MessageSet size 0, error \"Success\""))//Чтобы не видеть сообщений о пустых чтениях
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Consuming from Kafka. Client: '{logMessage.Name}', message: '{logMessage.Message}'.");
_log.Debug("Thread #{threadId} Consuming from Kafka. Client: '{clientName}', message: '{message}'.",
Thread.CurrentThread.ManagedThreadId, logMessage.Name, logMessage.Message);
}

private void ConsumerOnStatisticsHandler(IConsumer<string, byte[]> sender, string json)
=> _log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} Consumer statistics: {json}");
=> _log.Info("Thread #{threadId} Consumer statistics: {json}",
Thread.CurrentThread.ManagedThreadId, json);

private void ConsumerOnErrorHandler(IConsumer<string, byte[]> sender, Error error)
{
if (!error.IsFatal)
_log.Warn($"Thread #{Thread.CurrentThread.ManagedThreadId} Consumer error: {error}. No action required.");
_log.Warn("Thread #{threadId} Consumer error: {error}. No action required.",
Thread.CurrentThread.ManagedThreadId, error);
else
{
var values = sender.Assignment;
Expand All @@ -268,15 +273,16 @@ private void ConsumerOnErrorHandler(IConsumer<string, byte[]> sender, Error erro

private void ConsumerOnPartitionsAssignedHandler(IConsumer<string, byte[]> sender, List<TopicPartition> partitions)
{
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Assigned partitions: \n\t{string.Join("\n\t", partitions.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}"))}]");
_log.Debug("Thread #{threadId} Assigned partitions: \n\t{partitions}]",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", partitions.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}")));
if (_waitAssigned.Count > 0)
{
var topics = partitions.Select(p => p.Topic).Distinct();
var keys = _waitAssigned.Keys.Where(k => !k.Except(topics).Any());
foreach (var key in keys)
{
_waitAssigned.TryRemove(key, out var task);
_log.Info($"Subscribe on \"{task.Key}\"");
_log.Info("Subscribe on \"{topic}\"", task.Key);
Task.Run(() => task.Value.SetResult(true));
}
}
Expand All @@ -286,15 +292,16 @@ private void ConsumerOnPartitionsAssignedHandler(IConsumer<string, byte[]> sende

private void ConsumerOnPartitionsRevokedHandler(IConsumer<string, byte[]> sender, List<TopicPartitionOffset> partitionOffsets)
{
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Revoked partitions: \n\t{string.Join("\n\t", partitionOffsets.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}"))}]");
_log.Debug("Thread #{threadId} Revoked partitions: \n\t{partitions}]",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", partitionOffsets.Select(p => $"Topic:\"{p.Topic}\" Partition:{p.Partition.Value}")));
if (_waitRevoked.Count > 0)
{
var topics = partitionOffsets.Select(p => p.Topic).Distinct();
var keys = _waitRevoked.Keys.Where(k => !k.Except(topics).Any());
foreach (var key in keys)
{
_waitRevoked.TryRemove(key, out var task);
_log.Info($"Unsubscribe from \"{task.Key}\"");
_log.Info("Unsubscribe from \"{topic}\"", task.Key);
Task.Run(() => task.Value.SetResult(true));
}
}
Expand All @@ -304,13 +311,15 @@ private void ConsumerOnPartitionsRevokedHandler(IConsumer<string, byte[]> sender
private void ConsumerOnPartitionsLostHandler(IConsumer<string, byte[]> consumer, List<TopicPartitionOffset> topicPartitionOffsets)
{
var tpoView = topicPartitionOffsets.Select(t => $"Topic: {t.Topic}, Partition: {t.Partition}, Offset: {t.Offset}");
_log.Warn($"Thread #{Thread.CurrentThread.ManagedThreadId} Partitions lost: \n\t{string.Join("\n\t", tpoView)}");
_log.Warn("Thread #{threadId} Partitions lost: \n\t{partitions}",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", tpoView));
}

private void ConsumerOnOffsetsCommittedHandler(IConsumer<string, byte[]> consumer, CommittedOffsets committedOffsets)
{
var tpoView = committedOffsets.Offsets.Select(t => $"Topic: {t.Topic}, Partition: {t.Partition}, Offset: {t.Offset}");
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Offsets committed: \n\t{string.Join("\n\t", tpoView)}");
_log.Debug("Thread #{threadId} Offsets committed: \n\t{partitions}",
Thread.CurrentThread.ManagedThreadId, string.Join("\n\t", tpoView));
}

#endregion
Expand Down Expand Up @@ -442,7 +451,8 @@ protected virtual void Dispose(bool disposing)
}
catch (Exception) { /* ignored */ }
_consumer?.Close();
_log.Info($"Closed consumer BootstrapServers:{_config.BootstrapServers}, gropId: {_config.GroupId}.");
_log.Info("Closed consumer BootstrapServers:{bootstrapServers}, gropId: {groupId}.",
_config.BootstrapServers, _config.GroupId);
_consumer?.Dispose();
}
isDisposed = true;
Expand Down
22 changes: 13 additions & 9 deletions Rebus.Kafka/Dispatcher/CommitDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ internal Result AppendMessage(TransportMessage message, TopicPartitionOffset top
if (_messageInfos.TryAdd(messageId, new ProcessedMessage(topicPartitionOffset, MessageProcessingStatuses.Processing)))
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} AppendMessage message: {messageId}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} AppendMessage message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, messageId, DispatcherStateToStrting());
#endif
return Result.Ok();
}
else
{
return Result.Fail($"Already exist {messageId} in {DicpatcherStateToStrting()}");
return Result.Fail($"Already exist {messageId} in {DispatcherStateToStrting()}");
}
}

Expand All @@ -45,7 +46,8 @@ internal Result Completing(TransportMessage message)
if (_messageInfos.TryUpdate(messageId, new ProcessedMessage(oldProcessedMessage.TopicPartitionOffset, MessageProcessingStatuses.Completed), oldProcessedMessage))
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Completing message: {messageId}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} Completing message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, messageId, DispatcherStateToStrting());
#endif
if (_messageInfos.Count >= _behaviorConfig.CommitPeriod && TryGetOffsetsThatCanBeCommit(out var tpos))
{
Expand Down Expand Up @@ -74,12 +76,13 @@ internal Result Reprocessing(TransportMessage message)
if (_messageInfos.TryUpdate(messageId, newProcessedMessage, oldProcessedMessage))
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} Reprocessing message: {messageId}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} Reprocessing message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, messageId, DispatcherStateToStrting());
#endif
return Result.Ok();
}
}
return Result.Fail($"No such message: {message.ToReadableText()}.{DicpatcherStateToStrting()}");
return Result.Fail($"No such message: {message.ToReadableText()}.{DispatcherStateToStrting()}");
}

internal bool TryConsumeMessageToRestarted(out TransportMessage reprocessMessage)
Expand All @@ -94,7 +97,8 @@ internal bool TryConsumeMessageToRestarted(out TransportMessage reprocessMessage
{
reprocessMessage = reprocessMessageInfo.Value.Message;
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} TryConsumeMessageToRestarted message: {reprocessMessageInfo.Key}.{DicpatcherStateToStrting()}");
_log.Debug("Thread #{threadId} TryConsumeMessageToRestarted message: {messageId}.{dispatcherState}",
Thread.CurrentThread.ManagedThreadId, reprocessMessageInfo.Key, DispatcherStateToStrting());
#endif
return true;
}
Expand Down Expand Up @@ -134,14 +138,14 @@ internal bool TryGetOffsetsThatCanBeCommit(out List<TopicPartitionOffset> tpos)
if (tpos.Count > 0)
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} TryCommitLastBlock offsets:\n\t{string.Join(",\n\t", tpos.Select(tpo => $"Topic:{tpo.Topic}, Partition:{tpo.Partition.Value}, Offset:{tpo.Offset.Value}"))}.{DicpatcherStateToStrting()}");
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} TryCommitLastBlock offsets:\n\t{string.Join(",\n\t", tpos.Select(tpo => $"Topic:{tpo.Topic}, Partition:{tpo.Partition.Value}, Offset:{tpo.Offset.Value}"))}.{DispatcherStateToStrting()}");
#endif
return true;
}
else
{
//#if DEBUG
// _log.Debug($"CommitDispatcher.TryCommitLastBlock there is nothing to commit.{DicpatcherStateToStrting()}");
// _log.Debug($"CommitDispatcher.TryCommitLastBlock there is nothing to commit.{dispatcherStateToStrting()}");
//#endif
return false;
}
Expand Down Expand Up @@ -172,7 +176,7 @@ internal CommitDispatcher(IRebusLoggerFactory rebusLoggerFactory, ConsumerBehavi
_behaviorConfig = behaviorConfig;
}

private string DicpatcherStateToStrting()
private string DispatcherStateToStrting()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine("\nMessage infos:");
Expand Down
22 changes: 14 additions & 8 deletions Rebus.Kafka/KafkaTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
{
const int waitSecont = 300; //5 minutes
int count = waitSecont * 10;
_log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} Start waiting for the initialization to complete for {count / 600:N0} minutes...");
_log.Info($"Thread #{{threadId}} Start waiting for the initialization to complete for {count / 600:N0} minutes...",
Thread.CurrentThread.ManagedThreadId);
while (_queueSubscriptionStorage?.IsInitialized == false)
{
Thread.Sleep(100);
Expand All @@ -60,7 +61,8 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
+ " Try pausing before sending the first message, or handling this exception in a"
+ " loop to wait for the consumer's subscription to your queue to complete.");
}
_log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} The transport initialization is complete.");
_log.Info("Thread #{threadId} The transport initialization is complete.",
Thread.CurrentThread.ManagedThreadId);
}
}
await Task.WhenAll(outgoingMessages.GroupBy(m => new { m.DestinationAddress }).Select(async group =>
Expand All @@ -82,7 +84,8 @@ protected override async Task SendOutgoingMessages(IEnumerable<OutgoingTransport
throw new InvalidOperationException($"The message could not be sent. Try to resend the message: {outgoingMessage.TransportMessage.ToReadableText()}");
}
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} the following message was sent to the topic \"{outgoingMessage.DestinationAddress}\" in t: {outgoingMessage.TransportMessage.ToReadableText()}");
_log.Debug("Thread #{threadId} the following message was sent to the topic \"{topic}\" in t: {message}",
Thread.CurrentThread.ManagedThreadId, outgoingMessage.DestinationAddress, outgoingMessage.TransportMessage.ToReadableText());
#endif
}
catch (Exception ex)
Expand Down Expand Up @@ -110,15 +113,17 @@ public override async Task<TransportMessage> Receive(ITransactionContext context
context.OnAck(tc =>
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} context.OnAck : {receivedMessage.ToReadableText()}");
_log.Debug("Thread #{threadId} context.OnAck : {message}",
Thread.CurrentThread.ManagedThreadId, receivedMessage.ToReadableText());
#endif
_queueSubscriptionStorage.Ack(receivedMessage);
return Task.CompletedTask;
});
context.OnNack(tc =>
{
#if DEBUG
_log.Debug($"Thread #{Thread.CurrentThread.ManagedThreadId} context.OnNack : {receivedMessage.ToReadableText()}");
_log.Debug("Thread #{threadId} context.OnNack : {message}",
Thread.CurrentThread.ManagedThreadId, receivedMessage.ToReadableText());
#endif
_queueSubscriptionStorage.Nack(receivedMessage);
return Task.CompletedTask;
Expand Down Expand Up @@ -161,7 +166,8 @@ public async Task UnregisterSubscriber(string topic, string subscriberAddress)
/// <summary>Initializes the transport by ensuring that the input queue has been created</summary>
public void Initialize()
{
_log.Info($"Thread #{Thread.CurrentThread.ManagedThreadId} Initializing Kafka transport with queue \"{Address}\"");
_log.Info("Thread #{threadId} Initializing Kafka transport with queue \"{queue}\"",
Thread.CurrentThread.ManagedThreadId, Address);
var builder = new ProducerBuilder<string, byte[]>(_producerConfig)
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.ByteArray)
Expand All @@ -178,9 +184,9 @@ public void Initialize()
{
string directory = System.IO.Path.GetDirectoryName(System.Reflection.Assembly.GetEntryAssembly().GetName().CodeBase.Substring(8));
var pathToLibrd = System.IO.Path.Combine(directory, $"librdkafka\\{(Environment.Is64BitOperatingSystem ? "x64" : "x86")}\\librdkafka.dll");
_log.Info($"librdkafka is not loaded. Trying to load {pathToLibrd}");
_log.Info("librdkafka is not loaded. Trying to load {pathToLibrd}", pathToLibrd);
Library.Load(pathToLibrd);
_log.Info($"Using librdkafka version: {Library.Version}");
_log.Info("Using librdkafka version: {version}", Library.Version);
}
_producer = builder.Build();
}
Expand Down