In [None]:
import os
import pandas as pd
import requests
import collections
import numpy as np
from matplotlib import pyplot as plt
from typing import Optional
import seaborn as sns
import tensorflow as tf

In [None]:
# Install the FluidSynth system package and its Python bindings, then import
# the bindings. `display_audio` below renders audio through `pm.fluidsynth(...)`.
# NOTE(review): presumably pretty_midi needs pyfluidsynth for that call — confirm.
!sudo apt install -y fluidsynth
!pip install --upgrade pyfluidsynth
import fluidsynth

## Download and Unzip the music sentiment dataset
(https://zenodo.org/record/5090631#.Y55keXbMJhF)

In [None]:
URL = "https://zenodo.org/record/5090631/files/EMOPIA_1.0.zip?download=1"
response = requests.get(URL)
open("dataset.zip", "wb").write(response.content)
!unzip dataset.zip

## Create dataset


In [None]:
# Label file of the extracted archive; its "ID" and "4Q" columns are read below.
ANNOTATION_CSV = "EMOPIA_1.0/label.csv"
# Glob pattern (not a plain directory path) that is expanded with glob.glob to
# list every file inside the extracted `midis` folder.
AUDIO_DIRECTORY = "EMOPIA_1.0/midis/*"

In [None]:
import glob

# Annotation table: one row per clip.
dataset = pd.read_csv(ANNOTATION_CSV)
# glob returns paths in arbitrary, filesystem-dependent order; sorting makes
# `music_files[0]` and every later per-class file list reproducible.
music_files = sorted(glob.glob(AUDIO_DIRECTORY))

# get the unique quadrants (classes)
quadrants = dataset["4Q"].unique()

## Load a sample MIDI file and extract relevant information

In [None]:
# pretty_midi is used throughout the notebook to read and write MIDI files.
!pip install pretty_midi

In [None]:
import pretty_midi

# Open the first MIDI file and report how many instruments it contains.
pm = pretty_midi.PrettyMIDI(music_files[0])
print(f'Number of instruments: {len(pm.instruments)}')

# Resolve the General MIDI program number of the first instrument to a name;
# `instrument_name` is reused later when the generated notes are written out.
instrument = pm.instruments[0]
instrument_name = pretty_midi.program_to_instrument_name(instrument.program)
print(f'Instrument name: {instrument_name}')

## Required methods

In [None]:
def midi_to_notes(midi_file: str,) -> pd.DataFrame:
  """Extracts the notes of the first instrument of a MIDI file.

  Args:
    midi_file: Path of the MIDI file to read.

  Returns:
    A DataFrame with one row per note, ordered by start time, and the columns
    `pitch`, `start`, `end`, `step` (start time minus the previous note's
    start time, 0 for the first note) and `duration` (`end - start`).
    The frame has the same columns but no rows when the file has no
    instrument or the first instrument has no notes.
  """
  columns = ['pitch', 'start', 'end', 'step', 'duration']
  pm = pretty_midi.PrettyMIDI(midi_file)

  # A file without instruments has nothing to extract.
  if not pm.instruments:
    return pd.DataFrame({name: np.array([]) for name in columns})
  instrument = pm.instruments[0]
  notes = {name: [] for name in columns}

  # Sort the notes by start time
  sorted_notes = sorted(instrument.notes, key=lambda note: note.start)
  # Guard the first-note lookup so an empty track yields an empty frame
  # instead of raising IndexError.
  prev_start = sorted_notes[0].start if sorted_notes else 0.0
  for note in sorted_notes:
    start = note.start
    end = note.end
    notes['pitch'].append(note.pitch)
    notes['start'].append(start)
    notes['end'].append(end)
    notes['step'].append(start - prev_start)
    notes['duration'].append(end - start)
    prev_start = start

  return pd.DataFrame({name: np.array(value) for name, value in notes.items()})


In [None]:
def plot_notes(notes: pd.DataFrame, count: Optional[int] = None):
  """Draws each note as a horizontal segment from its start to its end time.

  Args:
    notes: DataFrame with `pitch`, `start` and `end` columns.
    count: Number of leading notes to draw; a falsy value draws every note.
  """
  if not count:
    title = 'Whole track'
    count = len(notes['pitch'])
  else:
    title = f'First {count} notes'

  plt.figure(figsize=(20, 4))
  # Row 0 / row 1 hold the two end points of every segment: the pitch is the
  # same at both ends, the time runs from `start` to `end`.
  segment_pitch = np.stack([notes['pitch'], notes['pitch']], axis=0)[:, :count]
  segment_time = np.stack([notes['start'], notes['end']], axis=0)[:, :count]
  plt.plot(segment_time, segment_pitch, color="b", marker=".")
  plt.xlabel('Time [s]')
  plt.ylabel('Pitch')
  plt.title(title)


In [None]:
def plot_distributions(notes: pd.DataFrame, drop_percentile=2.5):
  """Plots histograms of pitch, step and duration side by side.

  Args:
    notes: DataFrame with `pitch`, `step` and `duration` columns.
    drop_percentile: Share (in percent) of the largest step / duration values
      left outside the histogram range, so outliers do not stretch the axis.
  """
  plt.figure(figsize=[15, 5])

  # Panel 1: pitch, with a fixed number of bins.
  plt.subplot(1, 3, 1)
  sns.histplot(notes, x="pitch", bins=20)

  # Panels 2 and 3: timing columns, with 20 equal bins between 0 and the
  # (100 - drop_percentile)-th percentile of the column.
  for panel, column in enumerate(("step", "duration"), start=2):
    plt.subplot(1, 3, panel)
    upper_edge = np.percentile(notes[column], 100 - drop_percentile)
    sns.histplot(notes, x=column, bins=np.linspace(0, upper_edge, 21))


In [None]:
def notes_to_midi(
  notes: pd.DataFrame,
  out_file: str, 
  instrument_name: str,
  velocity: int = 100,  # note loudness
) -> pretty_midi.PrettyMIDI:
  """Builds a single-instrument MIDI object from a note table and saves it.

  Args:
    notes: DataFrame with `pitch`, `step` and `duration` columns. `step` is
      the offset of a note's start from the previous note's start.
    out_file: Path the MIDI file is written to.
    instrument_name: Instrument name, converted to a program number with
      `pretty_midi.instrument_name_to_program`.
    velocity: Velocity assigned to every note.

  Returns:
    The `PrettyMIDI` object that was written to `out_file`.
  """
  program = pretty_midi.instrument_name_to_program(instrument_name)
  track = pretty_midi.Instrument(program=program)

  # Accumulate the relative steps into absolute start times.
  onset = 0
  for pitch, step, duration in zip(
      notes['pitch'], notes['step'], notes['duration']):
    onset = float(onset + step)
    track.notes.append(
        pretty_midi.Note(
            velocity=velocity,
            pitch=int(pitch),
            start=onset,
            end=float(onset + duration),
        ))

  midi = pretty_midi.PrettyMIDI()
  midi.instruments.append(track)
  midi.write(out_file)
  return midi


In [None]:
def create_sequences(
    dataset: tf.data.Dataset, 
    seq_length: int,
    vocab_size = 128,
) -> tf.data.Dataset:
  """Returns TF Dataset of sequence and label examples.

  Args:
    dataset: Dataset of notes; each element is indexed in the order
      (pitch, step, duration).
    seq_length: Number of notes in each input sequence.
    vocab_size: Divisor used to normalize the pitch of the input notes.

  Returns:
    A dataset of `(inputs, labels)` pairs built from sliding windows of
    `seq_length + 1` notes. `inputs` holds the first `seq_length` notes with
    the pitch divided by `vocab_size`; `labels` is a dict keyed by `pitch`,
    `step` and `duration` taken (unscaled) from the last note of the window.
  """
  # The docstring must be the first statement of the body, otherwise it is
  # just an unused string and `create_sequences.__doc__` stays None.
  key_order = ['pitch', 'step', 'duration']

  # Take 1 extra for the labels
  seq_length = seq_length+1
  windows = dataset.window(seq_length, shift=1, stride=1,
                              drop_remainder=True)

  # `flat_map` flattens the "dataset of datasets" into a dataset of tensors
  flatten = lambda x: x.batch(seq_length, drop_remainder=True)
  sequences = windows.flat_map(flatten)

  # Normalize note pitch
  def scale_pitch(x):
    x = x/[vocab_size,1.0,1.0]
    return x

  # Split the labels
  def split_labels(sequences):
    inputs = sequences[:-1]
    labels_dense = sequences[-1]
    labels = {key:labels_dense[i] for i,key in enumerate(key_order)}

    return scale_pitch(inputs), labels

  return sequences.map(split_labels, num_parallel_calls=tf.data.AUTOTUNE)

In [None]:
def predict_next_note(
    notes: np.ndarray, 
    keras_model: tf.keras.Model, 
    temperature: float = 1.0) -> tuple:
  """Samples the next note from a trained sequence model.

  Args:
    notes: Input sequence of notes without a batch dimension.
    keras_model: Model whose `predict` output is a dict with the keys
      `pitch` (logits), `step` and `duration`.
    temperature: Positive divisor applied to the pitch logits before
      sampling; larger values flatten the distribution.

  Returns:
    A `(pitch, step, duration)` tuple of `(int, float, float)`; `step` and
    `duration` are clipped to be non-negative.
  """

  assert temperature > 0

  # Add batch dimension
  inputs = tf.expand_dims(notes, 0)

  # Use the model passed by the caller. The previous code read a global
  # `model`, so the `keras_model` argument was silently ignored.
  predictions = keras_model.predict(inputs)
  pitch_logits = predictions['pitch']
  step = predictions['step']
  duration = predictions['duration']

  pitch_logits /= temperature
  pitch = tf.random.categorical(pitch_logits, num_samples=1)
  pitch = tf.squeeze(pitch, axis=-1)
  duration = tf.squeeze(duration, axis=-1)
  step = tf.squeeze(step, axis=-1)
  # `step` and `duration` values should be non-negative
  step = tf.maximum(0, step)
  duration = tf.maximum(0, duration)

  return int(pitch), float(step), float(duration)

In [None]:
def display_audio(pm: pretty_midi.PrettyMIDI, seconds=30):
  """Synthesizes a MIDI object and returns an audio widget for its beginning.

  Args:
    pm: MIDI object to render with `pm.fluidsynth`.
    seconds: Length of the excerpt that is kept, in seconds.

  Returns:
    An `IPython.display.Audio` object for the first `seconds` seconds.
  """
  # Imported here because the notebook never imports the `display` module;
  # without it `display.Audio` fails.
  from IPython import display

  _SAMPLING_RATE = 16000
  waveform = pm.fluidsynth(fs=_SAMPLING_RATE)
  # Take a sample of the generated waveform to mitigate kernel resets
  # (int() keeps the slice valid when `seconds` is a float).
  waveform_short = waveform[:int(seconds*_SAMPLING_RATE)]
  return display.Audio(waveform_short, rate=_SAMPLING_RATE)

## Visualize data from files


In [None]:
# Pick the first annotated song of every quadrant (class).
song_ids = [
    dataset.loc[dataset['4Q'] == quadrant].iloc[0]["ID"]
    for quadrant in quadrants
]

# Folder that holds the MIDI files, derived from the first listed file.
path = os.path.dirname(music_files[0])

# Plot the piano roll and the feature distributions of each picked song.
# `raw_notes` keeps the notes of the last song after the loop; it is read
# again by `generate_music` as the seed sequence.
for song_id in song_ids:
  raw_notes = midi_to_notes(f"{path}/{song_id}.mid")
  plot_notes(raw_notes, count=100)
  plot_distributions(raw_notes)


## Creating Tensorflow dataset

In [None]:
from collections import OrderedDict

# Map every annotated song ID to its quadrant label. When an ID appears more
# than once the first row wins, matching the previous `.iloc[0]` lookup.
labels_by_id = dataset.drop_duplicates("ID").set_index("ID")["4Q"].to_dict()

# Group the MIDI file paths by quadrant label.
notes_array = {}
for midi_path in music_files:
  # splitext handles any extension length (".mid", ".midi", ...), and the
  # name `song_id` avoids shadowing the builtin `id`.
  song_id = os.path.splitext(os.path.basename(midi_path))[0]
  if song_id not in labels_by_id:
    # A file without an annotation row cannot be assigned to a class;
    # skip it instead of failing with IndexError.
    continue
  notes_array.setdefault(labels_by_id[song_id], []).append(midi_path)

# Sort by label so class order (and therefore model order) is deterministic.
notes_array = OrderedDict(sorted(notes_array.items()))

In [None]:
# length of total notes in dataset for each class
for key in notes_array:
  print(len(notes_array[key]))

In [None]:
import random

# Number of distinct songs drawn from every class for training.
SONGS_PER_CLASS = 5
# Fixed seed so that a fresh kernel selects the same songs on every run.
SAMPLE_SEED = 42
song_sampler = random.Random(SAMPLE_SEED)

# take random 5 songs from each cat for training.
training_notes_files = {}
for key in notes_array:
  # Sorting makes the draw independent of the file listing order.
  candidates = sorted(notes_array[key])
  # `sample` draws without replacement; `random.choices` could return the
  # same song several times. Clamp k for classes with fewer songs.
  training_notes_files[key] = song_sampler.sample(
      candidates, k=min(SONGS_PER_CLASS, len(candidates)))

# One concatenated note table per class, in the class order of `notes_array`.
training_notes = []
for cat in training_notes_files:
  class_frames = [midi_to_notes(midi_path)
                  for midi_path in training_notes_files[cat]]
  training_notes.append(pd.concat(class_frames))


In [None]:
# Column order of the feature arrays; reused later when generating music.
key_order = ['pitch', 'step', 'duration']

# One array per class with one row per note and one column per feature.
train_notes = [
    np.stack([class_notes[column] for column in key_order], axis=1)
    for class_notes in training_notes
]

In [None]:
# Number of notes per class; used later to size the shuffle buffers.
note_lens = [len(class_array) for class_array in train_notes]
print(note_lens)

In [None]:
# Window length and pitch vocabulary size are shared by every class and are
# read again by later cells, so define them once outside the loop (they used
# to be undefined whenever the loop body did not run).
seq_length = 25
vocab_size = 128

# One dataset of (input sequence, next-note labels) examples per class.
ds = []
for class_notes in train_notes:
  notes_ds = tf.data.Dataset.from_tensor_slices(class_notes)
  print(notes_ds.element_spec)
  seq_ds = create_sequences(notes_ds, seq_length, vocab_size)
  print(seq_ds.element_spec)
  ds.append(seq_ds)

In [None]:
# Show one (sequence, target) example from each class dataset.
for example_ds in ds:
  for seq, target in example_ds.take(1):
    print('sequence shape:', seq.shape)
    print('sequence elements (first 10):', seq[:10])
    print()
    print('target:', target)

In [None]:
batch_size = 10

# One shuffled, batched training pipeline per class.
train_dataset = []
for seq_ds, n_notes in zip(ds, note_lens):
  # the number of items in the dataset; clamped because `shuffle` needs a
  # positive buffer size even for a class with very few notes
  buffer_size = max(1, n_notes - seq_length)
  train_ds = (seq_ds
            # Cache before shuffling: caching after `shuffle` would store the
            # first epoch's order and replay it on every later epoch.
            .cache()
            .shuffle(buffer_size)
            .batch(batch_size, drop_remainder=True)
            .prefetch(tf.data.AUTOTUNE))
  train_dataset.append(train_ds)

## Model

In [None]:
def mse_with_positive_pressure(y_true: tf.Tensor, y_pred: tf.Tensor):
  """Mean squared error plus a penalty on negative predictions.

  Every element contributes `(y_true - y_pred) ** 2`, and additionally
  `10 * -y_pred` whenever `y_pred` is negative; the mean over all elements
  is returned.
  """
  squared_error = (y_true - y_pred) ** 2
  # tf.maximum(-y_pred, 0) is zero for non-negative predictions.
  negative_penalty = 10 * tf.maximum(-y_pred, 0.0)
  return tf.reduce_mean(squared_error + negative_penalty)

In [None]:
input_shape = (seq_length, 3)
learning_rate = 0.005

inputs = tf.keras.Input(input_shape)
x = tf.keras.layers.LSTM(128)(inputs)

outputs = {
  'pitch': tf.keras.layers.Dense(128, name='pitch')(x),
  'step': tf.keras.layers.Dense(1, name='step')(x),
  'duration': tf.keras.layers.Dense(1, name='duration')(x),
}

loss = {
        'pitch': tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True),
        'step': mse_with_positive_pressure,
        'duration': mse_with_positive_pressure
  }

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# create 4 models for different emotions
models = []

for i in range(len(train_dataset)):
  model = tf.keras.Model(inputs, outputs)
  models.append(model)

In [None]:
for i in models:
  i.compile(loss=loss, optimizer=optimizer,run_eagerly=True)
  print(i.summary())

In [None]:
for i in range(len(models)):
  losses = model.evaluate(train_dataset[i], return_dict=True)
  print(losses)

In [None]:
for i in range(len(models)):
  models[i].compile(
      loss=loss,
      loss_weights={
          'pitch': 0.05,
          'step': 1.0,
          'duration':1.0
      },
      optimizer=optimizer,
  )
  losses = models[i].evaluate(train_dataset[i], return_dict=True)
  print(losses)

In [None]:
# Stop training once the training loss has not improved for 5 epochs and
# restore the weights of the best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='loss',
    patience=5,
    verbose=1,
    restore_best_weights=True,
)
callbacks = [early_stopping]

In [None]:
%%time
epochs = 50
histories = []

# Train every model on the dataset of its own class and keep the history.
for emotion_model, emotion_ds in zip(models, train_dataset):
  histories.append(
      emotion_model.fit(
          emotion_ds,
          epochs=epochs,
          callbacks=callbacks,
      ))

In [None]:
# One loss curve per model. The title tells the figures apart and the legend
# renders the curve label, which was previously set but never shown.
for model_index, history in enumerate(histories):
  plt.plot(history.epoch, history.history['loss'], label='total loss')
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.title(f'Training loss (model {model_index})')
  plt.legend()
  plt.show()

In [None]:
def generate_music(model_type, temperature: float = 2.0,
                   num_predictions: int = 200):
  """Generates a sequence of notes with a trained model.

  The first `seq_length` notes of the global `raw_notes` seed the model; each
  predicted note is appended to the input window and the oldest note is
  dropped before the next prediction.

  Args:
    model_type: Trained model forwarded to `predict_next_note`.
    temperature: Sampling temperature forwarded to `predict_next_note`.
    num_predictions: Number of notes to generate.

  Returns:
    A DataFrame with the columns of `key_order` plus `start` and `end`, one
    row per generated note.

  Raises:
    ValueError: If `raw_notes` has fewer than `seq_length` notes.
  """
  sample_notes = np.stack([raw_notes[key] for key in key_order], axis=1)
  if len(sample_notes) < seq_length:
    raise ValueError(
        f'Seed track has {len(sample_notes)} notes, '
        f'but {seq_length} are required.')

  # The initial sequence of notes; pitch is normalized similar to training
  # sequences
  scale = np.array([vocab_size, 1, 1])
  input_notes = sample_notes[:seq_length] / scale

  generated_notes = []
  prev_start = 0
  for _ in range(num_predictions):
    pitch, step, duration = predict_next_note(input_notes, model_type, temperature)
    start = prev_start + step
    end = start + duration
    generated_notes.append((pitch, step, duration, start, end))

    # Slide the window. The new note is normalized exactly like the seed;
    # feeding the raw pitch back would put it on a different scale than
    # every other row of the model input.
    next_input = np.array([pitch, step, duration]) / scale
    input_notes = np.append(
        input_notes[1:], np.expand_dims(next_input, 0), axis=0)
    prev_start = start

  generated_notes = pd.DataFrame(
      generated_notes, columns=(*key_order, 'start', 'end'))
  return generated_notes

## Using text to emotion to map to a model

In [None]:
# text2emotion scores a piece of text for emotions; it is used below through
# `te.get_emotion`.
# NOTE(review): emoji is pinned to ~=1.6.3, presumably because text2emotion
# relies on the older emoji API — confirm before changing the pin.
!pip install text2emotion
!pip install emoji~=1.6.3
import text2emotion as te

In [None]:
import nltk
# NOTE(review): presumably text2emotion needs the 'omw-1.4' corpus at
# runtime — confirm.
nltk.download('omw-1.4')


In [None]:
# Position in `models` used for each detected emotion label; any label that
# is not listed falls back to DEFAULT_MODEL_INDEX.
# NOTE(review): "Angry" and "Fear" share index 1, and "Sad" selects index 3
# while unlisted labels select index 2. `models` presumably follows the
# sorted "4Q" labels — verify that these indices match the intended quadrants.
EMOTION_TO_MODEL_INDEX = {
    "Happy": 0,
    "Angry": 1,
    "Fear": 1,
    "Sad": 3,
}
DEFAULT_MODEL_INDEX = 2

# Score the text and keep the label with the highest score.
result = te.get_emotion("angry")
emotion = max(result,key=result.get)
index = EMOTION_TO_MODEL_INDEX.get(emotion, DEFAULT_MODEL_INDEX)

In [None]:
# Show the selected model index next to the detected emotion label.
print(index,emotion)

In [None]:
# Generate notes with the model selected for the detected emotion.
generated_notes = generate_music(models[index])

In [None]:
# Write the generated notes to a MIDI file. `instrument_name` is the name
# read from the sample MIDI file inspected near the top of the notebook.
out_file = 'output.mid'
out_pm = notes_to_midi(
    generated_notes, out_file=out_file, instrument_name=instrument_name)

## Save the models


In [None]:
# Save every model under its own index. The loop used to call `model.save`,
# which wrote the same (last created) model into each folder.
for i, trained_model in enumerate(models):
  trained_model.save(f'./{i}')


In [None]:
from google.colab import drive
drive.mount('/content/gdrive')
!mkdir /content/gdrive/My\ Drive/Colab_Models

In [None]:
# Save every model under its own index on Google Drive. The loop used to
# call `model.save`, which wrote the same model into each folder.
for i, trained_model in enumerate(models):
  trained_model.save(f'/content/gdrive/My Drive/New_Colab_Models/{i}')