# Structured Streaming - Kafka Example

This example explores the main aspects of the Structured Streaming API.

 - We use the Kafka `source` to consume the `iot-data` topic.
 - We use a file `sink` to store the data into a _Parquet_ file.
 
To run this example, you also need:

- a running Kafka broker. We suggest the easy-to-run _dockerized_ version maintained by Spotify: https://hub.docker.com/r/spotify/kafka/
- a reference file, listing parameters for each sensor. This file must be generated with [reference-data-generator](./reference-data-generator.snb.ipynb)
- our data producer notebook: [kafka_sensor_data_producer](./kafka_sensor_data_producer.snb.ipynb)

Because Kafka acts as a broker between producer and consumer, you can choose to run the two notebooks in any order. 
Nevertheless, we suggest that you run the producer first to have data available when we go through this example.

In [ ]:
import org.apache.spark.sql.kafka010._

In [ ]:
import java.io.File
// Kafka
val kafkaBootstrapServer = "127.0.0.1:9092"
val topic = "iot-data"

// File system
val workDir = "/tmp/streaming-with-spark"
val referenceFile = "sensor-records.parquet"
val targetFile = "structured_enrichedIoTStream.parquet"
val targetPath = new File(workDir, targetFile).getAbsolutePath
val unknownSensorsTargetFile = "unknownSensorsStream.parquet"
val unknownSensorsTargetPath = new File(workDir, unknownSensorsTargetFile).getAbsolutePath


In [ ]:
val rawData = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServer)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

In [ ]:
rawData.isStreaming

In [ ]:
rawData.printSchema()

In [ ]:
case class SensorData(sensorId: Int, timestamp: Long, value: Double)

In [ ]:
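import scala.util.Try

// Kafka delivers `value` as binary, so cast it to a string before parsing.
val iotData = rawData.selectExpr("CAST(value AS STRING)").as[String].flatMap { record =>
  val fields = record.split(",")
  // Malformed records fail inside Try and are dropped by flatMap.
  Try {
    SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
  }.toOption
}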
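
The parsing step above wraps the conversion in `Try` and relies on `flatMap` to discard records that fail. The following is a minimal sketch of that behaviour on a plain Scala collection, without Spark. The `parse` helper and the sample records are hypothetical and only serve the illustration. The `SensorData` case class is repeated so the snippet stands alone.

```scala
import scala.util.Try

// Same shape as the notebook's case class, repeated so the snippet stands alone.
case class SensorData(sensorId: Int, timestamp: Long, value: Double)

// Hypothetical helper: parse one CSV record.
// Any failure (missing field, non-numeric text) yields None.
def parse(record: String): Option[SensorData] = {
  val fields = record.split(",")
  Try(SensorData(fields(0).toInt, fields(1).toLong, fields(2).toDouble)).toOption
}

// Invented sample records: one valid, one with a non-numeric id, one with too few fields.
val samples = Seq(
  "7,1500000000000,0.25",
  "abc,1500000000000,0.25",
  "7,1500000000000"
)

// flatMap keeps only the records that parsed successfully.
val parsed = samples.flatMap(r => parse(r))
```

Only the first record survives. The trade-off of this approach is that malformed input is dropped silently, so a production job might prefer to count or log the failures.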

## Load the Reference Data from a Parquet File
We also cache the data to keep it in memory and improve the performance of our streaming application.

In [ ]:
val sensorRef = sparkSession.read.parquet(s"$workDir/$referenceFile")
sensorRef.cache()

## Join the Reference Data with the Stream to Compute the Enriched Values

In [ ]:
val sensorWithInfo = sensorRef.join(iotData, Seq("sensorId"), "inner")

val knownSensors = sensorWithInfo
  .withColumn("dnvalue", $"value"*($"maxRange"-$"minRange")+$"minRange")
  .drop("value", "maxRange", "minRange")
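
To make the enrichment step concrete, here is a small sketch of the same logic on plain Scala collections. It assumes that `value` is normalized to the range [0, 1], which the `dnvalue` formula suggests but the notebook does not state. The `SensorRange` and `Reading` types, the `denormalize` helper, and all numbers are invented for the illustration.

```scala
// Hypothetical reference entry; only the range columns used by the notebook are modelled.
case class SensorRange(minRange: Double, maxRange: Double)
case class Reading(sensorId: Int, value: Double)

// Stand-in for sensorRef, keyed by sensorId (values are invented).
val reference = Map(
  1 -> SensorRange(-10.0, 40.0),
  2 -> SensorRange(0.0, 100.0)
)

// Same expression as the dnvalue column:
// scale a value assumed to lie in [0, 1] into [minRange, maxRange].
def denormalize(value: Double, r: SensorRange): Double =
  value * (r.maxRange - r.minRange) + r.minRange

val readings = Seq(Reading(1, 0.5), Reading(2, 0.25), Reading(99, 0.5))

// Like the inner join: a reading with no reference entry (sensor 99) is dropped.
val enriched = readings.flatMap { r =>
  reference.get(r.sensorId).map(range => r.sensorId -> denormalize(r.value, range))
}
```

Sensor 1 yields 0.5 * (40 - (-10)) + (-10) = 15.0 and sensor 2 yields 0.25 * 100 + 0 = 25.0, while the reading for sensor 99 disappears because the inner join has nothing to match it against.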

## Write the Results to a Parquet File

In [ ]:
val query = knownSensors.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", targetPath)
  .option("checkpointLocation", workDir + "/iot-checkpoint")
  .start()

In [ ]:
query.recentProgress