In [6]:
import os
import numpy as np
import pandas as pd
import time
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.neighbors import KNeighborsClassifier
import math
import numpy as np
import array
inf = np.inf

class SquaredEuclidean:

    @staticmethod
    def inner_dist(x, y):
        return (x - y) ** 2

    @staticmethod
    def result(x):
        if np is not None and isinstance(x, np.ndarray):
            return np.sqrt(x)
        return math.sqrt(x)

    @staticmethod
    def inner_val(x):
        return x*x


class SquaredEuclideanNdim:

    @staticmethod
    def inner_dist(x, y):
        return np.sum((x - y) ** 2)

    @staticmethod
    def result(x):
        return np.sqrt(x)

    @staticmethod
    def inner_val(x):
        return x * x


class Euclidean:

    @staticmethod
    def inner_dist(x, y):
        return abs(x - y)

    @staticmethod
    def result(x):
        return x

    @staticmethod
    def inner_val(x):
        return x


class EuclideanNdim:

    @staticmethod
    def inner_dist(x, y):
        return np.sqrt(np.sum(np.power(x - y, 2)))

    @staticmethod
    def result(x):
        return x

    @staticmethod
    def inner_val(x):
        return x


class CustomInnerDist:

    @staticmethod
    def inner_dist(x, y):
        """The distance between two points in the series.

        For n-dimensional data, the two arguments x and y will be vectors.
        Otherwise, they are scalars.

        For example, for default DTW this would be the Squared Euclidean
        distance: (a-b)**2.
        """
        raise Exception("Function not defined")

    @staticmethod
    def result(x):
        """The transformation applied to the sum of all inner distances.

        The variable x can be both a single number as a matrix.

        For example, for default DTW, which uses Squared Euclidean, this
        would be: sqrt(d). Because d = (a_0-b_0)**2 + (a_1-b_1)**2 ...
        """
        raise Exception("Function not defined")

    @staticmethod
    def inner_val(x):
        """The transformation applied to input settings like max_step."""
        raise Exception("Function not defined")


def inner_dist_cls(inner_dist="squared euclidean", use_ndim=False):
    if inner_dist == "squared euclidean":
        if use_ndim:
            use_cls = SquaredEuclideanNdim
        else:
            use_cls = SquaredEuclidean
    elif inner_dist == "euclidean":
        if use_ndim:
            use_cls = EuclideanNdim
        else:
            use_cls = Euclidean
    elif hasattr(inner_dist, 'inner_dist') and hasattr(inner_dist, 'result'):
        use_cls = inner_dist
    else:
        raise AttributeError(f"Unknown value for argument inner_dist: {inner_dist}")
    return use_cls


def inner_dist_fns(inner_dist="squared euclidean", use_ndim=False):
    use_cls = inner_dist_cls(inner_dist, use_ndim)
    return use_cls.inner_dist, use_cls.result, use_cls.inner_val


def to_c(inner_dist):
    if inner_dist == 'squared euclidean':
        return 0
    elif inner_dist == 'euclidean':
        return 1
    elif hasattr(inner_dist, 'inner_dist') and hasattr(inner_dist, 'result'):
        raise AttributeError('Custom inner distance functions are not supported for the fast C implementation')
    else:
        raise AttributeError('Unknown inner_dist: {}'.format(inner_dist))
print(SquaredEuclidean.inner_dist(5, 2))  

class DTWSettings:
    def __init__(self, window=None, use_pruning=False, max_dist=None, max_step=None,
                 max_length_diff=None, penalty=None, psi=None, inner_dist="squared euclidean",
                 use_ndim=False, use_c=False):
        """Settings for Dynamic Time Warping distance methods.

        :param window: Only allow for maximal shifts from the two diagonals smaller than this number.
            The maximally allowed warping, thus difference between indices i in series 1 and j in series 2,
            is thus ``|i-j| < 2*window + |len(s1) - len(s2)|``.
            It includes the diagonal, meaning that Euclidean distance is obtained by setting
            ``window=1.``
            If the two series are of equal length, this means that the band you see appearing
            on the cumulative cost matrix is of width 2*window-1. In other definitions of DTW
            this is referred to as the window.
        :param max_dist: Stop if the returned values will be larger than this value
        :param max_step: Do not allow steps larger than this value.
            If the difference between two values in the two series is larger than this, thus
            if ``|s1[i]-s2[j]| > max_step``, replace that value with infinity.
        :param max_length_diff: Return infinity if length of two series is larger
        :param penalty: Penalty to add if compression or expansion is applied
        :param psi: Psi relaxation parameter (ignore start and end of matching).
            If psi is a single integer, it is used for both start and end relaxations of both series.
            If psi is a 4-tuple, it is used as the psi-relaxation for
            (begin series1, end series1, begin series2, end series2).
            Useful for cyclical series.
        :param use_pruning: Prune values based on Euclidean distance.
            This is the same as passing ub_euclidean() to max_dist
        :param inner_dist: Distance between two points in the time series.
            One of 'squared euclidean' (default), 'euclidean'.
            When using the pure Python implementation (thus use_c=False) then the argument can also
            be an object that has as callable arguments 'inner_dist', 'result', and 'inner_val'. The 'inner_dist'
            function computes the distance between two points (e.g., squared euclidean) and 'result'
            is the function to apply to the final distance (e.g., sqrt when using squared euclidean).
            You can also inherit from the 'innerdistance.CustomInnerDist' class.
        :param use_ndim: Use n-dimensional (aka multivariate) series instead of 1-dimensional series.
        :param use_c: Use the C variant if available.
        """
        self.window = window
        self.use_pruning = use_pruning
        self.max_dist = max_dist
        self.max_step = max_step
        self.max_length_diff = max_length_diff
        self.penalty = penalty
        self.psi = psi
        self.inner_dist = inner_dist
        self.use_ndim = use_ndim
        self.use_c = use_c

        _, _, inner_val = inner_dist_fns(self.inner_dist)

        if not self.max_step:
            self.adj_max_step = inf
        else:
            self.adj_max_step = inner_val(self.max_step)

        if not self.max_dist:
            self.adj_max_dist = inf
        else:
            self.adj_max_dist = inner_val(self.max_dist)

        if not self.penalty:
            self.adj_penalty = 0
        else:
            self.adj_penalty = inner_val(self.penalty)

        if self.max_length_diff is None:
            self.adj_max_length_diff = inf
        else:
            self.adj_max_length_diff = self.max_length_diff

    @staticmethod
    def for_dtw(s1, s2, **kwargs):
        settings = DTWSettings(**kwargs)
        if settings.window is None:
            settings.window = max(len(s1), len(s2))
        settings.set_max_dist(s1, s2)
        return settings

    def set_max_dist(self, s1, s2):
        _, _, ival_fn = inner_dist_fns(self.inner_dist, use_ndim=self.use_ndim)
        if self.use_pruning:
            self.adj_max_dist = ival_fn(ub_euclidean(s1, s2, inner_dist=self.inner_dist))

    def kwargs(self):
        return {
            'window': self.window,
            'use_pruning': self.use_pruning,
            'max_dist': self.max_dist,
            'max_step': self.max_step,
            'max_length_diff': self.max_length_diff,
            'penalty': self.penalty,
            'psi': self.psi,
            'inner_dist': self.inner_dist,
            'use_ndim': self.use_ndim,
            'use_c': self.use_c
        }

    def c_kwargs(self):
        window = 0 if self.window is None else self.window
        max_dist = 0 if self.max_dist is None else self.max_dist
        max_step = 0 if self.max_step is None else self.max_step
        max_length_diff = 0 if (self.max_length_diff is None or math.isinf(self.max_length_diff)) \
            else self.max_length_diff
        penalty = 0 if self.penalty is None else self.penalty
        psi = 0 if self.psi is None else self.psi
        use_pruning = 0 if self.use_pruning is None else self.use_pruning
        inner_dist = to_c(self.inner_dist)
        return {
            'window': window,
            'max_dist': max_dist,
            'max_step': max_step,
            'max_length_diff': max_length_diff,
            'penalty': penalty,
            'psi': psi,
            'use_pruning': use_pruning,
            'inner_dist': inner_dist
        }

    def split_psi(self):
        psi_1b = psi_1e = psi_2b = psi_2e = 0
        if type(self.psi) is int:
            psi_1b = psi_1e = psi_2b = psi_2e = self.psi
        elif type(self.psi) in [tuple, list]:
            psi_1b, psi_1e, psi_2b, psi_2e = self.psi
        return psi_1b, psi_1e, psi_2b, psi_2e

    def __str__(self):
        r = ''
        a = self.c_kwargs()
        for k, v in a.items():
            r += '{}: {}\n'.format(k, v)
        return r
# (5 - 2) ** 3 = 3 ** 3 = 27


def ub_euclidean(s1, s2, inner_dist="squared euclidean"):
    """ See :meth:`dtaidistance.ed.euclidean_distance`"""
    return ed.distance(s1, s2, inner_dist=inner_dist)


def distance(s1, s2, only_ub=False, **kwargs):
    """
    Dynamic Time Warping.

    This function keeps a compact matrix, not the full warping paths matrix.

    Uses dynamic programming to compute::

        wps[i, j] = (s1[i]-s2[j])**2 + min(
                        wps[i-1, j  ] + penalty,  // vertical   / insertion / expansion
                        wps[i  , j-1] + penalty,  // horizontal / deletion  / compression
                        wps[i-1, j-1])            // diagonal   / match
        dtw = sqrt(wps[-1, -1])

    :param s1: First sequence
    :param s2: Second sequence
    :param only_ub: Only compute the upper bound (Euclidean).
    :param kwargs: :class:`DTWSettings` arguments

    Returns: DTW distance
    """
    s = DTWSettings.for_dtw(s1, s2, **kwargs)
    if s.use_c:
        if dtw_cc is None:
            logger.warning("C-library not available, using the Python version")
        else:
            return distance_fast(s1, s2, **s.kwargs())
    idist_fn, result_fn, ival_fn = inner_dist_fns(s.inner_dist, use_ndim=s.use_ndim)
    r, c = len(s1), len(s2)
    if s.adj_max_length_diff is not None and abs(r - c) > s.adj_max_length_diff:
        return inf
    if only_ub:
        return ival_fn(ub_euclidean(s1, s2, inner_dist=s.inner_dist))

    psi_1b, psi_1e, psi_2b, psi_2e = s.split_psi()
    length = min(c + 1, abs(r - c) + 2 * (s.window - 1) + 1 + 1 + 1)
    dtw = array.array('d', [inf] * (2 * length))
    sc = 0
    ec = 0
    for i in range(psi_2b + 1):
        dtw[i] = 0
    skip = 0
    i0 = 1
    i1 = 0
    psi_shortest = inf
   
    for i in range(r):
        #print("i={}".format(i))
        
        skipp = skip
        skip = max(0, i - max(0, r - c) - s.window + 1)
        i0 = 1 - i0
        i1 = 1 - i1
        for ii in range(i1*length, i1*length+length):
            dtw[ii] = inf
        j_start = max(0, i - max(0, r - c) - s.window + 1)
        j_end = min(c, i + max(0, c - r) + s.window)
        if sc > j_start:
            j_start = sc
        smaller_found = False
        ec_next = i
        if length == c + 1:
            skip = 0
        if psi_1b != 0 and j_start == 0 and i < psi_1b:
            dtw[i1 * length] = 0
        for j in range(j_start, j_end):
            # d = (s1[i] - s2[j])**2
            d = idist_fn(s1[i], s2[j])
            #print(d)
            if d > s.adj_max_step:
                continue
            assert j + 1 - skip >= 0
            assert j - skipp >= 0
            assert j + 1 - skipp >= 0
            assert j - skip >= 0
            dtw[i1 * length + j + 1 - skip] = d + min(dtw[i0 * length + j - skipp],
                                                      dtw[i0 * length + j + 1 - skipp] + s.adj_penalty,
                                                      dtw[i1 * length + j - skip] + s.adj_penalty)
            #print('({},{}), ({},{}), ({},{})'.format(i0, j - skipp, i0, j + 1 - skipp, i1, j - skip))
            # print('{}, {}, {}'.format(dtw[i0, j - skipp], dtw[i0, j + 1 - skipp], dtw[i1, j - skip]))
            # print('i={}, j={}, d={}, skip={}, skipp={}'.format(i,j,d,skip,skipp))
            
            #print(dtw)
            if dtw[i1 * length + j + 1 - skip] > s.adj_max_dist:
                if not smaller_found:
                    sc = j + 1
                if j >= ec:
                    break
            else:
                smaller_found = True
                ec_next = j + 1
        ec = ec_next
        if psi_1e != 0 and j_end == len(s2) and len(s1) - 1 - i <= psi_1e:
            psi_shortest = min(psi_shortest, dtw[i1 * length + j_end - skip])
    if psi_1e == 0 and psi_2e == 0:
        d = dtw[i1 * length + min(c, c + s.window - 1) - skip]
    else:
        ic = min(c, c + s.window - 1) - skip
        if psi_2e != 0:
            vc = dtw[i1 * length + ic - psi_2e:i1 * length + ic + 1]
            d = min(array_min(vc), psi_shortest)
        else:
            d = min(dtw[i1 * length + min(c, c + s.window - 1) - skip], psi_shortest)
    if s.adj_max_dist and d > s.adj_max_dist:
        d = inf
    d = result_fn(d)
    return d


# Local Path for UCR Archive
ucr_path = r"C:\Users\vaish\soft-dtw\UCRArchive_2018"

# Selected datasets for analysis
datasets = ["Lightning2"]
def dtw_distance(x, y):
    distance1=distance(x, y)
    return distance1


def compute_1nn(X_train, y_train, X_test, y_test, distance_metric):
    """Computes 1-NN classification with given distance metric."""
    if distance_metric == "euclidean":
        clf = KNeighborsClassifier(n_neighbors=1, metric="euclidean")
    elif distance_metric == "dtw":
        clf = KNeighborsClassifier(n_neighbors=1, metric=dtw_distance)
    else:
        raise ValueError("Invalid distance metric")

    clf.fit(X_train, y_train)

    total_data_points = len(X_test)
    y_pred = []
    for i, test_point in enumerate(X_test):
        print(f"Processing {i+1}/{total_data_points} in {distance_metric}...")
        y_pred.append(clf.predict([test_point])[0])

    accuracy = accuracy_score(y_test, y_pred)
    error_rate = 1 - accuracy
    precision = precision_score(y_test, y_pred, average='macro', zero_division=0)
    recall = recall_score(y_test, y_pred, average='macro', zero_division=0)
    f1 = f1_score(y_test, y_pred, average='macro', zero_division=0)

    return accuracy, error_rate, precision, recall, f1

if __name__ == "__main__":
    total_datasets = len(datasets)
    start_time = time.time()

    for idx, dataset in enumerate(datasets, start=1):
        dataset_start_time = time.time()
        print(f"\n🔹 [{idx}/{total_datasets}] Processing {dataset}...")

        dataset_path = os.path.join(ucr_path, dataset)
        train_file = os.path.join(dataset_path, f"{dataset}_TRAIN.tsv")
        test_file = os.path.join(dataset_path, f"{dataset}_TEST.tsv")

        if not os.path.exists(train_file) or not os.path.exists(test_file):
            print(f"⚠️ Skipping {dataset}, files not found.")
            continue

        train_data = np.loadtxt(train_file)
        test_data = np.loadtxt(test_file)

        X_train, y_train = train_data[:, 1:], train_data[:, 0]
        X_test, y_test = test_data[:, 1:], test_data[:, 0]

        acc_dtw, err_dtw, prec_dtw, rec_dtw, f1_dtw = compute_1nn(X_train, y_train, X_test, y_test, "dtw")
        acc_euc, err_euc, prec_euc, rec_euc, f1_euc = compute_1nn(X_train, y_train, X_test, y_test, "euclidean")

        results_df = pd.DataFrame({
            "Metric": ["Accuracy", "Error Rate", "Precision", "Recall", "F1 Score"],
            "DTW 1NN": [acc_dtw, err_dtw, prec_dtw, rec_dtw, f1_dtw],
            "Euclidean 1NN": [acc_euc, err_euc, prec_euc, rec_euc, f1_euc]
        })

        print("\n📊 Performance Metrics:\n")
        print(results_df)

        dataset_elapsed_time = time.time() - dataset_start_time
        elapsed_time = time.time() - start_time
        remaining_time = (elapsed_time / idx) * (total_datasets - idx)
        print(f"⏳ Time taken for {dataset}: {dataset_elapsed_time:.2f}s")
        print(f"⏳ Estimated remaining time: {remaining_time:.2f}s")

    print(f"\n✅ All datasets processed in {time.time() - start_time:.2f}s! 🎉")









9

🔹 [1/1] Processing Lightning2...
Processing 1/61 in dtw...
Processing 2/61 in dtw...
Processing 3/61 in dtw...
Processing 4/61 in dtw...
Processing 5/61 in dtw...
Processing 6/61 in dtw...
Processing 7/61 in dtw...
Processing 8/61 in dtw...
Processing 9/61 in dtw...
Processing 10/61 in dtw...
Processing 11/61 in dtw...
Processing 12/61 in dtw...
Processing 13/61 in dtw...
Processing 14/61 in dtw...
Processing 15/61 in dtw...
Processing 16/61 in dtw...
Processing 17/61 in dtw...
Processing 18/61 in dtw...
Processing 19/61 in dtw...
Processing 20/61 in dtw...
Processing 21/61 in dtw...
Processing 22/61 in dtw...
Processing 23/61 in dtw...
Processing 24/61 in dtw...
Processing 25/61 in dtw...
Processing 26/61 in dtw...
Processing 27/61 in dtw...
Processing 28/61 in dtw...
Processing 29/61 in dtw...
Processing 30/61 in dtw...
Processing 31/61 in dtw...
Processing 32/61 in dtw...
Processing 33/61 in dtw...
Processing 34/61 in dtw...
Processing 35/61 in dtw...
Processing 36/61 in dtw...
P