In [None]:
# Third-party helpers for the poem-to-melody stage; the import cell below brings them in
# as `poetrytools`, `midiutil` and `pyrhyme`.
!pip install git+https://github.com/hyperreality/Poetry-Tools.git
!pip install MIDIUtil
!pip install pyrhyme

In [None]:
# Hugging Face libraries: `transformers` supplies the BERT model/tokenizer and `datasets`
# supplies `load_dataset`, both imported in the next cell.
!pip install transformers
!pip install datasets

# Select the TensorFlow 1.x runtime; the import cell uses `tensorflow.compat.v1` and calls
# `tf.disable_v2_behavior()`.
%tensorflow_version 1.x

# Copy the Music Transformer primers and the piano soundfont into /content/
# (the soundfont path is reused later as SF2_PATH).
!gsutil -q -m cp -r gs://magentadata/models/music_transformer/primers/* /content/
!gsutil -q -m cp gs://magentadata/soundfonts/Yamaha-C5-Salamander-JNv5.1.sf2 /content/

# System audio libraries, then the Python packages for Magenta and FluidSynth.
!apt-get update -qq && apt-get install -qq libfluidsynth1 build-essential libasound2-dev libjack-dev
# NOTE(review): tensorflow-datasets is held below 4.0.0, presumably for compatibility with
# the TF 1.x stack — confirm before unpinning.
!pip install -q 'tensorflow-datasets < 4.0.0'
!pip install -qU google-cloud magenta pyfluidsynth

In [None]:
from collections import defaultdict
from collections import OrderedDict

from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup
from transformers import BertForSequenceClassification
import torch

import numpy as np
import pandas as pd

from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset

import tensorflow.compat.v1 as tf

from google.colab import files

from tensor2tensor import models
from tensor2tensor import problems
from tensor2tensor.data_generators import text_encoder
from tensor2tensor.utils import decoding
from tensor2tensor.utils import trainer_lib

from magenta.models.score2perf import score2perf
import note_seq

import poetrytools
import pyrhyme
import re
from midiutil.MidiFile import MIDIFile
import spacy
from random import choice, seed, random

tf.disable_v2_behavior()

In [None]:
class PoemDataset(Dataset):
    """Torch dataset that tokenizes one poem line per item.

    Args:
        lines: Indexable collection of verse texts.
        targets: Indexable collection of integer class labels, aligned with `lines`.
        tokenizer: Object exposing `encode_plus` (a Hugging Face tokenizer in this notebook).
        max_len: Fixed token length every encoded line is padded or truncated to.
    """

    def __init__(self, lines, targets, tokenizer, max_len):
        self.lines = lines
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of lines in the dataset."""
        return len(self.lines)

    def __getitem__(self, item):
        """Return the text, token ids, attention mask and label tensor for line `item`."""
        line = str(self.lines[item])
        target = self.targets[item]

        encoding = self.tokenizer.encode_plus(
            line,
            add_special_tokens=True,
            return_token_type_ids=False,
            padding='max_length',
            # Without explicit truncation a line longer than `max_len` keeps its full
            # length, and the default collate function cannot stack ragged tensors.
            truncation=True,
            max_length=self.max_len,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            'line_text': line,
            # `return_tensors='pt'` yields a leading batch axis of size 1; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long)
        }

In [None]:
def create_data_loader(df, tokenizer, max_len, batch_size):
    """Wrap a dataframe of verses in a `PoemDataset` and return a DataLoader over it.

    Args:
        df: DataFrame with `verse_text` and `label` columns.
        tokenizer: Tokenizer handed to `PoemDataset`.
        max_len: Token length handed to `PoemDataset`.
        batch_size: Number of lines per batch.

    Returns:
        A `DataLoader` using 4 worker processes and the default (sequential) ordering.
    """
    verses = df.verse_text.to_numpy()
    labels = df.label.to_numpy()
    dataset = PoemDataset(lines=verses, targets=labels, tokenizer=tokenizer, max_len=max_len)
    loader = DataLoader(dataset, batch_size=batch_size, num_workers=4)
    return loader

In [None]:
def train_epoch(
        model,
        data_loader,
        loss_fn,
        optimizer,
        device,
        scheduler,
        n_examples
):
    """Run one training pass over `data_loader` and update the model in place.

    Args:
        model: Module called as `model(input_ids=..., attention_mask=...)` that returns
            per-class scores with the class axis at dim 1.
        data_loader: Iterable of batches; each batch is a dict with `input_ids`,
            `attention_mask` and `targets` tensors.
        loss_fn: Callable `loss_fn(outputs, targets)` returning a scalar tensor.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        n_examples: Denominator for the accuracy (number of examples in the loader).

    Returns:
        Tuple `(accuracy, mean_loss)` for the epoch.
    """
    model = model.train()

    losses = []
    correct_predictions = 0
    for d in data_loader:
        input_ids = d['input_ids'].to(device)
        attention_mask = d['attention_mask'].to(device)
        targets = d['targets'].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )

        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)
        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())

        loss.backward()
        # `clip_grad_norm` (no trailing underscore) is deprecated; the in-place
        # `clip_grad_norm_` is the supported spelling and behaves the same.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    return float(correct_predictions) / n_examples, np.mean(losses)

In [None]:
def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Score `model` on `data_loader` without updating it.

    The model is switched to eval mode and every batch runs under `torch.no_grad()`.

    Args:
        model: Module called as `model(input_ids=..., attention_mask=...)`.
        data_loader: Iterable of dict batches with `input_ids`, `attention_mask`, `targets`.
        loss_fn: Callable `loss_fn(outputs, targets)` returning a scalar tensor.
        device: Device the batch tensors are moved to.
        n_examples: Denominator for the accuracy.

    Returns:
        Tuple `(accuracy, mean_loss)`.
    """
    model = model.eval()

    batch_losses = []
    n_correct = 0

    with torch.no_grad():
        for batch in data_loader:
            ids = batch['input_ids'].to(device)
            mask = batch["attention_mask"].to(device)
            labels = batch["targets"].to(device)

            logits = model(input_ids=ids, attention_mask=mask)
            predicted = torch.max(logits, dim=1)[1]

            batch_losses.append(loss_fn(logits, labels).item())
            n_correct += torch.sum(predicted == labels)

    accuracy = float(n_correct) / n_examples
    return accuracy, np.mean(batch_losses)

In [None]:
class SentimentClassifier(nn.Module):
    """Pretrained BERT encoder followed by dropout and a linear classification layer."""

    def __init__(self, n_classes):
        """Load `bert-base-uncased` and attach a head with `n_classes` outputs."""
        super().__init__()
        bert_model_name = 'bert-base-uncased'
        bert_cache_dir = 'bert_cache/'
        self.bert = BertModel.from_pretrained(bert_model_name, cache_dir=bert_cache_dir)
        # Tuple outputs are required by the two-value unpacking in `forward`.
        self.bert.config.return_dict = False
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the linear layer's output for the encoder's second (pooled) result."""
        encoder_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        _, pooled_output = encoder_outputs
        return self.out(self.drop(pooled_output))

In [None]:
def train_model():
  """Fine-tune `SentimentClassifier` on the `poem_sentiment` dataset.

  Trains for 5 epochs with AdamW (lr 2e-5) and a linear schedule without warm-up,
  printing train and validation loss/accuracy after each epoch.

  Returns:
    The trained `SentimentClassifier`, left on the device it was trained on.
  """
  RANDOM_SEED = 42
  np.random.seed(RANDOM_SEED)
  torch.manual_seed(RANDOM_SEED)
  # Fall back to CPU so the cell still runs on a runtime without a GPU.
  device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

  poem_dataset = load_dataset('poem_sentiment')

  # Only the train and validation splits are used; the test split was previously
  # converted and wrapped in a loader that nothing ever read.
  df_train = pd.DataFrame(poem_dataset['train'])
  df_validate = pd.DataFrame(poem_dataset['validation'])

  MAX_TOKEN_LENGTH = 40
  BATCH_SIZE = 16

  BERT_MODEL = 'bert-base-uncased'
  TOKENIZER_PATH = 'project_cache/'

  btokenizer = BertTokenizer.from_pretrained(BERT_MODEL, cache_dir=TOKENIZER_PATH)

  train_data_loader = create_data_loader(df_train, btokenizer, MAX_TOKEN_LENGTH, BATCH_SIZE)
  val_data_loader = create_data_loader(df_validate, btokenizer, MAX_TOKEN_LENGTH, BATCH_SIZE)

  # Only the number of classes is used here; `poem_classification` maps indices to names.
  class_names = ['negative', 'positive', 'no_impact', 'mixed']

  model = SentimentClassifier(len(class_names))
  model = model.to(device)

  EPOCHS = 5
  optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
  # The scheduler is stepped once per batch, so it needs the total batch count.
  total_steps = len(train_data_loader) * EPOCHS

  scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=0,
        num_training_steps=total_steps
    )

  loss_fn = nn.CrossEntropyLoss().to(device)

  for epoch in range(EPOCHS):
    print(f'Epoch {epoch + 1}/{EPOCHS}')
    print('-' * 10)
    train_acc, train_loss = train_epoch(
      model,
      train_data_loader,
      loss_fn,
      optimizer,
      device,
      scheduler,
      len(df_train)
    )
    print(f'Train loss {train_loss} accuracy {train_acc}')
    val_acc, val_loss = eval_model(
      model,
      val_data_loader,
      loss_fn,
      device,
      len(df_validate)
    )
    print(f'Val   loss {val_loss} accuracy {val_acc}')
    print()
  return model

In [None]:
# Train the classifier once; `poem_classification` reads this module-level `model`.
model = train_model()

In [None]:
def poem_classification(poem_path):
  """Classify a poem file as 'positive' or 'negative' by a vote over its lines.

  Every non-empty line is scored by the module-level `model`. Lines predicted as
  'no_impact' are ignored, and 'mixed' votes are counted but do not affect the result.

  Args:
    poem_path: Path to a text file with one verse per line.

  Returns:
    'positive' if positive lines strictly outnumber negative ones, otherwise 'negative'
    (ties and poems with no votes are therefore 'negative').
  """
  RANDOM_SEED = 42
  np.random.seed(RANDOM_SEED)
  torch.manual_seed(RANDOM_SEED)
  # Follow the device the trained model already lives on instead of assuming a GPU.
  device = next(model.parameters()).device

  MAX_TOKEN_LENGTH = 40

  BERT_MODEL = 'bert-base-uncased'
  TOKENIZER_PATH = 'project_cache/'

  btokenizer = BertTokenizer.from_pretrained(BERT_MODEL, cache_dir=TOKENIZER_PATH)

  class_names = ['negative', 'positive', 'no_impact', 'mixed']

  with open(poem_path) as f:
    poem_text = f.read()
  poem_lines = [l for l in poem_text.split('\n') if l]

  score = defaultdict(int)
  # Dropout must be off at inference time, otherwise predictions vary between runs;
  # gradients are not needed either.
  model.eval()
  with torch.no_grad():
    for test_line in poem_lines:
      encoded_line = btokenizer.encode_plus(
        test_line,
        add_special_tokens=True,
        return_token_type_ids=False,
        padding=True,
        # Cap long lines at the same length the training data was encoded with.
        truncation=True,
        max_length=MAX_TOKEN_LENGTH,
        return_attention_mask=True,
        return_tensors='pt',
      )
      input_ids = encoded_line['input_ids'].to(device)
      attention_mask = encoded_line['attention_mask'].to(device)
      output = model(input_ids, attention_mask)
      _, prediction = torch.max(output, dim=1)
      predicted_class = class_names[prediction.item()]
      if predicted_class != 'no_impact':
        score[predicted_class] += 1
  if score['positive'] > score['negative']:
    return 'positive'
  else:
    return 'negative'

In [None]:
def choose_note(scale, curr_note, forced=0):
  """Pick the note that follows `curr_note` in `scale`.

  Args:
    scale: Dict mapping a note name to a list of one or two candidate next notes.
    curr_note: Key into `scale`.
    forced: When 0 a random draw decides; any other value skips the draw and behaves
      as if the draw were 0.

  Returns:
    The second candidate when the draw is below 0.7 and two candidates exist,
    otherwise the first candidate.
  """
  draw = random() if forced == 0 else 0
  candidates = scale[curr_note]
  take_second = draw < 0.7 and len(candidates) > 1
  return candidates[1] if take_second else candidates[0]


def create_midi(durations, notes, path, pol):
    """Write a single-track MIDI file from parallel duration and note lists.

    Args:
        durations: Sequence of duration codes: '8', '4', '2', 'p4' or 'p8'.
        notes: Sequence of note names ('C' .. 'B', 'C_2' .. 'B_2'), or '_' for a rest.
        path: Destination file path.
        pol: 'positive' selects tempo 120; any other value selects tempo 70.

    Raises:
        KeyError: If a note name or duration code is not in the lookup tables.

    Note:
        The two sequences are paired with `zip`, so extra items in the longer one are
        silently ignored.
    """
    midi_numbers = {'C': 60, 'D': 62, 'E': 64,
                    'F': 65, 'G': 67, 'A': 69,
                    'B': 71, 'C_2': 72, 'D_2': 74,
                    'E_2': 76, 'F_2': 77, 'G_2': 79,
                    'A_2': 81, 'B_2': 83}
    # Length in beats for each duration code; the 'p' codes are paired with '_' rests.
    beats = {'8': 0.5, '4': 1, '2': 2, 'p4': 1, 'p8': 0.5}
    track = 0
    channel = 0
    time = 0
    tempo = 120 if pol == 'positive' else 70
    volume = 100
    midi_song = MIDIFile(1)
    midi_song.addTrackName(track, time, 'Sample')
    midi_song.addTempo(track, time, tempo)
    for d, n in zip(durations, notes):
        if n != '_':
            midi_song.addNote(track, channel, midi_numbers[n], time, beats[d], volume)
        # Rests add no note but still advance the clock.
        time += beats[d]
    # Context manager so the handle is closed even if writing fails part-way.
    with open(path, 'wb') as bin_file:
        midi_song.writeFile(bin_file)


def gen_music(poem_text, nlp, a, r, pol):
    """Generate one list of note names per verse by walking a scale.

    Args:
        poem_text: Indexable sequence of verse strings.
        nlp: Callable returning an iterable of tokens exposing `pos_`, `text` and
            `is_punct` for a verse string.
        a: Nested sequence where `a[i][j]` is the stress pattern for word `j` of
            verse `i`; only its length is used here.
        r: Sequence where `r[i]` is the rhyme-group key of verse `i`.
        pol: 'positive' selects the `c_major` table and 'C' as home note; any other
            value selects `a_minor` and 'A'.

    Returns:
        List with one list of note names per verse; '_' marks a rest.
    """
    # Re-seeding on every call makes the random walk repeatable for the same input.
    seed(42)
    # rhyme-group key -> last three notes of the first verse seen in that group
    rhymes = {}
    # Each table maps a note to the candidate next notes consumed by `choose_note`.
    # NOTE(review): in `c_major`, 'F' lists 'D' (not 'E') and 'B' lists 'C' (not 'C_2'),
    # unlike the other stepwise entries — confirm these are intentional.
    c_major = {'C': ['D'], 'D': ['C', 'E'], 'E': ['D', 'F'],
               'F': ['D', 'G'], 'G': ['F', 'A'], 'A': ['G', 'B'],
               'B': ['A', 'C'], 'C_2': ['B', 'D_2'], 'D_2': ['C_2', 'E_2'],
               'E_2': ['D_2', 'F_2'], 'F_2': ['E_2', 'G_2'], 'G_2': ['F_2', 'A_2'],
               'A_2': ['G_2', 'B_2'], 'B_2': ['A_2']}
    a_minor = {'A': ['B'], 'B': ['A', 'C'], 'C': ['B', 'D'],
               'D': ['C', 'E'], 'E': ['D', 'F'], 'F': ['E', 'G'],
               'G': ['F', 'A_2'], 'A_2': ['G', 'B_2'], 'B_2': ['A_2', 'C_2'],
               'C_2': ['B_2', 'D_2'], 'D_2': ['C_2', 'E_2'], 'E_2': ['D_2', 'F_2'],
               'F_2': ['E_2', 'G_2'], 'G_2': ['F_2']}
    notes = []
    i = 0
    if pol == 'positive':
        current_note = 'C'
    else:
        current_note = 'A'
    for sent in poem_text:
        sent_notes = []
        j = 0
        # If an earlier verse has identical text, start from that verse's notes.
        # NOTE(review): this binds the same list object (no copy) and the token loop
        # below still appends to it, so the earlier verse's list grows as well —
        # confirm whether a copy and/or skipping the token loop was intended.
        for k in range(i):
            if poem_text[k] == sent and notes:
                sent_notes = notes[k]
                break
        tokens = nlp(sent)
        # Every even-indexed verse restarts from the home note.
        if i % 2 == 0:
            if pol == 'positive':
                current_note = 'C'
            else:
                current_note = 'A'
        for t in tokens:
            # The punctuation branches and the "'s" branch all `continue`, so they
            # neither advance the word index `j` nor take the trailing random step.
            if t.pos_ == 'PART' and t.text == '\'s':
                continue
            elif t.is_punct and (t.text == '.' or t.text == '...' or t.text == ';'):
                sent_notes.append('_')
                continue
            elif t.is_punct and t.text == ',':
                sent_notes.append('_')
                continue
            elif t.is_punct and t.text == '!':
                # Repeat the current note.
                sent_notes.append(current_note)
                continue
            elif t.is_punct and t.text == '?':
                # Forced (non-random) step, followed by a rest.
                if pol == 'positive':
                    current_note = choose_note(c_major, current_note, forced=1)
                else:
                    current_note = choose_note(a_minor, current_note, forced=1)
                sent_notes.append(current_note)
                sent_notes.append('_')
                continue
            elif t.is_punct:
                continue
            elif len(a[i][j]) == 1:
                sent_notes.append(current_note)
            elif len(a[i][j]) > 1:
                # One note per entry of the stress pattern, stepping after each.
                for _ in a[i][j]:
                    sent_notes.append(current_note)
                    if pol == 'positive':
                        current_note = choose_note(c_major, current_note)
                    else:
                        current_note = choose_note(a_minor, current_note)
            j += 1
            # Step once more after every counted word.
            if pol == 'positive':
                current_note = choose_note(c_major, current_note)
            else:
                current_note = choose_note(a_minor, current_note)
        # Overwrite the verse's last sounding note (skipping one trailing rest).
        # Raises IndexError if the verse produced no notes at all.
        if sent_notes[-1] == '_':
            idx = -2
        else:
            idx = -1
        # NOTE(review): the octave test reads `sent_notes[-2]` even when `idx` is -1,
        # i.e. it looks at the note before the one being replaced (and needs at least
        # two entries) — confirm whether `sent_notes[idx]` was meant.
        if pol == 'positive':
            if '2' in sent_notes[-2]:
                sent_notes[idx] = choice(['E_2', 'G_2'])
            else:
                sent_notes[idx] = choice(['E', 'G'])
        else:
            if '2' in sent_notes[-2]:
                sent_notes[idx] = choice(['C_2', 'E_2'])
            else:
                sent_notes[idx] = choice(['C', 'E'])
        # Verses in the same rhyme group share their last three notes; this is skipped
        # when the verse ends on a rest.
        if sent_notes[-1] != '_':
            if r[i] in rhymes:
                sent_notes[-3:] = rhymes[r[i]]
            else:
                rhymes[r[i]] = sent_notes[-3:]
        i += 1
        notes.append(sent_notes)
    # End the piece on the home note, in the octave of the note it replaces.
    if notes[-1][-1] == '_':
        idx = -2
    else:
        idx = -1
    if pol == 'positive':
        if '2' in notes[-1][idx]:
            notes[-1][idx] = 'C_2'
        else:
            notes[-1][idx] = 'C'
    else:
        if '2' in notes[-1][idx]:
            notes[-1][idx] = 'A_2'
        else:
            notes[-1][idx] = 'A'
    return notes


def get_durations(poem_text, nlp, a):
    """Produce one list of duration codes per verse.

    Args:
        poem_text: Iterable of verse strings.
        nlp: Callable returning an iterable of tokens exposing `pos_`, `text`,
            `is_punct` and `is_stop` for a verse string.
        a: Nested sequence where `a[i][j]` is the stress pattern of word `j` in verse
            `i`; an entry equal to '1' yields '4', any other entry yields '8'.

    Returns:
        List of lists of codes ('8', '4', '2', 'p4', 'p8'). The last code of a verse
        becomes '2' unless it is a pause code (contains 'p').
    """
    # Codes emitted for the punctuation marks that produce output; other punctuation
    # emits nothing.
    punct_codes = {
        '.': ('p4',), '...': ('p4',), ';': ('p4',),
        ',': ('p8',),
        '!': ('8',),
        '?': ('8', 'p8'),
    }
    all_durations = []
    for verse_idx, verse in enumerate(poem_text):
        verse_durations = []
        word_idx = 0
        for tok in nlp(verse):
            # The possessive "'s" is skipped and does not count as a word.
            if tok.pos_ == 'PART' and tok.text == '\'s':
                continue
            if tok.is_punct:
                verse_durations.extend(punct_codes.get(tok.text, ()))
                continue
            pattern = a[verse_idx][word_idx]
            if tok.is_stop and len(pattern) == 1:
                # A stop word with a single-entry pattern is always '8'.
                verse_durations.append('8')
            else:
                verse_durations.extend('4' if mark == '1' else '8' for mark in pattern)
            word_idx += 1
        if 'p' not in verse_durations[-1]:
            verse_durations[-1] = '2'
        all_durations.append(verse_durations)
    return all_durations


def find_rhyme_scheme(poem_verses):
    """Assign a rhyme-group number to every verse based on its last word.

    The last word of each verse is looked up with `pyrhyme.RhymeBrain`. Two verses
    share a group when their last words are identical or when either word appears in
    the other's rhyme list. Groups are numbered from 0 in order of first appearance.

    Args:
        poem_verses: Iterable of verse strings.

    Returns:
        List of group numbers, one per verse, in verse order.

    Raises:
        IndexError: If a verse has no word that is upper-case or longer than one
            character.
    """
    to_rhyme_list = []
    for verse in poem_verses:
        # Raw string: '\W' in a plain literal is an invalid escape sequence.
        words = re.sub(r'[\W_]', ' ', verse).split(' ')
        # Drop empty strings and single lower-case letters.
        words = [w for w in words if w.isupper() or len(w) > 1]
        to_rhyme_list.append(words[-1])

    rhyme_finder = pyrhyme.RhymeBrain()
    rhyme_dict = {}
    for word in to_rhyme_list:
        # Look each distinct word up once; repeated end words reuse the cached list.
        if word not in rhyme_dict:
            rhyme_dict[word] = [
                match.word for match in rhyme_finder.rhyming_list(word, lang='en')
            ]

    assigned = []  # (last word, group number) for each verse processed so far
    next_group = 0
    for word in to_rhyme_list:
        group = None
        for earlier_word, earlier_group in assigned:
            # A repeated end word belongs to its own group even when the rhyme
            # lists do not contain the word itself.
            if (earlier_word == word
                    or earlier_word in rhyme_dict[word]
                    or word in rhyme_dict[earlier_word]):
                group = earlier_group
                break
        if group is None:
            group = next_group
            next_group += 1
        assigned.append((word, group))
    return [group for _, group in assigned]

In [None]:
def create_song(poem_path, polarity, song_path):
  """Turn the poem at `poem_path` into a melody and write it to `song_path` as MIDI.

  Args:
    poem_path: Path to a text file with one verse per line.
    polarity: Sentiment label passed on to `gen_music` and `create_midi`.
    song_path: Destination path of the MIDI file.
  """
  with open(poem_path) as poem_file:
    poem_text = poem_file.read()

  # Per-verse stress patterns from poetrytools, with empty entries dropped.
  scansion = poetrytools.scanscion(poetrytools.tokenize(poem_text))
  stresses = [line for line in scansion if line]

  verses = [line for line in poem_text.split('\n') if line]
  rhyme_scheme = find_rhyme_scheme(verses)

  nlp = spacy.load('en_core_web_sm')
  durations_per_verse = get_durations(verses, nlp, stresses)
  notes_per_verse = gen_music(verses, nlp, stresses, rhyme_scheme, polarity)

  # Flatten the per-verse lists into the two parallel sequences `create_midi` expects.
  flat_durations = []
  for verse_durations in durations_per_verse:
    flat_durations.extend(verse_durations)
  flat_notes = []
  for verse_notes in notes_per_verse:
    flat_notes.extend(verse_notes)

  create_midi(flat_durations, flat_notes, song_path, polarity)

In [None]:
# Soundfont copied into /content/ by the setup cell, and the sample rate passed to
# `note_seq.play_sequence` in the playback cells.
SF2_PATH = '/content/Yamaha-C5-Salamander-JNv5.1.sf2'
SAMPLE_RATE = 16000

# Read a MIDI file from disk and convert it to a NoteSequence.
def upload_midi(song_path):
  """Return the NoteSequence parsed from the bytes of the MIDI file at `song_path`."""
  with open(song_path, 'rb') as midi_file:
    midi_bytes = midi_file.read()
  sequence = note_seq.midi_to_note_sequence(midi_bytes)
  return sequence

# Decode a sequence of IDs, stopping at the first EOS.
def decode(ids, encoder):
  """Return `encoder.decode` of `ids` cut just before the first `text_encoder.EOS_ID`.

  When no EOS id is present the whole sequence is decoded.
  """
  id_list = list(ids)
  try:
    end = id_list.index(text_encoder.EOS_ID)
  except ValueError:
    end = len(id_list)
  return encoder.decode(id_list[:end])

In [None]:
# Model, hyper-parameter set and checkpoint used for melody-conditioned generation.
model_name = 'transformer'
hparams_set = 'transformer_tpu'
ckpt_path = 'gs://magentadata/models/music_transformer/checkpoints/melody_conditioned_model_16.ckpt'
# Same problem definition as `AbsoluteMelody2PerfProblem`, but with `add_eos_symbol`
# overridden to return True.
class MelodyToPianoPerformanceProblem(score2perf.AbsoluteMelody2PerfProblem):
  @property
  def add_eos_symbol(self):
    return True

problem = MelodyToPianoPerformanceProblem()
# Encoders keyed by feature name; later cells use the 'inputs' and 'targets' entries.
melody_conditioned_encoders = problem.get_feature_encoders()

# Set up HParams.
hparams = trainer_lib.create_hparams(hparams_set=hparams_set)
trainer_lib.add_problem_hparams(hparams, problem)
hparams.num_hidden_layers = 16
hparams.sampling_method = 'random'

# Set up decoding HParams.
decode_hparams = decoding.decode_hparams()
decode_hparams.alpha = 0.0
decode_hparams.beam_size = 1

# Create Estimator.
run_config = trainer_lib.create_run_config(hparams)
estimator = trainer_lib.create_estimator(
    model_name, hparams, run_config,
    decode_hparams=decode_hparams)

# These values will be changed by the following cell.
# `input_generator` reads both names each time it builds a feature dict.
inputs = []
decode_length = 0

# Input generator feeding the Estimator's predict loop.
def input_generator():
  """Yield feature dicts forever, built from the module-level `inputs` and `decode_length`.

  Both names are read each time a dict is built, so rebinding them between requests
  changes what the next yielded dict contains.
  """
  global inputs
  while True:
    features = {}
    features['inputs'] = np.array([[inputs]], dtype=np.int32)
    features['targets'] = np.zeros([1, 0], dtype=np.int32)
    features['decode_length'] = np.array(decode_length, dtype=np.int32)
    yield features

# Start the Estimator, loading from the specified checkpoint.
# `melody_conditioned_samples` is a single long-lived iterator; later cells pull one
# sample at a time from it with `next(...)`.
input_fn = decoding.make_input_fn_from_generator(input_generator())
melody_conditioned_samples = estimator.predict(
    input_fn, checkpoint_path=ckpt_path)

# "Burn" one.
# This first sample is produced with the placeholder `inputs = []` and
# `decode_length = 0` and is discarded.
# NOTE(review): presumably done so the one-off start-up cost is paid here rather than
# on the first real request — confirm.
_ = next(melody_conditioned_samples)

In [None]:
# Classify the poem's overall sentiment with the fine-tuned model.
# NOTE(review): the path points into Google Drive, but no cell visible in this notebook
# mounts Drive — confirm it is mounted before running.
POEM_PATH = '/content/drive/MyDrive/Poems/SonnetLXXIIAmoretti.txt'
poem_sentiment = poem_classification(POEM_PATH)
print(poem_sentiment)

positive


In [None]:
# NOTE(review): nothing in this cell reads `event_padding`; it is kept in case another
# cell relies on the name.
event_padding = 2 * [note_seq.MELODY_NO_EVENT]
split_path = POEM_PATH.split('/')
MIDI_FOLDER = '/content/drive/MyDrive/MidiFiles/'
# Replace the poem's extension with '.mid'. Slicing off the last three characters kept
# the dot and produced names such as 'Poem..mid'.
SONG_PATH = MIDI_FOLDER + split_path[-1].rsplit('.', 1)[0] + '.mid'

print(f'Poem sentiment is: {poem_sentiment}')

create_song(POEM_PATH, poem_sentiment, SONG_PATH)

# Reload the melody, keep only the inferred melody instrument, and order its notes by
# start time.
melody_ns = upload_midi(SONG_PATH)
melody_instrument = note_seq.infer_melody_for_sequence(melody_ns)
notes = [note for note in melody_ns.notes
        if note.instrument == melody_instrument]
del melody_ns.notes[:]
melody_ns.notes.extend(
  sorted(notes, key=lambda note: note.start_time))
# Stretch every note to the start of the next one so the melody has no gaps.
for i in range(len(melody_ns.notes) - 1):
  melody_ns.notes[i].end_time = melody_ns.notes[i + 1].start_time
# Rebinding the module-level `inputs` is what hands the melody to `input_generator`.
inputs = melody_conditioned_encoders['inputs'].encode_note_sequence(
        melody_ns)

# Play and plot the melody.
note_seq.play_sequence(
    melody_ns,
    synth=note_seq.fluidsynth, sample_rate=SAMPLE_RATE, sf2_path=SF2_PATH)
note_seq.plot_sequence(melody_ns)

In [None]:
# `decode_length` must be rebound before requesting the sample: `input_generator` reads
# it (together with `inputs`, set by the previous cell) when it builds a feature dict.
decode_length = 4096
sample_ids = next(melody_conditioned_samples)['outputs']

# Decode to NoteSequence.
# The value returned by `decode` is handed to `midi_file_to_note_sequence` as a file name.
midi_filename = decode(
    sample_ids,
    encoder=melody_conditioned_encoders['targets'])
accompaniment_ns = note_seq.midi_file_to_note_sequence(midi_filename)

# Play and plot.
note_seq.play_sequence(
    accompaniment_ns,
    synth=note_seq.fluidsynth, sample_rate=SAMPLE_RATE, sf2_path=SF2_PATH)
note_seq.plot_sequence(accompaniment_ns)

In [None]:
# Write the generated accompaniment to Drive, then download that same file through
# the browser.
ACCOMPANIMENT_MIDI_PATH = '/content/drive/MyDrive/SonnetLXXIIAmoretti7.mid'
note_seq.sequence_proto_to_midi_file(accompaniment_ns, ACCOMPANIMENT_MIDI_PATH)
files.download(ACCOMPANIMENT_MIDI_PATH)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>