In [2]:
from microphone import record_audio
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Audio
import librosa
import matplotlib.mlab as mlab
from scipy.ndimage.morphology import generate_binary_structure
from scipy.ndimage.morphology import iterate_structure
from typing import Tuple, List
from numba import njit

In [17]:
import pickle

In [3]:
def micRecord(time=10):
    frames, rate = record_audio(time)
    return np.hstack([np.frombuffer(i, np.int16) for i in frames]), rate

In [4]:
def getFile(path):
    recorded_audio, sampling_rate = librosa.load(path, 
                                                 sr=44100, 
                                                 mono=True, 
                                                 offset=1.5, 
                                                 duration=20)
    return recorded_audio, sampling_rate

In [5]:
def userinput():
    while True:
        audioType = input("u for Upload, r for Record: ")
        if audioType == 'u':
            path = input("Enter path to file: ")
            samples, rate = getFile(path)
            break
        elif audioType == 'r':
            samples, rate = micRecord()
            break
        print("Invalid input. Try again.")
    # print(rate)
    
    return samples, rate

In [6]:
def ecdf(data):
    """Returns (x) the sorted data and (y) the empirical cumulative-proportion
    of each datum.
    
    Parameters
    ----------
    data : numpy.ndarray, size-N
    
    Returns
    -------
    Tuple[numpy.ndarray shape-(N,), numpy.ndarray shape-(N,)]
        Sorted data, empirical CDF values"""
    data = np.asarray(data).ravel()  # flattens the data
    y = np.linspace(1 / len(data), 1, len(data))  # stores the cumulative proportion associated with each sorted datum
    x = np.sort(data)
    return x, y

In [7]:
@njit
def _peaks(
    data_2d: np.ndarray, nbrhd_row_offsets: np.ndarray, nbrhd_col_offsets: np.ndarray, amp_min: float
) -> List[Tuple[int, int]]:
    """
    A Numba-optimized 2-D peak-finding algorithm.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected.
    nbrhd_row_offsets : numpy.ndarray, shape-(N,)
        The row-index offsets used to traverse the local neighborhood.
        
        E.g., given the row/col-offsets (dr, dc), the element at 
        index (r+dr, c+dc) will reside in the neighborhood centered at (r, c).
    
    nbrhd_col_offsets : numpy.ndarray, shape-(N,)
        The col-index offsets used to traverse the local neighborhood. See
        `nbrhd_row_offsets` for more details.
        
    amp_min : float
        All amplitudes equal to or below this value are excluded from being
        local peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location, returned in 
        column-major order
    """
    peaks = []  # stores the (row, col) locations of all the local peaks

    # Iterating over each element in the the 2-D data 
    # in column-major ordering
    #
    # We want to see if there is a local peak located at
    # row=r, col=c
    for c, r in np.ndindex(*data_2d.shape[::-1]):
        if data_2d[r, c] <= amp_min:
            # The amplitude falls beneath the minimum threshold
            # thus this can't be a peak.
            continue
        
        # Iterating over the neighborhood centered on (r, c) to see
        # if (r, c) is associated with the largest value in that
        # neighborhood.
        #
        # dr: offset from r to visit neighbor
        # dc: offset from c to visit neighbor
        for dr, dc in zip(nbrhd_row_offsets, nbrhd_col_offsets):
            if dr == 0 and dc == 0:
                # This would compare (r, c) with itself.. skip!
                continue

            if not (0 <= r + dr < data_2d.shape[0]):
                # neighbor falls outside of boundary.. skip!
                continue

            if not (0 <= c + dc < data_2d.shape[1]):
                # neighbor falls outside of boundary.. skip!
                continue

            if data_2d[r, c] < data_2d[r + dr, c + dc]:
                # One of the amplitudes within the neighborhood
                # is larger, thus data_2d[r, c] cannot be a peak
                break
        else:
            # if we did not break from the for-loop then (r, c) is a local peak
            peaks.append((r, c))
    return peaks

In [8]:
def local_peak_locations(data_2d: np.ndarray, neighborhood: np.ndarray, amp_min: float):
    """
    Defines a local neighborhood and finds the local peaks
    in the spectrogram, which must be larger than the specified `amp_min`.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected
    
    neighborhood : numpy.ndarray, shape-(h, w)
        A boolean mask indicating the "neighborhood" in which each
        datum will be assessed to determine whether or not it is
        a local peak. h and w must be odd-valued numbers
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location, returned
        in column-major ordering.
    
    Notes
    -----
    The local peaks are returned in column-major order, meaning that we 
    iterate over all nbrhd_row_offsets in a given column of `data_2d` in search for
    local peaks, and then move to the next column.
    """

    # We always want our neighborhood to have an odd number
    # of nbrhd_row_offsets and nbrhd_col_offsets so that it has a distinct center element
    assert neighborhood.shape[0] % 2 == 1
    assert neighborhood.shape[1] % 2 == 1
    
    # Find the indices of the 2D neighborhood where the 
    # values were `True`
    #
    # E.g. (row[i], col[i]) stores the row-col index for
    # the ith True value in the neighborhood (going in row-major order)
    nbrhd_row_indices, nbrhd_col_indices = np.where(neighborhood)
    

    # Shift the neighbor indices so that the center element resides 
    # at coordinate (0, 0) and that the center's neighbors are represented
    # by "offsets" from this center element.
    #
    # E.g., the neighbor above the center will has the offset (-1, 0), and 
    # the neighbor to the right of the center will have the offset (0, 1).
    nbrhd_row_offsets = nbrhd_row_indices - neighborhood.shape[0] // 2
    nbrhd_col_offsets = nbrhd_col_indices - neighborhood.shape[1] // 2

    return _peaks(data_2d, nbrhd_row_offsets, nbrhd_col_offsets, amp_min=amp_min)

In [9]:
def find_min_amp(spectrogram, amp_threshold):
    log_S = np.log(spectrogram).ravel()  # flattened array
    ind = round(len(log_S) * amp_threshold)
    cutoff_log_amplitude = np.partition(log_S, ind)[ind]
    return cutoff_log_amplitude

In [10]:
def peak_extract(samples, sampling_rate, *, amp_threshold=0.75, neighborhood_rank=2, neighborhood_connectivity=1, neighborhood_iterations=20):
	
    """
    Extracts peaks from a spectrogram created from the sample data.
    
    Parameters
    ----------
    samples : numpy.ndarray
        Array of audio samples
	
	sampling_rate : int
		The sampling rate of the audio samples
        
    amp_threshold : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
	neighborhood_rank : int
	neighborhood_connectivity : int
	neighborhood_iterations : int
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location, returned
        in column-major ordering.
    """
    
    time = np.arange(len(samples)) / sampling_rate

    base_structure = generate_binary_structure(neighborhood_rank,neighborhood_connectivity)
    neighborhood = iterate_structure(base_structure, neighborhood_iterations)

    spectrogram, freqs, times = mlab.specgram(
		samples,
		NFFT=4096,
		Fs=sampling_rate,
		window=mlab.window_hanning,
		noverlap=int(4096 / 2)
	)

    spectrogram = np.clip(spectrogram, 10**-20, None)
    
    amp_min = find_min_amp(spectrogram, amp_threshold)

    return local_peak_locations(spectrogram, neighborhood, amp_min)

In [11]:
samples, sample_rate = userinput()
peaks = peak_extract(samples, sample_rate)

u for Upload, r for Record: u
Enter path to file: C:\Users\ejian\CogWorks-2022-Gausslien-Audio-Capstone\Songs2\WN Don_t Cry.mp3


  return f(*args, **kwargs)


In [12]:
peaks

[(140, 0),
 (177, 0),
 (211, 0),
 (394, 0),
 (441, 0),
 (500, 0),
 (888, 0),
 (930, 0),
 (1015, 0),
 (1133, 0),
 (1612, 0),
 (264, 1),
 (357, 1),
 (1356, 2),
 (1712, 3),
 (1750, 3),
 (1941, 3),
 (1896, 5),
 (1595, 6),
 (1977, 7),
 (1244, 8),
 (1479, 8),
 (1319, 9),
 (592, 10),
 (1545, 10),
 (1826, 10),
 (1184, 11),
 (1641, 11),
 (1920, 11),
 (1786, 12),
 (198, 13),
 (767, 13),
 (1879, 14),
 (1851, 15),
 (84, 16),
 (234, 16),
 (1806, 17),
 (1905, 18),
 (2037, 19),
 (5, 20),
 (365, 20),
 (542, 20),
 (1003, 20),
 (1427, 20),
 (1960, 20),
 (2004, 20),
 (1663, 21),
 (455, 22),
 (1512, 22),
 (37, 23),
 (282, 23),
 (322, 23),
 (671, 23),
 (922, 23),
 (1272, 24),
 (1572, 24),
 (1623, 24),
 (1240, 25),
 (1356, 25),
 (220, 26),
 (566, 26),
 (1711, 26),
 (781, 28),
 (951, 28),
 (1222, 28),
 (490, 29),
 (1744, 29),
 (1772, 29),
 (971, 32),
 (129, 35),
 (258, 35),
 (1685, 36),
 (1050, 37),
 (2011, 37),
 (357, 38),
 (594, 38),
 (1106, 38),
 (1147, 38),
 (1189, 38),
 (1386, 38),
 (1541, 39),
 (1851, 

In [19]:
"""
Loads and returns the dictionary of fingerprints
"""
def loadPeaks():
    with open("peaks.pkl", mode="rb") as opened_file:
        return pickle.load(opened_file)

"""
Saves the passed dictionary of song samples
The database can be switched if a different dictionary is passed
"""
def savePeaks(peak_dict: dict):
    with open("peaks.pkl", mode="wb") as opened_file:
        pickle.dump(peak_dict, opened_file)


In [15]:
peak_dict = {}
for i,v in enumerate(peaks):
    peak_dict.setdefault(peaks[i][1], []).append(peaks[i][0])
peak_dict

{0: [140, 177, 211, 394, 441, 500, 888, 930, 1015, 1133, 1612],
 1: [264, 357],
 2: [1356],
 3: [1712, 1750, 1941],
 5: [1896],
 6: [1595],
 7: [1977],
 8: [1244, 1479],
 9: [1319],
 10: [592, 1545, 1826],
 11: [1184, 1641, 1920],
 12: [1786],
 13: [198, 767],
 14: [1879],
 15: [1851],
 16: [84, 234],
 17: [1806],
 18: [1905],
 19: [2037],
 20: [5, 365, 542, 1003, 1427, 1960, 2004],
 21: [1663],
 22: [455, 1512],
 23: [37, 282, 322, 671, 922],
 24: [1272, 1572, 1623],
 25: [1240, 1356],
 26: [220, 566, 1711],
 28: [781, 951, 1222],
 29: [490, 1744, 1772],
 32: [971],
 35: [129, 258],
 36: [1685],
 37: [1050, 2011],
 38: [357, 594, 1106, 1147, 1189, 1386],
 39: [1541, 1851, 1963],
 43: [198, 328, 542],
 44: [1807],
 45: [878],
 47: [993, 1632],
 48: [741, 1603],
 49: [140, 566, 1364],
 50: [177, 521, 946, 1517, 1990],
 52: [1725],
 53: [812, 919],
 54: [33, 2044],
 56: [853, 897, 1319, 1533],
 57: [114, 704, 1021, 1663, 1828],
 58: [158, 643, 673, 780],
 59: [1777],
 62: [1625],
 63: [1

In [20]:
savePeaks(peak_dict)