# Создание LSTM для мелодии

## Импорт библиотек

In [None]:
import collections
import datetime
import fluidsynth
import glob
import numpy as np
import pathlib
import pandas as pd
import pretty_midi
import seaborn as sns
import tensorflow as tf

from IPython import display
from matplotlib import pyplot as plt
from typing import Dict, List, Optional, Sequence, Tuple
import numpy
import librosa
from midiutil import MIDIFile
from tqdm import tqdm
import json
import pandas as pd
import numpy as np

## Задаем стандартные значения random.seed

In [None]:
# Sampling rate passed to the synthesiser and to the audio widget.
_SAMPLING_RATE = 16000

# Seed both NumPy and TensorFlow random generators with the same value.
seed = 42
np.random.seed(seed)
tf.random.set_seed(seed)

## Функция для воспроизведения midi

In [None]:
def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
    """Synthesise a MIDI object and return an inline audio player for it.

    Args:
        pm: MIDI object to render; its ``fluidsynth`` method is called with
            ``fs=_SAMPLING_RATE``.
        seconds: Maximum length of the clip to keep. May be fractional.

    Returns:
        An ``IPython.display.Audio`` widget holding the truncated waveform.
    """
    waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
    # Slice bounds must be integers, so a fractional `seconds` is truncated
    # to a whole number of samples instead of raising TypeError.
    n_samples = int(seconds * _SAMPLING_RATE)
    return display.Audio(waveform[:n_samples], rate=_SAMPLING_RATE)

## Функция для преобразования миди файла в dataframe

In [None]:
def midi_to_notes(midi_file: str) -> pd.DataFrame:
    """Load the first instrument of a MIDI file as a per-note table.

    Args:
        midi_file: Path of the MIDI file to read.

    Returns:
        A DataFrame with one row per note, ordered by start time, and the
        columns ``pitch``, ``start``, ``end``, ``step`` (start minus the
        previous note's start, 0 for the first note) and ``duration``
        (``end - start``). If the instrument has no notes, the same columns
        are returned with zero rows.
    """
    columns = ('pitch', 'start', 'end', 'step', 'duration')
    pm = pretty_midi.PrettyMIDI(midi_file)
    # Only the first instrument is read; any others are ignored.
    instrument = pm.instruments[0]
    sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
    if not sorted_notes:
        # Nothing to index into: return an empty table with the usual columns.
        return pd.DataFrame({name: np.array([]) for name in columns})

    notes = collections.defaultdict(list)
    prev_start = sorted_notes[0].start
    for note in sorted_notes:
        start = note.start
        end = note.end
        notes['pitch'].append(note.pitch)
        notes['start'].append(start)
        notes['end'].append(end)
        notes['step'].append(start - prev_start)
        notes['duration'].append(end - start)
        prev_start = start
    return pd.DataFrame({name: np.array(value) for name, value in notes.items()})

## Преобразуем заранее подготовленный файл для теста

In [None]:
# Convert the sample MIDI file into a note table and preview its first rows.
# NOTE(review): `sample_file` is not defined in the visible part of this
# notebook - confirm it is assigned before this cell runs.
raw_notes = midi_to_notes(sample_file)
raw_notes.head()

In [None]:
# Show (rows, columns) of the note table, i.e. how many notes were read.
raw_notes.shape

In [None]:
# Vectorise pretty_midi's number-to-name conversion so it can be applied to
# the whole pitch column at once.
get_note_names = np.vectorize(pretty_midi.note_number_to_name)
sample_note_names = get_note_names(raw_notes['pitch'])
# Preview the first ten note names.
sample_note_names[:10]

## Функция отрисовки миди нот

In [None]:
def plot_piano_roll(notes: pd.DataFrame, count: Optional[int] = None):
    """Draw each note as a horizontal segment from its start to its end.

    Args:
        notes: Table providing the ``pitch``, ``start`` and ``end`` columns.
        count: How many leading notes to draw. A falsy value (``None`` or 0)
            draws the whole track.
    """
    if not count:
        count = len(notes['pitch'])
        title = 'Whole track'
    else:
        title = f'First {count} notes'

    plt.figure(figsize=(20, 4))
    # Two rows per array: row 0 is the segment start point, row 1 its end
    # point, so every column is plotted as one line segment.
    pitches = np.stack([notes['pitch'], notes['pitch']], axis=0)[:, :count]
    times = np.stack([notes['start'], notes['end']], axis=0)[:, :count]
    plt.plot(times, pitches, color="b", marker=".")
    plt.xlabel('Time [s]')
    plt.ylabel('Pitch')
    plt.title(title)

## Тестирование

In [None]:
# Plot only the first 100 notes of the sample track.
plot_piano_roll(raw_notes, count=100)

## Функция построения графиков распределения высоты, шага и длительности нот

In [None]:
def plot_distributions(notes: pd.DataFrame, drop_percentile=2.5):
    """Plot side-by-side histograms of pitch, step and duration.

    Args:
        notes: Table providing the ``pitch``, ``step`` and ``duration``
            columns.
        drop_percentile: Percentage of the largest ``step`` / ``duration``
            values left outside the histogram range.
    """
    plt.figure(figsize=[15, 5])

    # Pitch uses 20 automatically placed bins.
    plt.subplot(1, 3, 1)
    sns.histplot(notes, x="pitch", bins=20)

    # Step and duration share the same recipe: 20 equal bins from 0 up to
    # the (100 - drop_percentile)th percentile of the column.
    upper_percentile = 100 - drop_percentile
    for position, column in enumerate(("step", "duration"), start=2):
        plt.subplot(1, 3, position)
        upper_bound = np.percentile(notes[column], upper_percentile)
        sns.histplot(notes, x=column, bins=np.linspace(0, upper_bound, 21))

## Пример графика

In [None]:
# Draw the three histograms for the sample track with the default cut-off.
plot_distributions(raw_notes)

## Функция создания датасета из последовательности

In [None]:
def create_sequences(
    dataset: tf.data.Dataset,
    seq_length: int,
    vocab_size = 128,
    key_order: Sequence[str] = ('pitch', 'step', 'duration'),
) -> tf.data.Dataset:
    """Turn a dataset of notes into (input window, next-note labels) pairs.

    Args:
        dataset: Dataset yielding one note per element.
            NOTE(review): the scaling below divides by a 3-element list, so
            each note is presumably a (pitch, step, duration) vector -
            confirm against the caller.
        seq_length: Number of notes in each input window.
        vocab_size: Divisor applied to the first feature (pitch) of the
            inputs.
        key_order: Label names, in the same order as the note features.
            Previously this was read from a module-level global defined
            later in the notebook; the default matches that global.

    Returns:
        Dataset of ``(inputs, labels)`` where ``inputs`` holds the first
        ``seq_length`` notes of a window (pitch scaled) and ``labels`` maps
        each name in ``key_order`` to the matching feature of the note that
        follows the window (unscaled).
    """
    # One extra element per window: the note to predict.
    seq_length = seq_length+1
    windows = dataset.window(seq_length, shift=1, stride=1,
                              drop_remainder=True)
    # `window` yields datasets of notes; batching each one makes it a tensor.
    flatten = lambda x: x.batch(seq_length, drop_remainder=True)
    sequences = windows.flat_map(flatten)

    def scale_pitch(x):
        # Only the first feature is divided by vocab_size.
        x = x/[vocab_size,1.0,1.0]
        return x

    def split_labels(sequences):
        inputs = sequences[:-1]
        labels_dense = sequences[-1]
        labels = {key:labels_dense[i] for i,key in enumerate(key_order)}

        return scale_pitch(inputs), labels

    return sequences.map(split_labels, num_parallel_calls=tf.data.AUTOTUNE)

## Функции преобразования частот в MIDI-ноты, удаления дубликатов и отсутствующих нот.
## Добавление в данные трёх параметров: start, step, end

In [None]:
def add_time_intervals(notes):
    """Lay (note, duration) pairs end to end on a timeline starting at 0.0.

    Args:
        notes: Iterable of ``(note, duration)`` pairs.

    Returns:
        A list of ``[note, duration, start_time, end_time]`` lists, where
        each entry starts exactly where the previous one ended.
    """
    timeline = []
    cursor = 0.0  # end of the previous entry == start of the next one
    for pitch, length in notes:
        finish = cursor + length
        timeline.append([pitch, length, cursor, finish])
        cursor = finish
    return timeline


def remove_duplicates(a, min_freq=0.1, min_note_length=0.05):
    """Collapse runs of frames with the same note into (note, duration) pairs.

    Args:
        a: Sequence of ``[frequency, duration]`` frames. Frequencies are
            converted with ``librosa.hz_to_note``.
        min_freq: Frames whose frequency is below this value are labelled
            ``'N'`` (no note) instead of being converted.
        min_note_length: Merged entries shorter than this are relabelled
            ``'N'``.

    Returns:
        A list of ``(note_name, duration)`` tuples. A final run of ``'N'``
        frames is not emitted. An empty input yields an empty list.
        Relabelling short notes can leave neighbouring ``'N'`` entries
        unmerged.
    """
    if not a:
        # Guard: the trailing-frame handling below indexes a[-1].
        return []

    # Name every frame once (previously each frame was converted twice).
    names = [
        librosa.hz_to_note(frame[0]) if frame[0] >= min_freq else 'N'
        for frame in a
    ]

    ans = []
    dur = 0
    for frame, current_note, next_note in zip(a, names, names[1:]):
        dur += frame[1]
        if current_note != next_note:
            ans.append((current_note, dur))
            dur = 0
    # The last frame closes the final run; it is kept only if it is a note.
    if a[-1][0] >= min_freq:
        ans.append((names[-1], a[-1][1] + dur))
    return [e if e[1] >= min_note_length else ('N', e[1]) for e in ans]

def remove_duplicates_two(a):
    """Merge consecutive entries that carry the same note.

    Args:
        a: Sequence of entries whose item 0 is the note and item 1 its
            duration.

    Returns:
        A list of ``(note, total_duration)`` tuples, one per run of equal
        consecutive notes. An empty input yields an empty list (it used to
        raise IndexError).
    """
    if not a:
        return []

    merged = []
    run_note = a[0][0]
    run_length = 0
    for entry in a:
        if entry[0] != run_note:
            # The note changed: close the current run and start a new one.
            merged.append((run_note, run_length))
            run_note = entry[0]
            run_length = 0
        run_length += entry[1]
    merged.append((run_note, run_length))
    return merged


## функция для объединения данных

In [None]:
def MergeData(t):
    """Collect the non-rest entries of a timeline into per-column lists.

    Args:
        t: Iterable of ``[note, duration, start, end]`` entries. Entries
            whose note is ``'N'`` are skipped.

    Returns:
        A ``defaultdict(list)`` with the keys ``pitch``, ``start``, ``end``,
        ``step`` and ``duration``. ``step`` is the start time minus the
        start time of the previously kept entry (0.0 before the first one).
        Keys only exist if at least one entry was kept.
    """
    columns = collections.defaultdict(list)
    last_start = 0.0
    for entry in t:
        if entry[0] == 'N':
            continue
        begin = entry[2]
        columns['pitch'].append(entry[0])
        columns['start'].append(begin)
        columns['end'].append(entry[3])
        columns['step'].append(begin - last_start)
        columns['duration'].append(entry[1])
        last_start = begin
    return columns

### Преобразование csv в midi файл

In [None]:
def notes_to_midi(
  notes: pd.DataFrame,
  out_file: str,
  instrument_name: str,
  velocity: int = 100,  # note loudness
) -> pretty_midi.PrettyMIDI:
    """Write a note table to a single-instrument MIDI file.

    Args:
        notes: Table providing the ``pitch``, ``step`` and ``duration``
            columns. Timing is rebuilt from ``step`` and ``duration`` only;
            any ``start`` / ``end`` columns are not read.
        out_file: Path of the MIDI file to write.
        instrument_name: Name resolved to a program number with
            ``pretty_midi.instrument_name_to_program``.
        velocity: Velocity assigned to every note.

    Returns:
        The ``PrettyMIDI`` object that was written to ``out_file``.
    """
    program = pretty_midi.instrument_name_to_program(instrument_name)
    track = pretty_midi.Instrument(program=program)

    onset = 0  # start of the previous note; each step is added to it
    for _, row in notes.iterrows():
        onset = float(onset + row['step'])
        release = float(onset + row['duration'])
        track.notes.append(
            pretty_midi.Note(
                velocity=velocity,
                pitch=int(row['pitch']),
                start=onset,
                end=release,
            )
        )

    midi = pretty_midi.PrettyMIDI()
    midi.instruments.append(track)
    midi.write(out_file)
    return midi

## Функция потерь

In [None]:
def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
    """Mean squared error plus a penalty on negative predictions.

    Args:
        y_true: Target values.
        y_pred: Predicted values.

    Returns:
        The mean over all elements of ``(y_true - y_pred) ** 2`` plus
        ``10 * max(-y_pred, 0)``; the second term is non-zero only where the
        prediction is negative.
    """
    error = y_true - y_pred
    negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
    return tf.reduce_mean(error ** 2 + negative_penalty)

## Загрузка данных

In [None]:
# Load the melody export used for training into `f`.
# Other exports named in earlier revisions of this cell (same loading code,
# different file): "music - Chordino.json", "musicDrumBeats - Chordino.json",
# "musicGitar - Chordino.json", "musicSaxophone - Chordino.json",
# "musicYixInstrumental - Chordino.json".
# The context manager closes the file handle as soon as parsing finishes.
with open("/content/drive/MyDrive/dataframe/musicPiano - Chordino.json",
          encoding="utf-8") as file:
    f = json.load(file)

## Обработка загруженных данных

### Объединение в одну последовательность

In [None]:
# Append the cleaned-up notes of every melody in `f` to one set of column
# lists, so that all melodies form a single long sequence.
Df = collections.defaultdict(list)
for melody_name in tqdm(list(f)):
    frames = [[float(a), float(b)] for a, b in f[melody_name]]
    # Merge repeated frames, then merge again after short notes became 'N'.
    cleaned = remove_duplicates_two(remove_duplicates(frames))
    columns = MergeData(add_time_intervals(cleaned))
    for column, values in columns.items():
        Df[column].extend(values)

## Заменяем название ноты на число в midi

In [None]:
# Replace every note name in the pitch column with the MIDI number returned
# by librosa.note_to_midi.
Df['pitch'] = [librosa.note_to_midi(i) for i in Df['pitch']]

## Cоздаем csv

In [None]:
# Build the training table: one column per key of Df, one row per note.
Train = pd.DataFrame({name: np.array(value) for name, value in Df.items()})
# Total number of notes; used later to size the shuffle buffer.
n_notes = len(Train)
Train.head()

## Прослушивание полученных последовательностей

In [None]:
example_file = 'example.midi'
# No `instrument` object exists at notebook scope (it is only a local inside
# the helper functions), so the instrument name is looked up from program 0,
# the same program the final export cell of this notebook uses.
example_pm = notes_to_midi(
    Train,
    out_file=example_file,
    instrument_name=pretty_midi.program_to_instrument_name(0))

In [None]:
# Play back at most the first 120 seconds of the rendered training notes.
display_audio(example_pm, 120)

## Выделяем отдельные колонки для предсказания

In [None]:
# Features fed to the model, in the column order used everywhere below.
key_order = ['pitch', 'step', 'duration']
# Stack the three columns side by side: one row per note, one column per key.
train_notes = np.stack([Train[key] for key in key_order], axis=1)

## Конвертируем в Dataset из tensorflow

In [None]:
# Wrap the note array in a tf.data.Dataset that yields one note per element.
notes_ds = tf.data.Dataset.from_tensor_slices(train_notes)
notes_ds.element_spec

In [None]:
# Number of notes in each input window.
seq_length = 25
# Divisor used to scale the pitch feature of the inputs.
vocab_size = 128
# Dataset of (input window, next-note labels) pairs.
seq_ds = create_sequences(notes_ds, seq_length, vocab_size)
seq_ds.element_spec

## Выводим информацию о Dataset

In [None]:
# Print the shape and first ten rows of one input window and its target.
for seq, target in seq_ds.take(1):
    print('sequence shape:', seq.shape)
    print('sequence elements (first 10):', seq[0: 10])
    print()
    print('target:', target)

In [None]:
batch_size = 64
buffer_size = n_notes - seq_length  # the number of items in the dataset
# cache() replays exactly the elements it saw on the first pass, so it has to
# come before shuffle(): with the cache placed after shuffle/batch, every
# epoch would reuse the first epoch's order and batch composition.
train_ds = (seq_ds
            .cache()
            .shuffle(buffer_size)
            .batch(batch_size, drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE))

In [None]:
# Show the structure of one training batch.
train_ds.element_spec

# Обучение модели и предсказание

## Обучение

In [None]:
# Each sample is a window of `seq_length` notes with 3 features per note.
input_shape = (seq_length, 3)
learning_rate = 0.005

# A single LSTM layer feeds three independent dense heads.
inputs = tf.keras.Input(input_shape)
x = tf.keras.layers.LSTM(128)(inputs)

# Head name -> number of output units (128 pitch logits, scalar step and
# scalar duration).
head_units = {'pitch': 128, 'step': 1, 'duration': 1}
outputs = {
    head: tf.keras.layers.Dense(units, name=head)(x)
    for head, units in head_units.items()
}
model = tf.keras.Model(inputs, outputs)

# Pitch is trained as a classification over logits; step and duration use
# the custom regression loss defined earlier.
loss = {
    'pitch': tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    'step': mse_with_positive_pressure,
    'duration': mse_with_positive_pressure,
}
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

model.compile(loss=loss, optimizer=optimizer)
model.summary()

In [None]:
# Evaluate on the training set with all losses equally weighted (this cell
# comes before the fit call in the notebook).
losses = model.evaluate(train_ds, return_dict=True)
losses

In [None]:
# Recompile with per-output loss weights: the pitch loss is scaled by 0.05
# while step and duration keep a weight of 1.0.
model.compile(
    loss=loss,
    loss_weights={
        'pitch': 0.05,
        'step': 1.0,
        'duration':1.0,
    },
    optimizer=optimizer,
)

In [None]:
# Evaluate again so the total loss reflects the new loss weights.
model.evaluate(train_ds, return_dict=True)

In [None]:
# Training callbacks:
#  - ModelCheckpoint writes the weights under ./training_checkpoints with the
#    epoch number in the file name.
#    NOTE(review): some Keras versions require a specific file suffix for
#    weights-only checkpoints - confirm against the installed version.
#  - EarlyStopping watches the training loss with patience=5 and
#    restore_best_weights=True.
callbacks = [
    tf.keras.callbacks.ModelCheckpoint(
        filepath='./training_checkpoints/ckpt_{epoch}',
        save_weights_only=True),
    tf.keras.callbacks.EarlyStopping(
        monitor='loss',
        patience=5,
        verbose=1,
        restore_best_weights=True),
]

In [None]:
# Upper bound on the number of epochs; early stopping may end training sooner.
epochs = 50

# Train on the batched dataset and keep the per-epoch history for plotting.
history = model.fit(
    train_ds,
    epochs=epochs,
    callbacks=callbacks,
)

In [None]:
# Plot the total training loss against the epoch index.
plt.plot(history.epoch, history.history['loss'], label='total loss')
plt.show()

## Предсказание

In [None]:
def predict_next_note(
    notes: np.ndarray,
    keras_model: tf.keras.Model,
    temperature: float = 1.0) -> Tuple[int, float, float]:
    """Sample the next note from the model given a window of notes.

    Args:
        notes: One input window; a batch axis is added before prediction.
        keras_model: Model whose ``predict`` returns a dict with the keys
            ``'pitch'`` (logits), ``'step'`` and ``'duration'``.
        temperature: Positive divisor applied to the pitch logits before
            sampling.

    Returns:
        ``(pitch, step, duration)`` where ``pitch`` is sampled from the
        scaled logits and ``step`` / ``duration`` are clamped to be
        non-negative.
    """
    assert temperature > 0
    inputs = tf.expand_dims(notes, 0)
    # Use the model that was passed in rather than the notebook-level global.
    predictions = keras_model.predict(inputs)
    pitch_logits = predictions['pitch'] / temperature
    step = predictions['step']
    duration = predictions['duration']

    pitch = tf.random.categorical(pitch_logits, num_samples=1)
    pitch = tf.squeeze(pitch, axis=-1)
    duration = tf.squeeze(duration, axis=-1)
    step = tf.squeeze(step, axis=-1)
    # The regression heads can output negative values; clamp them at zero.
    step = tf.maximum(0, step)
    duration = tf.maximum(0, duration)

    return int(pitch), float(step), float(duration)

In [None]:
temperature = 1.0
num_predictions = 100  # number of notes to generate
key_order = ['pitch', 'step', 'duration']

# Seed melody (per the original note: extracted with essentia). It goes
# through the same clean-up steps as the training melodies.
curMelody = f['precious-memories.wav']
k = remove_duplicates_two(remove_duplicates([[float(a), float(b)] for a, b in curMelody]))
t = MergeData(add_time_intervals(k))

Test = collections.defaultdict(list)
for key in t:
    Test[key] += t[key]
# Note names -> MIDI numbers, done once after merging (as for training data).
Test['pitch'] = [librosa.note_to_midi(i) for i in Test['pitch']]
Test = pd.DataFrame({name: np.array(value) for name, value in Test.items()})

# Build the seed window only after Test has been filled.
sample_notes = np.stack([Test[key] for key in key_order], axis=1)
if len(sample_notes) < seq_length:
    raise ValueError(
        f'seed melody has {len(sample_notes)} notes, '
        f'but at least {seq_length} are required')
# Scale pitch exactly as create_sequences does for the training inputs.
input_notes = (
    sample_notes[:seq_length] / np.array([vocab_size, 1, 1]))

generated_notes = []
prev_start = 0
prev_end = 0.0  # end of the previously generated note
for _ in range(num_predictions):
    pitch, step, duration = predict_next_note(input_notes, model, temperature)
    # Do not let a note begin before the previous one has ended.
    # NOTE(review): notes_to_midi rebuilds timing from step/duration only, so
    # the start/end columns stored here do not affect the exported MIDI.
    start = max(prev_start + step, prev_end)
    end = start + duration
    generated_notes.append((pitch, step, duration, start, end))
    # Slide the window: drop the oldest note and append the new one, with the
    # pitch scaled like the rest of the window.
    next_input = np.array([[pitch / vocab_size, step, duration]])
    input_notes = np.concatenate([input_notes[1:], next_input], axis=0)
    prev_start = start
    prev_end = end

generated_notes = pd.DataFrame(
    generated_notes, columns=(*key_order, 'start', 'end'))

## Сохранение в midi файл и озвучивание его

In [None]:
# Write the generated notes to a MIDI file, using the instrument name that
# pretty_midi reports for program 0, then play back at most 30 seconds.
out_file = 'predictMelody.mid'
out_pm = notes_to_midi(
    generated_notes, out_file=out_file, instrument_name=pretty_midi.program_to_instrument_name(0))
display_audio(out_pm, 30)

## Сохраняем модель, и проверяем загрузку

In [None]:
# Save the trained model to an .h5 file.
model.save('melodyLSTM13.06.23.h5')

In [None]:
# Register the custom loss under its name so load_model can resolve it when
# it restores the compiled model from the .h5 file.
tf.keras.utils.get_custom_objects()['mse_with_positive_pressure'] = mse_with_positive_pressure
model = tf.keras.models.load_model('melodyLSTM13.06.23.h5')
