<a href="https://colab.research.google.com/github/Ground17/find_groove/blob/main/find_groove.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

0. Initialize packages

In [None]:
#@title
import numpy as np
import pandas as pd
import tensorflow as tf
import os, datetime, math, pickle
# from scipy.signal import butter, lfilter
# import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm

from IPython.display import Javascript
from google.colab import output
from base64 import b64decode
from io import BytesIO
!pip -q install pydub
from pydub import AudioSegment

import ipywidgets as widgets
from IPython.display import display, clear_output

In [None]:
#@title
# Mount Google Drive at /content/drive; later cells read and write files under
# /content/drive/MyDrive/find_groove.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


0. Initialize functions

In [None]:
#@title
def downsampling(sound):
    '''
    Convert a pydub segment to mono 11025Hz samples scaled by full-scale amplitude.

    :param sound:   instance of AudioSegment
    :return:        np array of mono 11025Hz audio data, divided by the
                    full-scale value of the segment's sample width

    Resampled to 11025Hz for processing.
    '''
    assert isinstance(sound, AudioSegment)

    sound = sound.set_channels(1)
    sound = sound.set_frame_rate(11025)

    samples = np.array(sound.get_array_of_samples())

    # Full-scale magnitude of a signed sample: 2 ** 15 for 16-bit audio (the
    # value that used to be hard-coded), 2 ** 7 for 8-bit, 2 ** 31 for 32-bit.
    full_scale = 2 ** (8 * sound.sample_width - 1)

    return samples / full_scale

def FFT(audio_normalised):
    '''
    Single-sided amplitude spectrum of one time-domain frame.

    :param audio_normalised:    audio value in time domain
    :return:                    list with the first len(audio_normalised) // 2
                                FFT magnitudes, each scaled by 2 / N
    '''
    sample_count = len(audio_normalised)
    scaled_spectrum = np.fft.fft(audio_normalised) / sample_count

    # Keep only the lower half of the bins and double the magnitudes.
    lower_half = scaled_spectrum[:sample_count // 2]
    magnitudes = 2 * np.abs(lower_half)

    return magnitudes.tolist()

def audioread(file_path):
    '''
    Read a wav or mp3 file and convert it to the format used for processing.

    :param file_path:    path to a .wav or .mp3 file; the extension is
                         matched case-insensitively
    :return:             result of downsampling() for the decoded audio
    :raises AssertionError: if the extension is neither .wav nor .mp3
    '''
    _, file_extension = os.path.splitext(file_path)
    # Normalise the case so "SONG.WAV" / "Track.Mp3" are accepted as well.
    file_extension = file_extension.lower()
    assert file_extension in ('.wav', '.mp3'), \
        'unsupported audio extension: %r' % file_extension

    if file_extension == '.wav':
        sound = AudioSegment.from_wav(file_path)
    else:
        sound = AudioSegment.from_mp3(file_path)

    return downsampling(sound)

def add_noise(noise, std, length):
    '''
    Draw `length` zero-mean Gaussian samples whose standard deviation is std * noise.

    Uses NumPy's global random state.
    '''
    sigma = std * noise
    return np.random.normal(loc=0, scale=sigma, size=length)

def get_2D_peaks(frequencies, test=False):
    '''
    Obtain the largest frequency value in each frequency band.

    :param frequencies:   sequence of spectra, one per time frame; bins 0-511
                          of every frame are read
    :param test:          if False, keep the peak of all seven bands per frame
                          (in band order); if True, keep only the three
                          strongest band peaks per frame, strongest first
    :return:              list of (bin index, frame index) tuples
    '''
    # Half-open [start, end) bin ranges, one peak is taken from each.
    bands = ((0, 10), (10, 20), (20, 40), (40, 80), (80, 160), (160, 320), (320, 512))

    result = []
    for frame_index, row in enumerate(frequencies):
        band_peaks = []
        for start, end in bands:
            # max() keeps the first maximal bin, like a strict "<" scan would.
            peak_bin = max(range(start, end), key=lambda j: row[j])
            band_peaks.append((row[peak_bin], peak_bin))

        if test:
            # Order by magnitude (ties: higher bin first) and keep the top three.
            band_peaks.sort(reverse=True)
            band_peaks = band_peaks[:3]

        for _, peak_bin in band_peaks:
            result.append((peak_bin, frame_index))

    return result

def spectrogram(audio_normalised, noise=0, offset=0):
    '''
    Cut the signal into Hamming-windowed frames of 1024 samples and FFT each one.

    :param audio_normalised:    1-D np.ndarray of audio samples
    :param noise:               if non-zero, Gaussian noise with standard deviation
                                noise * std(audio_normalised) is added first
    :param offset:              sample index at which the first frame starts
    :return:                    list with one FFT() result per frame; frames do
                                not overlap (hop of 1024 samples)

    The input array is never modified; when noise is requested a noisy copy
    is processed instead.
    '''
    assert isinstance(audio_normalised, np.ndarray)
    std = np.std(audio_normalised)
    length = len(audio_normalised)

    if noise != 0:
        # "a = a + b" instead of "a += b": keeps the caller's array intact and
        # also works when the input has an integer dtype.
        audio_normalised = audio_normalised + add_noise(noise, std, length)

    window = np.hamming(1024)                                   # 1024-tap Hamming window

    frequencies = []

    # "+ 1" so that a final frame ending exactly on the last sample is kept.
    for i in range(offset, length - 1024 + 1, 1024):
        audio_cut = window * audio_normalised[i:i + 1024]       # apply the window to this frame
        frequencies.append(FFT(audio_cut))                      # FFT of the windowed frame

    return frequencies

0. Prepare fingerprint and model  (If the saved fingerprint files cannot be loaded, skip step 1 and run step 2 first to build them.)

In [None]:
#@title
# State shared with the search cell (step 1). Everything starts empty and is
# filled from the files saved on Drive, when they can be read.
model = None
finger_dict = {}
titles = []

# NOTE(review): pickle.load can execute arbitrary code. Only load files that
# were written by step 2 of this notebook.
try:
    with open('/content/drive/MyDrive/find_groove/fingerprint', 'rb') as f:
        finger_dict = pickle.load(f)

    with open('/content/drive/MyDrive/find_groove/titles', 'rb') as f:
        titles = pickle.load(f)
except Exception as error:
    # "Exception" rather than a bare except, so interrupting the cell still works.
    print("Could not load the saved fingerprint data:", error)
    print("Please skip step 1 and follow step 2.")
else:
    # The CNN model is optional: without it the search uses fingerprints only.
    try:
        model = tf.keras.models.load_model('/content/drive/MyDrive/find_groove/cnn_vote_best.h5')
    except Exception as error:
        print("CNN model not loaded, fingerprint search only:", error)

1. Test

In [None]:
#@title
### source - https://gist.github.com/korakot/c21c3476c024ad6d56d5f48b0bca92be

# JavaScript run in the notebook's output frame. It defines record(time),
# which records from the microphone for `time` milliseconds and resolves to
# the recording encoded as a data URL. Compared with the gist, the working
# variables are declared locally and the microphone tracks are stopped once
# recording ends, so the browser releases the microphone.
RECORD = """
const sleep  = time => new Promise(resolve => setTimeout(resolve, time))
const b2text = blob => new Promise(resolve => {
    const reader = new FileReader()
    reader.onloadend = e => resolve(e.srcElement.result)
    reader.readAsDataURL(blob)
})
var record = time => new Promise(async resolve => {
    const stream = await navigator.mediaDevices.getUserMedia({ audio: true })
    const recorder = new MediaRecorder(stream)
    const chunks = []
    recorder.ondataavailable = e => chunks.push(e.data)
    recorder.start()
    await sleep(time)
    recorder.onstop = async ()=>{
        stream.getTracks().forEach(track => track.stop())
        const blob = new Blob(chunks)
        const text = await b2text(blob)
        resolve(text)
    }
    recorder.stop()
})
"""

def _count_fingerprint_votes(rec_peaks, fingerprints, fan_value=8, max_t_delta=20):
    '''Tally fingerprint hits per (title, time offset) for the recorded peaks.'''
    votes = {}
    for i, (freq1, t1) in enumerate(rec_peaks):
        # Pair every peak with the next fan_value - 1 peaks that follow it.
        for freq2, t2 in rec_peaks[i + 1:i + fan_value]:
            t_delta = t2 - t1                                   # difference in frame index
            if t_delta > max_t_delta:                           # 20 frames of 1024 samples, about 1.85 s
                continue
            for item in fingerprints.get((freq1, freq2, t_delta), ()):
                # Hits of the same song that agree on the offset reinforce each other.
                key = (item[0], item[1] - t1)
                votes[key] = votes.get(key, 0) + 1
    return votes


def _stack_windows(ffts, size=30):
    '''Return every run of `size` consecutive frames (stride 1); empty if there are too few frames.'''
    return [ffts[m:m + size] for m in range(len(ffts) - size + 1)]


def _standardize_rows(batch):
    '''Scale each frame of a (windows, frames, bins) array in place to mean 0 / std 1; constant frames are left as is.'''
    for i in range(len(batch)):
        for j in range(len(batch[i])):
            std = np.std(batch[i, j])
            if std == 0:
                continue
            batch[i, j] = (batch[i, j] - np.mean(batch[i, j])) / std
    return batch


def record_search(sec=3):
    '''
    Record from the browser microphone and look the recording up.

    :param sec:   recording length in seconds
    :return:      list of titles: the fingerprint match first (if any), followed
                  by up to three titles voted for by the CNN model, when a model
                  is loaded, without duplicates

    Reads the module-level `model`, `finger_dict` and `titles`.
    '''
    results = []

    display(Javascript(RECORD))
    s = output.eval_js('record(%d)' % (sec*1000))
    b = b64decode(s.split(',')[1])                              # strip the "data:...;base64," prefix
    audio = AudioSegment.from_file(BytesIO(b))

    ffts = spectrogram(downsampling(audio))

    # Fingerprint search: the (title, offset) pair with the most hits wins;
    # on a tie the pair seen first is kept.
    votes = _count_fingerprint_votes(get_2D_peaks(ffts, True), finger_dict)
    if votes:
        title_finger = max(votes, key=votes.get)[0]
        if title_finger:
            results.append(title_finger)

    if model is not None:
        windows = _stack_windows(ffts)
        # A recording shorter than 30 frames yields no window; skip the model
        # instead of failing on an empty array.
        if windows:
            batch = _standardize_rows(np.array(windows))
            batch = np.swapaxes(batch, 1, 2)
            batch = batch[..., np.newaxis]

            predictions = model.predict(batch)
            cnn_votes = {}
            for index in np.argmax(predictions, axis=1):
                title = titles[index]
                cnn_votes[title] = cnn_votes.get(title, 0) + 1

            # Stable ascending sort by vote count; the last three entries are
            # the three most voted titles.
            ranked = sorted(cnn_votes.items(), key=lambda entry: entry[1])
            for title, _ in reversed(ranked[-3:]):
                if title not in results:
                    results.append(title)

    return results

# Set to True by the click handler while a recording/search is running.
recording = False

# Enabled button that starts a recording.
start = widgets.Button(description="▶", tooltip='Record start!', button_style="success", disabled=False)

# Disabled button displayed in place of `start` while recording.
stop = widgets.Button(description="■", tooltip='Recording...!', button_style="success", disabled=True)

# Output widget displayed next to the buttons.
outputs = widgets.Output()

def on_button_clicked(b):
    '''
    Click handler of the start button: record, search and print the results.

    :param b:   the button that was clicked (unused)

    The stop button is displayed while record_search() runs. The start button
    is restored afterwards even if the search raises, so the UI never stays
    stuck in the "recording" state.
    '''
    global recording
    recording = True
    clear_output(wait=True)
    display(stop, outputs)
    try:
        results = record_search()
    finally:
        recording = False
        clear_output(wait=True)
        display(start, outputs)

    if not results:
        print("Maybe database is empty...")
        return

    # The first entry is the best match, the rest are suggestions.
    for i, result in enumerate(results):
        if i == 0:
            print("answer:", result)
        else:
            print("recommended:", result)

# Call on_button_clicked on every click of the start button, then show the
# initial UI (start button plus the output widget).
start.on_click(on_button_clicked)
display(start, outputs)

Button(button_style='success', description='▶', style=ButtonStyle(), tooltip='Record start!')

Output()

1 blues.00004.wav


2. Build audio fingerprints  
(Optionally, also prepare training data for an ML model)

In [None]:
#@title
# fingerprint, cnn + voting
# Builds the fingerprint database from every readable audio file in `path` and,
# if requested, the training set (stacks of 30 consecutive FFT frames) for the
# CNN trained in step 3.
path = "/content/drive/MyDrive/find_groove"
music_list = os.listdir(path)
N = len(music_list)
ml = False

# Ask until a valid answer is given. The else-branch only runs when the loop
# condition is false, i.e. when there are more than 100 entries in the folder.
while N <= 100:
    answer = input("Do you want to create a machine learning model?\nThe search accuracy may improve in the future, but it may take a long time.\nIn addition, if there are too many full music files, runtime may stop in the middle and it don't make model. (y/n) ")

    if answer.lower() in ('y', 'yes'):
        ml = True
        break

    if answer.lower() in ('n', 'no'):
        break

    print("Please enter the correct value.")
else:
    print("Machine learning is not supported for more than 100 songs.")

# Noise levels passed to spectrogram(); level 0 is the clean pass.
noise = [0, 0.56234132519] # noise = [0, 0.56234132519, 1]

array_X = []
array_y = []


titles = [] # titles[class index] -> file name

fan_value = 15 # each peak is paired with the next fan_value - 1 peaks
finger_dict = {}
title_dict = {} # file name -> class index
for a in noise:
    # Noisy passes only add training data, so they are pointless without ml.
    if a != 0 and not ml:
        break

    for b in tqdm(music_list):
        try:
            ffts = spectrogram(audioread(path + '/' + b), a) # list of frames, 512 bins each
        except Exception:
            # Skip entries that cannot be read as audio. "Exception" instead of
            # a bare except keeps the loop interruptible.
            continue

        # A file gets its class index the first time it is read successfully,
        # so titles holds every song exactly once, in class-index order.
        if b not in title_dict:
            title_dict[b] = len(titles)
            titles.append(b)

        # Fingerprints are built from the clean pass only.
        if a == 0:
            peaks = get_2D_peaks(ffts)
            for i, (freq1, t1) in enumerate(peaks):
                for freq2, t2 in peaks[i + 1:i + fan_value]:
                    t_delta = t2 - t1

                    if t_delta <= 20:
                        # key: (freq1, freq2, t_delta), value: list of (file name, t1)
                        finger_dict.setdefault((freq1, freq2, t_delta), []).append((b, t1))

        if ml:
            # Stack every run of 30 consecutive frames (stride 1).
            label = title_dict[b]
            for m in range(len(ffts) - 29):
                array_X.append(ffts[m:m + 30])
                array_y.append(label)

        print("\r", b, "is processed.", end="")

with open(path + '/fingerprint', 'wb') as f:
    pickle.dump(finger_dict, f)

with open(path + '/titles', 'wb') as f:
    pickle.dump(titles, f)

if ml:
    from tensorflow.keras.utils import to_categorical
    train_X = np.array(array_X)

    # Rescale every frame to mean 0, std 1; constant frames are left as is.
    for i in range(len(train_X)):
        for j in range(len(train_X[i])):
            std = np.std(train_X[i, j])
            if std == 0:
                continue
            train_X[i, j] = (train_X[i, j] - np.mean(train_X[i, j])) / std

    # (windows, 30, 512) -> (windows, 512, 30, 1)
    train_X = np.swapaxes(train_X, 1, 2)

    train_X = train_X[..., np.newaxis]


    array_y_c = to_categorical(array_y, num_classes=N)
    train_y = np.array(array_y_c)

    del array_y_c, array_X, array_y

    print("\n\nPlease go to step 3.")
else:
    print("\n\nPlease go to step 1 and test.")

Machine learning is not supported for more than 100 songs.


  0%|          | 0/1002 [00:16<?, ?it/s]

 blues.00000.wav is processed.

Please go to step 2 and test.


3. Train ML model

In [None]:
#@title
# Train the CNN on the data prepared in step 2. Needs ml, N, title_dict,
# train_X and train_y from that cell.
if ml:
    from tensorflow.keras import layers, models, optimizers
    from tensorflow.keras.utils import to_categorical
    from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

    array_X = []
    array_y = []

    path = '/content/drive/MyDrive/find_groove'
    music_list = os.listdir(path)

    # Validation data: up to ten songs registered in step 2, re-framed with an
    # offset of 512 samples so the frames differ from the training frames.
    val_files = [b for b in music_list if b in title_dict][:10]
    for b in tqdm(val_files):
        try:
            ffts = spectrogram(audioread(path + '/' + b), 0, 512) # list of frames, 512 bins each, offset: 512
        except Exception:
            # Not readable as audio (e.g. the saved pickles in this folder): skip.
            continue

        # Label with the class index assigned in step 2 so that the
        # validation labels agree with the training labels.
        label = title_dict[b]
        for m in range(len(ffts) - 29): # every run of 30 consecutive frames
            array_X.append(ffts[m:m + 30])
            array_y.append(label)

    val_X = np.array(array_X)

    # Rescale every frame to mean 0, std 1; constant frames are left as is.
    for i in range(len(val_X)):
        for j in range(len(val_X[i])):
            std = np.std(val_X[i, j])
            if std == 0:
                continue
            val_X[i, j] = (val_X[i, j] - np.mean(val_X[i, j])) / std

    # (windows, 30, 512) -> (windows, 512, 30, 1)
    val_X = np.swapaxes(val_X, 1, 2)

    val_X = val_X[..., np.newaxis]

    array_y_c = to_categorical(array_y, num_classes=N)
    val_y = np.array(array_y_c)

    del array_y, array_X, array_y_c

    # SIZE, HEIGHT, WIDTH, CHANNELS
    SIZE = 64
    HEIGHT = 512
    WIDTH = 30
    CHANNELS = 1

    model = models.Sequential()

    model.add(layers.Conv2D(32, (3, 1), activation='relu', input_shape=(HEIGHT, WIDTH, CHANNELS)))
    model.add(layers.MaxPooling2D((4, 1)))
    model.add(layers.Conv2D(64, (3, 1), activation='relu'))
    model.add(layers.MaxPooling2D((4, 1)))
    model.add(layers.Conv2D(128, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D((4, 4)))
    model.add(layers.Flatten())
    model.add(layers.Dense(1024, activation='relu'))
    model.add(layers.Dense(N, activation='softmax')) # one output per class (N)

    model.summary()

    # model = tf.keras.models.load_model('/content/drive/MyDrive/find_groove/cnn_vote_best.h5') ###edit###

    # Save the model whenever the validation loss improves.
    mc = ModelCheckpoint('/content/drive/MyDrive/find_groove/cnn_vote_best.h5', monitor='val_loss', mode='min', save_best_only=True)

    # logdir = os.path.join("logs", datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))
    # tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

    model.compile(optimizer=optimizers.Adam(learning_rate=1e-3), loss='categorical_crossentropy', metrics=['accuracy'])
    history = model.fit(train_X, train_y, epochs=7, batch_size=32, validation_data=(val_X, val_y), callbacks=[mc])

    print("\n\nPlease go to step 1 and test.")

  0%|          | 0/10 [00:00<?, ?it/s]

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_3 (Conv2D)           (None, 510, 30, 32)       128       
                                                                 
 max_pooling2d_3 (MaxPooling  (None, 127, 30, 32)      0         
 2D)                                                             
                                                                 
 conv2d_4 (Conv2D)           (None, 125, 30, 64)       6208      
                                                                 
 max_pooling2d_4 (MaxPooling  (None, 31, 30, 64)       0         
 2D)                                                             
                                                                 
 conv2d_5 (Conv2D)           (None, 29, 28, 128)       73856     
                                                                 
 max_pooling2d_5 (MaxPooling  (None, 7, 7, 128)       