In [242]:
import numpy as np
import sounddevice as SD
from scipy.io import wavfile
import scipy.fft as fft
import matplotlib.pyplot as plt
import numba as nb
import os
import functools
import time
import math
def time_func(func):
    '''Decorator that prints the wrapped function's name and wall-clock run time.

    The elapsed time is measured with time.perf_counter() and printed after
    every call; the wrapped function's return value is passed through unchanged.
    '''
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        print(func.__name__, elapsed)
        return result
    return wrapper

@nb.njit(['i2[:,:](i2[:],i4)','i4[:,:](i4[:],i4)', 'i8[:,:](i8[:],i8)'], parallel = True)
def Chunking_Sample(sample, chunk_size):
    ''' Split a 1D sample into consecutive, non-overlapping time slices.

    Returns a 2D array of shape (sample.size // chunk_size, chunk_size) with the
    same dtype as `sample`; trailing values that do not fill a whole chunk are dropped.
    '''
    n_chunks = sample.size // chunk_size
    out = np.zeros((n_chunks, chunk_size), dtype=sample.dtype)
    for idx in nb.prange(n_chunks):
        begin = idx * chunk_size
        out[idx] = sample[begin : begin + chunk_size]
    return out
@nb.njit(['i2[:,:,:](i2[:,:],i4)','i4[:,:,:](i4[:,:],i4)', 'i8[:,:,:](i8[:,:],i8)'], parallel = True)
def Chunking_Sample_ARRAY(sample, chunk_size):
    ''' Batched chunking: split every row of a 2D array into consecutive,
     non-overlapping time slices.

    Returns a 3D array of shape (rows, row_length // chunk_size, chunk_size)
    with the same dtype as `sample`; the incomplete tail of each row is dropped.
    '''
    n_rows = sample.shape[0]
    n_chunks = sample.shape[-1] // chunk_size
    out = np.zeros((n_rows, n_chunks, chunk_size), dtype=sample.dtype)
    for row in nb.prange(n_rows):
        # numba only parallelises the outermost prange, so the inner loop is a plain range
        for idx in range(n_chunks):
            begin = idx * chunk_size
            out[row, idx] = sample[row, begin : begin + chunk_size]
    return out
@nb.njit('f8[:,:](f8[:],i8, i2)', parallel = True)
def Chunking_Multisample(data, chunksize, chunks):
    ''' Draw `chunks` random windows of `chunksize` consecutive values from `data`.

    data      -> 1D float64 array to sample from
    chunksize -> number of consecutive values in every window
    chunks    -> number of windows to draw (start positions are drawn with replacement)

    Returns a (chunks, chunksize) array with one window per row.
    '''
    # valid start positions are 0 .. data.size - chunksize inclusive, hence the +1
    # (without it the last window is never drawn and data.size == chunksize fails)
    start = np.random.choice(data.size - chunksize + 1, size = chunks)
    result = np.zeros((chunks, chunksize), dtype = data.dtype)
    for i in nb.prange(start.shape[0]):
        result[i] = data[start[i]:start[i]+chunksize]
    return result


@nb.njit('Tuple((f8[:,:,:],f8[:,:]))(f8[:], c16[:,:,:],f8,i4)', parallel = True)
def hashed_freqs_ARRAY(freqs, fourier, time_slices, octave_factor):
    ''' Batched peak picking: for every test sample and every time slice, find the
    frequency with the largest Fourier magnitude inside each musical frequency bin.

    freqs         -> 1D array of the frequencies belonging to the last axis of `fourier`
    fourier       -> complex array indexed as [test, time slice, frequency]
    time_slices   -> multiplier turning a time-slice index into a time
    octave_factor -> bin edges are placed every int(12/octave_factor) semitones around 440

    Returns (peaks, times): peaks[test, slice, bin] is the value of `freqs` at the
    strongest component in that bin, times[test, slice] is slice * time_slices.
    '''
    # bin edges: notes spaced `semitone_steps` halfsteps apart, centred on 440
    semitone_steps = int(12/octave_factor)
    base_range = 5
    range_lower = int(base_range*octave_factor)
    range_upper = int(base_range*octave_factor) - (octave_factor-1)
    semitones = np.array([i*semitone_steps for i in range(-range_lower,range_upper,1)], dtype=np.int32)
    bin_edges = note_frequencies(semitones, 440)
    # keep the first edge, then only edges that are more than 12 above their predecessor
    # in the unfiltered list
    bin_edges_reduced = [bin_edges[0]]
    for i in range(len(bin_edges)-1):
        if bin_edges[i+1] - bin_edges[i] > 12:
            bin_edges_reduced.append(bin_edges[i+1])
    bin_edges = bin_edges_reduced.copy()
    bins = len(bin_edges)-1
    peaks = np.empty((*fourier.shape[:-1],bins), dtype = np.float64)
    times = np.empty(fourier.shape[:-1], dtype = np.float64)
    # magnitudes are computed once for the whole batch
    abs_data = np.abs(fourier)
    for test in nb.prange(fourier.shape[0]):
        for time in nb.prange(fourier.shape[1]):
            # iterates through each time slice of music
            for bin in nb.prange(bins):
                # works through each bin range to find the freq upper and lower limits
                upper_lim = bin_edges[bin+1]
                lower_lim = bin_edges[bin]
                # finds the indices of the freqs where lower_lim < freq <= upper_lim
                # NOTE(review): if no frequency falls inside a bin `indices` is empty and
                # np.max / indices[0] below presumably fail - confirm for short time slices
                indices = np.where((freqs <= upper_lim) & (freqs > lower_lim))[0]
                # finds the (first) position of the largest magnitude within this range
                peak = np.where(abs_data[test][time][indices]== np.max(abs_data[test][time][indices]))[0][0]
                # stores the frequency of the peak (offset back into the full freqs array)
                peaks[test][time][bin] = freqs[np.int32(peak+indices[0])]
            # calculates the time where this time slice occurred
            times[test][time] = time*time_slices
    return peaks, times

@nb.njit(['f8[:](i4[:], i4)', 'f8(i4, i4)'])
def note_frequencies(n, f0):
    ''' Frequencies of notes along the equal temperament scale.

        f0 -> fundamental frequency / Hz
        n -> number of halfsteps from the fundamental note (scalar or array)
    '''
    octaves = n/12
    return f0 * np.power(2, octaves)

@nb.njit('Tuple((i4[:,:],f8[:]))(f8[:], c16[:,:],f8,i4)', parallel = True)
def hashed_freqs_nonuniform(freqs, fourier, time_slices, octave_factor):
    ''' For every time slice, find the index of the strongest Fourier component
    inside each musical (non-uniform) frequency bin.

    freqs         -> 1D array of the frequencies belonging to the last axis of `fourier`
    fourier       -> complex array indexed as [time slice, frequency]
    time_slices   -> multiplier turning a time-slice index into a time
    octave_factor -> bin edges are placed every int(12/octave_factor) semitones around 440

    Returns (peaks, times): peaks[slice, bin] is the index into `freqs` of the
    largest-magnitude component in that bin, times[slice] is slice * time_slices.
    '''
    # bin edges: notes spaced `semitone_steps` halfsteps apart, centred on 440
    semitone_steps = int(12/octave_factor)
    base_range = 5
    range_lower = int(base_range*octave_factor)
    range_upper = int(base_range*octave_factor) - (octave_factor-1)
    semitones = np.array([i*semitone_steps for i in range(-range_lower,range_upper,1)], dtype=np.int32)
    bin_edges = note_frequencies(semitones, 440)
    # keep the first edge, then only edges that are more than 12 above their predecessor
    # in the unfiltered list
    bin_edges_reduced = [bin_edges[0]]
    for i in range(len(bin_edges)-1):
        if bin_edges[i+1] - bin_edges[i] > 12:
            bin_edges_reduced.append(bin_edges[i+1])
    bin_edges = bin_edges_reduced.copy()
    bins = len(bin_edges)-1
    peaks = np.empty((fourier.shape[0],bins), dtype = np.int32)
    times = np.empty(fourier.shape[0])
    # time slices are independent, so they are processed in parallel
    for time in nb.prange(fourier.shape[0]):
        # iterates through each time slice of music
        for bin_num in range(bins):
            # works through each bin range to find the freq upper and lower limits
            upper_lim = bin_edges[bin_num+1]
            lower_lim = bin_edges[bin_num]
            # finds the indices of the freqs where lower_lim < freq <= upper_lim
            # NOTE(review): if no frequency falls inside a bin `indices` is empty and
            # np.max / indices[0] below presumably fail - confirm for short time slices
            indices = np.where((freqs <= upper_lim) & (freqs > lower_lim))[0]
            # finds the (first) position of the largest magnitude within this range
            peak = np.where(np.abs(fourier[time][indices]) == np.max(np.abs(fourier[time][indices])))[0][0]
            # stores the position of the peak within the full freqs array
            peaks[time][bin_num] = np.int32(peak+indices[0])
        # calculates the time where this time slice occurred
        times[time] = time*time_slices
    return peaks, times

def create_fingerprint_song(file_name: str,time_slice: float,octave_factor = 1):
    '''Fingerprint a song from the music folder and store the result in the database.

    file_name     -> name of a wav file inside ../Music/
    time_slice    -> length of one fingerprint time slice / s
    octave_factor -> forwarded to hashed_freqs_nonuniform (controls the bin spacing)

    The fingerprint is written to ../Fingerprints/<name without extension>.npz, the
    folder run_match and run_match_ARRAY read from.
    Returns the saved dictionary with the keys 'times' and 'fingerprint'.
    '''
    rate, data = wavfile.read('../Music/{}'.format(file_name)) # read in the wav file
    if data.ndim > 1:
        data = np.sum(data,axis = -1, dtype = np.int32) # converts stereo to mono
    else:
        # already mono: summing over the last axis would collapse it to a single number
        data = data.astype(np.int32)
    fingerprints = Chunking_Sample(data,int(time_slice*rate)) # chunks the data
    # calculate fourier data
    Fourier_fingerprints = fft.rfft(fingerprints)
    Freqs_fingerprints = fft.rfftfreq(fingerprints.shape[-1], 1/rate)
    # finds the peaks
    peaks, times = hashed_freqs_nonuniform(Freqs_fingerprints,Fourier_fingerprints,time_slice,octave_factor)
    # rounds the times to the decimal places of the time slice (removes floating point error)
    # the digit count is derived from the reversed decimal string of time_slice
    times = np.round(times, int(np.log10(float(str(time_slice)[::-1])))+2)
    # creates a dictionary of data
    hash_table = {}
    hash_table['times'] = times
    hash_table['fingerprint'] = Freqs_fingerprints[peaks]
    # saves the data to a file; splitext copes with extensions that are not 4 characters long
    song_name = os.path.splitext(file_name)[0]
    np.savez('../Fingerprints/{}.npz'.format(song_name), **hash_table)
    return hash_table

def create_fingerprint_sample(fingerprints, rate, time_slice: float,octave_factor = 1):
    '''Fingerprint an already chunked sample so it can be compared to the database.

    Returns (times, peak frequencies): the rounded start time of every time
    slice and the frequency of the strongest component in every frequency bin.
    '''
    spectrum = fft.rfft(fingerprints)
    freq_axis = fft.rfftfreq(fingerprints.shape[-1], 1/rate)
    peak_idx, slice_times = hashed_freqs_nonuniform(freq_axis, spectrum, time_slice, octave_factor)
    # number of decimals is derived from the reversed decimal string of time_slice
    decimals = int(np.log10(float(str(time_slice)[::-1])))+2
    return np.round(slice_times, decimals), freq_axis[peak_idx]

def create_fingerprint_sample_ARRAY(fingerprints, rate, time_slice: float,octave_factor = 1):
    '''Batched version of the sample fingerprinting: fingerprints a whole array of
        chunked samples in one go.

    Returns (times, peaks) as produced by hashed_freqs_ARRAY, with the times rounded.
    '''
    spectrum = fft.rfft(fingerprints)
    freq_axis = fft.rfftfreq(fingerprints.shape[-1], 1/rate)
    peak_freqs, slice_times = hashed_freqs_ARRAY(freq_axis, spectrum, time_slice, octave_factor)
    # number of decimals is derived from the reversed decimal string of time_slice
    decimals = int(np.log10(float(str(time_slice)[::-1])))+2
    return np.round(slice_times, decimals), peak_freqs

@nb.njit('Tuple((f8[:,:,:],f8[:,:,:]))(f8[:],f8[:,:],f8[:,:],f8[:])', parallel = True)
def match_fingerprints(times_database, fingerprint_database, fingerprint_sample, times_sample):
    ''' Pairwise comparison of every database time slice with every sample time slice.

    Returns (frequency differences, time differences), both indexed as
    [database slice, sample slice, frequency bin]; the time difference is
    repeated along the frequency axis.
    '''
    n_database = fingerprint_database.shape[0]
    n_sample = fingerprint_sample.shape[0]
    n_bins = fingerprint_database.shape[1]
    result_fingerprints = np.zeros((n_database, n_sample, fingerprint_sample.shape[1]), dtype = np.float64)
    result_times = np.zeros((times_database.size, times_sample.size, fingerprint_database.shape[-1]), dtype = np.float64)
    # database slices are distributed over threads; the inner loops run serially
    for db_slice in nb.prange(n_database):
        db_time = times_database[db_slice]
        for sample_slice in range(n_sample):
            # time offset between this database slice and this sample slice
            offset = db_time - times_sample[sample_slice]
            for band in range(n_bins):
                # same offset for every frequency bin of this pair
                result_times[db_slice, sample_slice, band] = offset
                # difference between the Fourier peaks of the database song and the sample
                result_fingerprints[db_slice, sample_slice, band] = fingerprint_database[db_slice, band] - fingerprint_sample[sample_slice, band]
    return result_fingerprints, result_times

''' The functions below are for calculating noise data to generate noise on a sample digitally'''
def P_to_DB(sig_avg_power):
    '''Convert an average power into decibels (10 * log10 of the power).'''
    log_power = np.log10(sig_avg_power)
    return 10*log_power
def Noise_Data(target_SNR, signal_avg_DB):
    '''Noise level needed to reach a target SNR, both given in dB.

    Returns (noise level in dB, the same level converted back to linear power).
    '''
    noise_db = signal_avg_DB - target_SNR
    noise_power = np.power(10. , noise_db/10)
    return noise_db, noise_power
def generate_noise(mean_noise, noise_avg_watts, data_length):
    '''Draw gaussian noise with the given mean and average power (variance).'''
    sigma = np.sqrt(noise_avg_watts)
    return np.random.normal(loc=mean_noise, scale=sigma, size=data_length)
####################
# White Noise addition
def gen_white_noise(target_SNR, data):
    '''Add gaussian white noise to `data` so that signal power / noise power is `target_SNR`.

    target_SNR -> wanted signal-to-noise power ratio (linear, not dB)
    data       -> array of samples

    Returns the noisy samples converted to int32.
    '''
    samples = np.asarray(data)
    target_SNR_DB = 10*np.log10(target_SNR)
    # square in float64: squaring int16/int32 samples directly wraps around
    signal_average_power = np.average(samples.astype(np.float64)**2)
    sig_avg_DB = P_to_DB(signal_average_power)
    noise_avg_DB, noise_avg_power = Noise_Data(target_SNR_DB, sig_avg_DB)
    print('sig power',signal_average_power,'DB',sig_avg_DB,'\n noise DB',noise_avg_DB, 'power',noise_avg_power,'\n ratio', signal_average_power/noise_avg_power)
    # noise has the same shape as the data so the sum also works for multi-dimensional input
    white_noise = generate_noise(0, noise_avg_power, samples.shape)
    noisy_data = np.int32(samples+white_noise)
    return noisy_data
################

################
# background noise addition
def gen_background_noise(data, record_duration, target_SNR):
    '''Mix a randomly chosen background recording into `data` at the given SNR.

    data            -> samples of the clean signal (record_duration seconds long)
    record_duration -> length of the signal / s
    target_SNR      -> wanted signal power / background power ratio (linear)

    The whole background file is scaled so that its average power is the signal
    power divided by target_SNR, then a random window of it is added to the signal.
    Returns (noisy samples as int32, sample rate of the background file, file name).
    '''
    background_sounds = os.listdir('../BackgroundNoise/')
    random_sample = np.random.randint(len(background_sounds))
    background_rate, background_data = wavfile.read('../BackgroundNoise/{}'.format(background_sounds[random_sample]))
    # int64 keeps the squared samples below from overflowing (a summed stereo int16
    # sample squared does not fit into int32); same dtype as gen_background_noise_ARRAY
    if background_data.ndim > 1:
        background_data = np.sum(background_data,axis = -1, dtype = np.int64)
    else:
        # already mono: summing over the last axis would collapse it to a single number
        background_data = background_data.astype(np.int64)
    background_data = background_data * np.sqrt(np.average(np.int64(data)**2)/np.average(background_data**2))*(1/np.sqrt(target_SNR))
    window = int(background_rate*record_duration)
    # valid start positions are 0 .. size - window inclusive, hence the +1
    background_start = np.random.randint(background_data.size - window + 1)
    # take a slice of the background recording
    random_background_sample = background_data[background_start:background_start+window]
    sample_plus_background = np.int32(data+random_background_sample)
    return sample_plus_background, background_rate, background_sounds[random_sample]

def gen_background_noise_ARRAY(data, record_duration, target_SNR, tests, random_sample):
    '''Batched background-noise mixing: adds `tests` random windows of one
    background recording to the rows of `data`.

    random_sample selects the file by its position in os.listdir('../BackgroundNoise/').
    Returns (noisy samples as int64, sample rate of the background file, file name).
    '''
    sound_name = os.listdir('../BackgroundNoise/')[random_sample]
    background_rate, raw_background = wavfile.read('../BackgroundNoise/{}'.format(sound_name))
    mono_background = np.sum(raw_background, axis = -1, dtype = np.int64)
    # scale the whole recording so that signal power / background power == target_SNR
    signal_power = np.average(np.int64(data)**2)
    background_power = np.average(mono_background**2)
    scaled_background = mono_background * np.sqrt(signal_power/background_power)*(1/np.sqrt(target_SNR))
    # one random window of the background per test sample
    windows = Chunking_Multisample(scaled_background, int(background_rate*record_duration), tests)
    return np.int64(data+windows), background_rate, sound_name


def run_match(fingerprint_sample, times_sample):
    '''Score the sample against every fingerprint file in ../Fingerprints/.

    Returns (scores, hist, hist_bins, freq_diff, times, times_sample); scores maps
    every database file name to histogram peak minus histogram mean, the other
    values belong to the last database entry that was processed.
    '''
    freq_range = 1. # the chosen difference in frequency values permitted
    scores = {}
    for entry in os.listdir('../Fingerprints/'):
        with np.load('../Fingerprints/{}'.format(entry)) as data:
            fingerprints, times = data['fingerprint'], data['times']
            # frequency and time differences of every database slice / sample slice pair
            freq_diff, time_diff = match_fingerprints(times, fingerprints, fingerprint_sample, times_sample)
            # keep the time offsets of the pairs whose frequencies agree within the range
            close_enough = np.abs(freq_diff) < freq_range
            hist, hist_bins = np.histogram(time_diff[close_enough],  bins = 250)
            # a matching song shows up as one offset bin standing out from the average
            scores[entry] = np.max(hist)-np.average(hist)
    return scores, hist, hist_bins, freq_diff, times, times_sample

@nb.njit('f8[:](f8[:], i4)')
def get_bin_edges(arr, bins):
    '''Edges of `bins` equal-width histogram bins spanning arr.min() .. arr.max().'''
    lowest = arr.min()
    highest = arr.max()
    width = (highest - lowest)/bins
    edges = np.zeros((bins+1), dtype = np.float64)
    for edge in range(edges.shape[0]):
        edges[edge] = lowest + edge * width
    # pin the last edge to the maximum to avoid roundoff error
    edges[-1] = highest
    return edges

@nb.njit('Tuple((i8[:],f8[:]))(f8[:], i4)')
def nb_hist(arr, bins):
    '''Histogram of a 1D array with `bins` equal-width bins between its min and max.

    Returns (counts per bin, bin edges). Values equal to the maximum are counted
    in the last bin.

    The counting loop is deliberately serial: several threads incrementing the
    same hist element inside a prange is a data race and can lose counts.
    '''
    hist = np.zeros((bins), dtype = np.int64)
    bin_edges = get_bin_edges(arr, bins)
    arr_min = bin_edges[0]
    arr_max = bin_edges[-1]
    rnge = arr_max - arr_min
    for val in range(arr.size):
        if arr[val] == arr_max:
            # the right-most edge belongs to the last bin
            bin_num = bins-1
        else:
            bin_num = np.int64(bins * (arr[val] - arr_min) //rnge)
        if bin_num >= 0 and bin_num < bins:
            hist[bin_num] += 1
    return hist, bin_edges
@nb.njit('Tuple((f8[:,:],f8[:,:]))(f8[:],f8[:,:],f8[:,:],f8[:])', parallel = True)
def match_fingerprints_new_version(times_database, fingerprint_database, fingerprint_sample, times_sample):
    ''' Pairwise comparison of database and sample time slices, reduced over frequency.

    Returns (mean absolute frequency difference, time difference), both indexed
    as [database slice, sample slice].
    '''
    n_database = fingerprint_database.shape[0]
    n_sample = fingerprint_sample.shape[0]
    n_bins = fingerprint_database.shape[1]
    result_fingerprints = np.zeros((n_database, n_sample), dtype = np.float64)
    result_times = np.zeros((times_database.size, times_sample.size), dtype = np.float64)
    # database slices are distributed over threads
    for db_slice in nb.prange(n_database):
        for sample_slice in range(n_sample):
            # time offset between the two slices
            result_times[db_slice, sample_slice] = times_database[db_slice] - times_sample[sample_slice]
            # accumulate the absolute peak differences over the frequency axis
            total = 0.0
            for band in range(n_bins):
                total += abs(fingerprint_database[db_slice, band] - fingerprint_sample[sample_slice, band])
            result_fingerprints[db_slice, sample_slice] = total/n_bins
    return result_fingerprints, result_times
@nb.njit('f8[:](f8[:],f8[:,:],f8[:,:,:],f8[:,:], i4)', parallel = True)
def match_arrays(song_times, song_fingerprints,mult_fingerprints, mult_time_samples, bins):
    ''' Similarity of one database song to every sample of a batch.

    song_times        -> times of the database song's time slices
    song_fingerprints -> database fingerprint indexed as [time slice, frequency bin]
    mult_fingerprints -> sample fingerprints indexed as [sample, time slice, frequency bin]
    mult_time_samples -> sample times indexed as [sample, time slice]
    bins              -> number of bins of the time-offset histogram

    For every sample, the summed absolute frequency difference of each
    (song slice, sample slice) pair is computed; the time offsets of the pairs
    below half the average difference are histogrammed and the similarity is the
    histogram peak minus the histogram mean.
    Returns one similarity value per sample.
    '''
    # NOTE(review): freq_range is not used anywhere in this function
    freq_range = 1.
    similarities = np.zeros(mult_fingerprints.shape[0], dtype = np.float64)
    for m in nb.prange(mult_fingerprints.shape[0]):
        freq_diff_sum = np.zeros((song_fingerprints.shape[0],mult_fingerprints[m].shape[0]), dtype = np.float64)
        time_diff = np.zeros((song_times.size, mult_time_samples[m].size), dtype = np.float64)
        # goes through the database time slices
        for i in nb.prange(song_fingerprints.shape[0]):
            # then the sample time slices of data
            for j in nb.prange(mult_fingerprints[m].shape[0]):
                # calculates the time difference between the values
                time_diff[i,j] = song_times[i] - mult_time_samples[m][j]
                # then iterates through the frequency axis
                cum_sum = 0
                for k in range(song_fingerprints.shape[1]):
                    # accumulates the absolute difference between the Fourier peaks in the sample and the database song
                    cum_sum += abs(song_fingerprints[i,k] - mult_fingerprints[m][j,k])
                freq_diff_sum[i,j] = cum_sum

        # keeps only the slice pairs whose summed difference is below half the average
        reduced_indexes = np.where(freq_diff_sum < 0.5*np.average(freq_diff_sum))
        # gathers the time offsets of the selected pairs
        reduced_times = np.zeros(reduced_indexes[-1].size, dtype = np.float64)
        for n in nb.prange(reduced_indexes[-1].size):
            reduced_times[n] = time_diff[reduced_indexes[0][n],reduced_indexes[1][n]]
        # NOTE(review): if no pair is selected reduced_times is empty and nb_hist
        # presumably fails on the min/max of an empty array - confirm
        hist, hist_bins = nb_hist(reduced_times, bins)
        # a matching song shows up as one offset bin standing out from the average
        similarities[m] = np.max(hist) - np.average(hist)
    return similarities


def run_match_ARRAY(fingerprint_sample, times_sample, database = None):
    '''Runs the match finding for a batch of samples and outputs the results.

    fingerprint_sample -> sample fingerprints, passed on to match_arrays
    times_sample       -> sample times, passed on to match_arrays
    database           -> optional list of fingerprint file names inside
                          ../Fingerprints/ to compare against; by default every
                          file in that folder is used.
                          NOTE(review): a supplied database used to make the function
                          return None; treating it as a list of file names is an
                          assumption - confirm the intended meaning.

    Returns a dictionary mapping every database file name to the array of
    similarities returned by match_arrays (one value per sample).
    '''
    if database is None:
        database = os.listdir('../Fingerprints/')
    result = {}
    # go through each entry in the database
    for entry in database:
        # load the data
        with np.load('../Fingerprints/{}'.format(entry)) as data:
            fingerprints = data['fingerprint']
            times = data['times']
            result[entry] = match_arrays(times, fingerprints, fingerprint_sample, times_sample, bins = 250)
    return result

@nb.njit('f8(i4[:], i4[:])')
def test_results(true_song_indexes,  obtained_song_indexes):
    '''Fraction of positions at which the obtained song index equals the true one.'''
    total = true_song_indexes.size
    hits = 0
    for pos in range(total):
        if true_song_indexes[pos] == obtained_song_indexes[pos]:
            hits += 1
    return hits / total


In [240]:
''' This is a theoretical tester to look for similarity'''
import time
import numpy as np
from scipy.io import wavfile
import os



def run_sim(target_SNR, time_slice, octave_factor = 1):
    '''Simulate one recognition attempt: take a random 20 s excerpt of a random song,
    add background noise and match it against the fingerprint database.

    target_SNR    -> wanted signal power / background power ratio (linear)
    time_slice    -> length of one fingerprint time slice / s
    octave_factor -> forwarded to create_fingerprint_sample (controls the bin spacing)

    Returns (clean excerpt as int16, noisy excerpt as int16, sample rate,
    whether the best match is the true song, hist, hist_bins, frequency differences,
    database times, sample times, name of the best match); the histogram, frequency
    differences and database times are whatever run_match returns for them.
    '''
    entries = os.listdir('../Music/')
    amplitude = 0.2 # reduce the amplitude of the samples to reduce clipping
    record_duration = 20 # how long the sample should be
    # choose a random song
    song = entries[np.random.randint(len(entries))]
    print(song)
    # access the song data
    rate, data = wavfile.read('../Music/{}'.format(song))
    # convert to mono and int32 so that squaring the data doesn't overflow the max number allowed
    if data.ndim > 1:
        data = np.sum(data,axis = -1, dtype = np.int32)*amplitude
    else:
        # already mono: summing over the last axis would collapse it to a single number
        data = data.astype(np.int32)*amplitude
    window = int(rate*record_duration)
    # choose a random place in the song (valid starts are 0 .. size - window inclusive)
    start = np.random.randint(data.size - window + 1)
    # take a slice of the data from the song
    data = data[start:start+window]
    play_data = np.int16(data)
    data_sample, background_rate, sound = gen_background_noise(data, record_duration, target_SNR= target_SNR)
    play_data_sample = np.int16(data_sample)
    # chunk the sample
    fingerprints = Chunking_Sample(data_sample,int(time_slice*background_rate))
    # create the fingerprint of the sample
    # (create_fingerprint_sample has no `bins` keyword; the bin layout is set by octave_factor)
    times_sample, fingerprint_sample = create_fingerprint_sample(fingerprints,rate,time_slice,octave_factor = octave_factor)
    # access the fingerprints database
    result, hist, hist_bins, plt_freqs, orig_times, sample_times = run_match(fingerprint_sample, times_sample)
    # the result where there is maximal difference between the average and the peak is the song that is playing
    best_match = str(max(result, key=result.get))[:-4]
    return play_data, play_data_sample, rate, song[:-4] == best_match, hist, hist_bins, plt_freqs, orig_times, sample_times, best_match

def gen_samples_array(target_SNR, tests, noise_sample, record_duration = 20, time_slice = 0.1, octave_factor = 1, entries_data = None):
    '''Generate a batch of noisy test samples (the same number for every song) and fingerprint them.

    target_SNR      -> wanted signal power / background power ratio, passed to gen_background_noise_ARRAY
    tests           -> total number of samples wanted; every song gets tests // (number of songs)
    noise_sample    -> index of the background file in os.listdir('../BackgroundNoise/'),
                       or None to pick one at random
    record_duration -> length of every sample / s
    time_slice      -> length of one fingerprint time slice / s
    octave_factor   -> passed to create_fingerprint_sample_ARRAY
    entries_data    -> optional tuple (database_times, database_fingerprints, rate, songs);
                       when given the song data is taken from it instead of ../Music/

    Returns (fingerprint_sample, times_sample, song_samples, comparison_array) where
    comparison_array holds the true song index of every generated sample.
    '''
    if not entries_data == None:
        # in-memory branch: the song data comes from the caller
        database_times, database_fingerprints, rate, songs = entries_data
        amplitude = 0.2 # reduce the amplitude of the samples to reduce clipping
        # number of samples generated per song
        song_samples = tests//database_times.shape[0]
        song_indexes = np.arange(database_times.shape[0])
        # true song index of every sample: each index repeated song_samples times
        comparison_array = np.repeat(song_indexes, song_samples)
        background_sounds = os.listdir('../BackgroundNoise/')
        if noise_sample == None:
            noise_sample = np.random.randint(len(background_sounds))
        # one row per generated sample
        total_data_sample = np.zeros((int(song_samples*database_times.shape[0]),int(rate*record_duration)), dtype=np.int64)
        
        for j in range(database_times.shape[0]):
            # every song in turn (song is not used further in this branch)
            song = songs[j]
            # access the song data
            # NOTE(review): despite its name database_fingerprints[j] is treated like raw
            # multi-channel audio here (summed over the last axis) - confirm what callers pass
            data = database_fingerprints[j]
            # convert to mono with int64 so that squaring the data doesn't overflow the max number allowed
            data = np.int64(data)
            data = np.sum(data,axis = -1, dtype = np.int64)*amplitude
            # song_samples random windows of record_duration seconds
            result = Chunking_Multisample(data, int(rate*record_duration), song_samples)
            # play_data is not used further in this function
            play_data = np.int16(data)
            result_passthrough = np.int64(result)
            # add background noise to every window
            data_sample, background_rate, sound = gen_background_noise_ARRAY(result_passthrough, record_duration, target_SNR, song_samples, noise_sample)
            # store this song's samples in its block of rows
            total_data_sample[j*song_samples: (j+1)*song_samples] = data_sample.copy()
        # play_data_sample is not used further in this function
        play_data_sample = np.int16(data_sample)
        # chunk every sample into time slices (chunk length uses the background file's rate)
        fingerprints = Chunking_Sample_ARRAY(total_data_sample,int(time_slice*background_rate))
        times_sample, fingerprint_sample = create_fingerprint_sample_ARRAY(fingerprints,rate,time_slice,octave_factor = octave_factor)
        return fingerprint_sample, times_sample, song_samples, comparison_array
    else:
        # file branch: every wav file in ../Music/ is read from disk
        entries = os.listdir('../Music/')
        amplitude = 0.2 # reduce the amplitude of the samples to reduce clipping
        # number of samples generated per song
        song_samples = tests//len(entries)
        song_indexes = np.arange(len(entries))
        # true song index of every sample: each index repeated song_samples times
        comparison_array = np.repeat(song_indexes, song_samples)
        background_sounds = os.listdir('../BackgroundNoise/')
        if noise_sample == None:
            noise_sample = np.random.randint(len(background_sounds))
        # the first song is read only to learn the sample rate needed to size the output
        song = entries[0]
        rate, _ = wavfile.read('../Music/{}'.format(song))
        del _
        # one row per generated sample
        total_data_sample = np.zeros((int(song_samples*len(entries)),int(rate*record_duration)), dtype=np.int64)
        for j in range(len(entries)):
            # every song in turn
            song = entries[j]
            # access the song data
            rate, data = wavfile.read('../Music/{}'.format(song))
            # convert to mono with int64 so that squaring the data doesn't overflow the max number allowed
            data = np.int64(data)
            data = np.sum(data,axis = -1, dtype = np.int64)*amplitude
            # song_samples random windows of record_duration seconds
            result = Chunking_Multisample(data, int(rate*record_duration), song_samples)
            # large intermediates are deleted as soon as they are no longer needed
            del data
            result_passthrough = np.int64(result)
            # add background noise to every window
            data_sample, background_rate, sound = gen_background_noise_ARRAY(result_passthrough, record_duration, target_SNR, song_samples, noise_sample)
            del result_passthrough
            # store this song's samples in its block of rows
            total_data_sample[j*song_samples: (j+1)*song_samples] = data_sample.copy()
        del data_sample, result
        # chunk every sample into time slices (chunk length uses the background file's rate)
        fingerprints = Chunking_Sample_ARRAY(total_data_sample,int(time_slice*background_rate))
        del total_data_sample
        times_sample, fingerprint_sample = create_fingerprint_sample_ARRAY(fingerprints,rate,time_slice,octave_factor = octave_factor)
        del fingerprints
        return fingerprint_sample, times_sample, song_samples, comparison_array

def run_sim_array(tests, choice = True, optional_data = None, noise_sample = None,target_SNR = None, entries_data = None):
    '''Run a batch of recognition tests and return the fraction identified correctly.

    tests         -> total number of samples to generate (used when choice is True)
    choice        -> True: generate the samples here with gen_samples_array;
                     otherwise use the pre-computed samples in optional_data
    optional_data -> tuple (fingerprint_sample, times_sample, song_samples, comparison_array)
    noise_sample  -> background file index forwarded to gen_samples_array
    target_SNR    -> signal-to-noise ratio forwarded to gen_samples_array
    entries_data  -> NOTE(review): when given, the function returns None without doing
                     any work; this path looks unfinished - confirm the intent

    Returns the accuracy computed by test_results (or None, see entries_data).
    '''
    if entries_data is not None:
        return None
    entries = os.listdir('../Music/')
    if choice == True:
        fingerprint_sample, times_sample, song_samples, comparison_array = gen_samples_array(target_SNR, tests, noise_sample)
    else:
        fingerprint_sample, times_sample, song_samples, comparison_array = optional_data

    result = run_match_ARRAY(fingerprint_sample, times_sample)
    # one row of similarities per database entry, one column per test sample
    # NOTE(review): row order follows the fingerprint listing while comparison_array
    # indexes the music listing - presumably both folders list the songs in the same order
    template = np.zeros((len(entries), int(len(entries)*song_samples)))
    for index, val in enumerate(result.keys()):
        template[index] = result[val]
    # best-scoring database entry per sample; argmax returns the first one on ties
    indexes_max = np.argmax(template, axis = 0)
    accuracy = test_results(np.int32(comparison_array), np.int32(indexes_max))
    return accuracy


# show which background recording index 4 refers to
# NOTE(review): os.listdir returns names in arbitrary order, so the same index may
# point at a different file on another machine
ind  = 4
background_sounds = os.listdir('../BackgroundNoise/')
print(background_sounds[ind])

People Talking Sound Effect.wav


In [243]:
# experiment configuration
tests = 20
# SNRs and accuracies are not used further in this cell
SNRs = np.logspace(-4, 1.5, 50, endpoint = True)
accuracies = []
ind  = 4 # index of the background recording to mix in
entries = os.listdir('../Music/') # access the music wav database
octave_factor = 2
record_duration = 20
time_slice  = 0.085
# (re)build the fingerprint of every song with the current settings
for entry in entries:
    hashes = create_fingerprint_song(entry, time_slice, octave_factor=octave_factor)

# generate the noisy test samples (target SNR of 100) and time the generation
t0 = time.perf_counter()
fingerprint_sample, times_sample, song_samples, comparison_array = gen_samples_array(100, tests, ind, record_duration=record_duration, time_slice=time_slice, octave_factor=octave_factor)
print('data gen', time.perf_counter()-t0)
# match the pre-computed samples against the database and time the matching
t0 = time.perf_counter()
final_accuracy = run_sim_array(tests, False, (fingerprint_sample, times_sample, song_samples, comparison_array))
print('data manipulation',time.perf_counter() - t0)
print(final_accuracy)


data gen 2.0798782000001665
data manipulation 0.30941079999865906
0.16666666666666666


In [2]:
import numpy as np
import math

# geometric bin boundaries: every bin is bin_ratio times wider than the previous one
bin_ratio = 2
rng = 4700
bins  = 20
# width of the first bin
bin_size = rng/(bin_ratio**bins)

bin_boundaries = [0]
for i in range(bins):
    bin_boundaries.append(bin_boundaries[-1]+ bin_size)
    bin_size *= bin_ratio

# NOTE(review): with this first width the last boundary is rng*(1 - 2**-bins), i.e.
# slightly below rng (see the printed output) - confirm whether exactly rng was intended

print(bin_boundaries)

import math

[0, 0.004482269287109375, 0.013446807861328125, 0.031375885009765625, 0.06723403930664062, 0.13895034790039062, 0.2823829650878906, 0.5692481994628906, 1.1429786682128906, 2.2904396057128906, 4.585361480712891, 9.17520523071289, 18.35489273071289, 36.71426773071289, 73.43301773071289, 146.8705177307129, 293.7455177307129, 587.4955177307129, 1174.995517730713, 2349.995517730713, 4699.995517730713]


In [104]:
import numpy as np
import sounddevice as SD
from scipy.io import wavfile
import scipy.fft as fft
import matplotlib.pyplot as plt
import numba as nb
import os
import functools
import time
import math
@nb.njit(['f8[:](i4[:], i4)', 'f8(i4, i4)'])
def note_frequencies(n, f0):
    ''' Frequencies of notes along the equal temperament scale.

        f0 -> fundamental frequency / Hz
        n -> number of halfsteps from the fundamental note (scalar or array)
    '''
    octaves = n/12
    return f0 * np.power(2, octaves)

@nb.njit('Tuple((i4[:,:],f8[:]))(f8[:], c16[:,:],f8,i4)', parallel = True)
def hashed_freqs_nonuniform(freqs, fourier, time_slices, octave_factor):
    ''' For every time slice, find the index of the strongest Fourier component
    inside each musical (non-uniform) frequency bin.

    freqs         -> 1D array of the frequencies belonging to the last axis of `fourier`
    fourier       -> complex array indexed as [time slice, frequency]
    time_slices   -> multiplier turning a time-slice index into a time
    octave_factor -> bin edges are placed every int(12/octave_factor) semitones around 440

    Returns (peaks, times): peaks[slice, bin] is the index into `freqs` of the
    largest-magnitude component in that bin, times[slice] is slice * time_slices.
    '''
    semitone_steps = int(12/octave_factor)
    base_range = 5
    range_lower = int(base_range*octave_factor)
    range_upper = int(base_range*octave_factor) - (octave_factor-1)
    semitones = np.array([i*semitone_steps for i in range(-range_lower,range_upper,1)], dtype=np.int32)
    bin_edges = note_frequencies(semitones, 440)
    # drop edges that are 12 or less above their predecessor so that narrow
    # low-frequency bins do not end up without any FFT frequency in them
    bin_edges_reduced = [bin_edges[0]]
    for i in range(len(bin_edges)-1):
        if bin_edges[i+1] - bin_edges[i] > 12:
            bin_edges_reduced.append(bin_edges[i+1])
    bin_edges = bin_edges_reduced.copy()
    # the number of bins follows from the edges instead of a notebook-level global
    bins = len(bin_edges)-1
    peaks = np.empty((fourier.shape[0],bins), dtype = np.int32)
    times = np.empty(fourier.shape[0])
    for slice_num in nb.prange(fourier.shape[0]):
        # iterates through each time slice of music
        for bin_num in range(bins):
            # edges ascend, so the lower limit is the current edge and the upper the next one
            lower_lim = bin_edges[bin_num]
            upper_lim = bin_edges[bin_num+1]
            # finds the indices of the freqs where lower_lim < freq <= upper_lim
            indices = np.where((freqs <= upper_lim) & (freqs > lower_lim))[0]
            # finds the (first) position of the largest magnitude within this range
            peak = np.where(np.abs(fourier[slice_num][indices]) == np.max(np.abs(fourier[slice_num][indices])))[0][0]
            # peak is relative to the bin, so it is offset back into the full freqs array
            peaks[slice_num][bin_num] = np.int32(peak+indices[0])
        # calculates the time where this time slice occurred
        times[slice_num] = slice_num*time_slices
    return peaks, times



In [221]:
# exploratory cell: build the note-based bin edges for a fine octave subdivision
octave_factor = 6
semitone_steps = int(12/octave_factor)
# here the range starts 4*octave_factor steps below the reference note
range_lower = int(4*octave_factor)
range_upper = int(5*octave_factor) - (octave_factor-1)
semitones = np.array([i*semitone_steps for i in range(-range_lower,range_upper,1)], dtype=np.int32)
# frequencies of the notes that many halfsteps away from 440
bin_edges = note_frequencies(semitones, 440)

@nb.njit()
def remove_bins(bin_edges):
    '''Keep the first edge plus every edge that is more than 12 above its
    predecessor in the input; returns the kept edges as a list.'''
    kept = [bin_edges[0]]
    for upper in range(1, len(bin_edges)):
        if bin_edges[upper] - bin_edges[upper-1] > 12:
            kept.append(bin_edges[upper])
    return kept

# show the edges that survive the spacing filter
print(remove_bins(bin_edges))


[27.5, 110.0, 123.47082531403103, 138.59131548843604, 155.56349186104043, 174.61411571650194, 195.99771799087463, 220.0, 246.94165062806206, 277.1826309768721, 311.12698372208087, 349.2282314330039, 391.99543598174927, 440.0, 493.8833012561241, 554.3652619537442, 622.2539674441618, 698.4564628660078, 783.9908719634985, 880.0, 987.7666025122483, 1108.7305239074883, 1244.5079348883237, 1396.9129257320155, 1567.981743926997, 1760.0, 1975.533205024496, 2217.4610478149766, 2489.0158697766474, 2793.825851464031, 3135.9634878539946, 3520.0, 3951.066410048992, 4434.922095629953, 4978.031739553295, 5587.651702928062, 6271.926975707989, 7040.0]
