In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import csv
import itertools
import os
from dataclasses import dataclass
from datetime import datetime

import numpy as np
import pandas as pd
from func_timeout import FunctionTimedOut, func_timeout
from sklearn.metrics import accuracy_score
from sklearn.svm import LinearSVC
from tqdm.notebook import tqdm

In [None]:
import warnings

# Silence every warning category for the rest of the kernel session.
# NOTE(review): this is a blanket filter, so convergence / deprecation
# warnings raised by the libraries used below will not be shown either.
warnings.filterwarnings("ignore")

In [None]:
# IPython shell capture: `files` receives the command's output lines.
# Pipeline stages:
#   find ... -name "*TRAIN.tsv"  -> regular files at most two levels below ../UCRArchive_2018/
#   ls -al | sort -k 5 -n        -> long listing, ordered numerically by column 5
#   sed 's/ \+/\t/g' | cut -f 9  -> collapse space runs into tabs, keep column 9 (the path)
# NOTE(review): column 5 of `ls -l` is presumably the file size in bytes, and a
# path containing spaces would be cut short by the sed/cut step -- confirm.
files = !(find ../UCRArchive_2018/ -maxdepth 2 -type f -name "*TRAIN.tsv" -exec ls -al {} \; | sort -k 5 -n | sed 's/ \+/\t/g' | cut -f 9)

In [None]:
@dataclass
class FileNames:
    """Bundle of file paths belonging to one dataset.

    The field order is part of the generated constructor signature, so it must
    not be rearranged.
    """

    # Dataset identifier.
    name: str

    # Raw train / test splits.
    train_file: str
    test_file: str

    # Distance files associated with the train split.
    train_dtw: str
    train_fastdtw: str

    # Distance files associated with the test split.
    test_dtw: str
    test_fastdtw: str

In [None]:
# Build one [total_rows, n_columns, FileNames] entry for every dataset whose
# four distance files are all present on disk; other datasets are skipped.
sort_files = []

for file_name in tqdm(files):
    # Dataset name = last path component without the "_TRAIN.tsv" suffix.
    name = file_name.rsplit("/", 1)[-1].replace("_TRAIN.tsv", "")
    test_file = file_name.replace("TRAIN.tsv", "TEST.tsv")

    # Distance files live next to the raw splits and reuse their stems.
    train_dtw = file_name.replace(".tsv", "_train_dtw.csv")
    train_fastdtw = file_name.replace(".tsv", "_train_fastdtw.csv")
    test_dtw = test_file.replace(".tsv", "_train_dtw.csv")
    test_fastdtw = test_file.replace(".tsv", "_train_fastdtw.csv")

    required = (train_dtw, train_fastdtw, test_dtw, test_fastdtw)
    if any(not os.path.exists(path) for path in required):
        continue

    fl = FileNames(
        name=name,
        train_file=file_name,
        test_file=test_file,
        train_dtw=train_dtw,
        train_fastdtw=train_fastdtw,
        test_dtw=test_dtw,
        test_fastdtw=test_fastdtw,
    )

    # Only the shapes of the raw splits are needed here.
    frame = pd.read_csv(file_name, sep="\t", header=None)
    test_frame = pd.read_csv(test_file, sep="\t", header=None)

    n_train_rows, n_columns = frame.shape
    sort_files.append([n_train_rows + test_frame.shape[0], n_columns, fl])

In [None]:
def _total_rows(entry):
    """Sort key: the first element of a `sort_files` entry (train + test row count)."""
    return entry[0]


# Smallest datasets first; the bare name on the last line displays the result.
sort_files = sorted(sort_files, key=_total_rows)
sort_files

In [None]:
from typing import Any, Callable, List, Optional, Union

import numpy as np
from fastdtw import fastdtw
from scipy.stats import spearmanr
from sklearn.base import RegressorMixin, TransformerMixin
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.linear_model import RANSACRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.utils import check_array, check_random_state
from tqdm.notebook import tqdm


def fastdtw_distance(x: Any, y: Any) -> float:
    """Return the first element of `fastdtw(x, y)`, i.e. the distance value."""
    result = fastdtw(x, y)
    distance = result[0]
    return distance


def euclidian_distance(x: Any, y: Any) -> float:
    """Return the norm of `x - y` as computed by `np.linalg.norm` with default arguments."""
    difference = x - y
    return np.linalg.norm(difference)


# Feature-DTW prototype selection: a regressor (gradient boosting by default)
# predicts how correlated a candidate prototype would be with the current ones.
class FeatureDTWTransformer(TransformerMixin):
    """Greedy prototype selection on a precomputed distance matrix.

    `fit` draws `n_start` random rows of `X` as prototypes. The matrix is
    indexed with the same indices on rows and on columns, so `X` is expected
    to be a square pairwise-distance matrix. Each `fit_step` then, for every
    current prototype, fits `regressor` to map "distance to that prototype"
    to "Spearman correlation with that prototype" and uses it to predict, for
    every row of `X`, how correlated a feature built from that row would be.
    Rows whose predicted absolute correlation is low enough are added as new
    prototypes.

    Parameters
    ----------
    n_start : number of randomly drawn initial prototypes.
    n_add : maximum number of prototypes added by one `fit_step`.
    n_max : `fit_step` stops once at least this many prototypes exist. The
        check happens before adding, so the final count can exceed `n_max`
        by up to `n_add - 1`.
    by : "mean" ranks candidates by mean predicted |correlation|; any other
        value ranks them by the maximum.
    p_max : a candidate is eligible only if its maximum predicted
        |correlation| is strictly below this value.
    regressor : callable returning a regressor; it is called as
        `regressor(random_state=42)` in every `fit_step`.
    copy_prototypes : forwarded as `copy=` when the initial prototypes are
        stored.
    distance_func : stored on the instance; not used by this class.
    random_state : seed / RandomState used for the initial random draw.
    n_jobs : stored on the instance; not used by this class.

    Attributes set by `fit`
    -----------------------
    index_ : row indices of the selected prototypes.
    prototypes_ : rows of `X` at `index_`.
    distances_ : columns of `X` at `index_`.
    s_corr_ : Spearman correlation matrix between the columns of `distances_`.
    """

    def __init__(
        self,
        n_start: int = 30,
        n_add: int = 10,
        n_max: int = 100,
        by: str = "mean",
        p_max: float = 0.7,
        regressor: Callable[..., RegressorMixin] = GradientBoostingRegressor,
        copy_prototypes: bool = True,
        distance_func: Callable[[Any, Any], float] = fastdtw_distance,
        random_state: Optional[int] = None,
        n_jobs: Optional[int] = None,
    ) -> None:
        self.n_start = n_start
        self.n_add = n_add
        self.n_max = n_max
        self.random_state = random_state
        self.copy_prototypes = copy_prototypes
        self.distance_func = distance_func
        self.regressor = regressor
        self.n_jobs = n_jobs

        self.by = by
        self.p_max = p_max

    @staticmethod
    def _spearman_matrix(distances: np.ndarray) -> np.ndarray:
        """Return the column-wise Spearman correlation of `distances` as a 2-D matrix.

        `spearmanr` returns a bare scalar when exactly two columns are given;
        that case is expanded to a 2x2 matrix so `s_corr_[i]` is always a row.
        Fewer than two columns yield an all-ones matrix of matching size.
        """
        n_columns = distances.shape[1]
        if n_columns < 2:
            return np.ones((n_columns, n_columns))

        corr = spearmanr(distances, axis=0)[0]
        if np.ndim(corr) == 0:
            corr = np.array([[1.0, corr], [corr, 1.0]])
        return corr

    def fit_step(self, X: Any, y: Any = None) -> bool:
        """Try to add up to `n_add` new prototypes.

        Returns True when at least one prototype was added and False when
        selection should stop (`n_max` reached or no eligible candidate).
        """
        n_prototypes = self.index_.shape[0]
        if n_prototypes >= self.n_max or n_prototypes == 0:
            return False

        distances = self.distances_
        # Rows of the current prototypes; invariant across the loop below.
        prototype_rows = distances[self.index_]

        # The regressor seed is fixed here and does not follow `random_state`.
        regressor = self.regressor(random_state=42)

        predictions = []
        for i in range(self.prototypes_.shape[0]):
            regressor.fit(prototype_rows[:, i].reshape(-1, 1), self.s_corr_[i])

            predicted = regressor.predict(distances[:, i].reshape(-1, 1))
            # A correlation cannot leave [-1, 1].
            predicted = np.clip(predicted, -1, 1)
            # For rows that already are prototypes the true value is known.
            predicted[self.index_] = self.s_corr_[i]

            predictions.append(predicted)

        p_all = np.abs(predictions)

        p_mean = p_all.mean(axis=0)
        p_max = p_all.max(axis=0)

        condition = (p_mean < 0.5) & (p_max < self.p_max)
        # Never re-select an existing prototype: it would add nothing and,
        # with p_max > 1, could make `fit` loop forever.
        condition[self.index_] = False

        if self.by == "mean":
            sort_by = p_mean.argsort()
        else:
            sort_by = p_max.argsort()

        new_r = sort_by[condition[sort_by]][: self.n_add]

        if new_r.shape[0] == 0:
            return False

        self.add_prototype(new_r, X)

        return self.index_.shape[0] > n_prototypes

    def fit(self, X: Any, y: Any = None) -> "FeatureDTWTransformer":
        """Draw the initial prototypes and run `fit_step` until it returns False.

        `n_start` is capped at the number of rows of `X`, because the initial
        draw is made without replacement.
        """
        raw_data = self.__check_array(X)

        self.fin = False
        self._shape = raw_data.shape

        rnd = check_random_state(self.random_state)

        n_start = min(self.n_start, self._shape[0])
        self.index_ = rnd.choice(self._shape[0], n_start, replace=False)

        self.prototypes_ = np.array(raw_data[self.index_], copy=self.copy_prototypes)

        # Selection state is built here rather than through `transform`, so
        # that `transform` stays free of side effects.
        self.distances_ = raw_data[:, self.index_]
        self.s_corr_ = self._spearman_matrix(self.distances_)

        while self.fit_step(X, y):
            pass

        return self

    def transform(self, X: Any, y: Any = None) -> np.ndarray:
        """Return the columns of `X` that belong to the selected prototypes.

        The fitted state (`distances_`, `s_corr_`) is left untouched, so
        transforming another matrix (e.g. test-vs-train distances) cannot
        corrupt a later `fit_step` / `add_prototype` call.
        """
        raw_data = self.__check_array(X)

        return raw_data[:, self.index_]

    def add_prototype(self, index: Union[int, List[int]], X: Any) -> np.ndarray:
        """Add the rows of `X` at `index` as prototypes and return `distances_`.

        Indices that already are prototypes are ignored. `index` may be a
        single integer (Python or NumPy) or a sequence of integers.
        """
        candidates = np.ravel(index)
        new_index = candidates[~np.isin(candidates, self.index_)]

        if new_index.shape[0] == 0:
            return self.distances_

        raw_data = self.__check_array(X)

        self.distances_ = np.hstack((self.distances_, raw_data[:, new_index]))
        self.prototypes_ = np.vstack((self.prototypes_, raw_data[new_index]))
        self.index_ = np.append(self.index_, new_index)
        self.s_corr_ = self._spearman_matrix(self.distances_)

        return self.distances_

    def add_protype(self, index: Union[int, List[int]], X: Any) -> np.ndarray:
        """Backward-compatible alias of `add_prototype` (original spelling)."""
        return self.add_prototype(index, X)

    def remove_prototype(self, index: Union[int, List[int]]) -> np.ndarray:
        """Drop the prototypes whose row index is in `index` and return `distances_`.

        `s_corr_` is recomputed so it stays aligned with the remaining
        prototypes, mirroring what `add_prototype` does.
        """
        keep = ~np.isin(self.index_, np.ravel(index))

        self.index_ = self.index_[keep]
        self.prototypes_ = self.prototypes_[keep]
        self.distances_ = self.distances_[:, keep]
        self.s_corr_ = self._spearman_matrix(self.distances_)

        return self.distances_

    def __check_array(self, X: Any) -> np.ndarray:
        """Validate `X` as a dense numeric array; NaN entries are accepted."""
        # NOTE(review): newer scikit-learn releases appear to rename
        # `force_all_finite` to `ensure_all_finite` -- confirm against the
        # version pinned for this project.
        return check_array(
            X, accept_sparse=False, dtype="numeric", force_all_finite="allow-nan"
        )

In [None]:
np.random.seed(42)

# Wall-clock limit, in seconds, for each LinearSVC fit / predict call.
SVC_TIMEOUT_SECONDS = 600
# Number of prototype selections that are run and averaged per dataset.
N_REPEATS = 3

# Make sure the log directory exists so open() cannot fail on a fresh checkout.
os.makedirs("../logs", exist_ok=True)

# newline="" is what the csv module expects from files it writes to; without
# it, platforms that translate "\n" produce an extra blank line per row.
with open(
    f"../logs/classification-{datetime.now().isoformat()}.csv", "w", newline=""
) as out_file:
    writer = csv.writer(out_file, delimiter=",")
    # NOTE(review): the "n_features" column is filled with `n_samples`, which
    # the earlier cell builds as train rows + test rows -- confirm the header.
    writer.writerow(
        [
            "dataset",
            "n_features",
            "n_max",
            "1NN_fastdtw",
            "features_fastdtw",
            "fdtw_linear_fastdtw",
            "n_linear_used",
        ]
    )
    for n_samples, n_len, file_name in tqdm(sort_files):

        name = file_name.name

        train_frame = pd.read_csv(file_name.train_file, delimiter="\t", header=None)
        test_frame = pd.read_csv(file_name.test_file, delimiter="\t", header=None)

        # Column 0 of the raw splits holds the class label.
        y_train = train_frame[0].values
        y_test = test_frame[0].values

        train_fastdtw = pd.read_csv(file_name.train_fastdtw, delimiter=",", header=None)
        test_fastdtw = pd.read_csv(file_name.test_fastdtw, delimiter=",", header=None)

        # Prototype budget: half of the train rows, capped at 100.
        n_max = np.min([np.rint(0.5 * train_fastdtw.shape[0]).astype(int), 100])
        row = [name, n_samples, n_max]

        # 1-NN baseline: each test row takes the label of the train column
        # with the smallest distance.
        row.append(
            round(
                accuracy_score(
                    y_pred=y_train[np.argmin(test_fastdtw.values, axis=1)],
                    y_true=y_test,
                ),
                3,
            )
        )

        # Initial / per-step prototype count: 20% of the train rows, at least
        # 10. It depends only on the dataset, so it is computed once.
        n_start = np.max([np.rint(0.2 * train_fastdtw.shape[0]).astype(int), 10])
        n_add = n_start

        try:
            # Features DTW: every train column is used as a feature.
            X_train = train_fastdtw.values
            X_test = test_fastdtw.values

            svc = LinearSVC(random_state=42, max_iter=1000)
            func_timeout(SVC_TIMEOUT_SECONDS, svc.fit, args=(X_train, y_train))
            predicted = func_timeout(SVC_TIMEOUT_SECONDS, svc.predict, args=(X_test,))

            row.append(round(accuracy_score(y_true=y_test, y_pred=predicted), 3))

            # Features DTW restricted to the selected prototypes, averaged
            # over N_REPEATS selections.
            arr = []
            n_used = []

            for i in range(N_REPEATS):
                fdtw = FeatureDTWTransformer(
                    n_start=n_start, n_add=n_add, n_max=n_max, by="mean", p_max=0.7
                )

                X_train = train_fastdtw.values

                fdtw.fit(X_train)

                X_train = X_train[:, fdtw.index_]

                X_test = test_fastdtw.values[:, fdtw.index_]

                svc = LinearSVC(random_state=42, max_iter=1000)
                func_timeout(SVC_TIMEOUT_SECONDS, svc.fit, args=(X_train, y_train))
                predicted = func_timeout(
                    SVC_TIMEOUT_SECONDS, svc.predict, args=(X_test,)
                )

                arr.append(accuracy_score(y_true=y_test, y_pred=predicted))
                n_used.append(fdtw.index_.shape[0])

            row.append(round(np.mean(arr), 3))
            row.append(round(np.mean(n_used), 3))
        except FunctionTimedOut:
            # A timed-out dataset is skipped entirely; no partial row is written.
            continue

        writer.writerow(row)
        # Flush after every dataset so finished results survive an interruption.
        out_file.flush()