-
Notifications
You must be signed in to change notification settings - Fork 5
/
StructuredStreamingKafkaDSE.scala
60 lines (48 loc) · 2.13 KB
/
StructuredStreamingKafkaDSE.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.datastax.alexott.streaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import scala.io.Source
/**
 * Structured Streaming job that reads tweets (as JSON strings) from a Kafka topic,
 * counts them per language in one-minute windows and writes the running counts
 * to a Cassandra/DSE table.
 *
 * The JSON schema of a tweet is inferred once at start-up from the first line of a
 * sample file bundled on the classpath.
 */
object StructuredStreamingKafkaDSE {

  /** Classpath resource whose first line is used as the sample tweet for schema inference. */
  private val SampleResource = "/tweets-1.json"

  /**
   * Reads the first line of a classpath resource and releases the underlying stream.
   *
   * @param resource absolute classpath location of the resource
   * @return the first line of the resource
   * @throws IllegalStateException if the resource is missing or contains no lines
   */
  private def readFirstLine(resource: String): String = {
    // getResourceAsStream returns null for a missing resource; fail with a clear message
    // instead of a NullPointerException further down.
    val stream = Option(StructuredStreamingKafkaDSE.getClass.getResourceAsStream(resource))
      .getOrElse(throw new IllegalStateException(s"Resource $resource not found on the classpath"))
    // Decode explicitly as UTF-8 (the standard JSON encoding) rather than the platform default.
    val source = Source.fromInputStream(stream, "UTF-8")
    try {
      val lines = source.getLines()
      if (lines.hasNext) lines.next()
      else throw new IllegalStateException(s"Resource $resource is empty")
    } finally {
      // Closing the Source also closes the wrapped input stream.
      source.close()
    }
  }

  /**
   * Entry point: builds the streaming query and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    import spark.implicits._

    // Infer the tweet schema from a single sample document.
    val jsonSampleDS = spark.createDataset(List(readFirstLine(SampleResource)))
    val schema = spark.read.json(jsonSampleDS).schema

    val streamingInputDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.0.10:9092")
      .option("subscribe", "tweets-txt")
      .load()

    // Kafka delivers the payload as bytes in `value`: decode it, parse the JSON and keep
    // only the creation time (converted to a timestamp) and the language.
    // The cast is applied before the alias so the output column is named `created_at`
    // regardless of how Spark names a cast applied on top of an alias.
    val tweetDF = streamingInputDF
      .selectExpr("CAST(value AS STRING)")
      .select(from_json($"value", schema).as("tweet"))
      .select(
        unix_timestamp($"tweet.created_at", "EEE MMM dd HH:mm:ss Z yyyy")
          .cast(TimestampType)
          .as("created_at"),
        $"tweet.lang".as("lang"))

    // Count tweets per language per one-minute window; rows without a language are skipped.
    val streamingCountsDF = tweetDF
      .where(col("lang").isNotNull)
      .groupBy($"lang", window($"created_at", "1 minutes"))
      .count()
      .select($"lang", $"window.start".as("window"), $"count")

    // need to have table created with following CQL:
    // create table test.sttest_tweets(lang text, window timestamp, count int, primary key(lang, window));
    // This works only with Spark 2.2 (if BYOS 6.0.4 is used)
    // NOTE(review): count() produces a Long column while the table declares `count int`;
    // confirm the connector's type conversion is acceptable here.
    val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "webhdfs://192.168.0.10:5598/checkpoint")
      .option("keyspace", "test")
      .option("table", "sttest_tweets")
      .start()

    /*val query = streamingCountsDF.writeStream
      .outputMode("complete")
      .format("console")
      .start()*/

    query.awaitTermination()
  }
}