# ResNet plus developments

This software uses cauchyturing/UCR_Time_Series_Classification_Deep_Learning_Baseline

See MIT License in https://github.com/cauchyturing/UCR_Time_Series_Classification_Deep_Learning_Baseline README.md

Wang, Z., Yan, W. and Oates, T. (2017) ‘Time series classification from scratch with deep neural networks: A strong baseline’, 2017 International Joint Conference on Neural Networks (IJCNN), pp. 1578–1585 Online.


In [None]:
#!/usr/bin/env python3

import os
from pathlib import Path
import time
from datetime import datetime
from dateutil.tz import gettz
import itertools

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras

from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, Dense, Activation, Dropout
from tensorflow.keras import regularizers
from tensorflow.keras.initializers import RandomUniform
from tensorflow.keras import utils
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import seaborn as sns
from sklearn.model_selection import KFold, RepeatedStratifiedKFold
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix, roc_curve, roc_auc_score, classification_report

# Seed NumPy's global random number generator so that NumPy-based randomness
# is repeatable between runs.
# NOTE(review): no TensorFlow seed is set in the visible code, so Keras weight
# initialisation and dropout may still differ from run to run - confirm.
np.random.seed(999123)

# User inputs

In [None]:
# Dataset directory names to process. Other names that have been used here:
# private_dog2_correct, WormsTwoClass, Lightning2, Earthquakes, GunPoint.
flist = ['private_dog0_correct_plus']
batch_size = 32
nb_epochs = 500
truncate_data = False  # Truncate pressure samples to the first 300 data points (see readucr)
model_type = 'MLP'  # One of 'MLP', 'ResNet', 'FCN', 'FCN_HARUS' (see reshape)
filter_data = False  # Zero out values below a threshold (see preprocess)

# Cross validation
k = 10  # Number of folds. If k=1, the original test-train split is used.
m = 1  # Number of repetitions of k-fold cross validation (if k>1).
tensorboard = True  # Set to True to write logs for use by TensorBoard
k_fold_seed = 765432

# ResNet / FCN parameters
feature_maps = 128

# Output directories: every run writes into its own timestamped sub-directory
timestamp = datetime.now(gettz("Europe/London")).strftime('%Y-%m-%dT%H:%M')
logs_dir = '../logs' + '/' + timestamp
tensorboard_dir = '../logs/tensorboard' + '/' + timestamp

# Input directory: private datasets live in their own sub-tree
fdir = '../data/private_data/private_events_dev2' if 'private' in flist[0] else '../data'

# Tools

In [None]:
def plot_confusion_matrix(cm, title='Normalised confusion matrix', name=''):
    ''' Draw a normalised confusion matrix as a heat map and save it as a PNG.
    Parameters
    cm : array - normalised confusion matrix, rows = true class, columns = predicted class
    title : str - figure title
    name : str - suffix used in the output file name 'cm_devnet_<name>.png'
    Based on
    Scikit-learn: Machine Learning in Python, Pedregosa et al., JMLR 12, pp. 2825-2830, 2011.
    'Confusion Matrix' https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html#sphx-glr-auto-examples-model-selection-plot-confusion-matrix-py
    '''
    class_names = ['Positive', 'Negative']
    sns.set_style('dark')
    plt.figure()
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title(title)
    plt.colorbar(format=FuncFormatter('{0:.0%}'.format))
    positions = np.arange(len(class_names))
    plt.xticks(positions, class_names, rotation=45)
    plt.yticks(positions, class_names)
    plt.clim(0, 1)

    # Annotate every cell; use white text on dark cells so it stays readable.
    midpoint = cm.max() / 2.
    n_rows, n_cols = cm.shape[0], cm.shape[1]
    for r in range(n_rows):
        for c in range(n_cols):
            text_color = "white" if cm[r, c] > midpoint else "black"
            plt.text(c, r, format(cm[r, c], '.0%'),
                     horizontalalignment="center",
                     color=text_color)

    plt.ylabel('True class')
    plt.xlabel('Predicted class')
    plt.tight_layout()
    plt.savefig('cm_devnet_'+name+'.png', bbox_inches='tight')
        
        
def plot_roc(y_true, y_probs, name):
    ''' Plot the ROC curve, save it as 'roc_devnet_<name>.png' and return the AUC.
    Parameters
    y_true : vector of true class labels.
    y_probs : array of predicted probabilities, one column for each class;
              column 1 is used as the score.
    name : str - suffix used in the output file name.
    Returns
    auc : float
    '''
    scores = y_probs[:,1]
    fpr, tpr, _ = roc_curve(y_true, scores)
    auc = roc_auc_score(y_true, scores)

    sns.set_style('whitegrid')
    plt.figure()
    curve_label = 'ROC curve (area = %0.2f)' % auc
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=curve_label)
    # Diagonal reference line
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic curve')
    plt.legend(loc="lower right")
    plt.savefig('roc_devnet_'+name+'.png', bbox_inches='tight')
    return auc

def filter_out(x, threshold):
    ''' Return 0 when x is below threshold, otherwise return x unchanged.'''
    return 0 if x < threshold else x
    
    
def preprocess(X):
    ''' Apply preprocessing to the input data X.
    When the module-level flag filter_data is set, values below 0.1 are
    replaced by 0; otherwise X is returned untouched.
    '''
    if not filter_data:
        return X
    threshold = 0.1
    conditions = [X < threshold, X >= threshold]
    outputs = [lambda values: 0, lambda values: values]
    return np.piecewise(X, conditions, outputs)
    
    
def readucr(filename):
    ''' Load a dataset from a whitespace-delimited text file in UCR format,
    where the first column holds the class label of each row.
    If the module-level flag truncate_data is set, only the first 300 data
    points of every sample are kept. The samples are then passed through
    preprocess().
    Returns
    X : DNN input data
    Y : class labels
    '''
    table = np.loadtxt(Path(filename))
    labels, samples = table[:,0], table[:,1:]
    if truncate_data:
        samples = samples[:,:300]
    return preprocess(samples), labels
   
def reshape(x, model_type):
    ''' Reshape the input data x into the layout expected by the given model.
    Parameters
    x : array of samples
    model_type : str - 'ResNet', 'FCN', 'FCN_HARUS' or 'MLP'
    Returns
    x reshaped by reshape_resnet (ResNet) or reshape_fcn (FCN, FCN_HARUS),
    or x itself for the MLP.
    Raises
    ValueError if model_type is not one of the recognised names.
    '''
    if model_type == 'ResNet':
        return reshape_resnet(x)
    if model_type in ('FCN', 'FCN_HARUS'):
        return reshape_fcn(x)
    if model_type == 'MLP':
        return x
    raise ValueError('Unrecognised model type')

# Build DNN
## ResNet

In [None]:
def reshape_resnet(x):
    ''' Append two trailing axes of length 1 to x, as used by the Conv2D layers of build_resnet.'''
    return x.reshape(*x.shape, 1, 1)

def build_resnet(input_shape, n_feature_maps, nb_classes):
    ''' Build the ResNet graph: three residual blocks followed by global
    average pooling and a sigmoid Dense output layer.
    Each residual block stacks three Conv2D layers whose second positional
    argument is 8*k, 5*k and 3*k, each followed by batch normalisation, and
    adds a shortcut branch before the final ReLU. The first block uses
    n_feature_maps filters, the second and third use n_feature_maps*2.
    Parameters
    input_shape : shape of one sample, without the batch axis
    n_feature_maps : number of filters in the first residual block
    nb_classes : number of units in the output layer
    Returns
    x : the Keras Input tensor
    out : the output tensor, to be passed with x to Model(x, out)
    '''
    # Parameters
    # kernels = [8, 5, 3, 1]
    k = 1 # kernel multiplier
    
    # ---- Residual block 1 (n_feature_maps filters) ----
    print ('build conv_x')
    x = Input(shape=(input_shape))
    # The raw input is batch-normalised before the first convolution only
    conv_x = keras.layers.BatchNormalization()(x)
    conv_x = keras.layers.Conv2D(n_feature_maps, 8*k, 1, padding='same')(conv_x)
    conv_x = keras.layers.BatchNormalization()(conv_x)
    conv_x = Activation('relu')(conv_x)
     
    print ('build conv_y')
    conv_y = keras.layers.Conv2D(n_feature_maps, 5*k, 1, padding='same')(conv_x)
    conv_y = keras.layers.BatchNormalization()(conv_y)
    conv_y = Activation('relu')(conv_y)
     
    print ('build conv_z')
    conv_z = keras.layers.Conv2D(n_feature_maps, 3*k, 1, padding='same')(conv_y)
    conv_z = keras.layers.BatchNormalization()(conv_z)
     
    # Shortcut branch: a 1*k convolution is inserted when the channel counts
    # differ so that the two branches can be added.
    is_expand_channels = not (input_shape[-1] == n_feature_maps)
    if is_expand_channels:
        shortcut_y = keras.layers.Conv2D(n_feature_maps, 1*k, 1,padding='same')(x)
        shortcut_y = keras.layers.BatchNormalization()(shortcut_y)
    else:
        shortcut_y = keras.layers.BatchNormalization()(x)
    print ('Merging skip connection')
    y = keras.layers.add([shortcut_y, conv_z])
    y = Activation('relu')(y)
     
    # ---- Residual block 2 (n_feature_maps*2 filters) ----
    print ('build conv_x')
    x1 = y
    conv_x = keras.layers.Conv2D(n_feature_maps*2, 8*k, 1, padding='same')(x1)
    conv_x = keras.layers.BatchNormalization()(conv_x)
    conv_x = Activation('relu')(conv_x)
         
    print ('build conv_y')
    conv_y = keras.layers.Conv2D(n_feature_maps*2, 5*k, 1, padding='same')(conv_x)
    conv_y = keras.layers.BatchNormalization()(conv_y)
    conv_y = Activation('relu')(conv_y)
     
    print ('build conv_z')
    conv_z = keras.layers.Conv2D(n_feature_maps*2, 3*k, 1, padding='same')(conv_y)
    conv_z = keras.layers.BatchNormalization()(conv_z)
     
    # NOTE(review): this test compares the channel count of the network input
    # (input_shape[-1]), not that of x1, the tensor actually being shortcut -
    # confirm whether that is intended.
    is_expand_channels = not (input_shape[-1] == n_feature_maps*2)
    if is_expand_channels:
        shortcut_y = keras.layers.Conv2D(n_feature_maps*2, 1*k, 1,padding='same')(x1)
        shortcut_y = keras.layers.BatchNormalization()(shortcut_y)
    else:
        shortcut_y = keras.layers.BatchNormalization()(x1)
    print ('Merging skip connection')
    y = keras.layers.add([shortcut_y, conv_z])
    y = Activation('relu')(y)
     
    # ---- Residual block 3 (n_feature_maps*2 filters) ----
    print ('build conv_x')
    x1 = y
    conv_x = keras.layers.Conv2D(n_feature_maps*2, 8*k, 1, padding='same')(x1)
    conv_x = keras.layers.BatchNormalization()(conv_x)
    conv_x = Activation('relu')(conv_x)
     
    print ('build conv_y')
    conv_y = keras.layers.Conv2D(n_feature_maps*2, 5*k, 1, padding='same')(conv_x)
    conv_y = keras.layers.BatchNormalization()(conv_y)
    conv_y = Activation('relu')(conv_y)
     
    print ('build conv_z')
    conv_z = keras.layers.Conv2D(n_feature_maps*2, 3*k, 1, padding='same')(conv_y)
    conv_z = keras.layers.BatchNormalization()(conv_z)

    # NOTE(review): as in block 2, the test looks at input_shape[-1] rather
    # than at the channel count of x1 - confirm.
    is_expand_channels = not (input_shape[-1] == n_feature_maps*2)
    if is_expand_channels:
        shortcut_y = keras.layers.Conv2D(n_feature_maps*2, 1*k, 1,padding='same')(x1)
        shortcut_y = keras.layers.BatchNormalization()(shortcut_y)
    else:
        shortcut_y = keras.layers.BatchNormalization()(x1)
    print ('Merging skip connection')
    y = keras.layers.add([shortcut_y, conv_z])
    y = Activation('relu')(y)
     
    # ---- Classifier head ----
    full = keras.layers.GlobalAveragePooling2D()(y)   
    out = Dense(nb_classes, activation='sigmoid')(full)
    print ('        -- model was built.')
    return x, out

## FCN

In [None]:
def reshape_fcn(x):
    ''' Append one trailing axis of length 1 to x, as used by the Conv1D layers of the FCN models.'''
    return x.reshape(*x.shape, 1)
    
def build_fcn(input_shape, n_feature_maps, nb_classes):
    ''' Build the fully convolutional network graph: three
    Conv1D -> BatchNormalization -> ReLU stages, global average pooling and
    a sigmoid Dense output layer.
    Parameters
    input_shape : shape of one sample, without the batch axis
    n_feature_maps : filters in the first and third stage (the second uses twice as many)
    nb_classes : number of units in the output layer
    Returns
    The Keras Input tensor and the output tensor, for use with Model(x, out).
    '''
    k = 1 # kernel multiplier
    # (progress message, number of filters, second Conv1D argument) per stage
    stages = (
        ('build conv_x', n_feature_maps, 8*k),
        ('build conv_y', n_feature_maps*2, 5*k),
        ('build conv_z', n_feature_maps, 3*k),
    )

    inputs = Input(shape=(input_shape))
    net = inputs
    for message, n_filters, kernel in stages:
        print(message)
        net = keras.layers.Conv1D(n_filters, kernel, 1, padding='same')(net)
        net = keras.layers.BatchNormalization()(net)
        net = Activation('relu')(net)

    pooled = keras.layers.GlobalAveragePooling1D()(net)
    outputs = Dense(nb_classes, activation='sigmoid')(pooled)
    return inputs, outputs

## FCN HARUS style

Using the DNN architecture of
Ackermann, Nils, 2018, Introduction to 1D Convolutional Neural Networks in Keras for Time Sequences
https://blog.goodaudience.com/introduction-to-1d-convolutional-neural-networks-in-keras-for-time-sequences-3a7ff801a2cf 

In [None]:
def build_fcn_harus(input_shape, n_feature_maps, nb_classes):
    ''' Build the 1D convolutional network in the style of Ackermann (2018):
    two Conv1D layers, max pooling, two more Conv1D layers, global average
    pooling, dropout and a sigmoid Dense output layer.
    Parameters
    input_shape : shape of one sample, without the batch axis
    n_feature_maps : unused; kept so that the signature matches build_fcn and build_resnet
    nb_classes : number of units in the output layer
    Returns
    x : the Keras Input tensor
    out : the output tensor, to be passed with x to Model(x, out)
    '''
    # Parameters
    n_features_a = 100 # Ackermann 100
    n_features_b = 160 # Ackermann 160
    filter_size = 10   # Ackermann 10
    pooling_size = 3   # Ackermann 3
    dropout = 0.5      # Ackermann 0.5
    
    print ('build FCN HARUS')
    x = Input(shape=(input_shape))
    conv_x = x
    conv_x = keras.layers.Conv1D(n_features_a, filter_size, activation='relu')(conv_x)
    conv_x = keras.layers.Conv1D(n_features_a, filter_size, activation='relu')(conv_x)
    conv_x = keras.layers.MaxPooling1D(pooling_size)(conv_x)
    conv_x = keras.layers.Conv1D(n_features_b, filter_size, activation='relu')(conv_x)
    conv_x = keras.layers.Conv1D(n_features_b, filter_size, activation='relu')(conv_x)
    full = keras.layers.GlobalAveragePooling1D()(conv_x)
    y = Dropout(dropout,name='Dropout')(full)
    # Feed the dropout output into the classifier. Connecting Dense to `full`
    # would leave the Dropout layer outside the graph, so it would never be applied.
    out = Dense(nb_classes, activation='sigmoid')(y)
    return x, out

## MLP

In [None]:
def build_mlp(input_shape, nb_classes):
    ''' Build the multilayer perceptron graph: four Dropout + Dense(relu)
    pairs with L2 kernel regularisation, then a final Dropout and a sigmoid
    Dense output layer.
    Parameters
    input_shape : shape of one sample, without the batch axis
    nb_classes : number of units in the output layer
    Returns
    The Keras Input tensor and the output tensor, for use with Model(x, out).
    '''
    drop = 0.2
    num = 64
    l2 = 0.1
    # (dropout layer name, dense layer name) of each hidden stage, in order
    hidden_layer_names = (
        ('Drop010', 'Dense010'),
        ('Drop020', 'Dense020'),
        ('Drop021', 'Dense021'),
        ('Drop031', 'Dense030'),
    )

    inputs = Input(shape=(input_shape))
    net = inputs
    for drop_name, dense_name in hidden_layer_names:
        net = Dropout(drop, name=drop_name)(net)
        net = Dense(num, kernel_regularizer=regularizers.l2(l2),
                    activation='relu', name=dense_name)(net)
    net = Dropout(drop, name='Drop081')(net)
    outputs = Dense(nb_classes, activation='sigmoid', name='Dense080')(net)
    return inputs, outputs

# Function - train model

In [None]:
def train_model(fname, x_train, y_train, x_test, y_test, label="0"):
    ''' Build, train and log one model on a single train/test split.
    The model architecture is chosen by the module-level model_type; batch
    size, epoch count and output directories also come from module-level
    settings.
    Parameters
    fname : str - dataset name, used for log directories and messages
    x_train, x_test : arrays of samples, one row per sample
    y_train, y_test : vectors of class labels
    label : str - identifier of this run (e.g. the fold number), used in file names
    Returns
    summary : str - one '|'-separated line with label, loss, validation accuracy, epoch and duration
    model : the trained Keras model
    '''
    print('Running dataset', fname)
    nb_classes = len(np.unique(y_test))

    # Rescale the labels linearly onto 0..nb_classes-1, then one-hot encode.
    y_train = (y_train - y_train.min())/(y_train.max()-y_train.min())*(nb_classes-1)
    y_test = (y_test - y_test.min())/(y_test.max()-y_test.min())*(nb_classes-1)

    Y_train = utils.to_categorical(y_train, nb_classes)
    Y_test = utils.to_categorical(y_test, nb_classes)

    # Standardise both sets with the mean and std of the training set only.
    x_train_mean = x_train.mean()
    x_train_std = x_train.std()
    x_train = (x_train - x_train_mean)/(x_train_std)
    x_test = (x_test - x_train_mean)/(x_train_std)

    # reshape() raises ValueError for an unrecognised model_type, so one of
    # the branches below is always taken.
    x_train = reshape(x_train, model_type)
    x_test = reshape(x_test, model_type)
    if model_type == 'MLP':
        x, y = build_mlp(x_train.shape[1:], nb_classes)
    elif model_type == 'ResNet':
        x, y = build_resnet(x_train.shape[1:], feature_maps, nb_classes)
    elif model_type == 'FCN':
        x, y = build_fcn(x_train.shape[1:], feature_maps, nb_classes)
    elif model_type == 'FCN_HARUS':
        x, y = build_fcn_harus(x_train.shape[1:], feature_maps, nb_classes)
    model = Model(x, y)
    #print(model.summary())

    optimizer = keras.optimizers.Adam()
    model.compile(loss='binary_crossentropy',
                  optimizer=optimizer,
                  metrics=['acc'])

    out_dir = logs_dir+'/'+fname
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    reduce_lr = ReduceLROnPlateau(monitor='loss', factor=0.5,
                      patience=50, min_lr=0.0001)
    callbacks = [reduce_lr]
    # tb_dir stays None when TensorBoard logging is switched off, so that the
    # reporting code below can tell whether there is a directory to mention.
    tb_dir = None
    if tensorboard:
        tb_dir = tensorboard_dir+'/'+fname+'_'+label
        Path(tb_dir).mkdir(parents=True, exist_ok=True)
        print('Tensorboard logs in', tb_dir)
        callbacks.append(keras.callbacks.TensorBoard(log_dir=tb_dir, histogram_freq=0))

    start = time.time()
    hist = model.fit(x_train, Y_train, batch_size=batch_size, epochs=nb_epochs,
              verbose=1, validation_data=(x_test, Y_test), callbacks=callbacks)
    end = time.time()
    log = pd.DataFrame(hist.history)

    # Print results
    duration_seconds = round(end-start)
    duration_minutes = str(round((end-start)/60))
    print('Training complete on', fname, 'Duration:', duration_seconds, 'secs; about', duration_minutes, 'minutes.')

    # Print and save results. Print the testing results which has the lowest training loss.
    print('Selected the test result with the lowest training loss. Loss and validation accuracy are -')
    idx = log['loss'].idxmin()
    loss = log.loc[idx]['loss']
    val_acc = log.loc[idx]['val_acc']
    epoch = idx + 1
    print(loss, val_acc, 'at index', str(idx), ' (epoch ', str(epoch), ')')
    summary = '|' + label + '  |'+str(loss)+'  |'+str(val_acc)+' |'+str(epoch)+' |'+ duration_minutes + 'mins  |'
    summary_csv = label+','+str(loss)+','+str(val_acc)+','+str(epoch)+','+ duration_minutes

    # Save summary file and log file.
    if tb_dir is not None:
        print('Tensorboard logs in', tb_dir)
    summary_path = out_dir+'/devnet_summary.csv'
    with open(summary_path, 'a+') as f:
        f.write(summary_csv)
        f.write('\n')
        print('Added summary row to ', summary_path)
    history_path = out_dir+'/history_'+label+'.csv'
    print('Saving logs to',history_path)
    log.to_csv(history_path)

    return summary, model

# Train DNN

In [None]:
# Train one model per dataset (or one per cross-validation fold) and collect
# the summary line of every run.
results = []
for fname in flist:
    dataset_stem = fdir+'/'+fname+'/'+fname
    x_train, y_train = readucr(dataset_stem+'_TRAIN.txt')
    x_test, y_test = readucr(dataset_stem+'_TEST.txt')

    if k <= 1:
        # No cross validation: use the dataset's own train/test split
        summary, model = train_model(fname, x_train, y_train, x_test, y_test)
        results.append(summary)
        continue

    # k-fold cross validation: pool both files and re-split them
    x_all = np.concatenate((x_train, x_test), axis=0)
    y_all = np.concatenate((y_train, y_test), axis=0)
    kfold = RepeatedStratifiedKFold(n_splits=k, n_repeats=m, random_state=k_fold_seed)
    count = 0
    for train, test in kfold.split(x_all, y_all):
        x_train, x_test = x_all[train], x_all[test]
        y_train, y_test = y_all[train], y_all[test]
        summary, model = train_model(fname, x_train, y_train, x_test, y_test, str(count))
        results.append(summary)
        count += 1

print('DONE')
print(fname, timestamp)
print('train:test', y_train.shape[0], y_test.shape[0])
for summary_line in results:
    print(summary_line)

In [None]:
# Report the local wall-clock time at which this cell was reached
print('Done at:' , datetime.now().strftime('%Y-%m-%dT%H:%M'))

# Confidence interval

In [None]:
# Summarise the validation accuracy over all runs recorded for this dataset.
file =  logs_dir+'/'+fname+'/devnet_summary.csv'
data = pd.read_csv(file, header=None, names=['run','loss','val_acc','epoch','time'])
accuracy = data['val_acc']
print(file)
print('Accuracy mean and 95% confidence level is', accuracy.mean(), accuracy.std()*1.96)
# The central 95% of the runs lies between the 2.5th and the 97.5th
# percentile, matching the whiskers of the box plot below.
print('95% confidence interval is', accuracy.quantile(0.025), 'to', accuracy.quantile(0.975))
data.boxplot(column=['val_acc'], whis=[2.5,97.5])

# Metrics

In [None]:
# Use trained model (after all epochs) to make predictions

def predictions(model, model_type, x_input, y_input, name):
    ''' Predict classes for x_input, print metrics and save the confusion
    matrix and ROC plots.
    The inputs are standardised with the mean and std of the module-level
    x_train, i.e. whatever training set was assigned last.
    Parameters
    model : trained Keras model
    model_type : str - passed to reshape()
    x_input : array of samples, one row per sample
    y_input : vector of true class labels
    name : str - suffix for the plot file names
    Returns
    y_class : vector of predicted class indices
    '''
    do_print = True
    y_input = y_input - y_input.min()

    # Standardise with the training statistics and give the data the layout the model expects
    train_mean, train_std = x_train.mean(), x_train.std()
    x_input = reshape((x_input - train_mean)/(train_std), model_type)

    # Rescale the labels linearly onto 0..nb_classes-1
    nb_classes = len(np.unique(y_input))
    label_min, label_max = y_input.min(), y_input.max()
    y_input = (y_input - label_min)/(label_max-label_min)*(nb_classes-1)

    # Class balance
    n0 = (y_input == 0).sum()
    n1 = (y_input == 1).sum()

    # Calculate model prediction
    y_probs = model.predict_on_batch(x_input)
    y_class = y_probs.argmax(axis=1)
    cm = confusion_matrix(y_input, y_class, labels=[1,0])
    acc_calc = (cm[0][0]+cm[1][1])/(cm.sum())
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm_norm = cm.astype('float') / row_totals
    if do_print:
        print('Predicted class probabilities:\n', y_probs[:5,:])
        print('Pred', y_class[:20])
        print('True', y_input[:20].astype(int))
        print(cm)
        print('Calculated accuracy:',acc_calc)
        print('Class balance in test set:', n0, 'to', n1, 'i.e.', n0/(n0+n1))
        print('Normalised confusion matrix:\n', cm_norm)
    plot_confusion_matrix(cm_norm, title='Normalised confusion matrix', name=name)

    # ROC and AUC
    auc = plot_roc(y_input, y_probs, name=name)
    print('AUC:', auc)

    print('\n', classification_report(y_input, y_class))
    print('\nmicro av - averaging the total true positives, false negatives and false positives')
    print('macro av - averaging the unweighted mean per label')
    print('weighted av - averaging the support-weighted mean per label')
    return y_class

y_pred = predictions(model, model_type, x_test, y_test, fname)

# Plot data samples

In [None]:
def plot_samples(x, y_true, y_pred, title, meta=None):
    ''' Plot up to ten samples for each outcome in a four-column grid
    (true positive, false negative, true negative, false positive) and save
    the figure as 'data_samples_<title>.png'.
    Parameters
    x : array of samples, one row per sample
    y_true : vector of true class labels (0 or 1)
    y_pred : vector of predicted class labels
    title : str - figure title, also used in the file name
    meta : optional DataFrame with 'filename' and 'sensor_number' columns;
           when given, the title and every misclassified sample are printed
    '''
    report = meta is not None
    if report:
        print(title)
    n_plots = 10
    fig, ax = plt.subplots(n_plots, 4, sharex='col', sharey='row', figsize=(10, 10))
    rows = [0, 0, 0, 0]  # next free row in each column
    green_red = sns.color_palette("Paired")
    colors = [green_red[idx] for idx in (3, 5, 2, 4)]
    for i in range(len(y_pred)):
        if y_true[i]==1:
            col = 0 if y_pred[i]==1 else 1
            if col == 1 and report:
                print('FN at ', meta.iloc[i]['filename'], 'sensor', meta.iloc[i]['sensor_number'])
        if y_true[i]==0:
            col = 2 if y_pred[i]==0 else 3
            if col == 3 and report:
                print('FP at ', meta.iloc[i]['filename'], 'sensor', meta.iloc[i]['sensor_number'])
        row = rows[col]
        rows[col] += 1
        if row >= n_plots:
            # This column is already full
            continue
        ax[row, col].plot(x[i], color=colors[col])
        ax[0, col].set_title('True '+str(int(y_true[i]))+': Pred '+str(y_pred[i]))
        ax[row, col].set_ylim(bottom=0, top=2.2)
    bottom = n_plots-1
    ax[bottom, 0].set_ylabel('x(t)')
    ax[bottom, 0].set_xlabel('time, t')
    ax[bottom, 1].set_xlabel('time, t')
    fig.suptitle(title)
    plt.savefig('data_samples_'+title+'.png', bbox_inches='tight')
    
plot_samples(x_test, y_test, y_pred, fname)

# Compare runs

In [None]:
# Column names of the header-less devnet_summary.csv files
summary_columns = ['run','loss','val_acc','epoch','time']

# Reference run to compare against
file1 = '../logs/2019-03-17T12:59/private_dog0_correct/devnet_summary.csv'
data1 = pd.read_csv(file1, header=None, names=summary_columns)
name1 = 'dog0_correct'

# The run that has just completed
file = '/'.join([logs_dir, fname, 'devnet_summary.csv'])
print('Showing results from:\n', file1, 'and\n', file)
data2 = pd.read_csv(file, header=None, names=summary_columns)
name2 = 'this_run'

# Box plot of the validation accuracies with the individual runs overlaid
all_data = [frame['val_acc'] for frame in (data1, data2)]
sns.set(style="whitegrid")
ax = sns.boxplot(data=all_data)
ax = sns.swarmplot(data=all_data, color='black')
ax.set_xlabel('DevNet')
ax.set_ylabel('validation accuracy')
ax.yaxis.set_major_formatter(FuncFormatter('{0:.0%}'.format))
plt.xticks([0, 1], [name1, name2])
plt.tight_layout()
plt.savefig('boxplot_devnet.png', bbox_inches='tight')

## Make predictions on other datasets

In [None]:
# NB consider if any of this dataset was present in the model's training set
other = fname+'_END_TEST' #_dog_incorrect' # 'private_dog0_correct_plus_END_TEST'
datadir = fdir+'/'+fname
other_path = datadir+'/'+other+'.txt'
print('Testing on:', other_path)
x_other, y_other = readucr(other_path)
y_other_pred = predictions(model, model_type, x_other, y_other, other)

# Get dog result
meta = pd.read_csv(datadir+'/'+other+'_meta.txt', sep=',', parse_dates=['date'])
dog_pred = meta['dog_pred']
cm = confusion_matrix(y_other, dog_pred, labels=[1,0])
print('Dog cm \n', cm)
dog_acc = (cm[0][0]+cm[1][1])/(cm.sum())
row_totals = cm.sum(axis=1)[:, np.newaxis]
cm_norm = cm.astype('float') / row_totals
plot_confusion_matrix(cm_norm, title='Dog indications', name='dog_pred')
print('True', y_other[:20])
print('Dog ', dog_pred.values[:20])
print('Dog accuracy', dog_acc)

# Plot data
plot_samples(x_other, y_other, y_other_pred, 'DNN predictions', meta)
plot_samples(x_other, y_other, dog_pred, 'Dog indications', meta)

    


# Plot the difference in dog and DNN predictions


In [None]:
def plot_differences(x, y_pred, meta):
    ''' Plot, for each true class, the samples on which the dog's indication
    and the DNN's prediction differ. One figure is saved per class as
    'DogDNN_diffs_class<class>_<fname>.png' (fname is the module-level dataset name).
    A class with no samples, or with no disagreements, is skipped.
    Parameters
    x : array of samples, one row per sample
    y_pred : vector of classes predicted by the DNN
    meta : DataFrame with the columns 'class', 'dog_pred', 'dog_result',
           'filename' and 'sensor_number', one row per sample
    '''
    # Concatenate all data
    y_diff = abs(meta['dog_pred'].values-y_pred.T)
    y_diff_df = pd.DataFrame(y_diff, columns=['y_diff'])
    y_pred_df = pd.DataFrame(y_pred, columns=['y_pred'])
    x_df = pd.DataFrame(x)
    data_meta = pd.concat([y_pred_df, y_diff_df, meta], axis=1)
    meta_header = list(data_meta)
    data_meta = pd.concat([data_meta, x_df], axis=1)

    # Sort the data
    data_meta = data_meta.sort_values(['class', 'y_diff', 'dog_result'])
    class0 = data_meta[data_meta['class']==0]
    class0 = class0.sort_values(['y_diff', 'dog_result'], ascending=[False, False])
    class1 = data_meta[data_meta['class']==1]
    class1 = class1.sort_values(['y_diff', 'dog_result'], ascending=[False, True])

    # Plot the data where dog and DNN did not agree
    for this_class in [class1, class0]:
        if this_class.empty:
            # No sample of this class: nothing to plot
            continue
        # Get x data
        this_x = this_class[this_class.columns.difference(meta_header)]
        # Sanity check that exactly the sample columns were recovered. The
        # expected width is taken from x, so truncated samples also pass.
        assert this_x.shape[1] == x_df.shape[1], 'sample columns were not recovered intact'
        # Find the rows on which dog and DNN disagree
        disagree = (this_class['y_pred'] != this_class['dog_pred']).values
        n = int(disagree.sum())
        if n == 0:
            # plt.subplots cannot create a grid with zero rows
            continue
        # Create the plots. squeeze=False keeps ax two-dimensional when n == 1.
        fig, ax = plt.subplots(n, 3, sharex='col', sharey='row', figsize=(10, 4.8),
                               squeeze=False)
        class_label = this_class.iloc[0]['class']
        for row, i in enumerate(np.flatnonzero(disagree)):
            record = this_class.iloc[i]
            dog_color = 'green' if record['dog_pred'] == record['class'] else 'red'
            dnn_color = 'green' if record['y_pred'] == record['class'] else 'red'
            ax[row, 0].plot(this_x.iloc[i], color=dog_color)
            ax[row, 1].plot(this_x.iloc[i], color=dnn_color)
            ax[row, 0].set_ylim(bottom=0, top=2.2)
            ax[row, 1].set_ylim(bottom=0, top=2.2)
            file = record['filename']
            sensor = str(record['sensor_number'])
            ax[row, 2].text(0, 0.65, str(row+1)+') '+file+' sensor '+sensor)
        ax[0, 0].set_title('Dog')
        ax[0, 1].set_title('DNN')
        ax[n-1, 0].set_ylabel('x(t)')
        ax[n-1, 0].set_xlabel('time, t')
        ax[n-1, 1].set_xlabel('time, t')
        ax[n-1, 2].set_xticklabels([])
        fig.suptitle('True class: '+str(class_label)+'    green=correct, red=incorrect')
        plt.savefig('DogDNN_diffs_class' + str(class_label) + '_'+fname+'.png', bbox_inches='tight')

plot_differences(x_other, y_other_pred, meta)

In [None]:
def _agreement_column(true_label, dnn_label, dog_label):
    ''' Return the plot column (0-3) of a sample on which the DNN and the dog agree, or -1 if they do not.'''
    col = -1
    if true_label==1:
        if dnn_label==1 and dog_label==1:
            col = 0
        elif dnn_label==0 and dog_label==0:
            col = 1
    if true_label==0:
        if dnn_label==0 and dog_label==0:
            col = 2
        elif dnn_label==1 and dog_label==1:
            col = 3
    return col


def plot_similarities(x, y_true, y_pred, y_dog, title):
    ''' Plot the samples on which the DNN's prediction matched the dog's
    indication, in four columns: both right on class 1, both wrong on
    class 1, both right on class 0, both wrong on class 0. The figure is
    saved as 'DogDNN_match_<fname>.png' (fname is the module-level dataset name).
    Nothing is plotted when there is no such sample.
    Parameters
    x : array of samples, one row per sample
    y_true : vector of true class labels (0 or 1)
    y_pred : vector of classes predicted by the DNN
    y_dog : vector of classes indicated by the dog
    title : str - figure title
    '''
    # Classify every sample once and calculate the number of plots required
    cols = [_agreement_column(y_true[i], y_pred[i], y_dog[i]) for i in range(len(y_pred))]
    n_plots = max(cols.count(c) for c in range(4))
    if n_plots == 0:
        # plt.subplots cannot create a grid with zero rows
        print('No samples where the DNN and the dog agree; nothing to plot.')
        return

    # Set up the subplots. squeeze=False keeps ax two-dimensional when n_plots == 1.
    fig, ax = plt.subplots(n_plots, 4, sharex='col', sharey='row', figsize=(10, 10),
                           squeeze=False)
    rows = [0, 0, 0, 0]  # next free row in each column
    green_red = sns.color_palette("Paired")
    colors = [green_red[3], green_red[5], green_red[2], green_red[4]]

    # Create each plot
    for i, col in enumerate(cols):
        if col == -1:
            continue
        row = rows[col]
        rows[col] = row + 1
        ax[row, col].plot(x[i], color=colors[col])
        ax[0, col].set_title('True '+str(int(y_true[i]))+': Pred '+str(y_pred[i]))
        ax[row, col].set_ylim(bottom=0, top=2.2)
        ax[row, 0].set_yticklabels([])

    # Add labels and title
    for c in range(4):
        ax[n_plots-1, c].set_xlabel('time, t')
    fig.suptitle(title)
    plt.savefig('DogDNN_match_'+fname+'.png', bbox_inches='tight')
    
plot_similarities(x_other, y_other, y_other_pred, meta['dog_pred'], 'Samples where the DNN\'s prediction matched the dog\'s indication')