[contrib] kafka: record-distributing Kafka consumer proxy for multiple upstream clusters (mesh-filter new feature) #24372

Closed
adamkotwasinski opened this issue Dec 6, 2022 · 8 comments

@adamkotwasinski
Contributor

adamkotwasinski commented Dec 6, 2022

Title: kafka: record-distributing Kafka consumer proxy for multiple upstream clusters (mesh-filter new feature)

Description:
To follow up on the Kafka producer proxy, I'd like Envoy to act as a consumer proxy for multiple Kafka upstream clusters.
The consumer would point to an Envoy instance, and a poll() would return records from all matching upstreams.
Initially we could start with the same approach the Kafka producer handler uses (policies based on topic names).
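
A minimal sketch of what a topic-name policy could look like, written in Python for brevity rather than Envoy's C++. The rule table, the cluster names and the `resolve_cluster` helper are all hypothetical and only illustrate "first matching prefix wins"; they are not taken from the filter's configuration.

```python
from typing import Optional, Sequence, Tuple

# Hypothetical rules: (topic prefix, upstream cluster name); the first match wins.
RULES: Sequence[Tuple[str, str]] = (
    ("app", "cluster1"),
    ("b", "cluster2"),
)


def resolve_cluster(topic: str, rules: Sequence[Tuple[str, str]] = RULES) -> Optional[str]:
    """Returns the upstream cluster for a topic, or None if no rule matches."""
    for prefix, cluster in rules:
        if topic.startswith(prefix):
            return cluster
    return None
```

With such a table, a consumer interested in `apples` would be served from `cluster1`, while a topic with no matching rule would have no upstream at all.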

[Image: consumer-distributing-proxy]

To provide record distribution (load sharing), Envoy would start N (librdkafka) consumers matching the downstreams' requirements and connect them to the upstream clusters. Envoy would then continuously poll for records (similar to what it does with producers now), and if there is "interest" (a downstream wants the records) it would answer immediately. A limit on the number of stored records will be included.
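
The "answer immediately if there is interest, otherwise store up to a limit" idea could be sketched roughly as below (Python, single-threaded, for illustration only). The class name, its methods, and the choice to refuse a record once the limit is reached are assumptions; the issue does not say what should happen at the limit.

```python
from collections import defaultdict, deque


class RecordDistributor:
    """Hypothetical helper: hands records to waiting requests or buffers them."""

    def __init__(self, max_stored):
        self.max_stored = max_stored
        self.stored = defaultdict(deque)   # topic -> buffered records
        self.waiting = defaultdict(deque)  # topic -> callbacks of interested requests

    def stored_count(self):
        return sum(len(q) for q in self.stored.values())

    def register_interest(self, topic, callback):
        """Called for a downstream request; answered at once if a record is buffered."""
        if self.stored[topic]:
            callback(self.stored[topic].popleft())
        else:
            self.waiting[topic].append(callback)

    def on_record(self, topic, record):
        """Called by the upstream poller; returns False if the record was not accepted."""
        if self.waiting[topic]:
            # Someone is interested: answer immediately, nothing gets stored.
            self.waiting[topic].popleft()(record)
            return True
        if self.stored_count() >= self.max_stored:
            # Assumption: at the limit the record is refused (the poller could pause).
            return False
        self.stored[topic].append(record)
        return True
```

A real implementation would also need locking, because the poller threads and the filter's worker thread would touch this state concurrently.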

On the protocol level this means the following requests from the "simple" consumer flow need to be handled: ListOffsets (to handle the consumer initially asking "where am I?") and Fetch (to do the real work).
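
As a rough illustration of handling just these two request types, here is a dispatch-by-API-key sketch in Python. The API key values (Fetch = 1, ListOffsets = 2) come from the Kafka protocol; the dict-shaped requests, the handler names and the placeholder responses are hypothetical and do not reflect how the filter actually represents or answers requests.

```python
# Kafka protocol API keys.
API_KEY_FETCH = 1
API_KEY_LIST_OFFSETS = 2


def handle_list_offsets(request):
    # Hypothetical placeholder: answers "where am I?" with offset 0 everywhere.
    offsets = {partition: 0 for partition in request["partitions"]}
    return {"api_key": API_KEY_LIST_OFFSETS, "offsets": offsets}


def handle_fetch(request):
    # Hypothetical placeholder: the real work would wait for upstream records.
    return {"api_key": API_KEY_FETCH, "records": []}


HANDLERS = {
    API_KEY_LIST_OFFSETS: handle_list_offsets,
    API_KEY_FETCH: handle_fetch,
}


def dispatch(request):
    """Routes a decoded request to its handler; anything else is rejected here."""
    handler = HANDLERS.get(request["api_key"])
    if handler is None:
        raise ValueError(f"unsupported api_key {request['api_key']}")
    return handler(request)
```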

Future work would then add a "proper proxy" mode in which we get into the internals of the upstream connection and translate downstream FetchRequests into upstream ones (here we might not need librdkafka anymore, as we would be working at a lower level).

Relevant Links:

@adamkotwasinski adamkotwasinski added enhancement Feature requests. Not bugs or questions. triage Issue requires triage labels Dec 6, 2022
@adamkotwasinski
Contributor Author

/assign @adamkotwasinski

@adamkotwasinski
Contributor Author

adamkotwasinski commented Jan 2, 2023

On holidays until mid-Jan, so don't get angry with me, repokitteh ;)

@github-actions

github-actions bot commented Feb 1, 2023

This issue has been automatically marked as stale because it has not had activity in the last 30 days. It will be closed in the next 7 days unless it is tagged "help wanted" or "no stalebot" or other activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale stalebot believes this issue/PR has not been touched recently label Feb 1, 2023
@adamkotwasinski
Contributor Author

Bad bot - this issue had a PR merged just a few days ago :)

@ggreenway ggreenway removed the stale stalebot believes this issue/PR has not been touched recently label Feb 1, 2023
@github-actions

github-actions bot commented Mar 3, 2023

This issue has been automatically marked as stale because it has not had activity in the last 30 days. It will be closed in the next 7 days unless it is tagged "help wanted" or "no stalebot" or other activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale stalebot believes this issue/PR has not been touched recently label Mar 3, 2023
@adamkotwasinski
Contributor Author

hey, sorry I was away and now I'm back a bit sick
btw. bad bot - issue mentioned 2 weeks ago

@github-actions github-actions bot removed the stale stalebot believes this issue/PR has not been touched recently label Mar 4, 2023
@adamkotwasinski
Contributor Author

Fetch request interactions:

graph TD;
    FRH["FetchRequestHolder"]
    KafkaMeshFilter <-.-> |"in-flight-reference\n(finish/abandon)"| FRH
    KafkaMeshFilter --> RP["RequestDecoder+RequestProcessor"]
    RP --> |"creates"| FRH

    RCP["<< interface >> \n RecordCallbackProcessor"]
    SCM["SharedConsumerManager"]
    SCM --> |subclass| RCP

    KC["RichKafkaConsumer"]
    FRH -.-> |registers itself with| SCM
    SCM -.-> |provides records| FRH
    SCM --> |stores multiple| KC

    LibrdKafkaConsumer["<< librdkafka >> \n KafkaConsumer"]
    ConsumerPoller["<< thread >> \n consumer poller"]
    KC --> |wraps| LibrdKafkaConsumer
    KC --> |holds| ConsumerPoller
    ConsumerPoller --> |polls from| LibrdKafkaConsumer

    DSP["<< Envoy >> \n Dispatcher"]
    KafkaMeshFilter ---  DSP
    FRH -.-> |notifies on finish| DSP
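
The interactions in the graph can be approximated with the following single-threaded Python sketch. The class names mirror the graph, but every method name and signature here is an assumption made for illustration, `Dispatcher` is only a stand-in for Envoy's dispatcher, and the real code would need synchronization between the poller threads and the worker thread.

```python
from abc import ABC, abstractmethod


class RecordCallbackProcessor(ABC):
    """Interface from the graph: something callbacks can register with."""

    @abstractmethod
    def register(self, topic, callback): ...

    @abstractmethod
    def unregister(self, topic, callback): ...


class Dispatcher:
    """Stand-in for Envoy's dispatcher: posted callbacks run later, in order."""

    def __init__(self):
        self._queue = []

    def post(self, fn):
        self._queue.append(fn)

    def run(self):
        while self._queue:
            self._queue.pop(0)()


class SharedConsumerManager(RecordCallbackProcessor):
    def __init__(self):
        self._callbacks = {}  # topic -> registered holders

    def register(self, topic, callback):
        self._callbacks.setdefault(topic, []).append(callback)

    def unregister(self, topic, callback):
        if callback in self._callbacks.get(topic, []):
            self._callbacks[topic].remove(callback)

    def provide(self, topic, record):
        """Gives a polled record to the first registered holder, if there is one."""
        for holder in list(self._callbacks.get(topic, [])):
            if not holder.receive(record):
                self.unregister(topic, holder)
            return True
        return False


class FetchRequestHolder:
    def __init__(self, topic, wanted, manager, dispatcher, on_finish):
        self.topic, self.wanted = topic, wanted
        self.manager, self.dispatcher, self.on_finish = manager, dispatcher, on_finish
        self.records = []

    def start(self):
        self.manager.register(self.topic, self)  # "registers itself with"

    def receive(self, record):
        """Stores a record; returns True while more records are wanted."""
        self.records.append(record)
        if len(self.records) >= self.wanted:
            # "notifies on finish": the response is produced via the dispatcher.
            self.dispatcher.post(lambda: self.on_finish(self.records))
            return False
        return True

    def abandon(self):
        self.manager.unregister(self.topic, self)
```

The point of going through the dispatcher is that records arrive from a poller thread, while the response has to be sent from the thread that owns the downstream connection.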

@adamkotwasinski
Contributor Author

We now have an initial version of the consumer proxy implemented, so I'm going to close this issue.

It still has some limitations (see https://github.com/adamkotwasinski/envoy/blob/ff39845987af5cc5ff8796ad3b683f6a7e8dbe3f/docs/root/configuration/listeners/network_filters/kafka_mesh_filter.rst#consumer-proxy) but they can be addressed at a later phase.
And obviously I need to add some metrics.
