# IoT Streaming Analytics Demo
This is a demo notebook for dashDB's integrated Apache Spark environment and how it can be used to process and land streaming data in dashDB tables.

<img src="https://ibm.box.com/shared/static/6dr6gou6xr1alfwnj6zkj0tbsyc64vda.png", width=700>

If you are new to dashDB and it's integrated Apache Spark capabilities you may first want to check out this [overview article](http://www.ibmbigdatahub.com/blog/evolving-enterprise-data-warehouse-beyond-sql-apache-spark).

## Set up IoT demo data producer
This demo relies on a little data producer container that you have to set up in addition to your dashDB local and Jupyter notebook containers on the same host machine. You can find it [here](https://github.com/ibmdbanalytics/dashdb_analytic_tools/tree/notebook-dev/dashdblocal_notebooks/iot_producer) in a sub folder of the dashDB Jupyter notebook container project. Please follow the instructions found there and perform the few simple steps to set it up. 

This producer will set up a Kafka server on port `9092` with a Kafka topic named `iot4dashdb` and permanently flow in there messages about wind turbine device measurements.

## Streaming environment setup for Spark

We need the kafka streaming library in the classpath. Loading it via `%AddDep` magic does not currently work because of classloader issues (Spark datasource lookup doesn't find it). The dashDB container gets the kafka library deployed in the subsequent step. So adding the dependency is not necessary to execute the code. However the goal of this notebook is to eventually export, compile and deploy it as a Spark application inside dashDB. The compilation is going to be performed in the Jupyter container. For this reason we still need to add the kafka library as a compile-time dependency inside comments. The deployment function treats commented %AddDeps as a compile-only dependecy.

In [None]:
//%AddDeps org.apache.spark spark-streaming-kafka-0-10-assembly_2.11 2.3.2
true // to avoid a syntax error because of an empty code cell

To prepare the Spark runtime for kafka streaming inside dashDB, we need to download the kafka assembly explicitly and place it in the spark/defaultlibs directory for the current user. This is done in the following cell. When you do this for the first time in your user's envuironment, you need to restart the kernel in order to make it effective in the classpath. You can do this by selecting `File->Close and Halt` and then opening this notebook again.

Attention Any .jar in $HOME/spark/defaultlibs will be placed into the classpath for **all** Spark apps submitted
by the user. With large assemblies like kafka, which contain widely used packages, e.g. jackson or apache commons, 
this may override classes that your applciation expects, so be careful.

In [None]:
//NOT-FOR-APP
val path = "spark/defaultlibs"
val kafka_assembly = "spark-streaming-kafka-0-10-assembly_2.11-2.3.2.jar"
val url = "http://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10-assembly_2.11/2.3.2/" + kafka_assembly
if (new java.io.File(path + "/" + kafka_assembly).exists()) {
    print("found kafka assembly")
} else {
    import scala.sys.process._
    s"wget -nv -P $path $url" ! ;
    print("loaded kafka assembly, please restart the notebook")
}

All the imports we need for our demo

In [None]:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.streaming.Durations
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.sql.SaveMode

// import $ notation for Spark SQL
val sparkImplicits = spark.implicits
import sparkImplicits._

Kafka configuration to connect to our IoT demo data producer

In [None]:
val brokers = "localhost:9092"
val topics = "iot4dashdb"
val topicList = topics.split(",")

val kafkaParams = Map[String, Object] (
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "democonsumer",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
)

type inputStream = DStream[ConsumerRecord[String,String]]

Now we define a utility function to set up a Kafka Streaming context, and run it for an optionally defined period of timeoutMillisecs with a given processing function. When we run this interactively in a notebook cell, it's important to set the timeout to to a specific value stop the streaming context again. Otherwise it will keep running and produce cell output, even after the cell has finished executing.

Note that a streaming context cannot be re-used after it has executed, so we can't keep it across cells and therefore we have to use the utility function to re-create it before every execution.

We use a finally block to make sure the context is stopped even in case of exception; otherwise the whole streaming infrastructure becomes unusable within the notebook kernels lifetime.

In [None]:
var timeoutMillisecs: Option[Long] = None

In [None]:
def runStreamingContext(processMessages: inputStream => Unit): Unit = {
    val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3))
    val messages = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String,String](topicList, kafkaParams)
            );
    processMessages(messages)
    
    try {
        ssc.start()
        timeoutMillisecs match {
            case Some(t) => ssc.awaitTerminationOrTimeout(t)
            case _ => ssc.awaitTermination()
        }
    } finally {
        ssc.stop(false, true)
    }
}

When running interactively in the notebook we want all streaming tests to run only for 10 seconds and stop automatically. Later, when we deploy the notebook as a stand-alone Spark application, we want the processing to continue until the application is stopped externally. Hence we tag the cell NOT-FOR-APP.

In [None]:
//NOT-FOR-APP
timeoutMillisecs = Some(10000)

## Data exploration

First, we simply iterate over the batch RDDs and print out their content, just to explore the data in our Kafka queue. We also don't need this when we deploy the notebook as application. So we tag the cell accordingly.

Before you execute the cell below, you will need to start the producer container that writes into the kafka queue. When the IoT producer container is not started or has already terminated, the streaming computation in the following cell will hang and never return.

In [None]:
//NOT-FOR-APP
runStreamingContext(messages =>
  // The ConsumerRecord objects produced by a Kafa queue are not serializable, need to map them to a String
  // before collecting
  messages.map(record => s"${record.key} -> ${record.value}").
    foreachRDD(rdd => rdd.collect().foreach(println))
)

So this gives us an idea about the JSON format we can expect from our data source. Lets try to parse each batch into a DataSet with Spark SQL. We don't want Spark to perform automatic JSON schema detection over and over again for each batch, so we give it a pre-defined schema

In [None]:
val schema = (new StructType).
    add("payload", (new StructType).
        add("temperature", LongType).
        add("tempOutside", LongType).
        add("powerProd", LongType).
        add("noiseLevel1", LongType).
        add("time", TimestampType)).
    add("deviceId", StringType).
    add("deviceType", StringType).
    add("eventType", StringType)

As we can see, the produced data has a nested structure. 
Spark can deal with this type of schema, but it's not suitable being stored in a plain SQL database table, so we flatten it to straight rows.

In [None]:
//NOT-FOR-APP
def processStream1(messages: inputStream): Unit = {
    messages.map(_.value).foreachRDD(rdd => {
        val ds = spark.read.schema(schema).json(rdd)
        val flatDataset = ds.select($"payload.temperature", $"payload.tempOutside",
            $"payload.powerProd", $"payload.noiseLevel1", $"payload.time",
            $"deviceId", $"deviceType", $"eventType")
        flatDataset.select("deviceId", "time", "temperature", "tempOutside", "powerProd", "noiseLevel1").show(false)
    })
}
runStreamingContext(processStream1)

## Write IoT records to a persistent table
In the desired deployed application we want to write the records read from the queue into a dashDB table instead of only printing them out the datasets. In this demonstration here we will always first empty the table in the first batch after the application is started. We use Spark's 'Overwrite' mode for writing the first batch to clear a pre-existing table and then use 'Append' mode in subsequent batches.

In [None]:
val tableName = "USERDATA.IOT_EVENTS"

var saveMode = SaveMode.Overwrite
def processStream2(messages: inputStream): Unit = {
    messages.map(_.value).foreachRDD(rdd => {
        val ds = spark.read.schema(schema).json(rdd)
        val flatDataset = ds.select($"payload.temperature", $"payload.tempOutside",
            $"payload.powerProd", $"payload.noiseLevel1", $"payload.time",
            $"deviceId", $"deviceType", $"eventType")
        flatDataset.write.format("com.ibm.idax.spark.idaxsource").
            option("dbtable", tableName).
            option("allowAppend", "TRUE").
            mode(saveMode).
            save()
        println("Batch written to database")
        saveMode = SaveMode.Append
    })
}
runStreamingContext(processStream2)

In case you want to capture some longer timeframes of events in this notebook, increase the timeoutMillisecs e.g. by running the cell below and then running the previous cell again.

In [None]:
//NOT-FOR-APP
timeoutMillisecs = Some(120000)

## Deploying automatic background IoT landing app inside dashDB

Finally you can deploy the essential parts of this notebook (i.e. those **not** marked with //NOT-FOR-APP) as a stand-alone scala application into dashDB. To do this, select `File -> Deploy as -> Deploy to dashDB Spark` from the menu. Then use one of the alternatives shown in the result page to launch the application, e.g. using the [SPARK_SUBMIT](https://www.ibm.com/support/knowledgecenter/SS6NHC/com.ibm.swg.im.dashdb.analytics.doc/doc/r_spark_applications_spark_submit.html) stored procedure. 

If you want to take a look at the generated code, select `File -> Download as -> Scala class (in browser)`

This notebook is set up so that the timeout does not apply in the exported application, so your streaming context will continue to run and insert events into the database until the application is explicitly stopped via [spark-submit.sh --kill](https://www.ibm.com/support/knowledgecenter/SS6NHC/com.ibm.swg.im.dashdb.doc/learn_how/spark_ref.html), the 
[CANCEL_APP](https://www.ibm.com/support/knowledgecenter/SS6NHC/com.ibm.swg.im.dashdb.analytics.doc/doc/r_spark_applications_cancel_app.html) stored procedure, the [/public/apps/cancel](https://developer.ibm.com/clouddataservices/wp-content/themes/projectnext-clouddata/dashDBanalytics/#/) endpoint of dashDB's REST API or the Spark monitoring UI in the dashDB console (`Monitor->Workloads->Spark`).

**For your convenience** you can simply use the cells in the next section to start and stop the deployed app in the background.

## Launching deployed IoD landing app in background
Run the following cell to launch the just deployed IoD landing app inside dashDB. The cell keeps a database connection open that we will use to stop the landing app again when you execute the subsequent cell.

In [None]:
//NOT-FOR-APP
import java.sql.DriverManager
import java.sql.Connection
import java.sql.SQLException
val connection = DriverManager.getConnection("jdbc:db2:BLUDB")
var iot_lander_submission_id : String = null
try {   
    val sp_call = connection.prepareCall(
      "CALL IDAX.SPARK_SUBMIT(?, '{ \"appResource\" : \"IoT_demo-assembly-1.0.jar\", " +
                                    "\"mainClass\" : \"SampleApp\"}', " +
                                    "'mode=async')")
    sp_call.registerOutParameter(1, java.sql.Types.VARCHAR);
    sp_call.executeUpdate();
    iot_lander_submission_id = sp_call.getString(1)
    println("Successfully launched IoT landing app with submission id " + iot_lander_submission_id);
} catch {
    case e: SQLException => { println("Error: " + e) }
    iot_lander_submission_id = null
}

You can use the cells in the subsequent section to observe the progress of landing IoT messages in the target table. When you are done, you can run the following cell to stop the IoT landing app that you started in the background in the previous cell.

In [None]:
//NOT-FOR-APP
if(iot_lander_submission_id !=null) {
    try {   
        val sp_call = connection.prepareCall("CALL IDAX.CANCEL_APP(?)")
        sp_call.setString(1, iot_lander_submission_id);
        sp_call.executeUpdate();
        println("Successfully stopped IoT landing app with submission id " + iot_lander_submission_id)
    } catch {
        case e: SQLException => { println("Error: " + e) }
    }
} else println("No submission ID defined")
connection.close()

## Check landed IoT data
You can verify the landed data in dashDB. For this we simply establish a data frame on the result table.

In [None]:
//NOT-FOR-APP
var landed_iot_data = spark.read.
   format("com.ibm.idax.spark.idaxsource").
   option("url", "jdbc:db2:BLUDB").
   option("dbtable", tableName).
   load()

Now we show the content. You can verify the progress of your deployed landing app by running the next cell repeatetly and check the new records and overall count.

In [None]:
//NOT-FOR-APP
println("Total IoT records: "+ landed_iot_data.count())
println("Newest 10 IoT records:")
landed_iot_data.select("deviceId", "time", "temperature", "tempOutside", "powerProd", "noiseLevel1").
                orderBy(org.apache.spark.sql.functions.col("time").desc).show(10, false)