In [None]:
import IPython
import numpy as np
import matplotlib.pyplot as plt
from scipy.fft import rfft, rfftfreq
from scipy import signal
from scipy.io.wavfile import read, write

import os
import pandas as pd

plt.rcParams['figure.figsize'] = [8, 6]
plt.rcParams['figure.dpi'] = 140
from scipy import fft, signal
import scipy
from scipy.io.wavfile import read
import matplotlib.pyplot as plt
from scipy.fft import fft, fftfreq

# Understanding The Shazam Algorithm

In [None]:
IPython.display.Image(filename="shazam_logo.png", width=300, height=300)

# What does a sound file look like?

In [None]:
IPython.display.Audio("data/middle_c.wav")

In [None]:
Fs, song = read("data/middle_c.wav")
    
time_to_plot = np.arange(0, 1500, dtype=int)

fig, ax = plt.subplots(figsize=(10, 5))
plt.plot(time_to_plot, song[time_to_plot])
ax.set_xlabel("Time Step")
ax.set_ylabel("Amplitude")
plt.show()

# What does the Fourier Transform of a sound file look like?

In [None]:
def transform_song(sampling_rate, song):
    num_samples = len(song)
    
    # apply fourier transform to the song
    song_ft = rfft(song)
    
    # get everything to be real-valued
    song_ft = np.abs(song_ft)
    
    x_axis = rfftfreq(num_samples, 1 / sampling_rate)

    return x_axis, song_ft

In [None]:
Fs, song = read("data/middle_c.wav")
    
time_to_plot = np.arange(0, 1500, dtype=int)
idx, song_ft = transform_song(Fs, song[time_to_plot])

fig, ax = plt.subplots(figsize=(10, 5))
ax.set_xlim([0, 2000])
ax.set_xlabel("Frequency (Hz)")
plt.plot(idx, song_ft)
plt.show()

### Frequency of Middle C: 256 Hertz

# What about an actual song?

In [None]:
IPython.display.Audio(r"data/levitating.wav")

In [None]:
Fs, song = read("data/levitating.wav")
song = song[:, 0]

time_to_plot = np.arange(0, 150000, dtype=int)
idx, song_ft = transform_song(Fs, song[time_to_plot])


fig, ax = plt.subplots(2, 1, figsize=(10, 5))
ax[0].plot(time_to_plot, song[time_to_plot])
ax[1].plot(idx, song_ft)
ax[1].set_xlim([0, 5000])
plt.show()

# How can we use this information?

In [None]:
# find peaks in the frequencies of fft-transformed song
def find_peaks(amplitudes):

    peak_indices, props = signal.find_peaks(amplitudes, prominence=0, distance=10000)

    # take only the 15 most prominent peaks
    n_peaks = 15

    # peak choosing method taken from links below:
    # https://michaelstrauss.dev/shazam-in-python
    # https://stackoverflow.com/questions/6910641/how-do-i-get-indices-of-n-maximum-values-in-a-numpy-array
    candidate_peaks = props["prominences"]
    if len(candidate_peaks) < n_peaks:
        n_peaks = len(candidate_peaks)
    largest_peak_indices = np.argpartition(props["prominences"], -n_peaks)[-n_peaks:]
    return peak_indices[largest_peak_indices]

In [None]:
def graph_ft(fx, fy, peaks=()):
    fig, ax = plt.subplots()
    plt.xlabel("Frequency (Hz)")
    plt.plot(fx, fy, zorder=1)
    plt.scatter(fx[peaks], fy[peaks], color="red", zorder=2)
    ax.set_xlim([0, 5000])
    plt.show()

In [None]:
idx, song_ft = transform_song(Fs, song)
peaks = find_peaks(song_ft)
graph_ft(idx, song_ft, peaks)

# Creating a unique "fingerprint" for a song

In [None]:
def create_constellation(audio, Fs):
    # Parameters
    window_length_seconds = 0.5
    window_length_samples = int(window_length_seconds * Fs)
    window_length_samples += window_length_samples % 2
    num_peaks = 15

    # Pad the song to divide evenly into windows
    amount_to_pad = window_length_samples - audio.size % window_length_samples

    song_input = np.pad(audio, (0, amount_to_pad))

    # Perform a short time fourier transform
    frequencies, times, stft = signal.stft(
        song_input, Fs, nperseg=window_length_samples, nfft=window_length_samples, return_onesided=True
    )

    constellation_map = []

    for time_idx, window in enumerate(stft.T):
        # Spectrum is by default complex. 
        # We want real values only
        spectrum = abs(window)
        # Find peaks - these correspond to interesting features
        # Note the distance - want an even spread across the spectrum
        peaks, props = signal.find_peaks(spectrum, prominence=0, distance=200)

        # Only want the most prominent peaks
        # With a maximum of 15 per time slice
        n_peaks = min(num_peaks, len(peaks))
        # Get the n_peaks largest peaks from the prominences
        # This is an argpartition
        # Useful explanation: https://kanoki.org/2020/01/14/find-k-smallest-and-largest-values-and-its-indices-in-a-numpy-array/
        largest_peaks = np.argpartition(props["prominences"], -n_peaks)[-n_peaks:]
        for peak in peaks[largest_peaks]:
            frequency = frequencies[peak]
            constellation_map.append([time_idx, frequency])

    return constellation_map

In [None]:
Fs, song = read("data/levitating.wav")
# audio can have two channels, use only one
if song.shape[1] > 1:
    song = song[:, 0]

constellation_map = create_constellation(song, Fs)
fig, ax = plt.subplots()
ax.set_ylabel("Frequency (Hz)")
ax.set_xlabel("Time Step")
plt.scatter(*zip(*constellation_map))
plt.show()

From Aydin, we have the means to construct the constellation for hashing.

In [None]:
def create_constellation(audio, Fs):
    window_length_seconds = 0.5
    window_length_samples = int(window_length_seconds * Fs)
    window_length_samples += window_length_samples % 2
    num_peaks = 15

    amount_to_pad = window_length_samples - audio.size % window_length_samples

    song_input = np.pad(audio, (0, amount_to_pad))

    frequencies, times, stft = signal.stft(
        song_input, Fs, nperseg=window_length_samples, nfft=window_length_samples, return_onesided=True
    )

    constellation_map = []

    for time_idx, window in enumerate(stft.T):
        spectrum = abs(window)
        peaks, props = signal.find_peaks(spectrum, prominence=0, distance=200)

        n_peaks = min(num_peaks, len(peaks))
        largest_peaks = np.argpartition(props["prominences"], -n_peaks)[-n_peaks:]
        for peak in peaks[largest_peaks]:
            frequency = frequencies[peak]
            constellation_map.append([time_idx, frequency])

    return constellation_map

With a constellation, we can decide an inital way on how we want to construct our hash function.

In [None]:
constellation_map = create_constellation(song, Fs)

def create_hashes(constellation_map, song_id=None):
    hashes = {}
    upper_frequency = 23_000 
    frequency_bits = 10

    
    for idx, (time, freq) in enumerate(constellation_map):
        for other_time, other_freq in constellation_map[idx : idx + 100]: 
            diff = other_time - time
            if diff <= 1 or diff > 10:
                continue

            freq_binned = freq / upper_frequency * (2 ** frequency_bits)
            other_freq_binned = other_freq / upper_frequency * (2 ** frequency_bits)

            hash = int(freq_binned) | (int(other_freq_binned) << 10) | (int(diff) << 20)
            hashes[hash] = (time, song_id)
    return hashes

hashes = create_hashes(constellation_map, 0)
for i, (hash, (time, _)) in enumerate(hashes.items()):
    if i > 10: 
        break
    print(f"Hash {hash} occurred at {time}")

Next, we take a folder of .wav files and hash them, storing the data into a hashtable database.

In [None]:
import glob
from typing import List, Dict, Tuple
from tqdm import tqdm
import pickle

songs = glob.glob('wavdata/*.wav')

song_name_index = {}
database: Dict[int, List[Tuple[int, int]]] = {}

for index, filename in enumerate(tqdm(sorted(songs))):
    song_name_index[index] = filename
    Fs, audio_input = read(filename)
    songdf=pd.DataFrame(audio_input)
    audio_input=songdf.iloc[:,1]+songdf.iloc[:,0]
    constellation = create_constellation(audio_input, Fs)
    hashes = create_hashes(constellation, index)

    for hash, time_index_pair in hashes.items():
        if hash not in database:
            database[hash] = []
        database[hash].append(time_index_pair)
        
with open("database.pickle", 'wb') as db:
    pickle.dump(database, db, pickle.HIGHEST_PROTOCOL)
with open("song_index.pickle", 'wb') as songs:
    pickle.dump(song_name_index, songs, pickle.HIGHEST_PROTOCOL)

In [None]:
database = pickle.load(open('database.pickle', 'rb'))
song_name_index = pickle.load(open("song_index.pickle", "rb"))

Lastly, when we wish to use a recording, we hash the recording with the same parameters and then attempt to find matching hashes from our song database in order to find the one that is closest to our recording.

In [None]:
Fs, audio_input = read("data/recording2.wav")
constellation = create_constellation(audio_input, Fs)
hashes = create_hashes(constellation, None)

matches_per_song = {}
for hash, (sample_time, _) in hashes.items():
    if hash in database:
        matching_occurences = database[hash]
        for source_time, song_id in matching_occurences:
            if song_id not in matches_per_song:
                matches_per_song[song_id] = 0
            matches_per_song[song_id] += 1

for song_id, num_matches in list(sorted(matches_per_song.items(), key=lambda x: x[1], reverse=True))[:10]:
    print(f"Song: {song_name_index[song_id]} - Matches: {num_matches}")


In [None]:
IPython.display.Audio('data/recording2.wav')

In [None]:
IPython.display.Audio('wavdata/baby.wav')

# Issue of Finding Matches For Recordings with Matches

### When adding more nosies..

In [None]:
IPython.display.Audio('data/noise_talking.wav')

In [None]:
IPython.display.Audio('data/baby_trim1.wav')

In [None]:
from scipy.io.wavfile import read
import pandas as pd
Fs_noise, audio_input_noise = read("data/noise_talking.wav")
Fs, audio_input = read("data/baby_trim1.wav")
songdf=pd.DataFrame(audio_input)
audio_input=songdf.iloc[:,1]+songdf.iloc[:,1]
noisemax=max(audio_input_noise)
audiomax=max(audio_input)
# random_noise=np.random.randint(-1000, high=1000, size=len(audio_input_noise), dtype=int)

In [None]:
audio=audio_input/audiomax+audio_input_noise[-len(audio_input):]/noisemax

In [None]:
audio_input=audio
constellation = create_constellation(audio_input, Fs)
hashes = create_hashes(constellation, None)

# For each hash in the song, check if there's a match in the database
# There could be multiple matching tracks, so for each match:
#   Incrememnt a counter for that song ID by one
matches_per_song = {}
for hash, (sample_time, _) in hashes.items():
    if hash in database:
        matching_occurences = database[hash]
        for source_time, song_id in matching_occurences:
            if song_id not in matches_per_song:
                matches_per_song[song_id] = 0
            matches_per_song[song_id] += 1

for song_id, num_matches in list(sorted(matches_per_song.items(), key=lambda x: x[1], reverse=True)):
    print(f"Song: {song_name_index[song_id]} - Matches: {num_matches}")

The matches number reduced a lot.

# Another way: Time Differenece

In [None]:
# Load a short recording with some background noise
# this method make use of the characterisitic of songs, they have a 
# specific order to occur
audio_input=audio
constellation = create_constellation(audio_input, Fs)
hashes = create_hashes(constellation, None)

# For each hash in the song, check if there's a match in the database
# There could be multiple matches, so for each match:
#   Append all of them to a hashmap based on the song id along with the time
#   the hash occurs in the sample and at the source
# In the end, matches_per_song is key'd by song ID with values being
# lists of hashes, the 
matches_per_song = {}
for hash, (sample_time, _) in hashes.items():
    if hash in database:
        matching_occurences = database[hash]
        for source_time, song_id in matching_occurences:
            if song_id not in matches_per_song:
                matches_per_song[song_id] = []
            matches_per_song[song_id].append((hash, sample_time, source_time))

We try to find time index in database having the hash code as the sample

In [None]:
scores = {}
song_ids = [2,3,4, 10] # Song ID=10 is the true match
for song_id in song_ids:
    song_name = song_name_index[song_id].split('/')[1]

    matches = matches_per_song[song_id]
    print(f"Total matches for {song_name}: {len(matches)}")
    song_scores_by_offset = {}
    for hash, sample_time, source_time in matches:
        delta = source_time - sample_time
        if delta not in song_scores_by_offset:
            song_scores_by_offset[delta] = 0
        song_scores_by_offset[delta] += 1

    # Produce a histogram
    # For clarity's sake, only plot the 100 largest offsets
    high_scores = list(sorted(song_scores_by_offset.items(), key=lambda x: x[1], reverse=True))[:100]
    plt.figure()
    plt.bar(*zip(*high_scores))
    plt.title(song_name)
    plt.ylim((0, 900))