diff --git a/Directory.Packages.props b/Directory.Packages.props
index e744c4f64..a6c8c627c 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -4,19 +4,19 @@
false
-
+
-
-
-
-
-
-
+
+
+
+
+
+
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
@@ -24,47 +24,44 @@
-
+
-
-
+
+
-
-
+
+
-
-
+
+
-
+
-
+
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
+
+
-
-
-
-
-
-
-
+
+
+
+
+
@@ -76,11 +73,11 @@
-
+
-
-
-
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
@@ -113,13 +110,13 @@
-
-
-
-
-
-
-
+
+
+
+
+
+
+
diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
index eda892306..84194f79a 100644
--- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
+++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs
@@ -168,32 +168,58 @@ public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumer
_readCommittedOffsetsTimeoutMs = readCommittedOffsetsTimeoutMs;
_consumer = new ConsumerBuilder(_consumerConfig)
- .SetPartitionsAssignedHandler((consumer, list) =>
+ .SetPartitionsAssignedHandler((consumer, partitions) =>
{
- var partitions = list.Select(p => $"{p.Topic} : {p.Partition.Value}");
+ var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");
s_logger.LogInformation("Partition Added {Channels}", String.Join(",", partitions));
- _partitions.AddRange(list);
+ // Determine strategy and act accordingly
+ if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
+ consumer.IncrementalAssign(partitions);
+ else
+ consumer.Assign(partitions);
})
- .SetPartitionsRevokedHandler((consumer, list) =>
+ .SetPartitionsRevokedHandler((consumer, partitions) =>
{
- consumer.Commit(list);
- var revokedPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
+ //We should commit any offsets we have stored for these partitions
+ try
+ {
+ _consumer?.Commit(partitions);
+ }
+ catch (KafkaException error)
+ {
+ s_logger.LogError(
+ "Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
+ error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal
+ );
+ }
+
+ var revokedPartitions = partitions.Select(tpo => tpo.TopicPartition);
+ var revokedPartitionInfo = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
- s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitions));
+ s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitionInfo));
- _partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
- })
- .SetPartitionsLostHandler((consumer, list) =>
+ // Determine strategy and act accordingly
+ if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
+ consumer.IncrementalUnassign(revokedPartitions );
+ else
+ consumer.Unassign();
+
+ _partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
+ })
+ .SetPartitionsLostHandler((consumer, partitions) =>
{
- var lostPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
+ var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
s_logger.LogInformation("Partitions for consumer lost {Channels}", string.Join(",", lostPartitions));
- _partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
+ _partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
+
+ // This is typically treated the same as revocation
+ consumer.IncrementalUnassign(_partitions);
})
- .SetErrorHandler((consumer, error) =>
+ .SetErrorHandler((_, error) =>
{
s_logger.LogError("Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}", error.Code,
error.Reason, error.IsFatal);
@@ -237,7 +263,6 @@ public void Acknowledge(Message message)
s_logger.LogInformation("Storing offset {Offset} to topic {Topic} for partition {ChannelName}",
new Offset(topicPartitionOffset.Offset + 1).Value, topicPartitionOffset.TopicPartition.Topic,
topicPartitionOffset.TopicPartition.Partition.Value);
- _consumer.StoreOffset(offset);
_offsetStorage.Add(offset);
if (_offsetStorage.Count % _maxBatchSize == 0)
@@ -294,14 +319,14 @@ public Message[] Receive(int timeoutInMilliseconds)
{
CheckHasPartitions();
- s_logger.LogDebug("No messages available from Kafka stream");
- return new Message[] {new Message()};
+ s_logger.LogDebug($"No messages available from Kafka stream");
+ return new[] {new Message()};
}
if (consumeResult.IsPartitionEOF)
{
s_logger.LogDebug("Consumer {ConsumerMemberId} has reached the end of the partition", _consumer.MemberId);
- return new Message[] {new Message()};
+ return new[] {new Message()};
}
s_logger.LogDebug("Usable message retrieved from Kafka stream: {Request}", consumeResult.Message.Value);
@@ -358,15 +383,10 @@ public bool Requeue(Message message, int delayMilliseconds)
return false;
}
- private bool CheckHasPartitions()
+ private void CheckHasPartitions()
{
if (_partitions.Count <= 0)
- {
s_logger.LogDebug("Consumer is not allocated any partitions");
- return false;
- }
-
- return true;
}
@@ -491,7 +511,7 @@ private void FlushOffsets()
{
//This is expensive, so use a background thread
Task.Factory.StartNew(
- action: state => CommitOffsets(),
+ action: _ => CommitOffsets(),
state: now,
cancellationToken: CancellationToken.None,
creationOptions: TaskCreationOptions.DenyChildAttach,
diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs
index fc7d727e9..81d3e0847 100644
--- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs
+++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_dispatcher_Is_asked_to_connect_a_channel_and_handler_async.cs
@@ -54,9 +54,8 @@ public MessageDispatcherRoutingAsyncTests()
_dispatcher.State.Should().Be(DispatcherState.DS_AWAITING);
_dispatcher.Receive();
}
-#pragma warning disable xUnit1031
- [Fact]
+ [Fact()]
public void When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_async()
{
Task.Delay(5000).Wait();
diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_is_dispatched_it_should_reach_a_handler_async.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_is_dispatched_it_should_reach_a_handler_async.cs
index 37f17d04a..4839265fe 100644
--- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_is_dispatched_it_should_reach_a_handler_async.cs
+++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/When_a_message_is_dispatched_it_should_reach_a_handler_async.cs
@@ -46,7 +46,7 @@ public MessagePumpDispatchAsyncTests()
channel.Enqueue(quitMessage);
}
- [Fact]
+ [Fact()]
public void When_a_message_is_dispatched_it_should_reach_a_handler_async()
{
_messagePump.Run();