Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix type inference on joins and aggregates on Scala API #5019

Merged
merged 1 commit into from
May 20, 2018

Conversation

joan38
Copy link
Contributor

@joan38 joan38 commented May 15, 2018

The type inference doesn't currently work for the join functions in Scala as it doesn't know yet the types of the given KStream[K, V] or KTable[K, V].

The fix here is to curry the joiner function. I personally prefer this notation but this also means it differs more from the Java API.
I believe the diff with the Java API is worth in this case as it's not only solving the type inference but also fits better the Scala way of coding (ex: fold).

Moreover any Scala dev will bug and spend little time on these functions trying to understand why the type inference is not working and then get frustrated to be obliged to be explicit here where it's not harmful to be inferred.

The change is fairly straight forward but is also breaking, the good news is that we didn't release the Scala API yet, so this is perfect time to do this change.

This would also need some documentation update that I'm happy to do if there is positive feedback on this.

Thanks

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@joan38
Copy link
Contributor Author

joan38 commented May 15, 2018

@ijuma @debasishg @guozhangwang Let me know what you think about this.

@debasishg
Copy link
Contributor

One of the points we tried to adhere to was to keep the diff with the Java API to a minimum. There may be more scope of such optimizations (or rather conciseness or making code more idiomatic Scala) in the Scala API, which we intentionally didn't do.

And personally I am not sure which version is more readable. The one with type inference is concise no doubt, but very often I find myself struggling to see what are the types of the parameters to the lambda.

// without type inference
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

// type inferred
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))

On the whole I am +0 on this change, wouldn't mind, if done. But I would leave it to @guozhangwang and @ijuma for the final call.

@joan38
Copy link
Contributor Author

joan38 commented May 15, 2018

Indeed there is cases where explicit types are better and some other cases where it's too much info.
The question here is, should the API restrict this choice and not give the liberty to the developer as he is used to in the Scala collections or even in the other Kafka Streams APIs?

Outside of this question, it took me some time to understand why it wasn't compiling as all the params were matching the documentation. I had to add all the types explicitly to finally understand what's going on and then workout which ones I can remove to leave only the required ones.
This user experience (maybe isolated? maybe not?) is not great IMHO and feels more like a "bug" rather than a feature to force the explicit typing.

@joan38 joan38 changed the title Fix type inference on joins for Scala Fix type inference on joins on Scala API May 15, 2018
@joan38
Copy link
Contributor Author

joan38 commented May 16, 2018

I added the change for aggregate too. Let's see what you guys think @guozhangwang @ijuma

aggregator: (K, V, VR) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a meta comment: why we want to separate initializer with other parameters in all the places?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang - As I mentioned in my comment #5019 (comment), this has been done to aid the Scala compiler do better type inferencing. Scala compiler does type inferencing from left to right in groups. So if you place the initializer in a separate group, then u get better type inferencing when specifying the initializer in the usage. There are a few areas where we can do this for better type inference. I have some reservations on this as I mentioned in the comment I linked earlier. May be you or @ijuma can take a call on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at the signature of foldLeft in Scala:

def foldLeft[B](z: B)(op: (B, A) => B): B

The zero (initializer) is curried away from the aggregator function.

Many think this is a style choice but it's not. If we wanted to implement that in one group of parameter as:

def foldLeft[B](z: B, op: (B, A) => B): B

The type parameter B would not be "fixed" by the type inference before we get into the aggregator function.
With currying you actually apply the first group of parameter so that we know what B are we talking about and then we apply the function.

Here exactly the same happens with aggregate, the type inference is not able to tell you from the initializer what the types of the function will be and therefore ask you to write all the types explicitly. Forcing such a thing in Scala APIs is not very common.

windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
*/
def join[VO, VR](otherStream: KStream[K, VO], windows: JoinWindows)(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note in Java API we have joiner before the windows. Any specific reasons to switch the ordering here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as my last comment ..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be:

def join[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)

And in fact now that I wrote it I think it would be better to match the Java API.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems different with what you proposed:

def join[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)

joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
inner.join[VT, VR](table.inner, joiner.asValueJoiner, joined)
*/
def join[VT, VR](table: KTable[K, VT])(joiner: (V, VT) => VR)(implicit joined: Joined[K, V, VT]): KStream[K, VR] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, not sure what's the rationale of the refactoring here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as my last comment ..

@@ -142,7 +142,7 @@ class TopologyTest extends JUnitSuite {

val clicksPerRegion: KTable[String, Long] =
userClicksStream
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @debasishg mentioned, we want to keep the scala API to be as consistent as possible with the java API. Are specific reasons for the changes here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above ..

Copy link
Contributor Author

@joan38 joan38 May 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the ability to remove (or keep) the types.
Also this doesn't go too fare from the Java API since it's just about currying parameters and has the benefit of bringing the API closer to the Scala "way of doing".

@guozhangwang
Copy link
Contributor

@joan38 @debasishg Thanks for your detailed explanations in follow-up, now I got it finally.

I think I'm +0.5 on the proposed changes except for stream-stream windowed joins I'd prefer join[VO, VR](otherStream: KStream[K, VO])(joiner: .. since I think such type inference is worthwhile to have with the trade of a bit more diff with the Java API.

@joan38
Copy link
Contributor Author

joan38 commented May 17, 2018

Thanks for your thoughts @guozhangwang.
I just pushed again bring back the original parameter order as:

def join[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)

Essentially bringing the change to a simple currying only and not reordering the parameters in regards to the Java API.

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joan38 thanks. Left some more comments.

windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
inner.join[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
*/
def join[VO, VR](otherStream: KStream[K, VO], windows: JoinWindows)(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems different with what you proposed:

def join[VO, VR](otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)

windows: JoinWindows)(implicit joined: Joined[K, V, VO]): KStream[K, VR] =
inner.leftJoin[VO, VR](otherStream.inner, joiner.asValueJoiner, windows, joined)
def leftJoin[VO, VR](otherStream: KStream[K, VO], windows: JoinWindows)(
joiner: (V, VO) => VR
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I forgot this one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... this seems still not correct to me? Should it be

(otherStream: KStream[K, VO])(joiner: (V, VO) => VR, windows: JoinWindows)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it's good

def outerJoin[VO, VR](other: KTable[K, VO],
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
def outerJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here? Also the parameters are re-ordered.

def leftJoin[VO, VR](other: KTable[K, VO],
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
def leftJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here? Also the parameters are re-ordered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I forgot this one

def join[VO, VR](other: KTable[K, VO],
joiner: (V, VO) => VR,
materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here? Also the parameters are re-ordered.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I forgot this one

@joan38
Copy link
Contributor Author

joan38 commented May 18, 2018

Addressed the comments.
Please note also that the function () => initializer becomes a call by name => initializer.

@ijuma
Copy link
Contributor

ijuma commented May 18, 2018

My general opinion is that it makes sense to help the inferencer if it can be done in a reasonably consistent manner in the Scala API and if the resulting code doesn't become harder to read (i.e. if you provided the types before/after, the after case should not be harder to read, ideally).

@ijuma
Copy link
Contributor

ijuma commented May 18, 2018

I'm not too concerned about the APIs looking a bit different than the Java APIs, it's more important to make the Scala API as good as they can be while maintaining the general style (IMO).

@@ -88,7 +88,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite
userClicksStream

// Join the stream against the table.
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw, why don't we just use Option(region).getOrElse("UNKNOWN")?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No idea but I can change this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma It was taken from a Java example and hence was not changed. Thought by not doing Option we can save an allocation :-)

@joan38
Copy link
Contributor Author

joan38 commented May 19, 2018

@debasishg @ijuma @guozhangwang what's the general opinion on this? Should we go forward?

@guozhangwang
Copy link
Contributor

Please note also that the function () => initializer becomes a call by name => initializer.

That looks fine to me.

@guozhangwang
Copy link
Contributor

@joan38 Seems people are not against proceeding with this change. I am retriggering another Jenkins test and will merge after it is fixed.

@guozhangwang
Copy link
Contributor

retest this please

1 similar comment
@guozhangwang
Copy link
Contributor

retest this please

@joan38
Copy link
Contributor Author

joan38 commented May 20, 2018

All good @guozhangwang 👍

@guozhangwang guozhangwang merged commit 96cda0e into apache:trunk May 20, 2018
@guozhangwang
Copy link
Contributor

Merged to trunk. @joan38 as you mentioned are there are further documentation updates that you'd want to make?

@joan38 joan38 changed the title Fix type inference on joins on Scala API Fix type inference on joins and aggregates on Scala API May 21, 2018
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
The type inference doesn't currently work for the join functions in Scala as it doesn't know yet the types of the given KStream[K, V] or KTable[K, V].

The fix here is to curry the joiner function. I personally prefer this notation but this also means it differs more from the Java API.
I believe the diff with the Java API is worth in this case as it's not only solving the type inference but also fits better the Scala way of coding (ex: fold).

Moreover any Scala dev will bug and spend little time on these functions trying to understand why the type inference is not working and then get frustrated to be obliged to be explicit here where it's not harmful to be inferred.

Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, Ismael Juma <ismael@juma.me.uk>
@mowczare
Copy link
Contributor

mowczare commented Aug 13, 2018

Excuse me guys, how could this pull request get merged and released in version 2.0.0 without any tests?
And it's been 3 months already, I hope I am just blind, can't use search and there are already some issues pointing it out, but still, if not and I am the only user of Kafka Streams Scala API in the world...

Let's take a look at your changes:
file KTable.scala:

def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR): KTable[K, VR]
def join[VO, VR](other: KTable[K, VO])(
   joiner: (V, VO) => VR,	
   materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR]
): KTable[K, VR]

Currying is a nice feature of Scala indeed. Let's use it, shall we?

join(kTable)(joiner)
error: ambiguous reference to overloaded definition

Wait. Oh, maybe somehow I need to pass materializer:

join(kTable)(joiner, materializer)
error: ambiguous reference to overloaded definition

Wait... Did you just create an interface that cannot be used in another way than reflection? As a user I kinda don't really want to use scala.reflect.runtime.universe.runtimeMirror(getClass.getClassLoader) every time i need to join my KTable.

To be more generic, if you have:

abstract class B {
  def a(a: String)(b: Double, c: String): Unit
  def a(a: String)(b: Double): Unit
}
val b: B

Then you cannot call:

  • b.a("niceFunction")
  • b.a("niceFunction")(69.0, "wowSuchCurrying")
  • b.a("lolThisDoesNotWorkEither")(42.0)

cause everytime compiler will fail with:
error: ambiguous reference to overloaded definition

Like even if I'm wrong and there is some other magical way of calling those methods, this is disappointing that I spent 2 hours figuring out how to use your new interfaces and the only solution I came up with was the reflection.

Tomorrow I will do a pull request with reverting old Lightbend interfaces, till then I'm waiting for you to prove me I'm wrong.

@joan38
@debasishg
@ijuma
@guozhangwang

@joan38
Copy link
Contributor Author

joan38 commented Aug 13, 2018

Hi @mowczare,

Thanks for taking the time to find the original PR and reporting this issue here.
I didn't even had time to have a look at the newly released Kafka 2.0.0 myself since it got out 😄. Let me have a look at it.

@mowczare
Copy link
Contributor

I see, time is a precious thing, indeed. One shall not simply waste it for tests I guess.

@joan38
Copy link
Contributor Author

joan38 commented Aug 13, 2018

@mowczare I know right? I'm living in London, it's 21h I had a long day at work and I pass my evenings helping the Scala community on various open source projects, all of that for free just because I'm passionate about it.
I just find scandalous that I made a mistake by not spending more time on tests you are right.

EDIT: I truly agree with you, but maybe you should try to be a bit nicer in your messages.

@mowczare
Copy link
Contributor

It's not your fault obviously, no need to take it personally. I still hope that it works and it's just me and my lack of experience. However, I'm really concerned, if I'm right, how this could go on 2.0.0 without proper testing.

@joan38
Copy link
Contributor Author

joan38 commented Aug 13, 2018

Unfortunately, I think you are right for both the fact that it's broken and that it got out without testing.
I'm surprised the compiler is ok with this code.

I will raise a PR now to fix this (with tests 😉).
Meanwhile I can only see a monkey patch or a revert to Lightbend's interfaces, unless you have other ideas?
I keep you updated.

@mowczare
Copy link
Contributor

Wonderful! Your time will be much appreciated and hopefully, a 2.0.1 version will be up soon.
As for the compiler, it is a common case for 9 years thanks to Mr. Oderski:
https://issues.scala-lang.org/browse/SI-2628

@ijuma
Copy link
Contributor

ijuma commented Aug 18, 2018

Thanks for submitting a fix so quickly @joan38. It's true that there was a gap in testing here and it's unfortunate that it was not noticed before the PR was merged and the release was published. We should strive to do better in the future. Irrespective of that, we are thankful that @joan38 has spent his personal time to improve the usability of the Scala API for everyone else.

@mjsax mjsax added the streams label Aug 21, 2018
guozhangwang pushed a commit that referenced this pull request Aug 21, 2018
Join in the Scala streams API is currently unusable in 2.0.0 as reported by @mowczare:
#5019 (comment)

This due to an overload of it with the same signature in the first curried parameter.
See compiler issue that didn't catch it: https://issues.scala-lang.org/browse/SI-2628

Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
Join in the Scala streams API is currently unusable in 2.0.0 as reported by @mowczare:
apache#5019 (comment)

This due to an overload of it with the same signature in the first curried parameter.
See compiler issue that didn't catch it: https://issues.scala-lang.org/browse/SI-2628

Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants