In [1]:
import numpy as np      
import matplotlib.pyplot as plt 
import scipy.io.wavfile 
import subprocess
import librosa
import librosa.display
import IPython.display as ipd

from pathlib import Path, PurePath   
from tqdm.notebook import tqdm

## Utility functions

In [2]:
def convert_mp3_to_wav(audio:str) -> str:  
    """Convert an input MP3 audio track into a WAV file.

    Args:
        audio (str): An input audio track.

    Returns:
        [str]: WAV filename.
    """
    if audio[-3:] == "mp3":
        wav_audio = audio[:-3] + "wav"
        if not Path(wav_audio).exists():
                subprocess.check_output(f"ffmpeg -i {audio} {wav_audio}", shell=True)
        return wav_audio
    
    return audio

def plot_spectrogram_and_picks(track:np.ndarray, sr:int, peaks:np.ndarray, onset_env:np.ndarray) -> None:
    """[summary]

    Args:
        track (np.ndarray): A track.
        sr (int): Aampling rate.
        peaks (np.ndarray): Indices of peaks in the track.
        onset_env (np.ndarray): Vector containing the onset strength envelope.
    """
    times = librosa.frames_to_time(np.arange(len(onset_env)),
                            sr=sr, hop_length=HOP_SIZE)

    plt.figure()
    ax = plt.subplot(2, 1, 2)
    D = librosa.stft(track)
    librosa.display.specshow(librosa.amplitude_to_db(np.abs(D), ref=np.max),
                            y_axis='log', x_axis='time')
    plt.subplot(2, 1, 1, sharex=ax)
    plt.plot(times, onset_env, alpha=0.8, label='Onset strength')
    plt.vlines(times[peaks], 0,
            onset_env.max(), color='r', alpha=0.8,
            label='Selected peaks')
    plt.legend(frameon=True, framealpha=0.8)
    plt.axis('tight')
    plt.tight_layout()
    plt.show()

def load_audio_picks(audio, duration, hop_size):
    """[summary]

    Args:
        audio (string, int, pathlib.Path or file-like object): [description]
        duration (int): [description]
        hop_size (int): 

    Returns:
        tuple: Returns the audio time series (track) and sampling rate (sr), a vector containing the onset strength envelope
        (onset_env), and the indices of peaks in track (peaks).
    """
    try:
        track, sr = librosa.load(audio, duration=duration)
        onset_env = librosa.onset.onset_strength(track, sr=sr, hop_length=hop_size)
        peaks = librosa.util.peak_pick(onset_env, 10, 10, 10, 10, 0.5, 0.5)
    except Error as e:
        print('An error occurred processing ', str(audio))
        print(e)

    return track, sr, onset_env, peaks
    
    

## Settings

In [3]:
N_TRACKS = 1413
HOP_SIZE = 512
DURATION = 30 # TODO: to be tuned!
THRESHOLD = 5 # TODO: to be tuned!

In [75]:
data_folder = Path("./data/mp3s-32k/")
mp3_tracks = data_folder.glob("*/*/*.mp3")
tracks = data_folder.glob("*/*/*.wav")

## Preprocessing

In [5]:
def preprocessing(mp3_tracks):
    for track in tqdm(mp3_tracks, total=N_TRACKS):
        convert_mp3_to_wav(str(track))

In [6]:
#preprocessing(mp3_tracks)

## Audio signals

In [7]:
"""for idx, audio in enumerate(tracks):
    if idx >= 1:
        break
    track, sr, onset_env, peaks = load_audio_picks(audio, DURATION, HOP_SIZE)
    plot_spectrogram_and_picks(track, sr, peaks, onset_env)
        
        """

'for idx, audio in enumerate(tracks):\n    if idx >= 1:\n        break\n    track, sr, onset_env, peaks = load_audio_picks(audio, DURATION, HOP_SIZE)\n    plot_spectrogram_and_picks(track, sr, peaks, onset_env)\n        \n        '

## Minhash

In [8]:
# TODO

In [9]:
from bitstring import BitArray
import pandas as pd
from collections import *
import pickle
import multiprocessing
import random

In [10]:
def save_object(obj, filename):
    with open(filename, 'wb') as outp:  # Overwrites any existing file.
        pickle.dump(obj, outp, pickle.HIGHEST_PROTOCOL)

In [11]:
def read_object(filename):
    with open(filename, 'rb') as file:
        data = pickle.load(file)
    return data

In [12]:
def timeOfPeaks(peaks, times):
    timesPeaks = []
    
    for i in peaks:
        timesPeaks.append(times[i])
    
    return timesPeaks

In [13]:
def fibonacci_hash_float(value:float, rand = False, hash_size = 15):

    value = BitArray(float=value, length=64)
    phi = (1 + 5 ** 0.5) / 2
    g = int(2 ** 64 /phi)


    value ^= value >> 61

    if(rand):
        value = int(g * value.float * np.random.random_sample(1))
    else:
        value = int(g * value.float)

    return int(str(value)[0:hash_size])

In [61]:
class HashTable:
    def __init__(self):
        self.hash_table = defaultdict(list)
        
    def generate_hash(self, inp_vector):
        hashVal = "".join(inp_vector)
        return hashVal
            
    def setitem(self, vec, label):
        val = self.generate_hash(vec)
        self.hash_table[val].append(label)
        
    def getitem(self, inp_vec):
        hash_value = self.generate_hash(inp_vec)
        return self.hash_table.get(hash_value, [])
    
    def getTable(self):
        return(self.hash_table)

In [95]:
class LSH:
    def __init__(self, num_tables, threshold, b):
        assert num_tables == threshold//b, "The number of table must be equals to threshold // band"
        self.num_tables = num_tables
        self.band = b
        self.threshold = threshold
        self.hash_tables = list()
        for i in range(self.num_tables):
            self.hash_tables.append(HashTable())
        
            
    def minhash(self, vec, label):
        
        out = defaultdict(list)
        
        for i in range(0,self.threshold):
            random.shuffle(vec)
    
            for idx, num in enumerate(vec):
    
                if(num != 0):
                    
                    out[i//self.band].append(str(idx))
                    break
        

        for el, table in zip(out, self.hash_tables):
            table.setitem(out[el], label)


    def info(self):
        print("Numero di tabelle: " + str(self.num_tables))
        print("Elementi per tabella: " + str(len(self.hash_tables[0].hash_table)))

In [16]:
def make_fingerprints(audio, duration, hop):
    track, sr, onset_env, peaks = load_audio_picks(audio, DURATION, HOP_SIZE)
    times = librosa.frames_to_time(np.arange(len(onset_env)), sr=sr, hop_length=HOP_SIZE)
    timesPeaks = timeOfPeaks(peaks, times)
    freqsP = [onset_env[i] for i in peaks]

    fingerprints = []
    sec = hop
    time=0
    count=0
    while(sec <= duration):

        idx = 0
        hashVal = 0
        count += 1

        while(time <=sec):
            
            if(timesPeaks[idx] < sec and timesPeaks[idx] > time):
                hashVal ^= fibonacci_hash_float(freqsP[idx]) ^ hashVal
                

            time = timesPeaks[idx]
           
            if(idx+1 < len(freqsP)):
                idx += 1
            else:
                break
        
        fingerprints.append(hashVal)
        
        sec += hop
        
 
        
    return fingerprints

In [None]:
fin = make_fingerprints("./data/mp3s-32k/aerosmith/Aerosmith/03-Dream_On.wav", 30, 0.3)
fin

creao fingerprints da 1 secondo della canzone.

come creo i backet?

Lets try with some test...

In [127]:
def populate_LSH(song = "NA"):
    
    lsh = LSH(15, 150, 10)
    
    if(song == "NA"):
        
        for idx, audio in tqdm(enumerate(tracks), total = N_TRACKS):

            fingerprint = make_fingerprints(audio, DURATION, 0.3)

            lsh.minhash(fingerprint, audio.name)
    else:
        fingerprint = make_fingerprints(song, DURATION, 0.3)
        
        lsh.minhash(fingerprint, song)
        
    return lsh

In [124]:
def searchForCollision(lsh, lsh_q):
    match = list()
    
    for table, tableq in zip(lsh.hash_tables, lsh_q.hash_tables):
        
        match.extend(table.hash_table.get([*tableq.hash_table.keys()][0], ["NA"]))
        
    match = list(filter(lambda a: a != "NA", match))
    
    prob = Counter(match).most_common(1)[0][1] / len(match)
    
    return [Counter(match).most_common(1)[0], prob]

In [41]:
#save_object(lsh, "./data/lsh_50_150_3(0.3).pkl")

In [125]:
lsh = populate_LSH()
save_object(lsh, "./data/lsh_15_150_10(0.3).pkl")
#lsh = read_object("./data/lsh_50_150_3(0.3).pkl")
lsh_q = populate_LSH("./data/queries/track1.wav")
out = searchForCollision(lsh, lsh_q)
out

[('./data/mp3s-32k/aerosmith/Aerosmith/03-Dream_On.wav', 10), 1.0]

In [83]:
lsh.info()

Numero di tabelle: 50
Elementi per tabella: 149


In [81]:
lsh_q.hash_tables[0].hash_table

defaultdict(list, {'110': ['./data/queries/track1.wav']})

In [98]:
lsh.hash_tables[0].hash_table

defaultdict(list,
            {'010': ['./data/mp3s-32k/aerosmith/Aerosmith/03-Dream_On.wav']})

In [None]:
audioq = "./data/queries/track1.wav"
audio = "./data/mp3s-32k/aerosmith/Aerosmith/03-Dream_On.wav"

# Query test

take the fisrt query

In [None]:
"""audio = 'data/queries/track3.wav'"""

make the hashmin of the song

In [None]:
"""track, sr, onset_env, peaks = load_audio_picks(audio, DURATION, HOP_SIZE)
timess = librosa.frames_to_time(np.arange(len(onset_env)), sr=sr, hop_length=HOP_SIZE)
timesPeaks = timeOfPeaks(peaks, timess)
freqsP = [onset_env[i] for i in peaks]
    
h = minhash(freqsP, timesPeaks, THRESHOLD, DURATION)
h"""

lets see if it match something...

In [None]:
"""guess_song(audio)"""

In [None]:
"""data_folder2 = Path("./data/queries/")
query_tracks = data_folder2.glob("./*.wav")
get = 0
miss = 0
for query in query_tracks:
    print("\nCurrent query: " + str(query) + "\n")
    try:
        print(guess_song(query))
        get += 1
    except KeyError:
        print("Not matched!")
        miss += 1
    print("\n===========================================\n")
    
print("Song matched: " + str(get) + "  Song missed: " + str(miss))"""