In [None]:
import requests
import json
import time
import os
import sys
import numpy as np
from scipy.signal import cwt, ricker
import matplotlib.pyplot as plt
import pandas as pd

In [None]:
def adaptive_multi_scale_dtw(x, y, scales, window_size):
    n = len(x)
    m = len(y)
    
    # decompose
    x_wavelets = cwt(x, ricker, scales)
    y_wavelets = cwt(y, ricker, scales)

    # might be good to fit ||y-W||^2 + a||BW||^2 where would represent B is descrete second order derivative
    # but hard to say what y would be
    weights = np.ones(len(scales)) / len(scales)  # equal weights for simplicity
    
    dtw_matrix = np.zeros((n + 1, m + 1))
    dtw_matrix[0, 1:] = np.inf
    dtw_matrix[1:, 0] = np.inf
    
    for i in range(1, n + 1):
        for j in range(max(1, i - window_size), min(m + 1, i + window_size)):
            cost = np.sum(weights * np.abs(x_wavelets[:, i - 1] - y_wavelets[:, j - 1]))
            dtw_matrix[i, j] = cost + min(dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1])
    
    distance = dtw_matrix[n, m]
    print(f"Distance: {distance}")
    return distance
scales = np.arange(1, 11)
window_size = 10


In [None]:
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

In [None]:
class AdaptiveMultiScaleDTW:
    def __init__(self, scales, window_size):
        self.scales = scales
        self.window_size = window_size

    def __call__(self, x, y):
        return self.adaptive_multi_scale_dtw(x, y)

    def adaptive_multi_scale_dtw(self, x, y):
        n = len(x)
        m = len(y)
        
        x_wavelets = cwt(x, ricker, scales)
        y_wavelets = cwt(y, ricker, scales)

        
        weights = np.ones(len(scales)) / len(scales)
        
        # Multi-Scale DTW
        dtw_matrix = np.zeros((n + 1, m + 1))
        dtw_matrix[0, 1:] = np.inf
        dtw_matrix[1:, 0] = np.inf
        
        for i in range(1, n + 1):
            for j in range(max(1, i - window_size), min(m + 1, i + window_size)):
                cost = np.sum(weights * np.abs(x_wavelets[:, i - 1] - y_wavelets[:, j - 1]))
                dtw_matrix[i, j] = cost + min(dtw_matrix[i - 1, j], dtw_matrix[i, j - 1], dtw_matrix[i - 1, j - 1])
        
        distance = dtw_matrix[n, m]

        return distance

In [None]:
class ClassicDTW:
    def __call__(self, x, y):
        return self.classic_dtw(x, y)

    def classic_dtw(self, x, y):
        distance, _ = fastdtw(x, y, dist=2)
        return distance

In [None]:
def generate_timeseries(length, num_series, noise_level=0.1, seasonal_period=1000):
    t = np.arange(length)
    series = []
    
    for i in range(num_series):
        frequency = np.random.uniform(0.01, 0.1)
        sinusoid = np.sin(2 * np.pi * frequency * t)
        brownian = np.cumsum(np.random.normal(0, 0.1, length))
        seasonal = np.sin(2 * np.pi * t / seasonal_period)
        series_i = sinusoid + brownian + seasonal
        noise = np.random.normal(0, noise_level, length)
        series_i += noise
        series.append(series_i)
    
    return np.array(series)

In [None]:
length = 500

In [None]:
base_series = np.sin(2 * np.pi * 0.01* np.arange(length)) + np.cumsum(np.random.normal(0, 0.1, length)) + np.sin(2 * np.pi * np.arange(length) / 1000)
plt.plot(base_series)

In [None]:
def gen(base_series, num_series, deviation_factor=0.1, noise_level=0.01):
    length = len(base_series)
    series = [base_series]
    
    for i in range(1, num_series):
        deviation = i * deviation_factor
        brownian = np.cumsum(np.random.normal(0, deviation, length))
        noise = np.random.normal(0, deviation * noise_level, length)
        series_i = base_series + brownian + noise
        series.append(series_i)
    
    return np.array(series)

In [None]:
length = 500
num_series = 10
deviation_factor = 0.05
timeseries_data = gen(base_series, num_series, deviation_factor)

In [None]:
plt.figure(figsize=(10, 6))
for i in range(num_series):
    plt.plot(timeseries_data[i], label=f'Time Series {i + 1}')
plt.title('Synthetic Time Series Data')

In [None]:
def traditional_dtw_distance(x, y):
    distance, path = fastdtw(x, y)
    return distance

In [None]:
import sklearn
from sklearn import preprocessing

In [None]:
scales = np.arange(1, 21)  # Example scales
window_size = 10  # Example window size
distances = []
basic_distances = []
for i in range(num_series):
    x = base_series
    y = timeseries_data[i]
    
    distance = adaptive_multi_scale_dtw(x, y, scales, window_size)
    basic = traditional_dtw_distance(x, y)
    basic_distances.append(basic)
    distances.append(distance)

print("Distances between timeseries:")
print(distances)
print("Basic DTW distances between timeseries:")
print(basic_distances)

In [None]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score

# Generate labeled timeseries data
num_classes = 2
num_series_per_class = 50
length = 100
deviation_factor = 0.01

timeseries_data = []
labels = []
# base_fncs = [np.sin, np.cos]
base_series1 = np.sin(2 * np.pi * 0.01* np.arange(length)) + np.cumsum(np.random.normal(0, 0.1, length)) + np.sin(2 * np.pi * np.arange(length) / 1000)
# base_series1 = (base_series1 - base_series1.min()) / (base_series1.max() - base_series1.min())
base_series2 = np.cos(2 * np.pi * 0.1* np.arange(length)) + np.cumsum(np.random.normal(0, 0.2, length)) + np.sin(2 * np.pi * np.arange(length) / 1000)
# base_series2 = (base_series2 - base_series2.min()) / (base_series2.max() - base_series2.min())
base_series = [base_series1, base_series2]

# generate classes
for class_idx in range(num_classes):
    base = base_series[class_idx]
    class_data = gen(base, num_series_per_class, deviation_factor)
    timeseries_data.extend(class_data)
    labels.extend([class_idx] * num_series_per_class)
    plt.figure(figsize=(10, 6))
    for i in range(num_series):
        plt.plot(class_data[i], label=f'Time Series {i + 1}')
    plt.title('Synthetic Time Series Data for class ' + str(class_idx))
        
timeseries_data = np.array(timeseries_data)
labels = np.array(labels)
indices = np.arange(len(timeseries_data))
np.random.shuffle(indices)
timeseries_data = timeseries_data[indices]
labels = labels[indices]

train_size = int(0.8 * len(timeseries_data))
train_data, test_data = timeseries_data[:train_size], timeseries_data[train_size:]
train_labels, test_labels = labels[:train_size], labels[train_size:]

adaptive_dtw_metric = AdaptiveMultiScaleDTW(scales=np.arange(1,21), window_size=10)
classic_dtw = ClassicDTW()
knn_euclidean = KNeighborsClassifier(n_neighbors=3, metric=classic_dtw)
knn_adaptive_dtw = KNeighborsClassifier(n_neighbors=3, metric=adaptive_dtw_metric)

knn_euclidean.fit(train_data, train_labels)
knn_adaptive_dtw.fit(train_data, train_labels)

predictions_euclidean = knn_euclidean.predict(test_data)
predictions_adaptive_dtw = knn_adaptive_dtw.predict(test_data)

accuracy_euclidean = accuracy_score(test_labels, predictions_euclidean)
accuracy_adaptive_dtw = accuracy_score(test_labels, predictions_adaptive_dtw)

print("Accuracy with Euclidean distance:", accuracy_euclidean)
print("Accuracy with Adaptive Multi-Scale DTW distance:", accuracy_adaptive_dtw)

for i in range(len(test_data)):
    plt.plot(test_data[i], label=f'Time Series {i + 1}')
    plt.title(f'True Class: {test_labels[i]}, Predicted Class (Euclidean): {predictions_euclidean[i]}')
    plt.show()
    plt.plot(test_data[i], label=f'Time Series {i + 1}')
    plt.title(f'True Class: {test_labels[i]}, Predicted Class (Adaptive Multi-Scale DTW): {predictions_adaptive_dtw[i]}')
    plt.show()

In [None]:
t = np.linspace(0, 10, 100)
exp_growth = np.exp(t / 3) + np.random.normal(0, 0.4, t.shape)  # exponential growth
decaying_sinusoid = np.sin(t) * np.exp(-t / 5)  + np.random.normal(0, 0.4, t.shape)  # decaying sinusoid
exp_growth = (exp_growth - exp_growth.min()) / (exp_growth.max() - exp_growth.min())
decaying_sinusoid = (decaying_sinusoid - decaying_sinusoid.min()) / (decaying_sinusoid.max() - decaying_sinusoid.min())

simple_dtw_dissimilar_distance = traditional_dtw_distance(exp_growth, decaying_sinusoid)
adaptive_dtw_dissimilar_distance = adaptive_multi_scale_dtw(exp_growth, decaying_sinusoid, scales, window_size)

plt.figure(figsize=(10, 6))
plt.plot(t, exp_growth, label='Exponential Growth')
plt.plot(t, decaying_sinusoid, label='Decaying Sinusoidal')
plt.title('Highly Dissimilar Time Series')
plt.legend()
plt.xlabel('Time')
plt.ylabel('Value')
plt.show()

simple_dtw_dissimilar_distance, adaptive_dtw_dissimilar_distance