# Scala notebook for analyzing archived IoT event data

This notebook uses Scala and Spark SQL to read and transform archived (historical) IoT event data from elevators. The goal is to determine on which dates specific elevators were affected by maintenance stops caused by overheated motors.

The notebook has three sections:
1. Connect to Cloud Object Storage and read the archived IoT event data into Spark SQL DataFrames.
2. Filter and aggregate the event data so that it shows the dates on which maintenance stops occurred.
3. Save the results to a DB2 table for further analysis, e.g. with business intelligence tools.

Spark SQL is documented at: https://spark.apache.org/docs/latest/sql-programming-guide.html

## Import data from Cloud Object Storage

The first step creates the connection to Cloud Object Storage and reads the data archived from the Cloudant NoSQL database (five JSON files) into a list of Spark SQL DataFrames, one DataFrame per file.

In [1]:
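```scala
// Insert the connection to the file on Cloud Object Storage
<here>

// Read each archived JSON file into its own DataFrame
// (`cos` is created by the connection code inserted above)
val dfList1 = List("db05.json", "db07.json", "db08.json", "db09.json", "db10.json")
  .map(db => spark.read.json(cos.url("iotelevator-donotdelete-pr-dchdyhr697izhi", db)))
```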
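As an alternative to one DataFrame per file, Spark's `DataFrameReader.json` also accepts several paths in a single call and infers one schema across all of them. With a single DataFrame, the transformation would run once over all events, so the later union of per-file results would not be needed. The sketch below is a minimal, self-contained illustration under stated assumptions: it writes a few hypothetical sample records to local temporary files, and it assumes the archive layout implied by the notebook's column references (one JSON object per line with `key`, `id` and a nested `doc`). In the notebook itself the paths would come from `cos.url(...)`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reuses an existing session (as in the notebook) or starts a local one
val session = SparkSession.builder().master("local[*]").appName("archive-read-sketch").getOrCreate()

// Helper: write hypothetical sample records, one JSON object per line (JSON Lines)
def writeLines(path: Path, lines: String*): Path =
  Files.write(path, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

val dir = Files.createTempDirectory("iot-archive")
val fileA = writeLines(dir.resolve("db05.json"),
  """{"key":"e1","id":"e1","doc":{"deviceId":"Elevator01","timestamp":"2017-05-08T09:15:00Z","data":{"d":{"motorTemp":214.0}}}}""",
  """{"key":"e2","id":"e2","doc":{"deviceId":"Elevator02","timestamp":"2017-05-08T09:20:00Z","data":{"d":{"motorTemp":180.0}}}}""")
val fileB = writeLines(dir.resolve("db07.json"),
  """{"key":"e3","id":"e3","doc":{"deviceId":"Elevator03","timestamp":"2017-08-07T14:30:00Z","data":{"d":{"motorTemp":213.0}}}}""")

// One read call, one DataFrame, one schema inferred across both files
val dfAll: DataFrame = session.read.json(fileA.toString, fileB.toString)
dfAll.printSchema()
```

The per-file approach in the notebook has one advantage: if the archive files had diverging schemas, each file is normalized by the `select` in `transform` before the union.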


## Filter and aggregate event data using Spark SQL

The initial DataFrames are filtered by removing empty rows (rows whose `key` is null) as well as events where the motor temperature is at or below 200 degrees Fahrenheit. In a final step, the results are aggregated and sorted so that they show only the elevator name (`deviceId`), the date, and the maximum motor temperature on each date where an issue occurred.

In [2]:
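```scala
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object ElevatorEventDataTransformer {
  def transform(dfData: DataFrame): DataFrame = {
    dfData
      // Drop empty rows: a null key or the literal string "null"
      .filter(col("key").isNotNull && col("key") =!= "null")
      // Flatten the nested document to the three fields of interest
      .select("doc.deviceId", "doc.timestamp", "doc.data.d.motorTemp")
      // Keep only overheating events (after the select the column is named motorTemp)
      .filter(col("motorTemp") > 200)
      // The first 10 characters of the timestamp are yyyy-MM-dd (substr is 1-based)
      .withColumn("date", col("timestamp").substr(1, 10).cast("date"))
      // One row per elevator and day, with the highest temperature of that day
      .groupBy("deviceId", "date").max("motorTemp")
      .orderBy(col("date").asc)
  }
}
```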
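To check the logic of `transform` without a Spark session, the same steps (drop empty rows, keep readings above 200 degrees Fahrenheit, group by elevator and day, take the maximum, sort by date) can be sketched over plain Scala collections. This is an illustration only: the `Event` and `DailyMax` case classes, the `overheatedDays` function and the sample events are hypothetical simplifications of the archived documents, not the notebook's data.

```scala
import java.time.LocalDate

// Hypothetical, flattened view of one archived document:
// key, doc.deviceId, doc.timestamp and doc.data.d.motorTemp
final case class Event(key: Option[String], deviceId: String, timestamp: String, motorTemp: Double)

// One output row: elevator, day and the highest motor temperature on that day
final case class DailyMax(deviceId: String, date: LocalDate, maxMotorTemp: Double)

def overheatedDays(events: Seq[Event], thresholdF: Double = 200.0): List[DailyMax] =
  events
    .filter(_.key.isDefined)          // drop empty rows (no key)
    .filter(_.motorTemp > thresholdF) // keep overheating events only
    .groupBy(e => (e.deviceId, LocalDate.parse(e.timestamp.take(10)))) // yyyy-MM-dd prefix
    .map { case ((device, date), group) => DailyMax(device, date, group.map(_.motorTemp).max) }
    .toList
    .sortBy(d => (d.date.toEpochDay, d.deviceId)) // by date, then by elevator

// Hypothetical sample events
val sample = Seq(
  Event(Some("e1"), "Elevator01", "2017-05-08T09:15:00Z", 214.0),
  Event(Some("e2"), "Elevator01", "2017-05-08T11:40:00Z", 205.0),
  Event(Some("e3"), "Elevator01", "2017-05-09T08:00:00Z", 180.0),
  Event(None,       "Elevator03", "2017-05-09T08:05:00Z", 250.0),
  Event(Some("e5"), "Elevator03", "2017-08-07T14:30:00Z", 213.0)
)

val result = overheatedDays(sample)
result.foreach(println)
```

Here the two Elevator01 readings on 2017-05-08 collapse into one row with the maximum of 214.0, the 180.0 reading is filtered out, and the keyless event is dropped even though its temperature is high.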

In [3]:
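```scala
// Transform each file separately, then combine the per-file results
val dfListT = dfList1.map(df => ElevatorEventDataTransformer.transform(df))
val dfReduced = dfListT.reduce((df1, df2) => df1.union(df2))
  .orderBy("date", "deviceId") // union does not preserve the per-file sort order
dfReduced.show()
```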

```
+----------+----------+--------------+
|  deviceId|      date|max(motorTemp)|
+----------+----------+--------------+
|Elevator01|2017-05-08|         214.0|
|Elevator01|2017-07-24|         215.0|
|Elevator01|2017-08-03|         220.0|
|Elevator01|2017-08-07|         220.0|
|Elevator03|2017-08-07|         213.0|
+----------+----------+--------------+
```
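Because each archive file is aggregated separately before the union, an elevator and day whose events happen to be split across two files would show up as two rows in `dfReduced`, one per file. A minimal sketch of one safeguard is to aggregate the per-file maxima a second time after the union. The two input DataFrames below are hypothetical stand-ins that reuse the column names produced by `transform` (dates are kept as strings for brevity), and `unionByName` assumes Spark 2.3 or later.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

val session = SparkSession.builder().master("local[*]").appName("reaggregate-sketch").getOrCreate()
import session.implicits._

// Hypothetical per-file results with the column names produced by transform
val fromFileA = Seq(("Elevator02", "2017-06-01", 207.0))
  .toDF("deviceId", "date", "max(motorTemp)")
val fromFileB = Seq(("Elevator02", "2017-06-01", 211.0), ("Elevator04", "2017-06-01", 203.0))
  .toDF("deviceId", "date", "max(motorTemp)")

// Union the per-file results, then keep one row per (deviceId, date)
def combine(parts: Seq[DataFrame]): DataFrame =
  parts.reduce((a, b) => a.unionByName(b))
    .groupBy("deviceId", "date")
    .agg(max(col("`max(motorTemp)`")).as("max(motorTemp)"))
    .orderBy(col("date"), col("deviceId"))

val combined = combine(Seq(fromFileA, fromFileB))
combined.show()

val rows = combined.collect().map(r => (r.getString(0), r.getString(1), r.getDouble(2))).toSeq
```

Taking the maximum of per-file maxima gives the same result as taking the maximum over all events, so the second aggregation is safe to apply even when no duplicates exist.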



## Write to DB2

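In this notebook, data is written to DB2 using a simple scheme: Spark's JDBC writer creates the database table and inserts the rows of the DataFrame into it. Because the write uses save mode `overwrite`, an existing table with the same name is dropped and re-created on every run; save mode `append` would instead add the rows to the existing table.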

The code has been inspired by the following two resources:
1. http://bigdatums.net/2016/10/16/writing-to-a-database-from-spark/
2. http://support.datascientistworkbench.com/knowledgebase/articles/829689-access-dashdb-or-db2-using-jdbc-from-scala-noteb
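The write cell uses `mode("overwrite")`, which replaces the table contents on each run, whereas `append` adds the rows again. The difference can be tried out without a database. The sketch below swaps in a local Parquet directory (a hypothetical temporary path) as a stand-in for the Db2 table, since `SaveMode.Overwrite` and `SaveMode.Append` have the same replace-versus-add meaning for file sources; it is an illustration, not the notebook's Db2 write.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

val session = SparkSession.builder().master("local[*]").appName("savemode-sketch").getOrCreate()
import session.implicits._

// Stand-in for the Db2 table: a Parquet directory under a fresh temp folder
val target = Files.createTempDirectory("savemode-sketch").resolve("maintenance_stops").toString

// Hypothetical rows shaped like the aggregated result
val batch = Seq(("Elevator02", "2017-06-01", 211.0), ("Elevator04", "2017-06-01", 203.0))
  .toDF("deviceId", "date", "maxMotorTemp")

// Running the same overwrite twice leaves exactly one copy of the rows
batch.write.mode(SaveMode.Overwrite).parquet(target)
batch.write.mode(SaveMode.Overwrite).parquet(target)
val rowsAfterOverwrite = session.read.parquet(target).count()

// Each append adds another copy of the rows
batch.write.mode(SaveMode.Append).parquet(target)
batch.write.mode(SaveMode.Append).parquet(target)
val rowsAfterAppend = session.read.parquet(target).count()

println(s"after overwrite: $rowsAfterOverwrite, after two appends: $rowsAfterAppend")
```

Overwrite makes re-running the notebook idempotent. For the JDBC writer, Spark's `truncate` option (`.option("truncate", "true")` together with overwrite) truncates the existing table instead of dropping it, which keeps table metadata such as indexes created in Db2.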

In [None]:
```scala
// Insert the credentials for the Db2 service (defines credentials_1, used below)
<here>
```


In [None]:
```scala
// Compute the connection properties and the JDBC URL for Db2
val prop = new java.util.Properties()
prop.setProperty("driver", "com.ibm.db2.jcc.DB2Driver")
prop.setProperty("user", credentials_1("username"))
prop.setProperty("password", credentials_1("password"))

// JDBC URL of the form jdbc:db2://<host>:<port>/<database>
val url = s"jdbc:db2://${credentials_1("host")}:${credentials_1("port")}/${credentials_1("database")}"

// Destination table in the Db2 database
val table = "sample_data_table"
```

In [None]:
```scala
// Write the DataFrame to the Db2 table, replacing any previous contents
dfReduced.write.mode("overwrite").jdbc(url, table, prop)
```