Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Able to add metadata to the committed offset #563

Merged
merged 3 commits into from Sep 26, 2018

Conversation

Projects
None yet
2 participants
@johnclara
Copy link
Contributor

commented Sep 17, 2018

I would like to add metadata based on the ConsumerRecord along with each commit. The issue for this is here: #561

This PR will allow users to pass in a function to the source constructor which will add metadata to each commit based on the consumer record. Each committed offset is guaranteed to contain metadata constructed from the message as long as the commitRefreshInterval is infinite.

If the commitRefreshInterval is not infinite, then when the Consumer is assigned a partition it will also receive an offset without a message. If it has not processed a new message before the end of the interval, it will recommit this offset for this partition without adding metadata.
In order to solve this, we can read the last committed message each time a partition is assigned in order to reconstruct the metadata. This could negatively impact performance so it is not implemented in this PR.

@johnclara johnclara force-pushed the johnclara:commit-metadata branch from ecfc2b2 to bd52951 Sep 17, 2018

@johnclara johnclara force-pushed the johnclara:commit-metadata branch from bd52951 to f4c38fb Sep 18, 2018

@ennru
Copy link
Member

left a comment

Great addition! Thank you for finding a backwards-compatible way to make this work.

* when an offset is committed based on the record. This can be useful (for example) to store information about which
* node made the commit, what time the commit was made, the timestamp of the record etc.
*/
def committableSourceWithMetadata[K, V](

This comment has been minimized.

Copy link
@ennru

ennru Sep 18, 2018

Member

Would there be a better name for this construct?
eg. commitWithMetadataSource?

override protected def logic(shape: SourceShape[CommittableMessage[K, V]]) =
new SingleSourceLogic[K, V, CommittableMessage[K, V]](shape, settings, subscription)
with CommittableMessageWithMetadataBuilder[K, V] {
override def metadataFromRecord(record: ConsumerRecord[K, V]): String = _metadataFromRecord(record)

This comment has been minimized.

Copy link
@ennru

ennru Sep 18, 2018

Member

I believe this implementation may replace the existing committableSource version, if that one returns the empty String from metadataFromRecord.
Wouldn't that lead to fewer moving parts?

@johnclara
Copy link
Contributor Author

left a comment

Thanks for reviewing! I've tried to make the suggested changes

@ennru
Copy link
Member

left a comment

The code looks good now, but we'd like some documentation, as well.
Preferably in section https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#offset-storage-in-kafka-committing

@johnclara

This comment has been minimized.

Copy link
Contributor Author

commented Sep 19, 2018

Great! I'll start on updating the documentation

@johnclara johnclara force-pushed the johnclara:commit-metadata branch 2 times, most recently from eb0a55e to 4f92d05 Sep 19, 2018

@johnclara johnclara force-pushed the johnclara:commit-metadata branch from 4f92d05 to b04fc96 Sep 19, 2018

@johnclara

This comment has been minimized.

Copy link
Contributor Author

commented Sep 19, 2018

Okay! I've updated the documentation and have the unit tests passing. I also added a javadsl

@ennru

ennru approved these changes Sep 21, 2018

Copy link
Member

left a comment

LGTM

@ennru ennru merged commit f7f2271 into akka:master Sep 26, 2018

1 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build failed
Details
typesafe-cla-validator All users have signed the CLA
Details

@ennru ennru added this to the 1.0-M1 milestone Sep 26, 2018

@ennru

This comment has been minimized.

Copy link
Member

commented Sep 26, 2018

Thank you for this great addition, please help us to improve it even more.

@ennru ennru referenced this pull request Sep 26, 2018

Closed

Commit With Metadata #561

@johnclara johnclara deleted the johnclara:commit-metadata branch Sep 28, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.