# Data Aggregation ETL Job

This notebook aggregates IoT event data stored in Cloudant and saves the aggregated results in a dashDB data warehouse. Spark Data Frames hold the data while it is being processed.

The notebook is divided into several sections:

1. Common Declarations
2. Read event data from Cloudant into Spark Data Frames
3. Aggregate event data using Data Frame operations
4. Move aggregated event data in the Data Frames into dashDB Tables
5. Merge aggregated event data into target dashDB tables using ibmdbpy and SQL

## Common Declarations

The following fragment of code sets the scene by defining imports, global variables and connection functions:

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import functions as sqlfunc

#debug
debug = 1

#Initializations
# Define Spark configuration
conf = SparkConf()

# Initialize a SparkContext and SQLContext
#sc = SparkContext(conf=conf)
#sql_ctx = SQLContext(sc)

#Please don't modify this function
def readDataFrameFromCloudant(host,user,pw,database):
    cloudantdata=spark.read.format("com.cloudant.spark"). \
    option("cloudant.host",host). \
    option("cloudant.username", user). \
    option("cloudant.password", pw). \
    load(database)

    cloudantdata.createOrReplaceTempView("elevator_telemetrics")
    return cloudantdata

## Read Event Data from Cloudant 

The following cell holds the credentials for connecting to the Cloudant database. Replace the placeholders with your own values:

In [5]:
# @hidden_cell
credentials_cloudant = {
  'hostname':'<hostname>',
  'user':'<user>',
  'password':'<password>',
  'database':'<database bucket>'
}

import datetime
now = datetime.datetime.now()
print(now)
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
print(yesterday)

print(yesterday.strftime('%Y_%m_%d'))

2017-09-22 03:26:28.992970
2017-09-21 03:26:28.993516
2017_09_21
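
The `%Y_%m_%d` string printed above is not used again in this notebook. One plausible use, sketched here purely as an assumption, is to pick a per-day Cloudant database when events are stored in daily buckets. The prefix `iotp_events` and the helper `daily_bucket_name` are hypothetical; substitute the naming scheme of your own Cloudant instance.

```python
import datetime

def daily_bucket_name(prefix, day):
    """Return '<prefix>_<YYYY_MM_DD>' for the given day (hypothetical naming scheme)."""
    return "{0}_{1}".format(prefix, day.strftime('%Y_%m_%d'))

# A fixed reference time keeps the example reproducible.
run_time = datetime.datetime(2017, 9, 22, 3, 26, 28)
yesterday = run_time - datetime.timedelta(days=1)

bucket = daily_bucket_name("iotp_events", yesterday)
print(bucket)
```

The resulting string could then be placed in `credentials_cloudant['database']` before calling `readDataFrameFromCloudant`.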


The next cell extracts the event data from Cloudant and loads it into a Data Frame.

In [3]:
cdf=readDataFrameFromCloudant(credentials_cloudant['hostname'], credentials_cloudant['user'], credentials_cloudant['password'], credentials_cloudant['database'])
if debug : cdf.show()
#spark.sql("SELECT deviceId, deviceType, timestamp, data.d.motorTemp from elevator_telemetrics").show()

+--------------------+--------------------+--------------------+----------+----------+---------+------+--------------------+
|                 _id|                _rev|                data|  deviceId|deviceType|eventType|format|           timestamp|
+--------------------+--------------------+--------------------+----------+----------+---------+------+--------------------+
|00000930-33e1-11e...|1-436a34550c3fbba...|[[4.0,73,1,1.0,0....|Elevator02|  Elevator|   Status|  json|2017-05-08T11:25:...|
|00016aa0-3a35-11e...|1-ba05047d8a33b08...|[[0.0,92,1,0.0,0....|Elevator01|  Elevator|   Status|  json|2017-05-16T12:41:...|
|00026f70-362c-11e...|1-3fc9a3a790faae9...|[[0.0,94,1,0.0,0....|Elevator02|  Elevator|   Status|  json|2017-05-11T09:27:...|
|0002f140-3a35-11e...|1-7fa9ac529db082f...|[[0.0,74,1,2.0,0....|Elevator02|  Elevator|   Status|  json|2017-05-16T12:41:...|
|00033d80-33e1-11e...|1-fbaaa184b48d0f7...|[[4.0,85,1,3.0,0....|Elevator03|  Elevator|   Status|  json|2017-05-08T11:25:...|


The following lines of code add two columns to the Data Frame using SQL functions: the date (the first ten characters of the timestamp) and the motor temperature rounded to the nearest whole number. For a complete list of functions see:
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

In [4]:
from pyspark.sql.functions import substring
from pyspark.sql.functions import rint

cdf_select = cdf.select("deviceId","deviceType","timestamp", "data.d.motorTemp")
cdf_date = cdf_select.withColumn("date", substring('timestamp',1,10))
cdf_mtemp = cdf_date.withColumn("motorTempInt", rint('motorTemp'))

cdf_mtemp.cache()

if debug : cdf_mtemp.show()

+----------+----------+--------------------+------------------+----------+------------+
|  deviceId|deviceType|           timestamp|         motorTemp|      date|motorTempInt|
+----------+----------+--------------------+------------------+----------+------------+
|Elevator02|  Elevator|2017-05-08T11:25:...|166.80070000000157|2017-05-08|       167.0|
|Elevator01|  Elevator|2017-05-16T12:41:...|166.77350000000345|2017-05-16|       167.0|
|Elevator02|  Elevator|2017-05-11T09:27:...|173.50340000000372|2017-05-11|       174.0|
|Elevator02|  Elevator|2017-05-16T12:41:...|166.73740000000333|2017-05-16|       167.0|
|Elevator03|  Elevator|2017-05-08T11:25:...| 155.1188000000015|2017-05-08|       155.0|
|Elevator03|  Elevator|2017-05-11T09:27:...|166.31770000000355|2017-05-11|       166.0|
|Elevator07|  Elevator|2017-05-11T08:51:...|165.14320000000126|2017-05-11|       165.0|
|Elevator10|  Elevator|2017-05-08T11:46:...|168.11020000000303|2017-05-08|       168.0|
|Elevator01|  Elevator|2017-05-1
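
To make the two derived columns concrete, the following plain-Python sketch mimics them on a single record. It does not use Spark, and `add_derived_fields` is an illustrative helper rather than part of the notebook. The motor temperature is taken from the output above; the full timestamp is an assumed value, since the output truncates it. `substring` positions are 1-based, so `substring('timestamp', 1, 10)` corresponds to the slice `[0:10]`, and `rint` returns a floating-point value, which is why the column shows `167.0` rather than `167`.

```python
def add_derived_fields(record):
    """Mimic the 'date' and 'motorTempInt' columns for one record."""
    out = dict(record)
    # substring('timestamp', 1, 10): first ten characters, i.e. YYYY-MM-DD
    out["date"] = record["timestamp"][0:10]
    # rint('motorTemp'): nearest whole number, kept as a float
    out["motorTempInt"] = float(round(record["motorTemp"]))
    return out

sample = {
    "deviceId": "Elevator02",
    "timestamp": "2017-05-08T11:25:00.000Z",  # assumed; truncated in the output above
    "motorTemp": 166.80070000000157,
}
row = add_derived_fields(sample)
print(row["date"], row["motorTempInt"])
```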

## Aggregate Event Data 

The following lines of code compute the minimum, average and maximum motor temperature for each elevator and date using groupBy and agg.
The operations are documented here:
http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [5]:
from pyspark.sql.functions import min
from pyspark.sql.functions import max
from pyspark.sql.functions import avg

#cdf_mtemp.filter(cdf_mtemp.motorTempInt > 164).show()
cdf_aggtemp = cdf_mtemp.groupBy('deviceId','deviceType','date').agg(min('motorTemp'),avg('motorTemp'),max('motorTemp'))
if debug : cdf_aggtemp.show()

+----------+----------+----------+------------------+------------------+------------------+
|  deviceId|deviceType|      date|    min(motorTemp)|    avg(motorTemp)|    max(motorTemp)|
+----------+----------+----------+------------------+------------------+------------------+
|Elevator02|  Elevator|2017-05-08|          163.7207|166.94495418060362|170.10070000000326|
|Elevator09|  Elevator|2017-05-16|          159.3389|162.97127758112282|166.67890000000375|
|Elevator05|  Elevator|2017-05-08|          161.2412|164.43060221402376|167.78120000000334|
|Elevator01|  Elevator|2017-05-11|          150.5035|156.09050301205096|161.60350000000568|
|Elevator08|  Elevator|2017-05-08|          151.5074|154.53478007380232|157.68740000000315|
|Elevator06|  Elevator|2017-05-16|161.39589999999998|164.91468609310735|168.39590000000356|
|Elevator06|  Elevator|2017-05-08|157.02169999999998|160.09710961408416| 163.1017000000031|
|Elevator01|  Elevator|2017-05-08|          151.0245|172.97757808311005|        
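
As a cross-check of what the grouping computes, here is a minimal plain-Python sketch of the same aggregation on a few records. The sample values are made up for illustration, and `aggregate_by_device_and_date` is a hypothetical helper, not something Spark or the notebook provides.

```python
from collections import defaultdict

def aggregate_by_device_and_date(rows):
    """Group rows by (deviceId, deviceType, date) and compute min/avg/max of motorTemp."""
    groups = defaultdict(list)
    for r in rows:
        groups[(r["deviceId"], r["deviceType"], r["date"])].append(r["motorTemp"])
    result = {}
    for key, temps in groups.items():
        result[key] = {
            "min": min(temps),
            "avg": sum(temps) / float(len(temps)),
            "max": max(temps),
        }
    return result

# Made-up sample records
rows = [
    {"deviceId": "Elevator01", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 150.0},
    {"deviceId": "Elevator01", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 160.0},
    {"deviceId": "Elevator01", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 170.0},
    {"deviceId": "Elevator02", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 165.5},
]
aggregated = aggregate_by_device_and_date(rows)
for key in sorted(aggregated):
    print(key, aggregated[key])
```

Each key in the result corresponds to one row of the aggregated Data Frame.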

The following lines of code aggregate the event data to determine the distribution of temperatures for each elevator by date.

In [6]:
from pyspark.sql.functions import count

cdf_distrtemp = cdf_mtemp.groupBy('deviceId','deviceType','date','motorTempInt').agg(count('motorTempInt'))
if debug : cdf_distrtemp.show()

+----------+----------+----------+------------+-------------------+
|  deviceId|deviceType|      date|motorTempInt|count(motorTempInt)|
+----------+----------+----------+------------+-------------------+
|Elevator08|  Elevator|2017-05-16|       161.0|                250|
|Elevator02|  Elevator|2017-05-16|       161.0|                237|
|Elevator08|  Elevator|2017-05-11|       157.0|                199|
|Elevator07|  Elevator|2017-05-08|       156.0|                219|
|Elevator07|  Elevator|2017-05-16|       168.0|                250|
|Elevator06|  Elevator|2017-05-11|       165.0|                231|
|Elevator02|  Elevator|2017-05-08|       167.0|                231|
|Elevator07|  Elevator|2017-05-08|       160.0|                 88|
|Elevator05|  Elevator|2017-05-11|       179.0|                 75|
|Elevator10|  Elevator|2017-05-16|       177.0|                 87|
|Elevator10|  Elevator|2017-05-16|       174.0|                237|
|Elevator01|  Elevator|2017-05-16|       164.0| 
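
The distribution is simply a count of readings per rounded temperature within each device and date. A plain-Python sketch of the same idea follows, using made-up readings and a hypothetical helper `temperature_distribution`; it is meant only to illustrate the grouping, not to replace the Spark code.

```python
from collections import Counter

def temperature_distribution(rows):
    """Count readings per (deviceId, deviceType, date, rounded motorTemp)."""
    return Counter(
        (r["deviceId"], r["deviceType"], r["date"], float(round(r["motorTemp"])))
        for r in rows
    )

# Made-up sample readings
readings = [
    {"deviceId": "Elevator07", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 160.2},
    {"deviceId": "Elevator07", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 160.4},
    {"deviceId": "Elevator07", "deviceType": "Elevator", "date": "2017-05-08", "motorTemp": 161.3},
]
distribution = temperature_distribution(readings)
for key in sorted(distribution):
    print(key, distribution[key])
```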

## Save Aggregated Data to dashDB Tables

In order to connect this notebook to dashDB systems, modify the credentials cell here. To do so click "Find and Add Data" at top right of the screen, then select "Connection" and select "Insert to code" for the dashDB system of your choice. Make sure you have a dashDB connection set up in your project beforehand.

In [7]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# @hidden_cell
# The following code is used to access your data and contains your credentials.
# You might want to remove those credentials before you share your notebook.

credentials_dashdb = {
    'host' : '<host>',
    'port' : '50000',
    'database' : 'BLUDB',
    'jdbcurl': '<jdbc>',
    'username': 'dash6769',
    'password': '<password>'
}
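
The `jdbcurl` entry is normally filled in by "Insert to code". If only the host, port and database are at hand, a basic Db2 JDBC URL has the form `jdbc:db2://<host>:<port>/<database>`. The helper `build_jdbc_url` below is a hypothetical convenience, and the host name is a made-up example; your instance may need additional URL properties.

```python
def build_jdbc_url(credentials):
    """Assemble a basic Db2 JDBC URL from host, port and database."""
    return "jdbc:db2://{host}:{port}/{database}".format(**credentials)

example_credentials = {
    "host": "dashdb.example.com",  # made-up host for illustration
    "port": "50000",
    "database": "BLUDB",
}
jdbc_url = build_jdbc_url(example_credentials)
print(jdbc_url)
```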

The next step is to save the data to dashDB. The relevant lines of code have been borrowed from: https://apsportal.ibm.com/analytics/notebooks/55e7fe57-c002-4180-bfa3-83fc6466a875/view?access_token=1792df00fbbcf6df08f73bec0055b34e988d323eea59aafc6cd12d2a4ea86ed0
Contrary to what that tutorial states, it is no longer necessary to install pixiedust and a custom JDBC driver dialect.

!pip install --user --upgrade pixiedust
print("Please restart the kernel and then run through the notebook again (skipping this cell).")

import pixiedust
print(dir(pixiedust))

In [8]:
from py4j.protocol import Py4JJavaError

try:
    cdf_aggtemp.write.jdbc(credentials_dashdb["jdbcurl"], "TEMP_DATAFRAME_AGGR_EVENT_DATA", properties = {"user" : credentials_dashdb["username"], "password" : credentials_dashdb["password"]}, mode="overwrite")
except Py4JJavaError:
    print("Oops!")

Oops!


In [9]:
cdf_aggtemp.write.jdbc(credentials_dashdb["jdbcurl"], "TEMP_DATAFRAME_AGGR_EVENT_DATA", properties = {"user" : credentials_dashdb["username"], "password" : credentials_dashdb["password"]}, mode="overwrite")

In [10]:
cdf_distrtemp.write.jdbc(credentials_dashdb["jdbcurl"], "TEMP_DATAFRAME_DISTR_EVENT_DATA", properties = {"user" : credentials_dashdb["username"], "password" : credentials_dashdb["password"]}, mode="overwrite")

## Move Aggregated Data to Target Tables

The aggregated data has been stored in a temporary table named TEMP_DATAFRAME_AGGR_EVENT_DATA in dashDB. From there it needs to be moved to the table named ELEVATOR_EVENTS_AGGREGATED_BY_DAY that holds all aggregated data from previous runs of the ETL job. 

In this notebook we will use ibmdbpy to create a connection to dashDB as described in: 
https://apsportal.ibm.com/analytics/notebooks/5a59ba9b-02b2-40e4-b955-9727cb68c88b/view?access_token=09240b783432f1a62004bcc82b48a7aed07afc401e2f94a77c7e087b74d8c053

Having established the connection, the notebook submits the following SQL statement, which merges the content of the table TEMP_DATAFRAME_AGGR_EVENT_DATA into the table ELEVATOR_EVENTS_AGGREGATED_BY_DAY:

MERGE INTO ELEVATOR_EVENTS_AGGREGATED_BY_DAY t
USING (SELECT s."deviceId" AS DEVICEID, s."deviceType" AS DEVICETYPE, DATE(s."date") AS DATE, ROUND("min(motorTemp)",0) AS MINMOTORTEMP, ROUND("avg(motorTemp)",0) AS AVGMOTORTEMP, ROUND("max(motorTemp)",0) AS MAXMOTORTEMP FROM TEMP_DATAFRAME_AGGR_EVENT_DATA s) e ON (e.DATE=t.DATE AND e.DEVICEID = t.DEVICEID)
WHEN MATCHED THEN 
    UPDATE SET t.MINMOTORTEMP = e.MINMOTORTEMP, t.AVGMOTORTEMP = e.AVGMOTORTEMP, t.MAXMOTORTEMP = e.MAXMOTORTEMP
WHEN NOT MATCHED THEN 
    INSERT (DATE, DEVICEID, DEVICETYPE, MINMOTORTEMP, AVGMOTORTEMP, MAXMOTORTEMP) 
    VALUES (e.DATE, e.DEVICEID, e.DEVICETYPE, e.MINMOTORTEMP,e.AVGMOTORTEMP,e.MAXMOTORTEMP);
    
Notice that the merge makes the ETL job idempotent. Even if it is run several times it produces the same result, because a row that already exists for a given date and device is updated rather than inserted again.
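
The idempotency can be tried out locally without dashDB. The sketch below uses Python's built-in `sqlite3` module, with SQLite's `INSERT OR REPLACE` standing in for the Db2 `MERGE` statement; that substitution, the simplified column types and the sample rows are all assumptions made for illustration. Running the load twice leaves the row count unchanged.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the target table; (DATE, DEVICEID) is the merge key.
conn.execute(
    "CREATE TABLE ELEVATOR_EVENTS_AGGREGATED_BY_DAY ("
    " DATE TEXT, DEVICEID TEXT, DEVICETYPE TEXT,"
    " MINMOTORTEMP REAL, AVGMOTORTEMP REAL, MAXMOTORTEMP REAL,"
    " PRIMARY KEY (DATE, DEVICEID))"
)

# Made-up aggregated rows, as they might sit in the temporary table
staged_rows = [
    ("2017-05-08", "Elevator01", "Elevator", 151.0, 160.0, 170.0),
    ("2017-05-08", "Elevator02", "Elevator", 164.0, 167.0, 170.0),
]

def run_job(connection, rows):
    """Upsert the rows and return the number of rows in the target table."""
    connection.executemany(
        "INSERT OR REPLACE INTO ELEVATOR_EVENTS_AGGREGATED_BY_DAY"
        " VALUES (?, ?, ?, ?, ?, ?)",
        rows,
    )
    connection.commit()
    cursor = connection.execute(
        "SELECT count(*) FROM ELEVATOR_EVENTS_AGGREGATED_BY_DAY"
    )
    return cursor.fetchone()[0]

count_first = run_job(conn, staged_rows)
count_second = run_job(conn, staged_rows)  # second run: same rows, same keys
print(count_first, count_second)
```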

In [11]:
import ibmdbpy
from ibmdbpy import IdaDataBase
idadb = IdaDataBase(dsn="DASHDB;Database=BLUDB;Hostname=" + credentials_dashdb["host"] + ";Port=" + credentials_dashdb["port"] + ";PROTOCOL=TCPIP;UID=" + credentials_dashdb["username"] + ";PWD=" + credentials_dashdb["password"])
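
The connection string is easier to read and check when it is assembled from its parts. The following sketch builds the same semicolon-separated string as the cell above, but takes the database name from the credentials instead of hard-coding it. `build_dsn` is a hypothetical helper and the credential values are placeholders.

```python
def build_dsn(credentials):
    """Assemble the connection string in the format used in the cell above."""
    parts = [
        "DASHDB",
        "Database=" + credentials["database"],
        "Hostname=" + credentials["host"],
        "Port=" + credentials["port"],
        "PROTOCOL=TCPIP",
        "UID=" + credentials["username"],
        "PWD=" + credentials["password"],
    ]
    return ";".join(parts)

# Placeholder values for illustration only
example = {
    "host": "dashdb.example.com",
    "port": "50000",
    "database": "BLUDB",
    "username": "user",
    "password": "secret",
}
dsn = build_dsn(example)
print(dsn)
```

The result could be passed as the `dsn` argument in place of the concatenated string.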

In [12]:
from ibmdbpy import IdaDataFrame

numberofRecsBefore = idadb.ida_scalar_query("SELECT count(*) FROM ELEVATOR_EVENTS_AGGREGATED_BY_DAY")

query =         'MERGE INTO ELEVATOR_EVENTS_AGGREGATED_BY_DAY t '
query = query + 'USING (SELECT s."deviceId" AS DEVICEID, s."deviceType" AS DEVICETYPE, DATE(s."date") AS DATE, ROUND("min(motorTemp)",0) AS MINMOTORTEMP, ROUND("avg(motorTemp)",0) AS AVGMOTORTEMP, ROUND("max(motorTemp)",0) AS MAXMOTORTEMP '
query = query + 'FROM TEMP_DATAFRAME_AGGR_EVENT_DATA s) e ON (e.DATE=t.DATE AND e.DEVICEID = t.DEVICEID) '
query = query + 'WHEN MATCHED THEN UPDATE SET t.MINMOTORTEMP = e.MINMOTORTEMP, t.AVGMOTORTEMP = e.AVGMOTORTEMP, t.MAXMOTORTEMP = e.MAXMOTORTEMP ' 
query = query + 'WHEN NOT MATCHED THEN INSERT (DATE, DEVICEID, DEVICETYPE, MINMOTORTEMP, AVGMOTORTEMP, MAXMOTORTEMP) VALUES (e.DATE, e.DEVICEID, e.DEVICETYPE, e.MINMOTORTEMP,e.AVGMOTORTEMP,e.MAXMOTORTEMP);'
print(query)

result = idadb.ida_query(query)

numberofRecsAfter = idadb.ida_scalar_query("SELECT count(*) FROM ELEVATOR_EVENTS_AGGREGATED_BY_DAY")

print("Number of recs before: " + str(numberofRecsBefore))
print("Number of recs after: " + str(numberofRecsAfter))

idadb.commit()

MERGE INTO ELEVATOR_EVENTS_AGGREGATED_BY_DAY t USING (SELECT s."deviceId" AS DEVICEID, s."deviceType" AS DEVICETYPE, DATE(s."date") AS DATE, ROUND("min(motorTemp)",0) AS MINMOTORTEMP, ROUND("avg(motorTemp)",0) AS AVGMOTORTEMP, ROUND("max(motorTemp)",0) AS MAXMOTORTEMP FROM TEMP_DATAFRAME_AGGR_EVENT_DATA s) e ON (e.DATE=t.DATE AND e.DEVICEID = t.DEVICEID) WHEN MATCHED THEN UPDATE SET t.MINMOTORTEMP = e.MINMOTORTEMP, t.AVGMOTORTEMP = e.AVGMOTORTEMP, t.MAXMOTORTEMP = e.MAXMOTORTEMP WHEN NOT MATCHED THEN INSERT (DATE, DEVICEID, DEVICETYPE, MINMOTORTEMP, AVGMOTORTEMP, MAXMOTORTEMP) VALUES (e.DATE, e.DEVICEID, e.DEVICETYPE, e.MINMOTORTEMP,e.AVGMOTORTEMP,e.MAXMOTORTEMP);
Number of recs before: 30
Number of recs after: 30
