# Dynamic Time Warping

Dynamic time warping (DWT) is a dynamic programming algorithm to find the **optimal** alignment between to time-dependent sequences.

In this notebook we will explore how this algorithm works.

In [None]:
import numpy as np
from helper import generate_example_sequences
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
%matplotlib inline

In [None]:
# This is a helper method to generate sample sequences
X, Y, gr_path = generate_example_sequences(10, noise_scale=0.1)


vmax = max(max(abs(X.max()), abs(X.min())), max(abs(Y.max()), abs(Y.min())))
fig, axes = plt.subplots(2, sharex=False)
axes[0].imshow(X.T, cmap='gray', origin='lower', aspect='equal', interpolation=None, vmax=vmax, vmin=-vmax)
axes[1].imshow(Y.T, cmap='gray', origin='lower', aspect='equal', interpolation=None, vmax=vmax, vmin=-vmax)
axes[0].set_xlim((0, 20))
axes[1].set_xlim((0, 20))

In [None]:
def pairwise_distance_matrix(X, Y, metric='euclidean'):
    """
    Compute pairwise distance matrix of two sequences
    
    Parameters
    ----------
    X : np.ndarray
        A 2D array with size (n_observations, n_features)
    Y : np.ndarray
        A 2D array with size (m_observations, n_features)
    metric: str
        A string defining a metric (see possibilities 
        in scipy.spatial.distance.cdist)

    Returns
    -------
    D : np.ndarray
        Pairwise cost matrix
    """
    if X.ndim == 1:
        X, Y = np.atleast_2d(X, Y)
        X = X.T
        Y = Y.T
    C = cdist(X, Y, metric=metric)
    return C

C = pairwise_distance_matrix(X, Y, metric='euclidean')
print(C.shape)

In [None]:
# Visualize cost matrix
C = pairwise_distance_matrix(X, Y)
plt.imshow(C, origin='lower', aspect='equal', cmap='gray')

In [None]:
def accumulated_cost_matrix(C):
    """
    Dynamic time warping cost matrix from a pairwise distance matrix

    Parameters
    ----------
    D : double array
        Pairwise distance matrix (computed e.g., with `cdist`).

    Returns
    -------
    D : np.ndarray
        Accumulated cost matrix
    """
    N = C.shape[0]
    M = C.shape[1]
    D = np.zeros((N, M))
    D[0, 0] = C[0, 0]
    for n in range(1, N):
        D[n, 0] = D[n-1, 0] + C[n, 0]
    for m in range(1, M):
        D[0, m] = D[0, m-1] + C[0, m]
    for n in range(1, N):
        for m in range(1, M):
            D[n, m] = C[n, m] + min(D[n-1, m], D[n, m-1], D[n-1, m-1])
    return D

D = accumulated_cost_matrix(C) 

# Visualize accumulated cost matrix
plt.imshow(D, origin='lower', aspect='equal', cmap='gray')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')

In [None]:
def optimal_warping_path(D):
    """
    Compute the warping path given an accumulated cost matrix

    Parameters
    ----------
    D: np.ndarray
        Accumulated cost Matrix
    
    Returns
    -------
    P: np.ndarray
        Optimal warping path
    """
    N = D.shape[0]
    M = D.shape[1]
    n = N - 1
    m = M - 1
    P = [(n, m)]
    while n > 0 or m > 0:
        if n == 0:
            cell = (0, m - 1)
        elif m == 0:
            cell = (n - 1, 0)
        else:
            val = min(D[n-1, m-1], D[n-1, m], D[n, m-1])
            if val == D[n-1, m-1]:
                cell = (n-1, m-1)
            elif val == D[n-1, m]:
                cell = (n-1, m)
            else:
                cell = (n, m-1)
        P.append(cell)
        (n, m) = cell
    P.reverse()
    return np.array(P)
        
P = optimal_warping_path(D)


In [None]:
plt.figure(figsize=(9, 3))
plt.subplot(1, 2, 1)
plt.imshow(C, cmap='gray_r', origin='lower', aspect='equal')
plt.plot(P[:, 1], P[:, 0], marker='o', color='r')
plt.clim([0, np.max(C)])
plt.colorbar()
plt.title('$C$ with optimal warping path')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')

plt.subplot(1, 2, 2)
plt.imshow(D, cmap='gray_r', origin='lower', aspect='equal')
plt.plot(P[:, 1], P[:, 0], marker='o', color='r')
plt.clim([0, np.max(D)])
plt.colorbar()
plt.title('$D$ with optimal warping path')
plt.xlabel('Sequence Y')
plt.ylabel('Sequence X')

plt.tight_layout()


But of course, this is a very slow implementation of the algorithm!