Commit

Merge pull request #133 from JetBrains/exploring-streaming
Exploring streaming
Jolanrensen committed Apr 21, 2022
2 parents aa6d3e5 + 4ece47e commit 8ca1be7
Showing 23 changed files with 2,140 additions and 39 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/build.yml
@@ -25,5 +25,12 @@ jobs:
          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
          restore-keys: ${{ runner.os }}-m2
      - name: Build with Maven
        run: ./mvnw -B package --file pom.xml -Pscala-2.12
        run: ./mvnw -B package --file pom.xml -Pscala-2.12 -Dkotest.tags="!Kafka"
  qodana:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: 'Qodana Scan'
        uses: JetBrains/qodana-action@v5.0.2

# vim: ts=2:sts=2:sw=2:expandtab
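
The new `-Dkotest.tags="!Kafka"` flag tells Kotest to skip any test tagged `Kafka`, so the CI build no longer needs a running broker. As a minimal sketch of how such a tag is attached to a test (the `Kafka` tag object and the test below are hypothetical, not part of this commit; only Kotest's standard tag mechanism is assumed):

```kotlin
import io.kotest.core.Tag
import io.kotest.core.spec.style.ShouldSpec

// Hypothetical tag object; -Dkotest.tags="!Kafka" excludes every test carrying it.
object Kafka : Tag()

class KafkaWordCountTest : ShouldSpec({
    should("count words read from a local Kafka topic").config(tags = setOf(Kafka)) {
        // body omitted: this test would need a Kafka broker on localhost:9092
    }
})
```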
43 changes: 43 additions & 0 deletions README.md
@@ -23,6 +23,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
- [Column infix/operator functions](#column-infixoperator-functions)
- [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
- [Tuples](#tuples)
- [Streaming](#streaming)
- [Examples](#examples)
- [Reporting issues/Support](#reporting-issuessupport)
- [Code of Conduct](#code-of-conduct)
@@ -267,6 +268,48 @@ Finally, all these tuple helper functions are also baked in:
- `map`
- `cast`

### Streaming

A popular Spark extension is [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html).
Of course, the Kotlin Spark API also introduces a more Kotlin-esque approach to writing your streaming programs.
There are examples for use with a checkpoint, Kafka, and SQL in the [examples module](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming).

Here is a quick example:
```kotlin
// Automatically provides ssc: JavaStreamingContext which starts and awaits termination or timeout
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    // create input stream for, for instance, Netcat: `$ nc -lk 9999`
    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)

    // split input stream on space
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // perform action on each formed RDD in the stream
    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->

        // to convert the JavaRDD to a Dataset, we need a spark session using the RDD context
        withSpark(rdd) { // this: KSparkSession
            val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
            dataframe
                .groupByKey { it.word }
                .count()
                .show()
            // +-----+--------+
            // |  key|count(1)|
            // +-----+--------+
            // |hello|       1|
            // |   is|       1|
            // |    a|       1|
            // | this|       1|
            // | test|       3|
            // +-----+--------+
        }
    }
}
```
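
The checkpoint example in the examples module follows the same shape. A minimal sketch, assuming `withSparkStreaming` also accepts a `checkpointPath` parameter as used there (the path and the stream below are illustrative only):

```kotlin
import org.apache.spark.streaming.Durations
import org.jetbrains.kotlinx.spark.api.withSparkStreaming

fun main() {
    withSparkStreaming(
        batchDuration = Durations.seconds(1),
        // assumption: behaves like JavaStreamingContext.getOrCreate, i.e. the context
        // is recovered from this directory if it already holds checkpointed state
        checkpointPath = "/tmp/checkpoint",
        timeout = 10_000,
    ) { // this: KSparkStreamingSession
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.print() // prints the first elements of each batch
    }
}
```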


## Examples

For more, check out the [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
13 changes: 13 additions & 0 deletions examples/pom-3.2_2.12.xml
@@ -29,6 +29,11 @@
            <artifactId>spark-streaming_${scala.compat.version}</artifactId>
            <version>${spark3.version}</version>
        </dependency>
        <dependency><!-- Only needed for Qodana -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.compat.version}</artifactId>
            <version>${spark3.version}</version>
        </dependency>
    </dependencies>

    <build>
@@ -90,6 +95,14 @@
                    <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
@@ -21,7 +21,6 @@ package org.jetbrains.kotlinx.spark.examples

import org.jetbrains.kotlinx.spark.api.broadcast
import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.sparkContext
import org.jetbrains.kotlinx.spark.api.withSpark
import java.io.Serializable

@@ -19,13 +19,14 @@
*/
package org.jetbrains.kotlinx.spark.examples

import org.apache.spark.api.java.function.ReduceFunction
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.spark.api.*
import org.jetbrains.kotlinx.spark.api.tuples.*
import scala.*
import scala.Tuple2
import scala.Tuple3

data class Q<T>(val id: Int, val text: T)

@Suppress("RedundantLambdaArrow", "UsePropertyAccessSyntax")
object Main {

@@ -0,0 +1,114 @@
/*-
* =LICENSE=
* Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
* ----------
* Copyright (C) 2019 - 2022 JetBrains
* ----------
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =LICENSEEND=
*/
package org.jetbrains.kotlinx.spark.examples.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig.*
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.api.java.JavaInputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.jetbrains.kotlinx.spark.api.reduceByKey
import org.jetbrains.kotlinx.spark.api.tuples.*
import org.jetbrains.kotlinx.spark.api.withSparkStreaming
import scala.Tuple2
import java.io.Serializable
import java.util.regex.Pattern
import kotlin.system.exitProcess


/**
* Src: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
*
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
* <brokers> is a list of one or more Kafka brokers
* <groupId> is a consumer group name to consume from topics
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
*
* First make sure you have a Kafka producer running. For instance, when running locally:
* $ kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
*
* Then start the program normally or like this:
* $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
* consumer-group topic1,topic2
*/
object KotlinDirectKafkaWordCount {

    private val SPACE = Pattern.compile(" ")

    private const val DEFAULT_BROKER = "localhost:9092"
    private const val DEFAULT_GROUP_ID = "consumer-group"
    private const val DEFAULT_TOPIC = "quickstart-events"

    @JvmStatic
    fun main(args: Array<String>) {
        if (args.size < 3 && args.isNotEmpty()) {
            System.err.println(
                """Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
                  <brokers> is a list of one or more Kafka brokers
                  <groupId> is a consumer group name to consume from topics
                  <topics> is a list of one or more kafka topics to consume from
                """.trimIndent()
            )
            exitProcess(1)
        }

        val brokers: String = args.getOrElse(0) { DEFAULT_BROKER }
        val groupId: String = args.getOrElse(1) { DEFAULT_GROUP_ID }
        val topics: String = args.getOrElse(2) { DEFAULT_TOPIC }

        // Create context with a 2 seconds batch interval
        withSparkStreaming(batchDuration = Durations.seconds(2), appName = "KotlinDirectKafkaWordCount") {

            val topicsSet: Set<String> = topics.split(',').toSet()

            val kafkaParams: Map<String, Serializable> = mapOf(
                BOOTSTRAP_SERVERS_CONFIG to brokers,
                GROUP_ID_CONFIG to groupId,
                KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )

            // Create direct kafka stream with brokers and topics
            val messages: JavaInputDStream<ConsumerRecord<String, String>> = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams),
            )

            // Get the lines, split them into words, count the words and print
            val lines: JavaDStream<String> = messages.map { it.value() }
            val words: JavaDStream<String> = lines.flatMap { it.split(SPACE).iterator() }

            val wordCounts: JavaDStream<Tuple2<String, Int>> = words
                .map { it X 1 }
                .reduceByKey { a: Int, b: Int -> a + b }

            wordCounts.print()

        }
    }
}
