### Variations of DTW

In [8]:
%%cython
import numpy as np
cimport numpy as np
cimport cython

import sys
import time

UsageError: Cell magic `%%cython` not found.


In [16]:
%%cython
import numpy as np
cimport numpy as np
cimport cython

import sys
import time


DTYPE_INT32 = np.int32
ctypedef np.int32_t DTYPE_INT32_t

DTYPE_FLOAT = np.float64
ctypedef np.float64_t DTYPE_FLOAT_t

cdef DTYPE_FLOAT_t MAX_FLOAT = float('inf')

# careful, without bounds checking can mess up memory - also can't use negative indices I think (like x[-1])
@cython.boundscheck(False) # turn off bounds-checking for entire function
def DTW_Cost_To_AccumCostAndSteps(Cin, parameter, WarpMax=None):
    '''
    Inputs
        C: The cost Matrix
    '''


    '''
    Section for checking and catching errors in the inputs
    '''

    cdef np.ndarray[DTYPE_FLOAT_t, ndim=2] C
    try:
        C = np.array(Cin, dtype=DTYPE_FLOAT)
    except TypeError:
        print(bcolors.FAIL + "FAILURE: The type of the cost matrix is wrong - please pass in a 2-d numpy array" + bcolors.ENDC)
        return [-1, -1, -1]
    except ValueError:
        print(bcolors.FAIL + "FAILURE: The type of the elements in the cost matrix is wrong - please have each element be a float (perhaps you passed in a matrix of ints?)" + bcolors.ENDC)
        return [-1, -1, -1]

    cdef np.ndarray[np.uint32_t, ndim=1] dn
    cdef np.ndarray[np.uint32_t, ndim=1] dm
    cdef np.ndarray[DTYPE_FLOAT_t, ndim=1] dw
    # make sure dn, dm, and dw are setup
    # dn loading and exception handling
    if ('dn'  in parameter.keys()):
        try:

            dn = np.array(parameter['dn'], dtype=np.uint32)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dn (row steps) is wrong - please pass in a 1-d numpy array that holds uint32s" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "The type of the elements in dn (row steps) is wrong - please have each element be a uint32 (perhaps you passed a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.uint32)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        dn = np.array([1, 1, 0], dtype=np.uint32)
    # dm loading and exception handling
    if 'dm'  in parameter.keys():
        try:
            dm = np.array(parameter['dm'], dtype=np.uint32)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dm (col steps) is wrong - please pass in a 1-d numpy array that holds uint32s" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "FAILURE: The type of the elements in dm (col steps) is wrong - please have each element be a uint32 (perhaps you passed a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.uint32)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        print(bcolors.FAIL + "dm (col steps) was not passed in (gave default value [1,0,1]) " + bcolors.ENDC)
        dm = np.array([1, 0, 1], dtype=np.uint32)
    # dw loading and exception handling
    if 'dw'  in parameter.keys():
        try:
            dw = np.array(parameter['dw'], dtype=DTYPE_FLOAT)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dw (step weights) is wrong - please pass in a 1-d numpy array that holds floats" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "FAILURE:The type of the elements in dw (step weights) is wrong - please have each element be a float (perhaps you passed ints or a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.float64)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        dw = np.array([1, 1, 1], dtype=DTYPE_FLOAT)
        print(bcolors.FAIL + "dw (step weights) was not passed in (gave default value [1,1,1]) " + bcolors.ENDC)

    
    '''
    Section where types are given to the variables we're going to use 
    '''
    # create matrices to store our results (D and E)
    cdef DTYPE_INT32_t numRows = C.shape[0] # only works with np arrays, use np.shape(x) will work on lists? want to force to use np though?
    cdef DTYPE_INT32_t numCols = C.shape[1]
    cdef DTYPE_INT32_t numDifSteps = np.size(dw)

    cdef unsigned int maxRowStep = max(dn)
    cdef unsigned int maxColStep = max(dm)

    cdef np.ndarray[np.uint32_t, ndim=2] steps = np.zeros((numRows,numCols), dtype=np.uint32)
    cdef np.ndarray[DTYPE_FLOAT_t, ndim=2] accumCost = np.ones((maxRowStep + numRows, maxColStep + numCols), dtype=DTYPE_FLOAT) * MAX_FLOAT

    cdef DTYPE_FLOAT_t bestCost
    cdef DTYPE_INT32_t bestCostIndex
    cdef DTYPE_FLOAT_t costForStep
    cdef unsigned int row, col
    cdef unsigned int stepIndex

    '''
    The start of the actual algorithm, now that all our variables are set up
    '''
    # initializing the cost matrix - depends on whether its subsequence DTW
    # essentially allow us to hop on the bottom anywhere (so could start partway through one of the signals)
    if parameter['SubSequence']:
        for col in range(numCols):
            accumCost[maxRowStep, col + maxColStep] = C[0, col]
    else:
        accumCost[maxRowStep, maxColStep] = C[0,0]

    # ADDED
    remove_x = False
    remove_y = False

    # filling the accumulated cost matrix
    for row in range(maxRowStep, numRows + maxRowStep, 1):
        if WarpMax and row % WarpMax == 0 and row > 1: # ADDED
            remove_x = True

        for col in range(maxColStep, numCols + maxColStep, 1):
            if WarpMax and col % WarpMax == 0: # ADDED
                remove_y = True
            
            bestCost = accumCost[<unsigned int>row, <unsigned int>col] # initialize with what's there - so if is an entry point, then can start low
            bestCostIndex = 0
            # go through each step, find the best one
            for stepIndex in range(numDifSteps):
                if remove_x and dn[(stepIndex)]==1 and dm[(stepIndex)]==0: # ADDED
                    continue
                if remove_y and dm[(stepIndex)]==1 and dn[(stepIndex)]==0:
                    continue
                costForStep = accumCost[<unsigned int>((row - dn[(stepIndex)])), <unsigned int>((col - dm[(stepIndex)]))] + dw[stepIndex] * C[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)]
                if costForStep < bestCost:
                    bestCost = costForStep
                    bestCostIndex = stepIndex
            
            # save the best cost and best cost index
            accumCost[row, col] = bestCost
            steps[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)] = bestCostIndex
            remove_y = False
        remove_x = False

    # return the accumulated cost along with the matrix of steps taken to achieve that cost
    return [accumCost[maxRowStep:, maxColStep:], steps]

@cython.boundscheck(False) # turn off bounds-checking for entire function

def DTW_GetPath(np.ndarray[DTYPE_FLOAT_t, ndim=2] accumCost, np.ndarray[np.uint32_t, ndim=2] stepsForCost, parameter):
    '''

    Parameter should have: 'dn', 'dm', 'dw', 'SubSequence'
    '''

    cdef np.ndarray[unsigned int, ndim=1] dn
    cdef np.ndarray[unsigned int, ndim=1] dm
    cdef np.uint8_t subseq
    cdef np.int32_t startCol # added
    # make sure dn, dm, and dw are setup
    if ('dn'  in parameter.keys()):
        dn = parameter['dn']
    else:
        dn = np.array([1, 1, 0], dtype=DTYPE_INT32)
    if 'dm'  in parameter.keys():
        dm = parameter['dm']
    else:
        dm = np.array([1, 0, 1], dtype=DTYPE_INT32)
    if 'SubSequence' in parameter.keys():
        subseq = parameter['SubSequence']
    else:
        subseq = 0
    
    # added START
    if 'startCol' in parameter.keys(): 
        startCol = parameter['startCol']
    else:
        startCol = -1
    # added END

    cdef np.uint32_t numRows
    cdef np.uint32_t numCols
    cdef np.uint32_t curRow
    cdef np.uint32_t curCol
    cdef np.uint32_t endCol
    cdef DTYPE_FLOAT_t endCost

    numRows = accumCost.shape[0]
    numCols = accumCost.shape[1]

    # either start at the far corner (non sub-sequence)
    # or start at the lowest cost entry in the last row (sub-sequence)
    # where all of the signal along the row has been used, but only a 
    # sub-sequence of the signal along the columns has to be used
    curRow = numRows - 1
    if subseq:
        curCol = np.argmin(accumCost[numRows - 1, :])
    else:
        curCol = numCols - 1
        
    # added - if specified, overrides above
    if startCol >= 0:
        curCol = startCol

    endCol = curCol
    endCost = accumCost[curRow, curCol]

    cdef np.uint32_t curRowStep
    cdef np.uint32_t curColStep
    cdef np.uint32_t curStepIndex


    cdef np.ndarray[np.uint32_t, ndim=2] path = np.zeros((2, numRows + numCols), dtype=np.uint32) # make as large as could need, then chop at the end
    path[0, 0] = curRow
    path[1, 0] = curCol

    cdef np.uint32_t stepsInPath = 1 # starts at one, we add in one before looping
    cdef np.uint32_t stepIndex = 0
    cdef np.int8_t done = (subseq and curRow == 0) or (curRow == 0 and curCol == 0)
    while not done:
        if accumCost[curRow, curCol] == MAX_FLOAT:
            print('A path is not possible')
            break

        # you're done if you've made it to the bottom left (non sub-sequence)
        # or just the bottom (sub-sequence)
        # find the step size
        curStepIndex = stepsForCost[curRow, curCol]
        curRowStep = dn[curStepIndex]
        curColStep = dm[curStepIndex]
        # backtrack by 1 step
        curRow = curRow - curRowStep
        curCol = curCol - curColStep
        # add your new location onto the path
        path[0, stepsInPath] = curRow
        path[1, stepsInPath] = curCol
        stepsInPath = stepsInPath + 1
        # check to see if you're done
        done = (subseq and curRow == 0) or (curRow == 0 and curCol == 0)

    # reverse the path (a matrix with two rows) and return it
    return [np.fliplr(path[:, 0:stepsInPath]), endCol, endCost]

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

UsageError: Cell magic `%%cython` not found.


#### Downsampling and upsampling DTWs

In [11]:
def alignDTW_sub(F1, F2, downsample, steps, weights, outfile, profile):
    '''identical to alignDTW, but does not load in the files and instead takes in matrices'''
    times = []
    times.append(time.time())
    C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
    times.append(time.time())

    dn = steps[:,0].astype(np.uint32)
    dm = steps[:,1].astype(np.uint32)
    parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
    times.append(time.time())
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)
    times.append(time.time())
    if outfile:
        pickle.dump(wp, open(outfile, 'wb'))
    
    if profile:
        return wp, np.diff(times)
    else:
        return wp

In [17]:
def DTW1_downsampleQuantized(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False):
    '''Downsample longer sequence. For each position in the downsampled sequence, 
    assign the nearest feature vector (to handle fractional positions).'''
    
    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M

    shorter, longer = None, None
    
    # boolean to keep track of which is F1, F2
    F1_long = None
    # determine which is longer
    if F1.shape[1] >= F2.shape[1]:
        longer, shorter = F1, F2
        F1_long = True
    else:
        longer, shorter = F2, F1
        F1_long = False

    downsampled = np.zeros(shorter.shape)
    # determine factor and corresponding indices
    factor_down = longer.shape[1]/shorter.shape[1]
    idx_down = [round(factor_down*i) for i in range(shorter.shape[1])]

    for i in range(shorter.shape[1]): # for each column in new empty array, get corresponding idx to use to select value in longer
        idx = idx_down[i]
        downsampled[:,i] = longer[:,idx] # copy values over
    
    # get original variables
    if F1_long:
        F1 = downsampled
    else:
        F2 = downsampled

    # just same code as alignDTW
    times = []
    times.append(time.time())
    C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
    times.append(time.time())

    dn = steps[:,0].astype(np.uint32)
    dm = steps[:,1].astype(np.uint32)
    parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
    times.append(time.time())
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)

    # convert back to same dimensions
    if F1_long:
        wp[0] = wp[0]*factor_down
    else:
        wp[1] = wp[1]*factor_down
    #
    times.append(time.time())

    # write to pkl file
    if outfile:
        pickle.dump(wp, open(outfile, 'wb'))
    
    if profile:
        return wp, np.diff(times)
    else:
        return wp

In [10]:
def DTW1_downsampleInterpolate(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False):
    '''Same as DTW_downsample_quantized with one difference: rather than assigning the nearest feature vector, 
    handle fractional positions by doing a linear interpolation between the two bordering feature vectors'''
    
    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M

    shorter, longer = None, None
    
    # boolean to keep track of which is F1, F2
    F1_long = None
    # determine which is longer
    if F1.shape[1] >= F2.shape[1]:
        longer, shorter = F1, F2
        F1_long = True
    else:
        longer, shorter = F2, F1
        F1_long = False

    downsampled = np.zeros(shorter.shape)
    # determine factor and corresponding indices
    factor_down = longer.shape[1]/shorter.shape[1]
    idx_down = [factor_down*i for i in range(shorter.shape[1])]

    for i in range(shorter.shape[1]): # for each column in new empty array, get corresponding idx to use to select value in longer
        idx = idx_down[i]
        fraction1 = idx - math.floor(idx) # to use with higher number. e.g. if 1.73, then fraction1 = 0.73
        fraction2 = 1 - fraction1
        downsampled[:,i] = longer[:,math.floor(idx)]*fraction2 + longer[:,math.ceil(idx)]*fraction1 # copy values over with linear interpolation

    # get original variables
    if F1_long:
        F1 = downsampled
    else:
        F2 = downsampled

    # just same code as alignDTW
    times = []
    times.append(time.time())
    C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
    times.append(time.time())

    dn = steps[:,0].astype(np.uint32)
    dm = steps[:,1].astype(np.uint32)
    parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
    times.append(time.time())
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)

    # convert back to same dimensions
    if F1_long:
        wp[0] = wp[0]*factor_down
    else:
        wp[1] = wp[1]*factor_down
    #
    times.append(time.time())

    # write to pkl file
    if outfile:
        pickle.dump(wp, open(outfile, 'wb'))
    
    if profile:
        return wp, np.diff(times)
    else:
        return wp

In [11]:
def DTW1_upsampleQuantized(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False):
    '''For each position in the upsampled sequence, assign the nearest feature vector (to handle fractional positions)'''

    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M
    
    shorter, longer = None, None
    
    # boolean to keep track of which is F1, F2
    F1_long = None
    # determine which is longer
    if F1.shape[1] >= F2.shape[1]:
        longer, shorter = F1, F2
        F1_long = True
    else:
        longer, shorter = F2, F1
        F1_long = False

    upsampled = np.zeros(longer.shape)

    # determine factor and corresponding indices
    factor_up = shorter.shape[1]/longer.shape[1]
    max_idx = shorter.shape[1] - 1
    idx_up = [] # want to get indices
    for i in range(longer.shape[1]): # for i in range longer
        idx = round(factor_up*i)
        if idx > max_idx: # need to check that index not out of range
            idx -= 1
        idx_up.append(idx)

    for i in range(longer.shape[1]): # for each column in new empty array, get corresponding idx to use to select value in shorter
        idx = idx_up[i]
        upsampled[:,i] = shorter[:,idx] # copy values over
    
    # get original variables
    if F1_long:
        F2 = upsampled
    else:
        F1 = upsampled

    # just same code as alignDTW
    times = []
    times.append(time.time())
    C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
    times.append(time.time())

    dn = steps[:,0].astype(np.uint32)
    dm = steps[:,1].astype(np.uint32)
    parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
    times.append(time.time())
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)

    # convert back to same dimensions
    if F1_long:
        wp[1] = wp[1]*factor_up
    else:
        wp[0] = wp[0]*factor_up
    #
    times.append(time.time())

    # write to pkl file
    if outfile:
        pickle.dump(wp, open(outfile, 'wb'))
    
    if profile:
        return wp, np.diff(times)
    else:
        return wp

In [12]:
def DTW1_upsampleInterpolate(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False):
    '''Same as DTW_upsample_quantized with one difference: rather than assigning the nearest feature vector, 
    handle fractional positions by doing a linear interpolation between the two bordering feature vectors'''
    
    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M
    
    shorter, longer = None, None
    
    # boolean to keep track of which is F1, F2
    F1_long = None
    # determine which is longer
    if F1.shape[1] >= F2.shape[1]:
        longer, shorter = F1, F2
        F1_long = True
    else:
        longer, shorter = F2, F1
        F1_long = False

    upsampled = np.zeros(longer.shape)

    # determine factor and corresponding indices
    factor_up = shorter.shape[1]/longer.shape[1]
    max_idx = shorter.shape[1] - 1
    idx_up = [factor_up*i for i in range(longer.shape[1])]

    for i in range(longer.shape[1]): # for each column in new empty array, get corresponding idx to use to select value in longer
        idx = idx_up[i]
        if math.ceil(idx) > max_idx: # check for index out of range
            upsampled[:,i] = shorter[:,math.floor(idx)]
        else:
            fraction1 = idx - math.floor(idx) # to use with higher number. e.g. if 1.73, then fraction1 = 0.73
            fraction2 = 1 - fraction1
            upsampled[:,i] = shorter[:,math.floor(idx)]*fraction2 + shorter[:,math.ceil(idx)]*fraction1 # copy values over with linear interpolation
    
    # get original variables
    if F1_long:
        F2 = upsampled
    else:
        F1 = upsampled

    # just same code as alignDTW
    times = []
    times.append(time.time())
    C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
    times.append(time.time())

    dn = steps[:,0].astype(np.uint32)
    dm = steps[:,1].astype(np.uint32)
    parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
    times.append(time.time())
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)

    # convert back to same dimensions
    if F1_long:
        wp[1] = wp[1]*factor_up
    else:
        wp[0] = wp[0]*factor_up
    #
    times.append(time.time())

    # write to pkl file
    if outfile:
        pickle.dump(wp, open(outfile, 'wb'))
    
    if profile:
        return wp, np.diff(times)
    else:
        return wp

#### Adaptive weights

In [13]:
def DTW_adaptiveWeight1(featfile1, featfile2, steps, weights=None, downsample=None, outfile = None, profile = False):
    '''DTW but with a weighting scheme ensures that both axes contribute the same weighted Manhattan distance cost'''
    # calculate weights:
    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M

    w1, w2 = None, None # keep track of which is the longer sequence

    # calculate weight ratios
    if F1.shape[1] >= F2.shape[1]:
        Lmax, Lmin = F1.shape[1], F2.shape[1]
        w1, w2 = Lmax/Lmin, 1 # calulate ratios
    else:
        Lmax, Lmin = F2.shape[1], F1.shape[1]
        w1, w2 = 1, Lmax/Lmin # calulate ratios

    # calculate weights for each step
    weights = []
    # for step in steps, calculate weight
    for step in steps: # step is e.g. [1,1]
        wy = step[0]*w2 # step[0] is in Lmin direction
        wx = step[1]*w1
        weights.append(wy + wx)
    weights = np.array(weights)

    # just same code as alignDTW
    return alignDTW_sub(F1, F2, downsample, steps, weights, outfile, profile)


In [14]:
def DTW_adaptiveWeight2(featfile1, featfile2, steps, weights=None, downsample=None, outfile = None, profile = False):
    '''DTW but with a different weighting scheme that accounts for the length of the sequences'''
    # calculate weights w1, w2, w3
    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M

    w1, w2, w3 = None, None, None # keep track of which is the longer sequence

    # calculate weight ratios
    if F1.shape[1] >= F2.shape[1]:
        Lmax, Lmin = F1.shape[1], F2.shape[1]
        w1, w2 = Lmax/Lmin, 1 # calulate ratios
    else:
        Lmax, Lmin = F2.shape[1], F1.shape[1]
        w1, w2 = 1, Lmax/Lmin # calulate ratios
    w3 = 1 + Lmax/Lmin
    weights = np.array([w1, w2, w3])

    # just same code as alignDTW
    return alignDTW_sub(F1, F2, downsample, steps, weights, outfile, profile)
    

In [None]:
# featfile1 = 'features/clean/Chopin_Op017No4/Chopin_Op017No4_Afanassiev-2001_pid9130-01.npy'
# featfile2 = 'features/clean/Chopin_Op017No4/Chopin_Op017No4_Ashkenazy-1981_pid9058-13.npy'
# steps = np.array([1,1,1,2,2,1]).reshape((-1,2))
# weights = np.array([2,3,3])

# F1, F2, steps, weights = DTW_adaptiveWeight1(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False)

#### Selective transitions

In [15]:
def DTW_selectiveTransitions(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False):
    '''This is same as DTW except that the set of allowable transitions at each position (i,j) in the pairwise cost matrix may be different
Let the max desired time warp factor be WarpMax (an integer)
If i%WarpMax == 0, then remove (1,0) transition from that cell
If j%WarpMax == 0, then remove (0,1) transition from that cell
Otherwise, keep the transitions (0,1), (1,0), (1,1) with weights 1, 1, 2
See example figure below for WarpMax = 3
'''