/
ConsumeKafkaMain.scala
66 lines (54 loc) · 2.23 KB
/
ConsumeKafkaMain.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package uk.co.odinconsultants.sssplayground.windows
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{Dataset, SparkSession}
object ConsumeKafkaMain {

  /** A single record written to the parquet sink.
    *
    * @param payload the Kafka message value, cast to a string
    * @param period  bucket used as the sink's partition column (`partitionBy("period")`)
    */
  case class Payload(payload: String, period: String)

  /** Parses a Kafka (key, value) pair; returning `None` drops the record from the stream. */
  type KafkaParseFn = (String, String) => Option[Payload]

  /** Demo parser: keeps every record, bucketing it by the value's hash.
    * NOTE(review): `hashCode % 10` can be negative, producing partition
    * directories such as `period=-3` — harmless, but worth knowing.
    */
  val trivialKafkaParseFn: KafkaParseFn = { case (_, v) => Some(Payload(v, (v.hashCode % 10).toString)) }

  /** Entry point.
    *
    * Expected arguments:
    *   0 - Kafka bootstrap servers (`host:port[,host:port...]`)
    *   1 - topic to subscribe to
    *   2 - sink directory for parquet output (checkpoint dir derived from it)
    *   3 - processing-time trigger interval in milliseconds
    */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an opaque
    // ArrayIndexOutOfBoundsException / NumberFormatException.
    require(args.length >= 4,
      s"Usage: <kafkaUrl> <topicName> <sinkFile> <processTimeMs> (got ${args.length} argument(s))")
    val spark         = session()
    val kafkaUrl      = args(0)
    val topicName     = args(1)
    val sinkFile      = args(2)
    val processTimeMs = args(3).toLong
    val payloads      = streamFromKafka(spark, kafkaUrl, topicName, trivialKafkaParseFn)
    streamingToHDFS(payloads, sinkFile, processTimeMs)
    // Block the driver until any streaming query terminates (or fails).
    spark.streams.awaitAnyTermination()
  }

  /** Builds (or reuses) the [[SparkSession]] configured by [[sparkConf]]. */
  def session(): SparkSession =
    SparkSession.builder().config(sparkConf).getOrCreate()

  private def sparkConf: SparkConf =
    new SparkConf().setAppName("SSSPlayground")

  /** Starts an Append-mode parquet sink under `sinkFile`, partitioned by `period`,
    * triggered every `processTimeMs` milliseconds.
    *
    * @return the running [[StreamingQuery]]
    */
  def streamingToHDFS(df: Dataset[Payload], sinkFile: String, processTimeMs: Long): StreamingQuery = {
    // NOTE(review): kept as plain concatenation (no path separator) so existing
    // deployments keep finding their checkpoint at "<sinkFile>checkpoint".
    val checkpointFilename = sinkFile + "checkpoint"
    df.writeStream
      .format("parquet")
      .outputMode(OutputMode.Append()) // parquet sink supports Append only
      .option("path", sinkFile)
      .option("checkpointLocation", checkpointFilename)
      .trigger(Trigger.ProcessingTime(processTimeMs))
      .partitionBy("period")
      .start()
  }

  /** Subscribes to `topicName` on `kafkaUrl`, reading from the earliest offsets,
    * and parses each (key, value) record with `fn`, dropping records for which
    * `fn` returns `None`.
    */
  def streamFromKafka(session: SparkSession, kafkaUrl: String, topicName: String, fn: KafkaParseFn): Dataset[Payload] = {
    import session.implicits._
    session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaUrl)
      .option("subscribe", topicName)
      // Removed the bogus `.option("offset", "earliest")`: not a Kafka-source
      // option, so it was silently ignored; `startingOffsets` is the real one.
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .flatMap { case (key, value) => fn(key, value) }
  }
}