# Manual Instruction
This Jupyter notebook explains in detail the code of my research project "**Event Detection-based Method for Online Signal Synchronization in Time Series**".  

Hao Zhu, MSc at ETH
26.09.2024

## Dataset Insight
1. The recording contains a syncing event at the beginning and one at the end. Both are visible in the Acc signals and are especially clear in the Sensomat signals. However, in my observation the starting syncing event is not always captured, because some sensors were not yet activated when the recording began. The ending syncing event is therefore clearer and more reliable than the starting one.  
2. Some sensors' recording CSV files span multiple days, and the unnecessary days have to be excluded. For this we need the *Sensei-V2 - Modified-Labels.xlsx* file: its frame numbers accurately reflect the events, so we can use them to locate each recording. (See the utils/unzipper.py section below.)
3. Raw data inevitably contains missing values.
4. In my observation, the Sensomat data reflect the syncing events most accurately and have the most complete data points, so I use the Sensomat data as the reference time.

## Code Structure Overview
The repository includes three folders: config, src, and utils.
```
DATA_SYNC/  
|  
|--config/  
|  |  
|  |--csv_config.yaml  
|  
|--utils/  
|  |  
|  |--data_augmentation_utils.py  
|  |  
|  |--files_utils.py  
|  |  
|  |--plotting_utils.py  
|  |  
|  |--stream_sync.py   
|  |  
|  |--time_utils.py  
|  |  
|  |--unzipper.py  
|
|--src/  
|  |  
|  |--detectors/  
|  |  |  
|  |  |--detector_zoo.py  
|  |  |  
|  |  |--event_detector.py  
|  |  
|  |--evaluation/  
|  |  |  
|  |  |--evaluation.py
|  |
|  |--DataReceiver.py
|  |
|  |--data_benchmark.py
|  |
|  |--signal_sync.py
 
```

In [None]:
import os
import sys

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
from datetime import datetime, timedelta
import time
import pytz
import threading

import gzip
import shutil

from matplotlib.animation import FuncAnimation
from collections import deque, defaultdict
from typing import Dict, List, Any


MYTZ = "Europe/Zurich"
tzinfo = pytz.timezone(MYTZ)

## Utils
The following parts explain the logic of each Python script under utils.

### data_augmentation_utils.py  
This script provides some data augmentation methods for time series data.  

The notable method is *apply_time_drift*, which simulates clock drift over time. It applies a drift to the timestamps in a DataFrame at specified intervals.  

Parameters:  
df (pd.DataFrame): target DataFrame with a 'time' column to be drifted.  
drift_interval_seconds (int): Interval at which to apply the drift in seconds.  
drift_amount_seconds (float): Amount of drift to apply at each interval in seconds.  

Focus on the **for loop**: the function iterates over each timestamp of the target DataFrame. Whenever the time elapsed since the last drift point (`start_time`, initially the first timestamp) reaches *drift_interval_seconds*, it adds *drift_amount_seconds* to all subsequent timestamps starting from index i, and index i becomes the new drift point.

In [None]:
def apply_time_drift(df, drift_interval_seconds, drift_amount_seconds):
    """
    Apply time drift to the timestamps in the dataframe at regular intervals.
    
    Parameters:
    df (pd.DataFrame): DataFrame with a 'time' column.
    drift_interval_seconds (int): Interval at which to apply the drift in seconds.
    drift_amount_seconds (float): Amount of drift to apply at each interval in seconds.
    
    Returns:
    pd.DataFrame: DataFrame with drifted timestamps.
    """
    df_augmented = df.copy()
    drifted_timestamps = df_augmented['time'].copy()
    
    start_time = drifted_timestamps.iloc[0]
    
    for i in range(1, len(drifted_timestamps)):
        if (drifted_timestamps.iloc[i] - start_time).total_seconds() >= drift_interval_seconds:
            drifted_timestamps.iloc[i:] += pd.Timedelta(seconds=drift_amount_seconds)
            start_time = drifted_timestamps.iloc[i]
    
    df_augmented['time'] = drifted_timestamps
    return df_augmented
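The loop above shifts the whole remaining tail of the Series at every drift point, so its cost grows with the recording length times the number of drift points. Since all timestamps after a drift point move by the same amount, the time elapsed since the last drift point is the same on the drifted and on the original timestamps. The drift points can therefore be found in one pass and the accumulated offset added once at the end. A minimal sketch under that reasoning; `apply_time_drift_linear` is a hypothetical name (not part of the repository), and it assumes `df['time']` holds datetime values:

```python
import numpy as np
import pandas as pd


def apply_time_drift_linear(df, drift_interval_seconds, drift_amount_seconds):
    """Hypothetical single-pass variant of apply_time_drift with the same drift points."""
    df_augmented = df.copy()
    times = pd.to_datetime(df_augmented['time'])
    # elapsed seconds of the ORIGINAL timestamps relative to the first one
    elapsed = (times - times.iloc[0]).dt.total_seconds().to_numpy()

    offsets = np.zeros(len(elapsed))
    n_drifts = 0
    anchor = 0.0  # elapsed time of the last drift point (initially the first sample)
    for i in range(1, len(elapsed)):
        if elapsed[i] - anchor >= drift_interval_seconds:
            n_drifts += 1  # index i becomes a new drift point
            anchor = elapsed[i]
        offsets[i] = n_drifts * drift_amount_seconds

    # add the accumulated drift to every timestamp in one vectorized step
    drift = pd.to_timedelta(pd.Series(offsets, index=times.index), unit='s')
    df_augmented['time'] = times + drift
    return df_augmented
```

For example, on 10 timestamps spaced 1 s apart with `drift_interval_seconds=3` and `drift_amount_seconds=0.5`, both versions shift rows 3, 6 and 9 onward, giving per-row offsets of 0, 0, 0, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0 and 1.5 s.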

### files_utils.py
This script is a helper script to unzip all the files from the raw `.gz` archives. Since the depth of the folder hierarchy is not known in advance, it is implemented *recursively*.  

The function **unzipper** is the main function to run. Given a root directory, it recursively unzips every .gz file below it.

In [None]:
def is_gz_file(filepath):
    # Check if the file has a .gz extension
    return filepath.endswith('.gz')


def unzip_gz_file(filepath, output_dir=None):
    if not is_gz_file(filepath):
        return ''
    
    # Define the output file path
    if output_dir is None:
        output_filepath = os.path.splitext(filepath)[0]  # Remove the .gz extension
    else:
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        output_filepath = os.path.join(output_dir, os.path.basename(os.path.splitext(filepath)[0]))

    # Unzip the .gz file
    with gzip.open(filepath, 'rb') as f_in:
        with open(output_filepath, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    
    return output_filepath


def unzipper(path):
    """
    Recursively unzip all .gz files in the subfolders under the given directory.
    """
    if not os.path.isdir(path):
        # leaf: a file; unzip_gz_file ignores anything that is not .gz
        unzip_gz_file(path)
        return
    
    for entry in os.listdir(path):
        new_path = os.path.join(path, entry)
        unzipper(new_path)
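`os.walk` already visits every subdirectory, so the same job can also be written without explicit recursion. A minimal sketch; `unzip_all` is a hypothetical helper (not part of `files_utils.py`). Like `unzip_gz_file` without `output_dir`, it writes each decompressed file next to its archive, and it returns the written paths, which can be handy for logging:

```python
import gzip
import os
import shutil


def unzip_all(root):
    """Decompress every .gz file under root (any depth); return the output paths."""
    written = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if not name.endswith('.gz'):
                continue
            src = os.path.join(dirpath, name)
            dst = os.path.splitext(src)[0]  # strip the .gz extension
            with gzip.open(src, 'rb') as f_in, open(dst, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            written.append(dst)
    return written
```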

### plotting_utils.py
This is a helper for plotting colors. It contains a single function that works like a color palette, so we don't need to worry about how to assign colors when the number of sensors is unknown.

In [1]:
# adapted from https://stackoverflow.com/a/25628397
import matplotlib

def get_cmap(n, name='hsv'):
    '''Returns a colormap (callable) that maps each index in 0, 1, ..., n-1 to a distinct 
    RGB color; the keyword argument name must be a standard mpl colormap name.'''
    # plt.cm.get_cmap was deprecated in Matplotlib 3.7 and removed in 3.9
    return matplotlib.colormaps[name].resampled(n)

### time_utils.py
This file provides helper conversions between Unix epoch time and datetime, and from frame numbers to exact times.  
*frame2sec*: Because the Excel label file gives the precise frame numbers at the start and the end of the recording, we can use them to infer precise times in seconds, with the frame rate FS = $\frac{\text{total frames}}{\text{total seconds}}$.  

*sec2datetime*: start_time is a datetime object; adding a timedelta of the given number of seconds yields a new datetime object.

In [None]:
# convert a frame number to seconds elapsed since frame_begin (FS = frames per second)
def frame2sec(frame_cur, frame_begin, FS):
    return (frame_cur - frame_begin) / FS

# convert seconds to datetime object
def sec2datetime(second, start_time):
    return start_time + timedelta(seconds=second)


# convert the datetime object to Unix epoch time (whole seconds, truncated);
# a naive datetime is interpreted in the machine's local time zone
def time2UnixEpochTime(datetime_obj):
    unix_timestamp = int(datetime_obj.timestamp())
    return unix_timestamp

# convert Unix epoch time to a naive datetime in the machine's local time zone
def unixEpochTime2time(u_time):
    return datetime.fromtimestamp(u_time)

An example of how to use these helpers to find the exact start and end times of the syncing window:

In [None]:
label_guide_xlsx = pd.read_excel('PATH_TO_XLSX')  # Sensei-V2 - Modified-Labels.xlsx (first sheet)
user_id = 'sensei-223'
YEAR, MONTH, DAY = 2022, 11, 14
user_event = label_guide_xlsx[label_guide_xlsx['User'] == user_id].iloc[0]

start_frame = user_event['first_frame_of_second_START']
start_time  = user_event['frame_laptop_time_START']
end_frame   = user_event['first_frame_of_second_END']
end_time    = user_event['frame_laptop_time_END']

sync_frame_start = user_event['VideoFrame_Touchpad_START_1']
sync_frame_end = user_event['VideoFrame_Touchpad_END_3']

rec_date  = datetime(YEAR, MONTH, DAY)
start_time_comb = datetime.combine(rec_date, start_time)
end_time_comb   = datetime.combine(rec_date, end_time)

duration_seconds = (end_time_comb - start_time_comb).total_seconds()
total_frames = end_frame - start_frame

FS = total_frames / duration_seconds

accurate_sec_sync_start = frame2sec(sync_frame_start, start_frame, FS)
# the window starts 5 s before the starting syncing event
accurate_datetime_sync_start = sec2datetime(accurate_sec_sync_start - 5, start_time_comb)
unix_start = int(accurate_datetime_sync_start.timestamp())  # naive datetime -> local time zone

accurate_sec_sync_end = frame2sec(sync_frame_end, start_frame, FS)
# the window ends 15 s after the ending syncing event
accurate_datetime_sync_end = sec2datetime(accurate_sec_sync_end + 15, start_time_comb)
unix_end = int(accurate_datetime_sync_end.timestamp())
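The `.timestamp()` calls above treat the naive `datetime` objects as the machine's local time, so the Unix times are only correct when the notebook runs in the same time zone as the recording laptop. A minimal sketch that makes the time zone explicit; `frame_to_unix` and its `tz` argument are hypothetical. With `zoneinfo.ZoneInfo("Europe/Zurich")` the zone can be attached directly via `tzinfo=`, whereas a `pytz` zone such as the notebook's `tzinfo` must instead be applied with its `localize()` method:

```python
import datetime as dt


def frame_to_unix(rec_date, start_time, end_time, start_frame, end_frame,
                  frame, margin_s, tz):
    """Hypothetical helper: Unix epoch time (float seconds) of a video frame.

    start_time/end_time are the laptop times aligned with start_frame/end_frame,
    margin_s shifts the result (e.g. -5 before the starting syncing event, +15
    after the ending one), and tz is the time zone of the laptop clock.
    """
    start_dt = dt.datetime.combine(rec_date, start_time, tzinfo=tz)
    end_dt = dt.datetime.combine(rec_date, end_time, tzinfo=tz)
    fs = (end_frame - start_frame) / (end_dt - start_dt).total_seconds()  # frames per second
    seconds_since_start = (frame - start_frame) / fs
    return (start_dt + dt.timedelta(seconds=seconds_since_start + margin_s)).timestamp()


# illustrative numbers: 90,000 frames over 50 minutes -> FS = 30 frames/s, so
# frame 85,000 is (85,000 - 1,000) / 30 = 2,800 s after the start frame
example_unix_end = frame_to_unix(dt.date(2022, 11, 14), dt.time(10, 0), dt.time(10, 50),
                                 start_frame=1000, end_frame=91000, frame=85000,
                                 margin_s=15, tz=dt.timezone.utc)
```

Passing the time zone explicitly makes the result independent of the machine the notebook runs on.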

### unzipper.py
This script unzips all the files under the root directory using the function *files_utils/unzipper* and extracts the exact starting and ending Unix epoch times of each user's recording, as in the example above.  

For every recording, the procedure is as follows:  
1) Read the *Sensei-V2 - Modified-Labels.xlsx* file to get the rough labels for the start/end of the recordings and for the events. Some important labels are:  
    • *first_frame_of_second_START* AND *frame_laptop_time_START*: *first_frame_of_second_START* (frame number at the start of the recording) aligns accurately with *frame_laptop_time_START* (time at the start of the recording).  
    • *first_frame_of_second_END* AND *frame_laptop_time_END*: *first_frame_of_second_END* (frame number at the end of the recording) aligns accurately with *frame_laptop_time_END* (time at the end of the recording).  
    • *VideoFrame_Touchpad_START_1* AND *VideoFrame_Touchpad_END_3*: these two labels give the exact frame numbers of the starting and ending syncing events. The frame numbers are accurate, but the corresponding time label, e.g. *Timestamp_Touchpad_START_1*, is NOT. 
    Because time and frame are accurately aligned at the start and the end of the recording, we can use them to calculate the frame rate: FS = $\frac{\text{total frames}}{\text{total seconds}}$. Hence, the code is:  
  

In [None]:
start_frame = user_event['first_frame_of_second_START']
start_time  = user_event['frame_laptop_time_START']
end_frame   = user_event['first_frame_of_second_END']
end_time    = user_event['frame_laptop_time_END']


rec_date  = datetime(YEAR, MONTH, DAY)
start_time_comb = datetime.combine(rec_date, start_time)
end_time_comb   = datetime.combine(rec_date, end_time)

duration_seconds = (end_time_comb - start_time_comb).total_seconds()
total_frames = end_frame - start_frame

FS = total_frames / duration_seconds

2) The timestamp labels of the syncing events are not accurate, but the frame numbers are. Therefore, we use the labels *VideoFrame_Touchpad_START_1* and *VideoFrame_Touchpad_END_3* to get the frames, and *frame_laptop_time_START* together with the calculated FS to compute the exact starting time.  
   • line 4: get the number of seconds between the recording start frame and the syncing event start frame.  
   • line 5: add these seconds, minus a 5 s margin, to the recording start time to get a datetime object.  
   • line 6: convert the datetime object to Unix epoch time.

In [None]:
sync_frame_start = user_event['VideoFrame_Touchpad_START_1']
sync_frame_end = user_event['VideoFrame_Touchpad_END_3']

accurate_sec_sync_start = frame2sec(sync_frame_start, start_frame, FS)
accurate_datetime_sync_start = sec2datetime(accurate_sec_sync_start - 5, start_time_comb)
unix_start = int(accurate_datetime_sync_start.timestamp())

## src

### detectors/detector_zoo.py
This script provides several models for anomaly/event detection. All of them except TimeGPT come from [scikit-learn](https://scikit-learn.org/stable/).   
• IsolationForest  
• OneClassSVM  
• **LocalOutlierFactor**  (the best method in my tests so far)  
• SGDOneClassSVM  
• TimeGPT (via the Nixtla client)


In [None]:
from sklearn.ensemble import IsolationForest
from sklearn.linear_model import SGDOneClassSVM
from sklearn.kernel_approximation import Nystroem
from sklearn.neighbors import LocalOutlierFactor
from sklearn import svm

from sklearn.pipeline import make_pipeline

from nixtla import NixtlaClient

In [None]:
def get_model(sensor_config):
    model_name = sensor_config['model_name']
    if model_name == 'IsolationForest':
        model = IsolationForest(contamination=0.0001, random_state=42)
    elif model_name == 'OneClassSVM':
        model = svm.OneClassSVM(nu=0.0001, kernel="rbf", gamma=0.1)
    elif model_name == 'LocalOutlierFactor':
        model = LocalOutlierFactor(n_neighbors=35, contamination=0.05)
    elif model_name == 'SGDOneClassSVM':
        model = make_pipeline(
            Nystroem(gamma=0.1, random_state=42, n_components=100),
            SGDOneClassSVM(
                nu=0.01,
                shuffle=True,
                fit_intercept=True,
                random_state=42,
                tol=1e-6,
            )
        )
    elif model_name == 'TimeGPT':
        nixtla_client = NixtlaClient(
            # defaults to os.environ.get("NIXTLA_API_KEY")
        )
        
        
        return nixtla_client
    else:
        raise NotImplementedError(f"Unknown model_name: {model_name}")

    return model
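`get_model` hardcodes each model's hyperparameters, so the `parameters` block of the YAML config is never read; for example, the config sets `contamination: 0.01` for LocalOutlierFactor while the code uses 0.05. A minimal sketch that keeps the current values as defaults and lets `parameters` override them. `MODEL_DEFAULTS` and `get_model_from_config` are hypothetical names, and SGDOneClassSVM and TimeGPT are omitted for brevity:

```python
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM

# hypothetical registry: model class and the defaults currently hardcoded in get_model
MODEL_DEFAULTS = {
    'IsolationForest': (IsolationForest, {'contamination': 0.0001, 'random_state': 42}),
    'OneClassSVM': (OneClassSVM, {'nu': 0.0001, 'kernel': 'rbf', 'gamma': 0.1}),
    'LocalOutlierFactor': (LocalOutlierFactor, {'n_neighbors': 35, 'contamination': 0.05}),
}


def get_model_from_config(sensor_config):
    """Build a detector; entries under `parameters` override the defaults."""
    model_name = sensor_config['model_name']
    if model_name not in MODEL_DEFAULTS:
        raise NotImplementedError(f"Unknown model_name: {model_name}")
    model_cls, defaults = MODEL_DEFAULTS[model_name]
    params = {**defaults, **(sensor_config.get('parameters') or {})}
    return model_cls(**params)
```

With this, the per-sensor tuning described in the config section actually takes effect.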

### detectors/event_detector.py 
This script uses the model chosen in the previous step to `fit_predict` the input features and obtain anomaly labels (i.e. whether an event is detected).  

The input `window` is a fixed-size deque holding the latest data. Since `time` and `timestamp` are not relevant for event detection, we drop these two columns first. We can then simply call `fit_predict` to find out whether an event is detected, since this method is shared by all the scikit-learn detectors used here.   

In [None]:
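def get_detector(model_name : str):
    if model_name == "TimeGPT":
        return detector_timeGPT  # TimeGPT detector, not shown in this notebook
    else:
        return detector_sk

def detector_sk(window, model):
    """
    Detector for scikit-learn models.
    Returns one label per row of the window: 1 = event (outlier), 0 = no event.
    """
    features = pd.DataFrame(list(window))
    # timestamps carry no information about the event itself
    features = features.drop(columns=['time', 'timestamp'], errors='ignore')
    anomaly_scores = model.fit_predict(features)  # -1 = outlier, 1 = inlier
    anomaly_scores = (anomaly_scores == -1).astype(int)
    return anomaly_scores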
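To see how the detector behaves on a single window, the sketch below fills a `deque` with 100 synthetic accelerometer rows (99 low-noise samples followed by one large spike standing in for a sync tap), fits `LocalOutlierFactor` with the parameters used in `get_model`, and reads the label of the newest sample, which is what `event_detect` appends. The data and the helper `newest_is_event` are made up for illustration; the rows are plain dicts rather than the pandas Series that `update_data` appends, and `pd.DataFrame` accepts both:

```python
from collections import deque

import numpy as np
import pandas as pd
from sklearn.neighbors import LocalOutlierFactor


def newest_is_event(window, model):
    """Return 1 if the newest sample in the window is labelled an outlier, else 0."""
    features = pd.DataFrame(list(window)).drop(columns=['time', 'timestamp'], errors='ignore')
    labels = model.fit_predict(features)  # -1 = outlier, 1 = inlier
    return int(labels[-1] == -1)


rng = np.random.default_rng(0)
window = deque(maxlen=100)
for k in range(99):  # quiet signal
    x, y, z = rng.normal(0.0, 0.05, size=3)
    window.append({'timestamp': k, 'acc_x': x, 'acc_y': y, 'acc_z': z})
window.append({'timestamp': 99, 'acc_x': 3.0, 'acc_y': 3.0, 'acc_z': 3.0})  # spike

model = LocalOutlierFactor(n_neighbors=35, contamination=0.05)
print(newest_is_event(window, model))  # prints 1
```

Note that the model is refitted on every call, so the cost of each detection step grows with `window_size`.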

### evaluation/metric.py
The timestamps are first shifted manually and then synchronized. The evaluation in this script is based on the difference between the synchronized timestamps and the original (ground-truth) timestamps.  

I implemented two metrics: **RMSE** and **MAE**.  
• $\text{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2}$  
• $\text{MAE} = \frac{1}{n} \sum_{i=1}^{n} |y_i - \hat{y}_i|$  
$y_i$ is the ground-truth timestamp of the $i$-th data point and $\hat{y}_i$ is the synchronized timestamp of the $i$-th data point; both metrics are reported in seconds.



In [None]:
def RMSE(corrected_df, gt_df):
    corrected_time = corrected_df['time'].reset_index(drop=True)
    gt_time = gt_df['time'].reset_index(drop=True)
    
    time_error = (corrected_time - gt_time).dt.total_seconds()
    rmse = np.sqrt((time_error ** 2).mean())
    return rmse


def MAE(corrected_df, gt_df):
    corrected_time = corrected_df['time'].reset_index(drop=True)
    gt_time = gt_df['time'].reset_index(drop=True)
    
    time_error = (corrected_time - gt_time).dt.total_seconds()
    mae = time_error.abs().mean()
    return mae
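A small worked example with made-up numbers: four synchronized timestamps that are off by +0.5 s, -0.5 s, +1.0 s and 0 s from the ground truth give MAE = (0.5 + 0.5 + 1.0 + 0) / 4 = 0.5 s and RMSE = sqrt((0.25 + 0.25 + 1.0 + 0) / 4) = sqrt(0.375) ≈ 0.612 s. RMSE is larger because squaring weights the 1 s error more heavily. The snippet reproduces this with the same pandas operations as `RMSE` and `MAE`:

```python
import numpy as np
import pandas as pd

t0 = pd.Timestamp('2022-11-14 10:00:00')
gt = pd.DataFrame({'time': t0 + pd.to_timedelta([0.0, 1.0, 2.0, 3.0], unit='s')})
corrected = pd.DataFrame({'time': t0 + pd.to_timedelta([0.5, 0.5, 3.0, 3.0], unit='s')})

# same steps as RMSE() and MAE() above
time_error = (corrected['time'].reset_index(drop=True)
              - gt['time'].reset_index(drop=True)).dt.total_seconds()
rmse = np.sqrt((time_error ** 2).mean())
mae = time_error.abs().mean()
print(f"RMSE = {rmse:.3f} s, MAE = {mae:.3f} s")  # RMSE = 0.612 s, MAE = 0.500 s
```

The subtraction aligns the two Series on their reset index, so rows are compared by position. If the two DataFrames have different lengths, the unmatched rows become NaN and `.mean()` silently skips them, so it is worth checking that the lengths match before evaluating.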

### evaluation/evaluation.py
This function calls the evaluation metrics (RMSE & MAE).  

Input parameters:
• *updated_dfs*: the synchronized DataFrames with synchronized timestamps  
• *gt_dfs*: the ground-truth DataFrames with ground-truth timestamps  

However, since *updated_dfs* is built in the receiving threads by concatenating incoming rows onto an initially empty DataFrame, its `time` column is not guaranteed to have a datetime dtype. We therefore cast `time` to datetime explicitly and convert it to the local time zone. See lines 5 & 6 in the cell below.


In [None]:
def run_eval(updated_dfs, gt_dfs):
    for sensor_name in gt_dfs.keys():
        print(f"sensor_name: {sensor_name}")

        updated_dfs[sensor_name]['time'] = pd.to_datetime(updated_dfs[sensor_name]['time'], errors='coerce')
        updated_dfs[sensor_name]['time'] = updated_dfs[sensor_name]['time'].dt.tz_convert(tzinfo)
        rmse = RMSE(updated_dfs[sensor_name], gt_dfs[sensor_name])
        mae = MAE(updated_dfs[sensor_name], gt_dfs[sensor_name])

        print(f"RMSE of Sensor {sensor_name}: {rmse}")
        print(f"MAE of Sensor {sensor_name}: {mae}")
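`.dt.tz_convert` raises a `TypeError` when the parsed times are time-zone naive, so the cast in `run_eval` only works if the receiver stored time-zone-aware values. A defensive sketch; `to_local_time` is a hypothetical helper, and it ASSUMES that naive values are in UTC, which should be checked against how the data is loaded:

```python
import pandas as pd


def to_local_time(values, tz):
    """Parse a column of (possibly object-dtype) times and convert it to tz.

    Assumption: naive timestamps are UTC; time-zone-aware ones are converted as-is.
    """
    times = pd.to_datetime(values, errors='coerce')  # unparseable entries become NaT
    if times.dt.tz is None:
        times = times.dt.tz_localize('UTC')
    return times.dt.tz_convert(tz)
```

In `run_eval` it would replace the two casting lines, e.g. `updated_dfs[sensor_name]['time'] = to_local_time(updated_dfs[sensor_name]['time'], tzinfo)`.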

### data_benchmark.py
This script prepares the benchmark recordings and shifts all timestamps so that they start at the Unix epoch (1970-01-01), which anonymizes the data.

The script has a similar structure and logic to `utils/unzipper.py` above, since it also needs to find the exact starting/ending timestamps. However, there are some extra steps to handle.

1. Find the reference timestamps (Sensomat data) within the sync window, and take the first (0th) timestamp as the reference starting time (last line).    
```python
gt_mat_rec = gt_mat_rec.sort_values(by='time')
gt_mat_rec_idx = gt_mat_rec[(gt_mat_rec['time'] >= unix_start) & (gt_mat_rec['time'] <= unix_end)].index
gt_mat_rec = gt_mat_rec.loc[gt_mat_rec_idx]

gt_senso_time = gt_mat_rec['time'].to_numpy()
gt_starting_time = gt_senso_time[0]
```  

2. Iterate over all the CSV files of this recording (including the reference sensor's). Subtract `gt_starting_time` from each timestamp to reset the time axis (last line).  
```python
csv_df = csv_df.sort_values(by='time')
csv_df_idx = csv_df[(csv_df['time'] >= unix_start) & (csv_df['time'] <= unix_end)].index
csv_df = csv_df.loc[csv_df_idx]

csv_df['time'] = csv_df['time'] - gt_starting_time
```
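Both steps combined, as a minimal sketch on made-up numbers. `anonymize` is a hypothetical helper; like the snippets above, it assumes that every `time` column holds Unix epoch seconds and that `unix_start`/`unix_end` come from the label file:

```python
import pandas as pd


def anonymize(dfs, ref_name, unix_start, unix_end):
    """Trim every recording to [unix_start, unix_end] and shift it so that the
    reference sensor's first sample is at t = 0 (1970-01-01 as epoch time)."""
    trimmed = {}
    for name, df in dfs.items():
        df = df.sort_values(by='time')
        in_window = (df['time'] >= unix_start) & (df['time'] <= unix_end)
        trimmed[name] = df[in_window].copy()

    gt_starting_time = trimmed[ref_name]['time'].iloc[0]  # first reference sample
    for df in trimmed.values():
        df['time'] = df['time'] - gt_starting_time
    return trimmed
```

Every sensor is shifted by the same reference value, so the relative offsets between sensors, which the synchronization has to recover, are preserved.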

### signal_sync.py
This function projects the timestamps of a shifted sensor onto the reference timestamps. The idea is to normalize the shifted timestamps within a window to the range 0~1 and then map them linearly onto the reference sensor's time span, starting from the reference start time.   

Whenever the anomaly score of any sensor changes (an event starts or ends), this function is run and re-times all sensors except the reference sensor.  

Synchronization is applied to one time range at a time: the rows between the previously detected event frame and the currently detected event frame.   

Input param  
• *ref_name*: name of the reference sensor  
• *dfs*: dict mapping each sensor name to the DataFrame of data loaded so far  
• *prev_frame*: the frame at which the previous event was detected  
• *frame*: the frame at which the current event is detected   

The synchronization runs between these two frames. The function returns the updated `dfs` and, unless the reference window is empty, `frame` as the new `prev_frame`.

```python
df_normalized_times = (
    df_frame['time'].apply(lambda x: x.timestamp() if pd.notnull(x) else np.nan) - df_start_timestamp
) / (df_end_timestamp - df_start_timestamp)

aligned_timestamps = ref_start_timestamp + df_normalized_times * (ref_end_timestamp - ref_start_timestamp)
```

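The steps are:

1. Normalize the shifted timestamps to the range 0 to 1:
$$ t_{\text{norm}} = \frac{t - t_{\text{start}}}{t_{\text{end}} - t_{\text{start}}} $$

2. Map the normalized times linearly onto the reference time span:
$$ t_{\text{aligned}} = t^{\text{ref}}_{\text{start}} + t_{\text{norm}} \left( t^{\text{ref}}_{\text{end}} - t^{\text{ref}}_{\text{start}} \right) $$

Here $t$ is a timestamp of the shifted sensor, $t_{\text{start}}$ and $t_{\text{end}}$ are its first and last timestamps in the window, and $t^{\text{ref}}_{\text{start}}$ and $t^{\text{ref}}_{\text{end}}$ are those of the reference sensor.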


In [None]:
def synchronization(
        ref_name : str, 
        dfs : dict, 
        prev_frame : int, 
        frame : int):
    
    ref = dfs[ref_name]
    ref_frame = ref.iloc[prev_frame : frame + 1]

    if ref_frame.empty:
        print("Reference frame is empty. Skipping synchronization.")
        return dfs, prev_frame

    ref_start_time = ref_frame['time'].iloc[0]
    ref_end_time = ref_frame['time'].iloc[-1]

    ref_start_timestamp = ref_start_time.timestamp()
    ref_end_timestamp = ref_end_time.timestamp()

    for i, (sensor_name, df) in enumerate(dfs.items()):
        if sensor_name == ref_name: continue
        df_frame = df.iloc[prev_frame : frame + 1]
        if df_frame.empty:
            print(f"DataFrame for {sensor_name} is empty. Skipping synchronization for this sensor.")
            continue

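        df_start_time = df_frame['time'].iloc[0]
        df_end_time = df_frame['time'].iloc[-1]

        df_start_timestamp = df_start_time.timestamp()
        df_end_timestamp = df_end_time.timestamp()

        # a window with zero time span cannot be normalized (division by zero)
        if df_end_timestamp == df_start_timestamp:
            print(f"Time span for {sensor_name} is zero. Skipping synchronization for this sensor.")
            continue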

        # normalize the shifted timestamp to 0~1               
        df_normalized_times = (
            df_frame['time'].apply(lambda x: x.timestamp() if pd.notnull(x) else np.nan) - df_start_timestamp
        ) / (df_end_timestamp - df_start_timestamp)

        # Drop NaN values
        df_normalized_times = df_normalized_times.dropna()

        if len(df_normalized_times) == 0:
            print(f"DataFrame for {sensor_name} after normalization is empty. Skipping synchronization for this sensor.")
            continue

        aligned_timestamps = ref_start_timestamp + df_normalized_times * (ref_end_timestamp - ref_start_timestamp)

        aligned_times = pd.to_datetime(aligned_timestamps, unit='s')
        aligned_times = aligned_times.dt.tz_localize('UTC').dt.tz_convert(tzinfo)

        df.loc[prev_frame : frame, 'time'] = aligned_times
        df.loc[prev_frame : frame, 'timestamp'] = aligned_timestamps
        
    prev_frame = frame
    return dfs, prev_frame
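Within one window, the two steps reduce to a single linear map from the sensor's time span onto the reference's time span, which is exactly what `np.interp` computes for points inside `[t_start, t_end]`. A small numeric check with made-up epoch seconds: a sensor window running from 10 s to 14 s is stretched onto a reference window running from 100 s to 108 s:

```python
import numpy as np

df_times = np.array([10.0, 11.0, 12.0, 14.0])  # shifted sensor, epoch seconds
ref_start, ref_end = 100.0, 108.0              # reference window, epoch seconds
df_start, df_end = df_times[0], df_times[-1]

# step 1 + step 2 as in synchronization()
normalized = (df_times - df_start) / (df_end - df_start)
aligned = ref_start + normalized * (ref_end - ref_start)

# the same linear map via np.interp
aligned_interp = np.interp(df_times, [df_start, df_end], [ref_start, ref_end])
print(aligned)  # [100. 102. 104. 108.]
```

Because only the first and last timestamps of each window enter the map, it corrects a constant offset plus a linear drift within the window, but not non-linear jitter between the two endpoints.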

### DataReceiver.py
The class collects the data streamed in by the loader threads, runs event detection on it, and optionally plots it live. 

#### Class members
The class members can be divided into two parts: data members and plotting members
1. Data members  
- self.window_size: dtype: dict; window size of each sensor  
- self.dataframe: dtype: dict; one *pd.DataFrame* (initially empty) for each sensor  
- self.window: dtype: dict; one empty *deque* with the specified window_size for each sensor  
- self.anomaly_scores: dtype: dict; one empty *list* for each sensor  
- self.ref_sensor: name of the reference sensor, `'mat'` (the Sensomat)
- self.model: dtype: dict; event detection model for each sensor
- self.detector: dtype: dict; the detector function matching each sensor's model
- self.columns_map: dtype: dict; the columns to keep for each sensor
- self.lock: *threading.Lock* that keeps updates from the loader threads thread-safe

2. Plotting members
- self.lines_map: dict storing, for each sensor, its anomaly-score line followed by one line per data column
- self.prev_timeframe: frame at which the previous synchronization ran
- self.axes: axes of all subplots
- self.fig: plt.Figure of the current figure
- self.ani: the FuncAnimation object
- self.all_data_streams_completed: *threading.Event* signalling that all data streams have completed
- self.animation_running: whether the animation is still updating

In [None]:
class DataReceiver:
    def __init__(self, 
                 sensors_config: Dict[str, Dict[str, Any]],
                 plotting: bool,
                 df_map: Dict[str, Any]):
        self.sensors_config = sensors_config
        self.sensors = list(sensors_config.keys())
        self.df_map = df_map

        self.window_size = {sensor: self.sensors_config[sensor]['window_size'] for sensor in self.sensors}
        self.dataframe = {sensor: pd.DataFrame(columns=self.sensors_config[sensor]['columns']) for sensor in self.sensors}
        self.window = {sensor: deque(maxlen=self.sensors_config[sensor]['window_size']) for sensor in self.sensors}
        self.anomaly_scores = {sensor: [] for sensor in self.sensors}
        self.ref_sensor = 'mat'
        self.model = {sensor: get_model(self.sensors_config[sensor]) for sensor in self.sensors}
        self.detector = {sensor: get_detector(self.sensors_config[sensor]['model_name']) for sensor in self.sensors}
        self.columns_map = {sensor: self.sensors_config[sensor]['columns'] for sensor in self.sensors}
        self.lock = threading.Lock()  # Ensure thread-safe operations

        # Plotting members
        self.lines_map = defaultdict(list)
        self.prev_timeframe = 0
        self.axes = None
        self.fig = None
        self.ani = None
        self.all_data_streams_completed = threading.Event()  # set when all data streams have completed
        self.animation_running = True
        if plotting:
            self.init_plot()

#### DataReceiver.init_plot()
This function initializes plotting. It sets up the figure for the data. Every sensor has two subplots (*so the number of subplots is len(sensors) * 2*). The even subplots (0-indexed) always plot the anomaly score, and the odd subplots (0-indexed) plot the data columns (all columns except `time` and `timestamp`), e.g. accX, accY, accZ. 

In [None]:
def init_plot(self):
    fig, axes = plt.subplots(len(self.sensors) * 2, 1, figsize=(12, 10), sharex=False)
    color_map = plt.get_cmap('tab10')

    for i, (sensor_name, columns) in enumerate(self.columns_map.items()):
        line, = axes[2 * i].plot([], [], label=f'Anomaly Score of {sensor_name}', color='red', marker='.')
        self.lines_map[sensor_name].append((line))
        axes[2 * i].set_ylabel('Anomaly Score')
        axes[2 * i].legend()
        axes[2 * i].grid(True)

        for j, c in enumerate(columns[2:]):
            color_idx = (i * len(columns) + j) % color_map.N
            line_temp, = axes[2 * i + 1].plot([], [], label=c, color=color_map(color_idx), marker='.')
            self.lines_map[sensor_name].append((line_temp))

        axes[2 * i + 1].set_xlabel('Time')
        axes[2 * i + 1].set_ylabel(sensor_name)
        axes[2 * i + 1].legend()
        axes[2 * i + 1].grid(True)

    plt.subplots_adjust(bottom=0.3)
    plt.suptitle('Live Data Streaming with Time Drift')

    self.axes = axes
    self.fig = fig

#### DataReceiver.update_data(self, sensor_name: str, sensor_data: pd.DataFrame)
Input param
- sensor_name: name of the sensor whose data is being loaded 
- sensor_data: the latest loaded sensor data  
The new data is concatenated to the sensor's `dataframe` and appended to the end of its `window`.

In [None]:
def update_data(self, sensor_name: str, sensor_data: pd.DataFrame):
    with self.lock:
        sensor_data = sensor_data.T
        sensor_data = sensor_data[[*self.columns_map[sensor_name]]]
        self.dataframe[sensor_name] = pd.concat([self.dataframe[sensor_name], sensor_data], ignore_index=True)
        self.window[sensor_name].append(sensor_data.iloc[0])

#### DataReceiver.event_detect(self, sensor_name: str)
Input param
- sensor_name: name of the current sensor  

If the window is greater than the threshold window size of this sensor, then run the `detector` and append latest anomaly score, else no event is detected.  

0 in anomaly score is no event detected; 1 means an event is detected

In [3]:
def event_detect(self, sensor_name: str):
    with self.lock:
        cur_window = self.window[sensor_name]
        if len(cur_window) == self.window_size[sensor_name]:
            predicted_score = self.detector[sensor_name](cur_window, self.model[sensor_name])
            self.anomaly_scores[sensor_name].append(predicted_score[-1])
        else:
            self.anomaly_scores[sensor_name].append(0)

#### DataReceiver.update_plot(self, frame)
For each sensor, if the `anomaly score` changes, either from no event to a detected event or vice versa, run `synchronization()`. Then update the plot using only the latest 100 values.  

Once all data has been loaded, run `synchronization()` one last time and then run the evaluation (`run_eval`).  

In [None]:
def update_plot(self, frame):
    if not self.animation_running:
        return 

    with self.lock:
        for s in self.sensors:
            cur_a_score = self.anomaly_scores[s]
            # when anomaly score is changed, synchronize the time for all sensor
            if len(cur_a_score) >= 2 and cur_a_score[-1] != cur_a_score[-2]:
                # TODO: Ensure synchronization is correct
                self.dataframe, self.prev_timeframe = synchronization(self.ref_sensor, self.dataframe, self.prev_timeframe, frame)
                
        for i, (sensor_name, columns) in enumerate(self.columns_map.items()):
            if len(self.dataframe[sensor_name]) > 0:  # Ensure there is data
                self.lines_map[sensor_name][0].set_data(self.dataframe[sensor_name]['time'][-100:], self.anomaly_scores[sensor_name][-100:])
                for j, c in enumerate(columns[2:]):
                    self.lines_map[sensor_name][j + 1].set_data(self.dataframe[sensor_name]['time'][-100:], self.dataframe[sensor_name][c][-100:])

                if not self.dataframe[sensor_name].empty:
                    self.axes[2 * i].set_xlim(self.dataframe[sensor_name]['time'][-100:].min(), self.dataframe[sensor_name]['time'][-100:].max())
                    self.axes[2 * i].set_ylim(-1.2, 1.2)

                    self.axes[2 * i + 1].set_xlim(self.dataframe[sensor_name]['time'][-100:].min(), self.dataframe[sensor_name]['time'][-100:].max())
                    self.axes[2 * i + 1].set_ylim(self.dataframe[sensor_name][[*columns[2:]]][-100:].min().min() - 0.1, self.dataframe[sensor_name][[*columns[2:]]][-100:].max().max() + 0.1)

    if self.all_data_streams_completed.is_set():
        print("All data streams have completed. Stopping animation.")
        self.animation_running = False
        # one final synchronization of the remaining rows (a single call covers all sensors)
        # TODO: Ensure synchronization is correct
        self.dataframe, self.prev_timeframe = synchronization(self.ref_sensor, self.dataframe, self.prev_timeframe, frame)
        run_eval(self.dataframe, self.df_map)


## config
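This folder includes only one file, which defines the per-sensor configuration used for the DataReceiver class members. Hence, we can easily add, delete, or change the configuration of sensors.

For instance, we can tune the window size, the model, and its parameters for each specific sensor (note that `get_model` above currently uses hardcoded hyperparameters and does not read the `parameters` entries).

```yaml
# config/csv_config.yaml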

sensors:
  - name: cos
    file: /path/to/cos_acc.csv
    columns: 
      - time
      - timestamp
      - acc_x
      - acc_y
      - acc_z
    model_name: LocalOutlierFactor
    parameters:
      n_neighbors: 35
      contamination: 0.01
    window_size: 100
  - name: cor
    file: /path/to/cos_acc.csv
    columns: 
      - time
      - timestamp
      - accX
      - accY
      - accZ
    model_name: LocalOutlierFactor
    parameters:
      n_neighbors: 35
      contamination: 0.01
    window_size: 100
  - name: mat
    file: /path/to/mat.csv
    columns: 
      - time
      - timestamp
      - device1_value4
      - device1_value5
      - device1_value9
    model_name: LocalOutlierFactor
    parameters:
      n_neighbors: 35
      contamination: 0.01
    window_size: 100
  - name: viva_acc
    file: /path/to/viva_hr.csv
    columns: 
      - time
      - timestamp
      - x
      - y
      - z
    model_name: LocalOutlierFactor
    parameters:
      n_neighbors: 35
      contamination: 0.01
    window_size: 100
plotting: true

```
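`DataReceiver` expects `sensors_config` as a dict keyed by sensor name (it reads `window_size`, `columns` and `model_name` for each sensor), whereas the YAML file stores a list under `sensors`. A minimal sketch of the conversion; `build_sensors_config` is a hypothetical helper, and its input is the dict that PyYAML's `yaml.safe_load` returns for the file above:

```python
from typing import Any, Dict


def build_sensors_config(cfg: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Turn {'sensors': [{'name': ..., ...}, ...]} into {name: {...}}."""
    sensors_config = {}
    for sensor in cfg['sensors']:
        entry = dict(sensor)  # copy so the loaded config is not modified
        name = entry.pop('name')
        if name in sensors_config:
            raise ValueError(f"duplicate sensor name: {name}")
        sensors_config[name] = entry
    return sensors_config
```

Usage would then look like `cfg = yaml.safe_load(f)` followed by `DataReceiver(build_sensors_config(cfg), cfg['plotting'], df_map)`, where `df_map` holds the ground-truth DataFrames that `run_eval` compares against.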