In [41]:
import numpy as np
import matplotlib.pyplot as plt

import scipy
from scipy import fft, signal
from scipy.io.wavfile import read

from scipy.fft import fftfreq

# Database
import glob
from typing import List, Dict, Tuple
from tqdm import tqdm
import pickle

In [42]:
def create_constellation(audio, Fs):
    """Build a constellation map of prominent spectral peaks for an audio signal.

    Parameters
    ----------
    audio : array_like
        Audio samples. Either a 1-D array (mono) or a 2-D array shaped
        ``(samples, channels)``, which is what ``scipy.io.wavfile.read``
        returns for multi-channel files. Multi-channel input is averaged
        down to a single channel before analysis.
    Fs : int
        Sampling rate in Hz.

    Returns
    -------
    list of [int, float]
        One ``[time_idx, frequency]`` entry per selected peak, where
        ``time_idx`` is the index of the STFT frame and ``frequency`` is the
        peak's frequency in Hz. Entries are appended frame by frame, so the
        list is ordered by ``time_idx``.
    """
    # Parameters
    window_length_seconds = 0.5
    window_length_samples = int(window_length_seconds * Fs)
    # Round the window length up to an even number of samples
    window_length_samples += window_length_samples % 2
    num_peaks = 15

    audio = np.asarray(audio)
    if audio.ndim > 1:
        # Without this, np.pad would also pad the channel axis and the STFT
        # would run along the channels instead of along time.
        audio = audio.mean(axis=1)

    # Pad the song to divide evenly into windows
    amount_to_pad = window_length_samples - audio.size % window_length_samples
    song_input = np.pad(audio, (0, amount_to_pad))

    # Perform a short time fourier transform
    frequencies, _, stft = signal.stft(
        song_input, Fs, nperseg=window_length_samples, nfft=window_length_samples, return_onesided=True
    )

    constellation_map = []
    for time_idx, window in enumerate(stft.T):
        spectrum = abs(window)
        # prominence=0 makes find_peaks report prominences for every peak;
        # distance=200 keeps the peaks spread out across the spectrum
        peaks, props = signal.find_peaks(spectrum, prominence=0, distance=200)
        n_peaks = min(num_peaks, len(peaks))
        if n_peaks == 0:
            # Nothing to select in this frame (e.g. a frame of pure zero padding)
            continue
        # Indices (into `peaks`) of the n_peaks most prominent peaks
        largest_peaks = np.argpartition(props["prominences"], -n_peaks)[-n_peaks:]
        for peak in peaks[largest_peaks]:
            frequency = frequencies[peak]
            constellation_map.append([time_idx, frequency])
    return constellation_map

In [43]:
# constellation_map = create_constellation(song, Fs)

def create_hashes(constellation_map, song_id=None):
    """Combine pairs of constellation points into integer fingerprint hashes.

    Parameters
    ----------
    constellation_map : sequence of (time, frequency)
        Peaks ordered by time, with frequencies in Hz.
    song_id : optional
        Identifier stored alongside every hash; ``None`` for a query sample.

    Returns
    -------
    dict
        Maps each hash to ``(time, song_id)``, where ``time`` is the time of
        the first (anchor) point of the pair. If the same hash is produced
        more than once, the last occurrence wins.

    Each hash packs three fields: the anchor frequency bin in bits 0-9, the
    paired frequency bin in bits 10-19 and the time difference from bit 20 up.
    """
    hashes = {}
    # Use this for binning - 23_000 is slightly higher than the maximum
    # frequency that can be stored in the .wav files, 22.05 kHz
    upper_frequency = 23_000
    frequency_bits = 10
    num_bins = 2 ** frequency_bits

    def to_bin(frequency):
        # Clamp into [0, num_bins - 1]: a frequency at or above
        # upper_frequency (e.g. from a 48 kHz recording) would otherwise
        # spill into the neighbouring bit field of the hash.
        return max(0, min(int(frequency / upper_frequency * num_bins), num_bins - 1))

    # Iterate the constellation
    for idx, (time, freq) in enumerate(constellation_map):
        # The anchor's bin does not depend on the paired point, so compute it once
        freq_binned = to_bin(freq)
        # Iterate the next 100 points to produce the combinatorial hashes.
        # The constellation is ordered by time, so this finds the next points
        # in time (though they might occur at the same time).
        for other_time, other_freq in constellation_map[idx : idx + 100]:
            diff = other_time - time
            # If the time difference between the pairs is too small or large
            # ignore this set of pairs
            if diff <= 1 or diff > 10:
                continue
            other_freq_binned = to_bin(other_freq)
            # Use bit shifting to move each field to its location in the hash
            hash_value = (
                freq_binned
                | (other_freq_binned << frequency_bits)
                | (int(diff) << (2 * frequency_bits))
            )
            hashes[hash_value] = (time, song_id)
    return hashes

In [44]:
import glob
from typing import List, Dict, Tuple
from tqdm import tqdm
import pickle

songs = glob.glob('converted/*.wav') # --> upload here the folder with our .wav files
song_name_index = {}
# Maps each hash to the list of (time, song id) pairs at which it occurs
database: Dict[int, List[Tuple[int, int]]] = {}

# Go through each song, using its position in alphabetical order as an id
for index, filename in enumerate(tqdm(sorted(songs))):
    song_name_index[index] = filename
    # Read the song, create a constellation and hashes
    Fs, audio_input = read(filename)
    constellation = create_constellation(audio_input, Fs)
    hashes = create_hashes(constellation, index)
    # For each hash, append its (time, song id) pair to the list for this hash.
    # The loop variable is deliberately not called `hash`, which would replace
    # the builtin for the rest of the notebook.
    for hash_value, time_index_pair in hashes.items():
        database.setdefault(hash_value, []).append(time_index_pair)

# Dump the database and list of songs as pickles.
# The file handles get their own names so that `songs` keeps referring to the file list.
with open("database_test.pickle", 'wb') as db_file:
    pickle.dump(database, db_file, pickle.HIGHEST_PROTOCOL)
with open("song_test_index.pickle", 'wb') as index_file:
    pickle.dump(song_name_index, index_file, pickle.HIGHEST_PROTOCOL)

100%|██████████| 22/22 [01:14<00:00,  3.39s/it]


In [45]:
# Load the database and the song index written by the build cell.
# Context managers make sure both files are closed after loading.
# NOTE: pickle.load must only be used on files you created yourself.
with open('database_test.pickle', 'rb') as db_file:
    database = pickle.load(db_file)
with open("song_test_index.pickle", "rb") as index_file:
    song_name_index = pickle.load(index_file)

# Load a song
Fs, audio_input = read("converted_memo/Goosebumps Remix MEMO.wav")
# Create the constellation and hashes
constellation = create_constellation(audio_input, Fs)
hashes = create_hashes(constellation, None)
# For each hash in the song, check if there's a match in the database
# There could be multiple matching tracks, so for each match, increment a counter for that song ID by one
matches_per_song = {}
for hash_value in hashes:
    for _source_time, song_id in database.get(hash_value, ()):
        matches_per_song[song_id] = matches_per_song.get(song_id, 0) + 1
# Show the ten songs with the most matching hashes, best first
ranked_matches = sorted(matches_per_song.items(), key=lambda x: x[1], reverse=True)
for song_id, num_matches in ranked_matches[:10]:
    print(f"Song: {song_name_index[song_id]} - Matches: {num_matches}")

Song: converted/Travis Scott, HVME - Goosebumps (Remix - Official Audio).wav - Matches: 38662
Song: converted/drivers license.wav - Matches: 32908
Song: converted/Harry Styles - Treat People With Kindness (Official Audio).wav - Matches: 29290
Song: converted/The Weeknd - Blinding Lights (Official Audio).wav - Matches: 29082
Song: converted/Sweet Melody - Little Mix (Official Audio).wav - Matches: 26971
Song: converted/Miley Cyrus & Dua Lipa - Prisoner (Audio).wav - Matches: 26299
Song: converted/Martin Garrix - Pressure (ft. Tove Lo).wav - Matches: 25993
Song: converted/Kygo, Donna Summer - Hot Stuff (Audio).wav - Matches: 25846
Song: converted/Shawn Mendes, Justin Bieber - Monster.wav - Matches: 25792
Song: converted/Bad Bunny, Jhay Cortez - DÁKITI (Audio).wav - Matches: 25256


In [46]:
# Load a short recording with some background noise
Fs, audio_input = read("converted_memo/Goosebumps Remix MEMO.wav")
# Create the constellation and hashes
constellation = create_constellation(audio_input, Fs)
hashes = create_hashes(constellation, None)
# For each hash in the song, check if there's a match in the database
# There could be multiple matches, so for each match:
#   Append all of them to a hashmap based on the song id along with the time
#   the hash occurs in the sample and at the source
# In the end, matches_per_song is keyed by song ID with values being
# lists of (hash, sample_time, source_time) tuples
matches_per_song = {}
# The loop variable is not called `hash`, so the builtin stays usable in later cells
for hash_value, (sample_time, _) in hashes.items():
    for source_time, song_id in database.get(hash_value, ()):
        matches_per_song.setdefault(song_id, []).append((hash_value, sample_time, source_time))

In [47]:
def score_hashes_against_database(hashes, db=None):
    """Rank songs by how consistently their hashes line up in time with a sample.

    Parameters
    ----------
    hashes : dict
        Maps each hash of the sample to ``(sample_time, song_id)``, as
        produced by ``create_hashes``; the ``song_id`` part is ignored.
    db : dict, optional
        Maps each hash to a list of ``(source_time, song_index)`` pairs.
        Defaults to the notebook-level ``database``.

    Returns
    -------
    list of (song_index, (offset, score))
        One entry per song sharing at least one hash with the sample, sorted
        by ``score`` in descending order. ``offset`` is the most common value
        of ``source_time - sample_time`` for that song and ``score`` is the
        number of matching hashes with that offset. If several offsets tie,
        the one encountered first is reported.
    """
    if db is None:
        # Fall back to the database built/loaded in earlier cells
        db = database

    # Gather every (hash, sample_time, source_time) match, grouped by song
    matches_per_song = {}
    for hash_value, (sample_time, _) in hashes.items():
        for source_time, song_index in db.get(hash_value, ()):
            matches_per_song.setdefault(song_index, []).append(
                (hash_value, sample_time, source_time)
            )

    scores = {}
    for song_index, matches in matches_per_song.items():
        # Count how many matches share each time offset between source and sample
        song_scores_by_offset = {}
        for _, sample_time, source_time in matches:
            delta = source_time - sample_time
            song_scores_by_offset[delta] = song_scores_by_offset.get(delta, 0) + 1
        # `matches` is never empty here, so there is always a best offset;
        # max() returns the first of several equally good candidates
        scores[song_index] = max(song_scores_by_offset.items(), key=lambda item: item[1])

    # Sort the scores for the user
    return sorted(scores.items(), key=lambda x: x[1][1], reverse=True)

In [48]:
def print_top_five(file_name):
    """Fingerprint the WAV file at ``file_name`` and print its five best-scoring songs.

    Each printed line shows the song's file name, its score and the time
    offset at which that score was reached.
    """
    # Read the recording and turn it into fingerprint hashes
    sample_rate, samples = read(file_name)
    fingerprint = create_hashes(create_constellation(samples, sample_rate), None)
    # Scores come back sorted best-first, so the first five are the top five
    ranked = score_hashes_against_database(fingerprint)
    for song_id, score in ranked[:5]:
        print(f"{song_name_index[song_id]}: Score of {score[1]} at {score[0]}")

In [49]:
# Query the database with the Goosebumps memo recording. The expected title is
# printed first so it can be compared with the ranked matches that follow.
print("Song: Travis Scott, HVME - Goosebumps (Remix)")
print('Match:')
print_top_five("converted_memo/Goosebumps Remix MEMO.wav")

Song: Travis Scott, HVME - Goosebumps (Remix)
Match:
converted/Travis Scott, HVME - Goosebumps (Remix - Official Audio).wav: Score of 1056 at 69
converted/Martin Garrix - Pressure (ft. Tove Lo).wav: Score of 172 at 481
converted/Miley Cyrus & Dua Lipa - Prisoner (Audio).wav: Score of 169 at 609
converted/Sweet Melody - Little Mix (Official Audio).wav: Score of 150 at 740
converted/The Weeknd - Blinding Lights (Official Audio).wav: Score of 148 at 719


In [50]:
# Query the database with the POPSTAR memo recording. The expected title is
# printed first so it can be compared with the ranked matches that follow.
print("Song: DJ Khaled ft. Drake - POPSTAR")
print('Match:')
print_top_five("converted_memo/POPSTAR (feat. Drake) MEMO.wav")

Song: DJ Khaled ft. Drake - POPSTAR
Match:
converted/DJ Khaled ft. Drake - POPSTAR (Official Audio).wav: Score of 1400 at 7
converted/Martin Garrix - Pressure (ft. Tove Lo).wav: Score of 167 at 413
converted/Harry Styles - Treat People With Kindness (Official Audio).wav: Score of 145 at 638
converted/The Kid LAROI - WITHOUT YOU (Official Audio).wav: Score of 140 at 558
converted/Miley Cyrus & Dua Lipa - Prisoner (Audio).wav: Score of 138 at 598


In [51]:
# Query the database with the Save Your Tears memo recording. The expected title
# is printed first so it can be compared with the ranked matches that follow.
print("Song: The Weeknd - Save Your Tears")
print('Match:')
print_top_five("converted_memo/Save Your Tears MEMO.wav")

Song: The Weeknd - Save Your Tears
Match:
converted/The Weeknd - Save Your Tears (Official Audio).wav: Score of 427 at 14
converted/Harry Styles - Watermelon Sugar (Official Audio).wav: Score of 212 at 612
converted/The Kid LAROI - WITHOUT YOU (Official Audio).wav: Score of 199 at 540
converted/DJ Khaled ft. Drake - POPSTAR (Official Audio).wav: Score of 196 at 741
converted/Shawn Mendes, Justin Bieber - Monster.wav: Score of 161 at 604
