Feature: Schema Registry support #964

Closed
harochau-gamemod opened this issue Nov 17, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@harochau-gamemod

harochau-gamemod commented Nov 17, 2023

Some Kafka libraries, such as confluent-kafka-python, https://github.com/marcosschroh/python-schema-registry-client and https://github.com/lsst-sqre/kafkit, provide various schema registry integration capabilities: an HTTP client for the schema registry, caching of schema registry requests, serialization/deserialization, and Avro, Protobuf and JSON Schema support.
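For context on what "serialization/deserialization" involves here: Confluent-compatible serializers frame each message as a magic byte `0`, then a 4-byte big-endian schema ID, then the encoded payload. Below is a minimal stdlib sketch of just that framing; encoding the payload itself (Avro, Protobuf, JSON Schema) is out of scope, and `frame`/`unframe` are hypothetical helper names, not part of any library.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format marker


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prefix an already-encoded payload with the magic byte and schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message into (schema_id, payload)."""
    if len(message) < 5:
        raise ValueError("message too short for the Confluent wire format")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]
```

The schema ID is what lets a consumer look up (and cache) the writer schema in the registry before decoding the payload.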

It would be nice to have such capabilities built in, with a declarative-style API.
Furthermore, I suggest providing both schema-first (using model.avro files) and code-first (generating and registering Avro schemas from Pydantic models or dataclasses using https://github.com/marcosschroh/dataclasses-avroschema) implementations of the schema registry integration.
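To illustrate the code-first idea without depending on dataclasses-avroschema, here is a hand-rolled sketch that derives an Avro record schema from a plain dataclass. It is an assumption-laden toy: it maps only a few primitive types (Python `int` to Avro `long`, to avoid 32-bit overflow), and `avro_schema_for` is a hypothetical helper. The real library also handles nested records, optional fields and logical types.

```python
import dataclasses
import json
import typing

# Minimal Python -> Avro primitive mapping (assumption: primitive fields only)
_AVRO_PRIMITIVES = {
    int: "long",
    float: "double",
    str: "string",
    bool: "boolean",
    bytes: "bytes",
}


def avro_schema_for(cls: type) -> dict:
    """Build an Avro record schema dict from a dataclass with primitive fields."""
    if not dataclasses.is_dataclass(cls):
        raise TypeError(f"{cls!r} is not a dataclass")
    hints = typing.get_type_hints(cls)
    fields = []
    for field in dataclasses.fields(cls):
        py_type = hints[field.name]
        if py_type not in _AVRO_PRIMITIVES:
            raise TypeError(f"unsupported type for {field.name}: {py_type!r}")
        fields.append({"name": field.name, "type": _AVRO_PRIMITIVES[py_type]})
    return {"type": "record", "name": cls.__name__, "fields": fields}


@dataclasses.dataclass
class Incoming:
    user_id: int
    text: str


# The JSON string is what a code-first integration would register under a subject
schema_str = json.dumps(avro_schema_for(Incoming))
```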

Using the default code example, I envision the following usage pattern:

from fastapi import FastAPI
from dataclasses_avroschema.avrodantic import AvroBaseModel

from faststream.kafka.fastapi import KafkaRouter
# Proposed API: faststream.kafka.schema_registry does not exist yet
from faststream.kafka.schema_registry import SchemaRegistryConfig

router = KafkaRouter(
    "localhost:9092",
    schema_registry_config=SchemaRegistryConfig(
        format="avro",
        subject_naming_strategy="topic-key-value",
        schema_storage_strategy="from-pydantic-model",
        url="localhost:8081",
    ),
)

class Incoming(AvroBaseModel):
    m: dict

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming):
    return {"response": "Hello, world!"}

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
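The proposed `subject_naming_strategy` option maps naturally onto the three strategies Confluent's serializers ship with: TopicNameStrategy (`<topic>-key` / `<topic>-value`, the default), RecordNameStrategy (the fully-qualified record name) and TopicRecordNameStrategy (`<topic>-<record name>`). A minimal sketch of how a subject could be derived; the function name and the string keys are hypothetical, not an existing FastStream API.

```python
def subject_name(strategy: str, topic: str, is_key: bool, record_name: str) -> str:
    """Derive a schema registry subject name (hypothetical helper)."""
    if strategy == "topic":
        # TopicNameStrategy: separate subjects for keys and values of a topic
        return f"{topic}-{'key' if is_key else 'value'}"
    if strategy == "record":
        # RecordNameStrategy: one subject per record type, shared across topics
        return record_name
    if strategy == "topic-record":
        # TopicRecordNameStrategy: one subject per (topic, record type) pair
        return f"{topic}-{record_name}"
    raise ValueError(f"unknown subject naming strategy: {strategy!r}")
```

With the topic strategy, `subject_name("topic", "test", False, "com.example.Incoming")` gives `"test-value"`, which is the same subject format used in the `get_latest_version(f"{settings.topic}-value")` call later in this thread.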

Describe alternatives you've considered
Each application may try to implement its own integration with the schema registry.

harochau-gamemod added the enhancement (New feature or request) label Nov 17, 2023
@Lancetnik
Member

I am not a big Kafka specialist, but your code example with SchemaRegistryConfig looks great. @sternakt, what do you think about this feature?

@davorrunje
Collaborator

We are currently creating a new Kafka broker based on the Confluent library. We'll look into this as soon as support for the new broker is finished.

davorrunje assigned kumaranvpl and unassigned sternakt Dec 13, 2023
@devova

devova commented Jan 30, 2024

Using faststream = {version = "0.4.0rc0", extras = ["confluent"]}, I currently use the built-in confluent-kafka features, and they work pretty well:

from typing import Annotated

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext
from faststream import Depends, apply_types
from faststream.confluent import KafkaMessage

# KafkaSettings, get_kafka_settings, settings, broker and PydanticModel
# are application-specific and defined elsewhere.


def get_avro_deserializer(settings: Annotated[KafkaSettings, Depends(get_kafka_settings)]) -> AvroDeserializer:
    """
    :raises SchemaRegistryError: if schema was not registered before
    """
    sr = SchemaRegistryClient({"url": str(settings.schema_registry_url)})
    # Subject follows the default <topic>-value naming convention
    latest_version = sr.get_latest_version(f"{settings.topic}-value")
    return AvroDeserializer(schema_registry_client=sr, schema_str=latest_version.schema.schema_str)


@apply_types
async def decode_message(
    msg: KafkaMessage,
    deserializer: Annotated[AvroDeserializer, Depends(get_avro_deserializer)],
    settings: Annotated[KafkaSettings, Depends(get_kafka_settings)],
):
    ctx = SerializationContext(settings.topic, MessageField.VALUE)
    return deserializer(msg.body, ctx)


@broker.subscriber(settings.topic, decoder=decode_message)
async def consume(msg: PydanticModel):
    ...

@davorrunje
Collaborator

@devova can we close this issue? Is it working for you?

@devova

devova commented Mar 7, 2024

Sorry, there was a bug in my example: you can't decorate a decoder function with apply_types.

@apply_types
async def decode_message():
    ...

It works properly only the first time the function is invoked; after that, msg: KafkaMessage is passed in empty. This is how fast_depends behaves.

The working example has to create the deserializer explicitly:

async def decode_message(msg: KafkaMessage):
    # Build the deserializer explicitly from the module-level settings object
    deserializer = get_avro_deserializer(settings)
    return deserializer(msg.body, None)
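The working decoder above builds a new `SchemaRegistryClient` and fetches the latest schema on every message. One way to avoid that per-message round trip is to memoize the lookup, for example with `functools.lru_cache`. The sketch below keeps the registry call behind a hypothetical `fetch_latest_schema` stand-in so it runs without a registry; in the real decoder, the cached function would wrap `get_avro_deserializer` (keyed on something hashable such as the topic name).

```python
import functools

CALLS = {"count": 0}  # counts simulated registry round trips


def fetch_latest_schema(subject: str) -> str:
    """Hypothetical stand-in for SchemaRegistryClient.get_latest_version(...)."""
    CALLS["count"] += 1
    return '{"type": "string"}'


@functools.lru_cache(maxsize=128)
def cached_schema(subject: str) -> str:
    # Only the first call per subject reaches the (simulated) registry
    return fetch_latest_schema(subject)


# Three messages on the same topic trigger a single registry request
for _ in range(3):
    cached_schema("test-value")
```

Caching the latest version trades freshness for fewer requests: a newly registered schema version is only picked up after `cached_schema.cache_clear()` or a restart.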

It would be good to add documentation with examples of how to deal with different schema registries. Again, there are many registries, and coupling the router to a particular registry isn't a good idea unless there is an abstract class first, so that the community can add implementations later.
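A minimal sketch of the abstraction suggested above: an abstract base class that any registry backend could implement, plus an in-memory backend that could serve in tests. All names (`SchemaRegistry`, `InMemorySchemaRegistry`, `register`, `get_latest`, `get_by_id`) are hypothetical and not part of FastStream or any registry client.

```python
from abc import ABC, abstractmethod


class SchemaRegistry(ABC):
    """Backend-agnostic interface a broker or router could depend on."""

    @abstractmethod
    def register(self, subject: str, schema: str) -> int:
        """Register a schema under a subject and return its ID."""

    @abstractmethod
    def get_latest(self, subject: str) -> tuple[int, str]:
        """Return (schema_id, schema) for the latest version of a subject."""

    @abstractmethod
    def get_by_id(self, schema_id: int) -> str:
        """Return the schema registered under the given ID."""


class InMemorySchemaRegistry(SchemaRegistry):
    def __init__(self) -> None:
        self._by_id: dict[int, str] = {}
        self._versions: dict[str, list[int]] = {}

    def register(self, subject: str, schema: str) -> int:
        # Reuse the existing ID if this exact schema text is already known
        schema_id = next((i for i, s in self._by_id.items() if s == schema), None)
        if schema_id is None:
            schema_id = len(self._by_id) + 1
            self._by_id[schema_id] = schema
        versions = self._versions.setdefault(subject, [])
        # Only add a new version when the schema actually changed
        if not versions or versions[-1] != schema_id:
            versions.append(schema_id)
        return schema_id

    def get_latest(self, subject: str) -> tuple[int, str]:
        versions = self._versions.get(subject)
        if not versions:
            raise KeyError(f"subject not found: {subject}")
        return versions[-1], self._by_id[versions[-1]]

    def get_by_id(self, schema_id: int) -> str:
        return self._by_id[schema_id]
```

A Confluent-backed implementation would then wrap `SchemaRegistryClient` behind the same three methods, so the router only ever sees `SchemaRegistry`.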

@davorrunje
Collaborator

Opening a new issue #1297 and closing this one.

Projects
Archived in project
Development

No branches or pull requests

6 participants