In [1]:
import librosa
from IPython.display import Audio
from scipy import signal
import numpy as np
import glob
from typing import List, Dict, Tuple
from tqdm import tqdm
import pickle
from PIL import Image
import sounddevice as sd
import soundfile as sf
from scipy.io.wavfile import write
from playsound import playsound

playsound is relying on another python subprocess. Please use `pip install pygobject` if you want playsound to run more efficiently.


In [2]:
window_length = 2048
sr = 22050
distance_between_samples = 1 / sr  # in secunde
frequencies = np.fft.fftfreq(window_length, distance_between_samples)[:window_length//2+1]  # valorile pozitive ale frequency bin urilor
num_peaks = 15

def create_constellation(spectrogram):

    constellation = []
    for time, window in enumerate(spectrogram.T):
        # window is the spectrum from the time-th bin
        # we only want real values
        spectrum = abs(window)  # amplitudini, loudness
        peaks, props = signal.find_peaks(spectrum, prominence=0, distance=200)
        # distance - required minimal horizontal distance (>= 1) in samples between neighbouring peaks. Smaller peaks are removed first until the condition is fulfilled for all remaining peaks.
        # prominence - required prominence of peaks. Either a number, None, an array matching x or a 2-element sequence of the former. The first element is always interpreted as the minimal and the second, if supplied, as the maximal required prominence.

        # Returns
        # peaks - ndarray - indices of peaks in x that satisfy all given conditions.
        # properties - dict - a dictionary containing properties of the returned peaks which were calculated as intermediate results during evaluation of the specified conditions

        n_peaks = min(num_peaks, len(peaks))
        largest_peaks = np.argpartition(props["prominences"], -n_peaks)[-n_peaks:]  # ultimele n_peaks elemente vor fi indexes pt prominences cele mai mari

        for peak in peaks[largest_peaks]:
            frequency = frequencies[peak]  # iau frecventa coresp binului de frecventa in care se incadreaza valoarea peak
            constellation.append([time, frequency])

    return constellation


In [3]:
def create_hashes (constellation, song_id=None):
    hashes = {}

    upper_frequency = 22050
    frequency_bits = 10

    for index, (first_time, first_freq) in enumerate(constellation):
        for second_time, second_freq in constellation[index : index + 100]:
            difference = second_time - first_time
            if difference <= 1 or difference >= 99:
                continue

            first_binned = first_freq / upper_frequency * (2 ** frequency_bits)
            second_binned = second_freq / upper_frequency * (2 ** frequency_bits)

            hash = int(first_binned) | (int(second_binned) << 10) | (int(difference) << 20)
            hashes[hash] = (first_time, song_id)

    return hashes


In [4]:
sr = 22050
songs_indexes = {}
database: Dict[int, List[Tuple[int, int]]] = {}
def load_songs():

    songs = glob.glob('Data/pop+rock+pop_rock+blues_rock/*.[jpPn][npP]*[gG]')
    # Am incarcat spectrogramele din memorie folosind modulul glob

    for index, filename in enumerate(tqdm(sorted(songs))):
        songs_indexes[index] = filename
        image = Image.open(filename)
        np_image = np.array(image)
        constellation = create_constellation(np_image)
        hashes = create_hashes(constellation, index)

        for hash, time_index_pair in hashes.items():
            if hash not in database:
                database[hash] = []
            database[hash].append(time_index_pair)

load_songs()

100%|██████████| 402/402 [01:26<00:00,  4.63it/s]


In [5]:
with open("database.pickle", 'wb') as db:
    pickle.dump(database, db, pickle.HIGHEST_PROTOCOL)
with open("song_index.pickle", 'wb') as songs:
    pickle.dump(songs_indexes, songs, pickle.HIGHEST_PROTOCOL)

In [6]:
database = pickle.load(open('database.pickle', 'rb'))
song_name_index = pickle.load(open("song_index.pickle", "rb"))

In [7]:
def find_scores(hashes):
    matches_per_song = {}
    for hash, (sample_time, _) in hashes.items():
        if hash in database:
            matching_occurences = database[hash]
            for source_time, song_index in matching_occurences:
                if song_index not in matches_per_song:
                    matches_per_song[song_index] = []
                matches_per_song[song_index].append((hash, sample_time, source_time))


    # %%
    scores = {}
    for song_index, matches in matches_per_song.items():
        song_scores_by_offset = {}
        for hash, sample_time, source_time in matches:
            delta = source_time - sample_time
            if delta not in song_scores_by_offset:
                song_scores_by_offset[delta] = 0
            song_scores_by_offset[delta] += 1

        max = (0, 0)
        for offset, score in song_scores_by_offset.items():
            if score > max[1]:
                max = (offset, score)

        scores[song_index] = max

    # Sort the scores for the user
    scores = list(sorted(scores.items(), key=lambda x: x[1][1], reverse=True))

    return scores


In [27]:
fs = 22050  # Sample rate
seconds = 10  # Duration of recording
path = "Data/test.wav"
myrecording = sd.rec(int(seconds * fs), samplerate=fs, channels=2)
sd.wait()  # Wait until recording is finished
write(path, fs, myrecording)  # Save as WAV file

In [25]:
# Extract data and sampling rate from file
data, fs = sf.read(path, dtype='float32')
sd.play(data, fs)
status = sd.wait()  # Wait until file is done playing

In [26]:
#path = "Data/pop+drums+guitar_cut0.1.wav"
audio, srr = librosa.load(path)
spect = librosa.feature.melspectrogram(y=audio, sr=sr, fmin=0.)
#spect = librosa.feature.melspectrogram(y=data, sr=sr, fmin=0.)

def print_top_five(new_audio):
    song_constellation = create_constellation(new_audio)
    song_hashes = create_hashes(song_constellation, None)
    scores = find_scores(song_hashes)

    print(f"Total number of songs in the dataset: {len(scores)}")
    print(f"Recording {path.rsplit('/', 1)[1]}:\n")
    for index, (song_id, score) in enumerate(scores):
        song_name = song_name_index[song_id]
        _, filename = song_name.rsplit('/', 1)
        filename = filename.rsplit('.', 1)[0]
        print(f"{index}. {filename}: Score of {score[1]} at {score[0]}")


print_top_five(spect)



Total number of songs in the dataset: 402
Recording test.wav:

0. pop_spec35: Score of 91 at 825
1. pop_spec1: Score of 89 at 36
2. pop_rock_spec24: Score of 71 at 717
3. pop_rock_spec35: Score of 68 at 684
4. blues_rock_spec68: Score of 63 at 897
5. rock_spec85: Score of 62 at 839
6. pop_rock_spec4: Score of 61 at 799
7. rock_spec31: Score of 60 at 853
8. pop_spec26: Score of 60 at 690
9. rock_spec23: Score of 60 at 124
10. pop_rock_spec1: Score of 59 at 740
11. pop_spec11: Score of 59 at 425
12. rock_spec53: Score of 58 at 694
13. blues_rock_spec27: Score of 55 at 457
14. pop_rock_spec44: Score of 55 at 906
15. pop_rock_spec18: Score of 55 at 804
16. pop_rock_spec23: Score of 54 at 864
17. pop_rock_spec37: Score of 54 at 709
18. rock_spec62: Score of 54 at 493
19. pop_rock_spec62: Score of 53 at 823
20. pop_rock_spec34: Score of 53 at 531
21. rock_spec26: Score of 53 at 271
22. pop_spec5: Score of 52 at 915
23. pop_spec41: Score of 52 at 869
24. pop_rock_spec68: Score of 51 at 845
25