## Walkthrough of the test env code

In [1]:
import time
import sys

import numpy as np
import pandas as pd

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

Import files from sibling a directory

In [2]:
sys.path.append("..")

import model.clean as cl
import model.model_trainer as mt
import model.model_predict as mp
from model.influx_interact import influx_class

### Step 1
Set up a local instance of influx by following the README.md

### Step 2  
`populate_influx.py`  
Populating influx with data from csvs stored in the repo

In [157]:
PATH_TO_CSVS = "../../data/labelled-skyspark-data/"
CSVS_TO_LOAD = [
    "CEC_compiled_data_1b_updated.csv",
    "CEC_compiled_data_2b_updated.csv",
    "CEC_compiled_data_3b_updated.csv",
    "CEC_compiled_data_4b_updated.csv",
    "CEC_compiled_data_5b_updated.csv",
]

Set up influx connection

In [158]:
# as set up in the docker-compose
token = "mytoken"
org = "UBC"
bucket = "MDS2021"

# set up influx
client = InfluxDBClient(url="http://localhost:8086", token=token, timeout=999_000)
write_api = client.write_api(write_options=SYNCHRONOUS)

Loop over each csv, read it then write it into influx  

Important note: If the influx write times out, re-run and it should work on second try.

In [161]:
for csv in CSVS_TO_LOAD:

    # load and set up dataframes
    df = pd.read_csv(PATH_TO_CSVS + csv, parse_dates=["Datetime"])
    df.rename(columns={"Value": "val_num"}, inplace=True)
    df.rename(columns={"ID": "uniqueID"}, inplace=True)
    df.rename(columns={"Anomaly": "AH"}, inplace=True)
    df["navName"] = "Energy"
    df["siteRef"] = "Campus Energy Centre"
    df.set_index("Datetime", drop=True, inplace=True)
    df = df.drop(["AH"], axis=1)

    print("writing: {}".format(csv))
    # write values
    write_api.write(
        bucket,
        org,
        record=df,
        data_frame_measurement_name="READINGS",
        data_frame_tag_columns=["uniqueID", "navName", "siteRef"],
    )
    time.sleep(5)

writing: CEC_compiled_data_1b_updated.csv
writing: CEC_compiled_data_2b_updated.csv
writing: CEC_compiled_data_3b_updated.csv
writing: CEC_compiled_data_4b_updated.csv
writing: CEC_compiled_data_5b_updated.csv


Look at the `df` object to see what was written to influx

In [162]:
df.head()

Unnamed: 0_level_0,val_num,uniqueID,navName,siteRef
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-01-01 07:45:00,2.9,Campus Energy Centre Boiler B-1 Exhaust O2,Energy,Campus Energy Centre
2020-01-01 08:00:00,2.9,Campus Energy Centre Boiler B-1 Exhaust O2,Energy,Campus Energy Centre
2020-01-01 08:15:00,2.9,Campus Energy Centre Boiler B-1 Exhaust O2,Energy,Campus Energy Centre
2020-01-01 08:30:00,2.9,Campus Energy Centre Boiler B-1 Exhaust O2,Energy,Campus Energy Centre
2020-01-01 08:45:00,2.9,Campus Energy Centre Boiler B-1 Exhaust O2,Energy,Campus Energy Centre


Now sensor data has been written to influx's READINGS bucket  
This simulates what UDL's influx will look like

### Step 3
`test_env_scheduled_training.py`  
Run the training file

Subset the training data for fast dev iterations

In [163]:
TESTING = True

Setup thresholds for anomaly detection.  
If a training example's loss is greater than the set threshold it will be flagged as anomalous

In [10]:
THRESHOLDS = {
    "Campus Energy Centre Campus HW Main Meter Power": 0.09,
    "Campus Energy Centre Boiler B-1 Exhaust O2": 0.019,
    "Campus Energy Centre Boiler B-1 Gas Pressure": 0.0725,
    "Campus Energy Centre Campus HW Main Meter Entering Water Temperature": 0.02938,
    "Campus Energy Centre Campus HW Main Meter Flow": 0.043,
}

Pick a end time for the readings to be used for training  
In the real implementation there will be no end_time for the training data, i.e it will train on all the data

In [165]:
END_TIME = 1613109600

Explore training with removing anomalous records, i.e only train on normal data for this specific sensor data

In [166]:
REMOVE_ANOMALOUS = True
REMOVE_ANOMALOUS_DATA = [
    "Campus Energy Centre Campus HW Main Meter Entering Water Temperature"
]

Influx connector setup

In [5]:
model_path = "./test_env_models/"
scaler_path = "./test_env_standardizers/"

# set up for influx
token = "mytoken"
org = "UBC"
bucket = "MDS2021"
url = "http://localhost:8086"

influx_conn = influx_class(
    org=org,
    url=url,
    bucket=bucket,
    token=token,
)

Read the training data from influx

In [168]:
influx_read_df = influx_conn.make_query(
    location="Campus Energy Centre",
    measurement="READINGS",
    end=END_TIME,
)

Split the data based on sensor ID

In [169]:
main_bucket = cl.split_sensors(influx_read_df)

the `main_bucket` object is a dict with the name of the sensor as the key and then the value is another dict of data objects

In [170]:
main_bucket.keys()

dict_keys(['Campus Energy Centre Boiler B-1 Exhaust O2', 'Campus Energy Centre Boiler B-1 Gas Pressure', 'Campus Energy Centre Campus HW Main Meter Entering Water Temperature', 'Campus Energy Centre Campus HW Main Meter Flow', 'Campus Energy Centre Campus HW Main Meter Power'])

### The following cell for training:
1) Iterates over each the data sets (line 1)  
2) Removes anomalous data if the data set has been specified (lines 4-23)  
3) Standardizes the values for training and saves the standardizer for the prediction step (lines 25-30)  
4) Subsets the data for faster training if specified (lines 32-33)  
5) Sequences the values into windows for the LSTM (lines 35-37)  
6) Fits and saves the model (line 42)  
7) Writes the resulting data (that contains: value, anomalies detected manually, and anomalies detected in training) to influx (lines 44-57)

In [171]:
for key, df in main_bucket.items():
    print("Training for : {}".format(key))

    # removes anomalies to only train on normal data
    if REMOVE_ANOMALOUS:
        if key in REMOVE_ANOMALOUS_DATA:
            PATH_TO_CSVS = "../../data/labelled-skyspark-data/"
            csv = "CEC_compiled_data_2b_updated.csv"
            df_with_manual_anomaly = pd.read_csv(
                PATH_TO_CSVS + csv, parse_dates=["Datetime"]
            )
            df_with_manual_anomaly["Datetime"] = pd.to_datetime(
                df_with_manual_anomaly["Datetime"], utc=True
            )
            df = df.merge(
                df_with_manual_anomaly[["Datetime", "Anomaly"]],
                how="left",
                left_on="DateTime",
                right_on="Datetime",
            )
            df = df.loc[df["Anomaly"] == False]
            df = df.drop(columns=["DateTime"], axis=1)
            am_df.rename(columns={"Anomaly": "manual_anomaly"}, inplace=True)

    # creates standardized column for each sensor in main bucket
    df["Stand_Val"] = cl.std_val_train(
        df[["Value"]],
        main_bucket[key]["ID"].any(),
        scaler_path,
    )

    if TESTING:
        df = df.tail(5000)

    # creates arrays for sliding windows
    x_train, y_train = mt.create_sequences(df["Stand_Val"], df["Stand_Val"])
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))

    normal_dict = cl.model_parser(df, x_train, y_train)

    threshold = THRESHOLDS[key]
    mt.fit_models(normal_dict, model_path, threshold)

    # for writing AM to influx
    am_df = normal_dict[key]["train_score_df"]
    am_df.rename(columns={"anomaly": "model_anomaly"}, inplace=True)
    am_df.rename(columns={"ID": "uniqueID"}, inplace=True)
    am_df.rename(columns={"Datetime": "DateTime"}, inplace=True)
    am_df["val_num"] = df["Value"].iloc[x_train.shape[1] :]
    # only if it hasnt already been created earlier
    if "manual_anomaly" not in set(am_df.columns):
        am_df["manual_anomaly"] = False
    am_df.set_index("DateTime", drop=True, inplace=True)
    am_df = am_df[["uniqueID", "model_anomaly", "val_num", "manual_anomaly"]]

    influx_conn.write_data(am_df, "TRAINING_ANOMALY")

Training for : Campus Energy Centre Boiler B-1 Exhaust O2
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Training for : Campus Energy Centre Boiler B-1 Gas Pressure
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Training for : Campus Energy Centre Campus HW Main Meter Entering Water Temperature


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return super().rename(


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Training for : Campus Energy Centre Campus HW Main Meter Flow
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Training for : Campus Energy Centre Campus HW Main Meter Power
Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100


### Step 4
`test_env_scheduled_predictor.py`  
Create predictions and write to influx

Set up start and end times for the prediction data set in the test env.  
In the real implementation END_TIME would be `now()` and START_TIME would be `now() - 1d`

In [6]:
# END TIME FOR TRAINING SET BECOMES PREDICTING'S START TIME
START_TIME = 1613109600
END_TIME = 1613196000

Read the data to be predicted on from influx

In [7]:
print("reading data from influx")
influx_read_df = influx_conn.make_query(
    location="Campus Energy Centre",
    measurement="READINGS",
    start=START_TIME,
    end=END_TIME,
)

reading data from influx


Split based on data name

In [8]:
main_bucket = cl.split_sensors(influx_read_df)

### The following cell for predicting:
1) Iterates over each the data sets (line 1)  
2) Standardizes the values for training by loading the standardizer from the training step (lines 2-6)     
3) Sequences the values into windows for the LSTM and other reshaping for the prediction step(lines 8-14)  
4) Creates predictions for the data and returns the prediction object (lines 16-24)   
5) Shapes the prediction object for writing back to influx (a dataframe that contains, value, and the prediction of anomalous) (lines 26-34) 

In [14]:
for key, df in main_bucket.items():
    print(key)
    main_bucket[key]["Stand_Val"] = cl.std_val_predict(
        main_bucket[key][["Value"]],
        main_bucket[key]["ID"].any(),
        scaler_path,
    )
    print(main_bucket[key].shape)

    # creates arrays for sliding windows
    x_train, y_train = mt.create_sequences(
        main_bucket[key]["Stand_Val"], main_bucket[key]["Stand_Val"]
    )
    x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
    timestamps = df["DateTime"].tail(len(df) - x_train.shape[1]).values
    threshold = THRESHOLDS[key]

    # predicting and prediction formatting
    pred = mp.make_prediction(
        key,
        x_train,
        timestamps,
        threshold,
        model_path,
    )
    ar_df = pd.DataFrame.from_dict(pred["data"])

    # prep for writing
    ar_df.rename(columns={"anomaly": "realtime_anomaly"}, inplace=True)
    ar_df.rename(columns={"Timestamp": "DateTime"}, inplace=True)
    ar_df["uniqueID"] = key
    ar_df.set_index("DateTime", drop=True, inplace=True)
    ar_df["val_num"] = df["Value"].tail(len(df) - x_train.shape[1]).values
    ar_df = ar_df[["uniqueID", "val_num", "realtime_anomaly"]]

    influx_conn.write_data(ar_df, "PREDICT_ANOMALY_TWO", tags=["uniqueID", "realtime_anomaly"])

Campus Energy Centre Boiler B-1 Exhaust O2
(96, 5)
Campus Energy Centre Boiler B-1 Gas Pressure
(132, 5)
Campus Energy Centre Campus HW Main Meter Entering Water Temperature
(296, 5)
Campus Energy Centre Campus HW Main Meter Flow
(1386, 5)
Campus Energy Centre Campus HW Main Meter Power
(660, 5)


The test environment will now have three readings:  
READINGS: the raw data  
TRAINING_ANOMALY: data with anomalies flag manually and during the training  
PREDICT_ANOMALY: data with anomalies detected by the prediction step  

As well as a standardizer and a model for each data set.

In [176]:
ar_df

Unnamed: 0_level_0,uniqueID,val_num,realtime_anomaly
DateTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1613112271000000000,Campus Energy Centre Campus HW Main Meter Power,39.900002,True
1613112564000000000,Campus Energy Centre Campus HW Main Meter Power,38.200001,True
1613112600000000000,Campus Energy Centre Campus HW Main Meter Power,38.299999,False
1613112877000000000,Campus Energy Centre Campus HW Main Meter Power,39.799999,True
1613113200000000000,Campus Energy Centre Campus HW Main Meter Power,39.200001,True
...,...,...,...
1613195579000000000,Campus Energy Centre Campus HW Main Meter Power,33.900002,True
1613195761000000000,Campus Energy Centre Campus HW Main Meter Power,32.299999,True
1613195822000000000,Campus Energy Centre Campus HW Main Meter Power,34.000000,True
1613195882000000000,Campus Energy Centre Campus HW Main Meter Power,32.200001,True


In [82]:
ar_sub = ar_df.head(3)
ar_sub

Unnamed: 0_level_0,uniqueID,val_num,realtime_anomaly
DateTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1613112271000000000,Campus Energy Centre Campus HW Main Meter Power,70.0,True
1613112564000000000,Campus Energy Centre Campus HW Main Meter Power,80.0,True
1613112600000000000,Campus Energy Centre Campus HW Main Meter Power,90.0,False


In [83]:
ar_sub.index = [int(time.time_ns()), int(time.time_ns() - 3e11), int(time.time_ns() - 6e11),]
ar_sub.val_num = [-10.0, 30.0, 90.0]
ar_sub.realtime_anomaly = ["False", "False", "True"]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self[name] = value


In [84]:
ar_sub.index.rename("DateTime", inplace=True)

In [85]:
ar_sub

Unnamed: 0_level_0,uniqueID,val_num,realtime_anomaly
DateTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1623688989628763000,Campus Energy Centre Campus HW Main Meter Power,-10.0,False
1623688689628763904,Campus Energy Centre Campus HW Main Meter Power,30.0,False
1623688389628765952,Campus Energy Centre Campus HW Main Meter Power,90.0,True


In [86]:
from datetime import datetime
int(datetime.utcnow().timestamp() * 10e6)

16237141912675590

In [87]:
# # as set up in the docker-compose
# token = "mytoken"
# org = "UBC"
# bucket = "MDS2021"

# # set up influx
# client = InfluxDBClient(url="http://localhost:8086", token=token, timeout=999_000)
# write_api = client.write_api(write_options=SYNCHRONOUS)

In [88]:
influx_conn.write_data(ar_sub, "PREDICT_ANOMALY_TREE", tags=["uniqueID", "realtime_anomaly"])

In [68]:
write_api.write(
            bucket,
            org,
            record=ar_sub,
            data_frame_measurement_name="aa",
            data_frame_tag_columns=["uniqueID", "realtime_anomaly"],
        )

NameError: name 'write_api' is not defined

In [248]:
influx_conn.client.close()
client.close()

In [None]:
# from(bucket: "MDS2021")
#   |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
#   |> filter(fn: (r) => r["_measurement"] == "PREDICT_ANOMALY_TWO")
#   |> filter(fn: (r) => r["_field"] == "val_num")
#   |> filter(fn: (r) => r["realtime_anomaly"] == "True")