# Joe Zacaroli - Audio Identification Assignment

  

You will first need to run the first 8 cells to create all the required functions.

I would not recommend running the cells preceded by 'Not intended for marker use', because they either aren't guaranteed to work or may take a long time to run.

Then there are some cells at the bottom preceded by the heading 'Functions and cells required for assignment'. These are the cells you need to run to create the desired output text file.

In [10]:
import os
import numpy as np
import librosa
import librosa.display
from matplotlib import pyplot as plt
from skimage.feature import peak_local_max
from scipy.stats import mode
from random import randrange
import math
import random
import time

#To use line_profiler: 'pip install line_profiler' and uncomment the next line.
#%load_ext line_profiler

The line_profiler extension is already loaded. To reload it, use:
  %reload_ext line_profiler


In [2]:
def computeFingerprint(fileName, params, plotSpectrograms=False):
    """Compute the fingerprint of a single audio file.

    The file is loaded with librosa, turned into a spectrogram, and the strongest
    local maxima of that spectrogram are picked with skimage's peak_local_max.
    The peaks are then hashed by constructHashLists using either
        (a) the inverted list method (one list of time steps per frequency bin), or
        (b) the peak pair method (pairs of peaks inside a target zone).

    Inputs: fileName - path of the audio file to fingerprint.
            params - dictionary that must contain the keys 'usePeakPairs', 'hopLength',
                     'nFFT', 'maxPeaksPerSecond', 'peakDetectionMinDistance', 'fMax',
                     'fanOut', 'targetZoneK' and 'targetZoneT'.
            plotSpectrograms (Bool) - if True, also plot the spectrogram that was used.

    Outputs: whatever constructHashLists returns for the detected peaks.
    """
    snd, sr = librosa.load(fileName)
    #Whole seconds only. A file shorter than one second therefore gets a peak budget of 0.
    lengthOfFile_seconds=len(snd)//sr
    
    #Get all of the variables out of the params
    usePeakPairs = params['usePeakPairs']
    hopLength = params['hopLength']
    nFFT = params['nFFT']
    maxPeaksPerSecond = params['maxPeaksPerSecond']
    peakDetectionMinDistance = params['peakDetectionMinDistance']
    fMax = params['fMax']
    fanOut = params['fanOut']
    targetZoneK = params['targetZoneK']
    targetZoneT = params['targetZoneT']
    
    #Hard-wired switch between the STFT magnitude and the mel spectrogram front end.
    useSTFT = True
    if useSTFT:
        S = np.abs(librosa.core.stft(snd, n_fft=nFFT, hop_length=hopLength))
    else:
        #The audio is passed by keyword (y=) so the call does not depend on positional-argument support.
        S = librosa.feature.melspectrogram(y=snd, sr=sr, hop_length=hopLength, n_mels=nFFT, fmax=fMax)

    #using skimage's peak detection function.
    coordinates = peak_local_max(S, min_distance=peakDetectionMinDistance, num_peaks=maxPeaksPerSecond*lengthOfFile_seconds)
    #sort them by their time step (column 1); column 0 is the frequency bin.
    coordinates = coordinates[np.argsort(coordinates[:, 1])]
    
    if plotSpectrograms:
        if useSTFT:
            #Fixed: this used to reference an undefined name 'D'; the spectrogram is held in S.
            librosa.display.specshow(librosa.amplitude_to_db(S, ref=np.max), sr=sr, hop_length=hopLength,
                                     y_axis='log', x_axis='time')
            plt.title('Power spectrogram')
            plt.colorbar(format='%+2.0f dB')
            plt.tight_layout()
            plt.show()
        else:
            #Find frequencies of the mel bins I'm using.
            #Fixed: this used to reference an undefined name 'n_hashes'; the mel spectrogram has nFFT bins.
            mel_freq_bins = librosa.core.mel_frequencies(n_mels=nFFT, fmax=fMax)

            S_dB = librosa.power_to_db(S, ref=np.max)
            plt.figure(figsize=(10, 2))
            plt.title('Mel-frequency spectrogram: ' + fileName)
            librosa.display.specshow(S_dB, x_axis='time', y_axis='mel', sr=sr, fmax=fMax)
            #Second figure: the same spectrogram with the detected peaks drawn as white circles.
            plt.figure(figsize=(10, 2))
            for coordinate in coordinates:
                plt.plot(coordinate[1]*hopLength/sr, mel_freq_bins[coordinate[0]], 'wo')
            librosa.display.specshow(S_dB, x_axis='time', y_axis='mel', sr=sr, fmax=fMax)
    
    hashLists = constructHashLists(coordinates, nFFT, usePeakPairs, fanOut=fanOut, targetZoneK=targetZoneK, targetZoneT=targetZoneT)
    
    return hashLists

def constructHashLists(peakCoordinates, n_hashes, usePeakPairs, fanOut=10, targetZoneK=20, targetZoneT=30):
    """Turn a set of spectrogram peaks into a fingerprint.

    Inputs: peakCoordinates - np array with one row per peak, of the form:
            [[hashKey, timeStep],
             [hashKey, timeStep],
                      ..
             [hashKey, timeStep]]

            n_hashes - the number of hash keys (frequency bins 0..n_hashes-1) to look for.

            usePeakPairs (Bool) - True hands the work to constructPeakPairHashLists,
                                  False builds the inverted lists here.
            fanOut, targetZoneK, targetZoneT - only used by the peak pair method.

    Outputs: for the inverted list method, a dictionary {hash_number: hash_list} where
             hash_number is a frequency bin number and hash_list is an array of the time
             steps at which that bin has a peak. Bins without any peak get no entry."""
    if usePeakPairs:
        return constructPeakPairHashLists(peakCoordinates, n_hashes, fanOut=fanOut, targetZoneK=targetZoneK, targetZoneT=targetZoneT)

    frequencyBins = peakCoordinates[:, 0]
    timeSteps = peakCoordinates[:, 1]

    invertedLists = {}
    for binNumber in range(n_hashes):
        #Time steps of every peak that sits in this frequency bin, in their original order.
        timesForBin = timeSteps[frequencyBins == binNumber]
        if timesForBin.size > 0:
            invertedLists[binNumber] = timesForBin

    return invertedLists

def constructPeakPairHashLists(peakCoordinates, n_hashes, fanOut=10, targetZoneK=20, targetZoneT=30, targetZoneOffset=15):
    """
    For each anchor peak in peakCoordinates, find other peaks within a target zone, up to a maximum of fanOut.

    The target zone of an anchor at (k1, t1) covers the peaks (k2, t2) with
        t1 + targetZoneOffset < t2 <= t1 + targetZoneOffset + targetZoneT   and
        k1 - targetZoneK <= k2 <= k1 + targetZoneK.

    Inputs: peakCoordinates - np array with one [hashKey (frequency bin), timeStep] row per peak.
            n_hashes - not used by this method; accepted so the call matches constructHashLists.
            fanOut - The maximum amount of hashed points to return for each peak
            targetZoneK - Half the target zone frequency size
            targetZoneT - The target zone time size
            targetZoneOffset - Gap in time steps between the anchor and the start of its target
                               zone. Defaults to 15, the value that used to be hard-coded.
    Returns: hash_list. Dictionary of the form: {hash_key: time_offset}
                        where hash_key is a tuple of (k1, k2, t2-t1) and time_offset is t1, the
                        time step of the anchor. If the same hash_key occurs for several anchors,
                        only the time step of the last one is kept."""
    hash_list = {}
    
    for anchorPeak in peakCoordinates:
        anchorK, anchorT = anchorPeak[0], anchorPeak[1]
        zoneStart = anchorT + targetZoneOffset
        zoneEnd = zoneStart + targetZoneT
        numberOfHashesAddedForThisPeak=0
        for otherPeak in peakCoordinates:
            otherK, otherT = otherPeak[0], otherPeak[1]
            #Check that it's within the time and the frequency range of the target zone.
            if zoneStart < otherT <= zoneEnd and anchorK-targetZoneK <= otherK <= anchorK+targetZoneK:
                #Found another peak in the target zone! Add it to the hash_list
                hash_list[(anchorK, otherK, otherT-anchorT)] = anchorT
                numberOfHashesAddedForThisPeak += 1
            
            #Checked after every candidate (not only after a hit), exactly as before.
            if numberOfHashesAddedForThisPeak >= fanOut:
                break
    return hash_list

In [3]:
def movingAverage(x, kernel_size=3):
    """ 
    Moving average of a numpy array x.

    Only windows that fit entirely inside x are averaged, so the result has
    len(x)-kernel_size+1 entries; entry j is the mean of x[j:j+kernel_size].

    kernel_size is the size of the moving average to be applied. Should be an odd number!
    """
    ret_val = np.zeros(len(x)-kernel_size+1)
    assert kernel_size&0x1, 'kernel_size must be odd'
    
    #Fixed: the output index used to be off by one, so the first window was written
    #to the LAST slot and the whole result came out rotated.
    for windowStart in range(len(ret_val)):
        ret_val[windowStart] = sum(x[windowStart:windowStart+kernel_size])/kernel_size

    return ret_val

X = movingAverage(np.array([1, 1, 1, 1, 1, 1, 1, 1, 1]), kernel_size=7)
assert((X==[1,1,1]).all())
#A constant input cannot reveal misplaced windows, so also check a ramp.
assert((movingAverage(np.array([1, 2, 3, 4, 5]), kernel_size=3)==[2,3,4]).all())

In [4]:
def computeMatchingFunction(queryHashList, databaseHashList, usePeakPairs=False, plotStuff=False):
    """Computes the matching function for a query fingerprint given a database fingerprint.
    
    If usePeakPairs is false then fingerprints are of the form: {hashNumber: [hash_list], hashNumber: [hash_list], .. , hashNumber: [hash_list]}
                                and the inverted list technique is used
    
    If usePeakPairs is true then fingerprints are of the form: {(hashTuple):timeOffset, (hashTuple):timeOffset, .. , (hashTuple):timeOffset}
                                and the technique proposed by Wang (2003) is used.

    plotStuff is accepted for compatibility but is not used.

    Returns: the number of (query peak, database peak) pairs that agree on the most common
             time shift between the two recordings, or 0 if the fingerprints share no hash."""
    
    if usePeakPairs:
        return computePairMatchingFunction(queryHashList, databaseHashList)
    
    #For each query point, We take away the time step value from the associated hash list.
    #Then work out what number appears in the indicator functions the most times.
    #The shifted lists are collected and joined once, instead of growing an array with np.append on every step.
    shiftedLists = []
    for h in queryHashList:
        #Only look up if there exists a hash list in the database recording for this frequency bin h.
        if h not in databaseHashList:
            continue
        databaseTimes = np.asarray(databaseHashList[h])
        for n in queryHashList[h]:
            #There is a peak defined by (n,h)
            #Take away the time step value for this query point from every time point in the database's hash list.
            shiftedLists.append(np.atleast_1d(databaseTimes - n).ravel())
    
    #It's possible that none of the hash lists of the query line up with the hash lists of the recording.
    #If this is the case, return 0 as there are zero matches
    if not shiftedLists:
        return 0
    indicatorFunctions = np.concatenate(shiftedLists)
    if indicatorFunctions.size == 0:
        return 0
    
    #An experiment with smoothing (histogram of indicatorFunctions -> movingAverage -> max) was tried
    #here in place of the plain count below; it is not used.
    
    #Return the count of the mode of all of the indicator functions. Returning the modal value would tell us how far through the database recording we are.
    #np.unique is used rather than indexing the result of scipy.stats.mode, because the shape of
    #mode's result for 1-D input differs between SciPy versions.
    _, counts = np.unique(indicatorFunctions, return_counts=True)
    return int(counts.max())

def computePairMatchingFunction(queryHashList, databaseHashList):
    """Find all hashes that are in both dictionaries, the corresponding offset times are calculated and we can make a histogram from these
    Inputs: queryHashList (Dictionary) - peak pair fingerprint of the query, of the form {hash_key: time_offset}.
            databaseHashList (Dictionary) - peak pair fingerprint of a database recording, same form.
    Returns: the height of the tallest bin of that histogram, i.e. how many shared hashes agree on
             the most common time difference. 0 if the two fingerprints share no hash."""
    #Time difference between each matching pair of peaks.
    timeDifferences = [queryHashList[queryHashKey] - databaseHashList[queryHashKey]
                       for queryHashKey in queryHashList
                       if queryHashKey in databaseHashList]
    
    if not timeDifferences:
        return 0
    #np.unique is used rather than indexing the result of scipy.stats.mode, because the shape of
    #mode's result for 1-D input differs between SciPy versions.
    _, counts = np.unique(np.ravel(timeDifferences), return_counts=True)
    return int(counts.max())

#Test the compute matching function using the example from the lectures
queryPrint = {2: np.array([0,2]), 3: np.array([1]), 4: np.array([1,2])}
databasePrint = {1: np.array([0, 1, 3, 5]), 2: np.array([2, 4]), 3: np.array([0, 3, 5]), 4: np.array([3, 4])}

score=computeMatchingFunction(queryPrint, databasePrint)
#Subtracting each query time from the matching database lists gives the shifts
#2,4, 0,2, -1,2,4, 2,3, 1,2 - the shift 2 appears five times, so the score must be 5.
assert score == 5, 'computeMatchingFunction failed the lecture example'

In [40]:
def findMatchesForQuery(queryFileName, params, path_to_fingerprints=-1):
    """Score one query recording against every stored database fingerprint.

    Inputs: queryFileName - path of the query audio file.
            params - parameter dictionary, passed on to computeFingerprint; its
                     'usePeakPairs' entry also selects which fingerprint files are read.
            path_to_fingerprints - folder holding the saved fingerprints. The default of -1
                                   means 'database_recordings'.

    Outputs: dictionary {path of fingerprint file: matching score for the query}.
             Picking the best one is left to the caller."""
    pairsWanted = params['usePeakPairs']

    queryFingerprint = computeFingerprint(queryFileName, params)

    fingerprintFolder = 'database_recordings' if path_to_fingerprints == -1 else path_to_fingerprints

    scores = {}
    for entry in os.scandir(fingerprintFolder):
        isPairsFile = entry.path.endswith('pairs.npy')
        isInvertedListFile = entry.path.endswith('print.npy')
        #Only read fingerprints that were built with the same method as the query's.
        if not ((isPairsFile and pairsWanted) or (isInvertedListFile and (pairsWanted == False))):
            continue
        #The fingerprint dictionary was saved with np.save, so it comes back wrapped in a 0-d object array.
        #NOTE: allow_pickle=True unpickles the file contents - only point this at fingerprints you created yourself.
        storedFingerprint = np.load(entry.path, allow_pickle=True)[()]
        scores[entry.path] = computeMatchingFunction(queryFingerprint, storedFingerprint, usePeakPairs=pairsWanted)

    return scores

In [43]:
def getSongNamesFromFileNames(querySnippetFileName, databaseFileName):
    """ Removes the gubbins around the file names. Used to identify whether the right song has been matched for a query.

    Inputs: querySnippetFileName - of the form 'query_recordings/classical.00003-snippet-x-x.wav'
            databaseFileName - of the form 'database_recordings/classical.00052_print.npy'
                               or 'database_fingerprints/classical.00052_print_pairs.npy'

    Outputs: (query song name, database song name), e.g. ('classical.00003', 'classical.00052').

    The names are found from the file name itself rather than from fixed character positions,
    so any folder name and any genre prefix works, not just the three genres and three folder
    names that used to be hard-coded."""
    #Drop the folder part, whatever it is called.
    queryName = os.path.basename(querySnippetFileName)
    #Remove the '-snippet-x-x.wav'
    querySong, snippetMarker, _ = queryName.partition('-snippet')
    if not snippetMarker and querySong.endswith('.wav'):
        #No snippet marker in the name: just drop the extension.
        querySong = querySong[:-len('.wav')]

    databaseName = os.path.basename(databaseFileName)
    #Strip out the '_print.npy' / '_print_pairs.npy'
    databaseSong, printMarker, _ = databaseName.partition('_print')
    if not printMarker and databaseSong.endswith('.npy'):
        databaseSong = databaseSong[:-len('.npy')]
    
    return querySong, databaseSong

In [7]:
def getNBestScoringRecordings(scores, N=1):
    """Find the best database recordings for a query that has already been matched against the entire database.
    
    Inputs: scores (dictionary{database_recording_name : matchScore for a given query.})
            N - How many best recordings to return.

    Outputs: list of recording names, best score first. Recordings with equal scores keep the
             order they have in the dictionary. Only recordings with a score above 0 are
             returned, so the list is shorter than N when fewer than N recordings matched at all.

    Two things differ from the earlier version: running out of positive scores no longer raises
    a KeyError (it used to try to delete the placeholder key -1), and the scores dictionary
    passed in is left untouched.
    """
    if N <= 0:
        return []

    matchedRecordings = [recordingName for recordingName in scores if scores[recordingName] > 0]
    #list.sort is stable, also with reverse=True, so ties stay in dictionary order.
    matchedRecordings.sort(key=lambda recordingName: scores[recordingName], reverse=True)
    
    return matchedRecordings[:N]

In [13]:
def computeFingerprintsForEntireDataset(params, path_to_database='database_recordings', path_to_fingerprints=-1):
    """Compute and save a fingerprint for every .wav file in a folder.

    Inputs: params - parameter dictionary passed on to computeFingerprint. Its 'usePeakPairs'
                     entry also picks the file name ending: '_print_pairs.npy' or '_print.npy'.
            path_to_database - folder containing the recordings.
            path_to_fingerprints - folder to save the fingerprints in. It is created if it does
                                   not exist yet. The default of -1 saves each fingerprint next
                                   to its recording.

    Outputs: the time taken, in seconds."""
    start = time.time()

    fingerprintSuffix = '_print_pairs.npy' if params['usePeakPairs'] else '_print.npy'
    saveNextToRecording = (path_to_fingerprints == -1)
    if not saveNextToRecording:
        #np.save does not create missing folders, so make sure the target exists first.
        os.makedirs(path_to_fingerprints, exist_ok=True)

    for entry in os.scandir(path_to_database):
        if not entry.path.endswith('.wav'):
            continue
        fingerprint = computeFingerprint(entry.path, params)
        
        #Save the fingerprint under the recording's name with '.wav' swapped for the suffix.
        if saveNextToRecording:
            fingerprintPath = entry.path[:-4] + fingerprintSuffix
        else:
            fingerprintPath = os.path.join(path_to_fingerprints, entry.name[:-4] + fingerprintSuffix)
        np.save(fingerprintPath, fingerprint)

    end = time.time()
    fingerprintComputationTime = end-start
    return fingerprintComputationTime

def computeMatchesForEntireQueryDataset(params, path_to_queryset='query_recordings'):
    """Match every .wav file in a folder against the database and measure the accuracy.

    Inputs: params - parameter dictionary passed on to findMatchesForQuery. Its optional
                     'nBest' entry says how many top scoring recordings are looked at per
                     query; it defaults to 1 when missing.
            path_to_queryset - folder containing the query recordings.

    Outputs: (time taken in seconds, accuracy). The accuracy is 100 * correct matches / number
             of queries, where a query counts as correct for each of its nBest recordings that
             belongs to the same song. It is 0.0 if the folder holds no .wav files."""
    nBest = params.get('nBest', 1)
    numberOfMatches = 0
    numberOfQueries = 0
    start = time.time()
    for entry in os.scandir(path_to_queryset):
        if not entry.path.endswith('.wav'):
            continue
        queryFile = entry.path
        scores = findMatchesForQuery(queryFile, params)
        bestScoringRecordings = getNBestScoringRecordings(scores, N=nBest)

        for recording in bestScoringRecordings:
            querySong, matchedSong = getSongNamesFromFileNames(queryFile, recording)

            if querySong == matchedSong:
                numberOfMatches += 1

        numberOfQueries += 1
        #For a quicker grid search, break out of this loop once numberOfQueries reaches e.g. 50.
            
    #Guard the division: an empty query folder used to raise ZeroDivisionError here.
    accuracy = 100*numberOfMatches/numberOfQueries if numberOfQueries else 0.0
    end = time.time()
    matchingComputationTime = end-start
    return matchingComputationTime, accuracy

def processEntireDataset(params):
    """Fingerprint the whole database, then match the whole query set against it.

    Both steps use their default folders. Returns a tuple of
    (fingerprint computation time, matching computation time, accuracy)."""
    #The fingerprints have to exist before matching, so that call comes first in the tuple.
    return (computeFingerprintsForEntireDataset(params), *computeMatchesForEntireQueryDataset(params))

# Next cell not intended for marker use

# Score the whole dataset using the best parameters from the grid search.

In [None]:
# Score the whole dataset twice with the same settings: first with the inverted list
# method (usePeakPairs False), then with the peak pair method (usePeakPairs True).
for usePeakPairsSetting in (False, True):
    params = {
        'usePeakPairs':usePeakPairsSetting,
        'hopLength':512, # hop length of the spectrogram
        'nFFT':1024, # number of bins in the stft or melspectrogram
        'maxPeaksPerSecond':15,
        'peakDetectionMinDistance':10,
        'fMax':4000, # melspectrogram only
        'fanOut':10,
        'targetZoneK':100,
        'targetZoneT':15,
        'nBest':5 # how many of the best scoring database fingerprints are looked at per query
    }
    fingerprintComputationTime, matchingComputationTime, accuracy = processEntireDataset(params)
    print('Pairs:', params['usePeakPairs'], 'nFFT:', params['nFFT'], 'maxPeaksPerSecond:', params['maxPeaksPerSecond'],
          'PeakDetectionMinDistance:', params['peakDetectionMinDistance'],
          'fanOut:', params['fanOut'], 'targetZoneK:', params['targetZoneK'], 'targetZoneT:', params['targetZoneT'])
    print('Time taken to compute fingerprints:', '{:04.1f}'.format(fingerprintComputationTime),
          ' Time taken to compute matches:', '{:04.1f}'.format(matchingComputationTime),
          ' Accuracy:', '{:04.1f}'.format(accuracy))

# Next cell not intended for marker use

# Perform a Grid Search

In [None]:
#Grid Search over some parameters
import itertools

#Peak Detection Parameters
nffts = [1024] #[1024, 2048]
peaksPerSeconds = [10] #[10, 20, 30] #Try 10 again?!
peakMinDistances = [10] #[4, 10, 20]
#Pairwise Matching Parameters
fanOuts = [10] # [5, 10]
targetZoneKs = [400] # [20, 50, 100]
targetZoneTs = [15, 25, 35]

#itertools.product visits the combinations in the same order as six nested for loops would
#(the last list changes fastest).
for nFFT, maxPeaksPerSecond, peakDetectionMinDistance, fanOut, targetZoneK, targetZoneT in itertools.product(
        nffts, peaksPerSeconds, peakMinDistances, fanOuts, targetZoneKs, targetZoneTs):
    params = {
        'usePeakPairs':True,
        'hopLength':512, #Spectrogram hop length
        'nFFT':nFFT, #Defines the number of bins in the stft or melspectrogram.
        'maxPeaksPerSecond':maxPeaksPerSecond,
        'peakDetectionMinDistance':peakDetectionMinDistance,
        'fMax':4000, #only used for melspectrogram
        'fanOut':fanOut,
        'targetZoneK':targetZoneK,
        'targetZoneT':targetZoneT,
        #Fixed: 'nBest' was missing, but the matching step reads params['nBest'], so this cell
        #raised a KeyError. 1 (top-1 accuracy) is an assumption - change it if the grid search
        #should be scored on more than the single best match.
        'nBest':1
    }

    fingerprintComputationTime, matchingComputationTime, accuracy = processEntireDataset(params)
    print('Pairs:', params['usePeakPairs'], 'nFFT:', params['nFFT'], 'maxPeaksPerSecond:', params['maxPeaksPerSecond'], 'PeakDetectionMinDistance:', params['peakDetectionMinDistance'],
          'fanOut:', params['fanOut'], 'targetZoneK:', params['targetZoneK'], 'targetZoneT:', params['targetZoneT'])
    print('Time taken to compute fingerprints:', '{:04.1f}'.format(fingerprintComputationTime), ' Time taken to compute matches:', '{:04.1f}'.format(matchingComputationTime), ' Accuracy:', '{:04.1f}'.format(accuracy))

# Next cell not intended for marker use.

# Using line_profiler on one file

It turns out that most of the load comes from loading the .npy files! The timing is therefore mostly dependent on the database fingerprint size.

In [42]:
params = {
    'usePeakPairs':True,
    'hopLength':512, 
    'nFFT':1024,
    'maxPeaksPerSecond':15,
    'peakDetectionMinDistance':10,
    'fMax':4000, 
    'fanOut':10,
    'targetZoneK':100,
    'targetZoneT':15,
    'nBest':3
}
queryFile = 'query_recordings/classical.00000-snippet-10-0.wav'
entryname = 'classical.00000-snippet-10-0'
queryFingerprint = computeFingerprint(queryFile, params)
path_to_fingerprints = 'database_fingerprints'

if params['usePeakPairs']:
    np.save(path_to_fingerprints + '/' + entryname + '_print_pairs.npy', queryFingerprint)
else:
    np.save(path_to_fingerprints + '/' + entryname + '_print.npy', queryFingerprint)

#actualFile = 'database_recordings/classical.00000_print.npy'
#actualFingerprint = np.load(actualFile,allow_pickle=True)[()]
%lprun -f findMatchesForQuery findMatchesForQuery(queryFile, params, path_to_fingerprints=path_to_fingerprints)
#scores = computeMatchingFunction

# Functions and cells required for assignment

The above code was all useful for me in doing the assignment. The following two functions are for marking purposes. The cells following these functions have example uses of them.

In [44]:
def fingerprintBuilder(pathToDatabase, pathToFingerprints):
    """Fingerprint every recording in the database folder and save the results.

    Inputs: pathToDatabase. Name of the folder (in the same directory as this notebook) that holds the database recordings.
            pathToFingerprints. Name of the folder (in the same directory as this notebook) that the fingerprints are written to.

    The peak pair method is used, with the settings listed below."""
    startMessage = 'Building fingerprints for all files located in ' + pathToDatabase + ' and placing them in ' + pathToFingerprints
    print(startMessage)

    params = dict(
        usePeakPairs=True,
        hopLength=512,
        nFFT=1024,
        maxPeaksPerSecond=15,
        peakDetectionMinDistance=10,
        fMax=4000,
        fanOut=10,
        targetZoneK=100,
        targetZoneT=15,
        nBest=3,
    )
    computeFingerprintsForEntireDataset(params, path_to_database=pathToDatabase, path_to_fingerprints=pathToFingerprints)

    print('Finished building fingerprints.')

    
def audioIdentification(pathToQueryset, pathToFingerprints, pathToOutput):
    """Match every query recording against the stored fingerprints and write the results to a text file.

    Inputs: pathToQueryset. Name of the folder (in the same directory as this notebook)
                                that holds the query recordings.
            pathToFingerprints. Name of the folder (in the same directory as this notebook)
                                that holds the fingerprints made by fingerprintBuilder.
            pathToOutput. Name of the .txt file (in the same directory as this notebook)
                            that the matching results are written to.

    Each line of the output is the query file name followed by its best matching
    database recordings, best first, separated by tabs.
            """
    print('Computing matches for all possible query recordings in ' + pathToQueryset + ' and placing the results in ' + pathToOutput)
    params = dict(
        usePeakPairs=True,
        hopLength=512,
        nFFT=1024,
        maxPeaksPerSecond=15,
        peakDetectionMinDistance=10,
        fMax=4000,
        fanOut=10,
        targetZoneK=100,
        targetZoneT=15,
        nBest=3,
    )

    with open(pathToOutput, 'w') as outputFile:
        for entry in os.scandir(pathToQueryset):
            if not entry.path.endswith('.wav'):
                continue
            scores = findMatchesForQuery(entry.path, params, path_to_fingerprints=pathToFingerprints)

            #First column is the query itself, then one column per matched recording.
            columns = [entry.name]
            for recording in getNBestScoringRecordings(scores, N=params['nBest']):
                _, matchedSong = getSongNamesFromFileNames(entry.path, recording)
                columns.append(matchedSong + '.wav')
            outputFile.write('\t'.join(columns) + '\n')
    print('Finished computing all matches.')
                    

In [45]:
# Folder and file names handed to fingerprintBuilder and audioIdentification in the cells that follow.
# Folder containing the database recordings to fingerprint.
pathToDatabaseRecordings = 'database_recordings'
# Folder the fingerprints are written to, and later read back from.
pathToFingerprints = 'database_fingerprints'
# Folder containing the query recordings.
pathToQueryset = 'query_recordings'
# Text file that receives the matching results.
pathToOutput = 'output.txt'

In [46]:
# Step 1: build and save the fingerprints for all of the database recordings.
fingerprintBuilder(pathToDatabaseRecordings, pathToFingerprints)

Building fingerprints for all files located in database_recordings and placing them in database_fingerprints
Finished building fingerprints.


In [47]:
# Step 2: match every query recording against the saved fingerprints and write the results to the output file.
audioIdentification(pathToQueryset, pathToFingerprints, pathToOutput)

Computing matches for all possible query recordings in query_recordings and placing the results in output.txt
Finished computing all matches.
