In [1]:
# pyright: reportMissingImports=false, reportUnusedVariable=warning, reportUntypedBaseClass=error
from database import Database
from fingerprinting import fingerprints
from peaks import find_neighborhood, find_min_amp

import numpy as np
import librosa
# import matplotlib.pyplot as plt
from IPython.display import Audio
from typing import Union, Callable, Tuple, List
from pathlib import Path

import matplotlib.mlab as mlab

from scipy.ndimage.filters import maximum_filter
from scipy.ndimage.morphology import generate_binary_structure, binary_erosion
from scipy.ndimage.morphology import iterate_structure

# %matplotlib notebook

database = Database()

In [2]:
from numba import njit

# `@njit` "decorates" the `_peaks` function. This tells Numba to
# compile this function using the "low level virtual machine" (LLVM)
# compiler. The resulting object is a Python function that, when called,
# executes optimized machine code instead of the Python code
# 
# The code used in _peaks adheres strictly to the subset of Python and
# NumPy that is supported by Numba's jit. This is a requirement in order
# for Numba to know how to compile this function to more efficient
# instructions for the machine to execute
@njit
def _peaks(
    data_2d: np.ndarray, rows: np.ndarray, cols: np.ndarray, amp_min: float
) -> List[Tuple[int, int]]:
    """
    A Numba-optimized 2-D peak-finding algorithm.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected.

    rows : numpy.ndarray, shape-(N,)
        The 0-centered row indices of the local neighborhood mask
    
    cols : numpy.ndarray, shape-(N,)
        The 0-centered column indices of the local neighborhood mask
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location. 
    """
    peaks = []  # stores the (row, col) locations of all the local peaks

    # Iterate over the 2-D data in col-major order
    # we want to see if there is a local peak located at
    # row=r, col=c
    for c, r in np.ndindex(*data_2d.shape[::-1]):
        if data_2d[r, c] <= amp_min:
            # The amplitude falls beneath the minimum threshold
            # thus this can't be a peak.
            continue
        
        # Iterating over the neighborhood centered on (r, c)
        # dr: displacement from r
        # dc: discplacement from c
        for dr, dc in zip(rows, cols):
            if dr == 0 and dc == 0:
                # This would compare (r, c) with itself.. skip!
                continue

            if not (0 <= r + dr < data_2d.shape[0]):
                # neighbor falls outside of boundary
                continue

            # mirror over array boundary
            if not (0 <= c + dc < data_2d.shape[1]):
                # neighbor falls outside of boundary
                continue

            if data_2d[r, c] < data_2d[r + dr, c + dc]:
                # One of the amplitudes within the neighborhood
                # is larger, thus data_2d[r, c] cannot be a peak
                break
        else:
            # if we did not break from the for-loop then (r, c) is a peak
            peaks.append((r, c))
    return peaks

def local_peak_locations(data_2d: np.ndarray, neighborhood: np.ndarray, amp_min: float):
    """
    Defines a local neighborhood and finds the local peaks
    in the spectrogram, which must be larger than the specified `amp_min`.
    
    Parameters
    ----------
    data_2d : numpy.ndarray, shape-(H, W)
        The 2D array of data in which local peaks will be detected
    
    neighborhood : numpy.ndarray, shape-(h, w)
        A boolean mask indicating the "neighborhood" in which each
        datum will be assessed to determine whether or not it is
        a local peak. h and w must be odd-valued numbers
        
    amp_min : float
        All amplitudes at and below this value are excluded from being local 
        peaks.
    
    Returns
    -------
    List[Tuple[int, int]]
        (row, col) index pair for each local peak location.
    
    Notes
    -----
    Neighborhoods that overlap with the boundary are mirrored across the boundary.
    
    The local peaks are returned in column-major order.
    """
    rows, cols = np.where(neighborhood)
    assert neighborhood.shape[0] % 2 == 1
    assert neighborhood.shape[1] % 2 == 1

    # center neighborhood indices around center of neighborhood
    rows -= neighborhood.shape[0] // 2
    cols -= neighborhood.shape[1] // 2

    return _peaks(data_2d, rows, cols, amp_min=amp_min)


In [3]:
def generate_fingerprints(samples, sampling_rate):
    # perform rfft
    N = len(samples)
    T = N / sampling_rate

    times = np.arange(N) / sampling_rate
    c_k = np.fft.rfft(samples) 
      # convert ck (complex Fourier coeff) -> |ak| (real-valued coeff)
    amps = np.abs(c_k) / N
    amps[1 : (-1 if N % 2 == 0 else None)] *= 2

      # convert k = 0, 1, ... to freq = 0, 1/T, 2/T, ..., (int(N/2) + 1)/T
    freq = np.arange(len(amps)) / T

    spectrogram, freqs, times = mlab.specgram(
        samples,
        NFFT=4096,
        Fs=sampling_rate,
        window=mlab.window_hanning,
        noverlap=int(4096 / 2)
    )


    neighborhood = find_neighborhood(2, 1, 20)
    min_amp = find_min_amp(spectrogram)

    peaks = local_peak_locations(spectrogram, neighborhood, min_amp)
    song_fingerprints, times = fingerprints(15, peaks)
    
    return (song_fingerprints, times)

In [4]:
def import_song(path, song_name):
  # sample the audio file
  samples, sampling_rate = librosa.load(path, sr=44100, mono=True)

  song_fingerprints, times = generate_fingerprints(samples, sampling_rate)

  database.store_fingerprints(song_fingerprints, song_name, times)


In [5]:
database.load_database()

In [5]:
import os
song_directory = r"C:\Users\josep\Documents\GitHub\cogzam\music"

for filename in os.listdir(song_directory):
    print("Importing: ", filename)
    file_path = song_directory + "\\" + filename
    import_song(file_path, filename)

Importing:  anti-romantic.mp3




KeyboardInterrupt: 

In [6]:
import pickle
from IPython.display import Audio
from Analog_to_Digital import analog_to_digital
from microphone import record_audio

def match_song(clips_directory, clips_name):
    with open(clips_directory + clips_name, mode="rb") as clips_file:
    clips = pickle.load(clips_file)
    
    for sample in clips:
        sample_fingerprints, times = generate_fingerprints(sample, 44100)
        
        counts = database.search_song(sample_fingerprints, times)
        
        print(counts.most_common(3))
        
    Audio(clips[0], rate=44100)


def match_sample(sample):
    
    sample_fingerprints, times = generate_fingerprints(sample, 44100)
    counts = database.search_song(sample_fingerprints, times)
    
    return counts.most_common(1)[0]

def microphone_match():
    # record samples
    listen_time = 10

    samples, sample_rate = record_audio(listen_time)
    print("Recording...")
    samples = np.hstack([np.frombuffer(i, np.int16) for i in samples])

    # times, digitial_signal = analog_to_digital(samples, sampling_rate=44100, bit_depth=16, duration=10)

    result = match_sample(samples)

    return result
            

In [12]:
# execute off of mic
print(microphone_match())

Using input device 'Microphone (Shure MV7)'
Recording ended
Recording...
(('someoneyouloved.mp3', 133), 22)


In [10]:
database.save_database()

# Test loading from database instead of importing