### Pre-requisite:
Before running this notebook, you will have to:
1. download the csv file named `dht_1k.csv` and `sds_1k.csv`  
stored under https://github.com/IBMProjectEventStore/db2eventstore-IoT-Analytics/tree/master/data.
2. Go to the `Project tab` and load both above mentioned csv files into the current project as dataset.
----
**Note: This Notebook can only run in Python version >= 3.0**

In [1]:
from eventstore.oltp import EventContext
from eventstore.sql import EventSession
from eventstore.common import ConfigurationReader
from pyspark.sql import SparkSession

ConfigurationReader.setEventUser("")
ConfigurationReader.setEventPassword("")

In [2]:
sparkSession = SparkSession.builder.appName("EventStore SQL in Python").getOrCreate()
eventSession = EventSession(sparkSession.sparkContext, "EVENTDB")
eventSession.set_query_read_option("SnapshotNow")
eventSession._jvm.org.apache.spark.sql.types.SqlTimeSeries.register(eventSession._jsparkSession)
eventSession.open_database()
ctx = EventContext.get_event_context("EVENTDB")

In [3]:
from eventstore.catalog import TableSchema
from pyspark.sql.types import *

In [4]:
from datetime import datetime

def datetime_converter(datetime_string):
    # (1) Convert to datetime format
    utc_time = datetime.strptime(datetime_string.split('.000Z')[0], "%Y-%m-%dT%H:%M:%S")

    return int((utc_time - datetime(1970, 1, 1)).total_seconds())

In [5]:
table_names = ctx.get_names_of_tables()
for idx, name in enumerate(table_names):
    print(idx, name)

0 ADMIN.DHT_TABLE
1 ADMIN.SDS_TABLE
2 ADMIN.IOTPERF


### Function Preparation

In [6]:
def create(df, key_col, ts_col, val_col, new_key_name="joined_primary_keys", time_series_name=None):
    """
    Highly efficient algorithm that creates a time series from the Spark Dataframe.
    ---
    @param df: Spark Dataframe : Containing input columns for time series creation
    @key_col: List[String] : List of column name strings of primary key for the time series creation
    @ts_col: String : Column name of timestamp
    @val_col: String : Column name of value
    @new_key_name: String : Column name of the joined primary key column to be created.
    @time_series_name: String : [Default: <val_col>_time_series] Column name of the time series column to be created.
    return: [Spark Dataframe] Spark df containing 2 columns: key column and time series column
    ---
    Example:
    ts_df = create(raw_table_with_dates, ["SATID","PKID","DATE"], "TIMESTAMP", "READING")
    """
    from pyspark.sql import DataFrame
    from pyspark.sql.functions import concat, col, lit
    ts_column_name = val_col + "_time_series"
    df = df.withColumn(new_key_name, concat(*key_col))
    ts_df = DataFrame(
        df.sql_ctx._jvm.com.ibm.research.time_series.spark_timeseries_sql.utils.api.java.TimeSeriesDataFrame.create(
            df._jdf,
            new_key_name,
            ts_col,
            val_col
        ),
        df.sql_ctx
    )
    if time_series_name:
        ts_df = ts_df.withColumnRenamed(ts_column_name, time_series_name)
    return ts_df

In [7]:
def showPartitionInfo(df):
    # show partition number and number of records in a partition in the given Spark dataframe
    #@df: Spark DataFrame
    print("- number of partitions prior to time series (after loading table): ",df.rdd.getNumPartitions())
    print("- partition sizes prior to time series (after loading table): ", df.rdd.mapPartitions(lambda s: iter([sum(1 for _ in s)])).collect())

## Data Preparation

- **Create DHT table and loading data**

In [8]:
# Define table schema to be created
with EventContext.get_event_context("EVENTDB") as ctx:
    schema = StructType([
        StructField("sensor_id", IntegerType(), nullable = False),
        StructField("timestamp", IntegerType(), nullable = False),
        StructField("location", IntegerType(), nullable = False),
        StructField("humidity", FloatType(), nullable = True)
    ])  
    table_schema = TableSchema("dht_table", schema,
                                sharding_columns=["sensor_id"],
                                pk_columns=["sensor_id","timestamp","location"])

In [9]:
# try create table if not exist
# try:
#     ctx.drop_table("DHT_TABLE")
# except Exception as error:
#     print(error)
try:
    ctx.create_table(table_schema)
except Exception as error:
    pass
    
table_names = ctx.get_names_of_tables()
for idx, name in enumerate(table_names):
    print(name)

ADMIN.DHT_TABLE
ADMIN.SDS_TABLE
ADMIN.IOTPERF


In [10]:
dht_table = eventSession.load_event_table("dht_table")

In [11]:
dht_table.show(5)

+---------+----------+--------+--------+
|SENSOR_ID| TIMESTAMP|LOCATION|HUMIDITY|
+---------+----------+--------+--------+
|     1480|1501632088|     732|    72.7|
|     1480|1501632234|     732|    73.1|
|     1480|1501632388|     732|    72.2|
|     1480|1501632534|     732|    73.3|
|     1480|1501632680|     732|    73.0|
+---------+----------+--------+--------+
only showing top 5 rows



In [12]:
# ingest data into table
import os
resolved_table_schema = ctx.get_table("dht_table")
print(resolved_table_schema)
with open(os.environ['DSX_PROJECT_DIR']+'/datasets/dht_1k.csv') as f:
    f.readline()
    content = f.readlines()
content = [l.split(",") for l in content]
batch = [dict(sensor_id=int(c[5]), timestamp=datetime_converter(c[7]), location=int(c[0]), humidity=float(c[2])) for c in content]
ctx.batch_insert(resolved_table_schema, batch)

ResolvedTableSchema(tableName=ADMINDHT_TABLE, schema=StructType(List(StructField(SENSOR_ID,IntegerType,false),StructField(TIMESTAMP,LongType,false),StructField(LOCATION,IntegerType,false),StructField(HUMIDITY,FloatType,true))), sharding_columns=['SENSOR_ID'], pk_columns=['SENSOR_ID', 'TIMESTAMP', 'LOCATION'], partition_columns=None, schema_name=Some(ADMIN))


In [13]:
# verify ingested result
dht_table = eventSession.load_event_table("dht_table")
dht_table.count()

41569

In [14]:
showPartitionInfo(dht_table)

- number of partitions prior to time series (after loading table):  1
- partition sizes prior to time series (after loading table):  [41569]


In [15]:
dht_table.createOrReplaceTempView("dht_raw_table")

In [16]:
eventSession.sql("select count(*) from dht_raw_table").show(5)

+--------+
|count(1)|
+--------+
|   41569|
+--------+



- **Create SDS table and loading data**

In [17]:
with EventContext.get_event_context("EVENTDB") as ctx:
    schema = StructType([
        StructField("sensor_id", IntegerType(), nullable = False),
        StructField("timestamp", LongType(), nullable = False),
        StructField("location", IntegerType(), nullable = False),
        StructField("p_1", DoubleType(), nullable = True)
    ])  
    table_schema = TableSchema("sds_table", schema,
                                sharding_columns=["sensor_id"],
                                pk_columns=["sensor_id","timestamp","location"])


In [18]:
# try:
#     ctx.drop_table("SDS_TABLE")
# except Exception as error:
#     print(error)
try:
    ctx.create_table(table_schema)
except Exception as error:
    print("Table not created.")
table_names = ctx.get_names_of_tables()
for idx, name in enumerate(table_names):
    print(name)

Table not created.
ADMIN.DHT_TABLE
ADMIN.SDS_TABLE
ADMIN.IOTPERF


In [19]:
sds_table = eventSession.load_event_table("sds_table")

In [20]:
with EventContext.get_event_context("EVENTDB") as ctx:
    resolved_table_schema = ctx.get_table("sds_table")
    with open(os.environ['DSX_PROJECT_DIR']+'/datasets/sds_1k.csv') as f:
        f.readline()
        content = f.readlines()
    content = [l.split(",") for l in content]
    batch = [dict(sensor_id=int(c[5]), timestamp=datetime_converter(c[7]), location=int(c[0]), p_1=float(c[2])) for c in content if c[2] != ""]
    ctx.batch_insert(resolved_table_schema, batch)

In [21]:
sds_table=eventSession.load_event_table("sds_table")
sds_table.count()

51246

In [22]:
showPartitionInfo(sds_table)

- number of partitions prior to time series (after loading table):  1
- partition sizes prior to time series (after loading table):  [51246]


In [23]:
sds_table.createOrReplaceTempView("sds_raw_table")

In [24]:
eventSession.sql("select * from sds_raw_table").show(5)

+---------+----------+--------+-----+
|SENSOR_ID| TIMESTAMP|LOCATION|  P_1|
+---------+----------+--------+-----+
|     5309|1504137742|    2677|33.33|
|     5309|1504137889|    2677|28.73|
|     5309|1504138035|    2677|41.23|
|     5309|1504138182|    2677| 40.4|
|     5309|1504138329|    2677|32.87|
+---------+----------+--------+-----+
only showing top 5 rows



In [25]:
## Query

In [26]:
sql="SELECT count(*) FROM dht_raw_table"
eventSession.sql(sql).show(5)

+--------+
|count(1)|
+--------+
|   41569|
+--------+



- **Repartitioning**

Optimize parallelism by repartitioning
Note that when the query is pushed down to Db2 Event Store and the data is retrieved, the data will be received by Spark as one single partitioned data frame. It's necessary for the user to explicitly repartition the dataframe.
It's suggested that one partition is created for each CPU core in the Spark cluster.

In [27]:
"""
dht_table
"""

'\ndht_table\n'

In [28]:
showPartitionInfo(dht_table)

- number of partitions prior to time series (after loading table):  1
- partition sizes prior to time series (after loading table):  [41569]


In [29]:
dht_table = dht_table.repartition(48)
showPartitionInfo(dht_table)

- number of partitions prior to time series (after loading table):  48
- partition sizes prior to time series (after loading table):  [866, 867, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866, 866]


In [30]:
"""
sds_table
"""

'\nsds_table\n'

In [31]:
showPartitionInfo(sds_table)

- number of partitions prior to time series (after loading table):  1
- partition sizes prior to time series (after loading table):  [51246]


In [32]:
sds_table = sds_table.repartition(48)
showPartitionInfo(sds_table)

- number of partitions prior to time series (after loading table):  48
- partition sizes prior to time series (after loading table):  [1067, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1068, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067, 1067]


# Alignment of multiple time-series from different IoT sensors

The below example shows how one can align two timeseries using an inner join with the nearest timestamp. The problem arises when two sensors are generating timestamps with clocks that are not fully synchronized. This can occur regularly and is common in IoT scenarios.

In what follows, we will show an example on the OK Lab data that considers the timeseries generated by the SDS (particulate matter) sensors and the DHT (humidity/temperature) sensors. The manual of SDS states that the SDS values are valid only when the DHT sensor measures humidity as < 70%. Realizing this restraint requires the data from the 2 sensors to be synchronized temporally. We note first that traditional join style would be to look for exact timestamps, which will not work in this case as the sensors are coming from two different devices (with the same location). One approach would be to use a standard SQL statement such as:

```sql
SELECT 
    humidity, 
    p_1
FROM sds_raw_table STORED AS PARQUET sds011, dht_raw_table STORED AS PARQUET dht22
AND dht22.humidity <= 70
AND ((sds011.timestamp - INTERVAL 10 SECONDS) < dht22.timestamp) 
AND (dht22.timestamp < (sds011.timestamp + INTERVAL 10 SECONDS))
```

However, if executed as it stands, it can take several hours to complete given that it results in a traditional Cartesian Join.

In our example, we will down-select the data using DB2 Eventstore into two dataframes (sds and dht). Our approach then is to do a clever join that takes windowing into account and not do a full Cartesian join. 

In what follows, we we will show how using time-series capabilities of DB2-Eventstore can be used to address this problem of unaligned sensors. Although this is one application that time-series for DB2-eventstore covers, time-series capabilites are not limited to just this one use case as we have functions to handle simple statistical methods (fft, avg, percentile, etc.), time-series distance metrics (DL, DTW, SBD, etc.), sophisticated forms of segmentation (time-based, anchor-base, marker-base, record-based, etc.), etc.

### Creating your time series

First a time series must be created. IBM Db2 Event Store provides two ways of creating a time series, namely Spark UDAF SQL function and `create` function.

1/ **Spark User Defined Aggregate Function (UDAF) : `TIME_SERIES`**

Spark UDAF goes through each row of the dataframe, and creates a new time series by aggregating the previous rows and current row.
Because the Spark RDD is immutable, multiple intermediate RDDs will be created.

so for instance if you have 3 rows: [1] | [2] | [3], it will look as such [1] … [1] + [2] = [1,2] … [1,2] + [3] = [1,2,3]. Thus 5 intermediate rdds are created: [1] [2] [3] [1,2] [1,2,3].

Example: 
```sql
stmt = "SELECT location, TIME_SERIES(timestamp, humidity) AS dht FROM dht_raw_table where humidity < 70 GROUP BY location"
```

2/ **create function:**

The create function simply group the given dataframe by key, and create one time series for each key at once. The performance advantage of the create function will be increasingly obvious with the larger dataframe and time series size.

Example: 
```python
create(dht_table, ["location"], "timestamp", "HUMIDITY", "LOCATION")
```
 
In this notebook, we will use the `create` function, instead of the Spark UDAF function, to create the time series.

In [33]:
'''
Create dht time series
'''

'\nCreate dht time series\n'

In [34]:
%%time
dht_ts = create(dht_table, ["location"], "timestamp", "HUMIDITY", "LOCATION", "dht")
dht_ts.createOrReplaceTempView("dht_ts_table")
dht_ts.show(5)

+--------+--------------------+
|LOCATION|                 dht|
+--------+--------------------+
|    2640|[(1503609572,64.4...|
|     462|[(1501545650,87.1...|
|    1323|[(1503902976,99.9...|
|    1622|[(1501585511,57.7...|
|     138|[(1501545746,1.0)...|
+--------+--------------------+
only showing top 5 rows

CPU times: user 19.2 ms, sys: 3.12 ms, total: 22.4 ms
Wall time: 15.2 s


In [35]:
'''
Create sds time series
'''

'\nCreate sds time series\n'

In [36]:
%%time
sds_ts = create(sds_table, ["location"], "timestamp", "P_1", "LOCATION", "sds")
sds_ts.createOrReplaceTempView("sds_ts_table")
sds_ts.show(5)

+--------+--------------------+
|LOCATION|                 sds|
+--------+--------------------+
|    2640|[(1503609572,3.0)...|
|     462|[(1501545650,4.6)...|
|    1323|[(1503897678,6.27...|
|     135|[(1501545731,8.15...|
|    2676|[(1503956051,388....|
+--------+--------------------+
only showing top 5 rows

CPU times: user 11.3 ms, sys: 4.04 ms, total: 15.3 ms
Wall time: 6.11 s


# Initial exploration on data

In many cases, the first step to doing any Time-Series analysis is to learn about your time-series. To do so, we use what is called a describe. This will provide a rich set of metrics (avg, percentiles, timing-statistics, etc.) over a time-series such that a user can have some knowledge of the time-series they are working with.

In [37]:
'''
Exploration on the DHT (humidity/temperature) sensors data
'''

'\nExploration on the DHT (humidity/temperature) sensors data\n'

In [38]:
%%time
stmt = """
SELECT location, TS_DESCRIBE(dht) FROM dht_ts_table
"""

eventSession.sql(stmt).toPandas().head(5)

CPU times: user 834 ms, sys: 140 ms, total: 974 ms
Wall time: 5.6 s


In [39]:
"""|
Exploration on SDS (particulate matter) sensors data
"""

'|\nExploration on SDS (particulate matter) sensors data\n'

In [40]:
%%time
stmt = """
SELECT location, TS_DESCRIBE(sds) FROM sds_ts_table
"""

eventSession.sql(stmt).toPandas().head(5)

CPU times: user 12.5 ms, sys: 4.73 ms, total: 17.2 ms
Wall time: 4.29 s


## Description of performing a time-series sql temporal align

This query has a few main things to consider:

### Performing full temporal align

Performing a full temporal align requires 2 parameters:

- The left Time Series
- The right Time Series

Once given, the returned output will be 2 columns (the 2 aligned time series) as **left_column_aligned** and **right_column_aligned**

*Note: With this method, all missing values will be replaced with null*

In [41]:
%%time
stmt = """
    SELECT sds_ts_table.location, TS_FULL_ALIGN(dht, sds, TS_INTERPOLATOR_NEAREST(-1.0)) FROM 
        dht_ts_table
        INNER JOIN
        sds_ts_table
        ON dht_ts_table.location = sds_ts_table.location
"""
df = eventSession.sql(stmt)
df.show(5)
df.count()
eventSession.sql(stmt).createOrReplaceTempView("dht_sds_ts_table")

+--------+--------------------+--------------------+
|location|         dht_aligned|         sds_aligned|
+--------+--------------------+--------------------+
|    1773|[(1501703936,-1.0...|[(1501703936,424....|
|     462|[(1501545650,87.1...|[(1501545650,4.6)...|
|    2683|[(1503961757,-1.0...|[(1503961757,4.8)...|
|     138|[(1501545745,-1.0...|[(1501545745,6.47...|
|    1099|[(1503932184,-1.0...|[(1503932184,7.03...|
+--------+--------------------+--------------------+
only showing top 5 rows

CPU times: user 11.6 ms, sys: 6.42 ms, total: 18.1 ms
Wall time: 13 s


# Display the aligned TimeSeries table

In [42]:
%%time
eventSession.sql("select count(*) from dht_sds_ts_table").show()

+--------+
|count(1)|
+--------+
|      23|
+--------+

CPU times: user 7.63 ms, sys: 3.77 ms, total: 11.4 ms
Wall time: 4.15 s


# Interpolate missing values after alignment

Because in IoT use cases, sensors tend to be clocked at different rates, it's important to properly fill values where they don't exist in the data. Just because a value is not in our data, does not mean it did not exist. To approximate the missing value, we can provide an interpolator as simple as nearest, next, prev, but as sophisticated as linear interpolation or cubic spline interpolation. In the following example, we will fill all missing values based on a nearest interpolation method.

In [43]:
eventSession.sql("SELECT location, TS_FILLNA(dht_aligned,TS_INTERPOLATOR_NEAREST(-1.0)) as ts FROM dht_sds_ts_table").createOrReplaceTempView("dht_no_nulls")
eventSession.sql("SELECT location, TS_FILLNA(sds_aligned,TS_INTERPOLATOR_NEAREST(-1.0)) as ts FROM dht_sds_ts_table").createOrReplaceTempView("sds_no_nulls")

# Converting Time-Series data to tabular data

Once all Time-Series analysis has been done, because Time-Series types are not directly ingestable, a user may want to display there data in a tabular format to prepare for graphing or performing further analysis. The following is how would could convert that data.

In [44]:
eventSession.sql("SELECT location, TS_EXPLODE(ts) FROM dht_no_nulls").createOrReplaceTempView("dht_exploded")
eventSession.sql("SELECT location, TS_EXPLODE(ts) FROM sds_no_nulls").createOrReplaceTempView("sds_exploded")

In [45]:
eventSession.sql("select * from dht_exploded").count()

49786

In [46]:
eventSession.sql("select * from sds_exploded").count()

49786

# Joining the tabular data

Lastly, we will perform a classical join on the location and time_tick for humidity and coarse particulate matter data to properly display the aligned values

In [47]:
%%time
stmt = """
    select dht_exploded.location, dht_exploded.ts_timeTick as timestamp, dht_exploded.ts_value as humidity, sds_exploded.ts_value as p_1 FROM
        dht_exploded
        INNER JOIN
        sds_exploded
        ON dht_exploded.location=sds_exploded.location and dht_exploded.ts_timeTick=sds_exploded.ts_timeTick
"""
df = eventSession.sql(stmt)
df.show()
df.count()

+--------+----------+------------------+-----+
|location| timestamp|          humidity|  p_1|
+--------+----------+------------------+-----+
|    1099|1503963271| 91.69999694824219|11.77|
|    1099|1503989517|              94.0| 9.93|
|    1099|1504072961|  93.4000015258789|13.93|
|    1099|1504079851| 79.30000305175781|14.17|
|    1099|1504095401| 45.20000076293945|12.27|
|    1099|1504135888|  87.9000015258789| 4.27|
|    1099|1504145566|  88.5999984741211| 4.53|
|    1099|1504153192|  95.9000015258789| 6.67|
|    1099|1504158471|  99.9000015258789|  7.2|
|    1099|1504208762| 89.19999694824219|  5.6|
|    1216|1503978800|              94.0| 7.17|
|    1216|1504021087|              67.5|  5.0|
|    1216|1504030333|  90.5999984741211| 6.17|
|    1216|1504057926| 93.30000305175781| 4.33|
|    1216|1504078473| 92.69999694824219|  4.9|
|    1216|1504084344| 77.69999694824219| 2.63|
|    1216|1504099170| 54.29999923706055| 2.87|
|    1216|1504100931|54.599998474121094| 2.23|
|    1216|150