Skip to content
Permalink
Browse files

KAFKA-6670: Implement a Scala wrapper library for Kafka Streams

This PR implements a Scala wrapper library for Kafka Streams. The library is implemented as a project under streams, namely `:streams:streams-scala`. The PR contains the following:

* the library implementation of the wrapper abstractions
* the test suite
* the changes in `build.gradle` to build the library jar

The library has been tested running the tests as follows:

```
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdes streams:streams-scala:test
$ ./gradlew -Dtest.single=StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro streams:streams-scala:test
$ ./gradlew -Dtest.single=WordCountTest streams:streams-scala:test
```

Author: Debasish Ghosh <ghosh.debasish@gmail.com>
Author: Sean Glover <seglo@randonom.com>

Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Ismael Juma <ismael@juma.me.uk>, John Roesler <john@confluent.io>, Damian Guy <damian@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #4756 from debasishg/scala-streams
  • Loading branch information...
debasishg authored and nimosunbit committed Apr 23, 2018
1 parent b0b60ae commit 345abf7ff440a178c8ebd008c64bb933c8d711ad
Showing with 3,055 additions and 118 deletions.
  1. +1 −1 .gitignore
  2. +45 −2 build.gradle
  3. +13 −0 docs/api.html
  4. +25 −18 docs/streams/developer-guide/datatypes.html
  5. +180 −2 docs/streams/developer-guide/dsl-api.html
  6. +26 −16 docs/streams/developer-guide/write-streams.html
  7. +63 −71 docs/streams/index.html
  8. +10 −3 docs/streams/upgrade-guide.html
  9. +5 −4 gradle/dependencies.gradle
  10. +15 −0 gradle/findbugs-exclude.xml
  11. +1 −1 settings.gradle
  12. +1 −0 streams/streams-scala/.gitignore
  13. +3 −0 streams/streams-scala/NOTICE
  14. +27 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/package.scala
  15. +47 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/DefaultSerdes.scala
  16. +108 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
  17. +76 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
  18. +70 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ScalaSerde.scala
  19. +179 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
  20. +145 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
  21. +138 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
  22. +581 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
  23. +292 −0 streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
  24. +125 −0 .../streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
  25. +125 −0 ...ams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
  26. +34 −0 streams/streams-scala/src/test/resources/log4j.properties
  27. +237 −0 ...st/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
  28. +61 −0 streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinTestData.scala
  29. +199 −0 streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
  30. +223 −0 streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/WordCountTest.scala
@@ -50,4 +50,4 @@ docs/generated/

kafkatest.egg-info/
systest/

*.swp
@@ -124,7 +124,8 @@ if (new File('.git').exists()) {
'**/id_rsa',
'**/id_rsa.pub',
'checkstyle/suppressions.xml',
'streams/quickstart/java/src/test/resources/projects/basic/goal.txt'
'streams/quickstart/java/src/test/resources/projects/basic/goal.txt',
'streams/streams-scala/logs/*'
])
}
}
@@ -518,7 +519,7 @@ for ( sv in availableScalaVersions ) {
}

def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:test-utils', 'streams:examples'] + connectPkgs
def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs

/** Create one task per default Scala version */
def withDefScalaVersions(taskName) {
@@ -740,6 +741,8 @@ project(':core') {
from(project(':connect:file').configurations.runtime) { into("libs/") }
from(project(':streams').jar) { into("libs/") }
from(project(':streams').configurations.runtime) { into("libs/") }
from(project(':streams:streams-scala').jar) { into("libs/") }
from(project(':streams:streams-scala').configurations.runtime) { into("libs/") }
from(project(':streams:test-utils').jar) { into("libs/") }
from(project(':streams:test-utils').configurations.runtime) { into("libs/") }
from(project(':streams:examples').jar) { into("libs/") }
@@ -974,6 +977,46 @@ project(':streams') {
}
}

project(':streams:streams-scala') {
println "Building project 'streams-scala' with Scala version ${versions.scala}"
apply plugin: 'scala'
archivesBaseName = "kafka-streams-scala"

dependencies {
compile project(':streams')

compile libs.scalaLibrary

testCompile project(':core')
testCompile project(':core').sourceSets.test.output
testCompile project(':streams').sourceSets.test.output
testCompile project(':clients').sourceSets.test.output
testCompile libs.scalaLogging

testCompile libs.junit
testCompile libs.scalatest

testRuntime libs.slf4jlog4j
}

javadoc {
include "**/org/apache/kafka/streams/scala/**"
}

tasks.create(name: "copyDependantLibs", type: Copy) {
from (configurations.runtime) {
exclude('kafka-streams*')
}
into "$buildDir/dependant-libs-${versions.scala}"
duplicatesStrategy 'exclude'
}

jar {
dependsOn 'copyDependantLibs'
}

}

project(':streams:test-utils') {
archivesBaseName = "kafka-streams-test-utils"

@@ -78,6 +78,19 @@ <h3><a id="streamsapi" href="#streamsapi">2.3 Streams API</a></h3>
&lt;/dependency&gt;
</pre>

<p>
When using Scala you may optionally include the <code>kafka-streams-scala</code> library. Additional documentation on using the Kafka Streams DSL for Scala is available <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">in the developer guide</a>.
<p>
To use Kafka Streams DSL for Scala for Scala 2.11 you can use the following maven dependency:

<pre class="brush: xml;">
&lt;dependency&gt;
&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
&lt;artifactId&gt;kafka-streams-scala_2.11&lt;/artifactId&gt;
&lt;version&gt;{{fullDotVersion}}&lt;/version&gt;
&lt;/dependency&gt;
</pre>

<h3><a id="connectapi" href="#connectapi">2.4 Connect API</a></h3>

The Connect API allows implementing connectors that continually pull from some source data system into Kafka or push from Kafka into some sink data system.
@@ -44,12 +44,15 @@
<ul class="simple">
<li><a class="reference internal" href="#configuring-serdes" id="id1">Configuring SerDes</a></li>
<li><a class="reference internal" href="#overriding-default-serdes" id="id2">Overriding default SerDes</a></li>
<li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a><ul>
<li><a class="reference internal" href="#available-serdes" id="id3">Available SerDes</a></li>
<ul>
<li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
<li><a class="reference internal" href="#avro" id="id5">Avro</a></li>
<li><a class="reference internal" href="#json" id="id6">JSON</a></li>
<li><a class="reference internal" href="#further-serdes" id="id7">Further serdes</a></li>
</ul>
<li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit SerDes</a></li>
</ul>
<div class="section" id="configuring-serdes">
<h2>Configuring SerDes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
<p>SerDes specified in the Streams configuration via <code class="docutils literal"><span class="pre">StreamsConfig</span></code> are used as the default in your Kafka Streams application.</p>
@@ -155,23 +158,27 @@ <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java">PageViewTypedDemo</a>
example demonstrates how to use this JSON serde.</p>
</div>
<div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
<p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of
existing SerDes (see previous section). Typically, your workflow will be similar to:</p>
<ol class="arabic simple">
<li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
<li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
<li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;,</span> <span class="pre">Deserializer&lt;T&gt;)</span></code>.</li>
</ol>
</div>
</div>
<div class="section" id="implementing-custom-serdes">
<span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom SerDes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
<p>If you need to implement custom SerDes, your best starting point is to take a look at the source code references of
existing SerDes (see previous section). Typically, your workflow will be similar to:</p>
<ol class="arabic simple">
<li>Write a <em>serializer</em> for your data type <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java">org.apache.kafka.common.serialization.Serializer</a>.</li>
<li>Write a <em>deserializer</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java">org.apache.kafka.common.serialization.Deserializer</a>.</li>
<li>Write a <em>serde</em> for <code class="docutils literal"><span class="pre">T</span></code> by implementing
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java">org.apache.kafka.common.serialization.Serde</a>,
which you either do manually (see existing SerDes in the previous section) or by leveraging helper functions in
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java">Serdes</a>
such as <code class="docutils literal"><span class="pre">Serdes.serdeFrom(Serializer&lt;T&gt;,</span> <span class="pre">Deserializer&lt;T&gt;)</span></code>.</li>
</ol>
</div>
</div>
<div class="section" id="scala-dsl-serdes">
<h2>Kafka Streams DSL for Scala Implicit SerDes<a class="headerlink" href="scala-dsl-serdes" title="Permalink to this headline"></a></h2>
<p>When using the <a href="dsl-api.html#scala-dsl">Kafka Streams DSL for Scala</a> you're not required to configure a default SerDes. In fact, it's not supported. SerDes are instead provided implicitly by default implementations for common primitive datatypes. See the <a href="dsl-api.html#scala-dsl-implicit-serdes">Implicit SerDes</a> and <a href="dsl-api.html#scala-dsl-user-defined-serdes">User-Defined SerDes</a> sections in the DSL API documentation for details</p>
</div>


</div>

0 comments on commit 345abf7

Please sign in to comment.
You can’t perform that action at this time.