In [1]:
import os
import time

import numpy as np
import pandas as pd

from resources.dtw import dtw___

# DYNAMIC TIME WARPING

Dynamic Time Warping (DTW) is a method used to measure the similarity between two time series that can differ in time or speed.

Below is a manual explanation of how DTW works, step by step:

> #### **PROBLEM SETUP**
> * We have two sequences:
>   - Sequence X = $[x_1, x_2, ..., x_n]$
>   - Sequence Y = $[y_1, y_2, ..., y_m]$
> * The goal is to find the optimal alignment between $X$ and $Y$ such that the cumulative distance between them is minimized.

> #### **DISTANCE MATRIX**
> * Construct a distance matrix $D$ of size $n \times m$, where each cell $D(i, j)$ represents the distance between $x_i$ and $y_j$.
> * The distance can be the Euclidean distance, the Manhattan distance, or any other suitable metric. For scalar samples, for example:
> $$ D(i, j)=|x_i-y_j| $$

> #### **ACCUMULATED COST MATRIX**
> * Construct an accumulated cost matrix $C$ of size $n \times m$ where each cell $C(i, j)$ represents the minimum cumulative cost to reach $(i, j)$ from $(1, 1)$
> * Initialize the first cell:
> $$ C(1, 1) = D(1, 1) $$
> * Fill the first row and first column:
> $$ C(i, 1) = C(i-1, 1) + D(i, 1) \:\:\:\:for\:i=2,...,n $$
> $$ C(1, j) = C(1, j-1) + D(1, j) \:\:\:\:for\:j=2,...,m $$
> * For the rest of the cells, compute:
> $$ C(i,j)=D(i,j) + \min(C(i-1, j), C(i,j-1), C(i-1,j-1)) $$

> #### **WARPING PATH**
> * The warping path $W$ is the sequence of cells $(i, j)$ that defines the optimal alignment between $X$ and $Y$.
> * Start from $(n,m)$ and backtrack to $(1,1)$ by choosing the cell with the minimum cumulative cost at each step.

> #### **DTW DISTANCE**
> * The DTW distance is the value in the bottom-right corner of the accumulated cost matrix:
> $$ DTW(X,Y)=C(n,m) $$
> * This represents the minimum cumulative distance between the two sequences.


### `func dtw_distance`
> **parameter**
> * `X`: X signal (can be complex)
> * `Y`: Y signal (can be complex)
> * `dist`: distance calculation method, default: euclidean

> **output**
> * `dist`: the minimum cumulative distance between the two sequences.


## MATCHING WITH EXTRACTED FEATURES

First, prepare the schemas used for testing.

In [2]:
# Naming: train_<frame>
train_05 = []
train_10 = []
train_15 = []

# Naming: test_<type: normal|noise>_<duration in %>_<frame>
test_normal_50_05 = []
test_normal_50_10 = []
test_normal_50_15 = []

test_normal_100_05 = []
test_normal_100_10 = []
test_normal_100_15 = []

test_noise_50_05 = []
test_noise_50_10 = []
test_noise_50_15 = []

test_noise_100_05 = []
test_noise_100_10 = []
test_noise_100_15 = []

In [3]:
def apply_schema(schema, csv_path, npy_path):
  """Append one entry per row of a CSV file to ``schema``.

  The CSV at ``csv_path`` must provide the columns ``title`` and ``artist``.
  Each appended entry is a dict with the keys ``title``, ``artist`` and
  ``npy_path``, where ``npy_path`` is built as ``{npy_path}/{title}.npy``.

  The list is extended in place; nothing is returned.
  """
  table = pd.read_csv(csv_path)
  schema.extend(
    {
      'title': record['title'],
      'artist': record['artist'],
      'npy_path': f"{npy_path}/{record['title']}.npy",
    }
    for _, record in table.iterrows()
  )

In [4]:
# schema for train dataset
for frame, schema in (("05", train_05), ("10", train_10), ("15", train_15)):
  # apply_schema only appends, so empty the list first: re-running this cell
  # must not add every entry a second time.
  schema.clear()
  apply_schema(schema, csv_path="csv/train.csv", npy_path=f"npy/train/{frame}")

# schema for test dataset ("normal" recordings), keyed by (duration, frame)
normal_schemas = {
  ("50", "05"): test_normal_50_05,
  ("50", "10"): test_normal_50_10,
  ("50", "15"): test_normal_50_15,
  ("100", "05"): test_normal_100_05,
  ("100", "10"): test_normal_100_10,
  ("100", "15"): test_normal_100_15,
}
for (duration, frame), schema in normal_schemas.items():
  schema.clear()  # same idempotency guard as for the train schemas
  apply_schema(
    schema,
    csv_path=f"csv/test_normal_{duration}.csv",
    npy_path=f"npy/test/normal/{duration}/{frame}",
  )

# The "noise" schemas are currently disabled and therefore stay empty:
# apply_schema(test_noise_50_05, csv_path="csv/test_noise_50.csv", npy_path="npy/test/noise/50/05")
# apply_schema(test_noise_50_10, csv_path="csv/test_noise_50.csv", npy_path="npy/test/noise/50/10")
# apply_schema(test_noise_50_15, csv_path="csv/test_noise_50.csv", npy_path="npy/test/noise/50/15")
# apply_schema(test_noise_100_05, csv_path="csv/test_noise_100.csv", npy_path="npy/test/noise/100/05")
# apply_schema(test_noise_100_10, csv_path="csv/test_noise_100.csv", npy_path="npy/test/noise/100/10")
# apply_schema(test_noise_100_15, csv_path="csv/test_noise_100.csv", npy_path="npy/test/noise/100/15")

In [5]:
# Preview the first five entries of the train_05 schema as a table.
pd.DataFrame(train_05).head()

Unnamed: 0,title,artist,npy_path
0,4U,Convex,npy/train/05/4U.npy
1,23,Diamond Eyes,npy/train/05/23.npy
2,Watch The World Burn,"Paul Flint, Chris Linton",npy/train/05/Watch The World Burn.npy
3,Ark,Ship Wrek & Zookeepers,npy/train/05/Ark.npy
4,Arrow,Jim Yosef,npy/train/05/Arrow.npy


Match every test sample against the train set; the results are saved as CSV files in `result/matching/`.

In [6]:
def apply_testing(train, test, result_name):
  """Match every test sample against the train set with DTW and save the best hits.

  For each entry of ``test`` the array stored at its ``npy_path`` is loaded and
  compared with ``dtw___`` against the array of every entry of ``train``. The
  train entry with the smallest distance becomes the prediction. One row per
  test entry is written to ``result/matching/{result_name}.csv`` with the
  columns ``title``, ``predicted_title``, ``predicted_artist``, ``distance``
  (rounded to 2 decimals) and ``exec_time`` (seconds spent on that test entry,
  rounded to 2 decimals).

  Parameters
  ----------
  train : list of dict
      Reference entries; the keys ``title``, ``artist`` and ``npy_path`` are read.
  test : list of dict
      Query entries; the keys ``title`` and ``npy_path`` are read.
  result_name : str
      Name of the CSV file to write, without the ``.csv`` extension.

  Raises
  ------
  ValueError
      If ``test`` has entries but ``train`` is empty, because no prediction
      can be made.
  """
  print(f"test {result_name}")
  if test and not train:
    raise ValueError("train must contain at least one entry to match against")

  results = []
  for te in test:
    start_exec = time.perf_counter()
    # The query array does not depend on the train entry: load it once per
    # test sample instead of once per comparison.
    x = np.load(te['npy_path'])
    best, best_dist = None, None
    for tr in train:
      y = np.load(tr['npy_path'])
      dist, _ = dtw___(x, y)
      # Rank on the raw distance; rounding is applied only when reporting, so
      # two candidates that differ past the second decimal are not treated as
      # a tie. Strict '<' keeps the first train entry on exact ties.
      if best is None or dist < best_dist:
        best, best_dist = tr, dist
    exec_time = round(time.perf_counter() - start_exec, 2)
    results.append({
      'title': te['title'],
      'predicted_title': best['title'],
      'predicted_artist': best['artist'],
      'distance': round(best_dist, 2),
      'exec_time': exec_time,
    })

  out_path = f"result/matching/{result_name}.csv"
  # Create the target folder up front so the computed results are not lost to
  # a missing directory.
  os.makedirs(os.path.dirname(out_path), exist_ok=True)
  # Explicit columns keep the header in place even when `test` is empty.
  columns = ['title', 'predicted_title', 'predicted_artist', 'distance', 'exec_time']
  pd.DataFrame(results, columns=columns).to_csv(out_path, index=False)

In [7]:
# Each run pairs a train schema with a test schema and names the result file
# after both sides.
matching_runs = (
  (train_05, test_normal_50_05, "train_05__test_normal_50_05"),
  (train_10, test_normal_50_10, "train_10__test_normal_50_10"),
  (train_15, test_normal_50_15, "train_15__test_normal_50_15"),
  (train_05, test_normal_100_05, "train_05__test_normal_100_05"),
  (train_10, test_normal_100_10, "train_10__test_normal_100_10"),
  (train_15, test_normal_100_15, "train_15__test_normal_100_15"),
)
for train_schema, test_schema, run_name in matching_runs:
  apply_testing(train=train_schema, test=test_schema, result_name=run_name)

# The "noise" runs are currently disabled:
# apply_testing(train=train_05, test=test_noise_50_05, result_name="train_05__test_noise_50_05")
# apply_testing(train=train_10, test=test_noise_50_10, result_name="train_10__test_noise_50_10")
# apply_testing(train=train_15, test=test_noise_50_15, result_name="train_15__test_noise_50_15")
# apply_testing(train=train_05, test=test_noise_100_05, result_name="train_05__test_noise_100_05")
# apply_testing(train=train_10, test=test_noise_100_10, result_name="train_10__test_noise_100_10")
# apply_testing(train=train_15, test=test_noise_100_15, result_name="train_15__test_noise_100_15")

test train_05__test_normal_50_05
test train_10__test_normal_50_10
test train_15__test_normal_50_15
test train_05__test_normal_100_05
test train_10__test_normal_100_10
test train_15__test_normal_100_15
