Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/9 - Bug Fixes and Package Updates #3057

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 36 additions & 39 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,64 @@
<CentralPackageTransitivePinningEnabled>false</CentralPackageTransitivePinningEnabled>
</PropertyGroup>
<ItemGroup>
<PackageVersion Include="AWSSDK.DynamoDBv2" Version="3.7.301.19" />
<PackageVersion Include="AWSSDK.DynamoDBv2" Version="3.7.302.15" />
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.300" />
<PackageVersion Include="AWSSDK.S3" Version="3.7.305.31" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="3.7.300.56" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.301.3" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.300.55" />
<PackageVersion Include="Azure.Identity" Version="1.10.4" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.4" />
<PackageVersion Include="AWSSDK.S3" Version="3.7.307.15" />
<PackageVersion Include="AWSSDK.SecurityToken" Version="3.7.300.75" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.301.22" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.300.74" />
<PackageVersion Include="Azure.Identity" Version="1.11.0" />
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.19.1" />
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.3.0" />
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.3.0" />
<PackageVersion Include="coverlet.collector" Version="6.0.1">
<PackageVersion Include="coverlet.collector" Version="6.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="Dapper" Version="2.1.35" />
<PackageVersion Include="Dapper.Contrib" Version="2.0.78" />
<PackageVersion Include="DapperExtensions" Version="1.7.0" />
<PackageVersion Include="EventStore.ClientAPI.NetCore" Version="4.1.0.23" />
<PackageVersion Include="FakeItEasy" Version="8.1.0" />
<PackageVersion Include="FakeItEasy" Version="8.2.0" />
<PackageVersion Include="FakeItEasy.Analyzer.CSharp" Version="6.1.1" />
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="FluentMigrator" Version="5.1.0" />
<PackageVersion Include="FluentMigrator.Runner" Version="5.1.0" />
<PackageVersion Include="FluentMigrator" Version="5.2.0" />
<PackageVersion Include="FluentMigrator.Runner" Version="5.2.0" />
<PackageVersion Include="MessagePack" Version="2.5.140" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="5.2.0" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="8.0.2" />
<PackageVersion Include="Microsoft.Data.Sqlite.Core" Version="8.0.2" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="8.0.4" />
<PackageVersion Include="Microsoft.Data.Sqlite.Core" Version="8.0.4" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="8.0.2" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions" Version="8.0.4" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageVersion Include="MySqlConnector" Version="2.3.5" />
<PackageVersion Include="MySqlConnector" Version="2.3.6" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
<PackageVersion Include="NJsonSchema" Version="10.9.0" />
<PackageVersion Include="Npgsql" Version="8.0.2" />
<PackageVersion Include="NUnit" Version="4.1.0" />
<PackageVersion Include="NUnit.Analyzers" Version="4.0.1">
<PackageVersion Include="NUnit.Analyzers" Version="4.1.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
<PackageVersion Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageVersion Include="OpenTelemetry" Version="1.7.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.7.0" />
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.7.0" />
<PackageVersion Include="OpenTelemetry" Version="1.8.0" />
<PackageVersion Include="OpenTelemetry.Exporter.InMemory" Version="1.8.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Jaeger" Version="1.5.1" />
<PackageVersion Include="OpenTelemetry.Exporter.Zipkin" Version="1.7.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.7.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.7.1" />
<PackageVersion Include="Paramore.Darker" Version="3.0.0" />
<PackageVersion Include="Paramore.Darker.AspNetCore" Version="3.0.0" />
<PackageVersion Include="Paramore.Darker.Policies" Version="3.0.0" />
<PackageVersion Include="Paramore.Darker.QueryLogging" Version="3.0.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Zipkin" Version="1.8.0" />
<PackageVersion Include="Paramore.Darker" Version="4.0.1" />
<PackageVersion Include="Paramore.Darker.AspNetCore" Version="4.0.1" />
<PackageVersion Include="Paramore.Darker.Policies" Version="4.0.1" />
<PackageVersion Include="Paramore.Darker.QueryLogging" Version="4.0.1" />
<PackageVersion Include="Polly" Version="8.3.1" />
<PackageVersion Include="Polly.Contrib.WaitAndRetry" Version="1.1.1" />
<PackageVersion Include="Polly.Extensions.Http" Version="3.0.0" />
Expand All @@ -76,11 +73,11 @@
<PackageVersion Include="Serilog.Sinks.TestCorrelator" Version="3.2.0" />
<PackageVersion Include="ServiceStack.Redis.Core" Version="8.2.2" />
<PackageVersion Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageVersion Include="System.Reflection.TypeExtensions" Version="4.7.0" />
<PackageVersion Include="System.Text.Json" Version="8.0.2" />
<PackageVersion Include="xunit" Version="2.7.0" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.7">
<PackageVersion Include="System.Text.Json" Version="8.0.3" />
<PackageVersion Include="xunit" Version="2.7.1" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.8">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageVersion>
Expand Down Expand Up @@ -113,13 +110,13 @@
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="7.0.0" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'net8.0' ">
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite" Version="8.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite.Core" Version="8.0.2" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.2" />
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.1" />
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="8.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.4" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="8.0.4" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite" Version="8.0.4" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite.Core" Version="8.0.4" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.SqlServer" Version="8.0.4" />
<PackageVersion Include="Pomelo.EntityFrameworkCore.MySql" Version="8.0.2" />
</ItemGroup>
<ItemGroup>
<GlobalPackageReference Include="MinVer" Version="5.0.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,58 @@ public class KafkaMessageConsumer : KafkaMessagingGateway, IAmAMessageConsumer
_readCommittedOffsetsTimeoutMs = readCommittedOffsetsTimeoutMs;

_consumer = new ConsumerBuilder<string, byte[]>(_consumerConfig)
.SetPartitionsAssignedHandler((consumer, list) =>
.SetPartitionsAssignedHandler((consumer, partitions) =>
{
var partitions = list.Select(p => $"{p.Topic} : {p.Partition.Value}");
var partitionInfo = partitions.Select(p => $"{p.Topic} : {p.Partition.Value}");

s_logger.LogInformation("Partition Added {Channels}", String.Join(",", partitions));

_partitions.AddRange(list);
// Determine strategy and act accordingly
if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
consumer.IncrementalAssign(partitions);
else
consumer.Assign(partitions);
})
.SetPartitionsRevokedHandler((consumer, list) =>
.SetPartitionsRevokedHandler((consumer, partitions) =>
{
consumer.Commit(list);
var revokedPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
//We should commit any offsets we have stored for these partitions
try
{
_consumer?.Commit(partitions);
}
catch (KafkaException error)
{
s_logger.LogError(
"Error Committing Offsets During Partition Revoke: {Message} Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}",
error.Message, error.Error.Code, error.Error.Reason, error.Error.IsFatal
);
}

var revokedPartitions = partitions.Select(tpo => tpo.TopicPartition);
var revokedPartitionInfo = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();

s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitions));
s_logger.LogInformation("Partitions for consumer revoked {Channels}", string.Join(",", revokedPartitionInfo));

_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetPartitionsLostHandler((consumer, list) =>
// Determine strategy and act accordingly
if (_consumerConfig.PartitionAssignmentStrategy == PartitionAssignmentStrategy.CooperativeSticky)
consumer.IncrementalUnassign(revokedPartitions );
else
consumer.Unassign();

_partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();
})
.SetPartitionsLostHandler((consumer, partitions) =>
{
var lostPartitions = list.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();
var lostPartitions = partitions.Select(tpo => $"{tpo.Topic} : {tpo.Partition}").ToList();

s_logger.LogInformation("Partitions for consumer lost {Channels}", string.Join(",", lostPartitions));

_partitions = _partitions.Where(tp => list.All(tpo => tpo.TopicPartition != tp)).ToList();
_partitions = _partitions.Where(tp => partitions.All(tpo => tpo.TopicPartition != tp)).ToList();

// This is typically treated the same as revocation
consumer.IncrementalUnassign(_partitions);
})
.SetErrorHandler((consumer, error) =>
.SetErrorHandler((_, error) =>
{
s_logger.LogError("Code: {ErrorCode}, Reason: {ErrorMessage}, Fatal: {FatalError}", error.Code,
error.Reason, error.IsFatal);
Expand Down Expand Up @@ -237,7 +263,6 @@ public void Acknowledge(Message message)
s_logger.LogInformation("Storing offset {Offset} to topic {Topic} for partition {ChannelName}",
new Offset(topicPartitionOffset.Offset + 1).Value, topicPartitionOffset.TopicPartition.Topic,
topicPartitionOffset.TopicPartition.Partition.Value);
_consumer.StoreOffset(offset);
_offsetStorage.Add(offset);

if (_offsetStorage.Count % _maxBatchSize == 0)
Expand Down Expand Up @@ -294,14 +319,14 @@ public Message[] Receive(int timeoutInMilliseconds)
{
CheckHasPartitions();

s_logger.LogDebug("No messages available from Kafka stream");
return new Message[] {new Message()};
s_logger.LogDebug($"No messages available from Kafka stream");
return new[] {new Message()};
}

if (consumeResult.IsPartitionEOF)
{
s_logger.LogDebug("Consumer {ConsumerMemberId} has reached the end of the partition", _consumer.MemberId);
return new Message[] {new Message()};
return new[] {new Message()};
}

s_logger.LogDebug("Usable message retrieved from Kafka stream: {Request}", consumeResult.Message.Value);
Expand Down Expand Up @@ -358,15 +383,10 @@ public bool Requeue(Message message, int delayMilliseconds)
return false;
}

private bool CheckHasPartitions()
private void CheckHasPartitions()
{
if (_partitions.Count <= 0)
{
s_logger.LogDebug("Consumer is not allocated any partitions");
return false;
}

return true;
}


Expand Down Expand Up @@ -491,7 +511,7 @@ private void FlushOffsets()
{
//This is expensive, so use a background thread
Task.Factory.StartNew(
action: state => CommitOffsets(),
action: _ => CommitOffsets(),
state: now,
cancellationToken: CancellationToken.None,
creationOptions: TaskCreationOptions.DenyChildAttach,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ public MessageDispatcherRoutingAsyncTests()
_dispatcher.State.Should().Be(DispatcherState.DS_AWAITING);
_dispatcher.Receive();
}
#pragma warning disable xUnit1031

[Fact]
[Fact()]
public void When_a_message_dispatcher_is_asked_to_connect_a_channel_and_handler_async()
{
Task.Delay(5000).Wait();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public MessagePumpDispatchAsyncTests()
channel.Enqueue(quitMessage);
}

[Fact]
[Fact()]
public void When_a_message_is_dispatched_it_should_reach_a_handler_async()
{
_messagePump.Run();
Expand Down
Loading