# Anomaly detection using matrix profile

In [1]:
import wfdb
import stumpy
import numpy as np
import pandas as pd

pd.options.plotting.backend = "plotly"

def normalize(series: pd.Series) -> pd.Series:
    """
    Rescales a pandas Series linearly so its values span the 0-1 range.

    The minimum of the series maps to 0 and the maximum maps to 1. A series
    whose values are all equal has no range to rescale; it is mapped to 0
    everywhere instead of dividing by zero (which would turn every value
    into NaN).

    Args:
        series (pd.Series): The data series to be normalized.

    Returns:
        pd.Series: The normalized series with values ranging from 0 to 1.
            NaN entries of the input stay NaN.
    """
    lowest = series.min()
    value_range = series.max() - lowest

    # Constant input: avoid 0 / 0 and return a flat line at 0 instead.
    if value_range == 0:
        return (series - lowest).astype(float)

    return (series - lowest) / value_range



def calculate_matrix_profile(series: pd.Series, window_size: int, normalize_data: bool = True) -> np.ndarray:
    """
    Calculates the matrix profile for a given time series.

    The profile is computed with ``stumpy.stump`` as a self-join using
    ``normalize=False``. The result is returned as an array with exactly one
    entry per sample of ``series`` so that it can be plotted next to the
    original signal: the score of each window is stored at the position of
    the window's last sample, and every other position is NaN.

    Args:
        series (pd.Series): The input time series data.
        window_size (int): The window size for the matrix profile.
        normalize_data (bool, optional): If True, rescales the series with
            ``normalize`` and drops samples that are NaN afterwards before
            calculating the matrix profile. Defaults to True.

    Returns:
        np.ndarray: The matrix profile distances, NaN-padded to ``len(series)``.
    """
    if normalize_data:
        scaled = normalize(series)
        # Remember which samples survive the NaN drop so the scores can be
        # put back at their original positions afterwards.
        kept_mask = scaled.notna().to_numpy()
        values = scaled[kept_mask]
    else:
        kept_mask = np.ones(len(series), dtype=bool)
        values = series

    # Calculate matrix profile (column 0 of the result holds the distances)
    mp = stumpy.stump(values.astype("float"), m=window_size, ignore_trivial=True, normalize=False)
    mp_dist = mp[:, 0].astype(float)

    # Entry j of the profile belongs to the window starting at the j-th kept
    # sample; place it at the original position of that window's last sample.
    # When no sample was dropped this equals prepending window_size - 1 NaNs.
    window_end_positions = np.flatnonzero(kept_mask)[window_size - 1:]
    aligned = np.full(len(series), np.nan)
    aligned[window_end_positions] = mp_dist

    return aligned

def plot_matrix_profile_results(series: pd.Series, mp_dist: np.ndarray, annotations: wfdb.Annotation) -> None:
    """
    Shows the signal and its matrix profile in one annotated figure.

    The figure is produced through the pandas plotting backend, which this
    file sets to plotly at module level; the figure methods used below are
    plotly ones.

    Args:
        series (pd.Series): The input time series data.
        mp_dist (np.ndarray): The matrix profile distances; must have one
            entry per sample of ``series``.
        annotations (wfdb.Annotation): Annotations for specific events in the
            series; their ``sample`` positions and ``symbol`` labels are drawn.
    """
    plot_title = "Original series and matrix profile"

    # One column for the raw signal, one for the profile scores.
    frame = series.to_frame(name="Original Series")
    frame = frame.assign(MatrixProfileScore=mp_dist)

    fig = frame.plot(template="simple_white", title=plot_title)
    fig.update_layout(title={'text': plot_title, 'x': 0.5})  # x=0.5 centers the title

    # Dashed red reference line at y = 1.
    fig.add_hline(y=1, line_width=1, line_color="red", line_dash="dash")

    fig.update_yaxes(title_text="Amplitude")
    fig.update_xaxes(title_text="Index")

    # All labels sit at the same height: the maximum of the signal column.
    label_height = frame.iloc[:, 0].max()
    for sample_position, label in zip(annotations.sample, annotations.symbol):
        fig.add_annotation(
            x=sample_position,
            y=label_height,
            text=label,
            showarrow=True,
            arrowhead=2,
            ax=0,
            ay=-40,
            font=dict(color='red', size=12)
        )

    fig.show()

def run_matrix_profile(file_path: str, window_size: int, sampto: int) -> None:
    """
    Runs the whole pipeline for one WFDB record: load, compute, plot.

    Only the first signal of the record is analysed, and the annotations are
    read from the record's ``atr`` annotation file.

    Args:
        file_path (str): Path to the WFDB file.
        window_size (int): Window size for matrix profile calculation.
        sampto (int): The number of samples to load from the file.
    """
    # Read the signal and its annotations over the same sample range.
    wfdb_record = wfdb.rdrecord(file_path, sampto=sampto)
    event_annotations = wfdb.rdann(file_path, 'atr', sampto=sampto)

    # Column 0 of p_signal is the first signal of the record.
    first_signal = pd.Series(wfdb_record.p_signal[:, 0])

    profile_scores = calculate_matrix_profile(first_signal, window_size=window_size)

    plot_matrix_profile_results(first_signal, profile_scores, event_annotations)


In [2]:
# Define some constants shared by the matrix-profile runs in the cells below

# Window (subsequence) length passed to run_matrix_profile.
window_size = 180
# Upper sample bound forwarded to the wfdb readers as `sampto`.
sampto = 20000

## File 113

In [3]:
# Record 113 inside the local "mit-bih-arrhythmia-database-1.0.0" directory (relative path).
file_path = "mit-bih-arrhythmia-database-1.0.0/113"

# Load the record, compute its matrix profile and show the plot.
run_matrix_profile(file_path, window_size, sampto)

## File 215

In [4]:
# Record 215 inside the local "mit-bih-arrhythmia-database-1.0.0" directory (relative path).
file_path = "mit-bih-arrhythmia-database-1.0.0/215"

# Load the record, compute its matrix profile and show the plot.
run_matrix_profile(file_path, window_size, sampto)

## File 123

In [5]:
# Record 123 inside the local "mit-bih-arrhythmia-database-1.0.0" directory (relative path).
file_path = "mit-bih-arrhythmia-database-1.0.0/123"

# Load the record, compute its matrix profile and show the plot.
run_matrix_profile(file_path, window_size, sampto)

## File 207

In [6]:
# Record 207 inside the local "mit-bih-arrhythmia-database-1.0.0" directory (relative path).
file_path = "mit-bih-arrhythmia-database-1.0.0/207"

# Load the record, compute its matrix profile and show the plot.
run_matrix_profile(file_path, window_size, sampto)

# Pattern Match

In [7]:
from typing import Tuple
import plotly.graph_objects as go

def calculate_matches(file: str, sampto: int, query_start: int, query_length: int, max_distance: float) -> Tuple[np.ndarray, np.ndarray, np.ndarray, wfdb.Annotation]:
    """
    Finds the occurrences of a query segment within a time series using ``stumpy.match``.

    The first signal of the WFDB record is loaded, the query is cut out of
    that same signal, and ``stumpy.match`` searches the whole signal for
    subsequences whose distance to the query does not exceed ``max_distance``.

    Args:
        file (str): Path to the WFDB file containing the time series data.
        sampto (int): The number of samples to load from the file.
        query_start (int): The starting index of the query segment in the time series.
        query_length (int): The length of the query segment.
        max_distance (float): The maximum allowable distance for a match.

    Returns:
        tuple: A tuple containing:
            - np.ndarray: The original time series data.
            - np.ndarray: The query segment.
            - np.ndarray: The result of ``stumpy.match``, returned unchanged;
              each row is a (distance, index) pair.
            - wfdb.Annotation: Annotations for specific events in the time series.

    Raises:
        ValueError: If the query window does not lie completely inside the
            loaded signal.
    """
    # Loading series and annotations
    record = wfdb.rdrecord(file, sampto=sampto)
    annotations = wfdb.rdann(file, 'atr', sampto=sampto)

    # Get only the first series
    T = record.p_signal[:, 0]

    # Slicing would silently shorten (or empty) an out-of-range query, which
    # then no longer matches query_length; reject such requests explicitly.
    query_stop = query_start + query_length
    if query_length <= 0 or query_start < 0 or query_stop > len(T):
        raise ValueError(
            f"Query window [{query_start}, {query_stop}) does not fit inside "
            f"the loaded signal of length {len(T)}."
        )

    # Define the query
    Q = T[query_start:query_stop]

    # Get the matches; query_idx tells stumpy where Q sits inside T
    matches = stumpy.match(Q, T, query_idx=query_start, max_distance=max_distance)

    return T, Q, matches, annotations

def plot_results(T: np.ndarray, Q: np.ndarray, matches: list, annotations: wfdb.Annotation, query_start: int, query_length: int) -> None:
    """
    Draws the full signal with every match and the query highlighted on top.

    Traces are added in this order: the whole series, one trace per match,
    and finally the query segment, followed by the annotation labels.

    Args:
        T (np.ndarray): The original time series data.
        Q (np.ndarray): The query segment.
        matches (list): The matches to highlight; each item unpacks to (distance, index).
        annotations (wfdb.Annotation): Annotations for specific events in the time series.
        query_start (int): The starting index of the query segment in the time series.
        query_length (int): The length of the query segment.
    """
    figure = go.Figure()

    # Background trace: the complete signal.
    figure.add_trace(go.Scatter(y=T, mode='lines', name='Original Series', line=dict(color='blue')))

    # One overlay per match, numbered from 1 in the order they were given.
    for rank, (distance, start) in enumerate(matches, start=1):
        stop = start + query_length
        match_trace = go.Scatter(
            x=list(range(start, stop)),
            y=T[start:stop],
            mode='lines',
            name=f'Match {rank} (distance={distance:.2f})',
            line=dict()
        )
        figure.add_trace(match_trace)

    # The query itself, added last.
    query_trace = go.Scatter(
        x=list(range(query_start, query_start + query_length)),
        y=Q,
        mode='lines',
        name='Query (Q)',
        line=dict(color='orange', width=2)
    )
    figure.add_trace(query_trace)

    # Every annotation label is anchored at the signal's maximum value.
    label_height = T.max()
    for sample_position, label in zip(annotations.sample, annotations.symbol):
        figure.add_annotation(
            x=sample_position,
            y=label_height,
            text=label,
            showarrow=True,
            arrowhead=2,
            ax=0,
            ay=-40,
            font=dict(color='red', size=12)
        )

    figure.update_layout(
        title="Signal and Matrix Profile with Correspondences",
        title_x=0.5,
        xaxis_title="Index",
        yaxis_title="Amplitude",
        template="simple_white",
    )

    figure.show()

In [8]:
# Run the ECG signal analysis
# Record 207; the query is the 100-sample segment starting at index 20.
file = "mit-bih-arrhythmia-database-1.0.0/207"
sampto = 20000  # rebinds the notebook-level `sampto`
query_start = 20
query_length = 100
max_distance = 4.5  # maximum allowable distance for a match

# Find the matches of the query, then plot them over the signal.
T, Q, matches, annotations = calculate_matches(file, sampto, query_start, query_length, max_distance)
plot_results(T, Q, matches, annotations, query_start, query_length)

In [9]:
# Run the ECG signal analysis
# Record 207 again; this time the query is the 100-sample segment starting at index 14800.
file = "mit-bih-arrhythmia-database-1.0.0/207"
sampto = 20000  # rebinds the notebook-level `sampto`
query_start = 14800
query_length = 100
max_distance = 4.5  # maximum allowable distance for a match

# Find the matches of the query, then plot them over the signal.
T, Q, matches, annotations = calculate_matches(file, sampto, query_start, query_length, max_distance)
plot_results(T, Q, matches, annotations, query_start, query_length)