# Octank IIoT Data Analytics Notebook
Demonstration of [AWS IoT Analytics](https://aws.amazon.com/iot-analytics/) Notebooks, using real-time sensor data.

In [1]:
import boto3
import matplotlib.pyplot as plt
import pandas as pd
import pyspark.sql.functions as f
from matplotlib.dates import DateFormatter
from pandas.plotting import register_matplotlib_converters
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType, BooleanType

In [2]:
def parse(x):
    x = pd.to_numeric(x, downcast="float")
    return pd.Timestamp(x, unit="s")

In [3]:
%%time

spark = SparkSession \
    .builder \
    .appName('iiot_demo') \
    .getOrCreate()

client = boto3.client("iotanalytics")
dataset = "iot_data_dataset"
data_location = client.get_dataset_content(datasetName = dataset)["entries"][0]["dataURI"]
schema = StructType([
    StructField(name='device', dataType=StringType(), nullable=False),
    StructField(name='ts', dataType=TimestampType(), nullable=False),
    StructField(name='humidity', dataType=DoubleType(), nullable=True),
    StructField(name='temp', dataType=DoubleType(), nullable=True),
    StructField(name='light', dataType=BooleanType(), nullable=True),
    StructField(name='motion', dataType=BooleanType(), nullable=True),
    StructField(name='lpg', dataType=DoubleType(), nullable=True),
    StructField(name='co', dataType=DoubleType(), nullable=True),
    StructField(name='smoke', dataType=DoubleType(), nullable=True),
    StructField(name='__dt', dataType=StringType(), nullable=True)
])

df = spark.createDataFrame(
    data=pd.read_csv(
        data_location,
        header=0,
        low_memory=False,
        date_parser=parse,
        infer_datetime_format=True,
        parse_dates=[1]
    ), schema=schema)

CPU times: user 1min 37s, sys: 5.03 s, total: 1min 42s
Wall time: 1min 48s


In [21]:
df = df.drop("__dt")
df = df.orderBy(f.asc("ts"))

In [22]:
print(df)

DataFrame[device: string, ts: timestamp, humidity: double, temp: double, light: boolean, motion: boolean, lpg: double, co: double, smoke: double]


In [27]:
df.show(5, False)

+------------------+-------------------+-----------------+------------------+-----+------+--------------------+--------------------+--------------------+
|device            |ts                 |humidity         |temp              |light|motion|lpg                 |co                  |smoke               |
+------------------+-------------------+-----------------+------------------+-----+------+--------------------+--------------------+--------------------+
|iot-demo-device-03|2020-06-21 00:00:00|45.79999923706055|75.9199993133545  |true |false |0.007461079933679425|0.004786845372382666|0.01987101220169098 |
|iot-demo-device-02|2020-06-21 00:00:00|70.69999694824219|63.139998626708994|true |false |0.006747995268470235|0.004166214182479396|0.017849189636577008|
|iot-demo-device-03|2020-06-21 00:00:00|45.9000015258789 |75.9199993133545  |true |false |0.007472417062662312|0.004796903006589501|0.019903266607975173|
|iot-demo-device-01|2020-06-21 00:00:00|82.5999984741211 |62.06000137329102 

In [28]:
print("Dataset Range")
print("-------------")
print("Record count: {:,}".format(df.count()))
print("Time range (min): {}".format(df.agg({"ts": "min"}).collect()[0][0]))
print("Time range (max): {}".format(df.agg({"ts": "max"}).collect()[0][0]))
print("Records:")
print("".format(df.groupBy("device").count().select('device', f.col('count').alias('count')).show()))

Dataset Range
-------------
Record count: 533,329
Time range (min): 2020-06-21 00:00:00
Time range (max): 2020-07-06 15:45:04
Records:
+------------------+------+
|            device| count|
+------------------+------+
|iot-demo-device-01|155380|
|iot-demo-device-02|151884|
|iot-demo-device-03|226065|
+------------------+------+




In [35]:
print(df.approxQuantile(col="humidity", probabilities=[0.5], relativeError=.25))

# q_hi  = df_filtered["humidity"].quantile(0.99)
# df_filtered = df_filtered[(df_filtered["humidity"] < q_hi) & 
#                           (df_filtered["humidity"] > q_low)]

# q_low = df_filtered["temp"].quantile(0.01)
# q_hi  = df_filtered["temp"].quantile(0.99)
# df_filtered = df_filtered[(df_filtered["temp"] < q_hi) & 
#                           (df_filtered["temp"] > q_low)]

# groups = df_filtered.groupby("device")

[70.9000015258789]
CPU times: user 25.1 ms, sys: 8.73 ms, total: 33.9 ms
Wall time: 2.86 s
