From 07b7cfd5a89470fc4e99de8c398b50ad9f834851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Sousa?= Date: Tue, 19 Sep 2023 22:17:18 +0100 Subject: [PATCH] refactor!: merge projects into core framework --- .github/workflows/deploy-website.yml | 12 --- .github/workflows/test-deploy-website.yml | 12 --- .../KafkaFlow.Sample.BatchOperations.csproj | 2 - .../PrintConsoleMiddleware.cs | 2 +- .../Program.cs | 5 +- ...KafkaFlow.Sample.ConsumerThrottling.csproj | 2 - .../Program.cs | 4 +- .../KafkaFlow.Sample.FlowControl.csproj | 1 - .../KafkaFlow.Sample.FlowControl/Program.cs | 2 +- ...fkaFlow.Sample.PauseConsumerOnError.csproj | 2 - .../MessageHandler.cs | 2 +- .../Program.cs | 3 +- .../Handlers/AvroMessageHandler.cs | 31 +++--- .../Handlers/AvroMessageHandler2.cs | 2 +- .../Handlers/JsonMessageHandler.cs | 2 +- .../Handlers/ProtobufMessageHandler.cs | 2 +- .../KafkaFlow.Sample.SchemaRegistry.csproj | 2 - .../Program.cs | 5 +- .../KafkaFlow.Sample/KafkaFlow.Sample.csproj | 3 - .../KafkaFlow.Sample/PrintConsoleHandler.cs | 2 +- samples/KafkaFlow.Sample/Program.cs | 3 +- .../{IMessageCompressor.cs => ICompressor.cs} | 9 +- src/KafkaFlow.Abstractions/IDecompressor.cs | 15 +++ src/KafkaFlow.Abstractions/IDeserializer.cs | 21 ++++ .../IMessageHandler.cs | 2 +- src/KafkaFlow.Abstractions/ISerializer.cs | 10 -- .../ISerializerContext.cs | 2 +- .../ClusterConfigurationBuilderExtensions.cs | 5 +- .../ChangeConsumerWorkersCountHandler.cs | 2 +- .../ConsumerTelemetryMetricHandler.cs | 2 +- .../Handlers/PauseConsumerByNameHandler.cs | 2 +- .../Handlers/PauseConsumersByGroupHandler.cs | 2 +- .../Handlers/ResetConsumerOffsetHandler.cs | 2 +- .../Handlers/RestartConsumerByNameHandler.cs | 2 +- .../Handlers/ResumeConsumerByNameHandler.cs | 2 +- .../Handlers/ResumeConsumersByGroupHandler.cs | 2 +- .../RewindConsumerOffsetToDateTimeHandler.cs | 2 +- .../Handlers/StartConsumerByNameHandler.cs | 2 +- .../Handlers/StopConsumerByNameHandler.cs | 2 +- src/KafkaFlow.Admin/KafkaFlow.Admin.csproj | 2 - src/KafkaFlow.BatchConsume/AssemblyInfo.cs | 4 - .../KafkaFlow.BatchConsume.csproj | 15 --- .../GzipMessageCompressor.cs | 16 +--- .../GzipMessageDecompressor.cs | 25 +++++ .../KafkaFlow.Compressor.Gzip.csproj | 1 + .../ConfigurationBuilderExtensions.cs | 68 ------------- .../KafkaFlow.Compressor.csproj | 13 --- .../Core/Bootstrapper.cs | 22 ++--- .../Core/Handlers/AvroMessageHandler.cs | 2 +- .../Handlers/ConfluentJsonMessageHandler.cs | 2 +- .../ConfluentProtobufMessageHandler.cs | 2 +- .../Core/Handlers/MessageHandler.cs | 2 +- .../Core/Handlers/MessageHandler1.cs | 2 +- .../Core/Handlers/MessageHandler2.cs | 2 +- .../Core/Handlers/PauseResumeHandler.cs | 2 +- .../GlobalEventsTest.cs | 2 +- .../KafkaFlow.IntegrationTests.csproj | 3 - .../OpenTelemetryTests.cs | 2 +- .../KafkaFlow.SchemaRegistry.csproj | 4 +- .../SchemaRegistryTypeResolver.cs | 1 + .../JsonCoreDeserializer.cs | 40 ++++++++ .../JsonCoreSerializer.cs | 11 +-- .../NewtonsoftJsonDeserializer.cs | 51 ++++++++++ .../NewtonsoftJsonSerializer.cs | 16 ---- .../ProtobufNetDeserializer.cs | 19 ++++ .../ProtobufNetSerializer.cs | 7 -- .../ConfluentAvroDeserializer.cs | 42 ++++++++ .../ConfluentAvroSerializer.cs | 14 --- .../ConsumerConfigurationBuilderExtensions.cs | 7 +- ...alizer.SchemaRegistry.ConfluentAvro.csproj | 1 - .../ProducerConfigurationBuilderExtensions.cs | 1 + .../ConfluentJsonDeserializer.cs | 39 ++++++++ .../ConfluentJsonSerializer.cs | 14 --- .../ConsumerConfigurationBuilderExtensions.cs | 5 +- ...alizer.SchemaRegistry.ConfluentJson.csproj | 1 - .../ProducerConfigurationBuilderExtensions.cs | 1 + .../ConfluentProtobufDeserializer.cs | 27 ++++++ .../ConfluentProtobufSerializer.cs | 14 --- .../ConsumerConfigurationBuilderExtensions.cs | 7 +- ...er.SchemaRegistry.ConfluentProtobuf.csproj | 1 - .../ProducerConfigurationBuilderExtensions.cs | 1 + .../KafkaFlow.Serializer.csproj | 19 ---- src/KafkaFlow.TypedHandler/AssemblyInfo.cs | 4 - .../ConfigurationBuilderExtensions.cs | 34 ------- .../KafkaFlow.TypedHandler.csproj | 13 --- .../BatchConsumeMiddlewareTests.cs | 2 +- .../CompressorConsumerMiddlewareTests.cs | 18 ++-- .../CompressorProducerMiddlewareTests.cs | 8 +- .../KafkaFlow.UnitTests.csproj | 4 - .../NewtonsoftJsonDeserializerTests.cs | 48 ++++++++++ .../NewtonsoftJsonSerializerTests.cs | 17 ---- .../SerializerConsumerMiddlewareTests.cs | 22 +++-- .../SerializerProducerMiddlewareTests.cs | 2 + .../TypedHandler/HandlerTypeMappingTests.cs | 2 +- src/KafkaFlow.sln | 28 ------ .../Batching}/BatchConsumeMessageContext.cs | 2 +- .../Batching}/BatchConsumeMiddleware.cs | 2 +- .../Batching/BatchingExtensions.cs} | 9 +- .../ConfigurationBuilderExtensions.cs | 87 +++++++++++++++++ src/KafkaFlow/KafkaFlow.csproj | 2 + .../CompressorProducerMiddleware.cs | 8 +- .../DecompressorConsumerMiddleware.cs} | 16 ++-- ...ConsumerMiddlewareConfigurationBuilder.cs} | 96 ++++++++++--------- .../ProducerMiddlewareConfigurationBuilder.cs | 2 + .../DeserializerConsumerMiddleware.cs} | 19 ++-- .../Resolvers}/DefaultTypeResolver.cs | 2 +- .../Resolvers}/IMessageTypeResolver.cs | 2 +- .../Resolvers}/SingleMessageTypeResolver.cs | 2 +- .../SerializerProducerMiddleware.cs | 3 +- .../TypedHandlerConfiguration.cs | 2 +- .../TypedHandlerConfigurationBuilder.cs | 4 +- .../TypedHandler}/HandlerExecutor.cs | 2 +- .../TypedHandler}/HandlerTypeMapping.cs | 2 +- .../TypedHandler}/TypedHandlerMiddleware.cs | 3 +- 114 files changed, 626 insertions(+), 558 deletions(-) rename src/KafkaFlow.Abstractions/{IMessageCompressor.cs => ICompressor.cs} (55%) create mode 100644 src/KafkaFlow.Abstractions/IDecompressor.cs create mode 100644 src/KafkaFlow.Abstractions/IDeserializer.cs rename src/{KafkaFlow.TypedHandler => KafkaFlow.Abstractions}/IMessageHandler.cs (96%) delete mode 100644 src/KafkaFlow.BatchConsume/AssemblyInfo.cs delete mode 100644 src/KafkaFlow.BatchConsume/KafkaFlow.BatchConsume.csproj create mode 100644 src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs delete mode 100644 src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs delete mode 100644 src/KafkaFlow.Compressor/KafkaFlow.Compressor.csproj create mode 100644 src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs create mode 100644 src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs delete mode 100644 src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj delete mode 100644 src/KafkaFlow.TypedHandler/AssemblyInfo.cs delete mode 100644 src/KafkaFlow.TypedHandler/ConfigurationBuilderExtensions.cs delete mode 100644 src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj create mode 100644 src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs rename src/{KafkaFlow.BatchConsume => KafkaFlow/Batching}/BatchConsumeMessageContext.cs (97%) rename src/{KafkaFlow.BatchConsume => KafkaFlow/Batching}/BatchConsumeMiddleware.cs (99%) rename src/{KafkaFlow.BatchConsume/BatchConsumeExtensions.cs => KafkaFlow/Batching/BatchingExtensions.cs} (90%) rename src/{KafkaFlow.Compressor => KafkaFlow/Middlewares/Compressor}/CompressorProducerMiddleware.cs (79%) rename src/{KafkaFlow.Compressor/CompressorConsumerMiddleware.cs => KafkaFlow/Middlewares/Compressor/DecompressorConsumerMiddleware.cs} (57%) rename src/{KafkaFlow.Serializer/ConsumerMiddlewareConfigurationBuilderExtensions.cs => KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs} (62%) rename src/{KafkaFlow.Serializer => KafkaFlow/Middlewares/Serializer/Configuration}/ProducerMiddlewareConfigurationBuilder.cs (98%) rename src/{KafkaFlow.Serializer/SerializerConsumerMiddleware.cs => KafkaFlow/Middlewares/Serializer/DeserializerConsumerMiddleware.cs} (78%) rename src/{KafkaFlow.Serializer => KafkaFlow/Middlewares/Serializer/Resolvers}/DefaultTypeResolver.cs (94%) rename src/{KafkaFlow.Serializer => KafkaFlow/Middlewares/Serializer/Resolvers}/IMessageTypeResolver.cs (93%) rename src/{KafkaFlow.Serializer => KafkaFlow/Middlewares/Serializer/Resolvers}/SingleMessageTypeResolver.cs (94%) rename src/{KafkaFlow.Serializer => KafkaFlow/Middlewares/Serializer}/SerializerProducerMiddleware.cs (95%) rename src/{KafkaFlow.TypedHandler => KafkaFlow/Middlewares/TypedHandler/Configuration}/TypedHandlerConfiguration.cs (79%) rename src/{KafkaFlow.TypedHandler => KafkaFlow/Middlewares/TypedHandler/Configuration}/TypedHandlerConfigurationBuilder.cs (98%) rename src/{KafkaFlow.TypedHandler => KafkaFlow/Middlewares/TypedHandler}/HandlerExecutor.cs (95%) rename src/{KafkaFlow.TypedHandler => KafkaFlow/Middlewares/TypedHandler}/HandlerTypeMapping.cs (95%) rename src/{KafkaFlow.TypedHandler => KafkaFlow/Middlewares/TypedHandler}/TypedHandlerMiddleware.cs (93%) diff --git a/.github/workflows/deploy-website.yml b/.github/workflows/deploy-website.yml index 129761e90..7fa32abe6 100644 --- a/.github/workflows/deploy-website.yml +++ b/.github/workflows/deploy-website.yml @@ -36,12 +36,6 @@ jobs: - run: xmldocmd-docusaurus ./drop/KafkaFlow.Admin.dll website/docs/reference/KafkaFlow.Admin --type-folders shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.BatchConsume.dll website/docs/reference/KafkaFlow.BatchConsume --type-folders - shell: bash - - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Compressor.dll website/docs/reference/KafkaFlow.Compressor --type-folders - shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Extensions.Hosting.dll website/docs/reference/KafkaFlow.Extensions.Hosting --type-folders shell: bash @@ -57,9 +51,6 @@ jobs: - run: xmldocmd-docusaurus ./drop/KafkaFlow.SchemaRegistry.dll website/docs/reference/KafkaFlow.SchemaRegistry --type-folders shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.dll website/docs/reference/KafkaFlow.Serializer --type-folders - shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.JsonCore.dll website/docs/reference/KafkaFlow.Serializer.JsonCore --type-folders shell: bash @@ -78,9 +69,6 @@ jobs: - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.dll website/docs/reference/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf --type-folders shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.TypedHandler.dll website/docs/reference/KafkaFlow.TypedHandler --type-folders - shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Unity.dll website/docs/reference/KafkaFlow.Unity --type-folders shell: bash diff --git a/.github/workflows/test-deploy-website.yml b/.github/workflows/test-deploy-website.yml index b2b49043d..04584aec9 100644 --- a/.github/workflows/test-deploy-website.yml +++ b/.github/workflows/test-deploy-website.yml @@ -30,12 +30,6 @@ jobs: - run: xmldocmd-docusaurus ./drop/KafkaFlow.Admin.dll website/docs/reference/KafkaFlow.Admin --type-folders shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.BatchConsume.dll website/docs/reference/KafkaFlow.BatchConsume --type-folders - shell: bash - - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Compressor.dll website/docs/reference/KafkaFlow.Compressor --type-folders - shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Extensions.Hosting.dll website/docs/reference/KafkaFlow.Extensions.Hosting --type-folders shell: bash @@ -51,9 +45,6 @@ jobs: - run: xmldocmd-docusaurus ./drop/KafkaFlow.SchemaRegistry.dll website/docs/reference/KafkaFlow.SchemaRegistry --type-folders shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.dll website/docs/reference/KafkaFlow.Serializer --type-folders - shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.JsonCore.dll website/docs/reference/KafkaFlow.Serializer.JsonCore --type-folders shell: bash @@ -72,9 +63,6 @@ jobs: - run: xmldocmd-docusaurus ./drop/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.dll website/docs/reference/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf --type-folders shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.TypedHandler.dll website/docs/reference/KafkaFlow.TypedHandler --type-folders - shell: bash - - run: xmldocmd-docusaurus ./drop/KafkaFlow.Unity.dll website/docs/reference/KafkaFlow.Unity --type-folders shell: bash diff --git a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj index f51dbe58e..6336785ab 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj +++ b/samples/KafkaFlow.Sample.BatchOperations/KafkaFlow.Sample.BatchOperations.csproj @@ -22,11 +22,9 @@ - - diff --git a/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs b/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs index 317098b43..438689b7a 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs +++ b/samples/KafkaFlow.Sample.BatchOperations/PrintConsoleMiddleware.cs @@ -1,7 +1,7 @@ using System; using System.Linq; using System.Threading.Tasks; -using KafkaFlow.BatchConsume; +using KafkaFlow.Batching; namespace KafkaFlow.Sample.BatchOperations; diff --git a/samples/KafkaFlow.Sample.BatchOperations/Program.cs b/samples/KafkaFlow.Sample.BatchOperations/Program.cs index 1515286ed..65fc08fc2 100644 --- a/samples/KafkaFlow.Sample.BatchOperations/Program.cs +++ b/samples/KafkaFlow.Sample.BatchOperations/Program.cs @@ -1,7 +1,6 @@ using System; using System.Linq; using KafkaFlow; -using KafkaFlow.BatchConsume; using KafkaFlow.Producers; using KafkaFlow.Sample.BatchOperations; using KafkaFlow.Serializer; @@ -35,8 +34,8 @@ .WithWorkersCount(1) .AddMiddlewares( middlewares => middlewares - .AddSerializer() - .BatchConsume(10, TimeSpan.FromSeconds(10)) + .AddDeserializer() + .AddBatching(10, TimeSpan.FromSeconds(10)) .Add() ) ) diff --git a/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj b/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj index 25d305383..ba75e35c2 100644 --- a/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj +++ b/samples/KafkaFlow.Sample.ConsumerThrottling/KafkaFlow.Sample.ConsumerThrottling.csproj @@ -22,8 +22,6 @@ - - diff --git a/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs b/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs index 3e6699c70..d6396e9dc 100644 --- a/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs +++ b/samples/KafkaFlow.Sample.ConsumerThrottling/Program.cs @@ -48,7 +48,7 @@ .WithWorkersCount(1) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .Add() ) ) @@ -68,7 +68,7 @@ .AddAction(a => a.AboveThreshold(10).ApplyDelay(1_000)) .AddAction(a => a.AboveThreshold(20).ApplyDelay(5_000)) .AddAction(a => a.AboveThreshold(30).ApplyDelay(10_000))) - .AddSerializer() + .AddDeserializer() .Add() ) ) diff --git a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj index e4a1914e1..20946db23 100644 --- a/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj +++ b/samples/KafkaFlow.Sample.FlowControl/KafkaFlow.Sample.FlowControl.csproj @@ -26,7 +26,6 @@ - diff --git a/samples/KafkaFlow.Sample.FlowControl/Program.cs b/samples/KafkaFlow.Sample.FlowControl/Program.cs index 87a783ce1..26ed362fd 100644 --- a/samples/KafkaFlow.Sample.FlowControl/Program.cs +++ b/samples/KafkaFlow.Sample.FlowControl/Program.cs @@ -35,7 +35,7 @@ .WithWorkersCount(1) .AddMiddlewares( m => m - .AddSingleTypeSerializer() + .AddSingleTypeDeserializer() .Add() ) ); diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj index f6f6d1e42..7a55c9fd7 100644 --- a/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj +++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/KafkaFlow.Sample.PauseConsumerOnError.csproj @@ -14,8 +14,6 @@ - - diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs index 114dae019..77b597265 100644 --- a/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs +++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/MessageHandler.cs @@ -1,4 +1,4 @@ -using KafkaFlow.TypedHandler; +using KafkaFlow.Middlewares.TypedHandler; namespace KafkaFlow.Sample.PauseConsumerOnError; diff --git a/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs b/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs index 478aa838b..35a2ce5b6 100644 --- a/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs +++ b/samples/KafkaFlow.Sample.PauseConsumerOnError/Program.cs @@ -2,7 +2,6 @@ using KafkaFlow.Producers; using KafkaFlow.Sample.PauseConsumerOnError; using KafkaFlow.Serializer; -using KafkaFlow.TypedHandler; using Microsoft.Extensions.DependencyInjection; var services = new ServiceCollection(); @@ -33,7 +32,7 @@ middlewares => middlewares .Add() - .AddSerializer() + .AddDeserializer() .AddTypedHandlers(h => h.AddHandler()) ) ) diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs index f7f70e702..0893f1225 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler.cs @@ -1,20 +1,21 @@ -namespace KafkaFlow.Sample.SchemaRegistry.Handlers; - -using System; -using System.Threading.Tasks; -using global::SchemaRegistry; -using TypedHandler; - -public class AvroMessageHandler : IMessageHandler +namespace KafkaFlow.Sample.SchemaRegistry.Handlers { - public Task Handle(IMessageContext context, AvroLogMessage message) + using System; + using System.Threading.Tasks; + using KafkaFlow.Middlewares.TypedHandler; + using global::SchemaRegistry; + + public class AvroMessageHandler : IMessageHandler { - Console.WriteLine( - "Partition: {0} | Offset: {1} | Message: {2} | Avro", - context.ConsumerContext.Partition, - context.ConsumerContext.Offset, - message.Severity.ToString()); + public Task Handle(IMessageContext context, AvroLogMessage message) + { + Console.WriteLine( + "Partition: {0} | Offset: {1} | Message: {2} | Avro", + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, + message.Severity.ToString()); - return Task.CompletedTask; + return Task.CompletedTask; + } } } \ No newline at end of file diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs index ab35a60e3..65660f5f3 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/AvroMessageHandler2.cs @@ -3,7 +3,7 @@ using System; using System.Threading.Tasks; using global::SchemaRegistry; -using TypedHandler; +using KafkaFlow.Middlewares.TypedHandler; public class AvroMessageHandler2 : IMessageHandler { diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs index f17b98a8b..39b446434 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/JsonMessageHandler.cs @@ -3,7 +3,7 @@ using System; using System.Threading.Tasks; using global::SchemaRegistry; -using TypedHandler; +using KafkaFlow.Middlewares.TypedHandler; public class JsonMessageHandler : IMessageHandler { diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs index aee7c1cd9..e1158c9c8 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Handlers/ProtobufMessageHandler.cs @@ -1,7 +1,7 @@  using System; using System.Threading.Tasks; -using KafkaFlow.TypedHandler; +using KafkaFlow.Middlewares.TypedHandler; using SchemaRegistry; namespace KafkaFlow.Sample.SchemaRegistry.Handlers; diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj index 220139cdf..ddef38718 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj +++ b/samples/KafkaFlow.Sample.SchemaRegistry/KafkaFlow.Sample.SchemaRegistry.csproj @@ -24,8 +24,6 @@ - - diff --git a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs index bc539818d..926879042 100644 --- a/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs +++ b/samples/KafkaFlow.Sample.SchemaRegistry/Program.cs @@ -5,7 +5,6 @@ using KafkaFlow; using KafkaFlow.Producers; using KafkaFlow.Sample.SchemaRegistry.Handlers; -using KafkaFlow.TypedHandler; using Microsoft.Extensions.DependencyInjection; using SchemaRegistry; @@ -74,7 +73,7 @@ .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSchemaRegistryAvroSerializer() + .AddSchemaRegistryAvroDeserializer() .AddTypedHandlers( handlers => handlers .AddHandler() @@ -103,7 +102,7 @@ .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSchemaRegistryProtobufSerializer() + .AddSchemaRegistryProtobufDeserializer() .AddTypedHandlers(handlers => handlers.AddHandler()) ) ) diff --git a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj index dcf23e1b3..a6743af44 100644 --- a/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj +++ b/samples/KafkaFlow.Sample/KafkaFlow.Sample.csproj @@ -19,13 +19,10 @@ - - - diff --git a/samples/KafkaFlow.Sample/PrintConsoleHandler.cs b/samples/KafkaFlow.Sample/PrintConsoleHandler.cs index 11276440c..57a1a48b8 100644 --- a/samples/KafkaFlow.Sample/PrintConsoleHandler.cs +++ b/samples/KafkaFlow.Sample/PrintConsoleHandler.cs @@ -1,6 +1,6 @@ using System; using System.Threading.Tasks; -using KafkaFlow.TypedHandler; +using KafkaFlow.Middlewares.TypedHandler; namespace KafkaFlow.Sample; diff --git a/samples/KafkaFlow.Sample/Program.cs b/samples/KafkaFlow.Sample/Program.cs index 874962de1..b0be4f2c7 100644 --- a/samples/KafkaFlow.Sample/Program.cs +++ b/samples/KafkaFlow.Sample/Program.cs @@ -4,7 +4,6 @@ using KafkaFlow.Producers; using KafkaFlow.Sample; using KafkaFlow.Serializer; -using KafkaFlow.TypedHandler; using Microsoft.Extensions.DependencyInjection; var services = new ServiceCollection(); @@ -33,7 +32,7 @@ .WithWorkersCount(3) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers(h => h.AddHandler()) ) ) diff --git a/src/KafkaFlow.Abstractions/IMessageCompressor.cs b/src/KafkaFlow.Abstractions/ICompressor.cs similarity index 55% rename from src/KafkaFlow.Abstractions/IMessageCompressor.cs rename to src/KafkaFlow.Abstractions/ICompressor.cs index 53fffb40f..8b1d6a1b5 100644 --- a/src/KafkaFlow.Abstractions/IMessageCompressor.cs +++ b/src/KafkaFlow.Abstractions/ICompressor.cs @@ -3,7 +3,7 @@ namespace KafkaFlow /// /// Used to create a message compressor /// - public interface IMessageCompressor + public interface ICompressor { /// /// Compress the given message @@ -11,12 +11,5 @@ public interface IMessageCompressor /// The message to be compressed /// The compressed message byte[] Compress(byte[] message); - - /// - /// Decompress the given message - /// - /// The message to be decompressed - /// The decompressed message - byte[] Decompress(byte[] message); } } diff --git a/src/KafkaFlow.Abstractions/IDecompressor.cs b/src/KafkaFlow.Abstractions/IDecompressor.cs new file mode 100644 index 000000000..69e140ffb --- /dev/null +++ b/src/KafkaFlow.Abstractions/IDecompressor.cs @@ -0,0 +1,15 @@ +namespace KafkaFlow +{ + /// + /// Used to create a message decompressor + /// + public interface IDecompressor + { + /// + /// Decompress the given message + /// + /// The message to be decompressed + /// The decompressed message + byte[] Decompress(byte[] message); + } +} diff --git a/src/KafkaFlow.Abstractions/IDeserializer.cs b/src/KafkaFlow.Abstractions/IDeserializer.cs new file mode 100644 index 000000000..0ecfd2c86 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IDeserializer.cs @@ -0,0 +1,21 @@ +namespace KafkaFlow +{ + using System; + using System.IO; + using System.Threading.Tasks; + + /// + /// Used to implement a message serializer + /// + public interface IDeserializer + { + /// + /// Deserializes the given message + /// + /// A stream to read the data to be deserialized + /// The type to be created + /// An object containing metadata + /// The deserialized message + Task DeserializeAsync(Stream input, Type type, ISerializerContext context); + } +} diff --git a/src/KafkaFlow.TypedHandler/IMessageHandler.cs b/src/KafkaFlow.Abstractions/IMessageHandler.cs similarity index 96% rename from src/KafkaFlow.TypedHandler/IMessageHandler.cs rename to src/KafkaFlow.Abstractions/IMessageHandler.cs index 7c7f181f8..611ba91f6 100644 --- a/src/KafkaFlow.TypedHandler/IMessageHandler.cs +++ b/src/KafkaFlow.Abstractions/IMessageHandler.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.TypedHandler +namespace KafkaFlow { using System.Threading.Tasks; diff --git a/src/KafkaFlow.Abstractions/ISerializer.cs b/src/KafkaFlow.Abstractions/ISerializer.cs index 0b0951997..23cd9b73a 100644 --- a/src/KafkaFlow.Abstractions/ISerializer.cs +++ b/src/KafkaFlow.Abstractions/ISerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow { - using System; using System.IO; using System.Threading.Tasks; @@ -17,14 +16,5 @@ public interface ISerializer /// An object containing metadata /// The serialized message Task SerializeAsync(object message, Stream output, ISerializerContext context); - - /// - /// Deserializes the given message - /// - /// A stream to read the data to be deserialized - /// The type to be created - /// An object containing metadata - /// The deserialized message - Task DeserializeAsync(Stream input, Type type, ISerializerContext context); } } diff --git a/src/KafkaFlow.Abstractions/ISerializerContext.cs b/src/KafkaFlow.Abstractions/ISerializerContext.cs index 7c8d7a459..5fc5b42e9 100644 --- a/src/KafkaFlow.Abstractions/ISerializerContext.cs +++ b/src/KafkaFlow.Abstractions/ISerializerContext.cs @@ -1,7 +1,7 @@ namespace KafkaFlow { /// - /// A context that can have some metadata to help with serialization process + /// Context for serialization and deserialization operations. /// public interface ISerializerContext { diff --git a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs index 4e8c00477..608649536 100644 --- a/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Admin/Extensions/ClusterConfigurationBuilderExtensions.cs @@ -6,7 +6,6 @@ using KafkaFlow.Admin.Handlers; using KafkaFlow.Configuration; using KafkaFlow.Serializer; - using KafkaFlow.TypedHandler; /// /// No needed @@ -54,7 +53,7 @@ public static class ClusterConfigurationBuilderExtensions .DisableManagement() .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -123,7 +122,7 @@ public static class ClusterConfigurationBuilderExtensions .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) diff --git a/src/KafkaFlow.Admin/Handlers/ChangeConsumerWorkersCountHandler.cs b/src/KafkaFlow.Admin/Handlers/ChangeConsumerWorkersCountHandler.cs index 370f418c6..4dbf0f7a0 100644 --- a/src/KafkaFlow.Admin/Handlers/ChangeConsumerWorkersCountHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ChangeConsumerWorkersCountHandler.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.Admin.Handlers using System.Threading.Tasks; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ChangeConsumerWorkersCountHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs b/src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs index 0e7c038c9..e2b7fe36b 100644 --- a/src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ConsumerTelemetryMetricHandler.cs @@ -2,7 +2,7 @@ namespace KafkaFlow.Admin.Handlers { using System.Threading.Tasks; using KafkaFlow.Admin.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ConsumerTelemetryMetricHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/PauseConsumerByNameHandler.cs b/src/KafkaFlow.Admin/Handlers/PauseConsumerByNameHandler.cs index c8bd71843..9afc8efdf 100644 --- a/src/KafkaFlow.Admin/Handlers/PauseConsumerByNameHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/PauseConsumerByNameHandler.cs @@ -5,7 +5,7 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Extensions; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class PauseConsumerByNameHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/PauseConsumersByGroupHandler.cs b/src/KafkaFlow.Admin/Handlers/PauseConsumersByGroupHandler.cs index a7aac4f2f..376393842 100644 --- a/src/KafkaFlow.Admin/Handlers/PauseConsumersByGroupHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/PauseConsumersByGroupHandler.cs @@ -5,7 +5,7 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Extensions; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class PauseConsumersByGroupHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/ResetConsumerOffsetHandler.cs b/src/KafkaFlow.Admin/Handlers/ResetConsumerOffsetHandler.cs index ca732a741..2d9c60008 100644 --- a/src/KafkaFlow.Admin/Handlers/ResetConsumerOffsetHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ResetConsumerOffsetHandler.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Extensions; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ResetConsumerOffsetHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/RestartConsumerByNameHandler.cs b/src/KafkaFlow.Admin/Handlers/RestartConsumerByNameHandler.cs index 82316a519..3525bf490 100644 --- a/src/KafkaFlow.Admin/Handlers/RestartConsumerByNameHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/RestartConsumerByNameHandler.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.Admin.Handlers using System.Threading.Tasks; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class RestartConsumerByNameHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/ResumeConsumerByNameHandler.cs b/src/KafkaFlow.Admin/Handlers/ResumeConsumerByNameHandler.cs index dfbd14f11..e8eb336ff 100644 --- a/src/KafkaFlow.Admin/Handlers/ResumeConsumerByNameHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ResumeConsumerByNameHandler.cs @@ -5,7 +5,7 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Extensions; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ResumeConsumerByNameHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/ResumeConsumersByGroupHandler.cs b/src/KafkaFlow.Admin/Handlers/ResumeConsumersByGroupHandler.cs index 5aa6f3898..553d2ca23 100644 --- a/src/KafkaFlow.Admin/Handlers/ResumeConsumersByGroupHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/ResumeConsumersByGroupHandler.cs @@ -5,7 +5,7 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Extensions; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ResumeConsumersByGroupHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/RewindConsumerOffsetToDateTimeHandler.cs b/src/KafkaFlow.Admin/Handlers/RewindConsumerOffsetToDateTimeHandler.cs index 83712ab40..b5895ef56 100644 --- a/src/KafkaFlow.Admin/Handlers/RewindConsumerOffsetToDateTimeHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/RewindConsumerOffsetToDateTimeHandler.cs @@ -7,7 +7,7 @@ namespace KafkaFlow.Admin.Handlers using KafkaFlow.Admin.Extensions; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class RewindConsumerOffsetToDateTimeHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/StartConsumerByNameHandler.cs b/src/KafkaFlow.Admin/Handlers/StartConsumerByNameHandler.cs index 4189c2814..b7ad5fa08 100644 --- a/src/KafkaFlow.Admin/Handlers/StartConsumerByNameHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/StartConsumerByNameHandler.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.Admin.Handlers using System.Threading.Tasks; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class StartConsumerByNameHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/Handlers/StopConsumerByNameHandler.cs b/src/KafkaFlow.Admin/Handlers/StopConsumerByNameHandler.cs index 203f4faf7..92a9fc0a0 100644 --- a/src/KafkaFlow.Admin/Handlers/StopConsumerByNameHandler.cs +++ b/src/KafkaFlow.Admin/Handlers/StopConsumerByNameHandler.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.Admin.Handlers using System.Threading.Tasks; using KafkaFlow.Admin.Messages; using KafkaFlow.Consumers; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class StopConsumerByNameHandler : IMessageHandler { diff --git a/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj b/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj index 58d220888..3b7ecf40f 100644 --- a/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj +++ b/src/KafkaFlow.Admin/KafkaFlow.Admin.csproj @@ -9,8 +9,6 @@ - - diff --git a/src/KafkaFlow.BatchConsume/AssemblyInfo.cs b/src/KafkaFlow.BatchConsume/AssemblyInfo.cs deleted file mode 100644 index f8402e685..000000000 --- a/src/KafkaFlow.BatchConsume/AssemblyInfo.cs +++ /dev/null @@ -1,4 +0,0 @@ -using System.Runtime.CompilerServices; - -[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] -[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] diff --git a/src/KafkaFlow.BatchConsume/KafkaFlow.BatchConsume.csproj b/src/KafkaFlow.BatchConsume/KafkaFlow.BatchConsume.csproj deleted file mode 100644 index 8a6dcb8a2..000000000 --- a/src/KafkaFlow.BatchConsume/KafkaFlow.BatchConsume.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - - netstandard2.0 - KafkaFlow.BatchConsume - A KafkaFlow middleware to consume messages in batches - - - - - - - - - diff --git a/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs b/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs index bcc46d9ae..3f795c6af 100644 --- a/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs +++ b/src/KafkaFlow.Compressor.Gzip/GzipMessageCompressor.cs @@ -6,7 +6,7 @@ /// /// A GZIP message compressor /// - public class GzipMessageCompressor : IMessageCompressor + public class GzipMessageCompressor : ICompressor { /// public byte[] Compress(byte[] message) @@ -21,19 +21,5 @@ public byte[] Compress(byte[] message) return outputStream.ToArray(); } - - /// - public byte[] Decompress(byte[] message) - { - using var outputStream = new MemoryStream(); - using var inputStream = new MemoryStream(message); - - using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress)) - { - gzipStream.CopyTo(outputStream); - } - - return outputStream.ToArray(); - } } } diff --git a/src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs b/src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs new file mode 100644 index 000000000..19baffaf0 --- /dev/null +++ b/src/KafkaFlow.Compressor.Gzip/GzipMessageDecompressor.cs @@ -0,0 +1,25 @@ +namespace KafkaFlow.Compressor.Gzip +{ + using System.IO; + using System.IO.Compression; + + /// + /// A GZIP message decompressor + /// + public class GzipMessageDecompressor : IDecompressor + { + /// + public byte[] Decompress(byte[] message) + { + using var outputStream = new MemoryStream(); + using var inputStream = new MemoryStream(message); + + using (var gzipStream = new GZipStream(inputStream, CompressionMode.Decompress)) + { + gzipStream.CopyTo(outputStream); + } + + return outputStream.ToArray(); + } + } +} diff --git a/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj b/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj index 54f7ece59..7d2c63f51 100644 --- a/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj +++ b/src/KafkaFlow.Compressor.Gzip/KafkaFlow.Compressor.Gzip.csproj @@ -8,5 +8,6 @@ + \ No newline at end of file diff --git a/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs b/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs deleted file mode 100644 index 1f748f112..000000000 --- a/src/KafkaFlow.Compressor/ConfigurationBuilderExtensions.cs +++ /dev/null @@ -1,68 +0,0 @@ -namespace KafkaFlow.Compressor -{ - using KafkaFlow.Configuration; - - /// - /// Extension methods for and . - /// - public static class ConfigurationBuilderExtensions - { - /// - /// Registers a middleware to decompress the message - /// - /// The middleware configuration builder - /// The compressor type - /// - public static IConsumerMiddlewareConfigurationBuilder AddCompressor(this IConsumerMiddlewareConfigurationBuilder middlewares) - where T : class, IMessageCompressor - { - middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddCompressor(resolver => resolver.Resolve()); - } - - /// - /// Registers a middleware to decompress the message - /// - /// The middleware configuration builder - /// The compressor type that implements - /// A factory to create the instance - /// - public static IConsumerMiddlewareConfigurationBuilder AddCompressor( - this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory factory) - where T : class, IMessageCompressor - { - return middlewares.Add(resolver => new CompressorConsumerMiddleware(factory(resolver))); - } - - /// - /// Registers a middleware to compress the message - /// It is highly recommended to use the producer native compression ('WithCompression()' method) instead of using the compressor middleware - /// - /// The middleware configuration builder - /// The compressor type that implements - /// - public static IProducerMiddlewareConfigurationBuilder AddCompressor(this IProducerMiddlewareConfigurationBuilder middlewares) - where T : class, IMessageCompressor - { - middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddCompressor(resolver => resolver.Resolve()); - } - - /// - /// Registers a middleware to compress the message - /// It is highly recommended to use the producer native compression ('WithCompression()' method) instead of using the compressor middleware - /// - /// The middleware configuration builder - /// The compressor type that implements - /// A factory to create the instance - /// - public static IProducerMiddlewareConfigurationBuilder AddCompressor( - this IProducerMiddlewareConfigurationBuilder middlewares, - Factory factory) - where T : class, IMessageCompressor - { - return middlewares.Add(resolver => new CompressorProducerMiddleware(factory(resolver))); - } - } -} diff --git a/src/KafkaFlow.Compressor/KafkaFlow.Compressor.csproj b/src/KafkaFlow.Compressor/KafkaFlow.Compressor.csproj deleted file mode 100644 index d9ceb3efe..000000000 --- a/src/KafkaFlow.Compressor/KafkaFlow.Compressor.csproj +++ /dev/null @@ -1,13 +0,0 @@ - - - - netstandard2.0 - KafkaFlow.Compressor - Compression middleware for KafkaFlow - - - - - - - diff --git a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs index f743478d6..23edee91c 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Bootstrapper.cs @@ -9,7 +9,6 @@ namespace KafkaFlow.IntegrationTests.Core using global::Microsoft.Extensions.Configuration; using global::Microsoft.Extensions.DependencyInjection; using global::Microsoft.Extensions.Hosting; - using KafkaFlow.Compressor; using KafkaFlow.Compressor.Gzip; using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Messages; @@ -17,7 +16,6 @@ namespace KafkaFlow.IntegrationTests.Core using KafkaFlow.IntegrationTests.Core.Producers; using KafkaFlow.Serializer; using KafkaFlow.Serializer.SchemaRegistry; - using KafkaFlow.TypedHandler; using AutoOffsetReset = KafkaFlow.AutoOffsetReset; internal static class Bootstrapper @@ -130,7 +128,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -158,7 +156,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -186,7 +184,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithConsumerConfig(defaultConfig) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Singleton) @@ -210,7 +208,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSingleTypeSerializer(typeof(TestMessage1)) + .AddSingleTypeDeserializer(typeof(TestMessage1)) .AddTypedHandlers( handlers => handlers @@ -231,7 +229,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection }) .AddMiddlewares( middlewares => middlewares - .AddSingleTypeSerializer() + .AddSingleTypeDeserializer() .AddTypedHandlers( handlers => handlers @@ -246,7 +244,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .AddTypedHandlers( handlers => handlers @@ -261,7 +259,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddCompressor() + .AddDecompressor() .Add())) .AddConsumer( consumer => consumer @@ -272,7 +270,7 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddSerializer(_ => new JsonCoreSerializer()) + .AddDeserializer(_ => new JsonCoreDeserializer()) .AddTypedHandlers( handlers => handlers @@ -288,8 +286,8 @@ private static void SetupServices(HostBuilderContext context, IServiceCollection .WithAutoCommitIntervalMs(1) .AddMiddlewares( middlewares => middlewares - .AddCompressor() - .AddSerializer() + .AddDecompressor() + .AddDeserializer() .AddTypedHandlers( handlers => handlers diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/AvroMessageHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/AvroMessageHandler.cs index 23994a0d9..feb76b1f8 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/AvroMessageHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/AvroMessageHandler.cs @@ -1,7 +1,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers { using System.Threading.Tasks; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; using MessageTypes; internal class AvroMessageHandler : IMessageHandler diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentJsonMessageHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentJsonMessageHandler.cs index f049242a5..22e12e1cc 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentJsonMessageHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentJsonMessageHandler.cs @@ -2,7 +2,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers { using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ConfluentJsonMessageHandler : IMessageHandler { diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentProtobufMessageHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentProtobufMessageHandler.cs index 192e09bbb..697efca9f 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentProtobufMessageHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/ConfluentProtobufMessageHandler.cs @@ -2,7 +2,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers { using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class ConfluentProtobufMessageHandler : IMessageHandler { diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler.cs index a5f77c0ae..f2d1bb484 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler.cs @@ -2,7 +2,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers { using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class MessageHandler : IMessageHandler { diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs index c33518e09..1b01e1ea0 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler1.cs @@ -2,7 +2,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers { using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class MessageHandler1 : IMessageHandler { diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler2.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler2.cs index 38534e1c0..f47f02638 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler2.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/MessageHandler2.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers using System; using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class MessageHandler2 : IMessageHandler { diff --git a/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs b/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs index f58772786..05ccf1350 100644 --- a/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs +++ b/src/KafkaFlow.IntegrationTests/Core/Handlers/PauseResumeHandler.cs @@ -2,7 +2,7 @@ namespace KafkaFlow.IntegrationTests.Core.Handlers { using System.Threading.Tasks; using KafkaFlow.IntegrationTests.Core.Messages; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; internal class PauseResumeHandler : IMessageHandler { diff --git a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs index d9055fd00..c1452a340 100644 --- a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs +++ b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs @@ -222,7 +222,7 @@ private void ConfigureConsumer(IConsumerConfigurationBuilder consumerConfigur .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest) .AddMiddlewares( middlewares => middlewares - .AddSerializer() + .AddDeserializer() .Add()) .WithPartitionsAssignedHandler((_, _) => { diff --git a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj index 3aeaabc22..63e7a9705 100644 --- a/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj +++ b/src/KafkaFlow.IntegrationTests/KafkaFlow.IntegrationTests.csproj @@ -37,7 +37,6 @@ - @@ -45,8 +44,6 @@ - - diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 855083632..9a2bac810 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -150,7 +150,7 @@ private async Task GetServiceProvider() .WithAutoOffsetReset(AutoOffsetReset.Latest) .AddMiddlewares( middlewares => middlewares - .AddCompressor() + .AddDecompressor() .Add()) .WithPartitionsAssignedHandler((_, _) => { diff --git a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj index b1241a6fb..d7aa2a489 100644 --- a/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj +++ b/src/KafkaFlow.SchemaRegistry/KafkaFlow.SchemaRegistry.csproj @@ -9,12 +9,12 @@ - + - + diff --git a/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs b/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs index f0dd56cd9..3861d1ef6 100644 --- a/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs +++ b/src/KafkaFlow.SchemaRegistry/SchemaRegistryTypeResolver.cs @@ -6,6 +6,7 @@ namespace KafkaFlow using System.Linq; using System.Threading; using System.Threading.Tasks; + using KafkaFlow.Middlewares.Serializer.Resolvers; /// /// The message type resolver to be used with schema registry serializers diff --git a/src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs b/src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs new file mode 100644 index 000000000..521b3be16 --- /dev/null +++ b/src/KafkaFlow.Serializer.JsonCore/JsonCoreDeserializer.cs @@ -0,0 +1,40 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Text.Json; + using System.Threading.Tasks; + + /// + /// A message deserializer using System.Text.Json library + /// + public class JsonCoreDeserializer : IDeserializer + { + private readonly JsonSerializerOptions serializerOptions; + + /// + /// Initializes a new instance of the class. + /// + /// Json serializer options + public JsonCoreDeserializer(JsonSerializerOptions options) + { + this.serializerOptions = options; + } + + /// + /// Initializes a new instance of the class. + /// + public JsonCoreDeserializer() + : this(new JsonSerializerOptions()) + { + } + + /// + public async Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return await JsonSerializer + .DeserializeAsync(input, type, this.serializerOptions) + .ConfigureAwait(false); + } + } +} diff --git a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs index 5bb840360..e0069dd7e 100644 --- a/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs +++ b/src/KafkaFlow.Serializer.JsonCore/JsonCoreSerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Text.Json; using System.Threading.Tasks; @@ -59,13 +58,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con return Task.CompletedTask; } - - /// - public async Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return await JsonSerializer - .DeserializeAsync(input, type, this.serializerOptions) - .ConfigureAwait(false); - } } -} \ No newline at end of file +} diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs new file mode 100644 index 000000000..f0b05dcff --- /dev/null +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonDeserializer.cs @@ -0,0 +1,51 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Text; + using System.Threading.Tasks; + using Newtonsoft.Json; + + /// + /// A message deserializer using NewtonsoftJson library + /// + public class NewtonsoftJsonDeserializer : IDeserializer + { + private const int DefaultBufferSize = 1024; + + private static readonly UTF8Encoding UTF8NoBom = new (false); + private readonly JsonSerializerSettings settings; + + /// + /// Initializes a new instance of the class. + /// + /// Json serializer settings + public NewtonsoftJsonDeserializer(JsonSerializerSettings settings) + { + this.settings = settings; + } + + /// + /// Initializes a new instance of the class. + /// + public NewtonsoftJsonDeserializer() + : this(new JsonSerializerSettings()) + { + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + using var sr = new StreamReader( + input, + UTF8NoBom, + true, + DefaultBufferSize, + true); + + var serializer = JsonSerializer.CreateDefault(this.settings); + + return Task.FromResult(serializer.Deserialize(sr, type)); + } + } +} diff --git a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs index bbc943482..8ca731d2c 100644 --- a/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.NewtonsoftJson/NewtonsoftJsonSerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Text; using System.Threading.Tasks; @@ -43,20 +42,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con return Task.CompletedTask; } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - using var sr = new StreamReader( - input, - UTF8NoBom, - true, - DefaultBufferSize, - true); - - var serializer = JsonSerializer.CreateDefault(this.settings); - - return Task.FromResult(serializer.Deserialize(sr, type)); - } } } diff --git a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs new file mode 100644 index 000000000..c4838c35a --- /dev/null +++ b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetDeserializer.cs @@ -0,0 +1,19 @@ +namespace KafkaFlow.Serializer +{ + using System; + using System.IO; + using System.Threading.Tasks; + using ProtoBuf; + + /// + /// A message deserializer using protobuf-net library + /// + public class ProtobufNetDeserializer : IDeserializer + { + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return Task.FromResult(Serializer.Deserialize(type, input)); + } + } +} diff --git a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs index 50ff531a9..12d8a706d 100644 --- a/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs +++ b/src/KafkaFlow.Serializer.ProtobufNet/ProtobufNetSerializer.cs @@ -1,6 +1,5 @@ namespace KafkaFlow.Serializer { - using System; using System.IO; using System.Threading.Tasks; using ProtoBuf; @@ -17,11 +16,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con return Task.CompletedTask; } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return Task.FromResult(Serializer.Deserialize(type, input)); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs new file mode 100644 index 000000000..3f29adc7d --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroDeserializer.cs @@ -0,0 +1,42 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + using System; + using System.IO; + using System.Threading.Tasks; + using Confluent.SchemaRegistry; + using Confluent.SchemaRegistry.Serdes; + + /// + /// A message serializer using Apache.Avro library + /// + public class ConfluentAvroDeserializer : IDeserializer + { + private readonly ISchemaRegistryClient schemaRegistryClient; + + /// + /// Initializes a new instance of the class. + /// + /// The to be used by the framework + public ConfluentAvroDeserializer(IDependencyResolver resolver) + { + this.schemaRegistryClient = + resolver.Resolve() ?? + throw new InvalidOperationException( + $"No schema registry configuration was found. Set it using {nameof(ClusterConfigurationBuilderExtensions.WithSchemaRegistry)} on cluster configuration"); + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(AvroDeserializer<>).MakeGenericType(type), + this.schemaRegistryClient, + null)) + .DeserializeAsync(input, context); + } + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs index 4fe5c2099..e0e2eb49e 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConfluentAvroSerializer.cs @@ -43,19 +43,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con this.serializerConfig)) .SerializeAsync(message, output, context); } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return ConfluentDeserializerWrapper - .GetOrCreateDeserializer( - type, - () => Activator - .CreateInstance( - typeof(AvroDeserializer<>).MakeGenericType(type), - this.schemaRegistryClient, - null)) - .DeserializeAsync(input, context); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs index 81ac47435..859e852f5 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs @@ -2,6 +2,7 @@ { using Confluent.SchemaRegistry; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; /// @@ -14,12 +15,12 @@ public static class ConsumerConfigurationBuilderExtensions /// /// The middleware configuration builder /// - public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { return middlewares.Add( - resolver => new SerializerConsumerMiddleware( - new ConfluentAvroSerializer(resolver), + resolver => new DeserializerConsumerMiddleware( + new ConfluentAvroDeserializer(resolver), new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj index 355666973..e65dfa824 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro.csproj @@ -10,7 +10,6 @@ - diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs index d60a7a39c..54bf6bac7 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ProducerConfigurationBuilderExtensions.cs @@ -3,6 +3,7 @@ using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; /// diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs new file mode 100644 index 000000000..0ec931348 --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonDeserializer.cs @@ -0,0 +1,39 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + using System; + using System.IO; + using System.Threading.Tasks; + using Confluent.SchemaRegistry.Serdes; + using NJsonSchema.Generation; + + /// + /// A json message serializer integrated with the confluent schema registry + /// + public class ConfluentJsonDeserializer : IDeserializer + { + private readonly JsonSchemaGeneratorSettings schemaGeneratorSettings; + + /// + /// Initializes a new instance of the class. + /// + /// An instance of + public ConfluentJsonDeserializer(JsonSchemaGeneratorSettings schemaGeneratorSettings = null) + { + this.schemaGeneratorSettings = schemaGeneratorSettings; + } + + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(JsonDeserializer<>).MakeGenericType(type), + null, + this.schemaGeneratorSettings)) + .DeserializeAsync(input, context); + } + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs index 4b827857e..fa2763903 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConfluentJsonSerializer.cs @@ -62,19 +62,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con this.schemaGeneratorSettings)) .SerializeAsync(message, output, context); } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return ConfluentDeserializerWrapper - .GetOrCreateDeserializer( - type, - () => Activator - .CreateInstance( - typeof(JsonDeserializer<>).MakeGenericType(type), - null, - this.schemaGeneratorSettings)) - .DeserializeAsync(input, context); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs index 06c8c04b9..4700b797b 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ConsumerConfigurationBuilderExtensions.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer.Resolvers; using KafkaFlow.Serializer.SchemaRegistry; /// @@ -17,8 +18,8 @@ public static class ConsumerConfigurationBuilderExtensions public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryJsonSerializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { - return middlewares.AddSerializer( - resolver => new ConfluentJsonSerializer(resolver), + return middlewares.AddDeserializer( + resolver => new ConfluentJsonDeserializer(), _ => new SingleMessageTypeResolver(typeof(TMessage))); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj index c7e599371..305788443 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson.csproj @@ -15,7 +15,6 @@ - diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs index 1708eea73..208595450 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentJson/ProducerConfigurationBuilderExtensions.cs @@ -2,6 +2,7 @@ { using Confluent.SchemaRegistry.Serdes; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer.Resolvers; using KafkaFlow.Serializer.SchemaRegistry; /// diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs new file mode 100644 index 000000000..3722dc35d --- /dev/null +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs @@ -0,0 +1,27 @@ +namespace KafkaFlow.Serializer.SchemaRegistry +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Threading.Tasks; + using Confluent.SchemaRegistry.Serdes; + + /// + /// A protobuf message serializer integrated with the confluent schema registry + /// + public class ConfluentProtobufDeserializer : IDeserializer + { + /// + public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) + { + return ConfluentDeserializerWrapper + .GetOrCreateDeserializer( + type, + () => Activator + .CreateInstance( + typeof(ProtobufDeserializer<>).MakeGenericType(type), + (IEnumerable>)null)) + .DeserializeAsync(input, context); + } + } +} diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs index 5285501cd..d29699d71 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufSerializer.cs @@ -1,7 +1,6 @@ namespace KafkaFlow.Serializer.SchemaRegistry { using System; - using System.Collections.Generic; using System.IO; using System.Threading.Tasks; using Confluent.SchemaRegistry; @@ -42,18 +41,5 @@ public Task SerializeAsync(object message, Stream output, ISerializerContext con this.serializerConfig)) .SerializeAsync(message, output, context); } - - /// - public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) - { - return ConfluentDeserializerWrapper - .GetOrCreateDeserializer( - type, - () => Activator - .CreateInstance( - typeof(ProtobufDeserializer<>).MakeGenericType(type), - (IEnumerable>)null)) - .DeserializeAsync(input, context); - } } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs index ce3b0e4c3..59012f3d8 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs @@ -2,6 +2,7 @@ { using Confluent.SchemaRegistry; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; /// @@ -14,12 +15,12 @@ public static class ConsumerConfigurationBuilderExtensions /// /// The middleware configuration builder /// - public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) { return middlewares.Add( - resolver => new SerializerConsumerMiddleware( - new ConfluentProtobufSerializer(resolver), + resolver => new DeserializerConsumerMiddleware( + new ConfluentProtobufDeserializer(), new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); } } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj index f2eac2e3b..418a3ccd7 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf.csproj @@ -16,6 +16,5 @@ - diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs index fc59f5e3d..af83cd19d 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ProducerConfigurationBuilderExtensions.cs @@ -3,6 +3,7 @@ using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer; using KafkaFlow.Serializer.SchemaRegistry; /// diff --git a/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj b/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj deleted file mode 100644 index 2a0a1c219..000000000 --- a/src/KafkaFlow.Serializer/KafkaFlow.Serializer.csproj +++ /dev/null @@ -1,19 +0,0 @@ - - - - netstandard2.0 - KafkaFlow.Serializer - Serializer middleware for KafkaFlow - KafkaFlow - - - - - - - - - - - - diff --git a/src/KafkaFlow.TypedHandler/AssemblyInfo.cs b/src/KafkaFlow.TypedHandler/AssemblyInfo.cs deleted file mode 100644 index f8402e685..000000000 --- a/src/KafkaFlow.TypedHandler/AssemblyInfo.cs +++ /dev/null @@ -1,4 +0,0 @@ -using System.Runtime.CompilerServices; - -[assembly: InternalsVisibleTo("KafkaFlow.UnitTests")] -[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")] diff --git a/src/KafkaFlow.TypedHandler/ConfigurationBuilderExtensions.cs b/src/KafkaFlow.TypedHandler/ConfigurationBuilderExtensions.cs deleted file mode 100644 index 9befd1a92..000000000 --- a/src/KafkaFlow.TypedHandler/ConfigurationBuilderExtensions.cs +++ /dev/null @@ -1,34 +0,0 @@ -namespace KafkaFlow.TypedHandler -{ - using System; - using KafkaFlow.Configuration; - - /// - /// Extension methods over - /// - public static class ConfigurationBuilderExtensions - { - /// - /// Adds typed handler middleware - /// - /// Instance of - /// A handler to configure the middleware - /// - public static IConsumerMiddlewareConfigurationBuilder AddTypedHandlers( - this IConsumerMiddlewareConfigurationBuilder builder, - Action configure) - { - var typedHandlerBuilder = new TypedHandlerConfigurationBuilder(builder.DependencyConfigurator); - - configure(typedHandlerBuilder); - - var configuration = typedHandlerBuilder.Build(); - - builder.Add( - resolver => new TypedHandlerMiddleware(resolver, configuration), - MiddlewareLifetime.Message); - - return builder; - } - } -} diff --git a/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj b/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj deleted file mode 100644 index 260ab8c41..000000000 --- a/src/KafkaFlow.TypedHandler/KafkaFlow.TypedHandler.csproj +++ /dev/null @@ -1,13 +0,0 @@ - - - - netstandard2.0 - KafkaFlow.TypedHandler - A KafkaFlow middleware to execute a handler class when a specific message arrives - - - - - - - diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index 2269db974..819b324ed 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -3,7 +3,7 @@ namespace KafkaFlow.UnitTests.BatchConsume using System; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.BatchConsume; + using KafkaFlow.Batching; using KafkaFlow.Configuration; using KafkaFlow.Consumers; using Microsoft.VisualStudio.TestTools.UnitTesting; diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs index ce2808cb2..57ced96cb 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorConsumerMiddlewareTests.cs @@ -3,7 +3,9 @@ namespace KafkaFlow.UnitTests.Compressors using System; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.Compressor; + + using KafkaFlow.Middlewares.Compressor; + using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -11,16 +13,16 @@ namespace KafkaFlow.UnitTests.Compressors public class CompressorConsumerMiddlewareTests { private Mock contextMock; - private Mock compressorMock; + private Mock decompressorMock; private bool nextCalled; - private CompressorConsumerMiddleware target; + private DecompressorConsumerMiddleware target; [TestInitialize] public void Setup() { this.contextMock = new Mock(); - this.compressorMock = new Mock(); - this.target = new CompressorConsumerMiddleware(this.compressorMock.Object); + this.decompressorMock = new Mock(); + this.target = new DecompressorConsumerMiddleware(this.decompressorMock.Object); } [TestMethod] @@ -38,7 +40,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() act.Should().Throw(); this.nextCalled.Should().BeFalse(); this.contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); - this.compressorMock.Verify(x => x.Decompress(It.IsAny()), Times.Never); + this.decompressorMock.Verify(x => x.Decompress(It.IsAny()), Times.Never); } [TestMethod] @@ -55,7 +57,7 @@ public async Task Invoke_ValidMessage_CallNext() .SetupGet(x => x.Message) .Returns(compressedMessage); - this.compressorMock + this.decompressorMock .Setup(x => x.Decompress((byte[]) compressedMessage.Value)) .Returns(uncompressedValue); @@ -76,7 +78,7 @@ public async Task Invoke_ValidMessage_CallNext() resultContext.Should().NotBeNull(); resultContext.Should().Be(transformedContextMock.Object); this.contextMock.VerifyAll(); - this.compressorMock.VerifyAll(); + this.decompressorMock.VerifyAll(); } private Task SetNextCalled() diff --git a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs index 54b135a70..24c41eed2 100644 --- a/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Compressors/CompressorProducerMiddlewareTests.cs @@ -3,7 +3,9 @@ namespace KafkaFlow.UnitTests.Compressors using System; using System.Threading.Tasks; using FluentAssertions; - using KafkaFlow.Compressor; + + using KafkaFlow.Middlewares.Compressor; + using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -11,7 +13,7 @@ namespace KafkaFlow.UnitTests.Compressors public class CompressorProducerMiddlewareTests { private Mock contextMock; - private Mock compressorMock; + private Mock compressorMock; private CompressorProducerMiddleware target; @@ -19,7 +21,7 @@ public class CompressorProducerMiddlewareTests public void Setup() { this.contextMock = new Mock(); - this.compressorMock = new Mock(); + this.compressorMock = new Mock(); this.target = new CompressorProducerMiddleware(this.compressorMock.Object); } diff --git a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj index 6e905e545..681188711 100644 --- a/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj +++ b/src/KafkaFlow.UnitTests/KafkaFlow.UnitTests.csproj @@ -31,16 +31,12 @@ - - - - diff --git a/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs new file mode 100644 index 000000000..88fa963e8 --- /dev/null +++ b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonDeserializerTests.cs @@ -0,0 +1,48 @@ +namespace KafkaFlow.UnitTests.Serializers +{ + using System; + using System.IO; + using System.Text; + using System.Threading.Tasks; + using AutoFixture; + using FluentAssertions; + using KafkaFlow.Serializer; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + using Newtonsoft.Json; + + [TestClass] + public class NewtonsoftJsonDeserializerTests + { + private readonly Mock contextMock = new (); + private readonly NewtonsoftJsonDeserializer deserializer = new (); + + private readonly Fixture fixture = new(); + + [TestMethod] + public async Task DeserializeAsync_ValidPayload_ObjectGenerated() + { + // Arrange + var message = this.fixture.Create(); + using var input = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))); + + // Act + var result = await this.deserializer.DeserializeAsync(input, typeof(TestMessage), this.contextMock.Object); + + // Assert + result.Should().NotBeNull(); + result.Should().BeOfType(); + } + + private class TestMessage + { + public int IntegerField { get; set; } + + public string StringField { get; set; } + + public double DoubleField { get; set; } + + public DateTime DateTimeField { get; set; } + } + } +} diff --git a/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs index 3e3a5997e..93673ed05 100644 --- a/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/NewtonsoftJsonSerializerTests.cs @@ -2,14 +2,12 @@ namespace KafkaFlow.UnitTests.Serializers { using System; using System.IO; - using System.Text; using System.Threading.Tasks; using AutoFixture; using FluentAssertions; using KafkaFlow.Serializer; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; - using Newtonsoft.Json; [TestClass] public class NewtonsoftJsonSerializerTests @@ -34,21 +32,6 @@ public async Task SerializeAsync_ValidPayload_JsonByteArrayGenerated() output.Position.Should().BeGreaterThan(0); } - [TestMethod] - public async Task DeserializeAsync_ValidPayload_ObjectGenerated() - { - // Arrange - var message = this.fixture.Create(); - using var input = new MemoryStream(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message))); - - // Act - var result = await this.serializer.DeserializeAsync(input, typeof(TestMessage), this.contextMock.Object); - - // Assert - result.Should().NotBeNull(); - result.Should().BeOfType(); - } - private class TestMessage { public int IntegerField { get; set; } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index cf0f1398b..6c05b0201 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -4,6 +4,8 @@ namespace KafkaFlow.UnitTests.Serializers using System.IO; using System.Threading.Tasks; using FluentAssertions; + using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Middlewares.Serializer.Resolvers; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -11,22 +13,22 @@ namespace KafkaFlow.UnitTests.Serializers public class SerializerConsumerMiddlewareTests { private Mock contextMock; - private Mock serializerMock; + private Mock deserializerMock; private Mock typeResolverMock; private bool nextCalled; - private SerializerConsumerMiddleware target; + private DeserializerConsumerMiddleware target; [TestInitialize] public void Setup() { this.contextMock = new Mock(); - this.serializerMock = new Mock(); + this.deserializerMock = new Mock(); this.typeResolverMock = new Mock(); - this.target = new SerializerConsumerMiddleware( - this.serializerMock.Object, + this.target = new DeserializerConsumerMiddleware( + this.deserializerMock.Object, this.typeResolverMock.Object); } @@ -49,7 +51,7 @@ public async Task Invoke_NullMessageType_ReturnWithoutCallingNext() this.nextCalled.Should().BeFalse(); this.typeResolverMock.VerifyAll(); this.contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); - this.serializerMock.Verify( + this.deserializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); } @@ -67,7 +69,7 @@ public async Task Invoke_NullMessage_CallNext() // Assert this.nextCalled.Should().BeTrue(); - this.serializerMock.Verify( + this.deserializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); this.typeResolverMock.Verify(x => x.OnConsumeAsync(It.IsAny()), Times.Never); @@ -88,7 +90,7 @@ public void Invoke_NotByteArrayMessage_ThrowsInvalidOperationException() act.Should().Throw(); this.nextCalled.Should().BeFalse(); this.contextMock.Verify(x => x.SetMessage(It.IsAny(), It.IsAny()), Times.Never); - this.serializerMock.Verify( + this.deserializerMock.Verify( x => x.DeserializeAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Never); this.typeResolverMock.Verify(x => x.OnConsumeAsync(It.IsAny()), Times.Never); @@ -122,7 +124,7 @@ public async Task Invoke_ValidMessage_Deserialize() .Setup(x => x.OnConsumeAsync(this.contextMock.Object)) .ReturnsAsync(messageType); - this.serializerMock + this.deserializerMock .Setup(x => x.DeserializeAsync(It.IsAny(), messageType, It.IsAny())) .ReturnsAsync(deserializedMessage); @@ -143,7 +145,7 @@ public async Task Invoke_ValidMessage_Deserialize() resultContext.Should().NotBeNull(); resultContext.Should().Be(transformedContextMock.Object); this.contextMock.VerifyAll(); - this.serializerMock.VerifyAll(); + this.deserializerMock.VerifyAll(); this.typeResolverMock.VerifyAll(); } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs index 74030a595..c828b4504 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerProducerMiddlewareTests.cs @@ -5,6 +5,8 @@ namespace KafkaFlow.UnitTests.Serializers using System.Threading.Tasks; using AutoFixture; using FluentAssertions; + using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Middlewares.Serializer.Resolvers; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; diff --git a/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs b/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs index 48f4ba6ca..90856c544 100644 --- a/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs +++ b/src/KafkaFlow.UnitTests/TypedHandler/HandlerTypeMappingTests.cs @@ -1,7 +1,7 @@ namespace KafkaFlow.UnitTests.TypedHandler { using FluentAssertions; - using KafkaFlow.TypedHandler; + using KafkaFlow.Middlewares.TypedHandler; using Microsoft.VisualStudio.TestTools.UnitTesting; [TestClass] diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln index 0b4f56ea7..1d55c3c82 100644 --- a/src/KafkaFlow.sln +++ b/src/KafkaFlow.sln @@ -25,12 +25,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Compression", "Compression" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Abstractions", "KafkaFlow.Abstractions\KafkaFlow.Abstractions.csproj", "{88808771-56BE-422B-94DC-7AB070F64E98}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.TypedHandler", "KafkaFlow.TypedHandler\KafkaFlow.TypedHandler.csproj", "{E47EF9E0-A1C7-4FF0-AEC5-143F52ED0FBE}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Serializer", "KafkaFlow.Serializer\KafkaFlow.Serializer.csproj", "{B7197114-B1C7-49EC-8740-1E09233B2C40}" -EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Compressor", "KafkaFlow.Compressor\KafkaFlow.Compressor.csproj", "{D29EC709-33DE-4045-8F3B-EC6619CDB429}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.IntegrationTests", "KafkaFlow.IntegrationTests\KafkaFlow.IntegrationTests.csproj", "{36F459F4-8323-472A-A8C5-8C9D89F92012}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Serializer.NewtonsoftJson", "KafkaFlow.Serializer.NewtonsoftJson\KafkaFlow.Serializer.NewtonsoftJson.csproj", "{FC622AB0-6481-4249-8D83-27BC39912103}" @@ -59,8 +53,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Serializer.Schema EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Middlewares", "Middlewares", "{ED24B548-6F37-4283-A35B-F6015BFB7A34}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.BatchConsume", "KafkaFlow.BatchConsume\KafkaFlow.BatchConsume.csproj", "{C891D0DB-BE19-4D20-9E2F-61D413210F8D}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Sample.BatchOperations", "..\samples\KafkaFlow.Sample.BatchOperations\KafkaFlow.Sample.BatchOperations.csproj", "{DE8A8871-B19E-489D-8292-386A06A4CDFA}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.Extensions.Hosting", "KafkaFlow.Extensions.Hosting\KafkaFlow.Extensions.Hosting.csproj", "{7913342E-80FD-4094-B892-18DAA2E6948F}" @@ -125,18 +117,6 @@ Global {88808771-56BE-422B-94DC-7AB070F64E98}.Debug|Any CPU.Build.0 = Debug|Any CPU {88808771-56BE-422B-94DC-7AB070F64E98}.Release|Any CPU.ActiveCfg = Release|Any CPU {88808771-56BE-422B-94DC-7AB070F64E98}.Release|Any CPU.Build.0 = Release|Any CPU - {E47EF9E0-A1C7-4FF0-AEC5-143F52ED0FBE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {E47EF9E0-A1C7-4FF0-AEC5-143F52ED0FBE}.Debug|Any CPU.Build.0 = Debug|Any CPU - {E47EF9E0-A1C7-4FF0-AEC5-143F52ED0FBE}.Release|Any CPU.ActiveCfg = Release|Any CPU - {E47EF9E0-A1C7-4FF0-AEC5-143F52ED0FBE}.Release|Any CPU.Build.0 = Release|Any CPU - {B7197114-B1C7-49EC-8740-1E09233B2C40}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {B7197114-B1C7-49EC-8740-1E09233B2C40}.Debug|Any CPU.Build.0 = Debug|Any CPU - {B7197114-B1C7-49EC-8740-1E09233B2C40}.Release|Any CPU.ActiveCfg = Release|Any CPU - {B7197114-B1C7-49EC-8740-1E09233B2C40}.Release|Any CPU.Build.0 = Release|Any CPU - {D29EC709-33DE-4045-8F3B-EC6619CDB429}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {D29EC709-33DE-4045-8F3B-EC6619CDB429}.Debug|Any CPU.Build.0 = Debug|Any CPU - {D29EC709-33DE-4045-8F3B-EC6619CDB429}.Release|Any CPU.ActiveCfg = Release|Any CPU - {D29EC709-33DE-4045-8F3B-EC6619CDB429}.Release|Any CPU.Build.0 = Release|Any CPU {36F459F4-8323-472A-A8C5-8C9D89F92012}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {36F459F4-8323-472A-A8C5-8C9D89F92012}.Debug|Any CPU.Build.0 = Debug|Any CPU {36F459F4-8323-472A-A8C5-8C9D89F92012}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -177,10 +157,6 @@ Global {2E63A019-F8AD-4EC3-A80A-F560DEC7C5B4}.Debug|Any CPU.Build.0 = Debug|Any CPU {2E63A019-F8AD-4EC3-A80A-F560DEC7C5B4}.Release|Any CPU.ActiveCfg = Release|Any CPU {2E63A019-F8AD-4EC3-A80A-F560DEC7C5B4}.Release|Any CPU.Build.0 = Release|Any CPU - {C891D0DB-BE19-4D20-9E2F-61D413210F8D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {C891D0DB-BE19-4D20-9E2F-61D413210F8D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {C891D0DB-BE19-4D20-9E2F-61D413210F8D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {C891D0DB-BE19-4D20-9E2F-61D413210F8D}.Release|Any CPU.Build.0 = Release|Any CPU {DE8A8871-B19E-489D-8292-386A06A4CDFA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {DE8A8871-B19E-489D-8292-386A06A4CDFA}.Debug|Any CPU.Build.0 = Debug|Any CPU {DE8A8871-B19E-489D-8292-386A06A4CDFA}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -246,9 +222,6 @@ Global {ADAAA63C-E17C-4F1B-A062-3CCA071D75C2} = {ED24B548-6F37-4283-A35B-F6015BFB7A34} {0A782A83-B66D-4B99-9BE2-2B18AAD2E03C} = {ED24B548-6F37-4283-A35B-F6015BFB7A34} {88808771-56BE-422B-94DC-7AB070F64E98} = {068CB250-2804-4C7E-9490-17F432B9CE21} - {E47EF9E0-A1C7-4FF0-AEC5-143F52ED0FBE} = {ED24B548-6F37-4283-A35B-F6015BFB7A34} - {B7197114-B1C7-49EC-8740-1E09233B2C40} = {ADAAA63C-E17C-4F1B-A062-3CCA071D75C2} - {D29EC709-33DE-4045-8F3B-EC6619CDB429} = {0A782A83-B66D-4B99-9BE2-2B18AAD2E03C} {36F459F4-8323-472A-A8C5-8C9D89F92012} = {7A9B997B-DAAC-4004-94F3-32F6B88E0068} {FC622AB0-6481-4249-8D83-27BC39912103} = {ADAAA63C-E17C-4F1B-A062-3CCA071D75C2} {B86A51E3-7AC9-4EF8-BD2A-1ACC9EF0F5AE} = {292BCEDD-55B4-49BB-B8B2-24CD834FF2AA} @@ -259,7 +232,6 @@ Global {827620D3-2258-410E-A79E-E782ED42284C} = {58483813-0D7C-423E-8E7D-8FBF3E6CDB6D} {15C12D0C-FE8A-41F9-BBCF-5A963F05D5C7} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {2E63A019-F8AD-4EC3-A80A-F560DEC7C5B4} = {ADAAA63C-E17C-4F1B-A062-3CCA071D75C2} - {C891D0DB-BE19-4D20-9E2F-61D413210F8D} = {ED24B548-6F37-4283-A35B-F6015BFB7A34} {DE8A8871-B19E-489D-8292-386A06A4CDFA} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {7913342E-80FD-4094-B892-18DAA2E6948F} = {068CB250-2804-4C7E-9490-17F432B9CE21} {98C9826C-76F6-4C21-8A32-D55C2647905B} = {ADAAA63C-E17C-4F1B-A062-3CCA071D75C2} diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs b/src/KafkaFlow/Batching/BatchConsumeMessageContext.cs similarity index 97% rename from src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs rename to src/KafkaFlow/Batching/BatchConsumeMessageContext.cs index 9bc92fe33..41b994201 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMessageContext.cs +++ b/src/KafkaFlow/Batching/BatchConsumeMessageContext.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.BatchConsume +namespace KafkaFlow.Batching { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow/Batching/BatchConsumeMiddleware.cs similarity index 99% rename from src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs rename to src/KafkaFlow/Batching/BatchConsumeMiddleware.cs index a6f0621df..70e1f323e 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow/Batching/BatchConsumeMiddleware.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.BatchConsume +namespace KafkaFlow.Batching { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs b/src/KafkaFlow/Batching/BatchingExtensions.cs similarity index 90% rename from src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs rename to src/KafkaFlow/Batching/BatchingExtensions.cs index 9e3326156..00b100aa4 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeExtensions.cs +++ b/src/KafkaFlow/Batching/BatchingExtensions.cs @@ -1,14 +1,15 @@ -namespace KafkaFlow.BatchConsume +namespace KafkaFlow { using System; using System.Collections.Generic; + using KafkaFlow.Batching; using KafkaFlow.Configuration; using KafkaFlow.Consumers; /// /// no needed /// - public static class BatchConsumeExtensions + public static class BatchingExtensions { /// /// Accumulates a group of messages to be passed as a batch to the next middleware as just one message @@ -17,7 +18,7 @@ public static class BatchConsumeExtensions /// The maximum size of the batch, when this limit is reached the next middleware will be called /// The maximum time the middleware will wait to call the next middleware /// - public static IConsumerMiddlewareConfigurationBuilder BatchConsume( + public static IConsumerMiddlewareConfigurationBuilder AddBatching( this IConsumerMiddlewareConfigurationBuilder builder, int batchSize, TimeSpan batchTimeout) @@ -32,7 +33,7 @@ public static class BatchConsumeExtensions } /// - /// Gets the accumulated grouped by BatchConsume middleware + /// Gets the accumulated grouped by batching middleware /// /// The message context /// All the contexts in the batch diff --git a/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs b/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs index a5af5c77a..8189c22b5 100644 --- a/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow/Extensions/ConfigurationBuilderExtensions.cs @@ -7,6 +7,8 @@ namespace KafkaFlow using KafkaFlow.Configuration; using KafkaFlow.Consumers; using KafkaFlow.Consumers.WorkersBalancers; + using KafkaFlow.Middlewares.Compressor; + using KafkaFlow.Middlewares.TypedHandler; /// /// Provides extension methods over and @@ -174,5 +176,90 @@ public static IConsumerConfigurationBuilder WithConsumerConfig(this IConsumerCon maxInstanceWorkers, TimeSpan.FromMinutes(5)); } + + /// + /// Adds typed handler middleware + /// + /// Instance of + /// A handler to configure the middleware + /// + public static IConsumerMiddlewareConfigurationBuilder AddTypedHandlers( + this IConsumerMiddlewareConfigurationBuilder builder, + Action configure) + { + var typedHandlerBuilder = new TypedHandlerConfigurationBuilder(builder.DependencyConfigurator); + + configure(typedHandlerBuilder); + + var configuration = typedHandlerBuilder.Build(); + + builder.Add( + resolver => new TypedHandlerMiddleware(resolver, configuration), + MiddlewareLifetime.Message); + + return builder; + } + + /// + /// Registers a middleware to decompress the message + /// + /// The middleware configuration builder + /// The compressor type + /// + [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")] + public static IConsumerMiddlewareConfigurationBuilder AddDecompressor(this IConsumerMiddlewareConfigurationBuilder middlewares) + where T : class, IDecompressor + { + middlewares.DependencyConfigurator.AddTransient(); + return middlewares.AddDecompressor(resolver => resolver.Resolve()); + } + + /// + /// Registers a middleware to decompress the message + /// + /// The middleware configuration builder + /// The decompressor type that implements + /// A factory to create the instance + /// + [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")] + public static IConsumerMiddlewareConfigurationBuilder AddDecompressor( + this IConsumerMiddlewareConfigurationBuilder middlewares, + Factory factory) + where T : class, IDecompressor + { + return middlewares.Add(resolver => new DecompressorConsumerMiddleware(factory(resolver))); + } + + /// + /// Registers a middleware to compress the message + /// It is highly recommended to use the producer native compression ('WithCompression()' method) instead of using the compressor middleware + /// + /// The middleware configuration builder + /// The compressor type that implements + /// + [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")] + public static IProducerMiddlewareConfigurationBuilder AddCompressor(this IProducerMiddlewareConfigurationBuilder middlewares) + where T : class, ICompressor + { + middlewares.DependencyConfigurator.AddTransient(); + return middlewares.AddCompressor(resolver => resolver.Resolve()); + } + + /// + /// Registers a middleware to compress the message + /// It is highly recommended to use the producer native compression ('WithCompression()' method) instead of using the compressor middleware + /// + /// The middleware configuration builder + /// The compressor type that implements + /// A factory to create the instance + /// + [Obsolete("Compressors should only be used in backward compatibility scenarios, in the vast majority of cases native compression (producer.WithCompression()) should be used instead")] + public static IProducerMiddlewareConfigurationBuilder AddCompressor( + this IProducerMiddlewareConfigurationBuilder middlewares, + Factory factory) + where T : class, ICompressor + { + return middlewares.Add(resolver => new CompressorProducerMiddleware(factory(resolver))); + } } } diff --git a/src/KafkaFlow/KafkaFlow.csproj b/src/KafkaFlow/KafkaFlow.csproj index ddf21310f..a05e5e9e3 100644 --- a/src/KafkaFlow/KafkaFlow.csproj +++ b/src/KafkaFlow/KafkaFlow.csproj @@ -8,7 +8,9 @@ + + diff --git a/src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs b/src/KafkaFlow/Middlewares/Compressor/CompressorProducerMiddleware.cs similarity index 79% rename from src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs rename to src/KafkaFlow/Middlewares/Compressor/CompressorProducerMiddleware.cs index 89fa47d78..ed3ca3d42 100644 --- a/src/KafkaFlow.Compressor/CompressorProducerMiddleware.cs +++ b/src/KafkaFlow/Middlewares/Compressor/CompressorProducerMiddleware.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Compressor +namespace KafkaFlow.Middlewares.Compressor { using System; using System.Threading.Tasks; @@ -8,13 +8,13 @@ /// public class CompressorProducerMiddleware : IMessageMiddleware { - private readonly IMessageCompressor compressor; + private readonly ICompressor compressor; /// /// Initializes a new instance of the class. /// - /// Instance of - public CompressorProducerMiddleware(IMessageCompressor compressor) + /// Instance of + public CompressorProducerMiddleware(ICompressor compressor) { this.compressor = compressor; } diff --git a/src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs b/src/KafkaFlow/Middlewares/Compressor/DecompressorConsumerMiddleware.cs similarity index 57% rename from src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs rename to src/KafkaFlow/Middlewares/Compressor/DecompressorConsumerMiddleware.cs index 033babbdc..d52ae8360 100644 --- a/src/KafkaFlow.Compressor/CompressorConsumerMiddleware.cs +++ b/src/KafkaFlow/Middlewares/Compressor/DecompressorConsumerMiddleware.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.Compressor +namespace KafkaFlow.Middlewares.Compressor { using System; using System.Threading.Tasks; @@ -6,17 +6,17 @@ /// /// Middleware to decompress the messages when consuming /// - public class CompressorConsumerMiddleware : IMessageMiddleware + public class DecompressorConsumerMiddleware : IMessageMiddleware { - private readonly IMessageCompressor compressor; + private readonly IDecompressor decompressor; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - /// Instance of - public CompressorConsumerMiddleware(IMessageCompressor compressor) + /// Instance of + public DecompressorConsumerMiddleware(IDecompressor decompressor) { - this.compressor = compressor; + this.decompressor = decompressor; } /// @@ -28,7 +28,7 @@ public Task Invoke(IMessageContext context, MiddlewareDelegate next) $"{nameof(context.Message.Value)} must be a byte array to be decompressed and it is '{context.Message.Value.GetType().FullName}'"); } - var data = this.compressor.Decompress(rawData); + var data = this.decompressor.Decompress(rawData); return next(context.SetMessage(context.Message.Key, data)); } diff --git a/src/KafkaFlow.Serializer/ConsumerMiddlewareConfigurationBuilderExtensions.cs b/src/KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs similarity index 62% rename from src/KafkaFlow.Serializer/ConsumerMiddlewareConfigurationBuilderExtensions.cs rename to src/KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs index dc80a5ead..bae8b29c6 100644 --- a/src/KafkaFlow.Serializer/ConsumerMiddlewareConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow/Middlewares/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs @@ -2,29 +2,31 @@ { using System; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Middlewares.Serializer.Resolvers; /// /// No needed /// - public static class ConsumerMiddlewareConfigurationBuilderExtensions + public static class ConsumerMiddlewareConfigurationBuilder { /// /// Registers a middleware to deserialize messages /// /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer where TResolver : class, IMessageTypeResolver { middlewares.DependencyConfigurator.AddTransient(); - middlewares.DependencyConfigurator.AddTransient(); + middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddSerializer( - resolver => resolver.Resolve(), + return middlewares.AddDeserializer( + resolver => resolver.Resolve(), resolver => resolver.Resolve()); } @@ -32,20 +34,20 @@ public static class ConsumerMiddlewareConfigurationBuilderExtensions /// Register a middleware to deserialize messages /// /// The middleware configuration builder - /// A class that implements + /// A class that implements /// A class that implements - /// A factory to create a + /// A factory to create a /// A factory to create a /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory, + Factory serializerFactory, Factory resolverFactory) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer where TResolver : class, IMessageTypeResolver { return middlewares.Add( - resolver => new SerializerConsumerMiddleware( + resolver => new DeserializerConsumerMiddleware( serializerFactory(resolver), resolverFactory(resolver))); } @@ -54,32 +56,32 @@ public static class ConsumerMiddlewareConfigurationBuilderExtensions /// Registers a middleware to deserialize messages /// /// The middleware configuration builder - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - middlewares.DependencyConfigurator.AddTransient(); + middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddSerializer( - resolver => resolver.Resolve(), + return middlewares.AddDeserializer( + resolver => resolver.Resolve(), _ => new DefaultTypeResolver()); } /// /// Register a middleware to deserialize messages /// - /// A class that implements + /// A class that implements /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// - public static IConsumerMiddlewareConfigurationBuilder AddSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory) - where TSerializer : class, ISerializer + Factory serializerFactory) + where TDeserializer : class, IDeserializer { - return middlewares.AddSerializer( + return middlewares.AddDeserializer( serializerFactory, _ => new DefaultTypeResolver()); } @@ -88,16 +90,16 @@ public static class ConsumerMiddlewareConfigurationBuilderExtensions /// Register a middleware to deserialize the message to a fixed type /// /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory) - where TSerializer : class, ISerializer + Factory serializerFactory) + where TDeserializer : class, IDeserializer { - return middlewares.AddSerializer( + return middlewares.AddDeserializer( serializerFactory, _ => new SingleMessageTypeResolver(typeof(TMessage))); } @@ -107,13 +109,13 @@ public static class ConsumerMiddlewareConfigurationBuilderExtensions /// /// The middleware configuration builder /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - return middlewares.AddSingleTypeSerializer(typeof(TMessage)); + return middlewares.AddSingleTypeDeserializer(typeof(TMessage)); } /// @@ -121,17 +123,17 @@ public static class ConsumerMiddlewareConfigurationBuilderExtensions /// /// The middleware configuration builder /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, Type messageType) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - middlewares.DependencyConfigurator.AddTransient(); + middlewares.DependencyConfigurator.AddTransient(); - return middlewares.AddSerializer( - resolver => resolver.Resolve(), + return middlewares.AddDeserializer( + resolver => resolver.Resolve(), _ => new SingleMessageTypeResolver(messageType)); } @@ -139,17 +141,17 @@ public static class ConsumerMiddlewareConfigurationBuilderExtensions /// Register a middleware to deserialize the message to a fixed type /// /// The middleware configuration builder - /// A factory to create a + /// A factory to create a /// The message type - /// A class that implements + /// A class that implements /// - public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeSerializer( + public static IConsumerMiddlewareConfigurationBuilder AddSingleTypeDeserializer( this IConsumerMiddlewareConfigurationBuilder middlewares, - Factory serializerFactory, + Factory serializerFactory, Type messageType) - where TSerializer : class, ISerializer + where TDeserializer : class, IDeserializer { - return middlewares.AddSerializer( + return middlewares.AddDeserializer( serializerFactory, _ => new SingleMessageTypeResolver(messageType)); } diff --git a/src/KafkaFlow.Serializer/ProducerMiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Middlewares/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs similarity index 98% rename from src/KafkaFlow.Serializer/ProducerMiddlewareConfigurationBuilder.cs rename to src/KafkaFlow/Middlewares/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs index 4c05e5ae1..83b667532 100644 --- a/src/KafkaFlow.Serializer/ProducerMiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Middlewares/Serializer/Configuration/ProducerMiddlewareConfigurationBuilder.cs @@ -2,6 +2,8 @@ { using System; using KafkaFlow.Configuration; + using KafkaFlow.Middlewares.Serializer; + using KafkaFlow.Middlewares.Serializer.Resolvers; /// /// No needed diff --git a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs b/src/KafkaFlow/Middlewares/Serializer/DeserializerConsumerMiddleware.cs similarity index 78% rename from src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs rename to src/KafkaFlow/Middlewares/Serializer/DeserializerConsumerMiddleware.cs index a00a81d6b..191ae420a 100644 --- a/src/KafkaFlow.Serializer/SerializerConsumerMiddleware.cs +++ b/src/KafkaFlow/Middlewares/Serializer/DeserializerConsumerMiddleware.cs @@ -1,27 +1,28 @@ -namespace KafkaFlow +namespace KafkaFlow.Middlewares.Serializer { using System; using System.IO; using System.Threading.Tasks; + using KafkaFlow.Middlewares.Serializer.Resolvers; /// /// Middleware to deserialize messages when consuming /// - public class SerializerConsumerMiddleware : IMessageMiddleware + public class DeserializerConsumerMiddleware : IMessageMiddleware { - private readonly ISerializer serializer; + private readonly IDeserializer deserializer; private readonly IMessageTypeResolver typeResolver; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// - /// Instance of + /// Instance of /// Instance of - public SerializerConsumerMiddleware( - ISerializer serializer, + public DeserializerConsumerMiddleware( + IDeserializer deserializer, IMessageTypeResolver typeResolver) { - this.serializer = serializer; + this.deserializer = deserializer; this.typeResolver = typeResolver; } @@ -61,7 +62,7 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) using var stream = new MemoryStream(rawData); - var data = await this.serializer + var data = await this.deserializer .DeserializeAsync( stream, messageType, diff --git a/src/KafkaFlow.Serializer/DefaultTypeResolver.cs b/src/KafkaFlow/Middlewares/Serializer/Resolvers/DefaultTypeResolver.cs similarity index 94% rename from src/KafkaFlow.Serializer/DefaultTypeResolver.cs rename to src/KafkaFlow/Middlewares/Serializer/Resolvers/DefaultTypeResolver.cs index 6613db2c9..b1bf40385 100644 --- a/src/KafkaFlow.Serializer/DefaultTypeResolver.cs +++ b/src/KafkaFlow/Middlewares/Serializer/Resolvers/DefaultTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow +namespace KafkaFlow.Middlewares.Serializer.Resolvers { using System; using System.Threading.Tasks; diff --git a/src/KafkaFlow.Serializer/IMessageTypeResolver.cs b/src/KafkaFlow/Middlewares/Serializer/Resolvers/IMessageTypeResolver.cs similarity index 93% rename from src/KafkaFlow.Serializer/IMessageTypeResolver.cs rename to src/KafkaFlow/Middlewares/Serializer/Resolvers/IMessageTypeResolver.cs index a7b9190d8..44aba2ebf 100644 --- a/src/KafkaFlow.Serializer/IMessageTypeResolver.cs +++ b/src/KafkaFlow/Middlewares/Serializer/Resolvers/IMessageTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow +namespace KafkaFlow.Middlewares.Serializer.Resolvers { using System; using System.Threading.Tasks; diff --git a/src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs b/src/KafkaFlow/Middlewares/Serializer/Resolvers/SingleMessageTypeResolver.cs similarity index 94% rename from src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs rename to src/KafkaFlow/Middlewares/Serializer/Resolvers/SingleMessageTypeResolver.cs index ee4cbc35c..bc75f6144 100644 --- a/src/KafkaFlow.Serializer/SingleMessageTypeResolver.cs +++ b/src/KafkaFlow/Middlewares/Serializer/Resolvers/SingleMessageTypeResolver.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow +namespace KafkaFlow.Middlewares.Serializer.Resolvers { using System; using System.Threading.Tasks; diff --git a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs b/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs similarity index 95% rename from src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs rename to src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs index 2adf0f443..b91f6fcb3 100644 --- a/src/KafkaFlow.Serializer/SerializerProducerMiddleware.cs +++ b/src/KafkaFlow/Middlewares/Serializer/SerializerProducerMiddleware.cs @@ -1,6 +1,7 @@ -namespace KafkaFlow +namespace KafkaFlow.Middlewares.Serializer { using System.Threading.Tasks; + using KafkaFlow.Middlewares.Serializer.Resolvers; using Microsoft.IO; /// diff --git a/src/KafkaFlow.TypedHandler/TypedHandlerConfiguration.cs b/src/KafkaFlow/Middlewares/TypedHandler/Configuration/TypedHandlerConfiguration.cs similarity index 79% rename from src/KafkaFlow.TypedHandler/TypedHandlerConfiguration.cs rename to src/KafkaFlow/Middlewares/TypedHandler/Configuration/TypedHandlerConfiguration.cs index 2f33874de..22d5407e2 100644 --- a/src/KafkaFlow.TypedHandler/TypedHandlerConfiguration.cs +++ b/src/KafkaFlow/Middlewares/TypedHandler/Configuration/TypedHandlerConfiguration.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.TypedHandler +namespace KafkaFlow.Middlewares.TypedHandler.Configuration { using System; diff --git a/src/KafkaFlow.TypedHandler/TypedHandlerConfigurationBuilder.cs b/src/KafkaFlow/Middlewares/TypedHandler/Configuration/TypedHandlerConfigurationBuilder.cs similarity index 98% rename from src/KafkaFlow.TypedHandler/TypedHandlerConfigurationBuilder.cs rename to src/KafkaFlow/Middlewares/TypedHandler/Configuration/TypedHandlerConfigurationBuilder.cs index f51775f90..acea49420 100644 --- a/src/KafkaFlow.TypedHandler/TypedHandlerConfigurationBuilder.cs +++ b/src/KafkaFlow/Middlewares/TypedHandler/Configuration/TypedHandlerConfigurationBuilder.cs @@ -1,10 +1,12 @@ -namespace KafkaFlow.TypedHandler +namespace KafkaFlow { using System; using System.Collections.Generic; using System.Linq; using System.Reflection; + using KafkaFlow.Middlewares.TypedHandler.Configuration; + /// /// Builder class for typed handler configuration /// diff --git a/src/KafkaFlow.TypedHandler/HandlerExecutor.cs b/src/KafkaFlow/Middlewares/TypedHandler/HandlerExecutor.cs similarity index 95% rename from src/KafkaFlow.TypedHandler/HandlerExecutor.cs rename to src/KafkaFlow/Middlewares/TypedHandler/HandlerExecutor.cs index 4a301ff08..0af39ecd0 100644 --- a/src/KafkaFlow.TypedHandler/HandlerExecutor.cs +++ b/src/KafkaFlow/Middlewares/TypedHandler/HandlerExecutor.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.TypedHandler +namespace KafkaFlow.Middlewares.TypedHandler { using System; using System.Collections.Concurrent; diff --git a/src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs b/src/KafkaFlow/Middlewares/TypedHandler/HandlerTypeMapping.cs similarity index 95% rename from src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs rename to src/KafkaFlow/Middlewares/TypedHandler/HandlerTypeMapping.cs index cc581f753..986ee1c2a 100644 --- a/src/KafkaFlow.TypedHandler/HandlerTypeMapping.cs +++ b/src/KafkaFlow/Middlewares/TypedHandler/HandlerTypeMapping.cs @@ -1,4 +1,4 @@ -namespace KafkaFlow.TypedHandler +namespace KafkaFlow.Middlewares.TypedHandler { using System; using System.Collections.Generic; diff --git a/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs b/src/KafkaFlow/Middlewares/TypedHandler/TypedHandlerMiddleware.cs similarity index 93% rename from src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs rename to src/KafkaFlow/Middlewares/TypedHandler/TypedHandlerMiddleware.cs index 7269228db..37d308491 100644 --- a/src/KafkaFlow.TypedHandler/TypedHandlerMiddleware.cs +++ b/src/KafkaFlow/Middlewares/TypedHandler/TypedHandlerMiddleware.cs @@ -1,7 +1,8 @@ -namespace KafkaFlow.TypedHandler +namespace KafkaFlow.Middlewares.TypedHandler { using System.Linq; using System.Threading.Tasks; + using KafkaFlow.Middlewares.TypedHandler.Configuration; internal class TypedHandlerMiddleware : IMessageMiddleware {