In this notebook we'll train a classifier to find **point behaviors** in inertial sensor data.

In [1]:
%load_ext autoreload
%autoreload 2

In [13]:
from functools import partial
from ipdb import set_trace
from IPython.display import display
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from scipy.signal import find_peaks
from sklearn.model_selection import train_test_split
from sktime.classification.compose import TimeSeriesForestClassifier, ColumnEnsembleClassifier
from sktime.utils.data_processing import from_3d_numpy_to_nested
import stickleback as sb
from typing import List, Tuple

rg = np.random.Generator(np.random.PCG64(1134))
def flatten(list_of_lists: List[List]) -> List:
    """
    Flatten a list of lists
    
        Parameters: 
            list_of_lists: a nested list
        
        Return:
            A flattened list 
    
    """
    return [i for sublist in list_of_lists for i in sublist]

def assert_shape(x: np.ndarray, shape: List[int]):
    """ 
    Assert shape of ndarray. Via https://medium.com/@nearlydaniel/assertion-of-arbitrary-array-shapes-in-python-3c96f6b7ccb4
    
        Parameters:
            x: ndarray for shape assertion
            shape: shape specification
    """
    assert len(x.shape) == len(shape), (x.shape, shape)
    for _a, _b in zip(x.shape, shape):
        if isinstance(_b, int):
            assert _a == _b, (x.shape, shape)

def assert_dtype(x: np.ndarray, dtype: np.dtype):
    """
    Assert dtype of ndarray.
    
        Parameters:
            x: ndarray for dtype assertion
            dtype: dtype specification
    """
    assert issubclass(x.dtype.type, dtype), (x.dtype.type, dtype)

Read example longitudinal sensor data and events

In [19]:
# Read example data
breath_sb = sb.stickleback.Stickleback(
    sensors=pd.read_pickle("../data/bw180828-49_prh10.pkl"), 
    events=pd.DatetimeIndex(pd.read_pickle("../data/bw180828-49_breaths.pkl")),
    win_size=40
)

breath_sb.plot_sensors_events()

NameError: name 'events' is not defined

Create the training dataset consisting of **all** events and an equal sample size of negatives (randomly selected). We'll use 40 record windows (i.e., four seconds) and make sure negative sample windows don't overlap the positives.

In [None]:
win_size = 40
event_idx = np.array([sensors.index.get_loc(e, method="nearest") for e in events])

def diff_from(xs: np.ndarray, ys: np.ndarray) -> np.ndarray:
    """
    Return the array-wise least difference from another array
    
        Parameters:
            xs: the basis array
            ys: the target array
            
        Returns:
            The minimum absolute difference for each element of xs from the closest value in ys
    """
    return np.array([np.min(np.abs(x - ys)) for x in xs])

# Valid indices for negatives
nonevent_choices = np.array(range(win_size, len(sensors.index) - win_size - 1, win_size))
diff_from_event = diff_from(nonevent_choices, event_idx)
nonevent_choices = nonevent_choices[diff_from_event > win_size]

# Randomly choose 
nonevent_idx = rg.choice(nonevent_choices, size=len(event_idx), replace=False)
nonevent_idx.sort()

print("+: " + str(event_idx))
print("-: " + str(nonevent_idx))

Convert positives and negatives to sktime-compatible format

In [None]:
def extract_nested(data: pd.DataFrame, idx: np.ndarray, window_size: int) -> pd.DataFrame:
    """
    Extract samples from longitudinal sensor data and reformat into nested sktime DataFrame format
    
        Parameters:
            data: longitudinal sensor data
            idx: indices of sample centers
            window_size: number of records in each sample window
        
        Returns:
            Sample windows in nested sktime DataFrame format
    """
#     set_trace()
    assert_shape(idx, [None])
    assert_dtype(idx, np.integer)
    assert idx.min() >= int(window_size / 2), "idx out of bounds"
    assert idx.max() < len(data) - int(window_size / 2), "idx out of bounds"
    
    # Create a 3d numpy array of window data
    data_3d = np.empty([len(idx), len(data.columns), window_size], float)
    data_arr = data.to_numpy().transpose()
    start_idx = idx - int(window_size / 2)
    for i, start in enumerate(start_idx):
        data_3d[i] = data_arr[:, start:(start + window_size)]

    # Convert 3d numpy array to nested sktime DataFrame format
    nested = from_3d_numpy_to_nested(data_3d)
    nested.columns = data.columns
    nested.index = data.index[idx]
    
    return nested

nested_events = extract_nested(sensors, event_idx, win_size)
nested_nonevents = extract_nested(sensors, nonevent_idx, win_size)
clf_data = pd.concat([nested_events, nested_nonevents])
clf_labels = ["event"] * len(nested_events) + ["nonevent"] * len(nested_nonevents)
display(clf_data.head())
print(clf_labels[0:5])

Fit a ColumnEnsembleClassifier composed of TimeSeriesForest estimators

In [None]:
clf = ColumnEnsembleClassifier(
    estimators=[("TSF_" + c, TimeSeriesForestClassifier(n_estimators=100), [i]) 
                for i, c in enumerate(clf_data.columns)]
)
X_train, X_test, y_train, y_test = train_test_split(clf_data, clf_labels, random_state=42)
clf.fit(X_train, y_train)
clf.score(X_test, y_test)

Use model to predict on entire longitudinal sensor data set

In [None]:
def nest_all(data: pd.DataFrame, window_size: int, nth: int = 1, exclude: np.ndarray = None) -> pd.DataFrame:
    """
    Convert longitudinal sensor data into nested sktime DataFrame format
    
        Parameters:
            data: longitudinal sensor data
            window_size: number of records in each window
            nth: nest every nth value (1 by default i.e., no skips)
            exclude: indices to exclude (None by default)
            
        Return:
            Windows in nested sktime DataFrame format
    """
    if exclude is not None:
        assert_shape(exclude, [None])
        assert_dtype(exclude, np.integer)
    
    idx = np.array(range(int(window_size / 2), len(data) - int(window_size / 2), nth))
    if exclude is not None:
        idx = np.setdiff1d(idx, exclude)
    return extract_nested(data, idx, window_size)

sensors_nested = nest_all(sensors, win_size, nth=5)
display(sensors_nested.head())

proba = clf.predict_proba(sensors_nested)

Convert prediction probabilities to discrete events by finding *peaks*. Predicted events are considered *true positives* if they're within a tolerance of a labeled event.

In [None]:
def discrete_events(event_proba: pd.Series, proba_thr: float, min_period: int) -> pd.Series:
    """
    Convert continuous event probabilities to discrete events
    
        Paramters:
            event_proba: event probabilities (between 0 and 1)
            min_period: minimum period between events
        
        Return:
            A boolean series indicating predicted events
    """
    proba_peaks = find_peaks(event_proba, height=proba_thr, distance=min_period)
    result = pd.Series(False, name="predicted", index=event_proba.index)
    result.iloc[proba_peaks[0]] = True
    return result

def assess(predicted: pd.DatetimeIndex, actual: pd.DatetimeIndex, data: pd.DataFrame, tolerance: int) -> pd.Series:
    """
    Assess prediction accuracy
    
    The closest predicted event to each actual event (within tolerance) is considered a true positive. Predicted 
    events that are not the closest prediction to an actual event (or farther than tolerance) are considered false
    positives. Actual events with no predicted event within the tolerance are considered false negatives.
    
        Parameters:
            predicted: datetimes of predicted events
            actual: datetimes of actual events
            data: longitudinal sensor data
            window_size: number of records per window
            
        Return:
            A series indexed by time with "TP" for true positives, "FP" for false positives, "FN" for false
            negatives, and "TN" for true negatives.
    """
    # Find indices of actual and predicted in sensor data
    actual_idx = np.array([data.index.get_loc(a) for a in actual])
    predicted_idx = np.array([data.index.get_loc(p) for p in predicted])
    
    # Find closest predicted to each actual and their distance
    closest = np.array([np.argmin(np.abs(predicted_idx - a)) for a in actual_idx])
    distance = np.array([np.min(np.abs(predicted_idx - a)) for a in actual_idx])
    
    # Initialize result as true negative
    result = pd.Series("TN", name = "outcome", index = data.index)
    
    # Iterate through actual events. The closest predicted event within the tolerance is a true positive. If no
    # predicted events are within the tolerance, the actual event is a false negative.
    for i, (c, d) in enumerate(zip(closest, distance)):
        if d <= tolerance:
            result[predicted_idx[c]] = "TP" 
        else:
            result[actual_idx[i]] = "FN"
    # Iterate through predicted events. Any predicted events that aren't the closest to an actual event are false
    # positives.
    for i, p in enumerate(predicted_idx):
        if i not in closest:
            result[p] = "FP" 
        
    # Sanity checks
    assert (np.sum(result == "TP") + np.sum(result == "FP")) == len(predicted), "TP + FP != count of predicted events"
    assert (np.sum(result == "TP") + np.sum(result == "FN")) == len(actual), "TP + FN != count of actual events"
    
    return result

def make_predictions(proba: np.ndarray, proba_index: pd.Index, sensors: pd.DataFrame, proba_thr: float,
                     min_per: int, pred_tol: int) -> pd.DataFrame:
    # Add prediction probabilities to sensor data
    prediction_df = sensors.copy(). \
        join(pd.Series(proba[:, 0], name="event_proba", index=proba_index)). \
        interpolate(method="cubic"). \
        fillna(0)

    # Add predicted events
    prediction_df = prediction_df.join(discrete_events(prediction_df["event_proba"], proba_thr, min_per))

    # Add actual events
    prediction_df = prediction_df.join(pd.DataFrame({"actual": True}, index=sensors.index[event_idx]))
    prediction_df["actual"].fillna(False, inplace=True) 

    # Check accuracy
    prediction_df = prediction_df.join(assess(predicted=prediction_df.index[prediction_df["predicted"]], 
                                              actual=prediction_df.index[prediction_df["actual"]],
                                              data=prediction_df,
                                              tolerance=pred_tol))
    
    return prediction_df

predictions = make_predictions(proba, sensors_nested.index, sensors, proba_thr=0.5, min_per=25, pred_tol=50)

Plot predictions for a given set of hyperparamters (probability threshold, minimum period, prediction tolerance).

In [None]:
# Plot sensor data and predictions
fig = make_subplots(rows=len(sensors.columns) + 1, cols=1,
                    shared_xaxes=True,)
predicted_only = predictions[predictions["predicted"]]
actual_only = predictions[predictions["actual"]]

for i, col in enumerate(list(sensors.columns) + ["event_proba"]):
    # Line plot
    fig.append_trace(go.Scatter(
        x=predictions.index,
        y=predictions[col],
        mode="lines"
    ), row=i + 1, col=1)
    # Predicted events
    fig.append_trace(go.Scatter(
        x=predicted_only.index,
        y=predicted_only[col],
        marker_color=["blue" if o == "TP" else "red" for o in predicted_only["outcome"]],
        mode="markers"
    ), row=i + 1, col=1)
    # Actual events
    fig.append_trace(go.Scatter(
        x=actual_only.index,
        y=actual_only[col],
        mode="markers",
        marker_symbol="circle-open",
        marker_size=10,
        marker_color="purple",
    ), row=i + 1, col=1)
    if col == "depth":
        fig.update_yaxes(autorange="reversed", row=i + 1, col=1)
    fig.update_yaxes(title_text=col, row=i + 1, col=1)
    
fig.update_layout(showlegend=False)
fig.show()

Plot AUC

In [None]:
def sensitivity(predictions: pd.DataFrame) -> float:
    tp = np.sum(predictions["outcome"] == "TP")
    fn = np.sum(predictions["outcome"] == "FN")
    return tp / (tp + fn)

def specificity(predictions: pd.DataFrame) -> float:
    tn = np.sum(predictions["outcome"] == "TN")
    fp = np.sum(predictions["outcome"] == "FP")
    return tn / (tn + fp)

def f1(predictions: pd.DataFrame) -> float:
    tp = np.sum(predictions["outcome"] == "TP")
    fp = np.sum(predictions["outcome"] == "FP")
    fn = np.sum(predictions["outcome"] == "FN")
    return 2 * tp / (2 * tp + fp + fn)

make_predictions2 = partial(make_predictions, proba, sensors_nested.index, sensors, min_per=25, pred_tol=50)
thrs = np.linspace(0, 1, num=50)
roc_tpr = np.array([sensitivity(make_predictions2(t)) for t in thrs])
roc_fpr = 1 - np.array([specificity(make_predictions2(t)) for t in thrs])

best_thr = max(thrs[roc_tpr == 1])

fig = px.line(pd.DataFrame({"TPR": roc_tpr, "FPR": roc_fpr, "thr": thrs}), 
              x="FPR", y="TPR",
              hover_data=["thr"],
              title="Best thr: %0.3f" % best_thr)
fig.show()

Add false positives to training dataset and repeat

In [None]:
def refit(clf: ColumnEnsembleClassifier, sensors: pd.DataFrame, orig_X: pd.DataFrame, orig_y: List[str],
           predictions: pd.DataFrame, window_size: int) -> Tuple[pd.DataFrame, List[str]]:
    """
    Re-fit a classifier by adding false positives to the training data set.
    
        Parameters:
            clf: event classifier
            sensors: longitudinal sensor data
            orig_X: original training data
            orig_y: original training labels
            predictions: classifier predictions, discretized and assessed (see make_predictions())
            window_size: size of classifier window (in records)
            
        Return:
            A tuple of (new training data, new training labels)
    """
    false_pos_idx = np.array([sensors.index.get_loc(i) for i in predictions.index[predictions["outcome"] == "FP"]])
    if len(false_pos_idx) == 0:
        return (orig_X, orig_y)
    new_X = pd.concat([orig_X, extract_nested(sensors, false_pos_idx, window_size)])
    new_y = orig_y + ["nonevent"] * len(false_pos_idx)
    clf.fit(new_X, new_y)
    return (new_X, new_y)

# Fit and assess
n_refit = 5
refits = {o: [0] * (n_refit + 1) for o in ["TP", "FP", "FN"]}
new_X, new_y, new_pred = X_train, y_train, predictions
for o in ["TP", "FP", "FN"]:
    refits[o][0] = np.sum(new_pred["outcome"] == o)
for i in range(1, n_refit + 1):
    old_y = new_y
    new_X, new_y = refit(clf, sensors, new_X, new_y, new_pred, win_size)
    if new_y != old_y:
        new_proba = clf.predict_proba(sensors_nested)
        new_pred = make_predictions(new_proba, sensors_nested.index, sensors, proba_thr=0.5, min_per=25, pred_tol=50)
    for o in ["TP", "FP", "FN"]:
        refits[o][i] = np.sum(new_pred["outcome"] == o)
refits

In [None]:
# Plot sensor data and predictions
fig = make_subplots(rows=len(sensors.columns) + 1, cols=1,
                    shared_xaxes=True)
predicted_only = new_pred[new_pred["predicted"]]
actual_only = new_pred[new_pred["actual"]]

for i, col in enumerate(list(sensors.columns) + ["event_proba"]):
    # Line plot
    fig.append_trace(go.Scatter(
        x=new_pred.index,
        y=new_pred[col],
        mode="lines"
    ), row=i + 1, col=1)
    # Predicted events
    fig.append_trace(go.Scatter(
        x=predicted_only.index,
        y=predicted_only[col],
        marker_color=["blue" if o == "TP" else "red" for o in predicted_only["outcome"]],
        mode="markers"
    ), row=i + 1, col=1)
    # Actual events
    fig.append_trace(go.Scatter(
        x=actual_only.index,
        y=actual_only[col],
        mode="markers",
        marker_symbol="circle-open",
        marker_size=10,
        marker_color="purple",
    ), row=i + 1, col=1)
    if col == "depth":
        fig.update_yaxes(autorange="reversed", row=i + 1, col=1)
    fig.update_yaxes(title_text=col, row=i + 1, col=1)
    
fig.update_layout(showlegend=False)
fig.show()