## Time Series Comparison

#### Imports

In [1]:
# run this if you made changes to the poleno-ml code 
# NB: Those changes must have been made to the /tf/tmp/poleno-ml repository to have an effect on this notebook's code.
# NB: However, changes made to the tmp repository are temporary and will be rolled back when Docker VM will be shutdown.
#     If you want to make them permanent, dupplicate them to /tf/home/dependencies/poleno-ml.
!pip install /tf/tmp/poleno-ml

Processing /tf/tmp/poleno-ml
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: poleno-ml
  Building wheel for poleno-ml (setup.py) ... [?25ldone
[?25h  Created wheel for poleno-ml: filename=poleno_ml-0.1.0-py3-none-any.whl size=16657 sha256=0e13986734fe18947a4173cdc887ba59e1ab6d2e9b3f5de0bf03e5919029552a
  Stored in directory: /root/.cache/pip/wheels/36/27/94/c36c0ca182dfe6d14b2ad2190409db7ec462f251c1019d9266
Successfully built poleno-ml
Installing collected packages: poleno-ml
  Attempting uninstall: poleno-ml
    Found existing installation: poleno-ml 0.1.0
    Uninstalling poleno-ml-0.1.0:
      Successfully uninstalled poleno-ml-0.1.0
Successfully installed poleno-ml-0.1.0
[0m

In [77]:
%load_ext autoreload
%autoreload 2
import copy
import datetime
from IPython.display import clear_output, display
import ipywidgets as widgets
import json
import matplotlib
import matplotlib.pyplot as plt
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import numpy as np
import operator as op
import os
import pandas as pd
from poleno_db_interface.database.filter import AndClause, OrClause, ConditionClause, DataColumn
from poleno_db_interface.database.query_utils import DataColumn, finalize_query
from poleno_ml.database.query_interface_ml import QueryInterfaceML, DatasetPipeline
import poleno_db_interface.database.model.poleno_data_model as pdm
import tensorflow as tf
import tensorflow.keras as keras
import time
from tqdm.notebook import tqdm
from typing import List
from uuid import UUID
import uuid

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### Parameters

In [229]:
model_name = 'real1_bis' # model you want to evaluate
# pull poleno data between start_date and end_date measured by device_name
start_date  = datetime.date(2020,2,19) # February 19th 2020
end_date    = datetime.date(2020,5,24) # May 5th 2020
device_name = 'poleno-5'
# hirst data to compare against
hirst_file_path = os.path.join('validation_input', 'hirst_pay_19022020-24052020.csv')

In [230]:
db_chunksize = 64
pred_batch_size = 2048 # larger means faster prediction time but more memory consumption
assert pred_batch_size >= db_chunksize, 'Predictions are way slower if pred_batch_size is smaller than db_chunksize.'

#### Load the model

In [231]:
# prepare file paths
model_path = os.path.join('models', model_name, 'model')
model_info_file_path = os.path.join(model_path, 'model_info.json')
eval_cache_path = os.path.join('validation_input', f'cache_{device_name}_{start_date}-{end_date}')

In [232]:
# load trained model's info
with open(model_info_file_path, 'r') as f:
    model_info = json.loads(f.read())
# load trained model
model = keras.models.load_model(model_path, compile=False)

In [233]:
eval_dir = os.path.join('models', model_info['model_name'], 'eval')
os.makedirs(eval_dir, exist_ok=True)
poleno_file_path = os.path.join(eval_dir, f'{device_name}_{start_date.strftime("%d%m%Y")}-{end_date.strftime("%d%m%Y")}.csv')

### Get the new model's predictions

#### Pull raw data

In [210]:
import myloginpath
db_config = myloginpath.parse('client', path='/tf/.mylogin.cnf')

# Conect to the database and create an interface instance
query_interface_ml = QueryInterfaceML(**db_config)

In [211]:
# load poleno's raw data measured during timerange
filter_ = AndClause(
    ConditionClause(pdm.Event.timestamp, op.gt, time.mktime(start_date.timetuple())),
    ConditionClause(pdm.Event.timestamp, op.lt, time.mktime(end_date.timetuple())),
    ConditionClause(pdm.Event.device_id_str, op.eq, device_name),
    ConditionClause(pdm.ImageAnalysis.particleArea, op.ge, 625, "img0"),
    ConditionClause(pdm.ImageAnalysis.particleArea, op.ge, 625, "img1"),
    ConditionClause(pdm.ImageAnalysis.particleSolidity, op.ge, 0.9, "img0"),
    ConditionClause(pdm.ImageAnalysis.particleSolidity, op.ge, 0.9, "img1"),
    ConditionClause(pdm.ImageAnalysis.ImageData_id, op.eq, 0, "img0"),
    ConditionClause(pdm.ImageAnalysis.ImageData_id, op.eq, 1, "img1"),
)

timeseries_dataset = query_interface_ml.prepare_tf_dataset_from_event_filter(
    filter_=filter_,
    batch_size=model_info['batch_size'],
    model_features=copy.deepcopy(model_info['model_features']),
    include_timestamps=True,
    db_chunksize=db_chunksize
)
timeseries_dataset.dataset_length

 received 984000eceived 90000received 776000unique evet list finished. Calling prepare_tf_dataset function 7.678251803302539


984832

In [212]:
# cache the data
timeseries_dataset.enable_cache(eval_cache_path, prepare=True)

ATTENTION: Remember to remove the cache file if you make changes to the dataset! Otherwise, the changes will not be reflected into the dataset and the trainingwill run on the old data.


Preparing cache:   0%|          | 0/984832 [00:00<?, ?it/s]

Caching is done.


In [213]:
# batch and prefetch
timeseries_dataset.tf_dataset = timeseries_dataset.tf_dataset.unbatch().batch(pred_batch_size).prefetch(tf.data.AUTOTUNE)

#### Predict

In [215]:
# get the model's predictions for each event
labels = np.array(model_info['classes'])
list_batch_preds = []

for id_batch, feature_batch in tqdm(timeseries_dataset.get_data_pipeline(with_id=True), 
                                    total=timeseries_dataset.dataset_length//pred_batch_size, leave=False):
    try:
        # compute predictions
        preds = model.predict(feature_batch, verbose=False)
    except ValueError: # this is for backward compatibility with the old model's architecture
        feature_batch['input_1'] = feature_batch.pop('rec0')
        feature_batch['input_2'] = feature_batch.pop('rec1')
        # compute predictions
        preds = model.predict(feature_batch, verbose=False)
    # append predicted labels and certainty
    y_pred = np.argmax(preds, axis=-1)
    certainties = np.max(preds, axis=-1)
    pred_classes = labels[y_pred]
    list_batch_preds.append(pd.DataFrame({
        'event_id': [id_.decode() for id_ in id_batch["id"].numpy()],
        'pred_class': pred_classes,
        'pred_certainty': certainties,
        'event_timestamp': [ts_ for ts_ in id_batch["timestamp"].numpy()]
    }))
df_poleno = pd.concat(list_batch_preds).reset_index(drop=True) # convert list of pd.DataFrame to one pd.DataFrame
del list_batch_preds
# convert timestamp from double to datetime
df_poleno['event_timestamp'] = df_poleno['event_timestamp'].apply(float)
df_poleno['event_timestamp'] = pd.to_datetime(df_poleno['event_timestamp'], unit="s")
# set datetime as index
df_poleno.index = df_poleno.event_timestamp
df_poleno = df_poleno.drop(['event_timestamp'], axis=1)
df_poleno.to_csv(poleno_file_path) # save to csv

  0%|          | 0/480 [00:00<?, ?it/s]

In [216]:
query_interface_ml.session.rollback()

### Load previously created predictions file
If you already ran the code above to generate predictions for your model, run this code to load the DataFrame.

In [234]:
df_poleno = pd.read_csv(poleno_file_path, index_col=0, parse_dates=True)

### Get Hirst data

In [218]:
df_hirst = pd.read_csv(hirst_file_path)
timestamp_cols = ['Year', 'Month', 'Day', 'Hour', 'Minute']
df_hirst['event_timestamp'] = pd.to_datetime(df_hirst[timestamp_cols])
df_hirst = df_hirst.drop(timestamp_cols, axis=1)
df_hirst = df_hirst.replace(32767, np.nan)
df_hirst.index = df_hirst.event_timestamp
df_hirst = df_hirst[[c for c in df_hirst.columns if c in list(model_info['classes'])]]
df_hirst.head(3)

Unnamed: 0_level_0,Betula,Poaceae,Alnus,Carpinus,Corylus,Cupressus,Fagus,Fraxinus,Pinus,Platanus,Populus,Quercus,Taxus,Ulmus
event_timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
2020-02-19 00:00:00,0.0,0.0,195.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,98.0,0.0
2020-02-19 01:00:00,0.0,0.0,260.0,0.0,33.0,33.0,0.0,33.0,0.0,0.0,0.0,0.0,33.0,0.0
2020-02-19 02:00:00,0.0,0.0,163.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


### Plots

#### Utility functions

In [219]:
def df_poleno_to_ts_(df_poleno, taxa, threshold, agg_freq):
    df_poleno_to_plot = df_poleno[(df_poleno.pred_class == taxa) & (df_poleno.pred_certainty > threshold)].rename(columns={'event_id': 'poleno'})
    df_poleno_to_plot = df_poleno_to_plot.resample(agg_freq).count() # frequency conversion and resampling of time series
    if df_poleno_to_plot.shape[0] <= 0: # if no event have been found, create "empty" time series
        ix_ = pd.period_range(df_poleno.index.min(), df_poleno.index.max(), freq='1D')
        df_poleno_to_plot = pd.DataFrame({'poleno': np.zeros(len(ix_))}, index=ix_)
        df_poleno_to_plot.index.rename('event_timestamp', inplace=True)
    return df_poleno_to_plot

In [220]:
def get_ts_and_merge_(df_poleno, df_hirst, taxa, threshold, agg_freq, scaling=False, date_range=None):
    # poleno
    df_poleno_ts = df_poleno_to_ts_(df_poleno, taxa, threshold, agg_freq)
        
    # hirst
    df_hirst_ts = pd.DataFrame({'hirst': df_hirst[taxa].resample(agg_freq).sum()})
    
    # merged
    timeseries = df_poleno_ts.merge(df_hirst_ts, on='event_timestamp', how='outer')[['hirst','poleno']]
    
    if date_range is not None:
        timeseries = timeseries[date_range[0]:date_range[1]]
    if scaling:
        factor = timeseries['hirst'].mean() / timeseries['poleno'].mean()
        timeseries['poleno'] *= factor
    
    return timeseries

In [221]:
def plot_(df_poleno: pd.DataFrame, df_hirst: pd.DataFrame, date_range: List[pd._libs.tslibs.timestamps.Timestamp], taxa: str, agg_freq: str, threshold: float, scaling: bool):
    matplotlib.style.use('ggplot')
    timeseries = get_ts_and_merge_(df_poleno, df_hirst, taxa, threshold, agg_freq, scaling=scaling, date_range=date_range)
    print('Correlation: %.2f' % timeseries.corr().poleno.hirst)
    fig = timeseries.plot(figsize=(12,4), title=taxa, ylabel='concentration')
    fig.plot()
    plt.show()

In [222]:
def create_widget(df_poleno: pd.DataFrame, df_hirst: pd.DataFrame):
    # output widget
    out = widgets.Output(layout = widgets.Layout(height='450px', overflow_y='auto'))
    # ---

    # dates range
    start_date = min(df_poleno.index.min(), df_hirst.index.min())
    end_date = max(df_poleno.index.max(), df_hirst.index.max())
    dates = pd.date_range(start_date, end_date, freq='D')

    options = [(date.strftime('%d.%m.%Y'), date) for date in dates]
    index = (0, len(options)-1)

    date_range_selector = widgets.SelectionRangeSlider(
        options=options,
        index=index,
        description='Dates range',
        orientation='horizontal',
        layout={'width': '700px'},
        readout=False
    )
    date_range_display = widgets.HTML(
        value=(
            f"<b>{dates[index[0]]}" + 
            f" - {dates[index[1]]}</b>"))

    # Define the date range using the widgets.HBox
    date_range = widgets.VBox(
        (date_range_selector, date_range_display))

    # Callback function that updates the display
    def callback_daterange_(dts):
        date_range_display.value = f"<b>{dts[0].strftime('%d-%m-%Y')} - {dts[1].strftime('%d-%m-%Y')}</b>"

    widgets.interactive_output(
        callback_daterange_, 
        {"dts": date_range_selector})
    # ---
    
    # aggregation frequency
    agg_freq_input = widgets.Text(
        value='1D',
        description='Aggregation frequency:',
        disabled=False
    )
    agg_freq_display = widgets.HTML(value=('List of frequency strings <a href="https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#dateoffset-objects">here</a>.'))
    agg_freq = widgets.VBox(
        (agg_freq_input, agg_freq_display))
    # ---
    
    # taxa selection
    taxa_to_plot = sorted([t for t in model_info['classes'] if t in df_hirst.columns])
    taxa_selector = widgets.Dropdown(
        options=taxa_to_plot,
        value=taxa_to_plot[0],
        description='Taxa:',
    )
    # ---
    
    # threshold
    thresh_selector = widgets.BoundedFloatText(
        value=.3,
        min=.0,
        max=1.,
        step=0.05,
        description='Threshold:',
        disabled=False
    )
    # ---
    
    # enable/disable re-scaling
    scaling_selector = widgets.Checkbox(
        value=False,
        description='Scale Poleno on Hirst',
        disabled=False,
        indent=False
    )
    # ---

    # display button
    btn_display = widgets.Button(description='display')
    def on_click(_):
        # "linking function with output"
        with out:
            # what happens when we press the button
            clear_output()
            plot_(df_poleno, df_hirst, date_range_selector.value, taxa_selector.value, agg_freq_input.value, thresh_selector.value, scaling_selector.value)
            plt.show()
    # linking button and function together using a button's method
    btn_display.on_click(on_click)
    # ---
    return date_range, agg_freq, taxa_selector, thresh_selector, scaling_selector, btn_display, out

In [223]:
def rolling_win_corr(timeseries, win_size, punish_fp_threshold=None) -> np.array:
    # if mean concentration of a specific pollen during a time period is < punish_fp_threshold : 
    # then double the correlation weight between hirst and predictions for this time period
    corr, weights = [], []
    gb = timeseries.resample(win_size) # group by week or any other time period
    for x in gb.groups: # for each time period
        x_i = gb.get_group(x)
        corr.append(x_i.corr().poleno.hirst) # compute correlation
        w_i = 1. if punish_fp_threshold is None or x_i['hirst'].mean() > punish_fp_threshold else 2. # compute weight
        weights.append(w_i)
    ma = np.ma.MaskedArray(corr, mask=np.isnan(corr)) # mask to ignore nan
    weighted_avg = np.ma.average(ma, weights=weights)
    return weighted_avg

In [224]:
def compare_ts(df_poleno, df_hirst, thresholds=None, agg_freq='1D', trunccorr_win_size='2W', punish_fp_threshold=None):
    joint_taxa = sorted([t for t in model_info['classes'] if t in df_hirst.columns])
    if thresholds is None:
        thresholds = {t: .0 for t in joint_taxa}
    metrics = pd.DataFrame(index=joint_taxa, columns=['Pairwise Correlation', 
                                                      'Truncated Pairwise Correlation'], dtype=float)
    for taxa in joint_taxa:
        timeseries = get_ts_and_merge_(df_poleno, df_hirst, taxa, threshold, agg_freq, scaling=False)
        metrics.at[taxa, 'Pairwise Correlation'] = timeseries.corr().poleno.hirst
        metrics.at[taxa, 'Truncated Pairwise Correlation'] = rolling_win_corr(timeseries, trunccorr_win_size, punish_fp_threshold)
    return metrics

#### Actual plot widget and metrics

In [235]:
# compute and print metrics values
metrics = compare_ts(df_poleno, df_hirst, trunccorr_win_size='1W', punish_fp_threshold=None)
for m in metrics:
    print(m)
    print('Mean: %.2f, Median: %.2f, Min/Max: %.2f (%s)/%.2f (s)\n\n' % (metrics[m].mean(), metrics[m].median(), metrics[m].min(), metrics.index[metrics[m].argmin()], metrics[m].max()), metrics.index[metrics[m].argmax()])
metrics

Pairwise Correlation
Mean: 0.48, Median: 0.33, Min/Max: -0.09 (Taxus)/0.97 (s)

 Betula
Truncated Pairwise Correlation
Mean: 0.44, Median: 0.49, Min/Max: 0.18 (Carpinus)/0.71 (s)

 Fagus


Unnamed: 0,Pairwise Correlation,Truncated Pairwise Correlation
Alnus,0.299417,0.258027
Betula,0.965659,0.623581
Carpinus,0.288654,0.176901
Corylus,0.038568,0.177671
Cupressus,0.71506,0.554289
Fagus,0.839733,0.714859
Fraxinus,0.844194,0.529208
Poaceae,0.342591,0.456013
Populus,0.319133,0.548219
Quercus,0.939558,0.537213


In [236]:
# display widget
widgets.VBox(create_widget(df_poleno, df_hirst))

VBox(children=(VBox(children=(SelectionRangeSlider(description='Dates range', index=(0, 95), layout=Layout(wid…