
KAFKA-6670: Implement a Scala wrapper library for Kafka Streams #4756

Closed
wants to merge 23 commits into from

Conversation

debasishg
Contributor

This PR implements a Scala wrapper library for Kafka Streams. The library is implemented as a project under streams, namely :streams:streams-scala. The PR contains the following:

  • the library implementation of the wrapper abstractions
  • the test suite
  • the changes in build.gradle to build the library jar

The library has been tested by running the tests as follows:

$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdes streams:streams-scala:test
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro streams:streams-scala:test
$ ./gradlew -Dtest.single=WordCountTest streams:streams-scala:test

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@mjsax mjsax added the streams label Mar 22, 2018
@mjsax mjsax self-requested a review March 22, 2018 18:22
@mjsax
Member

mjsax commented Mar 22, 2018

\cc @bbejeck @vvcephei

@guozhangwang
Contributor

retest this please

build.gradle Outdated
testCompile project(':core')
testCompile libs.avro4s
testCompile libs.scalaLogging
testCompile libs.logback
Contributor

We don't use logback for anything else. I'd suggest keeping it consistent with the project.

Contributor Author

done ..

mavenArtifact: "3.5.2"
mavenArtifact: "3.5.2",
avro4sVersion: "1.8.3",
scalaLoggingVersion: "3.8.0",
Contributor

scalaLogging is already defined in this file.

Contributor Author

removed.

mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact"
mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
avro4s: "com.sksamuel.avro4s:avro4s-core_$versions.baseScala:$versions.avro4sVersion",
scalaLogging: "com.typesafe.scala-logging:scala-logging_$versions.baseScala:$versions.scalaLoggingVersion",
Contributor

This is already defined in this file.

Contributor Author

removed.

Contributor

@guozhangwang guozhangwang left a comment

Thanks for the PR. Could you rebase?

Also, a meta question about the Avro-dependent test StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala: do people think it's worth adding the Avro dependency just to demonstrate this test?

build.gradle Outdated

testCompile libs.junit
testCompile libs.scalatest
testCompile libs.scalatestEmbeddedKafka
Contributor

Do we need to import these two dependencies? Could we use Kafka's own EmbeddedKafkaCluster?

Contributor Author

The scalatestEmbeddedKafka library has a nice integration with ScalaTest. It makes writing tests easier, since we don't have to bother with starting / managing the embedded Kafka instance, and the test code becomes very concise.

Contributor

Generally speaking, the AK repo tends to avoid dependencies unless necessary. I'm wondering if we can improve Kafka's own EmbeddedKafkaCluster to offer the same functionality as net.manub:scalatest-embedded-kafka-streams.

Contributor Author

We can definitely use Kafka's own EmbeddedKafkaCluster to integrate with ScalaTest. In net.manub:scalatest-embedded-kafka-streams, the main value-add is the integration with ScalaTest, so you don't have to explicitly start / stop the server as part of the test.

Also, for Kafka Streams it has very nice constructs like the one we use here: https://github.com/lightbend/kafka/blob/scala-streams/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala#L77-L99. Note that you just have to define the transformations and do the publish and consume as part of a closure; there is no need to start / stop the topology. Hence the test code becomes very concise.

Of course it depends on the opinion of the committers, but I think this would be a great addition as a dependency.

Here's a suggestion:

We use net.manub:scalatest-embedded-kafka-streams for now; after all, it's a test dependency. Then we work on a separate PR to make the integration between EmbeddedKafkaCluster and ScalaTest better and in line with the functionality offered by the library.

WDYT?
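For illustration, the closure style looks roughly like this (a sketch from memory of the net.manub API; trait and method names such as EmbeddedKafkaStreamsAllInOne, runStreams, publishStringMessageToKafka, and consumeFirstStringMessageFrom may not match the exact 1.1.0 signatures):

import net.manub.embeddedkafka.EmbeddedKafkaConfig
import net.manub.embeddedkafka.streams.EmbeddedKafkaStreamsAllInOne
import org.apache.kafka.streams.StreamsBuilder
import org.scalatest.WordSpec

class WordCountSpec extends WordSpec with EmbeddedKafkaStreamsAllInOne {

  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig()

  "the topology" should {
    "count words" in {
      val builder = new StreamsBuilder
      // ... define the transformations on `builder` ...

      // broker and topology are started before the closure runs and stopped after it
      runStreams(Seq("input", "output"), builder.build()) {
        publishStringMessageToKafka("input", "hello world")
        consumeFirstStringMessageFrom("output") // assert on the result here
      }
    }
  }
}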

Member

I'm a fan of net.manub:scalatest-embedded-kafka-streams, but I understand the concern about bringing in more deps. I only mentioned it on the dev-kafka list because I didn't think there was much value in improving the embedded Kafka implementation in kafka-stream-scala and making it a public interface, since scalatest-embedded-kafka already existed. I wasn't aware of EmbeddedKafkaCluster.

If scalatest-embedded-kafka were brought into the project, then there would be drift between the version of the Kafka broker in the codebase and whatever this test lib references.

@debasishg I like your suggestion:

work on a separate PR to make the integration between EmbeddedKafkaCluster and Scalatest better and in line with the functionalities offered by the library.

Perhaps we can do this now for this PR, but keep it simple. I could work on it if you're busy.

Contributor

I have used net.manub:scalatest-embedded-kafka-streams before, and it is very nice.

But I also worry about adding dependencies to the core project, even test deps. If our tests become a bit uglier, or if we have some test-util class that duplicates some of the functionality you're using, I would consider that to be a worthy tradeoff for dropping the dependency.

I would absolutely support planning to come back in a follow-up PR to build out support for testing scala code and then terraforming these tests to use the new support. Or even delaying this PR until a test-support one is available.

Contributor

@seglo @vvcephei @debasishg thanks for your thoughts. We do have plans to publish testing-util artifacts inside AK in the future. In fact, we have been doing so for the kafka-streams module as a first step, and are going to do the same for kafka-core and kafka-clients soon. In the kafka-clients testing utils we are going to include an improved version of EmbeddedKafkaCluster so that users can easily write integration tests that interact with a mock Kafka cluster.

So I'd suggest we stay with the uglier implementation using the existing EmbeddedKafkaCluster and not bring in the dependency.

Contributor Author

Thanks for your thoughts @guozhangwang .. we will remove the dependency on net.manub:scalatest-embedded-kafka-streams and use EmbeddedKafkaCluster instead.

Contributor Author

@guozhangwang - Removed all dependencies on net.manub:scalatest-embedded-kafka and net.manub:scalatest-embedded-kafka-streams. Now using EmbeddedKafkaCluster for the tests instead. Also removed the test that used Avro, so the dependency on avro4s is eliminated.
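For reference, the EmbeddedKafkaCluster-based setup mirrors Kafka's own Java integration tests; a minimal sketch (the JUnit rule wiring that starts / stops the cluster around the test class is elided, and createTopic / bootstrapServers are the method names I recall from the Java test utils):

import java.util.Properties

import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster

class WordCountIntegrationTest {
  // registered as a JUnit @ClassRule in the real tests, so it is started
  // before and stopped after the test class
  val cluster = new EmbeddedKafkaCluster(1) // one embedded broker

  def streamsConfig: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p
  }

  def createTopics(): Unit = {
    cluster.createTopic("input")
    cluster.createTopic("output")
  }
}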

Contributor

Thanks @debasishg !

build.gradle Outdated
compile libs.scalaLibrary

testCompile project(':core')
testCompile libs.avro4s
Contributor

Is it worth including this dependency at test runtime? cc @ewencp @ijuma .

Contributor Author

It's only to demonstrate custom Serdes. We picked Avro since (AFAIR) @guozhangwang suggested this example in one of the earlier PR discussions. This example goes to show that custom serdes can be handled as seamlessly as primitive ones.

Contributor

Cool. In that case, maybe we should also add 'streams:streams-scala:examples' and put it there?

Contributor

@guozhangwang guozhangwang Mar 26, 2018

I'm a bit on the fence about introducing Avro as "the one" serde in our demonstration examples rather than keeping Kafka and Avro separate, since there are many protobuf / etc. fans in the community.

How about adding Avro examples in ecosystem repos instead? E.g., Lightbend / Confluent / etc. can add their own examples to their own repos. cc @ijuma @gwenshap

Member

+1 Sounds reasonable to me.

Contributor Author

@debasishg debasishg Mar 27, 2018

@mjsax - Cool .. then we can remove the test StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro and the dependencies from build.gradle .. ok ?

Contributor

Sounds good.

implicit val stringSerde: Serde[String] = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
Contributor

Could we add defaults for Short and ByteBuffer as well?

Contributor Author

+1
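For reference, the added defaults would look like this (a sketch reusing the cast trick from above; Serdes.Short() and Serdes.ByteBuffer() are the existing Java helpers):

implicit val shortSerde: Serde[Short] = Serdes.Short().asInstanceOf[Serde[Short]]
implicit val byteBufferSerde: Serde[java.nio.ByteBuffer] = Serdes.ByteBuffer()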

/**
* Implicit values for default serdes
*/
object DefaultSerdes {
Contributor

I'm wondering if we could provide default serdes for windowed keys as well? See o.a.k.streams.kstream.WindowedSerdes for the Java code.

Contributor Author

+1 ..
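A sketch of what that could look like, delegating to the Java-side wrappers (assuming the WindowedSerdes.TimeWindowedSerde and SessionWindowedSerde classes referenced above; note that if both implicits are in scope, a request for a plain Serde[Windowed[T]] would be ambiguous, so callers may need to pick one explicitly):

implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
  new WindowedSerdes.TimeWindowedSerde[T](tSerde)

implicit def sessionWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.SessionWindowedSerde[T] =
  new WindowedSerdes.SessionWindowedSerde[T](tSerde)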

def asValueMapper: ValueMapper[V, VR] = v => f(v)
}

implicit class ValueMapperFromFunctionX[V, VR](val f: V => Iterable[VR]) extends AnyVal {
Contributor

nit: how about renaming this to FlatValueMapperFromFunction for better readability? Ditto below.

Contributor Author

+1
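That is, the implicit class would become (sketch; the asJava conversion assumes scala.collection.JavaConverters, since the Java-side flatMapValues expects a ValueMapper[V, java.lang.Iterable[VR]]):

import scala.collection.JavaConverters._

implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
  def asValueMapper: ValueMapper[V, java.lang.Iterable[VR]] = v => f(v).asJava
}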


// applies the predicate to know what messages should go to the left stream (predicate == true)
// or to the right stream (predicate == false)
def split(predicate: (K, V) => Boolean): (KStream[K, V], KStream[K, V]) = {
Contributor

Is this syntactic sugar for branch? I'd prefer to keep the Java and Scala interfaces consistent: if we think it is worthwhile, we should add it to the Java APIs as well; otherwise we should remove it from the Scala APIs.

WDYT? @mjsax @bbejeck @vvcephei

Member

I agree that both APIs should be consistent. Does a split add much value compared to branch? Btw, this might be related to https://issues.apache.org/jira/browse/KAFKA-5488

I am also open to adding a split() if we think it's useful.
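For comparison, split is indeed just sugar over a two-way branch; in the Scala wrapper it amounts to something like this (sketch, assuming the wrapper's branch takes (K, V) => Boolean predicates and returns an Array of wrapped streams):

def split(predicate: (K, V) => Boolean): (KStream[K, V], KStream[K, V]) = {
  val branches = branch(predicate, (k, v) => !predicate(k, v))
  (branches(0), branches(1))
}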

Contributor

I agree.

Contributor Author

For the time being, we can remove split from the Scala API and rethink it if / when it's implemented in the Java API.

Contributor

Sounds good.

Contributor Author

ok, will remove split from KStream for now.

Contributor Author

Removed!

c.mapValues[Long](Long2long(_))
}

def count(store: String, keySerde: Option[Serde[K]] = None): KTable[Windowed[K], Long] = {
Contributor

This is a deprecated API in Java; we should replace it with Materialized.

Contributor Author

We can add the following:

def count(store: String,
    materialized: Materialized[K, Long, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] = { //..

But this overload may also be useful when the user just needs to pass in the keySerde; they need not construct any Materialized, since that is abstracted away within the implementation of the API.

Suggestions?

Contributor

I'd vote for keeping the Java / Scala APIs consistent, and we are going to remove the deprecated APIs in future releases anyway.

In current API we'd only have one additional overload:

def count(materialized: Materialized[K, Long, WindowStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] = { //..

I think users who do not want to specify the store name at all can rely on

static Materialized<K, V, S> with(final Serde<K> keySerde, final Serde<V> valueSerde)

to still hide the materialized parameter with implicit conversion.

For users who do want to specify the store name but want to rely on type conversion, we could call withKeySerde and withValueSerde internally in the implicit conversion, so that the user only needs to give Materialized.as(storeName).

Does that work?
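A sketch of that implicit (using the Materialized.with helper quoted above; ByteArrayKeyValueStore is the wrapper's type alias for KeyValueStore[Bytes, Array[Byte]], and `with` needs backticks since it is a Scala keyword):

implicit def materializedFromSerdes[K, V](implicit keySerde: Serde[K],
                                          valueSerde: Serde[V]): Materialized[K, V, ByteArrayKeyValueStore] =
  Materialized.`with`[K, V, ByteArrayKeyValueStore](keySerde, valueSerde)

// for a named store, the user would pass Materialized.as(storeName) and the wrapper
// would call withKeySerde / withValueSerde internally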

Contributor Author

+1 .. will remove this overload for count.

## * limitations under the License.
## */
# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=ERROR, R
Contributor

nit: default to INFO?

Contributor Author

+1

log4j.appender.A1=org.apache.log4j.ConsoleAppender

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=logs/kafka-server.log
Contributor

The file name logs/kafka-server.log seems inappropriate, as this is not for Kafka broker logs, right?

Contributor Author

Will change the name to kafka-streams-scala.log

* limitations under the License.
*/
!-->
<configuration>
Contributor

This is a meta comment: I'd suggest we consider adding logback to Kafka generally, instead of sneaking it in via the Streams Scala wrapper. We can still use log4j for now. See https://issues.apache.org/jira/browse/KAFKA-2717. cc @ijuma

Contributor

Yes, let's stick to log4j in this PR.

Contributor Author

Should we remove logback.xml?

Contributor

Yup, please remove that file as well.

Contributor Author

+1

@debasishg
Contributor Author

Added a commit with changes for the code review feedback. Waiting on more feedback and some still-unresolved questions. Should I rebase now or wait till all questions are resolved?

Member

@mjsax mjsax left a comment

I am not a Scala person -- so take my review with a grain of salt...

Did not look at the tests yet.

mavenArtifact: "3.5.2"
mavenArtifact: "3.5.2",
avro4sVersion: "1.8.3",
scalatestEmbeddedKafkaVersion: "1.1.0"
Member

nit: those are actually sorted alphabetically -- can we clean this up? Thx.

mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
avro4s: "com.sksamuel.avro4s:avro4s-core_$versions.baseScala:$versions.avro4sVersion",
scalatestEmbeddedKafka: "net.manub:scalatest-embedded-kafka_$versions.baseScala:$versions.scalatestEmbeddedKafkaVersion",
scalatestEmbeddedKafkaStreams: "net.manub:scalatest-embedded-kafka-streams_$versions.baseScala:$versions.scalatestEmbeddedKafkaVersion"
Member

as above.

<Match>
<Package name="org.apache.kafka.streams.scala"/>
<Source name="ScalaSerde.scala"/>
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE"/>
Member

Should we add these exclusions? I know that we added exclusions when introducing findbugs, because it was not possible to introduce it and rewrite all the code at once -- but for new code, we should consider changing the code instead. I am not a Scala person though -- can you elaborate on this?

Member

@seglo seglo Mar 26, 2018

This seems to be a false positive. FindBugs is reporting that Serializer and Deserializer should be defined with a different type name than what they inherit from. IIRC the consensus earlier was that we want type names the same as the base types they're wrapping (which includes traits and interfaces, IMO). I've updated the FindBugs rule exclusion to be specific to the types generating the violation, rather than the entire ScalaSerde file.

import org.apache.kafka.common.utils.Bytes

package object scala {
type ByteArrayKVStore = KeyValueStore[Bytes, Array[Byte]]
Member

Should this be ByteArrayKeyValueStore? -- we don't use abbreviations in the Java code base.

*/
object DefaultSerdes {
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
Member

Why do we need an asInstanceOf here? (same below)

Contributor

I think it's because we're presenting the Serde[java.lang.Long] as a Serde[scala.Long], but casting the Serde won't automatically cast the parameters and returns of its methods. I'm surprised you don't get class cast exceptions trying to use the Java long serde as a Scala long serde. Unless I'm wrong about what this is for...

Contributor

@ijuma ijuma Mar 26, 2018

There is no such thing as a scala.Long at runtime; Scala changed to use the same classes as Java for boxing around the 2.8 timeframe, if I remember correctly. Previously there was a RichLong, RichInt, etc.

In any case, this seems like a variance issue, but I didn't look into it.

Contributor Author

In Scala there's an implicit conversion between scala.Long and java.lang.Long, but not between Serde[scala.Long] and Serde[java.lang.Long] - hence the cast. In fact, I picked up this trick from https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L106
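To spell out why the cast is safe (sketch): scala.Long boxes to java.lang.Long at runtime, so the Serde already operates on the right runtime class:

val jSerde: Serde[java.lang.Long] = Serdes.Long()
val sSerde: Serde[Long] = jSerde.asInstanceOf[Serde[Long]] // no runtime conversion happens here
sSerde.serializer.serialize("topic", 42L) // 42L is boxed to a java.lang.Long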

Contributor

Huh! Well, that explains it. I have seen the conversions of the raw types, and have also suffered from type errors because Serde[scala.Long] != Serde[java.lang.Long], so I just assumed that scala.Long was a different class than java.lang.Long. Thanks for the explanation, @ijuma .


override def init(context: ProcessorContext): Unit = transformerS.init(context)

@deprecated ("Please use Punctuator functional interface at https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/Punctuator.html instead", "0.1.3") // scalastyle:ignore
Member

Nit: I am not a fan of adding links in JavaDocs, because links might break; better to reference the corresponding class as a JavaDoc cross reference?

Also: I am wondering if we should remove this method from the wrapper in the first place? IMHO, it's not a good idea to add a deprecated API in new code.

Contributor

+1 on not including this method in the wrapper. The code that would use this library is not written yet, so it's better if deprecated methods are simply not available.

Contributor Author

Here, the deprecation is on punctuate, which is part of the contract of Transformer. How do we remove this? We can only remove it when punctuate is removed from Transformer, right? Or am I missing something?

Contributor

Oh, right, I thought this was one of your scala replacement classes.

What I have been doing for cases like this is throwing an UnsupportedOperationException in the body. It's not as good as not having the method, but it definitely ensures it can't be used. And you don't have to maintain the code that's in the body.
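Applied to this wrapper, that pattern would look like this (sketch; VR stands for the transformer's output type):

@deprecated("Use the Punctuator functional interface instead", "0.1.3")
override def punctuate(timestamp: Long): VR =
  throw new UnsupportedOperationException("punctuate is deprecated, use a Punctuator instead")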

Member

I understand that you cannot change the Java Transformer interface and must implement the deprecated method when calling new Transformer -- what I was wondering about is a scala Transformer interface -- should we add one and remove punctuate from it?

Contributor

FWIW, I think adding a new scala interface just to remove a method that we plan to remove from the java interface is not necessary. Better just to implement it and move on.

Also, it would be a bit trickier to swap in a scala replacement for Transformer than for the top-level DSL classes, since implementations of the java Transformer won't implement the scala Transformer, so you wouldn't be able to plug them in via the scala DSL wrapper. But there's otherwise no reason this shouldn't work.

Member

Ack. It was just an idea. I don't really speak Scala (yet) -- this is an exercise to learn something about it...

If we need to keep it, I vote to throw an exception to force users to use the new API.

def transformValues[VR](valueTransformerSupplier: () => ValueTransformer[V, VR],
stateStoreNames: String*): KStream[K, VR] = {

val valueTransformerSupplierJ: ValueTransformerSupplier[V, VR] = () => valueTransformerSupplier()
Member

A ValueTransformer also has init(), punctuate() and close() methods. Why is this code much simpler than the wrapper for transform() above?

Contributor Author

@debasishg debasishg Mar 27, 2018

We don't need to provide an implementation of ValueTransformer here, since the passed-in () => ValueTransformer[V, VR] gets converted to a ValueTransformerSupplier[V, VR] in the implementation through SAM type conversion. We could not do that for transform, as the SAM type conversion is not handled automatically in that case. Please have a look here for SAM type conversions in Scala.
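To illustrate the SAM conversion with a self-contained example (Thunk is a hypothetical single-abstract-method trait, not part of any API):

trait Thunk[A] { def get(): A } // one abstract method, like ValueTransformerSupplier.get()
val t: Thunk[Int] = () => 42    // Scala 2.12 SAM-converts the function literal

ValueTransformerSupplier has only get(), so () => valueTransformerSupplier() converts directly, whereas the transform wrapper must implement Transformer explicitly because Transformer has several abstract methods (init, transform, punctuate, close).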

def process(processorSupplier: () => Processor[K, V],
stateStoreNames: String*): Unit = {

val processorSupplierJ: ProcessorSupplier[K, V] = () => processorSupplier()
Member

As above. What about init(), punctuate(), and close()?

Contributor Author

Same logic as ValueTransformer above.



c.mapValues[Long](Long2long _)
}

def count(store: String, keySerde: Option[Serde[K]] = None): KTable[K, Long] = {
Member

Why do we only allow specifying a keySerde, but not replacing the store with a different one?

Scala noob question: would it be possible to have a single count / reduce etc. instead of overloads, and use Option and implicits to infer optional arguments?

Contributor Author

@mjsax -

I think I may be missing something here. We are allowing the user to pass in the store and keySerde, and we create the Materialized out of the two, with Long as the value serde.

However, we were allowing the user to pass the keySerde optionally, and in case the user did not supply one, we assumed it would be taken from the config. This is actually a remnant of earlier thinking, when we thought passing serdes through config might be a good idea. In the current context, however, we should not make keySerde optional. Here's the suggestion for the changed API:

def count(store: String, keySerde: Serde[K]): KTable[K, Long] = { 
  val materialized = Materialized.as[K, java.lang.Long, ByteArrayKeyValueStore](store).withKeySerde(keySerde)
  val c: KTable[K, java.lang.Long] = inner.count(materialized)
  c.mapValues[Long](Long2long _)
}

An alternative option could have been to allow the user to pass in the Materialized instance itself (like we do in the reduce function). The problem with that alternative is that the Java API expects java.lang.Long as the value serde, while the Scala API needs to take a scala.Long. And there is no way we can convert a Materialized.as[K, scala.Long, ByteArrayKeyValueStore] to Materialized.as[K, java.lang.Long, ByteArrayKeyValueStore] without a cast. Hence the divergence in the API signature between count and reduce.

Please share if you have any other thoughts.

Contributor

That makes sense.

My 2 cents:
We're presenting the scala KGroupedStream basically as the java one, but not implementing the interface so that we can smooth over a couple of specific gaps. I think this is a good call, but it's also a huge risk for cognitive dissonance and developer confusion, since they will read the java version of the docs and try to use that knowledge in scala. Therefore, it's important to be super disciplined about making sure the methods available are as close to the java interface as possible.

Clearly, moving serdes, etc., to implicit params is the kind of thing we do want to do. But I think that presenting count(String, Serde[K]) instead of count(Materialized[K, Long, KeyValueStore[Bytes, Array[Byte]]]) is too far off.

I do agree that the method should take a scala type. Apparently, it's perfectly fine to cast Serde[scala.Long] to Serde[java.Long]. Does that same logic apply here? Alternatively, we can actually convert the Materialized[java] to a Materialized[scala].
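If the same boxing argument holds, count could accept a Scala-typed Materialized and cast internally (a sketch, not verified):

def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] = {
  val c: KTable[K, java.lang.Long] =
    inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArrayKeyValueStore]])
  c.mapValues[Long](Long2long _)
}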
