In [None]:
# Parameters

SRCDIR = "."

w = 100
a = 4 

# Rotation-invariant similarity in time series using bag-of-patterns representation

Implementation of Lin et. al. (2012), a histogram-based representation for time series data, similar to the “bag of words” approach - called "bag of patterns".

_Inputs:_
- _S: list of  time series as a numpy arrays_
- _n: size of the sliding window_
- _w: word length - also length of the extracted patterns_
- _a: alphabet size_

In [None]:
# Imports
from scipy.stats import rankdata, norm
from statsmodels.api import qqplot
import matplotlib.pyplot as plt
from itertools import product
import seaborn as sns
import pandas as pd
import numpy as np
import sys

sys.path.append(SRCDIR)

# Plot parameters
plt.rc('figure', titlesize=22)
plt.rc('axes', edgecolor='gray', facecolor='#FAFAFA', 
       titlecolor='#242424', titlesize=22, titlepad=10,
       labelcolor='#242424', labelsize=15)
plt.rc('grid', color='grey', linestyle='dotted', linewidth=0.5, alpha=0.8)
plt.rc('xtick', direction='out', color='#242424', labelsize=15)
plt.rc('ytick', direction='out', color='#242424', labelsize=15)
plt.rc('lines', linewidth=1)
plt.rc('figure.subplot', hspace=0.4)

# Pandas DataFrame parameters
pd.options.display.max_rows = 999
pd.options.display.max_colwidth = 100

::: {.panel-tabset}
# Loading data

In [1]:
def make_string(column): 
    """
    Converts a list of strings into a string. E.g. ['a', 'b', 'c'] --> 'abc'
    """
    return ''.join(l for l in column)

def motifs_table(S_discrete_list):
    """
    Given a list of discrete subsequences, store their starting position and their frequency
    """
    table = defaultdict(list)
    freq = defaultdict(int)
    
    for i, seq in enumerate(S_discrete_list):
        word = make_sentence(seq)
        table[word].append(i)
        freq[word] += 1
    
    return table, freq

def extract_patterns(S, n):
    """
    Given a time series S as a numpy array, extract all subsequences, of length n, with a stride of 1.
    Returns a numpy array, each row corresponding to a subsequence. 
    
    Uses an indexer matrix for faster computation time.
    """
    
    N = len(S)
    
    # k x n matrix with the correct indices to extract the subsequences
    window_indexer = np.array(
        np.expand_dims(np.arange(n), 0) + 
        np.expand_dims(np.arange(N-n), 0).T
    )
    
    return S[window_indexer]

def plot_bop(S, M_all):    
    """
    Simple plot function
    """
    
    # Extract important parameters from the arrays
        # Merge all keys from the dictionaries in M_all, and reset their values to 0
    merged_dict = {}
    for m in M_all:
        merged_dict.update(m)
    global_dictionary = {key: 0 for key in merged_dict} 
        # Get the max values of the time series
    max_len = 0
    max_val = 2
    min_val = -2
    for s, m in zip(S, M_all):
        s = zscore_norm(s)
        if len(s) > max_len:
            max_len = len(s)
        if max(s) > max_val:
            max_val = max(s)
        if min(s) < min_val:
            min_val = min(s)
        
    # Plot 
    # f, axes = plt.subplots(len(S), 2, figsize=(15, len(S)*2))
    f = plt.figure(figsize=(15, len(S)*2))
    gs = f.add_gridspec(len(S), 3)
    colors = sns.color_palette("hls", 8)
    
    #for i, (s, m, ax1, ax2) in enumerate(zip(S, M_all, axes.ravel()[::2], axes.ravel()[1::2])):
    for i, (s, m) in enumerate(zip(S, M_all)):
        # Normalized time series
        ax1 = f.add_subplot(gs[i, 0])
        ax1.plot(zscore_norm(s), c=colors[i%len(colors)])
        #ax1.set_xlim((0, max_len))
        ax1.set_xlim((0, len(s)))
        ax1.set_ylim((min_val, max_val))
        # Histogram
        hist = {**global_dictionary , **m} # Merge dict so every histogram has the same x-axis
        keys = np.sort(list(hist.keys())) # Sort keys by alphabetical order
        vals = [list(hist.values())[i]/max(list(hist.values())) for i in np.argsort(list(hist.keys()))]
        ax2 = f.add_subplot(gs[i, 1:])
        ax2.bar(keys, vals, color=colors[i%len(colors)])
        ax2.set_ylim((0, 1.1))
        ax2.tick_params(axis="x", rotation=90, labelsize=12)
        
    plt.tight_layout()
    plt.show()
    
    return

::: {.panel-tabset}
# SAX Representation

## Normalization: INT

In [None]:
def plot_INT(df, df_INT):
    
    for col in df.columns[1:]:

        _, ax = plt.subplots(figsize=(12, 2)) 

        ax.plot(df_INT['t'], df_INT[col])
        ax.set_title(f'INT - Column {col}')
        ax.set_xlabel('Time')
        ax.set_ylabel(f'{col}')
        plt.show()

        f = plt.figure(figsize=(12, 2))
        ax = f.add_subplot(1, 2, 1)
        qqplot(df[col], line='45', ax=ax, c='royalblue', markersize=0.5)
        ax.set_xlabel('Normal Theoretical Quantiles')
        ax.set_ylabel('Data Quantiles')
        ax.set_title(f'QQ-plot - Original data\nColumn {col}')

        ax = f.add_subplot(1, 2, 2)
        qqplot(df_INT[col], line='45', ax=ax, c='royalblue', markersize=0.5)
        ax.set_xlabel('Normal Theoretical Quantiles')
        ax.set_ylabel('Data Quantiles')
        ax.set_title(f'QQ-plot - Normalized data\nColumn {col}')

        plt.show()
    
    return

def INT(S, c):
    """
    Rank-based inverse normal transformation is a nonparametric transformation to 
    convert a sample distribution to the normal distribution.
    NaN values are ignored.

    Inputs:
    - S = pandas series 
    - c = Bloms constant

    Output:
    - transformed = normally distributed pandas series 
    """

    # Get rank, ties are averaged
    rank = rankdata(S, method="average")

    # Convert rank to normal distribution
    n = len(rank)
    transformed = norm.ppf((rank-c)/(n-2*c+1))

    return transformed

def INT_step(df, c=3.0/8):
    """
    Transforms all series in a dataframe using the rank-based inverse normal
    transformation.
    """

    # Initialize output DataFrame
    df_INT = pd.DataFrame(index=df.index)
    df_INT['t'] = df['t']

    # Process each column of the input dataframe
    for column in df.columns[1:]:
        df_INT[column] = INT(df[column], c)
    
    # Plot
    plot_INT(df, df_INT)

    return df_INT

In [None]:
df_INT = INT_step(df)

## PAA & SAX

In [None]:
def PAA_step(df_INT, w):
    """
    Piecewise Aggregate Approximation (PAA) (Keogh et. al., 2001) for multivariate 
    time series.

    Inputs:
    - df_INT = Resulting DataFrame from the normalization step 
    - w = number of segments
    """

    n = len(df_INT)

    # Resample dataframe
    new_index = df_INT.index.repeat(w)
    df_temp = df_INT.iloc[:, 1:].reindex(new_index) # Do not apply on the time column

    # Create new re-indexed dataframe from previously re-indexed dataframe
    df_PAA = pd.DataFrame(data=df_temp.to_numpy(),
                          index=[i for i in range(w) for k in range(n)],
                          columns=df_INT.columns[1:])
    
    # Average segments
    df_PAA = df_PAA.groupby(df_PAA.index).mean()
    
    # Create a new correct time column
    new_t = pd.date_range(start=df_INT['t'].iloc[0], 
                          end=df_INT['t'].iloc[-1],
                          periods=w+1)
    df_PAA['t'] = new_t[:-1]
    
    # Reorder columns
    cols = df_PAA.columns.tolist()
    new_cols = cols[-1:] + cols[:-1]
    df_PAA = df_PAA[new_cols]

    return df_PAA


def plot_SAX_plt(df_INT, df_PAA, df_SAX, breakpoints, w):
    
    for col in df.columns[1:]:

        _, ax = plt.subplots(figsize=(12, 2))

        for line in breakpoints:
            ax.axhline(line, c='crimson', alpha=0.4, ls='--', lw=0.7)

        dt = df_SAX['t'].iloc[1] - df_SAX['t'].iloc[0]
        for t, letter in enumerate(df_SAX[col]):
            ax.text(df_PAA['t'].iloc[t]+dt/2, 
                    df_PAA[col].iloc[t]+0.1, 
                    f'{letter}', c='crimson', 
                    ha='center', fontsize=15)

        # PAA
        ax.step(df_PAA['t'], df_PAA[col], where='post', c='royalblue', alpha=0.9)
        ax.plot([df_PAA['t'].iloc[-1], df_INT['t'].iloc[-1]], 
                [df_PAA[col].iloc[-1], df_PAA[col].iloc[-1]], 
                c='royalblue', alpha=0.9)

        # INT Time Series
        ax.plot(df_INT['t'], df_INT[col], alpha=0.5)

        ax.set_title(f'SAX Representation - w = {w}\nColumn {col}')
        ax.set_xlabel('Time')
        ax.set_xlim((df_INT['t'].iloc[0], df_INT['t'].iloc[-1]))
        ax.set_ylabel(f'{col}')
        plt.show()
    
    return

def distance_table(a, breakpoints):
    """
    Look-up table to compute the distance on symbolized time series.
    """
    
    table = np.zeros((a, a))
    
    for i in range(table.shape[0]):
        for j in range(table.shape[1]):
            if abs(i-j) <= 1:
                table[i, j] = 0
            else:
                table[i, j] = breakpoints[max(i, j)] - breakpoints[min(i, j)+1]
    
    return table

def SAX_step(df_PAA, a):
    """
    Symbolic representation of S_paa.

    Inputs:
    - df_PAA = PAA of a pandas dataframe
    - a = alphabet size.
    """
    
    # Define alphabet from alphabet size and corresponding breakpoints
    alphabet = np.array([chr(i) for i in range(97, 97 + a)])
    breakpoints = norm.ppf(np.linspace(0, 1, a+1)[1:-1])
    
    # Compute a look-up table that allows us to define a distance
    # metric on symbolized time series. Useful for later
    dist_table = distance_table(a, breakpoints)
    
    # Initialize output DataFrame
    df_SAX = pd.DataFrame(index=df_PAA.index)
    df_SAX['t'] = df_PAA['t']
    
    # Process each column of the input dataframe
    n_timestamps = len(df_PAA)
    for column in df.columns[1:]:
        
        # Convert each value into a letter
        discretization = []
        for t in range(n_timestamps):
            
            if np.isnan(df_PAA[column].iloc[t]):
                discretization.append(np.nan)
            else:
                discretization.append(alphabet[np.searchsorted(breakpoints,
                                                               df_PAA[column].iloc[t],
                                                               side='left')]) 
    
        df_SAX[column] = discretization
    
    # Plot
    plot_SAX_plt(df_INT, df_PAA, df_SAX, breakpoints, w)
    
    return df_SAX, dist_table

In [None]:
df_PAA = PAA_step(df_INT, w=w)
df_SAX, dist_table = SAX_step(df_PAA, a=a)

:::

::: {.panel-tabset}
# BOP Representation

In [None]:
# Distance function on symbolized time series, according to Lin et. al. (2007)

def dist(S_discrete_1, S_discrete_2, dist_table):
    
    distance = np.zeros(len(S_discrete_1))
    
    for i, (s1, s2) in enumerate(zip(S_discrete_1, S_discrete_2)):
        distance[i] = dist_table[ord(s1)-ord('a'), ord(s2)-ord('a')]
        
    return np.sqrt(np.sum(distance**2))

def min_dist(S_1, S_2, dist_table):
    """
    Given two symbolic representations of time series S_1 and S_2 
    as lists of the same length w, computes their distance (Lin et. al. (2007)).
    """
    
    n = len(S_1)
    prefactor = np.sqrt(n/w)
    distance = dist(S_1, S_2, dist_table)
    
    return prefactor*distance

## BOP Algorithm

In [None]:
def BOP(S, n, w, a, numerosity_reduction=True, plot=True):
    """
    Implementation of Lin et. al. (2012), a histogram-based representation for time
    series data, similar to the “bag of words” approach - called "bag of patterns"
    
        - S: list of  time series as a numpy arrays
        - n: size of the sliding window
        - w: word length - also length of the extracted patterns
        - a: alphabet size
    """
    
    S_norm_all = []
    S_paa_all = []
    S_discrete_all = []
    hash_table_all = []
    M_all = []
        
    for s in S:
        subsequences = extract_patterns(s, n)

        S_norm_list = []
        S_paa_list = []
        S_discrete_list = []

        for seq in subsequences:
            S_norm = zscore_norm(seq)
            S_paa = paa(S_norm, w)
            S_discrete, breakpoints = discretization(S_paa, a)

            # Option for numerosity reduction - recommended for slow moving time series
            if numerosity_reduction:
                # If we have a succession of the same word, we only count it once
                if len(S_discrete_list):
                    word = ''.join(l for l in S_discrete)
                    if word == S_discrete_list[-1]:
                        pass
                    else:
                        S_norm_list.append(S_norm)
                        S_paa_list.append(S_paa)
                        S_discrete_list.append(''.join(l for l in S_discrete)) 
                
                # If it is the first subsequence, add everything to the arrays
                else:
                    S_norm_list.append(S_norm)
                    S_paa_list.append(S_paa)
                    S_discrete_list.append(''.join(l for l in S_discrete)) 

            # No numerosity reduction
            else:
                S_norm_list.append(S_norm)
                S_paa_list.append(S_paa)
                S_discrete_list.append(''.join(l for l in S_discrete))

        S_norm = np.concatenate(S_norm_list, axis=None)
        S_paa = np.concatenate(S_paa_list, axis=None)
        S_discrete = np.concatenate(S_discrete_list, axis=None)

        hash_table, M = motifs_table(S_discrete)

        S_norm_all.append(S_norm)
        S_paa_all.append(S_paa)
        S_discrete_all.append(S_discrete)
        hash_table_all.append(hash_table)
        M_all.append(M)
        
    if plot:
        plot_bop(S, M_all)
        
    return S_norm_all, S_paa_all, S_discrete_all, hash_table_all, M_all

## Distance Metric

In [None]:
def bop_distance(M_all, plot=True):
    """
    The distance is defined as the Euclidian distance between the histograms, i.e. the 
    Euclidian distance between the pattern's occurrence frequency. 
    """
    
    distance_matrix = np.zeros((len(M_all), len(M_all)))
    for i in range(distance_matrix.shape[0]):
        for j in range(i+1, distance_matrix.shape[1]):
            dist = 0
            for key in M_all[i].keys():
                dist += (M_all[i][key]-M_all[j][key])**2
            dist= round(np.sqrt(dist), 2)
            distance_matrix[j, i] += dist
            
    if plot:
        f, ax = plt.subplots(figsize=(8, 8))
        mask = np.zeros_like(distance_matrix)
        mask[np.triu_indices_from(mask)] = True
        sns.heatmap(distance_matrix, mask=mask, annot=True, fmt='.1f',
                    square=True, cbar_kws={'shrink': 0.5}, cmap='Blues', ax=ax)
        plt.show()
        
    return distance_matrix

:::

:::