Exploring streaming #133

Merged
merged 50 commits on Apr 21, 2022

Commits
b24172f
added withSparkStreaming function and example. Let's see if something…
Jolanrensen Feb 21, 2022
a378070
makes withSparkStreaming reuse the normal withSpark
Jolanrensen Feb 21, 2022
47e1ccb
removed sc.stop()
Jolanrensen Feb 24, 2022
6c1b0d9
added withSparkStreaming function and example. Let's see if something…
Jolanrensen Feb 21, 2022
95a9563
makes withSparkStreaming reuse the normal withSpark
Jolanrensen Feb 21, 2022
9d1450b
removed sc.stop()
Jolanrensen Feb 24, 2022
60569f9
Merge remote-tracking branch 'origin/exploring-streaming' into explor…
Jolanrensen Mar 7, 2022
f2ce000
fixed merge
Jolanrensen Mar 7, 2022
cdf7296
working on tests
Jolanrensen Mar 7, 2022
330536c
added timeout and working streaming test
Jolanrensen Mar 8, 2022
d2e792a
added timeout and working streaming test
Jolanrensen Mar 8, 2022
4222a03
still exploring, just pushing to keep my work safe :)
Jolanrensen Mar 17, 2022
5769e92
still exploring, just pushing to keep my work safe :)
Jolanrensen Mar 17, 2022
14278ff
pairs are recognized too
Jolanrensen Mar 17, 2022
eec03cb
and tuples cause why not
Jolanrensen Mar 17, 2022
bc9fd3e
added Option(al) converters, testing fakeClassTag(). Working with str…
Jolanrensen Mar 21, 2022
ea3e7d8
removed commented out code, fixed checkpointPath (needs cleaning up c…
Jolanrensen Mar 31, 2022
d672ffb
Merge branch 'tuple-first' into streaming+tuples
Jolanrensen Apr 1, 2022
f523d9d
small streaming updates
Jolanrensen Apr 1, 2022
62daa82
Merge branch 'tuple-first' into streaming+tuples
Jolanrensen Apr 1, 2022
2708a5c
created temporary branch merging tuples with streaming. Will merge in…
Jolanrensen Apr 1, 2022
1e8ab36
Merge branch 'spark-3.2' into streaming+tuples
Jolanrensen Apr 11, 2022
52e5b0d
Merge branch 'spark-3.2' into exploring-streaming
Jolanrensen Apr 11, 2022
c70c0f3
working on testing, not yet finished
Jolanrensen Apr 12, 2022
02d50cc
updated to kotlin 1.6.20, refactored withSparkStreaming and fixed tests
Jolanrensen Apr 13, 2022
219b949
added conversions for Option(al), State.
Jolanrensen Apr 14, 2022
59f658e
added toDataFrame conversions for RDDs
Jolanrensen Apr 14, 2022
547ab14
added toDataFrame conversions for RDDs
Jolanrensen Apr 14, 2022
6a5fce4
more tests
Jolanrensen Apr 14, 2022
9449b9f
added kafka test, but issue with kotest extension
Jolanrensen Apr 14, 2022
70949ef
added embedded kafka so kafka tests work
Jolanrensen Apr 19, 2022
eaf13ce
Optional and nullable options for updateStateByKey, changing port for…
Jolanrensen Apr 19, 2022
8ccf5ea
qodana suggestions
Jolanrensen Apr 19, 2022
db566be
Merge branch 'spark-3.2' into exploring-streaming
Jolanrensen Apr 19, 2022
8e3b952
qodana suggestions
Jolanrensen Apr 19, 2022
1af4c04
let's see if adding a container does anything
Jolanrensen Apr 19, 2022
e4d6e2a
changing to ip 0.0.0.0
Jolanrensen Apr 20, 2022
25464d8
changing to ip localhost
Jolanrensen Apr 20, 2022
9901c5b
attempt to add exclusion for kafka streaming test for github
Jolanrensen Apr 20, 2022
1090e2b
attempting to exclude entire file
Jolanrensen Apr 20, 2022
729279e
attempting to exclude entire file
Jolanrensen Apr 20, 2022
9b5c1fb
attempting to exclude entire file
Jolanrensen Apr 20, 2022
290fc9e
attempting to exclude entire file
Jolanrensen Apr 20, 2022
fdee3a2
exclusion kafka works!
Jolanrensen Apr 21, 2022
4dfb747
exclusion kafka works!
Jolanrensen Apr 21, 2022
54b9d10
updating readme and example
Jolanrensen Apr 21, 2022
f694d07
attempt to add qodana scan action to github actions
Jolanrensen Apr 21, 2022
299fb75
removed qodana app, let's try
Jolanrensen Apr 21, 2022
f83727d
removed todo test and updated unused imports
Jolanrensen Apr 21, 2022
4ece47e
last cleanups
Jolanrensen Apr 21, 2022
9 changes: 8 additions & 1 deletion .github/workflows/build.yml
@@ -25,5 +25,12 @@ jobs:
          key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
          restore-keys: ${{ runner.os }}-m2
      - name: Build with Maven
-       run: ./mvnw -B package --file pom.xml -Pscala-2.12
+       run: ./mvnw -B package --file pom.xml -Pscala-2.12 -Dkotest.tags="!Kafka"
  qodana:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: 'Qodana Scan'
        uses: JetBrains/qodana-action@v5.0.2

# vim: ts=2:sts=2:sw=2:expandtab
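The `-Dkotest.tags="!Kafka"` flag makes the Maven build skip kotest tests tagged `Kafka` (the embedded-Kafka streaming tests added by this PR, which the commit history shows were flaky on GitHub runners); the new `qodana` job adds a JetBrains Qodana scan. A minimal sketch of how such an exclusion hooks in on the test side, assuming kotest 5.x tagging; the spec and test names here are hypothetical:

```kotlin
import io.kotest.core.Tag
import io.kotest.core.spec.style.ShouldSpec

// A tag's name defaults to its simple class name, so this matches -Dkotest.tags="!Kafka"
object Kafka : Tag()

// Hypothetical spec: every test in it is skipped when the build excludes the Kafka tag
class KafkaStreamingTest : ShouldSpec({
    tags(Kafka)

    should("count words arriving on a Kafka topic") {
        // embedded-Kafka test body would go here
    }
})
```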
43 changes: 43 additions & 0 deletions README.md
@@ -23,6 +23,7 @@ We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache
- [Column infix/operator functions](#column-infixoperator-functions)
- [Overload Resolution Ambiguity](#overload-resolution-ambiguity)
- [Tuples](#tuples)
- [Streaming](#streaming)
- [Examples](#examples)
- [Reporting issues/Support](#reporting-issuessupport)
- [Code of Conduct](#code-of-conduct)
@@ -267,6 +268,48 @@ Finally, all these tuple helper functions are also baked in:
- `map`
- `cast`

### Streaming

A popular Spark extension is [Spark Streaming](https://spark.apache.org/docs/latest/streaming-programming-guide.html).
Naturally, the Kotlin Spark API also provides a more Kotlin-esque approach to writing streaming programs.
There are examples for use with a checkpoint, Kafka, and SQL in the [examples module](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming).

Here is a quick example:
```kotlin
// Automatically provides ssc: JavaStreamingContext which starts and awaits termination or timeout
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    // create an input stream from, for instance, Netcat: `$ nc -lk 9999`
    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)

    // split the input stream on spaces
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // perform an action on each RDD formed in the stream
    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->

        // to convert the JavaRDD to a Dataset, we need a Spark session using the RDD's context
        withSpark(rdd) { // this: KSparkSession
            val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
            dataframe
                .groupByKey { it.word }
                .count()
                .show()
            // +-----+--------+
            // |  key|count(1)|
            // +-----+--------+
            // |hello|       1|
            // |   is|       1|
            // |    a|       1|
            // | this|       1|
            // | test|       3|
            // +-----+--------+
        }
    }
}
```
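
The snippet above assumes a small data class for the rows; its definition is not part of the quoted diff, but it would look something like:

```kotlin
data class TestRow(val word: String)
```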


## Examples

For more, check out [examples](https://github.com/JetBrains/kotlin-spark-api/tree/master/examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples) module.
13 changes: 13 additions & 0 deletions examples/pom-3.2_2.12.xml
@@ -29,6 +29,11 @@
            <artifactId>spark-streaming_${scala.compat.version}</artifactId>
            <version>${spark3.version}</version>
        </dependency>
        <dependency><!-- Only needed for Qodana -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.compat.version}</artifactId>
            <version>${spark3.version}</version>
        </dependency>
    </dependencies>

    <build>
@@ -90,6 +95,14 @@
                    <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
@@ -21,7 +21,6 @@ package org.jetbrains.kotlinx.spark.examples

import org.jetbrains.kotlinx.spark.api.broadcast
import org.jetbrains.kotlinx.spark.api.map
import org.jetbrains.kotlinx.spark.api.sparkContext
import org.jetbrains.kotlinx.spark.api.withSpark
import java.io.Serializable

@@ -19,13 +19,14 @@
*/
package org.jetbrains.kotlinx.spark.examples

import org.apache.spark.api.java.function.ReduceFunction
import org.apache.spark.sql.Dataset
import org.jetbrains.kotlinx.spark.api.*
import org.jetbrains.kotlinx.spark.api.tuples.*
import scala.*
import scala.Tuple2
import scala.Tuple3

data class Q<T>(val id: Int, val text: T)

@Suppress("RedundantLambdaArrow", "UsePropertyAccessSyntax")
object Main {

@@ -0,0 +1,114 @@
/*-
* =LICENSE=
* Kotlin Spark API: Examples for Spark 3.2+ (Scala 2.12)
* ----------
* Copyright (C) 2019 - 2022 JetBrains
* ----------
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =LICENSEEND=
*/
package org.jetbrains.kotlinx.spark.examples.streaming

import org.apache.kafka.clients.consumer.ConsumerConfig.*
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.api.java.JavaInputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.jetbrains.kotlinx.spark.api.reduceByKey
import org.jetbrains.kotlinx.spark.api.tuples.*
import org.jetbrains.kotlinx.spark.api.withSparkStreaming
import scala.Tuple2
import java.io.Serializable
import java.util.regex.Pattern
import kotlin.system.exitProcess


/**
* Src: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
*
* Consumes messages from one or more topics in Kafka and does wordcount.
* Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
* <brokers> is a list of one or more Kafka brokers
* <groupId> is a consumer group name to consume from topics
* <topics> is a list of one or more kafka topics to consume from
*
* Example:
*
* First make sure you have a Kafka producer running. For instance, when running locally:
* $ kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
*
* Then start the program normally or like this:
* $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port \
* consumer-group topic1,topic2
*/
object KotlinDirectKafkaWordCount {

    private val SPACE = Pattern.compile(" ")

    private const val DEFAULT_BROKER = "localhost:9092"
    private const val DEFAULT_GROUP_ID = "consumer-group"
    private const val DEFAULT_TOPIC = "quickstart-events"

    @JvmStatic
    fun main(args: Array<String>) {
        // either pass all three arguments, or none at all (the defaults above are then used)
        if (args.size < 3 && args.isNotEmpty()) {
            System.err.println(
                """Usage: JavaDirectKafkaWordCount <brokers> <groupId> <topics>
                  <brokers> is a list of one or more Kafka brokers
                  <groupId> is a consumer group name to consume from topics
                  <topics> is a list of one or more kafka topics to consume from
                """.trimIndent()
            )
            exitProcess(1)
        }

        val brokers: String = args.getOrElse(0) { DEFAULT_BROKER }
        val groupId: String = args.getOrElse(1) { DEFAULT_GROUP_ID }
        val topics: String = args.getOrElse(2) { DEFAULT_TOPIC }

        // Create a streaming context with a 2-second batch interval
        withSparkStreaming(batchDuration = Durations.seconds(2), appName = "KotlinDirectKafkaWordCount") {

            val topicsSet: Set<String> = topics.split(',').toSet()

            val kafkaParams: Map<String, Serializable> = mapOf(
                BOOTSTRAP_SERVERS_CONFIG to brokers,
                GROUP_ID_CONFIG to groupId,
                KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )

            // Create a direct Kafka stream with the given brokers and topics
            val messages: JavaInputDStream<ConsumerRecord<String, String>> = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams),
            )

            // Get the lines, split them into words, count the words, and print
            val lines: JavaDStream<String> = messages.map { it.value() }
            val words: JavaDStream<String> = lines.flatMap { it.split(SPACE).iterator() }

            val wordCounts: JavaDStream<Tuple2<String, Int>> = words
                .map { it X 1 }
                .reduceByKey { a: Int, b: Int -> a + b }

            wordCounts.print()
        }
    }
}
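
Two Kotlin Spark API helpers carry the word count above: the `X` infix from the tuples package and the `reduceByKey` extension, which lets a `JavaDStream` of `Tuple2`s be reduced like a pair stream. A simplified sketch of what such helpers could look like, as an illustration rather than the library's actual source:

```kotlin
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.api.java.JavaPairDStream
import scala.Tuple2

// sketch: pair any value with another into a Scala Tuple2
infix fun <A, B> A.X(that: B): Tuple2<A, B> = Tuple2(this, that)

// sketch: route a stream of tuples through Spark's pair API to reduce by key
fun <K, V> JavaDStream<Tuple2<K, V>>.reduceByKey(func: (V, V) -> V): JavaDStream<Tuple2<K, V>> =
    JavaPairDStream.fromJavaDStream(this)    // view the Tuple2 stream as a pair stream
        .reduceByKey { a, b -> func(a, b) }  // Spark's native reduceByKey
        .map { it }                          // back to a plain JavaDStream of tuples
```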