Streaming

A popular Spark extension is Spark Streaming. Of course, the Kotlin Spark API also introduces a more Kotlin-esque approach to writing your streaming programs. There are examples for use with a checkpoint, Kafka, and SQL in the examples module.

Example

A quick example is also provided below:

// Automatically provides ssc: JavaStreamingContext which starts and awaits termination or timeout
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    setRunAfterStart {
        println("Stream is started")
    }

    // create input stream for, for instance, Netcat: `$ nc -lk 9999`
    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)
  
    // split input stream on space
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // perform action on each formed RDD in the stream
    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->

        // to convert the JavaRDD to a Dataset, we need a spark session using the RDD context
        withSpark(rdd) { // this: KSparkSession
            val dataframe: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
            dataframe
                .groupByKey { it.word }
                .count()
                .show()
            // +-----+--------+
            // |  key|count(1)|
            // +-----+--------+
            // |hello|       1|
            // |   is|       1|
            // |    a|       1|
            // | this|       1|
            // | test|       3|
            // +-----+--------+
        }
    }
}
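
The TestRow class used above is assumed to be a simple data class with a single word property, for instance:

// hypothetical definition matching the example above
data class TestRow(val word: String)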

Spark session

Note that withSparkStreaming {} does not provide a spark session in the context. This is because it needs to be created from the right SparkConf depending on what you're doing with the data stream.

This is why we provide withSpark(sc: SparkConf) {} inside the KSparkStreamingSession as well as two helper functions for when you already have an instance of ssc: JavaStreamingContext or an RDD.
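
The SparkConf variant is not shown in the examples below, but a minimal sketch could look like this (the app name and master are placeholder values):

withSparkStreaming(...) { // this: KSparkStreamingSession

    // hypothetical configuration; pick whatever suits your deployment
    val conf = SparkConf()
        .setAppName("MyStreamingApp")
        .setMaster("local[*]")

    withSpark(sc = conf) { // this: KSparkSession
        // spark: SparkSession is available here, e.g. for creating datasets or broadcasts
    }
}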

For instance, if you want to create a dataset inside the KSparkStreamingSession context, or you want to broadcast a variable, you can create an instance of a spark session from the ssc: JavaStreamingContext like so:

withSparkStreaming(...) { // this: KSparkStreamingSession

    // for instance
    val broadcastSomething: Broadcast<*> = withSpark(sscForConf = ssc) { // this: KSparkSession
        spark.broadcast(something)
    }
}
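
The resulting broadcast variable can then be read inside stream operations through its value() method. A minimal sketch, assuming the broadcast holds a Set<String> of words to filter out of a text stream:

withSparkStreaming(...) { // this: KSparkStreamingSession

    // hypothetical: broadcast a set of stop words once, using the ssc-based spark session
    val stopWords: Broadcast<Set<String>> = withSpark(sscForConf = ssc) { // this: KSparkSession
        spark.broadcast(setOf("the", "a", "of"))
    }

    val lines: JavaDStream<String> = ssc.socketTextStream("localhost", 9999)

    // the broadcast value can be read inside any stream transformation
    val filtered: JavaDStream<String> = lines.filter { it !in stopWords.value() }
}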

When using something like foreachRDD {}, the spark session must be created from the SparkConf of the RDD itself. For example:

withSparkStreaming(...) { // this: KSparkStreamingSession
    
    val stream: JavaDStream<*> = ...

    stream.foreachRDD { rdd: JavaRDD<*>, time: Time ->
        
        // for instance
        withSpark(rddForConf = rdd) { // this: KSparkSession
            rdd.toDS().show()
        }
    }
}

Checkpoints

A feature of Spark Streaming is checkpointing. This can also be done easily from the Kotlin API, like so:

withSparkStreaming(batchDuration = ..., checkPointPath = "/path/to/checkpoint") {
    // the contents will only be run the first time;
    // on a restart, the stream transformations will be read from the checkpoint
}

NOTE: If setRunAfterStart {} is used, it is likewise only executed if the checkpoint is empty.
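
For instance, the Netcat example from above could be made recoverable by passing a checkpoint path; a minimal sketch (the path is just a placeholder):

withSparkStreaming(
    batchDuration = Durations.seconds(1),
    checkPointPath = "/tmp/checkpoint" // hypothetical path; reused on restart
) { // this: KSparkStreamingSession

    setRunAfterStart {
        // only runs when the checkpoint was empty
        println("Stream is started")
    }

    // the stream definition below is only evaluated on the first run;
    // on a restart it is recovered from the checkpoint
    ssc.socketTextStream("localhost", 9999)
        .flatMap { it.split(" ").iterator() }
        .print()
}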

Key / Value streams

Using the Java API, it's necessary to call mapToPair {} on a JavaDStream to get a JavaPairDStream before key/value functions like reduceByKey {} become available. In Kotlin, however, extension functions make these functions accessible directly on JavaDStream<Tuple2<*, *>>, so we can simply do:

val wordCounts: JavaDStream<Tuple2<String, Int>> = words
    .map { it X 1 } // the infix function X creates a Tuple2
    .reduceByKey { a: Int, b: Int -> a + b }
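
As with any DStream, the result can then be written out per batch, for example with the standard print() output operation:

// prints the first elements of each batch's RDD to the console
wordCounts.print()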