# Data ETL for analysis

## Overview
- Read compressed CSV files
- Remove bad quality data
- Resample data
- Save raw and resampled data to DB

In [None]:
# import libraries
import os
import math
import pandas as pd
from sqlalchemy import create_engine
import seaborn as sns

# config for graphs
%matplotlib inline
sns.set(rc={"figure.figsize": (26, 10)})

# Sensor descriptions

Descriptions of the various sensors are in `sensors_sensor_202107282041.csv`. The file includes:
- `sensor_id` to match with the daily data files.
- `sensor_type` and `sensor_uom` (unit of measure).
- `sensor_ucl` and `sensor_lcl` which are the upper and lower control limits used for alerts.
- `sensor_uucl` and `sensor_llcl` which are the upper upper and lower lower control limits used for alarms.
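As a minimal sketch, assuming the alarm band always encloses the alert band (`sensor_llcl` <= `sensor_lcl` <= `sensor_ucl` <= `sensor_uucl`), the ordering of the four limits could be checked per sensor. The `meta` frame, its IDs and values, and the `limits_are_nested` helper are hypothetical; only the column names come from the description above. Missing limits (`NaN`) compare as `False`, so such sensors would also be flagged.

```python
import pandas as pd

# Hypothetical metadata using the column names described above; IDs and values are illustrative.
meta = pd.DataFrame(
    {
        "sensor_llcl": [-2.0, -5.0],
        "sensor_lcl": [0.0, -3.0],
        "sensor_ucl": [8.5, 6.0],
        "sensor_uucl": [10.0, 4.0],  # second sensor is deliberately inconsistent
    },
    index=pd.Index([1, 2], name="sensor_id"),
)


def limits_are_nested(meta: pd.DataFrame) -> pd.Series:
    """True per sensor when llcl <= lcl <= ucl <= uucl, i.e. the alarm band encloses the alert band."""
    return (
        (meta["sensor_llcl"] <= meta["sensor_lcl"])
        & (meta["sensor_lcl"] <= meta["sensor_ucl"])
        & (meta["sensor_ucl"] <= meta["sensor_uucl"])
    )


print(limits_are_nested(meta))
```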

In [None]:
# get sensor metadata, indexed by sensor_id (an integer ID, so no date parsing is needed)
df_meta = pd.read_csv("../data/sensors_sensor_202107282041.csv", index_col="sensor_id")
df_meta.head(15)

In [None]:
df_meta.columns

In [None]:
# keep only the required columns
df_meta = df_meta[['sensor_name', 'sensor_type', 'sensor_ucl',
       'sensor_lcl', 'sensor_llcl', 'sensor_uucl', 'sensor_uom']]
df_meta.head(13)

In [None]:
# keep only the sensors of interest
list_of_sensors = [30, 36, 29, 35, 31]
df_meta = df_meta[df_meta.index.isin(list_of_sensors)]
df_meta.head()

In [None]:
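# add a short name for the sensors
# names are assigned by row position, so this list must follow the row order of df_meta shown above
df_meta["sensor_short_name"] = ["WIP_temp", "DP_temp", "Ext_temp", "WIP_current", "DP_current"]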
df_meta.head()

# Sensor data

Sensor data is stored in compressed CSV files, and each file contains the data for the date given in its file name. Each file contains three columns:
- `timestamp`: the date and time the sensor readings were taken.
- `value`: the raw sensor reading in the unit of measure as per the sensor description file.
- `sensor_id_id`: the integer ID of the sensor to match with the sensor description file.
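A minimal sketch of one daily file in this layout, written to a temporary gzip file and read back with pandas. The sample rows and the file name are hypothetical; `parse_dates=["timestamp"]` converts the timestamp column while reading, as an alternative to converting it afterwards.

```python
import gzip
import os
import tempfile

import pandas as pd

# Hypothetical sample rows in the three-column layout described above.
sample = (
    "timestamp,value,sensor_id_id\n"
    "2021-05-15 00:00:03,4.2,30\n"
    "2021-05-15 00:00:07,12.8,35\n"
    "2021-05-15 00:00:09,-127,30\n"
)

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "2021-05-15.csv.gz")  # hypothetical file name
    with gzip.open(path, "wt") as fh:
        fh.write(sample)

    # Decompress and parse in one call; the timestamp column becomes datetime64.
    df_day = pd.read_csv(path, compression="gzip", parse_dates=["timestamp"])

print(df_day.dtypes)
```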

## Reading data

In [None]:
# get list of files
local_path = "../data/raw/"
local_files = os.listdir(local_path)
print(local_files[:5])

In [None]:
# read all data files into one dataframe
list_of_dfs = []

for local_file in local_files:
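    # os.path.join avoids the double slash caused by the trailing "/" in local_path
    local_file_path = os.path.join(local_path, local_file)
    # timestamps are converted to datetime in a later cell
    df_data = pd.read_csv(local_file_path, compression="gzip")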
    list_of_dfs.append(df_data)

df = pd.concat(list_of_dfs, ignore_index=True)
df.head()

In [None]:
df.info()

In [None]:
# correct timestamp type
df.timestamp = df.timestamp.astype("datetime64[ns]")

In [None]:
# list unique sensor ids
df.sensor_id_id.unique()

In [None]:
# keep only data linked to our sensors of interest
df = df[df.sensor_id_id.isin(list_of_sensors)]
df.head()

## Bad quality data

The following criteria are used to filter bad-quality data, as per the real-time monitoring system:
- A `value` of `-99` is assigned to any sensor reading that was of bad quality or not available.
- A value of `-127` indicates bad-quality data for some of the temperature sensors.
- Values of `-327` and `327` indicate bad-quality data at the extreme limits of the device's range.

Process knowledge is key to ensuring that good-quality data is not filtered out. In this application, temperatures are normally just above zero degrees Celsius, so values of `-99` or `-127` are far outside the normal range.
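The criteria above could be applied in one step with `Series.isin`. This is a minimal sketch: `BAD_VALUES`, `drop_sentinels` and the sample `readings` are hypothetical, and treating all four codes as bad for every sensor is an assumption (the cells below only remove `-127`).

```python
import pandas as pd

# Sentinel codes listed above; treating all of them as bad for every sensor is an assumption.
BAD_VALUES = [-99, -127, -327, 327]


def drop_sentinels(df: pd.DataFrame, bad_values=BAD_VALUES) -> pd.DataFrame:
    """Return only the rows whose 'value' is not one of the sentinel codes."""
    return df[~df["value"].isin(bad_values)]


# Hypothetical long-format readings in the same layout as the daily files.
readings = pd.DataFrame(
    {"value": [4.2, -99.0, 3.9, -127.0, 327.0], "sensor_id_id": [30, 30, 30, 30, 35]}
)
clean = drop_sentinels(readings)
print(clean)
```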

In [None]:
# describe some basic stats in the data
df.pivot(index="timestamp", values="value", columns="sensor_id_id").describe()

Based on the min and max values, no readings sit at the device limits of `-327` or `327`. Some `-127` values are present, but whether any `-99` values are present cannot be determined from these summary statistics alone.
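Because `describe()` only reports summary statistics, a per-sensor count of exact sentinel matches makes any `-99` values explicit. A minimal sketch on hypothetical readings; `SENTINELS` and the sample data are illustrative.

```python
import pandas as pd

SENTINELS = [-99, -127, -327, 327]

# Hypothetical long-format readings.
readings = pd.DataFrame(
    {
        "sensor_id_id": [30, 30, 30, 35, 35, 36],
        "value": [-127.0, -127.0, 4.1, -99.0, 11.5, 3.0],
    }
)

# Count each sentinel code per sensor; sensor/code pairs with no matches do not appear.
counts = (
    readings[readings["value"].isin(SENTINELS)]
    .groupby(["sensor_id_id", "value"])
    .size()
)
print(counts)
```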

In [None]:
# count bad quality values
print(f"-99 values: {df.value[df.value == -99].count()}")
print(f"-127 values: {df.value[df.value == -127].count()}")

In [None]:
df.shape

In [None]:
# remove values equal to -127
df = df[df.value != -127]
df.shape

In [None]:
# review basic stats
df.pivot(index="timestamp", values="value", columns="sensor_id_id").describe()

Most of the obvious bad-quality data has been removed.

In [None]:
# pivot data to show sensors as columns against timestamps
dfp = df.pivot(index="timestamp", values="value", columns="sensor_id_id")
dfp[100:110].head(10)

As this is raw data, each `timestamp` is the time a particular sensor took a reading, so after pivoting some rows have `NaN`s for the other sensors, and these `NaN`s cannot simply be dropped. As we don't need data at this level of detail, we will resample the data to 1-minute, 5-minute and 1-hour intervals for analysis. We will also compare the trends to see how much information we may potentially lose.
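A minimal sketch, on hypothetical readings, of why the pivoted raw data contains `NaN`s and how resampling aligns the sensors. `resample(...).mean()` skips `NaN` within each bin and yields `NaN` only when a bin has no readings at all.

```python
import pandas as pd

# Hypothetical long-format readings: each sensor reports at its own timestamps.
long = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2021-05-15 00:00:05", "2021-05-15 00:00:20", "2021-05-15 00:00:40", "2021-05-15 00:01:10"]
        ),
        "sensor_id_id": [30, 35, 30, 35],
        "value": [4.0, 10.0, 5.0, 12.0],
    }
)

wide = long.pivot(index="timestamp", values="value", columns="sensor_id_id")
print(wide)  # every row has a NaN because the two sensors never share a timestamp

# Averaging within 1-minute bins aligns the sensors.
wide_1min = wide.resample("1min").mean()
print(wide_1min)
```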

In [None]:
# replace sensor ids with short names (df_meta is indexed by sensor_id)
dfp = dfp.rename(columns=df_meta["sensor_short_name"].to_dict())

dfp.head()

In [None]:
# lineplot of raw data
sns.lineplot(x=dfp.index, y="WIP_temp", data=dfp, label="raw");

In [None]:
# select a period of "normal data" to use as comparison
filter_raw = (dfp.index > '2021-05-15') & (dfp.index < '2021-05-16')
sns.lineplot(x=dfp.index[filter_raw], y="WIP_temp", data=dfp[filter_raw], label="raw");

These are typical cycle trends for this system. The large peaks are the periodic defrost cycles, while the trend in between is the normal refrigeration cycle switching on and off periodically. The current alert limits are 0 to 8.5 degrees Celsius and the alarm limits are -2 to 10 degrees Celsius.
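A minimal sketch of labelling WIP temperature readings against the limits quoted above. Whether the monitoring system treats the limits as inclusive or exclusive is not stated, so the strict `<`/`>` comparisons are an assumption, as are the helper name `classify` and the sample values. Missing readings get their own label so that `NaN` is not reported as normal.

```python
import numpy as np
import pandas as pd

# Limits quoted above for the WIP temperature, in degrees Celsius.
ALERT_LOW, ALERT_HIGH = 0.0, 8.5
ALARM_LOW, ALARM_HIGH = -2.0, 10.0


def classify(temps: pd.Series) -> pd.Series:
    """Label each reading 'missing', 'alarm', 'alert' or 'normal'.

    np.select takes the first matching condition, so the wider alarm band
    is tested before the alert band.
    """
    conditions = [
        temps.isna(),
        (temps < ALARM_LOW) | (temps > ALARM_HIGH),
        (temps < ALERT_LOW) | (temps > ALERT_HIGH),
    ]
    labels = np.select(conditions, ["missing", "alarm", "alert"], default="normal")
    return pd.Series(labels, index=temps.index)


temps = pd.Series([3.5, 9.0, 11.2, -1.0, -2.5, float("nan")])
print(classify(temps).tolist())
```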

In [None]:
# resample in 1 minute intervals
dfp_1min = dfp.resample("1min").mean()
dfp_1min[100:110].head(10)

In [None]:
# resample in 5 minute intervals
dfp_5min = dfp.resample("5min").mean()
dfp_5min[100:110].head(10)

In [None]:
# compare raw data and 1 minute resampled data for WIP cold room temperature
sns.lineplot(x=dfp.index[filter_raw], y="WIP_temp", data=dfp[filter_raw], label="raw")

filter_1min = (dfp_1min.index > '2021-05-15') & (dfp_1min.index < '2021-05-16')
sns.lineplot(x=dfp_1min.index[filter_1min], y="WIP_temp", data=dfp_1min[filter_1min], label="1min");

In [None]:
# compare raw data and 5 minute resampled data
sns.lineplot(x=dfp.index[filter_raw], y="WIP_temp", data=dfp[filter_raw], label="raw")

filter_5min = (dfp_5min.index > '2021-05-15') & (dfp_5min.index < '2021-05-16')
sns.lineplot(x=dfp_5min.index[filter_5min], y="WIP_temp", data=dfp_5min[filter_5min], label="5min");

In [None]:
# raw data for current of the WIP refrigeration system
sns.lineplot(x=dfp.index, y="WIP_current", data=dfp, label="raw");

As expected, the current data looks very noisy: energy consumption depends on several factors and varies considerably with the loads connected.

In [None]:
# compare raw data and 1 minute resampled data for WIP refrigeration current
sns.lineplot(x=dfp.index[filter_raw], y="WIP_current", data=dfp[filter_raw], label="raw")

filter_1min = (dfp_1min.index > '2021-05-15') & (dfp_1min.index < '2021-05-16')
sns.lineplot(x=dfp_1min.index[filter_1min], y="WIP_current", data=dfp_1min[filter_1min], label="1min");

In [None]:
# compare raw data and 5 minute resampled data
sns.lineplot(x=dfp.index[filter_raw], y="WIP_current", data=dfp[filter_raw], label="raw")

filter_5min = (dfp_5min.index > '2021-05-15') & (dfp_5min.index < '2021-05-16')
sns.lineplot(x=dfp_5min.index[filter_5min], y="WIP_current", data=dfp_5min[filter_5min], label="5min");

Using 1-minute average data keeps most of the variability in the data and gives the best real-time view. With a 5-minute average, the variation in the normal on-off cycle is slightly reduced, but the defrost spike is still prominent.
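To put a number on how much a short peak is flattened, the same signal can be resampled at both intervals and the peaks compared. A minimal sketch on a hypothetical 10-second signal with a 2-minute spike standing in for a peak; the attenuation depends on how long a peak lasts relative to the bin width, so the numbers here are only illustrative.

```python
import pandas as pd

# Hypothetical 30 minutes of 10-second readings: a 4 degC baseline with a 2-minute spike to 12 degC.
index = pd.date_range("2021-05-15 00:00", periods=180, freq="10s")
signal = pd.Series(4.0, index=index)
signal.loc["2021-05-15 00:10:00":"2021-05-15 00:11:50"] = 12.0  # 12 samples = 2 minutes

print("raw peak:", signal.max())
for rule in ["1min", "5min"]:
    # A bin's mean mixes spike and baseline samples, so wider bins flatten short peaks.
    print(rule, "peak:", signal.resample(rule).mean().max())
```

With these values the 1-minute peak stays at 12, while the 5-minute peak drops to about 7.2 (12 spike samples averaged with 18 baseline samples).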

For anomaly detection, the 5-minute data should provide enough information to monitor the refrigeration system while still capturing the defrost cycle. As quick action (that is, within seconds) is not needed, this resolution should be sufficient.

For the energy analysis, the 1-minute data will be used to calculate the energy used per hour. Temperatures will be averaged per hour, while each 1-minute mean current divided by 60 gives the charge in Ah for that minute; summing these over the hour gives Ah per hour. The voltage at the site is 400V in a three-phase system, and an average power factor of 0.85 is assumed.
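A worked example of the conversion described above, as a minimal sketch on a hypothetical constant 10 A load for one hour. It assumes the measured current is the line current of a balanced three-phase load, for which active power is `sqrt(3) * V_LL * I * PF`.

```python
import math

import pandas as pd

VOLTAGE_LL = 400.0   # line-to-line voltage stated for the site (V)
POWER_FACTOR = 0.85  # assumed average power factor

# Hypothetical 1-minute mean currents: a constant 10 A for one hour.
current_1min = pd.Series(10.0, index=pd.date_range("2021-05-15 00:00", periods=60, freq="1min"))

# Each 1-minute mean current covers 1/60 h, so current / 60 is the charge (Ah) for that minute.
ah_per_hour = (current_1min / 60).resample("60min").sum()

# Three-phase active energy: sqrt(3) * V_LL * Ah * PF gives Wh; divide by 1000 for kWh.
kwh_per_hour = math.sqrt(3) * VOLTAGE_LL * ah_per_hour * POWER_FACTOR / 1000
print(ah_per_hour.iloc[0], kwh_per_hour.iloc[0])
```

For this input the hour sums to 10 Ah, which converts to sqrt(3) * 400 * 10 * 0.85 / 1000, roughly 5.89 kWh.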

In [None]:
# calculate amp-hours: each 1-minute mean current (A) covers 1/60 h
dfp_1min['WIP_currentAh'] = dfp_1min.WIP_current/60
dfp_1min['DP_currentAh'] = dfp_1min.DP_current/60

In [None]:
dfp_1min.columns

In [None]:
# resample data - mean for temperatures and sum for energy values (Ah)
dfp_hour = dfp_1min.resample("60min").agg({"WIP_temp": "mean", "DP_temp": "mean", "Ext_temp": "mean", "WIP_currentAh": "sum", "DP_currentAh": "sum"})
dfp_hour.head()

In [None]:
# convert Ah to kWh using voltage (3-phase, 400V for this site) and assumed power factor of 0.85
dfp_hour["WIP_energy"] = math.sqrt(3) * 400 * dfp_hour["WIP_currentAh"] * 0.85 / 1000
dfp_hour["DP_energy"] = math.sqrt(3) * 400 * dfp_hour["DP_currentAh"] * 0.85 / 1000

In [None]:
dfp_hour.head()

In [None]:
# save data to SQLite DB
engine = create_engine("sqlite:///../data/RawData.db")
dfp.to_sql("SensorData_raw", engine, if_exists="replace")

In [None]:
dfp_1min.to_sql("SensorData_1min", engine, if_exists="replace")

In [None]:
dfp_5min.to_sql("SensorData_5min", engine, if_exists="replace")

In [None]:
dfp_hour.to_sql("SensorData_1hour", engine, if_exists="replace")

In [None]:
df_meta.to_sql("SensorDetails", engine, if_exists="replace")
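As a quick round-trip check of the saved tables, a minimal sketch that writes a small frame and reads it back. It uses an in-memory database through a standard-library `sqlite3` connection (which `to_sql` and `read_sql_query` also accept) instead of the SQLAlchemy engine above, and the frame contents are hypothetical. `to_sql` stores the `DatetimeIndex` as a column named after the index (`timestamp` after the pivot), so `index_col` and `parse_dates` restore it on the way back.

```python
import sqlite3

import pandas as pd

# Small hypothetical stand-in for one of the resampled frames.
frame = pd.DataFrame(
    {"WIP_temp": [4.1, 4.3], "WIP_current": [10.2, 0.0]},
    index=pd.DatetimeIndex(["2021-05-15 00:00", "2021-05-15 00:05"], name="timestamp"),
)

conn = sqlite3.connect(":memory:")
# The index is written as a "timestamp" column alongside the data columns.
frame.to_sql("SensorData_5min", conn, if_exists="replace")
restored = pd.read_sql_query(
    "SELECT * FROM SensorData_5min",
    conn,
    index_col="timestamp",
    parse_dates=["timestamp"],
)
conn.close()

print(restored)
```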