In [2]:
%load_ext Cython

In [99]:
%%cython

import numpy as np
cimport numpy as np
cimport cython

DIST_DTYPE = np.float32
PATH_DTYPE = np.int

ctypedef np.float32_t DIST_DTYPE_t
ctypedef np.int_t PATH_DTYPE_t


@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def dynamic_time_warping(np.ndarray[DIST_DTYPE_t, ndim=2] precomputed_distances):
    assert precomputed_distances.dtype == DIST_DTYPE

    cdef int n = precomputed_distances.shape[0]
    cdef int m = precomputed_distances.shape[1]
    assert n > 1, m > 1

    cdef np.ndarray[DIST_DTYPE_t, ndim=2] cost = np.full([n + 1, m + 1], np.inf, dtype=DIST_DTYPE)
    cost[0, 0] = 0

    cdef DIST_DTYPE_t dist
    cdef int i, j
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            dist = precomputed_distances[i-1, j-1]
            cost[i, j] = dist + min(cost[i-1, j],
                                    cost[i, j-1],
                                    cost[i-1, j-1])
    cdef np.ndarray[PATH_DTYPE_t, ndim=2] path = compute_path(cost)
    return cost[1:, 1:], path


@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
def compute_path(np.ndarray[DIST_DTYPE_t, ndim=2] cost):

    cdef int i = cost.shape[0]
    cdef int j = cost.shape[1]

    cdef np.ndarray[PATH_DTYPE_t, ndim=2] p = np.zeros([i * j, 2], dtype=PATH_DTYPE)
    cdef int p_idx = p.shape[0] - 1

    i -= 2
    j -= 2

    p[p_idx, 0] = i
    p[p_idx, 1] = j
    p_idx -= 1

    cdef int k
    while i > 0 or j > 0:
        k = np.argmin((cost[i, j], cost[i, j + 1], cost[i + 1, j]))
        if k == 0:
            i -= 1
            j -= 1
        elif k == 1:
            i -= 1
        else:
            j -= 1

        p[p_idx, 0] = i
        p[p_idx, 1] = j
        p_idx -= 1

    return p[(p_idx + 1):]


In [84]:
def compute_path_slow(cost: np.ndarray) -> np.ndarray:
    # From: https://github.com/pierre-rouanet/dtw/blob/32e710adeb27764c4958fed5c57b6ed24fdc9bdf/dtw/dtw.py#L100

    i, j = cost.shape
    i -= 2
    j -= 2
    p, q = [i], [j]

    while i > 0 or j > 0:
        k = np.argmin((cost[i, j], cost[i, j + 1], cost[i + 1, j]))
        if k == 0:
            i -= 1
            j -= 1
        elif k == 1:
            i -= 1
        else:
            j -= 1
        p.append(i)
        q.append(j)

    return np.c_[p[::-1], q[::-1]]


def dynamic_time_warping_slow(precomputed_distances: np.ndarray):

    n, m = precomputed_distances.shape
    assert n > 1, m > 1

    cost = np.full((n + 1, m + 1), np.inf)
    cost[0, 0] = 0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            dist = precomputed_distances[i-1, j-1]
            cost[i, j] = dist + min(cost[i-1, j],
                                    cost[i, j-1],
                                    cost[i-1, j-1])
    path = compute_path_slow(cost)
    return cost[1:, 1:], path

In [106]:
the_input = np.random.randn(40,70).astype(np.float32)

In [107]:
c1, p1 = dynamic_time_warping(the_input)

In [108]:
c2, p2 = dynamic_time_warping_slow(the_input)

In [109]:
np.allclose(c1, c2), np.all(p1 == p2)

(False, True)

In [113]:
np.abs(c1-c2).max()

3.040558658540249e-05

In [105]:
large_input = np.random.randn(300,300).astype(np.float32)
large_input_cost = dynamic_time_warping(large_input)[0]

In [92]:
%timeit -n10 -r10 dynamic_time_warping_slow(large_input)

223 ms ± 1.15 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)


In [100]:
%timeit -n10 -r10 dynamic_time_warping(large_input)

2.31 ms ± 174 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)


In [96]:
%timeit -n10 -r10 compute_path(large_input_cost)

1.91 ms ± 161 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)


In [78]:
%timeit -n10 -r100 compute_path_slow(large_input_cost)

2.62 ms ± 204 µs per loop (mean ± std. dev. of 100 runs, 10 loops each)
