Exploring streaming #133

Merged: 50 commits, Apr 21, 2022

Commits (50, all by Jolanrensen)

b24172f (Feb 21, 2022) added withSparkStreaming function and example. Let's see if something…
a378070 (Feb 21, 2022) makes withSparkStreaming reuse the normal withSpark
47e1ccb (Feb 24, 2022) removed sc.stop()
6c1b0d9 (Feb 21, 2022) added withSparkStreaming function and example. Let's see if something…
95a9563 (Feb 21, 2022) makes withSparkStreaming reuse the normal withSpark
9d1450b (Feb 24, 2022) removed sc.stop()
60569f9 (Mar 7, 2022) Merge remote-tracking branch 'origin/exploring-streaming' into explor…
f2ce000 (Mar 7, 2022) fixed merge
cdf7296 (Mar 7, 2022) working on tests
330536c (Mar 8, 2022) added timeout and working streaming test
d2e792a (Mar 8, 2022) added timeout and working streaming test
4222a03 (Mar 17, 2022) still exploring, just pushing to keep my work safe :)
5769e92 (Mar 17, 2022) still exploring, just pushing to keep my work safe :)
14278ff (Mar 17, 2022) pairs are recognized too
eec03cb (Mar 17, 2022) and tuples cause why not
bc9fd3e (Mar 21, 2022) added Option(al) converters, testing fakeClassTag(). Working with str…
ea3e7d8 (Mar 31, 2022) removed commented out code, fixed checkpointPath (needs cleaning up c…
d672ffb (Apr 1, 2022) Merge branch 'tuple-first' into streaming+tuples
f523d9d (Apr 1, 2022) small streaming updates
62daa82 (Apr 1, 2022) Merge branch 'tuple-first' into streaming+tuples
2708a5c (Apr 1, 2022) created temporary branch merging tuples with streaming. Will merge in…
1e8ab36 (Apr 11, 2022) Merge branch 'spark-3.2' into streaming+tuples
52e5b0d (Apr 11, 2022) Merge branch 'spark-3.2' into exploring-streaming
c70c0f3 (Apr 12, 2022) working on testing, not yet finished
02d50cc (Apr 13, 2022) updated to kotlin 1.6.20, refactored withSparkStreaming and fixed tests
219b949 (Apr 14, 2022) added conversions for Option(al), State.
59f658e (Apr 14, 2022) added toDataFrame conversions for RDDs
547ab14 (Apr 14, 2022) added toDataFrame conversions for RDDs
6a5fce4 (Apr 14, 2022) more tests
9449b9f (Apr 14, 2022) added kafka test, but issue with kotest extension
70949ef (Apr 19, 2022) added embedded kafka so kafka tests work
eaf13ce (Apr 19, 2022) Optional and nullable options for updateStateByKey, changing port for…
8ccf5ea (Apr 19, 2022) qodana suggestions
db566be (Apr 19, 2022) Merge branch 'spark-3.2' into exploring-streaming
8e3b952 (Apr 19, 2022) qodana suggestions
1af4c04 (Apr 19, 2022) let's see if adding a container does anything
e4d6e2a (Apr 20, 2022) changing to ip 0.0.0.0
25464d8 (Apr 20, 2022) changing to ip localhost
9901c5b (Apr 20, 2022) attempt to add exclusion for kafka streaming test for github
1090e2b (Apr 20, 2022) attempting to exclude entire file
729279e (Apr 20, 2022) attempting to exclude entire file
9b5c1fb (Apr 20, 2022) attempting to exclude entire file
290fc9e (Apr 20, 2022) attempting to exclude entire file
fdee3a2 (Apr 21, 2022) exclusion kafka works!
4dfb747 (Apr 21, 2022) exclusion kafka works!
54b9d10 (Apr 21, 2022) updating readme and example
f694d07 (Apr 21, 2022) attempt to add qodana scan action to github actions
299fb75 (Apr 21, 2022) removed qodana app, let's try
f83727d (Apr 21, 2022) removed todo test and updated unused imports
4ece47e (Apr 21, 2022) last cleanups

Files changed (changes from 2 commits)

@@ -20,42 +20,29 @@
 package org.jetbrains.kotlinx.spark.examples
 
-import org.apache.spark.SparkConf
-import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.Dataset
-import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Durations
-import org.apache.spark.streaming.api.java.JavaStreamingContext
-import org.jetbrains.kotlinx.spark.api.withSpark
-import scala.Tuple2
-import java.io.Serializable
+import org.jetbrains.kotlinx.spark.api.*
 
-data class Row @JvmOverloads constructor(
-    var word: String = "",
-) : Serializable
+data class TestRow(
+    val word: String,
+)
 
-fun main() = withSpark {
-
-    val context = JavaStreamingContext(
-        SparkConf()
-            .setMaster("local[*]")
-            .setAppName("Test"),
-        Durations.seconds(1),
-    )
-
-    val lines = context.socketTextStream("localhost", 9999)
+fun main() = withSparkStreaming(Durations.seconds(1)) {
+
+    val lines = ssc.socketTextStream("localhost", 9999)
     val words = lines.flatMap { it.split(" ").iterator() }
 
     words.foreachRDD { rdd, time ->
-
-        // todo convert rdd to dataset using kotlin data class?
-
-        val rowRdd = rdd.map { Row(it) }
-
-        val dataframe = spark.createDataFrame(rowRdd, Row::class.java)
+        val dataframe: Dataset<TestRow> = rdd.map { TestRow(it) }.toDS()
 
         dataframe
            .groupByKey { it.word }
            .count()
            .show()
    }
 
-
-    context.start()
-    context.awaitTermination()
 }
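
The updated example reads whitespace-separated words from a socket on localhost:9999, so something must be listening there and writing lines, typically `nc -lk 9999`. Purely as an illustration (not part of this PR), a minimal Kotlin stand-in for netcat could look like this; the emitted words and the 30-line limit are arbitrary choices:

    import java.net.ServerSocket

    // Hypothetical feeder playing the role of `nc -lk 9999`:
    // accept one client and write a line of words every second.
    fun main() {
        ServerSocket(9999).use { server ->
            server.accept().getOutputStream().bufferedWriter().use { out ->
                repeat(30) {
                    out.write("spark streams words spark\n")
                    out.flush()
                    Thread.sleep(1_000)
                }
            }
        }
    }

With the feeder running, each 1-second batch prints a word-count table for the lines received in that interval.
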
The second changed file adds the streaming entry point to the API's withSpark helpers:

@@ -27,7 +27,10 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.SparkSession.Builder
 import org.apache.spark.sql.UDFRegistration
+import org.apache.spark.streaming.Duration
+import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR
+import kotlin.math.log
 
 /**
  * Wrapper for spark creation which allows setting different spark params.
@@ -105,17 +108,60 @@ inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func
     )
 }
 
+
 /**
- * This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]
+ * Wrapper for spark streaming creation. `spark: SparkSession` and `ssc: JavaStreamingContext` are provided, started,
+ * awaited, and stopped automatically.
+ *
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param props spark options, value types are runtime-checked for type-correctness
+ * @param master Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to
+ * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster. By default, it
+ * tries to get the system value "spark.master", otherwise it uses "local[*]"
+ * @param appName Sets a name for the application, which will be shown in the Spark web UI.
+ * If no application name is set, a randomly generated name will be used.
+ * @param logLevel Control our logLevel. This overrides any user-defined log settings.
+ * @param func function which will be executed in context of [KSparkStreamingSession] (it means that `this` inside block will point to [KSparkStreamingSession])
+ * todo: provide alternatives with path instead of batchDuration etc
  */
-class KSparkSession(val spark: SparkSession) {
-
-    val sc: JavaSparkContext = JavaSparkContext(spark.sparkContext)
+@JvmOverloads
+inline fun withSparkStreaming(
+    batchDuration: Duration,
+    props: Map<String, Any> = emptyMap(),
+    master: String = SparkConf().get("spark.master", "local[*]"),
+    appName: String = "Kotlin Spark Sample",
+    logLevel: SparkLogLevel = SparkLogLevel.ERROR,
+    func: KSparkStreamingSession.() -> Unit,
+) {
+    withSpark(
+        props = props,
+        master = master,
+        appName = appName,
+        logLevel = logLevel,
+    ) {
+        val ssc = JavaStreamingContext(sc, batchDuration)
+        KSparkStreamingSession(session = this, ssc = ssc).apply {
+            func()
+            ssc.start()
+            ssc.awaitTermination()
+        }
+    }
+}
+
+/**
+ * This wrapper over [SparkSession] provides several additional methods to create [org.apache.spark.sql.Dataset]
+ */
+open class KSparkSession(val spark: SparkSession, val sc: JavaSparkContext = JavaSparkContext(spark.sparkContext)) {
     inline fun <reified T> List<T>.toDS() = toDS(spark)
     inline fun <reified T> Array<T>.toDS() = spark.dsOf(*this)
     inline fun <reified T> dsOf(vararg arg: T) = spark.dsOf(*arg)
     inline fun <reified T> RDD<T>.toDS() = toDS(spark)
     inline fun <reified T> JavaRDDLike<T, *>.toDS() = toDS(spark)
     val udf: UDFRegistration get() = spark.udf()
 }
+
+/**
+ * This wrapper over [SparkSession] and [JavaStreamingContext] provides several additional methods to create [org.apache.spark.sql.Dataset]
+ */
+class KSparkStreamingSession(session: KSparkSession, val ssc: JavaStreamingContext) : KSparkSession(session.spark, session.sc)
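
Since KSparkStreamingSession extends KSparkSession, everything available in a withSpark block (spark, sc, toDS(), dsOf(), udf) stays available in streaming code, with ssc added on top and the start/awaitTermination lifecycle handled by withSparkStreaming itself. A minimal sketch of the resulting call pattern, assuming the API exactly as shown in this diff (the Word class, host, and port below are placeholders):

    import org.apache.spark.streaming.Durations
    import org.jetbrains.kotlinx.spark.api.*

    data class Word(val word: String)

    fun main() = withSparkStreaming(batchDuration = Durations.seconds(5), appName = "Streaming Word Count") {
        // `ssc` comes from KSparkStreamingSession; `spark`, `sc`,
        // and the toDS()/dsOf() helpers are inherited from KSparkSession.
        val words = ssc.socketTextStream("localhost", 9999)
            .flatMap { it.split(" ").iterator() }

        words.foreachRDD { rdd, _ ->
            rdd.map { Word(it) }
                .toDS()          // JavaRDD -> Dataset via the inherited helper
                .groupByKey { it.word }
                .count()
                .show()
        }

        // No ssc.start()/ssc.awaitTermination() here:
        // withSparkStreaming calls both after this block returns.
    }

Reusing withSpark for the streaming entry point also means props, master, appName, and logLevel behave identically across the batch and streaming APIs, which is exactly what the second commit ("makes withSparkStreaming reuse the normal withSpark") was after.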