KAFKA-5488: A method-chaining way to build branches for topology #6164

Closed

inponomarev
Contributor

@inponomarev inponomarev commented Jan 17, 2019

The KStream.branch method takes its predicates as varargs and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

This is poor API design: it makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.

  • In general, the code has low cohesion: we need to define the predicates in one place and the respective stream processors in another.

  • If the number of predicates is predefined, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).

  • If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
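The last point can be illustrated without any Kafka classes. The following is a minimal plain-Java sketch, not code from this PR: `GenericArrayDemo`, `split` and `byColor` are hypothetical names, and `split` only mimics a varargs API that returns its results position-wise in an array. Building one predicate per enum value forces two unchecked casts and index bookkeeping.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java illustration only; all names here are hypothetical.
class GenericArrayDemo {
    enum Color { RED, GREEN, BLUE }

    // Mimics a varargs API whose result array matches the predicates by index.
    // Each item goes to the first predicate it satisfies; unmatched items are dropped.
    @SafeVarargs
    static List<String>[] split(List<String> input, Predicate<String>... predicates) {
        // Java cannot create a generic array directly, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        List<String>[] result = (List<String>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (String s : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(s)) {
                    result[i].add(s);
                    break;
                }
            }
        }
        return result;
    }

    // One branch per enum value: a second unchecked cast, plus manual index mapping.
    static Map<Color, List<String>> byColor(List<String> input) {
        Color[] colors = Color.values();
        @SuppressWarnings("unchecked")
        Predicate<String>[] predicates = (Predicate<String>[]) new Predicate[colors.length];
        for (int i = 0; i < colors.length; i++) {
            String name = colors[i].name();
            predicates[i] = s -> s.startsWith(name);
        }
        List<String>[] parts = split(input, predicates);
        Map<Color, List<String>> out = new EnumMap<>(Color.class);
        for (int i = 0; i < colors.length; i++) {
            out.put(colors[i], parts[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(byColor(List.of("RED-1", "BLUE-2", "RED-3", "other")));
    }
}
```

The caller has to keep the predicate array and the result array aligned by hand, which is the coupling the list above describes.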

The proposed class KafkaStreamBrancher introduces a new standard way to build branches on top of KStream.

Instead of

KStream<String, String> source_o365_user_activity = builder.stream("source");
KStream<String, String>[] branches = source_o365_user_activity.branch( 
      (key, value) -> value.contains("A"),
      (key, value) -> value.contains("B"),
      (key, value) -> true
     );

branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

we can use

new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   // the default branch does not have to be defined last
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"));
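To show how the chaining style above can work internally, here is a minimal sketch that branches a plain `List<T>` instead of a `KStream<K, V>`. It is an assumption-laden stand-in, not the implementation in this PR: `ListBrancher` and `ListBrancherDemo` are hypothetical names, and first-match routing is assumed to mirror the behaviour of `branch`. Each predicate is stored together with its consumer, and nothing is evaluated until `onTopOf` is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in: branches a List<T> rather than a Kafka stream.
class ListBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> consumers = new ArrayList<>();
    private Consumer<List<T>> defaultConsumer;

    // Registers a predicate together with the code that handles its branch.
    ListBrancher<T> branch(Predicate<T> predicate, Consumer<List<T>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        return this;
    }

    // May be called at any point in the chain; it always receives unmatched items.
    ListBrancher<T> defaultBranch(Consumer<List<T>> consumer) {
        this.defaultConsumer = consumer;
        return this;
    }

    // Terminal operation: routes each item to the first matching branch.
    void onTopOf(List<T> source) {
        int n = predicates.size();
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i <= n; i++) {
            buckets.add(new ArrayList<>());
        }
        for (T item : source) {
            int target = n; // index n is the default bucket
            for (int i = 0; i < n; i++) {
                if (predicates.get(i).test(item)) {
                    target = i;
                    break;
                }
            }
            buckets.get(target).add(item);
        }
        for (int i = 0; i < n; i++) {
            consumers.get(i).accept(buckets.get(i));
        }
        if (defaultConsumer != null) {
            defaultConsumer.accept(buckets.get(n));
        }
    }
}

class ListBrancherDemo {
    // Returns the contents of branches A, B and the default branch, in that order.
    static List<List<String>> run(List<String> source) {
        List<String> a = new ArrayList<>();
        List<String> b = new ArrayList<>();
        List<String> c = new ArrayList<>();
        new ListBrancher<String>()
            .branch(v -> v.contains("A"), a::addAll)
            .defaultBranch(c::addAll) // deliberately not last
            .branch(v -> v.contains("B"), b::addAll)
            .onTopOf(source);
        return List.of(a, b, c);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("A1", "B1", "AB", "x")));
    }
}
```

Because every predicate travels with its handler, no array index or unchecked cast appears at the call site.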

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax
Member

mjsax commented Jan 17, 2019

@inponomarev Thanks for the PR. Do you have a Jira Account so we can assign the ticket to you? Also note, that as mentioned on the ticket, this change will imply a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Let us know if you have any questions about the KIP process.

@inponomarev
Contributor Author

Hello @mjsax! My JIRA/Confluence account is iponomarev; however, I cannot create a KIP -- I get a 'you don't have permission' message.

@mjsax
Member

mjsax commented Jan 18, 2019

@inponomarev Thanks for the details. I added you to the list of contributors in JIRA and assigned the ticket to you (you can now also self-assign tickets). I also granted you write permissions to the wiki.

Looking forward to your KIP.

Just a heads up: the 2.2 KIP deadline is very close, and thus, your KIP might not make it into 2.2. (cf. https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=100827512)

checkstyle fixes
@inponomarev
Contributor Author

@mjsax here is KIP-418.

@mjsax
Member

mjsax commented Jan 18, 2019

Did you read the instructions on how the KIP process works? Just writing it is not sufficient -- it must be discussed, voted on and accepted, too. Let me know if you have questions.

@inponomarev
Contributor Author

The question is -- what should I do next?

@mjsax
Member

mjsax commented Jan 18, 2019

As explained in the wiki, a KIP needs to be discussed on the dev mailing list. If you are not signed up there yet, please subscribe (cf. https://kafka.apache.org/contact). Then you start a DISCUSS thread for this KIP (example: http://mail-archives.apache.org/mod_mbox/kafka-dev/201901.mbox/%3CCANZZNGyeVw8q%3Dx9uOQS-18wL3FEmnOwpBnpJ9x3iMLdXY3gEug%40mail.gmail.com%3E -- please use the corresponding Subject for the email thread).

It's your responsibility to lead the discussion, to get feedback on the design and to reach a final agreement. Once the discussion is finished, you can call for a VOTE with a new email thread (cf. http://mail-archives.apache.org/mod_mbox/kafka-dev/201901.mbox/%3CCANZZNGwOuFN59WiJmQN5LvFe67ENdruh7mkLdy5wDG2XzkcMqg%40mail.gmail.com%3E). The vote must be running for at least 72h and you need 3 binding votes to get the KIP accepted.

@bbejeck
Contributor

bbejeck commented Mar 21, 2019

@inponomarev Any updates on this? Do you need any input on doing a KIP?

@inponomarev
Contributor Author

Hi @bbejeck ! Well, I wrote a KIP-418 and started a discussion thread but no one seems to care. I am new to Kafka community and I don't know if anything else is expected from me.

Meanwhile, in spring-kafka they already have KafkaStreamBrancher on master branch.

@bbejeck
Contributor

bbejeck commented Mar 21, 2019

Hi @inponomarev sorry about that, I must have missed the discussion thread for the KIP. I'll ping the mailing list about the KIP and see if we can get this jump-started again.

@mjsax
Member

mjsax commented Mar 22, 2019

am new to Kafka community and I don't know if anything else is expected from me.

@inponomarev Sorry that this slipped... The mailing list is rather busy. It's recommended that you reply to your own DISCUSS thread on a regular basis if nobody reacts (e.g., after some days or maybe a week of silence), and ask for feedback. This bumps the thread and creates awareness that somebody is blocked and waiting for feedback. This should ensure that it does not slip.

Thus, you did everything correctly. Nothing is "expected" from you; however, you should drive the KIP discussion by nagging people if nothing happens. It's a good thing that the project is very active and that the mailing list is busy. The downside is that one needs to make some "noise" sometimes to get attention. Hope this makes sense.

@daliclass

Would really like to see this merged. If there is anything I can do to help, please let me know 👍

@inponomarev
Contributor Author

Hi @daliclass! If you are using spring-kafka, KafkaStreamBrancher is already there.

Meanwhile the KIP is being discussed and it seems that in the end we will have something even more convenient.

@daliclass

@inponomarev Thanks for the link, I have a project that is purely using the kafka streams library so look forward to something more convenient. Cheers for the link for spring, very helpful I am sure it will come in handy 👍

@mjsax
Member

mjsax commented Oct 7, 2019

I am closing this PR due to long inactivity on the KIP. Feel free to re-open the PR and continue at any point @inponomarev.

@mjsax mjsax closed this Oct 7, 2019
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
Labels
kip Requires or implements a KIP streams