# Is Dynamic Time Warping a useful measure for the impact of perturbations on a TS?

In [None]:
import numpy as np
import pandas as pd
from torch.utils.data import Dataset



class TSDataset(Dataset):
    """Time Series Dataset
    A sample consists of a (random) time window + consecutive time horizon.
    Args:
        df (pd.DataFrame): dataframe containing the data
        file_path (str): path to csv file containing the data
        first_column_is_date (bool): whether the first column is a date column. Only used if df is None and csv.
        input_len (int): length of input sequence
        pred_len (int): length of prediction sequence
        stride (int): stride between samples. Only used if nb_of_samples is None and samples are thus drawn sequentially.
        nb_of_samples (int): number of samples to draw. If None, all possible samples are drawn sequentially. Else, samples are drawn randomly.
        min_vals (np.array): minimum expected values for each feature, used for scaling
        max_vals (np.array): maximum expected values for each feature, used for scaling
        seed (int): seed for reproducibility
    """
    def __init__(self,
                 df=None,
                 file_path=None,
                 first_column_is_date=False,
                 input_len=100,
                 pred_len=20,
                 stride=1,
                 nb_of_samples=None,
                 min_vals=None,
                 max_vals=None,
                 seed=42
                 ):
        super().__init__()
        if df is not None:
            self.df = df
        elif file_path is not None:
            if file_path.endswith(".parquet"):
                self.df = pd.read_parquet(file_path)
            elif file_path.endswith(".csv"):
                self.df = pd.read_csv(file_path)
                if first_column_is_date:
                    self.df.set_index(pd.to_datetime(self.df.iloc[:,0], format="%Y-%m-%d %H:%M:%S"), inplace=True)
                    self.df.drop(self.df.columns[0], axis=1, inplace=True)
            else:
                raise ValueError("File format not supported.")
        else:
            raise ValueError("Either df or file_path must be given.")

        self.input_len = input_len
        self.pred_len = pred_len
        self.stride = stride  # only used if nb_of_samples is None and samples are drawn sequentially

        self.random_sampling = nb_of_samples is not None
        self.nb_of_samples = self.__len__() if nb_of_samples is None else nb_of_samples
        assert self.nb_of_samples <= self.__len__(), "nb_of_samples must be smaller than the number of possible samples."
        self.nb_of_features = self.df.shape[1]

        # Rescale data if min and max values are given
        self.min_vals = min_vals
        self.max_vals = max_vals
        if min_vals is not None and max_vals is not None:
            self.scale_data()

        if seed is not None:
            self.rng = np.random.default_rng(seed)  # Using a local random number generator
        else:
            self.rng = np.random.default_rng()  # Default random generator without a fixed seed

        self.sample_idxs = self._create_sample_indices()
    
    def set_scaler_params(self, min_vals=None, max_vals=None):
        """Set the parameters for scaling the data.
        Args:
            min_vals (np.array): minimum values for each feature
            max_vals (np.array): maximum values for each feature
        """
        self.min_vals = min_vals if min_vals is not None else self.df.min()
        self.max_vals = max_vals if max_vals is not None else self.df.max()

    def scale_data(self):
        """Scale data between min and max values."""
        assert self.min_vals is not None and self.max_vals is not None, "min_vals and max_vals must be set via set_scaler_params() first."
        scale_diff = self.max_vals - self.min_vals
        scale_diff.replace(0, 1.0, inplace=True)  # for features that are constant, set scale to 1

        self.df = (self.df - self.min_vals) / scale_diff

    def inverse_scale_data(self, scaled_data):
        df_ = pd.DataFrame(scaled_data, columns=self.df.columns)
        return df_ * (self.max_vals - self.min_vals) + self.min_vals

    def _create_sample_indices(self):
        """Create an array of indices for sampling"""
        if self.random_sampling:
            sample_idxs = self.rng.integers(low=0, high=self.df.shape[0] - self.input_len - self.pred_len, size=self.nb_of_samples)
        else:
            max_nb_of_samples = int((self.df.shape[0] - self.input_len - self.pred_len) / self.stride) + 1
            sample_idxs = np.arange(max_nb_of_samples) * self.stride
        return sample_idxs

    def __len__(self):
        """Number of samples"""
        if self.random_sampling:
            return self.nb_of_samples
        else:
            return int((self.df.shape[0] - self.input_len - self.pred_len) / self.stride) + 1

    def __getitem__(self, index):
        """Get one sample.
        A sample consists of a time window of length input_len and a consecutive time horizon of length pred_len.
        Returns:
            x (np.array): input sequence
            y (np.array): target sequence
        """
        start_idx = self.sample_idxs[index]
        end_idx = start_idx + self.input_len + self.pred_len
        df_ = self.df.iloc[start_idx:end_idx]
        x = df_.iloc[:self.input_len].values
        y = df_.iloc[self.input_len:].values

        return x, y



In [None]:
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.fft import fft, ifft



class NoiseDataset(TSDataset):
    """Add Gaussian noise to the data."""
    def __init__(self,
                 sd=0.01,
                 **kwargs
                 ):
        super().__init__(**kwargs)
        self.noise = self.rng.normal(0, sd, self.df.shape)
        self.df += self.noise


class OffsetDataset(TSDataset):
    """Add a constant offset to a random feature of the data."""
    def __init__(self,
                 offset=0.1,
                 **kwargs
                 ):
        super().__init__(**kwargs)
        self.offset = offset
        feature = self.rng.integers(0, self.nb_of_features)
        self.df.iloc[:, feature] += offset


class DyingSignalDataset(TSDataset):
    """Multiply the a random feature of the data with a constant factor."""
    def __init__(self,
                 magnitude=0.9,
                 **kwargs
                 ):
        super().__init__(**kwargs)
        self.magnitude = magnitude
        feature = self.rng.integers(0, self.nb_of_features)
        self.df.iloc[:, feature] *= magnitude


class TimeWarpedDataset(TSDataset):
    """Warp the time index of the data.
    The time index is warped by a random factor that is sampled from a normal distribution.
    Args:
        warp_factor (float): standard deviation of the normal distribution
    """
    def __init__(self, warp_factor=0.2, **kwargs):
        super().__init__(**kwargs)
        self.warp_factor = warp_factor
        self.df = self.time_warp()

    def time_warp(self):
        # Save the original time index
        original_time_index = pd.to_numeric(self.df.index) if isinstance(self.df.index, pd.DatetimeIndex) else self.df.index.copy()

        # Generate the warped time index
        warp = self.rng.normal(loc=1.0, scale=self.warp_factor, size=len(original_time_index) - 1)
        warp = np.insert(warp, 0, 1)
        warped_time_index = np.cumsum(warp)
        warped_time_index = np.interp(warped_time_index, (warped_time_index[0], warped_time_index[-1]), (original_time_index[0], original_time_index[-1]))

        df_new = pd.DataFrame()
        for feature in self.df.columns:
            # Interpolate each feature from the original time index to the warped time index
            interp_func = interp1d(original_time_index, self.df[feature].values, fill_value='extrapolate')
            df_new[feature] = interp_func(warped_time_index)

        # Handling DateTimeIndex for the new DataFrame
        if isinstance(self.df.index, pd.DatetimeIndex):
            df_new.index = pd.to_datetime(df_new.index)

        return df_new


class HickUpDataset(TSDataset):
    """Add a hick-up fault to the data.
    The fault is a constant value that is applied to a random feature at a random position.
    Args:
        min_hick_up (float): minimum value that is added to the feature
        max_hick_up (float): maximum value that is added to the feature
        fault_probability (float): probability that a fault is applied to a sample
    """
    def __init__(self, min_hick_up=2, max_hick_up=100, fault_probability=1, **kwargs):
        super().__init__(**kwargs)
        self.min_hick_up = min_hick_up
        self.max_hick_up = max_hick_up
        self.fault_probability = fault_probability
        self.fault_mask = self._create_fault_mask()

    def _create_fault_mask(self):
        """Create a mask that simulates a hick-up fault with a range of anomaly values."""
        fault_mask = np.zeros((self.nb_of_samples, self.input_len, self.nb_of_features), dtype=np.float32)
        for i in range(self.nb_of_samples):
            if self.rng.random() < self.fault_probability:
                feature = self.rng.integers(0, self.nb_of_features)
                pos = self.rng.integers(0, self.input_len)
                # Generate a random anomaly value within the specified range
                anomaly_value = self.rng.uniform(self.min_hick_up, self.max_hick_up)
                fault_mask[i, pos, feature] = anomaly_value
        return fault_mask

    def __getitem__(self, index):
        x, y = super().__getitem__(index)
        # Apply fault mask to x
        x_with_fault = x + self.fault_mask[index]
        return x_with_fault, y


class DeadSensorDataset(TSDataset):
    """Simulate a dead sensor.
    A dead sensor is simulated by setting the values of a random feature to zero for a random duration.
    Args:
        min_dead_duration (int): minimum duration of the dead sensor during the input sequence
        dead_probability (float): probability that a sensor is dead
    """
    def __init__(self, min_dead_duration=20, dead_probability=1, **kwargs):
        super().__init__(**kwargs)
        self.min_dead_duration = min_dead_duration
        self.dead_probability = dead_probability
        self.dead_mask = self._create_dead_mask()

    def _create_dead_mask(self):
        """Create a mask that simulates a dead sensor starting at a random position."""
        mask = np.ones((self.nb_of_samples, self.input_len + self.pred_len, self.nb_of_features), dtype=np.float32)
        for i in range(self.nb_of_samples):
            if self.rng.random() < self.dead_probability:
                feature = self.rng.integers(0, self.nb_of_features)
                start_pos = self.rng.integers(0, self.input_len - self.min_dead_duration -  self.pred_len)
                end_pos = self.input_len + self.pred_len
                # Apply the dead sensor effect
                mask[i, start_pos:end_pos, feature] = 0  # TODO rescaling later leads to a value not equal to zero. Change to 'real' 0 insted?
        return mask

    def __getitem__(self, index):
        x, y = super().__getitem__(index)
        # Apply dead mask to x and y
        x_with_dead = x * self.dead_mask[index, :self.input_len, :]
        y_with_dead = y * self.dead_mask[index, self.input_len:, :]
        return x_with_dead, y_with_dead


class FrequencyFilterDataset(TSDataset):
    """Apply a frequency filter to the data.
    The frequency factor is sampled from a uniform distribution and applied to a random frequency band (low, mid, high).
    Args:
        factor_range (tuple): range of the perturbation factor. The factor is sampled from a uniform distribution.
    """
    def __init__(self, factor_range=(0.9, 1.1), **kwargs):
        super().__init__(**kwargs)
        if len(factor_range) != 2 or factor_range[0] > factor_range[1]:
            raise ValueError("factor_range must be a tuple (min_factor, max_factor) with min_factor <= max_factor")
        self.factor_range = factor_range
        self.df = self.apply_frequency_perturbation()

    def apply_frequency_perturbation(self):
        df_new = pd.DataFrame(index=self.df.index)
        band = self.rng.choice([0, 1, 2])
        factor = self.rng.uniform(*self.factor_range)
        for feature in self.df.columns:
            feature_data = self.df[feature].to_numpy()
            df_new[feature] = self.perturb_feature(feature_data, factor, band)
        return df_new

    def perturb_feature(self, data, factor, band):
        freq_data = fft(data)        
        n = len(data)
        half_n = (n + 1) // 2  # Correctly handles both even and odd n

        # Calculate start and end indices for each band
        band_ranges = [(0, half_n//3), (half_n//3, 2*half_n//3), (2*half_n//3, half_n)]
        start, end = band_ranges[band]

        # Apply the perturbation factor to the selected frequency band
        freq_data[start:end] *= factor
        freq_data[-end:-start if start != 0 else None] *= factor

        time_data_altered = ifft(freq_data).real
        return time_data_altered


In [None]:
import numpy as np
import pandas as pd
import pytorch_lightning as pl
from torch.utils.data import Subset, DataLoader



class TSDataModule(pl.LightningDataModule):
    """Data module for time series data
    Args:
        file_path (str): path to csv file containing the data
        first_column_is_date (bool): whether the first column is a date column
        nb_of_train_samples (int): number of train samples to draw. If None, all possible samples are drawn sequentially. Else, samples are drawn randomly.
        nb_of_val_and_test_samples (int): number of val/test samples to draw. If None, all possible samples are drawn sequentially. Else, samples are drawn randomly.
        train_split (float): fraction of training data
        val_split (float): fraction of validation data
        purged_fraction (float): fraction of all data to be purged of the start of the validation and test sets. This is done to avoid leakage.
        batch_size (int): batch size
        num_workers (int): number of workers for dataloader
        pin_memory (bool): pin memory for dataloader
        persistent_workers (bool): persistent workers for dataloader
        seed (int): seed for reproducibility
    """
    def __init__(self,
                 file_path, first_column_is_date=False,
                 nb_of_train_samples=None, nb_of_val_and_test_samples=None,
                 train_split=0.7, val_split=0.15, purged_fraction=0.01,
                 batch_size=64, num_workers=None, pin_memory=True, persistent_workers=False, seed=None):
        super().__init__()

        self.file_path = file_path
        self.first_column_is_date = first_column_is_date

        self.nb_of_features = None # set in setup()
        self.nb_of_train_samples = nb_of_train_samples
        self.nb_of_val_and_test_samples = nb_of_val_and_test_samples

        self.train_split = train_split
        self.val_split = val_split
        self.purged_fraction = purged_fraction

        self.batch_size = batch_size
        self.num_workers = num_workers if num_workers is not None else os.cpu_count()
        self.pin_memory = pin_memory
        self.persistent_workers = persistent_workers

        # Datasets, populated in setup()
        self.ds_train = None
        self.ds_val_dict = {}
        self.ds_test_dict = {}
        
        self.dataset_mapping = {
            'normal': TSDataset,  # 'normal' is the default scenario, i.e. no perturbation is applied
            'noise': NoiseDataset,
            'offset': OffsetDataset,
            'dying_signal': DyingSignalDataset,
            'time_warp': TimeWarpedDataset,
            'hick_up': HickUpDataset,
            'dead_sensor': DeadSensorDataset,
            'frequency_filter': FrequencyFilterDataset
        }
        self.train_scenario = "normal"
        self.test_scenarios = self.dataset_mapping.keys()  # scenarios to be used for validation and testing

        # min values of training data for scaling, they are set in setup()
        self.train_min_vals = None  
        self.train_max_vals = None

        self.seed = seed

    def _get_split_indices(self, len_df):
        train_end_idx = int(len_df * self.train_split)
        val_start_idx = train_end_idx
        val_end_idx = int(len_df * (self.train_split + self.val_split))

        purged_val_start_idx = val_start_idx + int(len_df * self.purged_fraction)
        purged_test_start_idx = val_end_idx + int(len_df * self.purged_fraction)

        # Ensure indices are within the bounds of the dataframe
        assert train_end_idx < len_df
        assert purged_val_start_idx < len_df
        assert val_end_idx < len_df
        assert purged_test_start_idx < len_df

        return train_end_idx, purged_val_start_idx, val_end_idx, purged_test_start_idx

    def setup(self, stage=None) -> None:
        # Read the complete dataset
        if self.file_path.endswith(".parquet"):
            df_all = pd.read_parquet(self.file_path)
        elif self.file_path.endswith(".csv"):
            df_all = pd.read_csv(self.file_path)
            if self.first_column_is_date:
                df_all.set_index(pd.to_datetime(df_all.iloc[:,0], format="%Y-%m-%d %H:%M:%S"), inplace=True)
                df_all.drop(df_all.columns[0], axis=1, inplace=True)
        else:
            raise ValueError("File format not supported.")
            
        self.nb_of_features = df_all.shape[1]

        # Split the dataset into train, val and test
        train_end, purged_val_start, val_end, purged_test_start = self._get_split_indices(len(df_all))
        df_train = df_all.iloc[:train_end, :]
        df_val = df_all.iloc[purged_val_start:val_end, :]
        df_test = df_all.iloc[purged_test_start:, :]

        # Normalize the datasets using the min and max values of the training set
        self.train_min_vals = df_train.min()
        self.train_max_vals = df_train.max()

        # Create the datasets
        self.ds_train = TSDataset(
            df=df_train,
            nb_of_samples=self.nb_of_train_samples,
            min_vals=self.train_min_vals,
            max_vals=self.train_max_vals,
            seed=self.seed
        )
        # Create datasets for each test scenario
        for scenario in self.test_scenarios:
            self.ds_val_dict[scenario] = self.dataset_mapping[scenario](
                df=df_val,
                nb_of_samples=self.nb_of_val_and_test_samples,
                min_vals=self.train_min_vals,
                max_vals=self.train_max_vals,
                seed=self.seed
            )
            self.ds_test_dict[scenario] = self.dataset_mapping[scenario](
                df=df_test,
                nb_of_samples=self.nb_of_val_and_test_samples,
                min_vals=self.train_min_vals,
                max_vals=self.train_max_vals,
                seed=self.seed
            )

    def train_dataloader(self) -> DataLoader:
        return DataLoader(
            self.ds_train,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            pin_memory=self.pin_memory,
            persistent_workers=self.persistent_workers,
            shuffle=True
        )

    def val_dataloader(self) -> DataLoader:
        val_dl_list = [
            DataLoader(
                self.ds_val_dict[scenario],
                batch_size=self.batch_size,
                num_workers=self.num_workers,
                pin_memory=self.pin_memory,
                persistent_workers=self.persistent_workers,
                shuffle=False
                )
            for scenario in self.test_scenarios
        ]
        return val_dl_list

    def test_dataloader(self) -> DataLoader:
        test_dl_list = [
            DataLoader(
                self.ds_test_dict[scenario],
                batch_size=self.batch_size,
                num_workers=self.num_workers,
                pin_memory=self.pin_memory,
                persistent_workers=self.persistent_workers,
                shuffle=False
                )
            for scenario in self.test_scenarios
        ]
        return test_dl_list


In [None]:
dm = TSDataModule(
    file_path="../data/processed/SWaT_Dataset_Normal_v1_sensors.parquet",
    nb_of_train_samples=1000,
    nb_of_val_and_test_samples=100,
    train_split=0.7,
    val_split=0.15,
    batch_size=1,
    num_workers=0,
    seed=42
)
dm.setup()

In [None]:
next(iter(dm.train_dataloader()))[0].shape

In [None]:
dm.ds_val_dict["normal"].df.describe()

In [None]:
dm.train_max_vals["FIT504"]

In [None]:
dm.ds_val_dict["normal"].df["FIT504"].plot()

In [None]:
import plotly.graph_objects as go
import plotly.express as px


def plot_sample(sample, title=None):
    x1, x2 = sample
    x1 = np.squeeze(x1)
    x2 = np.squeeze(x2)

    num_features = x1.shape[1]

    colors = px.colors.sample_colorscale("turbo", [n/(num_features - 1) for n in range(num_features)])

    fig = go.Figure()

    for i in range(num_features):
        color = colors[i]

        # Plot x1 (input)
        fig.add_trace(go.Scatter(
            x=np.arange(x1.shape[0]),
            y=x1[:, i],
            name=f'Feature {i} (Input)',
            mode="lines",
            line=dict(color=color)
        ))

        # Plot x2 (target)
        fig.add_trace(go.Scatter(
            x=np.arange(x1.shape[0], x1.shape[0] + x2.shape[0]),
            y=x2[:, i],
            name=f'Feature {i} (Target)',
            mode="lines",
            line=dict(color=color)
        ))

    fig.add_vline(x=x1.shape[0], line=dict(color="black", width=2, dash="dash"), 
                  annotation_text="Target Start", annotation_position="top right")  # TODO does not work for fcast overview?
    fig.update_xaxes(title_text='Time')
    fig.update_layout(width=800, height=500, title=title, 
                      font_family="Serif", font_size=14,
                      margin=dict(l=5, t=50, b=5, r=5))
    fig.show()


In [None]:
plot_sample(dm.ds_val_dict["normal"][0], title="Normal data, normalized")

In [None]:
plot_sample(dm.ds_val_dict["noise"][0], title="Noise")

In [None]:
dm.ds_val_dict["noise"][0][0].shape

In [None]:
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

In [None]:
idx = 10
x_norm = dm.ds_val_dict["normal"][idx][0]

scenarios = ["normal", "noise", "offset", "dying_signal", "time_warp", "hick_up", "dead_sensor", "frequency_filter"]

for scenario in scenarios:
    x_f = dm.ds_val_dict[scenario][idx][0]
    distance, path = fastdtw(x_norm, x_f, dist=euclidean)
    print(f"{scenario}: {distance}")

In [None]:
from tqdm import tqdm

dist_df = pd.DataFrame(columns=scenarios)
for i in tqdm(range(len(dm.ds_val_dict["normal"]))):
    x_norm = dm.ds_val_dict["normal"][i][0]
    for scenario in scenarios:
        x_f = dm.ds_val_dict[scenario][i][0]
        distance, path = fastdtw(x_norm, x_f, dist=euclidean)
        dist_df.loc[i, scenario] = distance
dist_df.describe()

In [None]:
# format to float, for some reason the dtypes are object
dist_df = dist_df.astype(float)
dist_df.describe().iloc[[1,2],:]