Skip to content

Commit

Permalink
Fixes for issue #3056
Browse files Browse the repository at this point in the history
  • Loading branch information
iancooper committed Apr 16, 2024
1 parent baa8a5f commit cacb359
Showing 1 changed file with 43 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,32 +157,58 @@ public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumer
_readCommittedOffsetsTimeoutMs = readCommittedOffsetsTimeoutMs;

_consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig)
.SetPartitionsAssignedHandler((consumer, list) =>
.SetPartitionsAssignedHandler((consumer, partitions) =>
{
var partitions = list.Select(p => $"{p.Topic} : {p.Partition.Value}");
var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
s_logger.LogInformation("Partition Added {Channels}", String.Join(",", partitions));
_partitions.AddRange(list);
// Determine strategy and act accordingly
if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
consumer.IncrementalAssign(partitions);
else
consumer.Assign(partitions);
})
.SetPartitionsRevokedHandler((consumer, list) =>
.SetPartitionsRevokedHandler((consumer, partitions) =>
{
_consumer.Commit(list);
var revokedPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
//We should commit any offsets we have stored for these partitions
try
{
_consumer?.Commit(partitions);
}
catch (KafkaException error)
{
s_logger.LogError(
"Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal
);
}
var revokedPartitions = partitions.Select(tpo => tpo.TopicPartition);
var revokedPartitionInfo = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitions));
s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitionInfo));
_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetPartitionsLostHandler((consumer, list) =>
// Determine strategy and act accordingly
if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
consumer.IncrementalUnassign(revokedPartitions );
else
consumer.Unassign();
_partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetPartitionsLostHandler((consumer, partitions) =>
{
var lostPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
s_logger.LogInformation("Partitions for consumer lost {Channels}", string.Join(",", lostPartitions));
_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
_partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
// This is typically treated the same as revocation
consumer.IncrementalUnassign(_partitions);
})
.SetErrorHandler((consumer, error) =>
.SetErrorHandler((_, error) =>
{
s_logger.LogError("Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}", error.Code,
error.Reason, error.IsFatal);
Expand Down Expand Up @@ -248,8 +274,6 @@ public void Acknowledge(Message message)
/// There is no 'queue' to purge in Kafka, so we treat this as moving past to the offset to tne end of any assigned partitions,
/// thus skipping over anything that exists at that point.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="requeue">if set to <c>true</c> [requeue].</param>
public void Purge()
{
if (!_consumer.Assignment.Any())
Expand Down Expand Up @@ -285,13 +309,13 @@ public Message[] Receive(int timeoutInMilliseconds)
CheckHasPartitions();

s_logger.LogDebug($"No messages available from Kafka stream");
return new Message[] {new Message()};
return new[] {new Message()};
}

if (consumeResult.IsPartitionEOF)
{
s_logger.LogDebug("Consumer {ConsumerMemberId} has reached the end of the partition", _consumer.MemberId);
return new Message[] {new Message()};
return new[] {new Message()};
}

s_logger.LogDebug("Usable message retrieved from Kafka stream: {Request}", consumeResult.Message.Value);
Expand Down Expand Up @@ -332,7 +356,6 @@ public Message[] Receive(int timeoutInMilliseconds)
/// Rejects the specified message. This is just a commit of the offset to move past the record without processing it
/// </summary>
/// <param name="message">The message.</param>
/// <param name="requeue">if set to <c>true</c> [requeue].</param>
public void Reject(Message message)
{
Acknowledge(message);
Expand All @@ -349,15 +372,10 @@ public bool Requeue(Message message, int delayMilliseconds)
return false;
}

private bool CheckHasPartitions()
private void CheckHasPartitions()
{
if (_partitions.Count <= 0)
{
s_logger.LogDebug("Consumer is not allocated any partitions");
return false;
}

return true;
}


Expand Down Expand Up @@ -482,7 +500,7 @@ private void FlushOffsets()
{
//This is expensive, so use a background thread
Task.Factory.StartNew(
action: state => CommitOffsets(),
action: _ => CommitOffsets(),
state: now,
cancellationToken: CancellationToken.None,
creationOptions: TaskCreationOptions.DenyChildAttach,
Expand Down

0 comments on commit cacb359

Please sign in to comment.