How to store processed log to Kafka or Cassandra with Manual Commit? #51
Comments
@evans-ye Have you considered using |
Yeah, thanks for your prompt response. Let's say the stream saving data to my topic is slower, and the committing stream is faster. |
You definitely can do this (and I do) by using a broadcast and some flow stages, so that you commit only after the persist-to-Cassandra future completes. As a side benefit, this pseudo flow sends only the message itself to your persist method: Source(publisher) ~> broadcast ~> Flow.map(_.message).mapAsync(1)(persist) ~> zip.in0 |
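For readers who want to see the ordering guarantee in isolation, here is a minimal sketch using only the Scala standard library rather than the stream graph from the comment above. It assumes a hypothetical `Msg` record and caller-supplied `persist` and `commit` functions (none of these are reactive-kafka types), and it chains futures one message at a time, roughly what `mapAsync(1)` would give you, so each offset is committed only after its persist future has completed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a consumed Kafka record; not the reactive-kafka type.
final case class Msg(offset: Long, message: String)

object CommitAfterPersist {
  // Handles one message at a time: persist the payload, then commit its offset.
  // If a persist future fails, the chain fails and no later offset is committed.
  def run(msgs: List[Msg])(persist: String => Future[Unit],
                           commit: Long => Unit): Future[Unit] =
    msgs.foldLeft(Future.successful(())) { (prev, m) =>
      prev
        .flatMap(_ => persist(m.message)) // only the payload reaches persist
        .map(_ => commit(m.offset))       // runs after persist has completed
    }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer.empty[String]

    // Hypothetical async write, e.g. to Cassandra or another Kafka topic.
    def persist(payload: String): Future[Unit] = Future {
      log.synchronized { log += s"persisted:$payload" }
      ()
    }

    // Hypothetical offset commit.
    def commit(offset: Long): Unit = {
      log.synchronized { log += s"committed:$offset" }
      ()
    }

    Await.result(run(List(Msg(0L, "a"), Msg(1L, "b")))(persist, commit), 5.seconds)
    log.foreach(println)
  }
}
```

Running `main` should print the persist entry for each message before its commit entry. In a real stream the same idea would be expressed with graph stages as described in the comment; this sketch only shows why waiting on the future makes a slow persist hold back the commit.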
OKAY! I got your point. This looks perfect to me. I have only one more question, which I think is about the benefit of using your module: |
Looks like we're done with this.
Sorry, I couldn't find a proper place to ask, so I'm opening an issue here (close it if you think it's improper).
I'm currently doing a PoC based on this awesome reactive kafka module with the manual commit feature.
I'm struggling to add a Sink that stores the processed log in a permanent storage system such as Kafka or Cassandra.
In your sample code, messages are processed on the fly in the processMessage function. If I need to store data in Kafka, I have to replace offsetCommitSink with another Kafka Sink, but then I can't use offsetCommitSink to stream back for commit.
Another approach is to use a saveToKafka function to store the processed log in Kafka (shown below), which is the current implementation of my PoC.
Source(consumerWithOffsetSink.publisher)
  .map(processMessage(_)) // your message processing
  .map(saveToKafka(_)) // store the processed log in Kafka
  .to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
  .run()
Do you think this is the best practice to achieve my goal?