Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-7027: Add an overload build method in scala #6373

Merged
merged 4 commits into from
Mar 15, 2019
Merged

KAFKA-7027: Add an overload build method in scala #6373

merged 4 commits into from
Mar 15, 2019

Conversation

massimosiani
Copy link
Contributor

The Java API can pass a Properties object to StreamsBuilder#build, to allow, e.g., topology optimization, while the Scala API does not yet. The latter only delegates the work to the underlying Java implementation.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@massimosiani
Copy link
Contributor Author

I didn' find a related issue, please let me know if I missed it (or missed a related PR), or if I should do anything else before submitting this.
Thank you!

@mjsax mjsax added the streams label Mar 7, 2019
@bbejeck bbejeck changed the title Add an overload build method in scala KAFKA-7027: Add an overload build method in scala Mar 7, 2019
@bbejeck
Copy link
Contributor

bbejeck commented Mar 7, 2019

@massimosiani thanks for the contribution! This actually is related to KIP-312 (Jira https://issues.apache.org/jira/browse/KAFKA-7027) so I've updated the title.

Also, can you rebase this to trunk? Then we can cherry-pick to 2.2

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution @massimosiani!
This PR looks good to me.

We should add at least one test using the new builder overload.

@bbejeck
Copy link
Contributor

bbejeck commented Mar 7, 2019

\cc any of (@guozhangwang, @mjsax, @vvcephei, and @ableegoldman) for a second review

@massimosiani massimosiani changed the base branch from 2.2 to trunk March 8, 2019 08:41
@massimosiani
Copy link
Contributor Author

Hi @bbejeck, thanks for the review!
I have changed the base branch for the PR to trunk, and added a test case identical to the Simple one in TopologyTest.scala with the new API.
In the test, the TOPOLOGY_OPTIMIZATION property is passed, because that's the main goal of this PR.

Let me know if there is anything else I can do.
Thank you!

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @massimosiani LGTM

@bbejeck
Copy link
Contributor

bbejeck commented Mar 8, 2019

Java 8 failed with

org.apache.kafka.common.network.SslSelectorTest.testCloseOldestConnectionWithMultipleStagedReceives
kafka.api.UserQuotaTest.testThrottledProducerConsumer
kafka.network.SocketServerTest.testConnectionRateLimit

Java 11 passed
retest this please

Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @massimosiani , Thanks for the contribution!

The implementation looks "obviously correct", but I had some comments on the test. The biggest concern is that the equality comparison of TopologyDescription is currently broken.

As I said, the implementation looks right, but the test would have value as a regression test, to make sure we don't break the logic in the future.

Thanks again,
-John

val streamBuilder = new StreamsBuilderJ
val textLines = streamBuilder.stream[String, String](inputTopic)

val _: KStreamJ[String, String] = textLines.flatMapValues(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what's the advantage of assigning to _ instead of just ignoring the result of the builder calls?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @vvcephei, there are other tests in that class, and I wanted to preserve the general structure. Probably, they should be clean up. Removed from my code.

}

// should match
assertEquals(getTopologyScala, getTopologyJava)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comparison actually is invalid. No reason you should know that, though...

A contributor recently pointed it out (see #6210).

As a workaround, can we try comparing the result of TopologyDescription#toString instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new test, this command works:
gradle streams:streams-scala:test --tests TopologyTest
(gradle 5.2.1 on MacOS)

Copy link
Contributor

@bbejeck bbejeck Mar 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@massimosiani @vvcephei is correct about the test

This comparison actually is invalid. No reason you should know that, though...
A contributor recently pointed it out (see #6210).
As a workaround, can we try comparing the result of TopologyDescription#toString instead?

When comparing the getTopologyScala.toString and getTopologyJava.toString the test fails. But it's not your fault though, I'm sure you have just exposed a bug! Thanks and I've created a Jira for it (https://issues.apache.org/jira/browse/KAFKA-8101) and when that is fixed we can get this merged.

EDIT: This looks like a bug with the Streams Scala API in general as when using StreamsConfig.NO_OPTIMIZATION or not passing in Properties to builder.build() the test still fails.

val _: KStream[String, String] =
textLines.flatMapValues(v => pattern.split(v.toLowerCase))

streamBuilder.build(props).describe()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like, if the intent is to make sure the optimization property actually gets applied, we'd need to use a topology that would change under optimization. Perhaps you can copy the topology (and verification logic) from org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest?

Alternatively, it might be sufficient to make it a unit test by injecting a mocked Java builder and just verify that the Scala layer passes the properties down correctly... Not sure how straightforward that would be, though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been a pain to write the "Java" version (that's probably because of me being new to kafka), but I've done it.

@bbejeck
Copy link
Contributor

bbejeck commented Mar 13, 2019

@massimosiani After looking over the Scala API the issue for the different topology between Java and Scala is here https://github.com/apache/kafka/blob/trunk/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala#L51 .

KGroupedStream#count in Scala needs to add an additional mapValues proceessor to convert from a Java Long to a Scala Long, so a count operation won't have the same topology.

Thanks for your hard work so far and here's what I'd suggest going forward.

Change the example to have 3 streams and you call either selectKey or map on the first stream instance. Then perform two joins,

val sourceStream = builder.stream(inputTopic, Consumed.`with`(Serdes.String, Serdes.String))
val mappedStream: KStreamJ[String, String] =sourceStream.map((k: String, v: String) => ...))

val stream2 = ...
val stream3 = ...

val join1 = mappedStream.join(stream2....) 
val join2 - mappedStream.join(stream3,...)

This way with optimization turned on we should end up with only one repartition topic and since there is no aggregation operation the toplogies should end up the same.

We'll stil want the assertion step to be assertEquals(getTopologyScala.toString, getTopologyJava.toString)

@massimosiani
Copy link
Contributor Author

Hi @bbejeck, thank you.
I am not sure whether it makes sense, but I also added an assertion to check that the topology without optimization is different from the optimized one, so I am actually testing an optimized topology.
I had to add the explicit NO_OPTIMIZE property so we are not checking the default behavior of build().
Let me know if that makes sense at all.
Thank you!

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@massimosiani thanks for the updates.

I am not sure whether it makes sense, but I also added an assertion to check that the topology without optimization is different from the optimized one, so I am actually testing an optimized topology.

Yes, that makes sense, but I think to compare both optimized and unoptimized Scala/Java topologies has merit, I've left a comment regarding that.

Also, the build failure is related. The code you have here works for Scala version 2.12 but Scala 2.11 doesn't support Java 8 lamdas so the Java 8 Scala 2.11 build is failing.

You'll need to change initializer , aggregator etc. to have Java anonymous classes

val initializer: Initializer[Integer] =  new Initializer[Integer] {
     override def apply(): Integer = 0
 }
 val aggregator: Aggregator[String, String, Integer] = new Aggregator[String, String, Integer] {
     override def apply(key: String, value: String, aggregate: Integer): Integer =  aggregate + value.length
 }

This applies to all the operators on lines 345 to 353 I didn't list them out here
If you run ./gradlew -PscalaVersion=2.11 :streams:streams-scala:compileTestScala you'll see what all the errors are.

Thanks for your hard work and patience on this!

builder
}

assertNotEquals(getTopologyScala.build(props).describe.toString,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea here, but I'm thinking it's a good idea to compare unoptimized Scala topology to the unoptimized Java topology so I'd change this line to

assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString,
                    getTopologyScala.build(propsNoOptimization).describe.toString)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about adding your assertion instead of substituting to line 399? Do you agree?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about adding your assertion instead of substituting to line 399?

Yes, that works as well

Copy link
Contributor

@bbejeck bbejeck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @massimosiani. Changes LGTM pending Jenkins passing

@bbejeck
Copy link
Contributor

bbejeck commented Mar 14, 2019

Moving to new comment to make sure this gets picked up

@massimosiani one last thing I just noticed, do you mind changing the assertEquals on line 432-433
from:

assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString, getTopologyScala.build(propsNoOptimization).describe.toString)
to:

assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString, getTopologyJava.build(propsNoOptimization).describe.toString)

My apologies as this is my error from my previous comment.

@massimosiani
Copy link
Contributor Author

Moving to new comment to make sure this gets picked up

@massimosiani one last thing I just noticed, do you mind changing the assertEquals on line 432-433
from:

assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString, getTopologyScala.build(propsNoOptimization).describe.toString)
to:

assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString, getTopologyJava.build(propsNoOptimization).describe.toString)

My apologies as this is my error from my previous comment.

My fault, I took it for granted and did not read it carefully.

Copy link
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks, @massimosiani !

@bbejeck bbejeck merged commit 853f24a into apache:trunk Mar 15, 2019
@bbejeck
Copy link
Contributor

bbejeck commented Mar 15, 2019

Thanks for the contribution @massimosiani!

Merged #6373 to trunk

bbejeck pushed a commit that referenced this pull request Mar 15, 2019
The Java API can pass a Properties object to StreamsBuilder#build, to allow, e.g., topology optimization, while the Scala API does not yet. The latter only delegates the work to the underlying Java implementation.

Reviewers: John Roesler <john@confluent.io>,  Bill Bejeck <bbejeck@gmail.com>
bbejeck pushed a commit that referenced this pull request Mar 15, 2019
The Java API can pass a Properties object to StreamsBuilder#build, to allow, e.g., topology optimization, while the Scala API does not yet. The latter only delegates the work to the underlying Java implementation.

Reviewers: John Roesler <john@confluent.io>,  Bill Bejeck <bbejeck@gmail.com>
@bbejeck
Copy link
Contributor

bbejeck commented Mar 15, 2019

cherry-picked to 2.2, and 2.1

jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* apache/trunk:
  MINOR: Retain public constructors of classes from public API (apache#6455)
  KAFKA-8118; Ensure ZK clients are closed in tests, fix verification (apache#6456)
  KAFKA-7813: JmxTool throws NPE when --object-name is omitted
  KAFKA-8114: Wait for SCRAM credential propagation in DelegationTokenEndToEndAuthorizationTest (apache#6452)
  KAFKA-8111; Set min and max versions for Metadata requests (apache#6451)
  KAFKA-7855: Kafka Streams Maven Archetype quickstart fails to compile out of the box (apache#6194)
  MINOR: Update code to not use deprecated methods (apache#6434)
  MINOR: Update Trogdor ConnectionStressWorker status at the end of execution (apache#6445)
  KAFKA-8091; Use commitSync to check connection failure in listener update test (apache#6450)
  KAFKA-7027: Add an overload build method in scala (apache#6373)
  MINOR: Fix typos in LogValidator (apache#6449)
  KAFKA-7502: Cleanup KTable materialization logic in a single place (apache#6174)
  KAFKA-7730; Limit number of active connections per listener in brokers (KIP-402)
  KAFKA-8091; Remove unsafe produce from dynamic listener update test (apache#6443)
  MINOR: Fix JavaDocs warnings (apache#6435)
  MINOR: Better messaging for invalid fetch response (apache#6427)
  MINOR: Use Java 8 lambdas in KStreamImplTest (apache#6430)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
The Java API can pass a Properties object to StreamsBuilder#build, to allow, e.g., topology optimization, while the Scala API does not yet. The latter only delegates the work to the underlying Java implementation.

Reviewers: John Roesler <john@confluent.io>,  Bill Bejeck <bbejeck@gmail.com>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kip Requires or implements a KIP streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants