In [1]:
import numpy as np
import librosa
import matplotlib.pyplot as plt
from IPython.display import Audio
from typing import Union, Callable, Tuple
from pathlib import Path

%matplotlib notebook

import numpy as np

import matplotlib.pyplot as plt
import matplotlib.mlab as mlab

from scipy.ndimage.filters import maximum_filter
from scipy.ndimage.morphology import generate_binary_structure, binary_erosion
from scipy.ndimage.morphology import iterate_structure

from typing import Tuple, Callable, List

%matplotlib notebook

In [2]:
local_song_path = r"C:\Users\Hello\Music\cogworks-audio\Olivia Rodrigo - jealousy, jealousy.mp3"

length = 11  # seconds

# load the digital signal for the first 11 seconds of the song
samples, sampling_rate = librosa.load(local_song_path, sr=44100, mono=True, duration=length)



FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Hello\\Music\\cogworks-audio\\Olivia Rodrigo - jealousy, jealousy.mp3'

In [19]:
N = len(samples)
T = N / sampling_rate

times = np.arange(N) / sampling_rate
c_k = np.fft.rfft(samples) 
# convert ck (complex Fourier coeff) -> |ak| (real-valued coeff)
amps = np.abs(c_k) / N
amps[1 : (-1 if N % 2 == 0 else None)] *= 2

# convert k = 0, 1, ... to freq = 0, 1/T, 2/T, ..., (int(N/2) + 1)/T
freq = np.arange(len(amps)) / T

In [36]:
# using matplotlib's built-in spectrogram function
import matplotlib.mlab as mlab

fig, ax = plt.subplots()

spectrogram, freqs, times, im = ax.specgram(
    samples,
    NFFT=4096,
    Fs=sampling_rate,
    window=mlab.window_hanning,
    noverlap=4096 // 2,
    mode='magnitude',
    scale="dB"
)
fig.colorbar(im)

ax.set_xlabel("Time [seconds]")
ax.set_ylabel("Frequency (Hz)")
ax.set_title("Spectrogram of Recording")
ax.set_ylim(0, 4000)

# peaks = find_peaks(...)  # list of (row, col)
t_loc = [times[c] for r,c in peaks]
f_loc = [freqs[r] for r,c in peaks]
# add shifts to times and freqs so that the peaks
# align with the spectrogram image bins
ax.scatter(t_loc, f_loc, s=4, color="white")
ax.set_xlabel("Time (sec)")
ax.set_ylabel("Frequency (Hz)")
ax.set_xlim(0, 11)

<IPython.core.display.Javascript object>

(0.0, 11.0)

IndentationError: unexpected indent (<ipython-input-24-b405f717fbad>, line 4)

In [10]:
neighborhood = iterate_structure(generate_binary_structure(rank=2, connectivity=1), 20)

In [11]:
from numba import njit

# `@njit` "decorates" the `_peaks` function. This tells Numba to
# compile this function using the "low level virtual machine" (LLVM)
# compiler. The resulting object is a Python function that, when called,
# executes optimized machine code instead of the Python code
# 
# The code used in _peaks adheres strictly to the subset of Python and
# NumPy that is supported by Numba's jit. This is a requirement in order
# for Numba to know how to compile this function to more efficient
# instructions for the machine to execute
@njit
def _peaks(
    data_2d: np.ndarray, rows: np.ndarray, cols: np.ndarray, amp_min: float
) -> List[Tuple[int, int]]:
    """
    A Numba-optimized 2-D peak-finding algorithm.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected.

    rows : numpy.ndarray, shape-(N,)
        The 0-centered row indices of the local neighborhood mask
    
    cols : numpy.ndarray, shape-(N,)
        The 0-centered column indices of the local neighborhood mask
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location. 
    """
    peaks = []  # stores the (row, col) locations of all the local peaks

    # Iterate over the 2-D data in col-major order
    # we want to see if there is a local peak located at
    # row=r, col=c
    for c, r in np.ndindex(*data_2d.shape[::-1]):
        if data_2d[r, c] <= amp_min:
            # The amplitude falls beneath the minimum threshold
            # thus this can't be a peak.
            continue
        
        # Iterating over the neighborhood centered on (r, c)
        # dr: displacement from r
        # dc: discplacement from c
        for dr, dc in zip(rows, cols):
            if dr == 0 and dc == 0:
                # This would compare (r, c) with itself.. skip!
                continue

            if not (0 <= r + dr < data_2d.shape[0]):
                # neighbor falls outside of boundary
                continue

            # mirror over array boundary
            if not (0 <= c + dc < data_2d.shape[1]):
                # neighbor falls outside of boundary
                continue

            if data_2d[r, c] < data_2d[r + dr, c + dc]:
                # One of the amplitudes within the neighborhood
                # is larger, thus data_2d[r, c] cannot be a peak
                break
        else:
            # if we did not break from the for-loop then (r, c) is a peak
            peaks.append((r, c))
    return peaks

# `local_peak_locations` is responsible for taking in the boolean mask `neighborhood`
# and converting it to a form that can be used by `_peaks`. This "outer" code is 
# not compatible with Numba which is why we end up using two functions:
# `local_peak_locations` does some initial pre-processing that is not compatible with
# Numba, and then it calls `_peaks` which contains all of the jit-compatible code
def local_peak_locations(data_2d: np.ndarray, neighborhood: np.ndarray, amp_min: float):
    """
    Defines a local neighborhood and finds the local peaks
    in the spectrogram, which must be larger than the specified `amp_min`.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected
    
    neighborhood : numpy.ndarray, shape-(h, w)
        A boolean mask indicating the "neighborhood" in which each
        datum will be assessed to determine whether or not it is
        a local peak. h and w must be odd-valued numbers
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location.
    
    Notes
    -----
    Neighborhoods that overlap with the boundary are mirrored across the boundary.
    
    The local peaks are returned in column-major order.
    """
    rows, cols = np.where(neighborhood)
    assert neighborhood.shape[0] % 2 == 1
    assert neighborhood.shape[1] % 2 == 1

    # center neighborhood indices around center of neighborhood
    rows -= neighborhood.shape[0] // 2
    cols -= neighborhood.shape[1] // 2

    return _peaks(data_2d, rows, cols, amp_min=amp_min)

In [12]:
log_S = np.log(spectrogram).ravel()  # flattened array
ind = round(len(log_S) * 0.75)
cutoff_log_amplitude = np.partition(log_S, ind)[ind]
cutoff_log_amplitude

  log_S = np.log(spectrogram).ravel()  # flattened array


-22.718542444156352

In [30]:
peaks = local_peak_locations(spectrogram, neighborhood, cutoff_log_amplitude)

In [14]:
peaks

[(378, 11),
 (437, 11),
 (465, 11),
 (528, 11),
 (550, 11),
 (595, 11),
 (687, 11),
 (742, 11),
 (805, 11),
 (843, 11),
 (897, 11),
 (944, 11),
 (986, 11),
 (1045, 11),
 (1113, 11),
 (1137, 11),
 (1175, 11),
 (1266, 11),
 (1293, 11),
 (1345, 11),
 (1371, 11),
 (1415, 11),
 (1457, 11),
 (1844, 11),
 (1877, 16),
 (1823, 18),
 (176, 19),
 (349, 19),
 (1607, 19),
 (1665, 19),
 (1758, 25),
 (1914, 25),
 (1955, 25),
 (1997, 25),
 (1732, 26),
 (49, 35),
 (983, 35),
 (1496, 35),
 (1651, 35),
 (1842, 36),
 (1541, 37),
 (1631, 38),
 (1863, 41),
 (225, 43),
 (593, 44),
 (685, 44),
 (770, 44),
 (795, 44),
 (845, 44),
 (870, 44),
 (6, 45),
 (2042, 46),
 (1907, 52),
 (1933, 53),
 (947, 54),
 (1124, 54),
 (1149, 54),
 (1183, 54),
 (1223, 54),
 (1250, 54),
 (1308, 54),
 (1377, 54),
 (1436, 54),
 (1464, 54),
 (1658, 54),
 (1786, 54),
 (1624, 55),
 (1820, 55),
 (1997, 55),
 (1691, 57),
 (111, 59),
 (164, 59),
 (291, 59),
 (503, 59),
 (530, 59),
 (68, 60),
 (448, 60),
 (611, 60),
 (695, 60),
 (775, 60),


In [39]:
type(peaks)

list

In [53]:
fanout = 15
fingerprints = []
times = []

for count, peak in enumerate(peaks):
    if count >= len(peaks) - 15:
        break
    fingerprint = []
    for i in range(fanout):
        print_i = (peak[0], peaks[count + i + 1][0], peaks[count + i + 1][1] - peak[1])
        fingerprint.append(print_i)
    times.append(peak[1])    
    fingerprints.append(fingerprint)
print(times)
# print(fingerprints)

[11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 11, 16, 18, 19, 19, 19, 19, 25, 25, 25, 25, 26, 35, 35, 35, 35, 36, 37, 38, 41, 43, 44, 44, 44, 44, 44, 44, 45, 46, 52, 53, 54, 54, 54, 54, 54, 54, 54, 54, 54, 54, 54, 54, 55, 55, 55, 57, 59, 59, 59, 59, 59, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 60, 66, 67, 76, 78, 78, 78, 78, 78, 78, 78, 79, 79, 79, 81, 83, 84, 86, 87, 88, 88, 88, 89, 89, 90, 91, 91, 91, 91, 91, 91, 91, 91, 91, 91, 92, 96, 98, 98, 98, 98, 101, 101, 102, 103, 103, 104, 105, 106, 106, 106, 106, 106, 106, 106, 108, 109, 113, 113, 113, 113, 113, 114, 114, 114, 114, 114, 114, 114, 115, 115, 116, 117, 121, 121, 124, 125, 125, 129, 133, 134, 136, 136, 137, 137, 138, 138, 138, 138, 138, 139, 139, 139, 140, 140, 140, 141, 143, 143, 143, 147, 148, 148, 153, 153, 153, 153, 153, 153, 153, 153, 153, 153, 153, 153, 153, 154, 154, 154, 154, 154, 158, 158, 160, 161, 161, 162, 162, 162, 162, 165, 165, 166, 168, 168, 169, 170, 170, 171, 1

In [55]:
def fingerprints(fanout, peaks):
    fingerprints = []
    times = []

    for count, peak in enumerate(peaks):
        if count >= len(peaks) - 15:
            break
        fingerprint = []
        for i in range(fanout):
            print_i = (peak[0], peaks[count + i + 1][0], peaks[count + i + 1][1] - peak[1])
            fingerprint.append(print_i)
        times.append(peak[1])    
        fingerprints.append(fingerprint)
    return fingerprints, times