In [4]:
import numpy as np
import librosa as lb
import os
import os.path
from pathlib import Path
import multiprocessing
from multiprocessing import Pool
import glob
from tqdm import tqdm

ModuleNotFoundError: No module named 'tqdm'

In [3]:
ANNOTATIONS_ROOT = Path('train_data/beat')
AUDIO_ROOT = ('Full_Alignment_Recordings')
FEATURES_ROOT = Path('features')
Scenario_1_files = 'Full_Alignment_Recordings/Scenario_1'
Scenario_2_files = 'Full_Alignment_Recordings/Scenario_2'
Scenario_3_files = 'Full_Alignment_Recordings/Scenario_3'
Scenario_4_files = 'Full_Alignment_Recordings/Scenario_4'

In [4]:
scenario1files = glob.glob(Scenario_1_files + "/*")
scenario2files = glob.glob(Scenario_2_files + "/*")
scenario3files = glob.glob(Scenario_3_files + "/*")
scenario4files = glob.glob(Scenario_4_files + "/*")
inFiles = []
for file in scenario1files:
    inFiles.append(file)
for file in scenario2files:
    inFiles.append(file)
for file in scenario3files:
    inFiles.append(file)
for file in scenario4files:
    inFiles.append(file)

print(len(inFiles))

9


In [5]:
if not os.path.exists(FEATURES_ROOT):
    os.mkdir(FEATURES_ROOT)

### Compute features on clean audio

First we compute features on the audio.

In [6]:
def compute_chroma_single(infile, outfile, sr = 22050, hop_length=512):
    print('loading', infile)
    y, sr = lb.core.load(infile, sr = sr)
    print(infile, 'loaded')
    # F = lb.feature.chroma_cens(y, sr=sr, hop_length=hop_length)
    print('computing features for', infile)
    F = lb.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length, norm=2)
    print('saving to', outfile,'...')
    np.save(outfile, F)
    print('saved')
    return

In [7]:
def compute_chroma_batch(filelist, outdir, n_cores):

    # prep inputs for parallelization
    inputs = []
    with open(filelist, 'r') as f:
        for line in f:
            relpath = line.strip()
            reldir, fileid = os.path.split(relpath)
            featdir = outdir / reldir
            featdir.mkdir(parents=True, exist_ok=True)
            featfile = (featdir / fileid).with_suffix('.npy')
            audiofile = relpath
            if os.path.exists(featfile):
                print(f"Skipping {featfile}")
            else:
                inputs.append((audiofile, featfile))

    # process files in parallel
    pool = multiprocessing.Pool(processes = n_cores)
    print('computing features...')
    pool.starmap(compute_chroma_single, inputs)
    print('done')
    
    return

In [8]:
compute_chroma_batch(Path("cfg_files/B_scenario.txt"), Path("scenario_feat"), 4) #set number of cores to 1 for debugging

Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_1/sub_align_matching_30sec_recording1.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_1/sub_align_matching_recording2.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_2/sub_align_non_matching_30sec_recording1.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_2/sub_align_non_matching_recording2.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_3/sub_align_matching_spliced_recording1.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_3/sub_align_matching_recording2.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_4/sub_align_non_matching_spliced_recording1.npy
Skipping scenario_feat/Sub_Alignment_Recordings/Scenario_4/sub_align_non_matching_recording2.npy
computing features...
done


### Alignment

In [9]:
%matplotlib inline
%load_ext Cython

In [10]:
import numpy as np
import matplotlib.pyplot as plt
import librosa as lb
import os.path
from pathlib import Path
import pickle
import multiprocessing
import time
import gc

In [11]:
%%cython
import numpy as np
cimport numpy as np
cimport cython

import sys
import time


DTYPE_INT32 = np.int32
ctypedef np.int32_t DTYPE_INT32_t

DTYPE_FLOAT = np.float64
ctypedef np.float64_t DTYPE_FLOAT_t

cdef DTYPE_FLOAT_t MAX_FLOAT = float('inf')

# careful, without bounds checking can mess up memory - also can't use negative indices I think (like x[-1])
@cython.boundscheck(False) # turn off bounds-checking for entire function
def DTW_Cost_To_AccumCostAndSteps(Cin, parameter):
    '''
    Inputs
        C: The cost Matrix
    '''


    '''
    Section for checking and catching errors in the inputs
    '''

    cdef np.ndarray[DTYPE_FLOAT_t, ndim=2] C
    try:
        C = np.array(Cin, dtype=DTYPE_FLOAT)
    except TypeError:
        print(bcolors.FAIL + "FAILURE: The type of the cost matrix is wrong - please pass in a 2-d numpy array" + bcolors.ENDC)
        return [-1, -1, -1]
    except ValueError:
        print(bcolors.FAIL + "FAILURE: The type of the elements in the cost matrix is wrong - please have each element be a float (perhaps you passed in a matrix of ints?)" + bcolors.ENDC)
        return [-1, -1, -1]

    cdef np.ndarray[np.uint32_t, ndim=1] dn
    cdef np.ndarray[np.uint32_t, ndim=1] dm
    cdef np.ndarray[DTYPE_FLOAT_t, ndim=1] dw
    # make sure dn, dm, and dw are setup
    # dn loading and exception handling
    if ('dn'  in parameter.keys()):
        try:

            dn = np.array(parameter['dn'], dtype=np.uint32)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dn (row steps) is wrong - please pass in a 1-d numpy array that holds uint32s" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "The type of the elements in dn (row steps) is wrong - please have each element be a uint32 (perhaps you passed a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.uint32)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        dn = np.array([1, 1, 0], dtype=np.uint32)
    # dm loading and exception handling
    if 'dm'  in parameter.keys():
        try:
            dm = np.array(parameter['dm'], dtype=np.uint32)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dm (col steps) is wrong - please pass in a 1-d numpy array that holds uint32s" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "FAILURE: The type of the elements in dm (col steps) is wrong - please have each element be a uint32 (perhaps you passed a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.uint32)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        print(bcolors.FAIL + "dm (col steps) was not passed in (gave default value [1,0,1]) " + bcolors.ENDC)
        dm = np.array([1, 0, 1], dtype=np.uint32)
    # dw loading and exception handling
    if 'dw'  in parameter.keys():
        try:
            dw = np.array(parameter['dw'], dtype=DTYPE_FLOAT)
        except TypeError:
            print(bcolors.FAIL + "FAILURE: The type of dw (step weights) is wrong - please pass in a 1-d numpy array that holds floats" + bcolors.ENDC)
            return [-1, -1, -1]
        except ValueError:
            print(bcolors.FAIL + "FAILURE:The type of the elements in dw (step weights) is wrong - please have each element be a float (perhaps you passed ints or a long?). You can specify this when making a numpy array like: np.array([1,2,3],dtype=np.float64)" + bcolors.ENDC)
            return [-1, -1, -1]
    else:
        dw = np.array([1, 1, 1], dtype=DTYPE_FLOAT)
        print(bcolors.FAIL + "dw (step weights) was not passed in (gave default value [1,1,1]) " + bcolors.ENDC)


    '''
    Section where types are given to the variables we're going to use
    '''
    # create matrices to store our results (D and E)
    cdef DTYPE_INT32_t numRows = C.shape[0] # only works with np arrays, use np.shape(x) will work on lists? want to force to use np though?
    cdef DTYPE_INT32_t numCols = C.shape[1]
    cdef DTYPE_INT32_t numDifSteps = np.size(dw)

    cdef unsigned int maxRowStep = max(dn)
    cdef unsigned int maxColStep = max(dm)

    cdef np.ndarray[np.uint32_t, ndim=2] steps = np.zeros((numRows,numCols), dtype=np.uint32)
    cdef np.ndarray[DTYPE_FLOAT_t, ndim=2] accumCost = np.ones((maxRowStep + numRows, maxColStep + numCols), dtype=DTYPE_FLOAT) * MAX_FLOAT

    cdef DTYPE_FLOAT_t bestCost
    cdef DTYPE_INT32_t bestCostIndex
    cdef DTYPE_FLOAT_t costForStep
    cdef unsigned int row, col
    cdef unsigned int stepIndex

    '''
    The start of the actual algorithm, now that all our variables are set up
    '''
    # initializing the cost matrix - depends on whether its subsequence DTW
    # essentially allow us to hop on the bottom anywhere (so could start partway through one of the signals)
    if parameter['SubSequence']:
        for col in range(numCols):
            accumCost[maxRowStep, col + maxColStep] = C[0, col]
    else:
        accumCost[maxRowStep, maxColStep] = C[0,0]

    # filling the accumulated cost matrix
    for row in range(maxRowStep, numRows + maxRowStep, 1):
        for col in range(maxColStep, numCols + maxColStep, 1):
            bestCost = accumCost[<unsigned int>row, <unsigned int>col] # initialize with what's there - so if is an entry point, then can start low
            bestCostIndex = 0
            # go through each step, find the best one
            for stepIndex in range(numDifSteps):
                #costForStep = accumCost[<unsigned int>(row - dn[<unsigned int>(stepIndex)]), <unsigned int>(col - dm[<unsigned int>(stepIndex)])] + dw[<unsigned int>(stepIndex)] * C[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)]
                costForStep = accumCost[<unsigned int>((row - dn[(stepIndex)])), <unsigned int>((col - dm[(stepIndex)]))] + dw[stepIndex] * C[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)]
                if costForStep < bestCost:
                    bestCost = costForStep
                    bestCostIndex = stepIndex
            # save the best cost and best cost index
            accumCost[row, col] = bestCost
            steps[<unsigned int>(row - maxRowStep), <unsigned int>(col - maxColStep)] = bestCostIndex

    # return the accumulated cost along with the matrix of steps taken to achieve that cost
    return [accumCost[maxRowStep:, maxColStep:], steps]

@cython.boundscheck(False) # turn off bounds-checking for entire function
def DTW_GetPath(np.ndarray[DTYPE_FLOAT_t, ndim=2] accumCost, np.ndarray[np.uint32_t, ndim=2] stepsForCost, parameter):
    '''

    Parameter should have: 'dn', 'dm', 'dw', 'SubSequence'
    '''

    cdef np.ndarray[unsigned int, ndim=1] dn
    cdef np.ndarray[unsigned int, ndim=1] dm
    cdef np.uint8_t subseq
    cdef np.int32_t startCol # added
    # make sure dn, dm, and dw are setup
    if ('dn'  in parameter.keys()):
        dn = parameter['dn']
    else:
        dn = np.array([1, 1, 0], dtype=DTYPE_INT32)
    if 'dm'  in parameter.keys():
        dm = parameter['dm']
    else:
        dm = np.array([1, 0, 1], dtype=DTYPE_INT32)
    if 'SubSequence' in parameter.keys():
        subseq = parameter['SubSequence']
    else:
        subseq = 0

    # added START
    if 'startCol' in parameter.keys():
        startCol = parameter['startCol']
    else:
        startCol = -1
    # added END

    cdef np.uint32_t numRows
    cdef np.uint32_t numCols
    cdef np.uint32_t curRow
    cdef np.uint32_t curCol
    cdef np.uint32_t endCol
    cdef DTYPE_FLOAT_t endCost

    numRows = accumCost.shape[0]
    numCols = accumCost.shape[1]

    # either start at the far corner (non sub-sequence)
    # or start at the lowest cost entry in the last row (sub-sequence)
    # where all of the signal along the row has been used, but only a
    # sub-sequence of the signal along the columns has to be used
    curRow = numRows - 1
    if subseq:
        curCol = np.argmin(accumCost[numRows - 1, :])
    else:
        curCol = numCols - 1

    # added - if specified, overrides above
    if startCol >= 0:
        curCol = startCol

    endCol = curCol
    endCost = accumCost[curRow, curCol]

    cdef np.uint32_t curRowStep
    cdef np.uint32_t curColStep
    cdef np.uint32_t curStepIndex


    cdef np.ndarray[np.uint32_t, ndim=2] path = np.zeros((2, numRows + numCols), dtype=np.uint32) # make as large as could need, then chop at the end
    path[0, 0] = curRow
    path[1, 0] = curCol

    cdef np.uint32_t stepsInPath = 1 # starts at one, we add in one before looping
    cdef np.uint32_t stepIndex = 0
    cdef np.int8_t done = (subseq and curRow == 0) or (curRow == 0 and curCol == 0) 
    while not done:
        if accumCost[curRow, curCol] == MAX_FLOAT:
            print('A path is not possible')
            break
        
        # you're done if you've made it to the bottom left (non sub-sequence)
        # or just the bottom (sub-sequence)
        # find the step size
        curStepIndex = stepsForCost[curRow, curCol]
        curRowStep = dn[curStepIndex]
        curColStep = dm[curStepIndex]
        # backtrack by 1 step
        curRow = curRow - curRowStep
        curCol = curCol - curColStep
        # add your new location onto the path
        path[0, stepsInPath] = curRow
        path[1, stepsInPath] = curCol
        stepsInPath = stepsInPath + 1
        # check to see if you're done
        done = (subseq and curRow == 0) or (curRow == 0 and curCol == 0)

    # reverse the path (a matrix with two rows) and return it
    return [np.fliplr(path[:, 0:stepsInPath]), endCol, endCost]

class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

In [12]:
def alignDTW(featfile1, featfile2, steps, weights, downsample, outfile = None, profile = False, subsequence=False):

    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M
    if subsequence: 
        times = []
        times.append(time.time())
        C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
        times.append(time.time())
        dn = steps[:,0].astype(np.uint32)
        dm = steps[:,1].astype(np.uint32)
        parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': True}
        [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
        times.append(time.time())
        [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)
        times.append(time.time())
        if outfile:
            pickle.dump(wp, open(outfile, 'wb'))
    else: 
        if max(F1.shape[1], F2.shape[1]) / min(F1.shape[1], F2.shape[1]) >= 2: # no valid path possible
            if outfile:
                pickle.dump(None, open(outfile, 'wb'))
            return None
        times = []
        times.append(time.time())
        C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
        times.append(time.time())

        dn = steps[:,0].astype(np.uint32)
        dm = steps[:,1].astype(np.uint32)
        parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
        [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
        times.append(time.time())
        [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)
        times.append(time.time())
        if outfile:
            pickle.dump(wp, open(outfile, 'wb'))

    if profile:
        return wp, np.diff(times)
    else:
        return wp, D, C, s, parameters

In [13]:
def Bmat_from_Dmat(D, steps, weights):
    B = np.dstack((np.zeros_like(D),np.zeros_like(D)))
    for i in tqdm(range(D.shape[0])):
        for j in range(D.shape[1]):
            bestCost = np.infty
            bestStep = -1
            for stepnum in range(steps.shape[0]):    
                if i-steps[stepnum,0] < 0:
                    pass                       
                elif j-steps[stepnum,1] < 0:
                    pass
                else:
                    tempCost = D[i-steps[stepnum,0], j-steps[stepnum,1]]
                    if tempCost < bestCost:
                        bestCost = tempCost
                        bestStep = stepnum
            B[i,j] = [i-steps[bestStep,0], j-steps[bestStep, 1]]
    return B

In [17]:
def Bpoint_from_Dpoint(D, steps, weights):
    bestCost = np.infty
    bestStep = -1
    for stepnum in range(steps.shape[0]):    
        if i-steps[stepnum,0] < 0:
            pass                       
        elif j-steps[stepnum,1] < 0:
            pass
        else:
            tempCost = D[i-steps[stepnum,0], j-steps[stepnum,1]]
            if tempCost < bestCost:
                bestCost = tempCost
                bestStep = stepnum
    B[i,j] = [i-steps[bestStep,0], j-steps[bestStep, 1]]
    return B

### Full Alignment

In [14]:
scenario1Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_1" + "/*")
scenario2Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_2" + "/*")
scenario3Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_3" + "/*")
scenario4Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_4" + "/*")
steps = np.array([1,1,1,2,2,1]).reshape((-1,2))
weights = np.array([2,3,3])
downsample = 1

print('Scenario 1')
wp1, D1, C1, s1, parameters1 = (alignDTW(scenario1Features[0], scenario1Features[1], steps, weights, downsample))
print('Scenario 2')
wp2, D2, C2, s2, parameters2 = (alignDTW(scenario2Features[0], scenario2Features[1], steps, weights, downsample))
print('Scenario 3')
wp3, D3, C3, s3, parameters3  = (alignDTW(scenario3Features[0], scenario3Features[1], steps, weights, downsample))
print('Scenario 4')
wp4, D4, C4, s4, parameters4 = (alignDTW(scenario4Features[0], scenario4Features[1], steps, weights, downsample))

Scenario 1
Scenario 2
Scenario 3
Scenario 4


## Idea 5 -- Adding noise to the features at different SNRs

In [None]:
def noisy_alignDTW(featfile1, featfile2, SNR_dB, steps, weights, downsample, outfile = None, profile = False):

    F1 = np.load(featfile1) # 12 x N
    F2 = np.load(featfile2) # 12 x M

    noise_power_F1 = np.max(np.abs(F1))**2 / (10**(SNR_dB / 10))
    noise_power_F2 = np.max(np.abs(F2))**2 / (10**(SNR_dB / 10))

    noise_F1 = np.random.normal(scale=np.sqrt(noise_power_F1), size=F1.shape)
    noise_F2 = np.random.normal(scale=np.sqrt(noise_power_F2), size=F2.shape)

    F1 += noise_F1
    F2 += noise_F2

    if max(F1.shape[1], F2.shape[1]) / min(F1.shape[1], F2.shape[1]) >= 2: # no valid path possible
        if outfile:
            pickle.dump(None, open(outfile, 'wb'))
        return None
    times = []
    times.append(time.time())
    C = 1 - F1[:,0::downsample].T @ F2[:,0::downsample] # cos distance metric
    times.append(time.time())

    dn = steps[:,0].astype(np.uint32)
    dm = steps[:,1].astype(np.uint32)
    parameters = {'dn': dn, 'dm': dm, 'dw': weights, 'SubSequence': False}
    [D, s] = DTW_Cost_To_AccumCostAndSteps(C, parameters)
    times.append(time.time())
    [wp, endCol, endCost] = DTW_GetPath(D, s, parameters)
    times.append(time.time())
    if outfile:
        pickle.dump(wp, open(outfile, 'wb'))

    if profile:
        return wp, np.diff(times)
    else:
        return wp, D, C, s, parameters

In [None]:
# Add 10dB noise to feature matrices
SNR_dB = 10
num_trials = 10
scenario1Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_1" + "/*")
scenario2Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_2" + "/*")
scenario3Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_3" + "/*")
scenario4Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_4" + "/*")
steps = np.array([1,1,1,2,2,1]).reshape((-1,2))
weights = np.array([2,3,3])
downsample = 1

noisy_wp_vec_10dB = []
noisy_D_vec_10dB = []
noisy_C_vec_10dB = []
noisy_s_vec_10dB = []
noisy_parameters_vec_10dB = []

for i in range(num_trials):
  wp1, D1, C1, s1, parameters1 = (noisy_alignDTW(scenario1Features[0], scenario1Features[1], SNR_dB, steps, weights, downsample))
  wp2, D2, C2, s2, parameters2 = (noisy_alignDTW(scenario2Features[0], scenario2Features[1], SNR_dB, steps, weights, downsample))
  wp3, D3, C3, s3, parameters3  = (noisy_alignDTW(scenario3Features[0], scenario3Features[1], SNR_dB, steps, weights, downsample))
  wp4, D4, C4, s4, parameters4 = (noisy_alignDTW(scenario4Features[0], scenario4Features[1], SNR_dB, steps, weights, downsample))

  noisy_wp_vec_10dB.append([wp1, wp2, wp3, wp4])
  noisy_D_vec_10dB.append([D1, D2, D3, D4])
  noisy_C_vec_10dB.append([C1, C2, C3, C4])
  noisy_s_vec_10dB.append([s1, s2, s3, s4])
  noisy_parameters_vec_10dB.append([parameters1, parameters2, parameters3, parameters4])

  print(f"Trial {i+1} done")

In [None]:
# Extracting wp y values for 4 scenarios
for i in range(len(noisy_wp_vec_10dB)):
  wp1_x = noisy_wp_vec_10dB[i][0][0]
  wp2_x = noisy_wp_vec_10dB[i][1][0]
  wp3_x = noisy_wp_vec_10dB[i][2][0]
  wp4_x = noisy_wp_vec_10dB[i][3][0]

# Extracting wp y values for 4 scenarios
for i in range(len(noisy_wp_vec_10dB)):
  wp1_y = noisy_wp_vec_10dB[i][0][1]
  wp2_y = noisy_wp_vec_10dB[i][1][1]
  wp3_y = noisy_wp_vec_10dB[i][2][1]
  wp4_y = noisy_wp_vec_10dB[i][3][1]

In [None]:
window_size = 200
num_windows = len(wp1_y) // window_size
std_dev_values_wp1 = []

for i in range(num_windows):
    start_idx = i * window_size
    end_idx = (i + 1) * window_size
    y_window = wp1_y[start_idx:end_idx]
    std_dev_y = np.std(y_window)
    std_dev_values_wp1.append(std_dev_y)

# Getting original data at window indices
window_indices = np.arange(0, len(wp1_x), window_size)
windowed_wp1 = []
for window_idx in window_indices:
    windowed_wp1.append(wp1[1][window_idx])

In [None]:
# Plotting Scenario 1 10dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_10dB)): 
  plt.plot(noisy_wp_vec_10dB[i][0][0], noisy_wp_vec_10dB[i][0][1], linestyle = '--',label=f'Noisy Path {i+1}')

#TODO: Figure out proper spacing of window_indices
# plt.errorbar(window_indices, windowed_wp1, yerr=std_dev_values_wp1, fmt='o', label='Error Bars')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 1 10dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 2 10dB
plt.figure(figsize=(12, 8))

plt.scatter(wp2[0], wp2[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_10dB)): 
  plt.plot(noisy_wp_vec_10dB[i][1][0], noisy_wp_vec_10dB[i][1][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 2 10dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 3 10dB
plt.figure(figsize=(12, 8))

plt.scatter(wp3[0], wp3[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_10dB)): 
  plt.plot(noisy_wp_vec_10dB[i][2][0], noisy_wp_vec_10dB[i][2][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 3 10dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 4 10dB
plt.figure(figsize=(12, 8))

plt.scatter(wp4[0], wp4[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_10dB)): 
  plt.plot(noisy_wp_vec_10dB[i][3][0], noisy_wp_vec_10dB[i][3][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 4 10dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Add 0 dB noise to feature matrices
SNR_dB = 0
num_trials = 10
scenario1Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_1" + "/*")
scenario2Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_2" + "/*")
scenario3Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_3" + "/*")
scenario4Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_4" + "/*")
steps = np.array([1,1,1,2,2,1]).reshape((-1,2))
weights = np.array([2,3,3])
downsample = 1

noisy_wp_vec_0dB = []
noisy_D_vec_0dB = []
noisy_C_vec_0dB = []
noisy_s_vec_0dB = []
noisy_parameters_vec_0dB = []

for i in range(num_trials):
  wp1, D1, C1, s1, parameters1 = (noisy_alignDTW(scenario1Features[0], scenario1Features[1], SNR_dB, steps, weights, downsample))
  wp2, D2, C2, s2, parameters2 = (noisy_alignDTW(scenario2Features[0], scenario2Features[1], SNR_dB, steps, weights, downsample))
  wp3, D3, C3, s3, parameters3  = (noisy_alignDTW(scenario3Features[0], scenario3Features[1], SNR_dB, steps, weights, downsample))
  wp4, D4, C4, s4, parameters4 = (noisy_alignDTW(scenario4Features[0], scenario4Features[1], SNR_dB, steps, weights, downsample))

  noisy_wp_vec_0dB.append([wp1, wp2, wp3, wp4])
  noisy_D_vec_0dB.append([D1, D2, D3, D4])
  noisy_C_vec_0dB.append([C1, C2, C3, C4])
  noisy_s_vec_0dB.append([s1, s2, s3, s4])
  noisy_parameters_vec_0dB.append([parameters1, parameters2, parameters3, parameters4])

  print(f"Trial {i+1} done")

In [None]:
# Plotting Scenario 1 0dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_0dB)): 
  plt.plot(noisy_wp_vec_0dB[i][0][0], noisy_wp_vec_0dB[i][0][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 1 0dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 2 0dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_0dB)): 
  plt.plot(noisy_wp_vec_0dB[i][1][0], noisy_wp_vec_0dB[i][1][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 2 0dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 3 0dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_0dB)): 
  plt.plot(noisy_wp_vec_0dB[i][2][0], noisy_wp_vec_0dB[i][2][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 3 0dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 4 0dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_0dB)): 
  plt.plot(noisy_wp_vec_0dB[i][3][0], noisy_wp_vec_0dB[i][3][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 4 0dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Add 5 dB noise to feature matrices
SNR_dB = 5
num_trials = 10
scenario1Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_1" + "/*")
scenario2Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_2" + "/*")
scenario3Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_3" + "/*")
scenario4Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_4" + "/*")
steps = np.array([1,1,1,2,2,1]).reshape((-1,2))
weights = np.array([2,3,3])
downsample = 1

noisy_wp_vec_5dB = []
noisy_D_vec_5dB = []
noisy_C_vec_5dB = []
noisy_s_vec_5dB = []
noisy_parameters_vec_5dB = []

for i in range(num_trials):
  wp1, D1, C1, s1, parameters1 = (noisy_alignDTW(scenario1Features[0], scenario1Features[1], SNR_dB, steps, weights, downsample))
  wp2, D2, C2, s2, parameters2 = (noisy_alignDTW(scenario2Features[0], scenario2Features[1], SNR_dB, steps, weights, downsample))
  wp3, D3, C3, s3, parameters3  = (noisy_alignDTW(scenario3Features[0], scenario3Features[1], SNR_dB, steps, weights, downsample))
  wp4, D4, C4, s4, parameters4 = (noisy_alignDTW(scenario4Features[0], scenario4Features[1], SNR_dB, steps, weights, downsample))

  noisy_wp_vec_5dB.append([wp1, wp2, wp3, wp4])
  noisy_D_vec_5dB.append([D1, D2, D3, D4])
  noisy_C_vec_5dB.append([C1, C2, C3, C4])
  noisy_s_vec_5dB.append([s1, s2, s3, s4])
  noisy_parameters_vec_5dB.append([parameters1, parameters2, parameters3, parameters4])

  print(f"Trial {i+1} done")

In [None]:
# Plotting Scenario 1 5dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_5dB)): 
  plt.plot(noisy_wp_vec_5dB[i][0][0], noisy_wp_vec_5dB[i][0][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 1 5dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 2 5dB
plt.figure(figsize=(12, 8))

plt.scatter(wp2[0], wp2[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_5dB)): 
  plt.plot(noisy_wp_vec_5dB[i][1][0], noisy_wp_vec_5dB[i][1][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 2 5dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 3 5dB
plt.figure(figsize=(12, 8))

plt.scatter(wp3[0], wp3[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_5dB)): 
  plt.plot(noisy_wp_vec_5dB[i][2][0], noisy_wp_vec_5dB[i][2][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 3 5dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 4 5dB
plt.figure(figsize=(12, 8))

plt.scatter(wp4[0], wp4[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_5dB)): 
  plt.plot(noisy_wp_vec_5dB[i][3][0], noisy_wp_vec_5dB[i][3][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 4 5dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Add 2 dB noise to feature matrices
SNR_dB = 2
num_trials = 10
scenario1Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_1" + "/*")
scenario2Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_2" + "/*")
scenario3Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_3" + "/*")
scenario4Features = glob.glob("scenario_feat/Full_Alignment_Recordings/Scenario_4" + "/*")
steps = np.array([1,1,1,2,2,1]).reshape((-1,2))
weights = np.array([2,3,3])
downsample = 1

noisy_wp_vec_2dB = []
noisy_D_vec_2dB = []
noisy_C_vec_2dB = []
noisy_s_vec_2dB = []
noisy_parameters_vec_2dB = []

for i in range(num_trials):
  wp1, D1, C1, s1, parameters1 = (noisy_alignDTW(scenario1Features[0], scenario1Features[1], SNR_dB, steps, weights, downsample))
  wp2, D2, C2, s2, parameters2 = (noisy_alignDTW(scenario2Features[0], scenario2Features[1], SNR_dB, steps, weights, downsample))
  wp3, D3, C3, s3, parameters3  = (noisy_alignDTW(scenario3Features[0], scenario3Features[1], SNR_dB, steps, weights, downsample))
  wp4, D4, C4, s4, parameters4 = (noisy_alignDTW(scenario4Features[0], scenario4Features[1], SNR_dB, steps, weights, downsample))

  noisy_wp_vec_2dB.append([wp1, wp2, wp3, wp4])
  noisy_D_vec_2dB.append([D1, D2, D3, D4])
  noisy_C_vec_2dB.append([C1, C2, C3, C4])
  noisy_s_vec_2dB.append([s1, s2, s3, s4])
  noisy_parameters_vec_2dB.append([parameters1, parameters2, parameters3, parameters4])

  print(f"Trial {i+1} done")

In [None]:
# Plotting Scenario 1 2dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_2dB)): 
  plt.plot(noisy_wp_vec_2dB[i][0][0], noisy_wp_vec_2dB[i][0][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 1 2dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 2 2dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_2dB)): 
  plt.plot(noisy_wp_vec_2dB[i][1][0], noisy_wp_vec_2dB[i][1][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 2 2dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 3 2dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_2dB)): 
  plt.plot(noisy_wp_vec_2dB[i][2][0], noisy_wp_vec_2dB[i][2][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 3 2dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()

In [None]:
# Plotting Scenario 4 2dB
plt.figure(figsize=(12, 8))

plt.scatter(wp1[0], wp1[1], label='Ground Truth Warping Path', color='blue')

for i in range(len(noisy_wp_vec_2dB)): 
  plt.plot(noisy_wp_vec_2dB[i][3][0], noisy_wp_vec_2dB[i][3][1], linestyle = '--',label=f'Noisy Path {i+1}')

plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.title('Scenario 4 2dB Warping Paths Overlay')
plt.xlabel('Reference')
plt.ylabel('Query')
plt.show()