# Import and Set Up

In [1]:
%matplotlib inline
%load_ext Cython


import numpy as np
import math
import os
import matplotlib
import matplotlib.pyplot as plt
import librosa as lb
import librosa.display as lbd
import IPython.display as ipd
import scipy.signal as ss
import scipy.spatial as sspat
import scipy.fft as sfft
import warnings
import pickle as pkl
import time
import itertools

In [None]:
%cd ../
import CythonDTW
import TSM_Import

In [3]:
sr = 22050

## Load Data

In [4]:
fileId_path = "experiments/fileIds.pkl"
queryInfo_path = "experiments/queryInfo.pkl"
timeData_path = "experiments/timeData.pkl"
audioFiles_path = "experiments/audioFiles.pkl"
with open(fileId_path, 'rb') as f:
    fileIds=pkl.load(f)
with open(queryInfo_path, 'rb') as f:
    queryInfo=pkl.load(f)
with open(timeData_path, 'rb') as f:
    timeData=pkl.load(f)
with open(audioFiles_path, 'rb') as f:
    audioFiles=pkl.load(f)

# Helper Functions

## Data Management

In [19]:
def getAssignment():
    '''
    Return: 
            a list of all combinations of soloist segments 
            and orchestra reference tracks information 
            in the form of piece_id,segment_id,orch_id,solo_id
            where piece_id and segment_id are numbers 
            and orch_id and solo_id are the names of the audio files     
    '''
    assignment = []
    count = 0
    time_signature = [2,4,4,4]
    orch_piece, solo_piece  = get_piece()
    for piece_id in range(len(orch_piece)):
        
        segment_info = queryInfo['p'+str(piece_id+1)+'s']
        segment = []
        for raw_seg in segment_info:
            start, end = raw_seg[1:raw_seg.index(",")], raw_seg[raw_seg.index(",")+1:-1]
            segment.append([start,end])
        
        for segment_id in range(len(segment)):
            start, end = segment[segment_id]
            
            query_list = []
            query_index = start
            #print(end)
            while query_index != end:
                #print(query_index)
                query_list.append(query_index)
                measure = int(query_index[:query_index.index('.')])
                beat = int(query_index[query_index.index('.')+1:])
                if beat == time_signature[piece_id]:
                    beat = 1
                    measure+=1
                    query_index = str(measure)+"."+str(beat)
                else:
                    beat+=1
                    query_index = str(measure)+"."+str(beat)
                query_list.append(query_index)
                
            for orch_id,solo_id in itertools.product(orch_piece[piece_id],solo_piece[piece_id]):
                
                
                assignment.append([piece_id,segment_id,orch_id,solo_id])
                
    return assignment

In [None]:
def get_piece():
    '''
    Return:
            a list of orchestra audio file name  
            a list of soloist audio file name
    '''
    solo_piece = [[] for i in range(4)]
    orch_piece = [[] for i in range(4)]
    for j in timeData:
        if j[2]=="s":
            solo_piece[int(j[1])-1].append(j)
        else:
            orch_piece[int(j[1])-1].append(j)
    return orch_piece, solo_piece

In [None]:
def writeToFile( solo_id, orch_id, segment_id, error_ls, hypdir):
    '''
    Argument: solo_id, orch_id, segment_id,: 
              identifying information for a solo segment 
              and an orchestra audio
              error_ls: the difference between prediction and ground truth 
              to be stored
              hypdir: name of folder for the file to be stored in
    Function: use pickle to dump the error list 
    '''
    if hypdir != None:
        experiment = "experiments/{}".format(hypdir)
        if not os.path.exists(experiment):
            os.mkdir(experiment)
        fname = "experiments/{}/{}-{}-{}.hyp".format(hypdir, solo_id, orch_id,segment_id)
        with open (fname, 'wb') as f:
            pkl.dump((error_ls),f)

## Chroma Feature, Cost Matrix, and Inverse Time Function

In [None]:
def getChromaFeatures(audio):
    '''
    Argument: an audio file (array)
    Return:   the L2 normalized chroma feature
    '''
    chroma = lb.feature.chroma_stft(audio, norm=2)
    return chroma

In [None]:
def getCostMatrix(query, ref):
    '''
    Argument: query chroma feature and reference chroma feature
    Return: cosine distance cost matrix
    '''
    return sspat.distance.cdist(query.T, ref.T, metric='cosine')

In [11]:
def getITF(wp,itf_length):
    '''
    Argument: wp from a DTW and the desired length 
                of the inverse time function
    Return: inverse time function.          
    '''
    array = np.zeros(itf_length+1)
    wp = wp * 512
    
    slope = 1
    
    for pair_idx in range(1,len(wp)):
        x0, x1 = wp[pair_idx-1,0],wp[pair_idx,0]
        y0, y1 = wp[pair_idx-1,1],wp[pair_idx,1]
        slope = (y1-y0)/(x1-x0)
        array[x0:x1] = y0 + slope * np.arange(x1-x0)
        array[x1] = y1
    
    wp_end_x, wp_end_y = wp[-1][0], wp[-1][1]
    
    if len(array) > wp_end_x+1:
        array[wp_end_x+1:] = wp_end_y + np.arange(len(array) - wp_end_x -1)*slope + 1
    
    return array

## Backend of Temporal DTW's Customized DTW

In [20]:
def realTimeDTW(C, steps, weights, est_curr_frame):
    '''
    Argument: 
                C: cost matrix
                steps: transition steps
                weights: correspoding weights for the transition steps
                est_curr_frame: estiamted frame index of the current time
    Return: 
                wp: alignment of the soloist clip and reference audio
    '''
    # set params
    assert len(steps) % 2 == 0, "The length of steps must be even."
    dn = np.array(steps[::2], dtype=np.uint32)
    dm = np.array(steps[1::2], dtype=np.uint32)
    dw = weights
    subsequence = True
    parameter = {'dn': dn, 'dm': dm, 'dw': dw, 'SubSequence': subsequence}

    # DTW
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameter, est_curr_frame)
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameter)

    # Reformat the output
    wp = wp.T[::-1]
    return wp


In [16]:
%%cython
import numpy as np
cimport numpy as np
cimport cython

import sys
import time


DTYPE_INT32 = np.int32
ctypedef np.int32_t DTYPE_INT32_t

DTYPE_FLOAT = np.float64
ctypedef np.float64_t DTYPE_FLOAT_t

cdef DTYPE_FLOAT_t MAX_FLOAT = float('inf')

# careful, without bounds checking can mess up memory - also can't use negative indices I think (like x[-1])
@cython.boundscheck(False) # turn off bounds-checking for entire function
def DTW_Cost_To_AccumCostAndSteps(Cin, parameter, int est_curr_frame):
    '''
    Inputs
        C: The cost Matrix
    '''
    '''
    Section for checking and catching errors in the inputs
    '''

    cdef np.ndarray[DTYPE_FLOAT_t, ndim=2] C
    try:
        C = np.array(Cin, dtype=DTYPE_FLOAT)
    except TypeError:
        print(bcolors.FAIL + "FAILURE: The type of the cost matrix is wrong - please pass in a 2-d numpy array" + bcolors.ENDC)
        return [-1, -1, -1]
    except ValueError:
        print(bcolors.FAIL + "FAILURE: The type of the elements in the cost matrix is wrong - please have each element be a float (perhaps you passed in a matrix of ints?)" + bcolors.ENDC)
        return [-1, -1, -1]

    cdef np.ndarray[np.uint32_t, ndim=1] dn
    cdef np.ndarray[np.uint32_t, ndim=1] dm
    cdef np.ndarray[DTYPE_FLOAT_t, ndim=1] dw
    # make sure dn, dm, and dw are setup
    # dn loading and exception handling
    if ('dn'  in parameter.keys()):
        try:

            dn = np.array(parameter['dn'], dtype=np.uint32)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dn (row steps) is wrong - please pass in a 1-d numpy array that holds uint32s" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "The type of the elements in dn (row steps) is wrong - please have each element be a uint32 (perhaps you passed a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.uint32)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        dn = np.array([1, 1, 0], dtype=np.uint32)
    # dm loading and exception handling
    if 'dm'  in parameter.keys():
        try:
            dm = np.array(parameter['dm'], dtype=np.uint32)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dm (col steps) is wrong - please pass in a 1-d numpy array that holds uint32s" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "FAILURE: The type of the elements in dm (col steps) is wrong - please have each element be a uint32 (perhaps you passed a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.uint32)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        print(bcolors.FAIL + "dm (col steps) was not passed in (gave default value [1,0,1]) " + bcolors.ENDC)
        dm = np.array([1, 0, 1], dtype=np.uint32)
    # dw loading and exception handling
    if 'dw'  in parameter.keys():
        try:
            dw = np.array(parameter['dw'], dtype=DTYPE_FLOAT)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dw (step weights) is wrong - please pass in a 1-d numpy array that holds floats" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "FAILURE:The type of the elements in dw (step weights) is wrong - please have each element be a float (perhaps you passed ints or a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.float64)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        dw = np.array([1, 1, 1], dtype=DTYPE_FLOAT)
        print(bcolors.FAIL + "dw (step weights) was not passed in (gave default value [1,1,1]) " + bcolors.ENDC)

    
    '''
    Section where types are given to the variables we're going to use 
    '''
    # create matrices to store our results (D and E)
    cdef DTYPE_INT32_t numRows = C.shape[0] # only works with np arrays, use np.shape(x) will work on lists? want to force to use np though?
    cdef DTYPE_INT32_t est_curr_frame1 = est_curr_frame
    cdef DTYPE_INT32_t numCols = C.shape[1]
    cdef DTYPE_INT32_t numDifSteps = np.size(dw)

    cdef unsigned int maxRowStep = max(dn)
    cdef unsigned int maxColStep = max(dm)

    cdef np.ndarray[np.uint32_t, ndim=2] steps = np.zeros((numRows,numCols), dtype=np.uint32)
    cdef np.ndarray[DTYPE_FLOAT_t, ndim=2] accumCost = np.ones((maxRowStep + numRows, maxColStep + numCols), dtype=DTYPE_FLOAT) * MAX_FLOAT

    cdef DTYPE_FLOAT_t bestCost
    cdef DTYPE_INT32_t bestCostIndex
    cdef DTYPE_FLOAT_t costForStep
    cdef unsigned int row, col
    cdef unsigned int stepIndex

    '''
    The start of the actual algorithm, now that all our variables are set up
    '''
    # initializing the cost matrix - depends on whether its subsequence DTW
    # essentially allow us to hop on the bottom anywhere (so could start partway through one of the signals)
    if parameter['SubSequence']:
        est_curr_frame1 = max(min(est_curr_frame1, numCols),0)
        
        
        
        ###################################################################################
        lower_bound = max(est_curr_frame1-15,0)
        upper_bound = min(est_curr_frame1+25,numCols)
        ###################################################################################
        
        
        
        
        
        ### START CODE BLOCK ###
        inf = float("inf")
        accumCost[maxRowStep,maxColStep:maxColStep+lower_bound]=.2*np.arange(lower_bound,0,-1)
        accumCost[maxRowStep,maxColStep+upper_bound:]=.2*np.arange(numCols-upper_bound)
        accumCost[maxRowStep,maxColStep+lower_bound:maxColStep+upper_bound] = 0
    else:
        accumCost[maxRowStep, maxColStep] = C[0,0]
        

    # filling the accumulated cost matrix
    for row in range(maxRowStep, numRows + maxRowStep, 1):
        for col in range(maxColStep, numCols + maxColStep, 1):
            bestCost = accumCost[<unsigned int>row, <unsigned int>col] # initialize with what's there - so if is an entry point, then can start low
            bestCostIndex = 0
            # go through each step, find the best one
            for stepIndex in range(numDifSteps):
                #costForStep = accumCost[<unsigned int>(row - dn[<unsigned int>(stepIndex)]), <unsigned int>(col - dm[<unsigned int>(stepIndex)])] + dw[<unsigned int>(stepIndex)] * C[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)]
                costForStep = accumCost[<unsigned int>((row - dn[(stepIndex)])), <unsigned int>((col - dm[(stepIndex)]))] + dw[stepIndex] * C[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)]
                if costForStep < bestCost:
                    bestCost = costForStep
                    bestCostIndex = stepIndex
            # save the best cost and best cost index
            accumCost[row, col] = bestCost
            steps[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)] = bestCostIndex

    # return the accumulated cost along with the matrix of steps taken to achieve that cost
    return [accumCost[maxRowStep:, maxColStep:], steps]

@cython.boundscheck(False) # turn off bounds-checking for entire function
def DTW_GetPath(np.ndarray[DTYPE_FLOAT_t, ndim=2] accumCost, np.ndarray[np.uint32_t, ndim=2] stepsForCost, parameter):
    '''

    Parameter should have: 'dn', 'dm', 'dw', 'SubSequence'
    '''

    cdef np.ndarray[unsigned int, ndim=1] dn
    cdef np.ndarray[unsigned int, ndim=1] dm
    cdef np.uint8_t subseq
    # make sure dn, dm, and dw are setup
    if ('dn'  in parameter.keys()):
        dn = parameter['dn']
    else:
        dn = np.array([1, 1, 0], dtype=DTYPE_INT32)
    if 'dm'  in parameter.keys():
        dm = parameter['dm']
    else:
        dm = np.array([1, 0, 1], dtype=DTYPE_INT32)
    if 'SubSequence' in parameter.keys():
        subseq = parameter['SubSequence']
    else:
        subseq = 0

    cdef np.uint32_t numRows
    cdef np.uint32_t numCols
    cdef np.uint32_t curRow
    cdef np.uint32_t curCol
    cdef np.uint32_t endCol
    cdef DTYPE_FLOAT_t endCost

    numRows = accumCost.shape[0]
    numCols = accumCost.shape[1]

    # either start at the far corner (non sub-sequence)
    # or start at the lowest cost entry in the last row (sub-sequence)
    # where all of the signal along the row has been used, but only a 
    # sub-sequence of the signal along the columns has to be used
    curRow = numRows - 1
    if subseq:
        curCol = np.argmin(accumCost[numRows - 1, :])
    else:
        curCol = numCols - 1

    endCol = curCol
    endCost = accumCost[curRow, curCol]

    cdef np.uint32_t curRowStep
    cdef np.uint32_t curColStep
    cdef np.uint32_t curStepIndex


    cdef np.ndarray[np.uint32_t, ndim=2] path = np.zeros((2, numRows + numCols), dtype=np.uint32) # make as large as could need, then chop at the end
    path[0, 0] = curRow
    path[1, 0] = curCol

    cdef np.uint32_t stepsInPath = 1 # starts at one, we add in one before looping
    cdef np.uint32_t stepIndex = 0
    cdef np.int8_t done = (subseq and curRow == 0) or (curRow == 0 and curCol == 0)
    while not done:
        if accumCost[curRow, curCol] == MAX_FLOAT:
            print('A path is not possible')
            break

        # you're done if you've made it to the bottom left (non sub-sequence)
        # or just the bottom (sub-sequence)
        # find the step size
        curStepIndex = stepsForCost[curRow, curCol]
        curRowStep = dn[curStepIndex]
        curColStep = dm[curStepIndex]
        # backtrack by 1 step
        curRow = curRow - curRowStep
        curCol = curCol - curColStep
        # add your new location onto the path
        path[0, stepsInPath] = curRow
        path[1, stepsInPath] = curCol
        stepsInPath = stepsInPath + 1
        # check to see if you're done
        done = (subseq and curRow == 0) or (curRow == 0 and curCol == 0)

    # reverse the path (a matrix with two rows) and return it
    return [np.fliplr(path[:, 0:stepsInPath]), endCol, endCost]

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
# backend code for custom DTW. Use cython for speed

## Temporal DTW System

In [12]:
class temporalDTW():
    '''
    Temporal DTW system
    '''
    def __init__(self, ref_audio):
        '''
        Argument: orchestra (reference) audio
        '''
        self.ref_chroma = getChromaFeatures(ref_audio)
        self.est_curr_frame = 0

    def newSeg(self, est_ref_start_time, est_ref_end_time):
        '''
        Argument: estimation of the start and end time of the 
                  section of orchestra that corresponds to 
                  the soloist performance.
                  Update when new soloist segment is used
                  Can be early by 1 second.
                  Specifically, it can create an audio is a superset 
                  of the orchestra section relevant for accompaniment,
                  but not the subset
        '''
        self.est_curr_frame = max(est_ref_start_time * sr / 512,0) 
        self.start_chroma_idx = max(est_ref_start_time * sr /512,0)
        self.end_chroma_idx = max(est_ref_end_time * sr/512,0)
        self.short_ref_chroma = self.ref_chroma[:,int(self.start_chroma_idx):int(self.end_chroma_idx)]
        
    def align(self,query_audio):
        '''
        Argument: query audio: the short clip (0.5 secs) from soloist performance
                  0.5 seconds is not a strict requirement
        Return:   itf: inverse time function 
                  wp: wp from DTW
        '''
        # compute chroma feature
        query_chroma = getChromaFeatures(query_audio)
        self.cost_matrix = getCostMatrix(query_chroma, self.short_ref_chroma)
        
        # modified DTW
        steps = [2, 1, 1, 2, 1, 1]  
        weights = [2,1,1]        
        wp = realTimeDTW(self.cost_matrix, steps, weights,int(self.est_curr_frame-self.start_chroma_idx))
        wp = np.sort(wp,axis=0) 
        wp[:,1]+=int(self.start_chroma_idx)
        
        #generate inverse time function
        itf_length = len(query_audio)         
        itf = getITF(wp, itf_length)
        
        
        # update estimated current time index
        self.est_curr_frame = int(max(self.est_curr_frame, wp[-1,1]))
        
        return itf, wp 

# Sample

In [23]:
def sample(piece_id,segment_id,orch_id,solo_id):
    '''
    Argument: identifying info for a solo segment and a reference audio
    Return:   a list consisted of difference in seconds 
              between prediction and ground truth 
              and a inverse time function for the time stretched
              accompaniment of that soloist segment
              Warning: Accuracy is LOW (75% accuracy for 1sec tolerance)
    '''
    query_len = .5
    
    diff_ls = []
    
    time_signature = [2,4,4,4]
    
    segment_info = queryInfo['p'+str(piece_id+1)+'s']
    segment = []
    for raw_seg in segment_info:
        start, end = raw_seg[1:raw_seg.index(",")], raw_seg[raw_seg.index(",")+1:-1]
        segment.append([start,end])
    
    start, end = segment[segment_id]
    query_list = []
    query_index = start
    while query_index != end:            
        query_list.append(query_index)
        measure = int(query_index[:query_index.index('.')])
        beat = int(query_index[query_index.index('.')+1:])
        if beat == time_signature[piece_id]:
            beat = 1
            measure+=1
            query_index = str(measure)+"."+str(beat)
        else:
            beat+=1
            query_index = str(measure)+"."+str(beat)
            query_list.append(query_index)
    
    
    ref_audio,solo_audio = audioFiles[orch_id],audioFiles[solo_id][segment_id]
    
    first_query = query_list[0]
    
    orch_time_data,solo_time_data = timeData[orch_id],timeData[solo_id]
    
    orch_start_time, solo_start_time = orch_time_data[first_query], solo_time_data[first_query]
    orch_end_time = orch_time_data[query_list[-1]]
    est_ref_start_time, est_ref_end_time = max(orch_start_time - 1,0),orch_end_time+1
    
    tDTW = temporalDTW(ref_audio)
    tDTW.newSeg(est_ref_start_time, est_ref_end_time)
    
    itf_ls = []
    
    for query_sec in range(0, len(solo_audio), int(query_len*sr) ):
        

        
        input_audio = solo_audio[int(query_sec):int(query_sec+query_len*sr)]
        
        if len(input_audio) < 1:
            print("input audio too short")
        
        #print(len(input_audio)/sr, orch_start_time)
        
        itf, wp = tDTW.align(input_audio)
        
        #return itf,wp
        
        itf_ls.extend(itf)
        
    itf_ls = np.array(itf_ls)
    
    #print(itf_ls.shape)
    
    duplicate_set = set()
    

    
    for idx, query in enumerate(query_list):
        solo_input_sec = solo_time_data[query] 
        orch_output_sec = orch_time_data[query]
        
        if math.isnan(solo_input_sec) or math.isnan(orch_output_sec):
            continue
        
        solo_input_sec -=  solo_start_time
        solo_input_idx = int(solo_input_sec * sr)
        
        ground_truth = orch_output_sec * sr
        
        if solo_input_sec in duplicate_set:
            continue
        else:
            duplicate_set.add(solo_input_sec)
    
        if solo_input_idx >= len(itf_ls):
            print("index too large")
            continue
        
        prediction = itf_ls[solo_input_idx]
        

        
        diff =  ground_truth - prediction 
        
        if (abs(diff/sr) > 1.2 and debug) or (abs(diff/sr)>5): 
            print( piece_id,segment_id,orch_id,solo_id, idx )
            print("    ",diff/sr,"    ", ground_truth/sr,prediction/sr)
        
        diff_ls.append(diff)
        
        #print(prediction/sr, ground_truth/sr, diff/sr,solo_input_sec)
    
    return diff_ls, itf_ls