In [85]:
import partitura as pt
import torch
import torch.nn as nn
from torch.nn import functional as F
import numpy as np
import os
import pandas as pd
import pytorch_lightning as pl
from torch.utils.data import DataLoader, Dataset

# Pitch Spelling with Partitura

Have you always been bad at spelling bee, do you find that spelling notes makes this even worse. Your time of struggling is over.... Today we going to teach a Model to learn how to *pitch* spell.

### Definition

Spelling a pitch relates to the system of naming notes by letters (A-G) and sharp(#) and flat (♭) signs - and sometimes double sharp and flat signs, resulting in names or 'spellings' like 'A♭', 'D#', 'F♭♭'.

Translating between frequencies in Hz and such names is non-trivial. You need to consider :

-  The 'concert pitch' you are taking as a reference
- The temperament in which the piece is played
- The overall key that the music would be notated in
- Use of the correct enharmonic equivalents for accidentals (Using the correct enharmonic equivalent, Purpose of double-sharps and double-flats?)

If translating between, say, MIDI note numbers and 'spelled' names, the first two steps can be skipped.

Spelled pitch names often have an octave number appended for disambiguation - e.g. 'A♭3', 'D#5'.


### Some Spelling algorithms

Partitura contains an implementation for a standard algorithm for Pitch Spelling. The algorithm in question is called ps13 created by Meredith and al.:

	The ps13 pitch spelling algorithm, D Meredith - Journal of New Music Research, 2006

Some notable algorithms and currect SOTA is PKSpell.

	PKSpell: Data-driven pitch spelling and key signature estimation
	F Foscarin, N Audebert, R Fournier-S'Niehotta, 2021


Let's first download a pitch spelling dataset.

In [86]:
!wget https://github.com/CPJKU/asap-dataset/archive/refs/heads/note_alignments.zip

--2022-11-04 11:49:35--  https://github.com/CPJKU/vienna4x22/archive/refs/heads/master.zip
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/CPJKU/vienna4x22/zip/refs/heads/master [following]
--2022-11-04 11:49:39--  https://codeload.github.com/CPJKU/vienna4x22/zip/refs/heads/master
Resolving codeload.github.com (codeload.github.com)... 140.82.121.10
Connecting to codeload.github.com (codeload.github.com)|140.82.121.10|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/zip]
Saving to: ‘master.zip.1’

master.zip.1            [    <=>             ] 857.87K  1.02MB/s               ^C


In [101]:
!mkdir ./content
!unzip ./master.zip -d ./content/

unzip:  cannot find or open ./master.zip, ./master.zip.zip or ./master.zip.ZIP.


In [90]:
files = [(os.path.join(root, file), os.path.join(os.path.dirname(root), "musicxml", file[:-10]+".musicxml")) for root, dirs, files in os.walk("vienna4x22-master") for file in files if file.endswith(".match")]

In [None]:
asap_files = [[(os.path.join(root, file), os.path.join(os.path.dirname(root), os.path.basename(root).split("_")[0]+".mid"), os.path.join(os.path.dirname(root), "xml_score.musicxml"), os.path.join(root, os.path.splitext(file)[0]+".match")) for root, dirs, files in os.walk("vienna4x22-master") for file in files if file.endswith("note_alignments.tsv")]]

In [91]:
files[0]

('vienna4x22-master/match/Chopin_op10_no3_p14.match',
 'vienna4x22-master/musicxml/Chopin_op10_no3.musicxml')

In [92]:
def produce_match(alignment_fn, mfn, sfn, match_name):
	"""
	Produce and Save Match.

	Parameters
	----------
	mfn : str
		Performance Midi File Path
	sfn : str
		Score musicxml File Path
	alignment_fn : str
		Alignment ".txt" file path
	match_name : str
		Path and Save Name.
	"""
	data = pd.read_csv(alignment_fn, sep="\t")

	alignment = list()
	for x in data[["xml_id", "midi_id"]].to_numpy():
		if x[1] == "deletion":
			dd = dict(label="deletion", score_id=x[0])
		# TODO for asap alignments to contain "n"
		elif x[0] == "insertion":
			dd = dict(label="insertion", performance_id=str(x[1]))
		else:
			dd = dict(label="match", score_id=x[0], performance_id=str(x[1]))
		alignment.append(dd)
	ppart = pt.load_performance_midi(mfn)
	# This may cause re-indexing.
	spart = pt.score.merge_parts(pt.load_musicxml(sfn))
	spart = pt.score.unfold_part_maximal(spart, ignore_leaps=False)
	pt.save_match(alignment, ppart, spart, match_name)

In [93]:
def tokenize_pitch_spelling(ps_note):
	# step = {"A": 0, "B": 1, "C": 2, "D": 3, "E": 4, "F": 5, "G": 6}[]
	alter = {0:"", 1:"#", 2:"##", -1:"-", -2:"--"}[ps_note["alter"].item()]
	return pitch_to_ix[ps_note["step"].item()+alter]

def create_data(files):
	X_train = list()
	y_train = list()
	X_test = list()
	y_test = list()
	for match_file, score_file in files:
		performance, alignment = pt.load_match(match_file)
		score = pt.load_score(score_file)
		matched_notes = [alignment[idx] for idx, d in enumerate(alignment) if d["label"] == "match"]
		pna = performance.note_array()
		sna = score.note_array(include_pitch_spelling=True)
		X, y = np.zeros((len(matched_notes), 3), dtype=float), np.zeros((len(matched_notes), ), dtype=int)
		for idx, match_note in enumerate(matched_notes):
			X[idx] = np.lib.recfunctions.structured_to_unstructured(pna[np.where(pna["id"] == str(match_note["performance_id"]))][["onset_sec", "duration_sec", "pitch"]])
			y[idx] = tokenize_pitch_spelling(sna[np.where(sna["id"] == match_note["score_id"])][["step", "alter", "octave"]])
		if os.path.basename(match_file).startswith("Mozart"):
			X_test.append(X)
			y_test.append(y)
		else:
			X_train.append(X)
			y_train.append(y)
	return X_train, y_train, X_test, y_test

In [94]:
X_train, y_train, X_test, y_test = create_data(files)



### Model

In [95]:
PAD = "<PAD>"

PITCHES = {
	0: ["C", "B#", "D--"],
	1: ["C#", "B##", "D-"],
	2: ["D", "C##", "E--"],
	3: ["D#", "E-", "F--"],
	4: ["E", "D##", "F-"],
	5: ["F", "E#", "G--"],
	6: ["F#", "E##", "G-"],
	7: ["G", "F##", "A--"],
	8: ["G#", "A-"],
	9: ["A", "G##", "B--"],
	10: ["A#", "B-", "C--"],
	11: ["B", "A##", "C-"],
}

INTERVALS = {
	0: ["P1", "d2", "A7"],
	1: ["m2", "A1"],
	2: ["M2", "d3", "AA1"],
	3: ["m3", "A2"],
	4: ["M3", "d4", "AA2"],
	5: ["P4", "A3"],
	6: ["d5", "A4"],
	7: ["P5", "d6", "AA4"],
	8: ["m6", "A5"],
	9: ["M6", "d7", "AA5"],
	10: ["m7", "A6"],
	11: ["M7", "d1", "AA6"],
}

DIATONIC_PITCHES = ["C", "D", "E", "F", "G", "A", "B"]

KEY_SIGNATURES = list(range(-7, 8))
accepted_pitches = [ii for i in PITCHES.values() for ii in i]
accepted_ks = KEY_SIGNATURES
pitch_to_ix = {p: accepted_pitches.index(p) for p in accepted_pitches}
ks_to_ix = {k: KEY_SIGNATURES.index(k) for k in KEY_SIGNATURES}
# add PADDING TAD
pitch_to_ix[PAD] = len(accepted_pitches)
ks_to_ix[PAD] = len(KEY_SIGNATURES)

In [96]:
class PKSpell(nn.Module):
	"""Models that decouples key signature estimation from pitch spelling by adding a second RNN.
	This model reached state of the art performances for pitch spelling.
	"""

	def __init__(
		self,
		input_dim=3,
		hidden_dim=100,
		pitch_to_ix=pitch_to_ix,
		hidden_dim2=24,
		rnn_depth=1,
		dropout=0.1,
		bidirectional=True
	):
		super(PKSpell, self).__init__()
		self.dropout = nn.Dropout(dropout)
		self.n_out_pitch = len(pitch_to_ix)
		self.hidden_dim = hidden_dim
		self.hidden_dim2 = hidden_dim2

		# RNN layer.
		self.rnn = nn.LSTM(
			input_size=input_dim,
			hidden_size=hidden_dim // 2 if bidirectional else hidden_dim,
			bidirectional=bidirectional,
			num_layers=rnn_depth,
		)
		# Output layers.
		self.top_layer_pitch = nn.Linear(hidden_dim, self.n_out_pitch)
		# Loss function that we will use during training.
		self.loss_pitch = nn.CrossEntropyLoss()

	def compute_outputs(self, sentences, sentences_len):
		rnn_out, _ = self.rnn(sentences)
		rnn_out = self.dropout(rnn_out)
		out_pitch = self.top_layer_pitch(rnn_out)
		return out_pitch

	def forward(self, sentences, pitches, sentences_len):
		# First computes the predictions, and then the loss function.

		# Compute the outputs. The shape is (max_len, n_sentences, n_labels).
		scores_pitch = self.compute_outputs(sentences, sentences_len)

		# Flatten the outputs and the gold-standard labels, to compute the loss.
		# The input to this loss needs to be one 2-dimensional and one 1-dimensional tensor.
		scores_pitch = scores_pitch.view(-1, self.n_out_pitch)
		pitches = pitches
		loss = self.loss_pitch(scores_pitch, pitches)
		return loss

	def predict(self, sentences, sentences_len):
		# Compute the outputs from the linear units.
		scores_pitch, scores_ks = self.compute_outputs(sentences, sentences_len)

		# Select the top-scoring labels. The shape is now (max_len, n_sentences).
		predicted_pitch = scores_pitch.argmax(dim=2)
		return [predicted_pitch[: int(l), i].cpu().numpy() for i, l in enumerate(sentences_len)]


class PSDataset(Dataset):
	def __init__(self, x, y):
		super(PSDataset, self).__init__()
		self.x = x
		self.y = y
	def __getitem__(self, idx):
		return torch.tensor(self.x[idx]), torch.tensor(self.y[idx])
	def __len__(self):
		return len(self.x)

def collate_ps(data):
	def merge(sequences):
		lengths = [len(seq) for seq in sequences]
		padded_seqs = torch.zeros(len(sequences), max(lengths)).long()
		for i, seq in enumerate(sequences):
			end = lengths[i]
			padded_seqs[i, :end] = seq[:end]
		return sequences, lengths

	# sort a list by sequence length (descending order) to use pack_padded_sequence
	data.sort(key=lambda x: len(x[0]), reverse=True)

	# seperate source and target sequences
	src_seqs, trg_seqs = zip(*data)

	# merge sequences (from tuple of 1D tensor to 2D tensor)
	# src_seqs, src_lengths = merge(src_seqs)
	# trg_seqs, trg_lengths = merge(trg_seqs)
	src_lengths = [len(seq) for seq in src_seqs]

	return src_seqs[0].float(), src_lengths, trg_seqs[0]

class PKSpellPL(pl.LightningModule):
	def __init__(self):
		super(PKSpellPL, self).__init__()
		self.module = PKSpell()
	def training_step(self, batch, batch_idx):
		src_seqs, src_lengths, trg_seqs = batch
		loss = self.module(src_seqs, trg_seqs, src_lengths)
		self.log("train_loss", loss.item(), on_epoch=True, on_step=True, prog_bar=True)
		return loss
	def val_step(self, batch, batch_idx):
		src_seqs, src_lengths, trg_seqs = batch
		loss = self.module(src_seqs, trg_seqs, src_lengths)
		self.log("val_loss", loss.item(), on_epoch=True, prog_bar=True)
		return loss
	def configure_optimizers(self):
		optimizer = torch.optim.Adam(self.parameters(), lr=0.001, weight_decay=5e-4)
		return {
			"optimizer": optimizer,
		}

### Train the PKSpell model

In [99]:
model = PKSpellPL()
train_dataloader = DataLoader(PSDataset(X_train, y_train), collate_fn=collate_ps, batch_size=1, num_workers=2)
val_dataloader = DataLoader(PSDataset(X_test, y_test), collate_fn=collate_ps, batch_size=1, num_workers=2)
trainer = pl.Trainer(max_epochs=1)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [100]:
trainer.fit(model, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)

  rank_zero_warn("You passed in a `val_dataloader` but have no `validation_step`. Skipping val loop.")

  | Name   | Type    | Params
-----------------------------------
0 | module | PKSpell | 25.6 K
-----------------------------------
25.6 K    Trainable params
0         Non-trainable params
25.6 K    Total params
0.103     Total estimated model params size (MB)
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=1` reached.


# Voice Separation

Here we will investigate the task of voice separation from Midi. This task consists of assigning a voice to each monophonic line.

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
import pytorch_lightning as pl

### Model

In [None]:

class DoubleConv(nn.Module):
    """(convolution => [BN] => ReLU) * 2"""

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)


class Down(nn.Module):
    """Downscaling with maxpool then double conv"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Upscaling then double conv"""

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        # if bilinear, use the normal convolutions to reduce the number of channels
        if bilinear:
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
        else:
            self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        x1 = self.up(x1)
        # input is CHW
        diffY = x2.size()[2] - x1.size()[2]
        diffX = x2.size()[3] - x1.size()[3]

        x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                        diffY // 2, diffY - diffY // 2])
        # if you have padding issues, see
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        x = torch.cat([x2, x1], dim=1)
        return self.conv(x)


class OutConv(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

class UNet(nn.Module):
    def __init__(self, n_channels, n_classes, bilinear=False):
        super(UNet, self).__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear

        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)
        return logits


class UnetVoiceSeparationModel(pl.LightningModule):
    def __init__(self,
                 n_classes,
                 input_channels = 1,
                 lr=0.0005,
                 weight_decay=5e-4,
        ):
        super(UnetVoiceSeparationModel, self).__init__()
        self.save_hyperparameters()
        self.module = UNet(input_channels, n_classes).double()
        self.lr = lr
        self.weight_decay = weight_decay
        self.train_loss = nn.CrossEntropyLoss(ignore_index=-1)

    def training_step(self, batch, batch_idx):
        pr_dict = batch[0]
        voice_pr = pr_dict["voice_pianoroll"].squeeze().T
        input_pr = torch.clip(voice_pr + 1, 0, 1).unsqueeze(0).unsqueeze(0).to(self.device, dtype = torch.float64)
        labels = voice_pr.to(self.device).unsqueeze(0)

        pred = self.module(input_pr)
        loss = self.train_loss(pred, labels)
        # batch_f1 = self.train_f1(batch_pred, batch_labels)
        self.log("train_loss", loss.item(), prog_bar=True, on_epoch=True, on_step = True, batch_size = 1, sync_dist=True)
        # self.log("train_f1", batch_f1.item(), prog_bar=True, on_epoch=True)
        return loss

    def validation_step(self, batch, batch_idx):
        pr_dict = batch[0]
        voice_pr = pr_dict["voice_pianoroll"].squeeze().T
        input_pr = torch.clip(voice_pr + 1, 0, 1).unsqueeze(0).unsqueeze(0).to(self.device, dtype = torch.float64)
        labels = voice_pr.to(self.device).unsqueeze(0)

        pred = self.module(input_pr)
        loss = self.train_loss(pred, labels)
        self.log("val_loss", loss.item(), prog_bar=True, on_epoch=True, on_step = True, batch_size = 1)
        voice_pred = pr_to_voice_pred(F.log_softmax(pred.squeeze(), dim = 0), pr_dict["notearray_onset_beat"].squeeze(), pr_dict["notearray_duration_beat"].squeeze(),  pr_dict["notearray_pitch"].squeeze(), piano_range=True, time_div = 12)
        voice_pred = voice_pred.to(self.device)
        fscore = self.val_monophonic_f1(voice_pred, pr_dict["notearray_voice"].squeeze(), pr_dict["notearray_onset_beat"].squeeze(), pr_dict["notearray_duration_beat"].squeeze())
        self.log("val_f1", fscore.item(), prog_bar=True, on_epoch=True, on_step = True, batch_size = 1, sync_dist=True)
        avc = self.val_avc(voice_pred, pr_dict["notearray_voice"].squeeze())
        self.log("val_avc", avc.item(), prog_bar=True, on_epoch=True, on_step = True, batch_size = 1, sync_dist=True)
        # add F1 computation
        return avc

    def configure_optimizers(self):
        optimizer = Adam(self.parameters(), lr=self.lr, weight_decay=self.weight_decay)
        return {
            "optimizer": optimizer,
        }