# Transformers and Pipelines test on DatasetList object

This notebook exercises the transformers in `caits.transformers`, both on their own and combined in scikit-learn `Pipeline` objects.

## Importing libraries

In [None]:
import numpy as np
from caits.filtering import filter_butterworth
from caits.fe import mean_value, std_value, stft, melspectrogram, istft, central_moments, mfcc_mean
from caits.dataset._dataset3 import CoreArray, DatasetList
from caits.properties import magnitude_signal
from caits.transformers._func_transformer_v2 import FunctionTransformer
from caits.transformers._feature_extractor_v2 import FeatureExtractorSignal
from caits.transformers._feature_extractor_scalar import FeatureExtractorScalar
from caits.transformers._func_transformer_2d_v2 import FunctionTransformer2D
from caits.transformers._feature_extractor_2d_v2 import FeatureExtractorSpectrum
from sklearn.preprocessing import StandardScaler

## Dataset loading

For this notebook we use the `data/GestureSet_small` dataset.

In [None]:
from caits.loading import csv_loader

data = csv_loader("data/GestureSet_small")


In [None]:
X, y, id = data["X"], data["y"], data["id"]
caitsX = [CoreArray(values=x.values, axis_names={
    "axis_1": {
        col: i for i, col in enumerate(x.columns)
    }
}) for x in X]
type(caitsX[0]), type(y[0]), type(id[0])


In [None]:
datasetListObj = DatasetList(caitsX, y, id)
datasetListObj

## FunctionTransformer

This transformer applies a function to the `X` attribute of a `DatasetList` object, producing a list of `CaitsArray` objects whose shapes are unchanged.

We test `caits.transformers.FunctionTransformer` with the `caits.filtering.filter_butterworth` function.
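
As a rough illustration of the shape-preserving idea (not the `caits` implementation), the sketch below applies one function to every instance in a list of arrays. The helper `apply_per_instance` and the column-mean removal, used here as a crude stand-in for a high-pass filter, are assumptions made for this example.

```python
import numpy as np


def remove_column_mean(x: np.ndarray) -> np.ndarray:
    """Crude stand-in for a high-pass filter: subtract each column's mean."""
    return x - x.mean(axis=0, keepdims=True)


def apply_per_instance(instances, func, **kwargs):
    """Apply `func` to every 2D instance and return a new list (hypothetical helper)."""
    return [func(x, **kwargs) for x in instances]


rng = np.random.default_rng(0)
# Three instances with different lengths but the same six channels.
instances = [rng.normal(loc=5.0, size=(n, 6)) for n in (40, 55, 70)]
transformed = apply_per_instance(instances, remove_column_mean)

# Each instance keeps its shape; only the values change.
shapes_before = [x.shape for x in instances]
shapes_after = [x.shape for x in transformed]
print(shapes_before == shapes_after)
```

Any function that maps a `(T, C)` array to another `(T, C)` array fits this pattern, which is why a filter is a natural test case.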

In [None]:
functionTransformer = FunctionTransformer(filter_butterworth, fs=200, filter_type='highpass', cutoff_freq=50)
transformedList = functionTransformer.fit_transform(datasetListObj)
transformedList

In [None]:
datasetListObj.X[0].values

In [None]:
transformedList.X[0].values

## FeatureExtractor

This transformer extracts a single value per column (or per row, if `axis=1`) for each instance of `DatasetList.X`.

We test `FeatureExtractorScalar` with `caits.fe.mean_value` and `caits.fe.std_value`.
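
To make "one value per column" concrete, here is a minimal NumPy sketch. The helper `extract_scalar_features`, its `(callable, kwargs)` input format, and the one-row-per-feature output layout are assumptions for illustration and may differ from what `caits` returns.

```python
import numpy as np


def extract_scalar_features(x, funcs):
    """Stack one row per feature function; each row holds one value per column.

    `funcs` is a list of (callable, kwargs) pairs (hypothetical format).
    """
    return np.stack([f(x, axis=0, **kw) for f, kw in funcs])


x = np.array([[1.0, 2.0],
              [3.0, 4.0]])
features = extract_scalar_features(x, [(np.mean, {}), (np.std, {"ddof": 0})])
print(features)
```

The first row holds the column means and the second the column standard deviations, so an instance with `C` columns and `F` feature functions yields `F * C` values.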

In [None]:
featureExtractor = FeatureExtractorScalar([
    {
        "func": mean_value,
        "params": {}
    },
    {
        "func": std_value,
        "params": {
            "ddof": 0
        }
    }
])


In [None]:
tmp = featureExtractor.fit_transform(datasetListObj)
tmp.X

## FeatureExtractor2D

This transformer is mainly used for extracting 2D features per column of each instance of `DatasetList.X`.

We test this using the `caits.fe.melspectrogram` and `caits.fe.stft`.
Applying each of these functions will transform each 2D `CaitsArray` of `DatasetList.X` into a 3D `CaitsArray`.
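
The 2D-to-3D change can be sketched with a naive short-time Fourier transform in NumPy. This version uses no padding, centering, or window function, and the `(channels, frequency bins, frames)` axis order is an assumption, so the shapes produced by `caits.fe.stft` may differ.

```python
import numpy as np


def stft_per_column(x, n_fft=10, hop_length=10):
    """Naive STFT (no padding, no window) applied to each column of a (T, C) array.

    Returns an array of shape (C, n_fft // 2 + 1, n_frames).
    """
    n_samples, n_channels = x.shape
    n_frames = 1 + (n_samples - n_fft) // hop_length
    out = np.empty((n_channels, n_fft // 2 + 1, n_frames), dtype=complex)
    for c in range(n_channels):
        for t in range(n_frames):
            frame = x[t * hop_length : t * hop_length + n_fft, c]
            out[c, :, t] = np.fft.rfft(frame)
    return out


x = np.random.default_rng(0).normal(size=(100, 3))
spec = stft_per_column(x)
print(x.shape, "->", spec.shape)
```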

In [None]:
featureExtractor2D = FeatureExtractorSpectrum(melspectrogram, n_fft=10, hop_length=10)
tmp = featureExtractor2D.fit_transform(datasetListObj)

In [None]:
tmp.X

In [None]:
featureExtractor2D = FeatureExtractorSpectrum(stft, n_fft=10, hop_length=10)
tmp = featureExtractor2D.fit_transform(datasetListObj)

In [None]:
tmp.X[923].values.shape

## FunctionTransformer2D

This transformer mainly inverts the `FeatureExtractor2D` step: if `DatasetList.X` is a list of 3D `CaitsArray` objects, it is
transformed into a list of 2D `CaitsArray` objects.

To test this, we apply `caits.fe.istft` to the `DatasetList` object previously transformed with `caits.fe.stft`.

In [None]:
functionTransformer = FunctionTransformer2D(istft, hop_length=10)
tmp = functionTransformer.fit_transform(tmp)

In [None]:
tmp.X[100].shape, datasetListObj.X[100].shape

## SlidingWindow

This transformer applies a sliding window to each instance of the `DatasetList` object.

The resulting windows are collected in a single `DatasetList` object.
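
The windowing step can be sketched in plain NumPy as follows. It assumes that `overlap` counts shared samples between consecutive windows and that every window inherits the label and id of its source instance; both are assumptions about the behaviour, and the labels and ids are made up.

```python
import numpy as np


def sliding_windows(x, window_size, overlap):
    """Split a (T, C) array into windows that share `overlap` samples."""
    step = window_size - overlap
    starts = range(0, x.shape[0] - window_size + 1, step)
    return [x[s : s + window_size] for s in starts]


# Two instances with labels and ids (made-up values).
X = [np.arange(50.0).reshape(25, 2), np.arange(40.0).reshape(20, 2)]
y = ["circle", "swipe"]
ids = ["rec_0", "rec_1"]

win_X, win_y, win_id = [], [], []
for x, label, rec_id in zip(X, y, ids):
    windows = sliding_windows(x, window_size=10, overlap=5)
    win_X.extend(windows)
    # Every window inherits the label and id of its source instance.
    win_y.extend([label] * len(windows))
    win_id.extend([rec_id] * len(windows))

print(len(win_X), len(win_y), len(win_id))
```

With a window of 10 and an overlap of 5, a 25-sample instance yields 4 windows and a 20-sample instance yields 3, so the three lists grow together.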

In [None]:
from caits.transformers._sliding_window_v2 import SlidingWindow

slidingWindow = SlidingWindow(window_size=10, overlap=5)
tmp = slidingWindow.fit_transform(datasetListObj)

In [None]:
tmp.X[0]

In [None]:
len(tmp.X), len(tmp.y), len(tmp._id)

## AugmentSignal

This transformer augments a `DatasetList` dataset by processing the instances of the original dataset and appending the results to a new `DatasetList` object.
The process can be repeated several times if desired.

As a use case, we combine time warping and additive white noise on each instance of the dataset after the sliding window is applied. The process
is repeated twice, so if the original dataset has `N` instances, the resulting dataset consists of `3*N` instances.
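
The `3*N` count follows from keeping the originals and appending one augmented copy per repetition. The sketch below shows that bookkeeping with Gaussian noise only; the `augment` helper and its `repeats` argument are hypothetical and are not the `AugmentSignal` API.

```python
import numpy as np


def add_noise(x, rng, loc=0.0, scale=1.0):
    """Add Gaussian noise to every sample (stand-in for an augmentation function)."""
    return x + rng.normal(loc=loc, scale=scale, size=x.shape)


def augment(X, y, repeats, rng):
    """Return the originals followed by `repeats` augmented copies of each instance."""
    out_X, out_y = list(X), list(y)
    for _ in range(repeats):
        out_X.extend(add_noise(x, rng) for x in X)
        out_y.extend(y)
    return out_X, out_y


rng = np.random.default_rng(0)
X = [np.zeros((10, 3)) for _ in range(4)]
y = ["a", "b", "a", "b"]
aug_X, aug_y = augment(X, y, repeats=2, rng=rng)
print(len(X), "->", len(aug_X))
```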

In [None]:
from caits.transformers._augment_singal import AugmentSignal
from caits.augmentation import add_noise_ts, time_warp_ts

augmentation_transformer = AugmentSignal(
    [
        {
            "func": time_warp_ts,
            "params": {
                "n_speed_change": 4
            }
        },
        {
            "func": add_noise_ts,
            "params": {
                "loc": 0,
                "scale": 1,
            }
        }
    ],
)


In [None]:
# augmented_tmp = augmentation_transformer.fit_transform(tmp)

In [None]:
# tmp, augmented_tmp


In [None]:
len(tmp.X), len(tmp.y), len(tmp._id)


In [None]:
# len(augmented_tmp.X), len(augmented_tmp.y), len(augmented_tmp._id)


In [None]:
# tmp.X[0], augmented_tmp.X[0]


In [None]:
# augmented_tmp.X[0], augmented_tmp.X[20178]


## DatasetToArray

This transformer converts the `DatasetList.X` attribute into a single `np.ndarray`.

In this case, each window is flattened and all windows are stacked into one array, where each row is a
flattened window.
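
In NumPy terms, the flattening step looks roughly like the sketch below. The row-major flattening order and the window shape of `(10, 6)` are assumptions chosen for the example.

```python
import numpy as np

rng = np.random.default_rng(0)
windows = [rng.normal(size=(10, 6)) for _ in range(5)]

# Flatten each (10, 6) window to 60 values and stack the rows.
flat = np.stack([w.ravel() for w in windows]).astype(np.float64)
print(flat.shape)
```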

In [None]:
from caits.transformers._data_converters_v2 import DatasetToArray

dataFlatten = DatasetToArray(flatten=True, dtype=np.float64)

dataFlatten.fit(tmp)


In [None]:
tmp_conv = dataFlatten.transform(tmp)
tmp_conv

In [None]:
tmp_conv.X

## ArrayToDataset

This transformer converts `DatasetList.X` from a single `np.ndarray` back into a list of reshaped `CaitsArray` objects.

In this case we invert the previous step: each flattened window (a row of the array) is reshaped
into a 2D `CaitsArray`, and the results are placed in a list.
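
The inverse step is a reshape per row. The following sketch, which assumes row-major order and a window shape of `(10, 6)`, checks that flattening and unflattening round-trip without loss.

```python
import numpy as np

window_shape = (10, 6)
flat = np.arange(3 * 60, dtype=np.float64).reshape(3, 60)

# Reshape every row back into a (10, 6) window.
restored = [row.reshape(window_shape) for row in flat]

# Flattening again reproduces the original rows, so the round trip is lossless.
round_trip = np.stack([w.ravel() for w in restored])
print(np.array_equal(round_trip, flat))
```

Note that the target shape has to be supplied explicitly, because a flattened row no longer records how many samples and channels it came from.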

In [None]:
from caits.transformers._data_converters_v2 import ArrayToDataset

shape = tmp.X[0].shape

dataInverseFlatten = ArrayToDataset(
    shape=shape,
    dtype=np.float64,
    axis_names={"axis_1": tmp.X[0].axis_names["axis_1"]}
)

dataInverseFlatten.fit(tmp_conv)

tmp_conv_inv = dataInverseFlatten.transform(tmp_conv)


In [None]:
tmp_conv_inv.X[0]


## SklearnWrapper

This wrapper lets scikit-learn transformers process a `np.ndarray` internally while
always accepting and returning a `DatasetList` object. This makes it possible to use `sklearn` functionality without
losing the structure and attributes of the `DatasetList` objects.

In this case, we test it with `sklearn.preprocessing.StandardScaler` on the flattened `DatasetList` object.
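
The wrapping idea can be sketched with a toy container. `ToyDataset` and `ToyWrapper` below are hypothetical stand-ins written for this example, not the `caits` classes; only `StandardScaler` is a real scikit-learn API.

```python
from dataclasses import dataclass, replace

import numpy as np
from sklearn.preprocessing import StandardScaler


@dataclass
class ToyDataset:
    """Minimal stand-in for a dataset container (hypothetical)."""
    X: np.ndarray
    y: list
    ids: list


class ToyWrapper:
    """Fit a scikit-learn transformer on `X` and keep `y` and `ids` untouched."""

    def __init__(self, transformer_cls, params=None):
        self.transformer_cls = transformer_cls
        self.params = params or {}

    def fit(self, dataset, y=None):
        self.fitted_transformer_ = self.transformer_cls(**self.params).fit(dataset.X)
        return self

    def transform(self, dataset):
        # Only X is replaced; labels and ids travel along unchanged.
        return replace(dataset, X=self.fitted_transformer_.transform(dataset.X))


rng = np.random.default_rng(0)
data = ToyDataset(X=rng.normal(5.0, 2.0, size=(8, 4)), y=list("aabbaabb"), ids=list(range(8)))
scaled = ToyWrapper(StandardScaler).fit(data).transform(data)
print(type(scaled).__name__, scaled.X.shape)
```

Passing the transformer class plus a parameter dictionary mirrors how the wrapper is called in this notebook, and lets each `fit` start from a fresh estimator.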

In [None]:
from caits.transformers._sklearn_wrapper import SklearnWrapper
scaler = SklearnWrapper(StandardScaler)

tmp_flat_scaled = scaler.fit_transform(tmp_conv)

In [None]:
scaler.fitted_transformer_.mean_.shape, scaler.fitted_transformer_.var_.shape

In [None]:
# tmp_flat_scaled.X[0].shape, len(tmp_flat_scaled.y), len(tmp_flat_scaled._id)
tmp_flat_scaled.X

## Pipelines

In this section we test `sklearn.pipeline.Pipeline` with the transformers from `caits.transformers`.



### Use Case 1

1) the original `DatasetList` object is split into train and test parts
2) a pipeline is constructed that performs flattening, standard scaling and unflattening
3) the pipeline is fit on the train set, which is then transformed
4) the test set is transformed with the fitted pipeline
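
The steps above can be sketched in plain NumPy to show which statistics come from which set. The helpers `flatten` and `unflatten` and the synthetic data are assumptions for illustration; the notebook itself uses the `caits` transformers.

```python
import numpy as np

rng = np.random.default_rng(42)
train = [rng.normal(3.0, 2.0, size=(10, 6)) for _ in range(50)]
test = [rng.normal(3.0, 2.0, size=(10, 6)) for _ in range(20)]


def flatten(windows):
    """Stack flattened windows into a 2D array, one row per window."""
    return np.stack([w.ravel() for w in windows])


def unflatten(flat, shape):
    """Reshape every row back into a window of the given shape."""
    return [row.reshape(shape) for row in flat]


# Fit: the statistics come from the train set only.
train_flat = flatten(train)
mean, std = train_flat.mean(axis=0), train_flat.std(axis=0)

# Transform: both sets are scaled with the train statistics.
final_train = unflatten((train_flat - mean) / std, (10, 6))
final_test = unflatten((flatten(test) - mean) / std, (10, 6))
print(len(final_train), len(final_test), final_train[0].shape)
```

Scaling the test set with train statistics means its features are close to, but not exactly, zero-mean, which is the expected behaviour for held-out data.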


In [None]:
tmp_train, tmp_test = tmp.train_test_split(random_state=42)
tmp_train, tmp_test, type(tmp_train.X[0]), type(tmp_test.X[0])

In [None]:
from sklearn.pipeline import Pipeline
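# sklearn's FunctionTransformer is deliberately not imported: it is unused here
# and would shadow the caits FunctionTransformer imported at the top.
from sklearn.preprocessing import StandardScaler, MinMaxScaler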

pipeline = Pipeline(
    [
        ("conv", dataFlatten),
        ("scaler", scaler),
        ("conv_inv", dataInverseFlatten),
    ]
)

In [None]:
pipeline.fit(tmp_train)

In [None]:
pipeline.named_steps["scaler"].fitted_transformer_.mean_

In [None]:
final_train = pipeline.fit_transform(tmp_train)
final_test = pipeline.transform(tmp_test)

In [None]:
final_train, final_test

In [None]:
final_train.X[0].shape, final_test.X[0].shape

In [None]:
len(final_train.y), len(final_test.y)

In [None]:
print(pipeline.named_steps['scaler'].fitted_transformer_.mean_.shape)
print(pipeline.named_steps['scaler'].fitted_transformer_.var_.shape)


In [None]:
tmp_train.X[0]

In [None]:
final_train.X[0]

In [None]:
from caits.visualization import plot_signal

plot_signal(tmp_train.X[0].values, return_mode=False)

In [None]:
plot_signal(final_train.X[0].values, return_mode=False)


In [None]:
plot_signal(tmp_test.X[0].values, return_mode=False)


In [None]:
plot_signal(final_test.X[0].values, return_mode=False)


### Use Case 2

1) the original `DatasetList` object is split into train and test parts (already done before)
2) a pipeline is constructed that performs flattening, standard scaling and PCA
3) the pipeline is fit on the train set, which is then transformed
4) the test set is transformed with the fitted pipeline
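
For comparison, the same scaling-plus-PCA chain on an already flattened array can be written with scikit-learn alone. The synthetic arrays below stand in for the flattened windows and are assumptions for this sketch.

```python
import numpy as np
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(42)
train_flat = rng.normal(size=(50, 60))  # 50 flattened windows of 10 x 6 values
test_flat = rng.normal(size=(20, 60))

pca_pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("pca", PCA(n_components=2)),
])

# Fit on the train rows, then project both sets onto the same two components.
train_2d = pca_pipe.fit_transform(train_flat)
test_2d = pca_pipe.transform(test_flat)
print(train_2d.shape, test_2d.shape)
```

Because PCA changes the number of columns, there is no unflattening step at the end: the output stays a 2D array with one row per window.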


In [None]:
from sklearn.decomposition import PCA

pipeline2 = Pipeline(
    [
        ("conv", dataFlatten),
        ("scaler", SklearnWrapper(StandardScaler)),
        ("pca", SklearnWrapper(PCA, {"n_components": 2})),
    ]
)

tmp_pca_train = pipeline2.fit_transform(tmp_train)
tmp_pca_test = pipeline2.transform(tmp_test)

In [None]:
tmp_pca_train, tmp_pca_test, tmp_pca_train.X.shape, tmp_pca_test.X.shape

In [None]:
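def plot_scatter_pca(
        arr: np.ndarray,
        c_name="y",
        cmap_set: str = "plasma"
):
    """Scatter-plot the first two or three principal components.

    `arr` has shape (n_samples, n_components). `c_name` is forwarded to
    matplotlib's `c` argument: a color name ("y" is yellow) or per-point
    values, which are then mapped through `cmap_set`.
    """
    import matplotlib.pyplot as plt

    # Branch on the number of components (columns), not on the array rank.
    n_components = arr.shape[1] if arr.ndim == 2 else 0

    if n_components == 2:
        plt.style.use('classic')
        plt.figure(figsize=(16, 8))
        plt.scatter(arr[:, 0], arr[:, 1], c=c_name, cmap=cmap_set)
        plt.xlabel('First principal component')
        plt.ylabel('Second principal component')

    elif n_components == 3:
        plt.style.use('classic')
        fig = plt.figure(figsize=(16, 8))
        ax = fig.add_subplot(111, projection='3d')
        ax.scatter(arr[:, 0], arr[:, 1], arr[:, 2], c=c_name, cmap=cmap_set)
        ax.set_xlabel('First principal component')
        ax.set_ylabel('Second principal component')
        ax.set_zlabel('Third principal component')

    else:
        print("Expected a 2D array with 2 or 3 columns (principal components).")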


In [None]:
plot_scatter_pca(tmp_pca_train.X.values, cmap_set="viridis")

In [None]:
plot_scatter_pca(tmp_pca_test.X.values, cmap_set="viridis")

### Use Case 3

1) the original `DatasetList` object is split into train and test parts (already done before)
2) a pipeline is constructed that performs flattening, min-max scaling, quantile transform and unflattening
3) the pipeline is fit on the train set, which is then transformed
4) the test set is transformed with the fitted pipeline
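
The effect of the two scikit-learn steps can be checked on a flattened array directly. The exponential data below is synthetic and chosen only because it is skewed; it is an assumption for this sketch.

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, QuantileTransformer

rng = np.random.default_rng(0)
train_flat = rng.exponential(size=(200, 60))
test_flat = rng.exponential(size=(50, 60))

pipe = Pipeline([
    ("scaler", MinMaxScaler()),
    ("quantile", QuantileTransformer(output_distribution="uniform", n_quantiles=100)),
])

# Quantiles are estimated on the train rows and reused for the test rows.
train_q = pipe.fit_transform(train_flat)
test_q = pipe.transform(test_flat)
print(train_q.min(), train_q.max())
```

With a uniform output distribution every feature is mapped into the range from 0 to 1, so skewed inputs end up spread evenly over that range.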


In [None]:
from sklearn.preprocessing import MinMaxScaler, QuantileTransformer

# kwargs_dict = {
#     "output_distribution": "uniform",
#     "n_quantiles": 100
# }

pipe_sklearn = Pipeline(
    [
        ("flatten", dataFlatten),
        ("scaler", SklearnWrapper(MinMaxScaler)),
        ("quantile", SklearnWrapper(QuantileTransformer, {"output_distribution": "uniform", "n_quantiles": 100})),
        # ("quantile", SklearnWrapper(QuantileTransformer, **kwargs_dict)),
        # ("pca", SklearnWrapper(PCA, n_components=2)),
        ("unflatten", dataInverseFlatten)
    ]
)

In [None]:
train = pipe_sklearn.fit_transform(tmp_train)
test = pipe_sklearn.transform(tmp_test)

In [None]:
plot_signal(tmp_train.X[0].values, return_mode=False)

In [None]:
plot_signal(train.X[0].values, return_mode=False)

In [None]:
plot_signal(tmp_test.X[0].values, return_mode=False)

In [None]:
plot_signal(test.X[0].values, return_mode=False)

## ColumnTransformer

In [None]:
from caits.transformers._column_transformer import ColumnTransformer

dataFlatten = DatasetToArray(flatten=True)
dataInverseFlattenAcc = ArrayToDataset(shape=(10,3))
dataInverseFlattenGyr = ArrayToDataset(shape=(10,3))

pipe1 = Pipeline(
    [
        ("flatten", dataFlatten),
        ("scaler", SklearnWrapper(MinMaxScaler)),
        ("unflatten", dataInverseFlattenAcc)
    ]
)

pipe2 = Pipeline(
    [
        ("flatten", dataFlatten),
        ("scaler", SklearnWrapper(StandardScaler)),
        ("unflatten", dataInverseFlattenGyr)
    ]
)

column_tr = ColumnTransformer(
    [
        ("acc_pipe", pipe1, ["acc_x_axis_g", "acc_y_axis_g", "acc_z_axis_g"], ["new_acc_x", "new_acc_y", "new_acc_z"]),
        ("gyr_pipe", pipe2, ["gyr_x_axis_deg/s", "gyr_y_axis_deg/s", "gyr_z_axis_deg/s"], ["new_gyr_x", "new_gyr_y", "new_gyr_z"])
    ],
    unify=False
)


In [None]:
tmp_train.X

In [None]:
column_tr.fit_transform(tmp_train)


In [None]:
column_tr.transformations_[0][1].named_steps

In [None]:
train_col_tr = column_tr.transform(tmp_train)
train_col_tr, train_col_tr.X[0]

In [None]:
test_col_tr = column_tr.transform(tmp_test)
test_col_tr, test_col_tr.X[0]

## Big test

In [None]:
from caits.filtering import filter_median_gen
from caits.transformers._func_transformer_v2 import FunctionTransformer
from caits.transformers._sklearn_wrapper import SklearnWrapper
from caits.transformers._column_transformer import ColumnTransformer

pipe_filter = Pipeline(
    [
        ("median", FunctionTransformer(filter_median_gen, window_size=10)),
        ("butterworth", FunctionTransformer(filter_butterworth, fs=10, filter_type='highpass', cutoff_freq=2))
    ]
)

pipe_scaler = Pipeline(
    [
        ("flatten", DatasetToArray(flatten=True)),
        ("scaler", SklearnWrapper(StandardScaler)),
        ("unflatten", ArrayToDataset(shape=(20,3))),
    ]
)

mag_tr = FeatureExtractorSignal(
    [
        {
            "func": magnitude_signal,
            "params": {
                "axis": 1
            }
        }
    ], axis=1
)

column_tr1 = ColumnTransformer(
    [
        ("filter_acc_x_gyr_x", pipe_filter, ["acc_x_axis_g", "gyr_x_axis_deg/s"], ["new_acc_x", "new_gyr_x"]),
        ("filter_acc_y_gyr_y", pipe_filter, ["acc_y_axis_g", "gyr_y_axis_deg/s"], ["new_acc_y", "new_gyr_y"]),
    ],
    unify=False
)

column_tr2 = ColumnTransformer(
    [
        ("scale_acc_x_acc_y_acc_z", pipe_scaler, ["acc_x_axis_g", "acc_y_axis_g", "acc_z_axis_g"], ["new_acc_x", "new_acc_y", "new_acc_z"]),
    ],
    unify=True
)

column_tr3 = ColumnTransformer(
    [
        ("mag_calc_1", mag_tr, ["acc_x_axis_g", "acc_y_axis_g", "acc_z_axis_g"], ["mag_acc"]),
        ("mag_calc_2", mag_tr, ["gyr_x_axis_deg/s", "gyr_y_axis_deg/s", "gyr_z_axis_deg/s"], ["mag_gyr"]),
        ("mag_calc_3", mag_tr, ["new_acc_x", "new_acc_y", "new_acc_z"], ["new_mag_acc"]),
    ],
    unify=True
)

final_pipe = Pipeline(
    [
        ("filter", column_tr1),
        ("scale", column_tr2),
        ("mag", column_tr3),
    ]
)


In [None]:
from caits.transformers._sliding_window_v2 import SlidingWindow

sw_transformer = SlidingWindow(window_size=20, overlap=5)
sw_data = sw_transformer.fit_transform(datasetListObj)
tmp_train, tmp_test = sw_data.train_test_split(test_size=0.2)
tmp_train, tmp_test


In [None]:
final_train = final_pipe.fit_transform(tmp_train)
final_train.X[0]

## Big test 2

In [None]:
from caits.fe import (
    mean_value,
    std_value,
    variance_value,
    kurtosis_value,
    dominant_frequency,
    max_value,
    average_power,
    min_value,
    energy,
    crest_factor,
    sample_skewness,
    delta,
    rms_max,
    rms_min,
    rms_value,
    rms_mean,
    zcr_max,
    zcr_min,
    zcr_value,
    zcr_mean,
    spectral_bandwidth,
    spectral_std,
    spectral_kurtosis,
    spectral_slope,
    spectral_spread,
    spectral_rolloff,
    spectral_skewness,
    spectral_centroid,
    spectral_decrease,
    spectral_flatness,
    median_value,
    max_possible_amplitude,
    central_moments,
    envelope_energy_peak_detection,
    spectral_values,
    underlying_spectral,
    mfcc_mean,
)

scalar_tr = FeatureExtractorScalar(
    [
        {
            "func": mean_value
        },
        {
            "func": std_value
        },
        {
            "func": variance_value
        },
        {
            "func": kurtosis_value
        },
        {
            "func": dominant_frequency,
            "params": {
                "fs": 100
            }
        },
        {
            "func": max_value
        },
        {
            "func": crest_factor
        },
        {
            "func": min_value
        },
        {
            "func": energy
        },
        {
            "func": average_power
        },
        {
            "func": sample_skewness
        },
        {
            "func": rms_mean,
            "params": {
                "frame_length": 20,
                "hop_length": 10
            }
        },
        {
            "func": rms_value,
        },
        {
            "func": rms_max,
            "params": {
                "frame_length": 20,
                "hop_length": 10
            }
        },
        {
            "func": rms_min,
            "params": {
                "frame_length": 20,
                "hop_length": 10
            }
        },
        {
            "func": zcr_value
        },
        {
            "func": zcr_max,
            "params": {
                "frame_length": 20,
                "hop_length": 10
            }
        },
        {
            "func": zcr_min,
            "params": {
                "frame_length": 20,
                "hop_length": 10
            }
        },
        {
            "func": zcr_mean,
            "params": {
                "frame_length": 20,
                "hop_length": 10
            }
        },
        {
            "func": spectral_bandwidth,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_std,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_kurtosis,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_slope,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_rolloff,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_skewness,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_centroid,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_decrease,
            "params": {
                "fs": 100
            }
        },
        {
            "func": spectral_flatness,
            "params": {
                "fs": 100
            }
        },
        {
            "func": median_value,
        },
        {
            "func": central_moments
        },
        {
            "func": max_possible_amplitude
        },
        {
            "func": delta,
        },
        {
            "func": spectral_spread,
            "params": {
                "fs": 100
            }
        },
    ]
)

In [None]:
scaled_data = scalar_tr.fit_transform(sw_data)

In [None]:
scaled_data.X[0]

In [None]:
scaled = scaled_data.to_numpy(flatten=True)
scaled

In [None]:
scaled[0].shape

In [None]:
flat_scaled = scaled_data.flatten()
flat_scaled

In [None]:
flat_scaled.X