New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6049: extend Kafka Streams Scala API for cogroup (KIP-150) #7847
Conversation
Call for review @bbejeck @vvcephei @wcarlson5 |
* @return an instance of {@link SessionWindowedCogroupedKStream} | ||
*/ | ||
SessionWindowedCogroupedKStream<K, VOut> windowedBy(final SessionWindows sessionWindows); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just re-ordered this method to have the same ordering over multiple classes (time-window always before session-window)
implicit def wrapSessionWindowedKStream[K, V](inner: SessionWindowedKStreamJ[K, V]): SessionWindowedKStream[K, V] = | ||
new SessionWindowedKStream[K, V](inner) | ||
|
||
implicit def wrapTimeWindowedKStream[K, V](inner: TimeWindowedKStreamJ[K, V]): TimeWindowedKStream[K, V] = | ||
new TimeWindowedKStream[K, V](inner) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this one up to get better overview/grouping. The three below are new
@@ -525,7 +525,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { | |||
* @see `org.apache.kafka.streams.kstream.KStream#join` | |||
* @deprecated since 2.4. Use [[KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined)]] instead. | |||
*/ | |||
@deprecated | |||
@deprecated("use join(KStream, ValueJoiner, JoinWindows, StreamJoined) instead", "2.4") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a side fix -- got a warning about it
@@ -21,15 +21,18 @@ package org.apache.kafka.streams.scala | |||
package kstream | |||
|
|||
import org.apache.kafka.streams.kstream.internals.KTableImpl | |||
import org.apache.kafka.streams.kstream.{KTable => KTableJ, SessionWindowedKStream => SessionWindowedKStreamJ, _} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only some side cleanup in this class
_ | ||
} | ||
import org.apache.kafka.streams.scala.FunctionsCompatConversions._ | ||
import org.apache.kafka.streams.scala.ImplicitConversions._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IntelliJ complains that this is an unused import, but without it, it does not compile at command line -- what might I do wrong?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can explicitly import the implicit conversions that you're using. Actually, I would recommend not even using them implicitly, but just to explicitly invoke the relevant conversion methods when you need them. (i.e., implicit conversions are syntactic sugar for users of the public API, we should try to instead write explicit and maintainable code in our internal implementations)
It should be pretty obvious which ones you need if you just delete this line and see what fails to compile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on this commend, I did a proper cleanup PR: #7852
We should merge the cleanup PR first, and I can rebase this PR afterwards.
_ | ||
} | ||
import org.apache.kafka.streams.scala.FunctionsCompatConversions._ | ||
import org.apache.kafka.streams.scala.ImplicitConversions._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @mjsax , thanks for adding this!
I answered your question. Otherwise, this looks reasonable, but I no longer test any Scala APIs that don't have "black box" tests... I.e., we need to write a test that uses the API just like users will to make sure it actually works the way we think it will.
_ | ||
} | ||
import org.apache.kafka.streams.scala.FunctionsCompatConversions._ | ||
import org.apache.kafka.streams.scala.ImplicitConversions._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can explicitly import the implicit conversions that you're using. Actually, I would recommend not even using them implicitly, but just to explicitly invoke the relevant conversion methods when you need them. (i.e., implicit conversions are syntactic sugar for users of the public API, we should try to instead write explicit and maintainable code in our internal implementations)
It should be pretty obvious which ones you need if you just delete this line and see what fails to compile.
@mjsax failures are relevant |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
c229ddb
to
18176b9
Compare
import org.scalatest.{FlatSpec, Matchers} | ||
import org.scalatestplus.junit.JUnitRunner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some side cleanup. org.scalatest.junit.JUnitRunner
is deprecated
5c63bc9
to
7da4bdc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updated PR @mjsax, LGTM
Java 8 failed with
Updated existing flaky test Jira tickets Java 11 passed retest this please |
Java 8 failed (one overlapping test failure):
Java 11 passed. Retest this please. |
Java 11 passed. Java 8 failed (different test):
Merging this. |
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)