
[Feature Request]: Schema Registry Samples is Missing an Avro Generic Example #532

Open
danielmpetrov opened this issue Feb 22, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@danielmpetrov
Contributor

Is your request related to a problem you have?

I was running some samples in the Confluent.Kafka repo and then some samples in this repo. Since I was interested in a GenericRecord example with Avro, I noticed that there are two Avro examples here and both use ISpecificRecord. In contrast, the Confluent.Kafka repo has one example for specific records and one for generic records, which is much more valuable in my opinion. See https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroGeneric and https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroSpecific for comparison.

Describe the solution you'd like

Add a generic Avro record sample in addition to the two specific record examples, or possibly replace one of the specific examples with a generic one for better value. Refer to https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroGeneric.

Are you able to help bring it to life and contribute with a Pull Request?

Yes

Additional context

I might be able to make a PR for this, but first I must try it for myself. Hints appreciated.

@danielmpetrov danielmpetrov added the enhancement New feature or request label Feb 22, 2024
@danielmpetrov
Contributor Author

For the sake of discussion, and for anyone researching this as I was, below is the minimal implementation I've arrived at, based on https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/AvroGeneric.

.AddConsumer(
    consumer => consumer
        // ...
        .AddMiddlewares(
            middlewares => middlewares
                .AddSchemaRegistryAvroGenericRecordDeserializer()
                .AddTypedHandlers(
                    handlers => handlers
                        .AddHandler<AvroGenericRecordMessageHandler>())
        )
)

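using System;
using System.Threading.Tasks;
using Avro;
using Avro.Generic;
using KafkaFlow;
// The KafkaFlow serializer namespaces (IMessageTypeResolver, DeserializerConsumerMiddleware,
// ConfluentAvroDeserializer) are also required; their exact names depend on the KafkaFlow version.

/// <summary>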
/// Beware, this acts as a catch-all - it handles all Avro messages, even specific ones!
/// </summary>
public sealed class AvroGenericRecordMessageHandler : IMessageHandler<GenericRecord>
{
    public Task Handle(IMessageContext context, GenericRecord message)
    {
        Console.WriteLine(
            "Partition: {0} | Offset: {1} | Message: {2} | Avro Generic",
            context.ConsumerContext.Partition,
            context.ConsumerContext.Offset,
            message);

        return Task.CompletedTask;
    }
}

public sealed class SchemaRegistryGenericRecordTypeResolver : IMessageTypeResolver
{
    public ValueTask<Type> OnConsumeAsync(IMessageContext context) => ValueTask.FromResult(typeof(GenericRecord));

    public ValueTask OnProduceAsync(IMessageContext context) => default;
}

public static class ConsumerConfigurationBuilderExtensions
{
    public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroGenericRecordDeserializer(
        this IConsumerMiddlewareConfigurationBuilder middlewares)
    {
        return middlewares.Add(
            resolver => new DeserializerConsumerMiddleware(
                new ConfluentAvroDeserializer(resolver),
                new SchemaRegistryGenericRecordTypeResolver()));
    }
}

public static class SampleData
{
    private static readonly RecordSchema s_schema = (RecordSchema)RecordSchema.Parse(
        @"{
            ""type"": ""record"",
            ""name"": ""User"",
            ""fields"": [
                {""name"": ""name"", ""type"": ""string""},
                {""name"": ""favorite_number"",  ""type"": ""long""},
                {""name"": ""favorite_color"", ""type"": ""string""}
            ]
        }"
    );

    public static GenericRecord CreateGenericUser(string name, long favouriteNumber, string favouriteColor)
    {
        var record = new GenericRecord(s_schema);
        record.Add("name", name);
        record.Add("favorite_number", favouriteNumber);
        record.Add("favorite_color", favouriteColor);
        return record;
    }
}
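The handler above only prints the whole record. When a handler needs individual fields, Apache Avro's `GenericRecord.TryGetValue` reads them by name. The sketch below assumes the `User` schema from `SampleData`; the class name `UserGenericRecordHandler` is hypothetical, and the KafkaFlow namespace for `IMessageHandler<T>` is an assumption that may differ between versions.

```csharp
using System;
using System.Threading.Tasks;
using Avro.Generic;
using KafkaFlow; // assumed namespace of IMessageHandler<T> / IMessageContext; adjust for your version

// Hypothetical handler that reads fields of the "User" record defined in SampleData.
public sealed class UserGenericRecordHandler : IMessageHandler<GenericRecord>
{
    public Task Handle(IMessageContext context, GenericRecord message)
    {
        // A generic handler may receive records of any schema, so check the schema name
        // and field presence before casting.
        if (message.Schema.Fullname != "User"
            || !message.TryGetValue("name", out var name)
            || !message.TryGetValue("favorite_number", out var favoriteNumber))
        {
            return Task.CompletedTask;
        }

        // Avro "string" is read as System.String and "long" as a boxed System.Int64.
        Console.WriteLine("User {0} likes the number {1}", (string)name, (long)favoriteNumber);
        return Task.CompletedTask;
    }
}
```

Checking `Schema.Fullname` keeps the handler safe when, as noted in the summary comment above, a GenericRecord handler receives every Avro message on the topic.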

The main issue, which wasn't obvious, is that the existing SchemaRegistryTypeResolver registered by AddSchemaRegistryAvroDeserializer scans the assemblies for a class matching the namespace and name specified by the Avro schema, whereas my intention was to deserialize into a GenericRecord. I think it's a bit unfortunate that the library doesn't support deserializing into a GenericRecord out of the box. Adding some custom code is not the end of the world, but it wasn't intuitive why my GenericRecord handler was never invoked in the pipeline, since the underlying Confluent SDK is fully capable of handling it. I had to debug before I realised what the issue was.

A question for the library maintainers: do you see value in adding this natively to the library, or at the very least showcasing it in the samples and/or docs?

If this is to be added to the library, an API must be defined, because there's more than one way of doing it. Above I've demonstrated a new deserializer registration, which is simple enough, but you may prefer not to introduce a separate registration method. Another option is to extend the existing SchemaRegistryTypeResolver to fall back to GenericRecord when it doesn't find a matching type, instead of returning null, which eventually short-circuits the parent DeserializerConsumerMiddleware. While that solution is a bit more "magical", it would allow chaining a generic handler as a "catch-all" after specific handlers, e.g.

.AddConsumer(
    consumer => consumer
        // ...
        .AddMiddlewares(
            middlewares => middlewares
                // the existing registration; the generic-only deserializer above would resolve every
                // message to GenericRecord, so the specific handlers below would never be invoked
                .AddSchemaRegistryAvroDeserializer()
                .AddTypedHandlers(
                    handlers => handlers
                        .AddHandler<AvroMessageHandler>()
                        .AddHandler<AvroMessageHandler2>()
                        // everything that didn't match a specific handler falls through to the generic handler,
                        // assuming SchemaRegistryTypeResolver is modified to fall back to typeof(GenericRecord)
                        .AddHandler<AvroGenericRecordMessageHandler>())
        )
)

Whilst I haven't tested the above design, I don't see a reason why it wouldn't work.
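The fallback behaviour described above could also be approximated without modifying the library, by wrapping an existing resolver in a decorator that substitutes `typeof(GenericRecord)` whenever the inner resolver returns `null`. This is a minimal sketch, assuming KafkaFlow v3, where `IMessageTypeResolver` has the `OnConsumeAsync`/`OnProduceAsync` shape used earlier in this thread. The class name `GenericRecordFallbackTypeResolver` is hypothetical, and the `KafkaFlow.Middlewares.Serializer.Resolvers` namespace is an assumption that may differ between versions.

```csharp
using System;
using System.Threading.Tasks;
using Avro.Generic;
using KafkaFlow;
using KafkaFlow.Middlewares.Serializer.Resolvers; // assumed v3 namespace of IMessageTypeResolver

/// <summary>
/// Hypothetical decorator: delegates to an inner resolver (e.g. the schema registry one)
/// and falls back to GenericRecord when the inner resolver finds no matching class.
/// </summary>
public sealed class GenericRecordFallbackTypeResolver : IMessageTypeResolver
{
    private readonly IMessageTypeResolver _inner;

    public GenericRecordFallbackTypeResolver(IMessageTypeResolver inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public async ValueTask<Type> OnConsumeAsync(IMessageContext context)
    {
        // A null result would otherwise short-circuit DeserializerConsumerMiddleware.
        var resolved = await _inner.OnConsumeAsync(context);
        return resolved ?? typeof(GenericRecord);
    }

    // Producing is unaffected: keep whatever the inner resolver does.
    public ValueTask OnProduceAsync(IMessageContext context) => _inner.OnProduceAsync(context);
}
```

It would be passed to `DeserializerConsumerMiddleware` in place of the bare resolver, wrapping the `SchemaRegistryTypeResolver` that `AddSchemaRegistryAvroDeserializer` normally builds. How that inner instance is constructed depends on KafkaFlow internals not shown in this thread, so that wiring is left out. Keeping the fallback in a decorator leaves the default "specific types only" behaviour untouched for consumers that don't opt in.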

@JoaoRodriguesGithub
Contributor

Hi @danielmpetrov,

Thank you for your suggestion; we see value in adding this to KafkaFlow.

Since this will require a new implementation in KafkaFlow, we will work on it and then open the PR.

Feel free to participate in it when we open the PR.

@danielmpetrov
Contributor Author

Awesome, I'm looking forward to your design.

Development

No branches or pull requests

2 participants