# Structured Streaming - Kafka Example
This example ports our IoT data flow from Kafka to Parquet in a Structured Streaming version.  

The intention of this example is to explore the main aspects of the Structured Streaming API.
We will: 
 - use the Kafka `source` to consume the `sensor-raw` topic.
 - implement the application logic using the Dataset API
 - use a file `sink` to store the data into a _Parquet_ file.
 
 
This example lets us compare and contrast the Structured Streaming approach with the Spark Streaming implementation that we already know.

In [ ]:
val sourceTopic = "sensor-raw"
val targetTopic = "sensor-parsed"
val workDir = "/tmp"
val referenceFile = "sensor-records.parquet"
val kafkaBootstrapServer = "172.17.0.2:9092"

sourceTopic: String = sensor-raw
targetTopic: String = sensor-parsed
workDir: String = /tmp
referenceFile: String = sensor-records.parquet
kafkaBootstrapServer: String = 172.17.0.2:9092


In [ ]:
val rawData = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBootstrapServer)
      .option("subscribe", sourceTopic)
      .option("enable.auto.commit", true)
      .option("group.id", "sensor-data-consumer")
      .option("startingOffsets", "latest")
      .load()

rawData: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]


In [ ]:
rawData.isStreaming

res3: Boolean = true


In [ ]:
rawData.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## We need to declare the schema of the data in the stream.
this is a handy way to do so.

In [ ]:
case class SensorData(id: String, ts: Long, temp: Double, hum: Double)

defined class SensorData


In [ ]:
import org.apache.spark.sql.Encoders
val schema = Encoders.product[SensorData].schema

import org.apache.spark.sql.Encoders
schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true), StructField(ts,LongType,false), StructField(temp,DoubleType,false), StructField(hum,DoubleType,false))


In [ ]:
val rawValues = rawData.selectExpr("CAST(value AS STRING)").as[String]
val jsonValues = rawValues.select(from_json($"value", schema) as "record")
val sensorData = jsonValues.select("record.*").as[SensorData]

rawValues: org.apache.spark.sql.Dataset[String] = [value: string]
jsonValues: org.apache.spark.sql.DataFrame = [record: struct<id: string, ts: bigint ... 2 more fields>]
sensorData: org.apache.spark.sql.Dataset[SensorData] = [id: string, ts: bigint ... 2 more fields]


## Load the reference data from a parquet file¶
We also cache the data to keep it in memory and improve the performance of our steaming application

In [ ]:
val sensorRef = sparkSession.read.parquet(s"$workDir/$referenceFile")
sensorRef.cache()

org.apache.spark.sql.AnalysisException: Path does not exist: file:/tmp/learningsparkstreaming/sensor-records.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:360)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:559)
  at org.apache.spark.sql.DataFrameReader.parquet(DataF

# Join the incoming Streaming data with the reference table 

In [ ]:
val sensorWithInfo = sensorRef.join(iotData, Seq("sensorId"), "inner")

<console>:77: error: not found: value sensorRef
       val sensorWithInfo = sensorRef.join(iotData, Seq("sensorId"), "inner")
                            ^


In [ ]:
val knownSensors = sensorWithInfo.withColumn("dnvalue", $"value"*($"maxRange"-$"minRange")+$"minRange")
  .drop("value", "maxRange", "minRange")

<console>:68: error: not found: value sensorWithInfo
       val knownSensors = sensorWithInfo.withColumn("dnvalue", $"value"*($"maxRange"-$"minRange")+$"minRange")
                          ^


In [ ]:
val knownSensorsQuery = knownSensors.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/tmp/learning-spark-streaming/stst-known_sensors")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

<console>:68: error: not found: value knownSensors
       val knownSensorsQuery = knownSensors.writeStream
                               ^


In [ ]:

knownSensorsQuery.recentProgress

<console>:71: error: not found: value knownSensorsQuery
       knownSensorsQuery.recentProgress
       ^


In [ ]:
sensorData.writeStream
  .queryName("visualization")    // this query name will be the table name
  .outputMode("append")
  .format("memory")
  .start()

res11: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@131140da


In [ ]:
val sample = Seq((System.currentTimeMillis, 0.1))

val chart = CustomPlotlyChart(sample,
                  layout=s"{title: 'sensor data sample'}",
                  dataOptions="""{type: 'line'}""",
                  dataSources="{x: '_1', y: '_2'}")

sample: Seq[(Long, Double)] = List((1511907794533,0.1))
chart: notebook.front.widgets.charts.CustomPlotlyChart[Seq[(Long, Double)]] = <CustomPlotlyChart widget>


In [ ]:
chart

res17: notebook.front.widgets.charts.CustomPlotlyChart[Seq[(Long, Double)]] = <CustomPlotlyChart widget>


In [ ]:
val dataSample = sparkSession.sql("select * from visualization where ts > 1511908486082")
  

dataSample: org.apache.spark.sql.DataFrame = [id: string, ts: bigint ... 2 more fields]


In [ ]:
dataSample

res38: org.apache.spark.sql.DataFrame = [id: string, ts: bigint ... 2 more fields]


In [ ]:

res14.lastProgress

res37: org.apache.spark.sql.streaming.StreamingQueryProgress =
{
  "id" : "6193c21b-4b8b-4141-b545-0b288e01ca3c",
  "runId" : "45717cf3-7e65-4837-955c-5e6e8506dd25",
  "name" : "visualization",
  "timestamp" : "2017-11-28T21:58:43.088Z",
  "numInputRows" : 4,
  "inputRowsPerSecond" : 56.33802816901409,
  "processedRowsPerSecond" : 57.97101449275362,
  "durationMs" : {
    "addBatch" : 41,
    "getBatch" : 5,
    "getOffset" : 1,
    "queryPlanning" : 2,
    "triggerExecution" : 69,
    "walCommit" : 19
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[sensor-raw]]",
    "startOffset" : {
      "sensor-raw" : {
        "0" : 1100
      }
    },
    "endOffset" : {
      "sensor-raw" : {
        "0" : 1104
      }
    },
    "numInputRows" : 4,
  ...