# Using IBM Data Science Experience to visualize data from Watson IoT Platform

This is an installation of spark-cloudant package, that may be missing. In general any missing python packages can be installed this way into your dsx account.
This cell needs to be executed only once, restart the kernel afterwards.

In [None]:
!pip install --upgrade pixiedust
import pixiedust
pixiedust.installPackage("cloudant-labs:spark-cloudant:2.0.0-s_2.11")

**User input required**

Cloudant credentials.

If you have a connection with Cloudant set up for this project, do the following:
In order do import your Cloudant credentials, click on the cell below to select it. Then click on “Find and Add Data” button on the top right, select the Connections tab and click on “Insert to code”. A dictionary called credentials_1 should be added to the cell containing the credentials. If the dictionary has another name, change it to credentials_1. Then run the cell.
If you don’t have a connection with Cloudant set up, the credantials can be found on Bluemix dashboard: Go to your Cloudant service on Bluemix, go to its Service Credentials section on the left and click on View Credentials to view the username and password. Set username and password variables below with Cloudant’s username and password.

In [None]:
# This empty cell will be populated with your Cloudant credentials if you follow the steps explained above.

In [None]:
username = credentials_1["username"]
password = credentials_1["password"]

In [None]:
host = username + '.cloudant.com'

**User input required**

Cloudant database name.

If you are not sure which database has the data you want to import, go to your Cloudant service on Bluemix and click on Launch. Find out the database name and set dbName variable with it. An example of database name is iotp_abcdef_default_2017_07_12.

In [None]:
dbName = "put your Cloudant database name here"

Connect to Cloudant database generated by WIoTP connector for hystorical data.

The following piece of code connects to Cloudant NoSQL DB and returns an RDD data frame for the stored IoT data.
The line `option("jsonstore.rdd.partitions", 4)` is needed only if your Cloudant service plan is "lite" because this plan has an access quota of 5 requests per second. Spark may run parallel jobs that might lead to more than 5 requests being made in one second. If this happens, a "too many requests" error will be raised. If you experience this kind of error, reduce the value for the `jsonstore.rdd.partitions` option to 2. For paid service plans this line can be commented out.

In [None]:
cloudantdata=sqlContext.read.format("com.cloudant.spark").\
option("cloudant.host", host).\
option("cloudant.username", username).\
option("cloudant.password", password).\
option("view","_design/iotp/_view/by-date").\
option("jsonstore.rdd.partitions", 4).\
load(dbName)

let's observe the loaded data:

In [None]:
cloudantdata.show()

All of the IOT data is located under the value column. 
Next we will transform this hierarchical  data frame into a flat one, and convert our timestamp from string into a timestamp type.

The function withColumn adds a column named 'ts' to the data frame, and calculates it's content based on timestamp column (string), using the to_ts function we defined.

The cache() function of a data frame caches the data frame in memory, this is very useful when data is accessed repeatedly.
Most RDD operations are lazy. RDD operations that require observing the contents of the data cannot be lazy (These are called actions), and they cause the computation (of all previous lazy operations + the action) to be performed. The cache() operation causes the computation to be performed and cached for further usages, so that it is not performed repeatedly for every action.

The weatherTelemetrics is a temporary view in the Spark Session. It will be useful for select statements (in case we choose write a raw SQL). 

In [None]:
import pandas as pd
from pyspark.sql import *
from pyspark.sql.functions import udf, col, asc, desc,to_date, unix_timestamp, weekofyear, countDistinct
from datetime import datetime
from pyspark.sql.types import DateType, TimestampType, IntegerType

In [None]:
# This function converts the string cell into a timestamp type:
str_to_ts =  udf (lambda d: datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%fZ"), TimestampType())

sparkDf = cloudantdata.selectExpr("value.deviceId as deviceId", "value.deviceType as deviceType", "value.eventType as eventType" ,  "value.timestamp as timestamp", "value.data.*")
sparkDf = sparkDf.withColumn('ts', str_to_ts(col('timestamp')))
sparkDf.cache()
sparkDf.createOrReplaceTempView("weatherTelemetrics")

# show the resulting schema and data 
sparkDf.printSchema()
spark.sql("SELECT * from weatherTelemetrics").show(5)


## Data visualization and comprehension

### Device Health 

In this section we will see how to learn about the population of IoT devices and answer questions such as: 
1. How many reports each device type had?
2. What is the breakdown of the devices per device type?
3. How many reports have been sent by each device? 
4. How many reports each event type had? 
5. How many devices reported in a given time interval?

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import pprint
from collections import Counter
import numpy as np
from matplotlib import dates

Next we will make visualizations of the data.
We will use Spark to prepare our data for visualization, since it supports big data processing.
When the data is ready for visualization, we will convert Spark data Frame into Pandas data Frame, since Pandas has good visualization support. 

#### How many reports each device type had?

Setting the deviceType as index of the created Pandas data frame will cause the bar plot to be aggregated by the deviceType.
Next we call the plot function of the Pandas data frame.

In [None]:
EperDtDF = spark.sql("SELECT ts,deviceType from weatherTelemetrics").groupBy("deviceType").count()
EperDtDF.cache()
EperDtDF.show()

EperDtPanda = EperDtDF.toPandas().set_index('deviceType')

ax = EperDtPanda.plot(kind='bar',legend=False)
ax.set_xlabel("deviceType")
ax.set_ylabel("events count")
ax.set_title('count of events by deviceType')

#### What is the breakdown of the devices per device type?

The bar chart is plotted in the same way, as before, but now we will also show the pie chart of the data.
Pandas data frame support all kinds of different plot types. Using the 'pie' kind will make a pie chart with percentage sizes of the pieces. 
In order to write the actual count of the devices, instead of percents, we use the autopct argument - we need to multiply by the total amount of devices we have and divide by 100 to get the actual numbers.
The total is calculated using the sum() function of Pandas data frame, that sums up the device count of all the deviceTypes.
The sum function returns a DataFrame, so we use the [0] index to get only the value into total.

In [None]:
DperDtDF = spark.sql("SELECT deviceId,deviceType from weatherTelemetrics").groupBy("deviceType").agg(countDistinct('deviceId'))
EperDtDF.cache()
DperDtDF.show()

# bar chart of deviceId by deviceType
EperDtPanda = DperDtDF.toPandas().set_index('deviceType')

ax = EperDtPanda.plot(kind='bar',legend=False)
ax.set_xlabel("deviceType")
ax.set_ylabel("devices count")
ax.set_title('count of deviceIds by deviceType')


# Pie chart of deviceId by deviceType
fig = plt.figure(figsize=(5,5))
ax = plt.subplot(111)
total = EperDtPanda.sum()[0]

ax = EperDtPanda.plot(kind='pie', ax=ax, figsize=(5,5), legend=False, shadow=True, subplots=True, autopct=lambda(p): '{:.0f}'.format(p * total / 100))
plt.title("count of deviceIds by deviceType")


#### How many reports have been sent by each device? 

In [None]:
EperDdf = spark.sql("SELECT deviceId,ts from weatherTelemetrics").groupBy("deviceId").count()####.sort()########
EperDtDF.cache()
EperDdf.show()

EperDPanda = EperDdf.toPandas().set_index('deviceId')

ax = EperDPanda.plot(kind='bar',legend=False)
ax.set_xlabel("deviceId")
ax.set_ylabel("events count")
ax.set_title('count of events by deviceId')


#### How many reports each event type had? 

In [None]:
EperEtDF = spark.sql("SELECT eventType,ts from weatherTelemetrics").groupBy("eventType").count()
EperDtDF.cache()
EperEtDF.show()

EperEtPanda = EperEtDF.toPandas().set_index('eventType')

ax = EperEtPanda.plot(kind='bar',legend=False)
ax.set_xlabel("eventType")
ax.set_ylabel("events count")
ax.set_title('count of events by eventType')

#### how many devices reported in a given time interval?

**User input required**

Replace the (year, month, day, hours, minutes, seconds) with values to specify `start` and `end` interval in the cell below.

For example:

`start = datetime(2017, 7, 12, 21, 35, 0)
end = datetime(2017, 7, 12, 21, 35, 3)`

Make sure the interval contains device events. Then run the cell.

In [None]:
# set the time interval of interest
start = datetime(year, month, day, hours, minutes, seconds)
end = datetime(year, month, day, hours, minutes, seconds) 

First we filter the data by a time interval, then group the resulting dataFrame by deviceId, and count the records for each deviceId.

In [None]:
#filter by time interval
weatherMetaDataTelemetrics = sparkDf.select('deviceId','deviceType','ts','timestamp','eventType').filter((col('ts')>=start) & (col('ts')<=end))

weatherMetaDataTelemetrics.cache()
#weatherMetaDataTelemetrics.show()

#how many devices reported in interval
byDevice = weatherMetaDataTelemetrics.groupby(['deviceId']).count()
byDevice.cache()

print "Number of events by deviceId in the interval: "
byDevice.show()
print "total number of devices reported in the interval:", byDevice.count()

Count of rows by time span for a specific device, using the filter function of Spark DataFrame:

In [None]:
byDevice.filter(byDevice["deviceId"]=='66666666').show() ##also show 5 with lowest counts

Extract all of the numeric columns of the data, for further analytics. I selected only a subset of the numeric columns, for this demonstration:

In [None]:
#find all numeric columns of the DataFrame
numericCols = filter( lambda(name, dt) : (('double' in dt) or ('int' in dt) or ('long' in dt)), sparkDf.dtypes) 

#numericCols is a list of pairs (columnName, dataType), here we select only the column name into the allkeys list
allkeys = [x[0] for x in numericCols]
print "all numeric columns", allkeys

#select only 5 numeric columns for further detailed visualizations
keys = ['O3', 'NOX', 'NO2', 'PM10', 'PM2_5']
print "selected 5 numeric columns", keys

#### Device type sensor visualization 

In this section we study the summary of sensor data reported by all devices of a device type, answering questions such as: 

1. What is the Average/Min/Max of all reported sensor values? 
2. Can I see a histogram of a sensor's output?   
3. What is the correlation between two sensors?

 ***add the visualization that are not stated here

#### Average/Min/Max of all reported sensor values

In [None]:
from pyspark.mllib.stat import Statistics

#show visualization for device type "dt1"
dType = "dt1"

#show summary only for the selected 5 columns, for easier view, since we have too many columns to fit in a row
dfKeysType = sparkDf.select(*keys).filter(sparkDf["deviceType"]==dType)
dfKeysType.cache()

dfKeysType.describe().show()


#### Histogram of a device type sensor's output

1. Use spark DataFrame to prepare the histogram for each specific sensor (key) (using rdd.flatMap)
2. Create Pandas DataFrame from the calculated histogram with 2 columns: "bin" and "frequency".
3. Plot the histogram using Pandas plot function.

In [None]:
for key in keys:
    histogram = dfKeysType.select(key).rdd.flatMap(lambda x: x).histogram(11)
    
    #print histogram
    pandaDf = pd.DataFrame(zip(list(histogram)[0],list(histogram)[1]),columns=['bin','frequency']).set_index('bin')
    ax =pandaDf.plot(kind='bar')
    ax.set_ylabel("frequency")
    ax.set_title('Histogram of ' + key + ' sensor output')
    
    

#### Correlation between two sensors

Correlation between two sensors can be plotted using Pandas plot with kind='scatter'.
Remeber that dfKeysType is a Spark DataFrame that only includes our selected 5 columns and filtered by deviceType.
In case this will result in too much data for Pandas DataFrame to handle, it can be further filtered by timestamp.

In [None]:
key1="PM10"
key2="PM2_5"

pandaDF = dfKeysType.toPandas()
ax = pandaDF.plot(kind='scatter', x=key1, y=key2, s=5, figsize=(7,7))
ax.set_title('Relationship between ' + key1 + ' and ' + key2 )

To view all the correlations of the selected 5 columns, together with a histogram on a diagonal, use the Pandas scatter_matrix function:

In [None]:
pd.scatter_matrix(pandaDF, figsize=(18,12))
plt.show()

A correlation matrix can be plotted, using Pandas corr() function on the DataFrame:

In [None]:
correlations = pandaDF.corr()

# plot correlation matrix
fig = plt.figure(figsize=(12,12))
ax = fig.add_subplot(111)
cax = ax.matshow(correlations, vmin=-1, vmax=1)
fig.colorbar(cax)
ticks = np.arange(0,5,1)
ax.set_xticks(ticks)
ax.set_yticks(ticks)
ax.set_xticklabels(keys)
ax.set_yticklabels(keys)
plt.show()

## Sensor deep dive

Here we produce much of the same charts, as in previous section, but filtering the data by specific deviceId.

#### Average/Min/Max of all reported sensor values by the device

In [None]:
#show visualization for specific deviceID
deviceId = "10101010"

#show summary only for a selected group of columns, for easier view, since we have too many columns to fit in a row
dfKeysDev = sparkDf.select(*keys).filter(sparkDf["deviceId"]==deviceId)
dfKeysDev.cache()

dfKeysDev.describe().show()


A box plot is a method for graphically depicting groups of numerical data through their quartiles.
The box extends from the lower to upper quartile values of the data, with a line at the median. 
The whiskers extend from the box to show the range of the data. 
Beyond the whiskers, data are considered outliers and are plotted as individual points.

A box plot for each devices sensor, produced with the Pandas plot function with kind="box":

In [None]:
pandaDF = dfKeysDev.toPandas()
pandaDF.plot(kind='box', subplots=True, layout=(10,3), sharex=False, sharey=False, figsize=(25,60))
plt.show()

#### Histogram of a device's sensor output

In [None]:
for key in keys:
    try:
        #The histogram is built with spark. Only the groupped by bins data will be converted to Pandas DataFrame
        histogram = dfKeysDev.select(key).rdd.flatMap(lambda x: x).histogram(11)

        #print histogram
        pandaDf = pd.DataFrame(zip(list(histogram)[0],list(histogram)[1]),columns=['bin','frequency']).set_index('bin')
        ax = pandaDf.plot(kind='bar')
        ax.set_ylabel("frequency")
        ax.set_title('Histogram of ' + key + ' sensor output')
   
    except: 
        print "no values for sensor " + key + " for device " + deviceId + "\n"

The histograms can also be built more easily with Pandas DataFrame, in case the dfKeysDev DataFrame is not too large.
For the case of big data, spark is more scalable.

In [None]:
pandaDF = dfKeysDev.toPandas()
pandaDF.hist(layout=(3,3), sharex=False, figsize=(20,15))

#### Density Plots

Kernel density estimation (KDE) is a non-parametric way to estimate the probability density function of a random variable. Kernel density estimation is a fundamental data smoothing problem where inferences about the population are made, based on a finite data sample.

Note: here we convert the data into Pandas DataFrame, after we filtered it by deviceId and selected a subset of keys.
In case this is still to much data for the Pandas DataFrame to handle, consider selecting fewer keys and filtering by time interval.

In [None]:
pandaDF = dfKeysDev.toPandas()

ax = pandaDF.plot(kind='density', subplots=True, layout=(3,3), sharex=False, figsize=(20,15))
plt.show()

#### How a specific device sensor value changes over time

Maximum, minimum and average lines are shown on plots.

Note: We can aggregate data by intervals (say hour) in spark and show aggregated plots (avg, min, max)

In [None]:
from pyspark.sql.functions import mean, min, max

#show visualization for specific deviceID
deviceId = "10101010"

print keys
for key in keys:
    df = spark.sql("SELECT deviceId, ts," + key +" from weatherTelemetrics where deviceId='" + deviceId + "'").where(col(key).isNotNull())
    df.cache()
    if (df.count() > 0):
        pandaDF = df.toPandas()
        
        ax = pandaDF.plot(x='ts', y=key , legend=False, figsize=(15,9), ls='-', marker='o')
        ax.xaxis.set_major_formatter(dates.DateFormatter('%d-%m-%Y %H:%M:%S'))
        ax.set_title(key + ' over time')
        ax.set_ylabel(key)
        ax.grid(True)
        
        # Draw lines to showcase the upper and lower threshold
        ax.axhline(y=pandaDF[key].min(),c="red",linewidth=2,zorder=0)
        ax.axhline(y=pandaDF[key].max(),c="red",linewidth=2,zorder=0)
        ax.axhline(y=pandaDF[key].mean(),c="green",linewidth=2,zorder=0, ls='--')
    
        ax.autoscale_view()

#### Compare between the sensor values of devices over time

the dfKeysDev DataFrame contains only keys columns, with no ts column, so we will create a new data frame that will also include the ts:

In [None]:
#show visualization for specific deviceID
deviceId = "10101010"

columns = list(keys)
columns.append('ts')
df = sparkDf.select(*columns).filter(sparkDf["deviceId"]==deviceId)

pandaDF = df.toPandas().set_index('ts')
ax = pandaDF.plot(figsize=(15,9),ls='', marker='o')   
ax.xaxis.set_major_formatter(dates.DateFormatter('%d-%m-%Y %H:%M:%S'))
ax.set_title(', '.join(keys) + ' over time')
ax.grid(True)
ax.autoscale_view()

In [None]:
pandaDF = dfKeysDev.toPandas()

pd.scatter_matrix(pandaDF, figsize=(18,12))
plt.show()

## Anomaly detection

Z-score

Z-score is a standard score that indicates how many standard deviations an element is from the mean.

A z-score can be calculated from the following formula 

z = (X - µ) / σ  

where z is the z-score, X is the value of the element, µ is the population mean, and σ is the standard deviation

A higher z-score value represents a larger deviation from the mean value which can be interpreted as abnormal.

We will calculate z-score for each selected column (sensor) of each deviceId, and plot only the sensors that have spikes.
We define a spike in the below function spike(row), by reported value having z-score above 3 or below -3.
Observe that the values for which the z-score is above 3 or below -3, marked as abnormal events in the graph below.

In [None]:
# ignore warnings if any
import warnings
warnings.filterwarnings('ignore')

'''
This function detects the spike and dip by returning a non-zero value 
when the z-score is above 3 (spike) and below -3(dip). Incase if you 
want to capture the smaller spikes and dips, lower the zscore value from 
3 to 2 in this function.
'''
upperThreshold = 3
lowerThreshold = -3
def spike(row):
    if(row['zscore'] >=upperThreshold or row['zscore'] <=lowerThreshold):
        return row[key]
    else:
        return 0

In [None]:
from pyspark.sql.functions import mean, min, max, mean, stddev

#get the list of available devices
devices = sparkDf.select("deviceId").groupBy("deviceId").count().rdd.map(lambda r: r[0]).collect()

#calculate for each device and each key
for dev in devices:
    for key in allkeys:
        df = spark.sql("SELECT deviceId, ts," + key +" from weatherTelemetrics where deviceId='" + dev + "'").where(col(key).isNotNull())
        if (df.count() > 0):
            pandaDF = df.toPandas().set_index("ts")
            
            # calculate z-score and populate a new column
            pandaDF['zscore'] = (pandaDF[key] - pandaDF[key].mean())/pandaDF[key].std(ddof=0)

            #add new column - spike, and calculate its value based on the thresholds, usinf spike function, defined above
            pandaDF['spike'] = pandaDF.apply(spike, axis=1)
            
            
            #plot the chart, only if spikes were detected (not all values of "spike" are zero)
            if (pandaDF['spike'].nunique() > 1):
                # select rows that are required for plotting
                plotDF = pandaDF[[key,'spike']]
                #calculate the y minimum value
                y_min = (pandaDF[key].max() - pandaDF[key].min()) / 10
                fig, ax = plt.subplots(num=None, figsize=(14, 6), dpi=80, facecolor='w', edgecolor='k')
                ax.set_ylim(plotDF[key].min() - y_min, plotDF[key].max() + y_min)
                x_filt = plotDF.index[plotDF.spike != 0]
                plotDF['spikes'] = plotDF[key]
                y_filt = plotDF.spikes[plotDF.spike != 0]
                #Plot the raw data in blue colour
                line1 = ax.plot(plotDF.index, plotDF[key], '-', color='blue', animated = True, linewidth=1, marker='o')
                #plot the anomalies in red circle
                line2 = ax.plot(x_filt, y_filt, 'ro', color='red', linewidth=2, animated = True)
                #Fill the raw area
                ax.fill_between(plotDF.index, (pandaDF[key].min() - y_min), plotDF[key], interpolate=True, color='blue',alpha=0.6)

                # calculate the sensor value that is corresponding to z-score that defines a spike
                valUpperThreshold = (pandaDF[key].std(ddof=0) * upperThreshold) + pandaDF[key].mean()
                # calculate the sensor value that is corresponding to z-score that defines a dip
                valLowerThreshold = (pandaDF[key].std(ddof=0) * lowerThreshold) + pandaDF[key].mean()

                #plot the thresholds
                ax.axhline(y=valUpperThreshold,c="red",linewidth=2,zorder=0,linestyle='dashed',label='Upper threshold')
                ax.axhline(y=valLowerThreshold,c="red",linewidth=2,zorder=0,linestyle='dotted',label='Lower threshold')
                
                # Label the axis
                ax.set_xlabel("Sequence",fontsize=20)
                ax.set_ylabel(key,fontsize=20)
                ax.set_title("deviceId: " + dev + " sensor:" + key)
                plt.tight_layout()
                plt.legend()
        
                print "sensor value that corresponds to z-score " , upperThreshold , ": " , valUpperThreshold
                print "sensor value that corresponds to z-score ", lowerThreshold, ": " , valLowerThreshold
                
                plt.show()

As shown, the red marks are the unexpected spikes and of whose z-score value is greater than 3 or less than -3. Incase if you want to detect the lower spikes, modify the value to 2 or even lower and run. Similarly, if you want to detect only the higher spikes, try increasing the z-score value from 3 to 4 and beyond.