# Создаем словарь аккордов и инструментов

In [None]:
# Shared configuration.
# "pitches": chord labels; each one is also the base name of a per-chord CSV
#            file, and the list order defines the class index of the model.
# "instruments": names of the sub-folders used for per-instrument validation.
CFG = dict(
    pitches=["c", "d", "dm", "e", "em", "f", "g", "a", "am", "bm"],
    instruments=["Guitar", "Piano", "Violin"],
)

# Дробим большое аудио на маленькие сэмплы

In [None]:
from scipy.io import wavfile
from scipy.fftpack import fft
from math import log2

class LongFileProfiler():
    """Compute 12-bin pitch class profiles (PCP) for a WAV file.

    The file is read once in the constructor.  ``get_profile`` walks over the
    samples window by window and returns one profile per window, while
    ``get_profile_for_pitch`` returns a single profile for the whole file.

    Note: the multi-channel branch of ``pcp`` uses the module-level name
    ``np`` (numpy); it must be imported before the methods are called.
    """

    def __init__(self, file_name, window = 0.5):
        """Read ``file_name`` and convert ``window`` (seconds) into samples.

        Raises:
            ValueError: if the window is shorter than one sample, which would
                make ``get_profile`` loop forever.
        """
        self.current_pointer = 0
        # wavfile.read returns (sample rate, samples)
        self.frequency, self.samples = wavfile.read(file_name)
        # Width, in samples, of the window that slides over the file
        self.window = round(window * self.frequency)
        if self.window <= 0:
            raise ValueError("window must cover at least one sample")

    def get_profile(self):
        """Return the list of PCP vectors of consecutive windows.

        Iteration starts at ``self.current_pointer`` and advances it, so a
        second call on the same instance returns an empty list.
        """
        profiles_list = []
        samples_count = len(self.samples)

        while self.current_pointer < samples_count:
            # The slice end is exclusive, so clamp to samples_count itself;
            # clamping to samples_count - 1 would drop the last sample and
            # produce an empty (un-transformable) window for a 1-sample tail.
            right_bound = min(self.current_pointer + self.window, samples_count)
            window_samples = self.samples[self.current_pointer:right_bound]
            # axis=0 is the time axis; for (n_samples, n_channels) data the
            # default axis=-1 would transform across the channels instead.
            X = fft(window_samples, axis=0)
            profiles_list.append(self.pcp(X))
            self.current_pointer += self.window
        return profiles_list

    def pcp(self, X):
        """Fold the spectrum ``X`` into a normalised 12-bin pitch class profile.

        The algorithm comes from the article shipped with the project archive
        and was slightly modified by the original author.

        Args:
            X: spectrum with the frequency bins along the first axis; either
               1-D (mono) or 2-D with one column per channel.

        Returns:
            A list of 12 values.  For mono input each value is a scalar; for
            multi-channel input each value is a list with one entry per
            channel.  Every channel sums to 1, or is all zeros when the
            spectrum carries no energy.
        """
        fref = 130.81  # Reference frequency chosen by the article's author
        N = len(X)

        # Accumulate the energy of every bin of the lower half of the spectrum
        # into its pitch class.  Bin 0 (DC) has no pitch and is skipped.
        energy = [0 for _ in range(12)]
        for l in range(1, N // 2):
            p = round(12 * log2((self.frequency * l) / (N * fref))) % 12
            energy[p] += abs(X[l]) ** 2

        total = sum(energy)

        if isinstance(total, np.ndarray):
            # Multi-channel: normalise every channel on its own.  Classes that
            # received no bin are still the scalar 0 and get broadcast.
            silent = total == 0
            safe_total = np.where(silent, 1, total)
            pcp_norm = []
            for value in energy:
                share = np.where(silent, 0.0, np.asarray(value, dtype=float) / safe_total)
                pcp_norm.append(share.tolist())
            return pcp_norm

        # Mono
        if total == 0:
            return [0 for _ in range(12)]
        return [value / total for value in energy]

    def get_profile_for_pitch(self):
        """Return a single PCP vector computed over the whole file."""
        # self.samples is an array, not a callable
        X = fft(self.samples, axis=0)
        return self.pcp(X)



# Модель

In [None]:
import itertools
import os

import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas
import sklearn.metrics
from keras import metrics
from keras.layers import Dense, LSTM, TimeDistributed, Bidirectional
from keras.models import Sequential, load_model

class Trainer():
    """Build, train, persist and evaluate the chord classifier.

    Input features are 12-value rows read from per-chord CSV files; targets
    are one-hot rows with one column per entry of ``CFG['pitches']``.
    """

    def __init__(self, model_type='Dense', file_name="my_model.h5", loss_function="binary_crossentropy", CFG=CFG):
        """Store the configuration; nothing is trained or loaded yet.

        Args:
            model_type: 'Dense', 'DLSTM' or 'LSTM' (see ``train``).
            file_name: path used by ``save`` / ``load``.
            loss_function: loss passed to ``compile``.
            CFG: mapping with the keys 'pitches' and 'instruments'.
        """
        self.cfg = CFG
        self.pitches = CFG['pitches']
        self.trained = False
        self.file_name = file_name
        self.loss_function = loss_function
        self.model_type = model_type

    # Read the data
    def read_pitch_csv(self, folder_name):
        """Concatenate ``<folder_name><pitch>.csv`` for every configured pitch.

        ``folder_name`` is used as a plain prefix, so it must end with a path
        separator.  Files are read without a header row.
        """
        frames = [
            pandas.read_csv(folder_name + pitch + ".csv", header=None)
            for pitch in self.pitches
        ]
        return pandas.concat(frames)

    def out_data_generator(self, how_many):
        """Return one-hot targets: ``how_many`` rows per pitch, in pitch order."""
        n_classes = len(self.pitches)
        rows = []
        for class_index in range(n_classes):
            one_hot = [0.0] * n_classes
            one_hot[class_index] = 1.0
            rows.extend(list(one_hot) for _ in range(how_many))
        return pandas.DataFrame(rows)

    def validation_input_data(self, instrument):
        """Read the validation features of one instrument."""
        return self.read_pitch_csv("./dataset/validation/" + instrument + "/")

    def validation_output_data(self):
        """Return validation targets (10 rows per pitch).

        Assumes every validation CSV holds exactly 10 rows - TODO confirm.
        """
        return self.out_data_generator(10)

    def input_data(self):
        """Read the training features."""
        return self.read_pitch_csv("./dataset/train/")

    def output_data(self):
        """Return training targets (200 rows per pitch).

        Assumes every training CSV holds exactly 200 rows - TODO confirm.
        """
        return self.out_data_generator(200)

    def _prepare_input(self, X):
        """Give ``X`` the shape the configured model expects.

        The recurrent variants are trained on (n, 12, 1) input, so the same
        reshape has to be applied before evaluating or predicting.
        """
        if self.model_type != 'Dense':
            return X.reshape(len(X), 12, 1)
        return X

    # Validation
    def validate(self):
        """Evaluate the model per instrument and print every metric.

        Returns:
            dict mapping instrument name to ``(metrics_names, scores)``.
        """
        validation = {}
        # Use the instruments of the configuration this trainer was built
        # with, not the module-level default.
        for instrument in self.cfg["instruments"]:
            X = self._prepare_input(self.validation_input_data(instrument).values)
            Y = self.validation_output_data().values

            model = self.model()
            scores = model.evaluate(X, Y, verbose=False)
            # Index 0 is the loss; only the metrics after it are printed
            for i in range(1, len(model.metrics_names)):
                print("Валидация на train для %s: %s: %.2f%%" % (instrument, model.metrics_names[i], scores[i]*100))
            print()

            validation[instrument] = (model.metrics_names, scores)
        return validation

    def model(self):
        """Return the model, training it first if none is trained or loaded."""
        if not self.trained:
            self.train()
        return self._model

    # Save the model
    def save(self):
        """Save the model to ``self.file_name``."""
        self.model().save(self.file_name)

    # Save the model architecture
    def save_architecture(self):
        """Write the architecture as JSON next to ``self.file_name``."""
        json_string = self.model_architecture()
        # splitext only strips the extension; splitting on the first "."
        # breaks for paths such as "./models/my_model.h5".
        json_file_name = os.path.splitext(self.file_name)[0] + ".json"
        with open(json_file_name, "w") as f:
            f.write(json_string)

    # Model architecture
    def model_architecture(self):
        """Return the architecture of the model as a JSON string."""
        return self.model().to_json()

    # Load the model
    def load(self):
        """Load the model from ``self.file_name`` and mark it as trained."""
        self._model = load_model(self.file_name)
        self.trained = True

    def train(self):
        """Build the network selected by ``model_type`` and fit it.

        Returns:
            ``(metrics_names, scores)`` evaluated on the training data.

        Raises:
            ValueError: for an unknown ``model_type``.
        """
        # The output width has to match the one-hot targets
        n_classes = len(self.pitches)

        model = Sequential()
        if self.model_type == 'Dense':
            model.add(Dense(30, input_dim=12, activation='relu'))
            model.add(Dense(20, activation='relu'))
            model.add(Dense(n_classes, activation='sigmoid'))
        elif self.model_type == 'DLSTM':
            # NOTE(review): input_dim=12 does not match the (n, 12, 1) reshape
            # applied below - confirm the intended input layout.
            model.add(TimeDistributed(Dense(10, input_dim=12, activation='relu'))) # output shape: (nb_samples, timesteps, 10)
            model.add(LSTM(10, return_sequences=True)) # output shape: (nb_samples, timesteps, 10)
            model.add(TimeDistributed(Dense(5, activation='sigmoid'))) # output shape: (nb_samples, timesteps, 5)
            model.add(LSTM(n_classes, return_sequences=False)) # output shape: (nb_samples, n_classes)
        elif self.model_type == 'LSTM':
            # Recurrent layers need 3-D input (batch, timesteps, features);
            # the data is reshaped to (n, 12, 1) below.
            model.add(LSTM(100, return_sequences=True, batch_input_shape=(10, 12, 1), stateful=True))
            model.add(LSTM(100, return_sequences=False, stateful=True))
            model.add(Dense(100, activation='relu'))
            model.add(Dense(100, activation='relu'))
            model.add(Dense(n_classes, activation='sigmoid'))
        else:
            raise ValueError("Unknown model_type: %r" % (self.model_type,))
        self._model = model

        X = self._prepare_input(self.input_data().values)
        Y = self.output_data().values

        model.compile(loss=self.loss_function, optimizer='adam', metrics=[metrics.categorical_accuracy, metrics.top_k_categorical_accuracy])
        model.fit(X, Y, epochs=100, batch_size=10, verbose=1)

        scores = model.evaluate(X, Y)

        self.trained = True
        return model.metrics_names, scores

    # Predict for a new audio file
    def predict(self, audio_file):
        """Return the model output for the whole-file profile of ``audio_file``."""
        profiler = LongFileProfiler(audio_file)
        X = np.array([profiler.get_profile_for_pitch()])
        print(X)
        return self.model().predict(self._prepare_input(X))



In [None]:
trainer = Trainer('Dense')
trainer.train()             # Train the model
trainer.save()              # Save the model to trainer.file_name
trainer.load()              # Load the model back from trainer.file_name
display(trainer.validate()) # Validate per instrument and show the scores

# Проверим на большой песне

In [None]:
import numpy as np

class Spliter():
    """Split a song into one-second windows and label each with a chord."""

    def __init__(self, song_file, out_file="spliter_result.txt"):
        """Remember the WAV file to analyse and the text file to write."""
        self.song_file = song_file
        self.out_file = out_file

    @staticmethod
    def _first_channel(profile):
        """Return ``profile`` as a 1-D array, keeping only the first channel."""
        values = np.asarray(profile, dtype=float)
        if values.ndim > 1:
            # Multi-channel profile: one column per channel
            values = values[:, 0]
        return values

    def split_song(self):
        """Return the predicted chord label of every window of the song."""
        trainer = Trainer()
        trainer.load()
        profiler = LongFileProfiler(self.song_file, window = 1) # Window length in seconds
        profiles = profiler.get_profile()
        if not profiles:
            return []

        # One batched call instead of one predict() per window; the model
        # always receives a proper 2-D array, for stereo input as well.
        features = np.array([self._first_channel(profile) for profile in profiles])
        predictions = trainer.model().predict(features)
        # Take the labels from the trainer so they follow the class order
        # the targets were generated with.
        return [trainer.pitches[index] for index in np.argmax(predictions, axis=1)]

    def save_split(self):
        """Write the space-separated chord sequence to ``self.out_file``."""
        chords = self.split_song()
        chords_string = " ".join(chords)
        with open(self.out_file, "w") as f:
            f.write(chords_string)
        print("Сохранено в " + self.out_file)
        
# Label the chords of the sample song and write the sequence to test.txt
Spliter('./song/about_a_girl.wav', 'test.txt').save_split()