### Functions for computation of statistical summaries used to compute failure times

In [1]:
import numpy as np
import pandas as pd
import csv
from typing import List, Tuple
from sklearn.linear_model import LinearRegression
from scipy.signal import hilbert
from scipy.signal import hann
from scipy.signal import convolve


# Tell NumPy to ignore divide-by-zero and invalid-value floating-point errors
# instead of reporting them; the rolling-window features computed below
# divide by values that may be zero.
np.seterr(divide='ignore', invalid='ignore')
def linear_reg(data: pd.core.frame.DataFrame, abs=False):
    """Fit a straight line through ``data`` against its row position.

    The regressor is the 0-based position of each observation, the target is
    ``data.values`` (or its element-wise absolute value when ``abs`` is true).

    :param data: pandas object exposing ``.values`` and ``len()``.
    :param abs: when true, regress on the absolute values instead.
    :return: the first entry of the fitted model's ``coef_`` (the slope of
        the fitted line when ``data`` is one-dimensional).
    """
    targets = data.values
    if abs:
        # The parameter shadows the builtin, so go through NumPy explicitly.
        targets = np.abs(targets)

    # scikit-learn expects a 2-D feature matrix: one column of positions.
    positions = np.arange(len(data)).reshape(-1, 1)

    model = LinearRegression().fit(positions, targets)
    return model.coef_[0]

def get_failure_times(data: pd.core.frame.DataFrame, aggregate_length: int = 150000, include_y: bool = True):
    """Build one row of summary features per consecutive chunk of ``data``.

    ``data`` is cut into consecutive chunks of ``aggregate_length`` rows (the
    final chunk is shorter when the length is not an exact multiple). The
    first column of each chunk is summarised by ``add_statistical_summaries``.

    :param data: frame whose first column is summarised; its second column is
        only read when ``include_y`` is true.
    :param aggregate_length: number of rows per chunk.
    :param include_y: when true, store the second column's value from the
        chunk's last row under ``'time_to_failure'``.
    :return: DataFrame with one row per chunk, indexed 0, 1, 2, ...
    """
    summaries = pd.DataFrame(dtype=np.float64)
    chunk_starts = range(0, len(data), aggregate_length)

    for row, start in enumerate(chunk_starts):
        chunk = data[start:start + aggregate_length]
        add_statistical_summaries(row, summaries, chunk.iloc[:, 0])
        if not include_y:
            continue
        # The target for a chunk is the value at its final row.
        summaries.loc[row, 'time_to_failure'] = chunk.iloc[-1, 1]

    return summaries

def _slice_summary(values, label):
    """Return (column, value) pairs for std/mean/min/max of one sub-window."""
    return [
        ('std_' + label, values.std()),
        ('mean_' + label, values.mean()),
        ('min_' + label, values.min()),
        ('max_' + label, values.max()),
    ]


def _rolling_summary(values, kind, window):
    """Return (column, value) pairs describing one rolling-statistic array.

    Columns are named ``<stat>_int_<kind><window>``. ``values`` is a NumPy
    array, so ``std`` here is NumPy's default (population) standard deviation.
    When ``values`` is empty (the interval was shorter than the window) every
    value is NaN instead of raising on the zero-size reductions.
    """
    suffix = '_int_' + kind + str(window)
    stat_names = ('mean', 'std', 'min', 'max', 'q95', 'q99', 'q05', 'q01',
                  'change_abs', 'change_rate')
    if values.size == 0:
        return [(name + suffix, np.nan) for name in stat_names]

    # Scope the error suppression to this division rather than depending on
    # a module-level np.seterr call having been executed.
    with np.errstate(divide='ignore', invalid='ignore'):
        relative_change = np.diff(values) / values[:-1]

    stat_values = (
        values.mean(),
        values.std(),
        values.min(),
        values.max(),
        np.quantile(values, 0.95),
        np.quantile(values, 0.99),
        np.quantile(values, 0.05),
        np.quantile(values, 0.01),
        # NOTE(review): this is the mean of the *positions* at which the
        # relative change is non-zero, not a mean change -- confirm intent.
        np.mean(np.nonzero(relative_change)[0]),
        # NOTE(review): stored as 'change_rate' but computed as the largest
        # absolute value -- confirm the intended column name.
        np.abs(values).max(),
    )
    return [(name + suffix, value) for name, value in zip(stat_names, stat_values)]


def add_statistical_summaries(index: int, statistical_summary: pd.core.frame.DataFrame, interval_data: pd.Series):
    """Write summary features of ``interval_data`` into one row of a frame.

    :param index: row label in ``statistical_summary`` to fill.
    :param statistical_summary: frame modified in place; missing columns are
        created on assignment.
    :param interval_data: one-dimensional pandas Series of samples.
    :return: None. All values are computed first and written afterwards, so
        the row is left untouched if any computation raises.

    Features written: whole-interval statistics and quantiles (raw and
    absolute), std/mean/min/max of the first/last 50000 and 1000 samples,
    linear trend, count of samples with absolute value above 500, mean
    Hilbert envelope, mean of a Hann-smoothed signal, and summaries of the
    rolling std and rolling mean for windows of 10, 100 and 1000 samples.
    """
    # scipy.signal.windows is the supported location of hann; the
    # scipy.signal.hann alias was removed in SciPy 1.13.
    from scipy.signal.windows import hann

    abs_values = np.abs(interval_data)

    features = [
        ('mean', interval_data.mean()),
        ('std', interval_data.std()),
        ('min', interval_data.min()),
        ('max', interval_data.max()),
        ('abs_mean', abs_values.mean()),
        ('abs_std', abs_values.std()),
        ('abs_min', abs_values.min()),
        ('abs_max', abs_values.max()),
        ('q95', np.quantile(interval_data, 0.95)),
        ('q99', np.quantile(interval_data, 0.99)),
        ('q05', np.quantile(interval_data, 0.05)),
        ('q01', np.quantile(interval_data, 0.01)),
    ]

    # .iloc makes the slices positional regardless of the Series index.
    # Each std_* column holds the standard deviation and each mean_* column
    # holds the mean of its sub-window.
    features += _slice_summary(interval_data.iloc[:50000], 'f50000')
    features += _slice_summary(interval_data.iloc[-50000:], 'l50000')
    features += _slice_summary(interval_data.iloc[:1000], 'first1000')
    features += _slice_summary(interval_data.iloc[-1000:], 'last1000')

    features.append(('trend', linear_reg(interval_data)))
    features.append(('trend_abs', linear_reg(interval_data, True)))

    features.append(('count_big', int((abs_values > 500).sum())))
    features.append(('hilbert_mean', np.abs(hilbert(interval_data)).mean()))

    # Smooth with a 150-point Hann window normalised to unit sum.
    hann_window_150 = hann(150)
    smoothed = convolve(interval_data, hann_window_150, mode='same') / hann_window_150.sum()
    features.append(('hann_window_mean', smoothed.mean()))

    for windows in (10, 100, 1000):
        rolling = interval_data.rolling(windows)
        # dropna() removes the leading positions where the window is not full.
        features += _rolling_summary(rolling.std().dropna().values, 'std', windows)
        features += _rolling_summary(rolling.mean().dropna().values, 'mean', windows)

    for column, value in features:
        statistical_summary.loc[index, column] = value