import java.util.UUID
import org.apache.spark.sql.streaming.ProcessingTime
import util.Boot
object KafkaToHdfsUsingSpark extends Boot {
// Checkpoint directory under /tmp; a random UUID suffix gives every run its own location.
val checkpointLocation: String = s"/tmp/temporary-${UUID.randomUUID}"
/** Streaming read of the subscribed Kafka topics, projected to the topic name and the record
  * value as a string.
  *
  * `readStream` only yields a reader that is being configured: the source has to be named with
  * `format("kafka")` and the stream created with `load()` before DataFrame operations such as
  * `selectExpr` can be applied.
  *
  * NOTE(review): `spark` is not defined in this file's visible lines; presumably it is the
  * SparkSession provided by `Boot` — confirm.
  */
val upstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  // Comma-separated list of topics to subscribe to.
  .option("subscribe", "test,Airport,Airports,Carriers,Planedata")
  .option("startingOffsets", "earliest")
  .load()
  // Keep only the topic name and the record value cast to a string.
  .selectExpr("topic", "CAST(value AS STRING)")
/** Sink configuration for `upstream`: output goes under /tmp/data, partitioned by topic, with
  * progress tracked in `checkpointLocation`.
  *
  * `partitionBy` and the sink options are defined on the stream writer, which is reached
  * through `writeStream`; they are not available on the DataFrame itself.
  *
  * NOTE(review): no `format(...)` or `start()` call is visible in this span — confirm that the
  * sink format is chosen and the query is actually started.
  */
val downstream = upstream
  .writeStream
  // Partition by topic: one directory per topic name is created under the output path,
  // e.g. topic=Airport, topic=Carriers, topic=Planedata.
  .partitionBy("topic")
  .option("path", "/tmp/data")
  .option("checkpointLocation", checkpointLocation)
