In [1]:
%load_ext autoreload
%autoreload 2

In [1]:
"""
This is the data processing script for POP909: A Pop song Dataset for Music Arrangement Generation.

It lets you quickly convert the POP909 MIDI files into Google Magenta's music representation,
as used in [Music Transformer](https://magenta.tensorflow.org/music-transformer) and
[Performance RNN](https://magenta.tensorflow.org/performance-rnn).

Modified from: https://github.com/music-x-lab/POP909-Dataset/blob/master/data_process/data_process.ipynb

Note: I'm not using this data representation anymore. This notebook is just included for potential future use.
"""

import csv
import itertools
import pickle
import random
import os
import sys
from dataclasses import dataclass

import numpy as np
import pretty_midi
import pypianoroll
import torch
from matplotlib import pyplot as plt
from pprint import pprint
from tqdm import tqdm

from util import MidiEventProcessor

In [2]:
@dataclass
class Chord:
    """One chord annotation: a chord label and the time span it covers."""

    # Chord label, taken verbatim from the third column of chord_midi.txt.
    name: str
    # Start of the span (first column of chord_midi.txt); presumably seconds - TODO confirm.
    start_time: float
    # End of the span (second column of chord_midi.txt); presumably seconds - TODO confirm.
    end_time: float


@dataclass
class Event:
    """
    A single encoded event token paired with the chord it belongs to.

    NOTE(review): not instantiated in this notebook; presumably mirrors the
    objects returned by MidiEventProcessor.encode - confirm against util.
    """

    # Integer token of the event.
    token: int
    # Chord associated with the event.
    chord: Chord


@dataclass
class Segment:
    """All event tokens that belong to one chord of a song."""

    # Vocabulary token of the chord (index into the chord vocabulary).
    chord_token: int
    # Event tokens attached to that chord; may be empty.
    event_tokens: list


@dataclass
class Chunk:
    """A training example built by get_chunks from several consecutive segments."""

    # Chord tokens of the grouped segments, one per segment.
    chords: list
    # Concatenated event tokens of those segments, followed by the end-of-sequence token.
    events: list


class ChordVocab:
    """
    Mapping between chord names and integer tokens.

    Attributes:
        chords: List of chord names; a name's index in the list is its token.
        chord_to_token: Dict mapping each chord name to its index in `chords`.
    """

    def __init__(self, chords):
        """
        Args:
            chords: Any iterable of chord names (list, tuple, generator, ...).
                The names are copied, so later changes to the caller's
                object do not affect the vocabulary.
        """
        # list() works for every iterable (chords.copy() required an object
        # exposing .copy()) and still gives an independent copy.
        self.chords = list(chords)
        # Build the lookup from the stored copy so a one-shot iterator is
        # consumed only once.
        self.chord_to_token = {chord: i for i, chord in enumerate(self.chords)}

In [3]:
def get_unique_chords(data_root):
    """
    Collect the distinct chord labels used across all songs.

    Every sub-directory of `data_root` is treated as a song directory and is
    expected to contain a tab-separated "chord_midi.txt" whose rows are
    (start_time, end_time, chord). Entries of `data_root` that are not
    directories are ignored, as are blank lines in the chord files.

    Args:
        data_root: Directory containing one sub-directory per song.

    Returns:
        A set of chord label strings.

    Raises:
        FileNotFoundError: If a song directory has no "chord_midi.txt".
        ValueError: If a non-empty row does not have exactly three columns.
    """
    chords = set()
    for song_idx in sorted(os.listdir(data_root)):
        song_dir = os.path.join(data_root, song_idx)
        if not os.path.isdir(song_dir):
            continue
        chord_path = os.path.join(song_dir, "chord_midi.txt")
        with open(chord_path, newline="") as f:
            for row in csv.reader(f, delimiter="\t"):
                # csv.reader yields [] for a blank line; unpacking it would
                # raise ValueError, so skip such rows.
                if not row:
                    continue
                _, _, chord = row
                chords.add(chord)
    return chords


def make_chord_vocab(data_root, out_path):
    """
    Write the chord vocabulary of a dataset to a text file, one chord per line.

    The first line is always "N"; the remaining chords follow in sorted
    order. Because load_chord_vocab assigns tokens by line index, "N" always
    receives token 0.

    Args:
        data_root: Directory containing one sub-directory per song (see
            get_unique_chords).
        out_path: Path of the vocabulary file to create or overwrite.
    """
    chords = get_unique_chords(data_root)
    # Put "N" first unconditionally. The previous remove()/insert() pair
    # raised ValueError when no song in the dataset used the "N" label.
    # NOTE(review): "N" is presumably the "no chord" label - confirm.
    ordered = ["N"] + sorted(chord for chord in chords if chord != "N")
    with open(out_path, "w") as f:
        f.writelines(f"{chord}\n" for chord in ordered)


def load_chord_vocab(vocab_path):
    """
    Read a chord vocabulary file and wrap its contents in a ChordVocab.

    Each line of the file is one chord name; the line's position in the file
    becomes the chord's token.
    """
    with open(vocab_path, "r") as vocab_file:
        contents = vocab_file.read()
    chord_names = contents.splitlines()
    return ChordVocab(chord_names)

In [5]:
# Scan every song under ../pop909/original and write the chord vocabulary file.
make_chord_vocab("../pop909/original", "../pop909/chord_vocab.txt")

In [6]:
# Load the vocabulary file back into a ChordVocab for interactive use.
chord_vocab = load_chord_vocab("../pop909/chord_vocab.txt")

In [10]:
def preprocess_song(midi_path, chord_path, chord_vocab):
    """
    Convert one song into a list of per-chord segments.

    The notes of the three tracks are merged, sorted by start time and
    encoded together with the chord annotations by MidiEventProcessor. The
    resulting event tokens are then grouped by the chord each event refers to.

    Args:
        midi_path: Path to the song's MIDI file. Its first three instruments
            must be named "MELODY", "BRIDGE" and "PIANO", in that order.
        chord_path: Path to the tab-separated chord file whose rows are
            (start_time, end_time, chord). Blank lines are ignored.
        chord_vocab: ChordVocab used to translate chord names into tokens.

    Returns:
        A list with one Segment per chord row, in file order. A segment's
        event_tokens is empty when no event refers to its chord.

    Raises:
        AssertionError: If the instrument names do not match the expected layout.
        ValueError: If a non-empty chord row does not have exactly three
            columns, or its times are not parseable as floats.
        KeyError: If a chord name is missing from `chord_vocab`.
    """
    midi_data = pretty_midi.PrettyMIDI(midi_path)

    # The track order is relied on below, so fail early if it differs.
    assert midi_data.instruments[0].name == "MELODY"
    assert midi_data.instruments[1].name == "BRIDGE"
    assert midi_data.instruments[2].name == "PIANO"

    melody = midi_data.instruments[0].notes
    bridge = midi_data.instruments[1].notes
    piano = midi_data.instruments[2].notes

    # Concatenation builds a new list, so sorting does not reorder the
    # per-instrument note lists held by midi_data.
    notes = melody + bridge + piano
    notes.sort(key=lambda note: note.start)

    chords = []
    with open(chord_path, newline="") as f:
        for row in csv.reader(f, delimiter="\t"):
            # csv.reader yields [] for a blank line; skip it instead of
            # failing on the unpacking below.
            if not row:
                continue
            start_time, end_time, chord = row
            chords.append(Chord(chord, float(start_time), float(end_time)))

    mep = MidiEventProcessor()
    # NOTE(review): encode is assumed to return events exposing `.token` and
    # a `.chord` that is one of the objects in `chords` - confirm in util.
    events = mep.encode(notes, chords)

    # Group tokens by chord in a single pass instead of re-scanning every
    # event for every chord. Chord is an unhashable dataclass and the
    # original matching was by identity (`is`), so key on id(); every object
    # in `chords` stays alive here, which keeps those ids unique.
    tokens_by_chord = {}
    for e in events:
        tokens_by_chord.setdefault(id(e.chord), []).append(e.token)

    segments = []
    for chord in chords:
        # Copy so that each segment owns its own list.
        event_tokens = list(tokens_by_chord.get(id(chord), ()))
        chord_token = chord_vocab.chord_to_token[chord.name]
        segments.append(Segment(chord_token, event_tokens))
    return segments


def preprocess_pop909(data_root, chord_vocab_path, out_dir):
    """
    Preprocess every song directory under `data_root`.

    For each sub-directory `<name>`, the files `<name>/<name>.mid` and
    `<name>/chord_midi.txt` are converted with preprocess_song, and the
    resulting segments are pickled to `<out_dir>/<name>.pkl`. Entries of
    `data_root` that are not directories are skipped.

    Args:
        data_root: Directory containing one sub-directory per song.
        chord_vocab_path: Path to the chord vocabulary text file.
        out_dir: Output directory; created if it does not exist.
    """
    vocab = load_chord_vocab(chord_vocab_path)
    os.makedirs(out_dir, exist_ok=True)
    entries = sorted(os.listdir(data_root))
    for entry in tqdm(entries):
        entry_dir = os.path.join(data_root, entry)
        if os.path.isdir(entry_dir):
            song_segments = preprocess_song(
                os.path.join(entry_dir, f"{entry}.mid"),
                os.path.join(entry_dir, "chord_midi.txt"),
                vocab,
            )
            with open(os.path.join(out_dir, f"{entry}.pkl"), "wb") as out_file:
                pickle.dump(song_segments, out_file)

In [11]:
# Convert every song to per-chord segments and pickle them, one file per song.
# The recorded run below took a few minutes for the whole dataset.
preprocess_pop909(
    data_root="../pop909/original",
    chord_vocab_path="../pop909/chord_vocab.txt",
    out_dir="../pop909/processed",
)

100%|██████████| 910/910 [03:42<00:00,  4.10it/s]


In [12]:
def group(iterable, n, fillvalue=None):
    """
    Split `iterable` into consecutive tuples of length `n`, padding the last
    tuple with `fillvalue` when the input runs out. Returns a lazy iterator.

    Example:
    >>> ["".join(g) for g in group("ABCDEFG", 3, "x")]
    ['ABC', 'DEF', 'Gxx']

    Adapted from the recipes in https://docs.python.org/3/library/itertools.html.
    """
    shared = iter(iterable)
    # Passing the *same* iterator n times makes zip_longest pull n consecutive
    # items for every tuple it emits.
    return itertools.zip_longest(*(shared for _ in range(n)), fillvalue=fillvalue)


def get_chunks(path, n=4, eos_token=356):
    """
    Build the chunks, i.e. (chord tokens, event tokens) pairs, of one song
    from the pickled list of its segments.

    Segments without any event tokens are dropped first. The remaining
    segments are grouped `n` at a time, in order; a trailing group with fewer
    than `n` segments is discarded. The event tokens of each group are
    concatenated and terminated with `eos_token`.

    Args:
        path: Path to the pickle file holding the song's segments.
        n: The number of segments per chunk.
        eos_token: End-of-sequence token appended to every chunk's events.
            NOTE(review): 356 is presumably the first id after the event
            vocabulary of MidiEventProcessor - confirm against util.

    Returns:
        A list of Chunk objects.
    """
    # SECURITY: pickle.load can execute arbitrary code; only point this at
    # files you produced yourself (e.g. with preprocess_pop909).
    # The context manager closes the file handle, which the previous bare
    # open() call left to the garbage collector.
    with open(path, "rb") as f:
        segments = pickle.load(f)

    non_empty = (s for s in segments if len(s.event_tokens) > 0)
    chunks = []
    for segment_group in group(non_empty, n):
        # group() pads the final tuple with None; skip that incomplete group.
        if any(s is None for s in segment_group):
            continue
        chord_tokens = [s.chord_token for s in segment_group]
        event_tokens = [token for s in segment_group for token in s.event_tokens]
        event_tokens.append(eos_token)
        chunks.append(Chunk(chords=chord_tokens, events=event_tokens))
    return chunks


def get_all_chunks(data_root, n=4):
    """
    Gather the chunks of every song under `data_root` into one flat list.

    Only files ending in ".pkl" are read; they are visited in sorted name
    order and each is expanded with get_chunks(path, n).
    """
    pickle_names = [
        name for name in sorted(os.listdir(data_root)) if name.endswith(".pkl")
    ]
    return [
        chunk
        for name in pickle_names
        for chunk in get_chunks(os.path.join(data_root, name), n)
    ]

In [13]:
# Build the chunks of all songs (default: 4 segments per chunk) and store
# them in a single pickle file that MusicDataset reads.
chunks = get_all_chunks("../pop909/processed")
with open("../pop909/pop909.pkl", "wb") as f:
    pickle.dump(chunks, f)

In [None]:
class MusicDataset(torch.utils.data.Dataset):
    """
    A dataset of chunks from songs. Each chunk consists of a sequence of
    chord tokens (4 per chunk with the defaults of get_chunks) along with the
    sequence of event tokens played with those chords.

    The chords and the events are already tokenized in the source file; they
    are converted to LongTensors once, when the dataset is constructed.
    """

    def __init__(self, data_path):
        """
        Args:
            data_path: Path to a pickle file containing a sequence of objects
                with `chords` and `events` attributes (lists of ints), such
                as the Chunk list written by this notebook.
        """
        super().__init__()
        self.data_path = data_path

        # SECURITY: pickle.load can execute arbitrary code; only load files
        # you produced yourself.
        # The context manager closes the file handle, which the previous bare
        # open() call left to the garbage collector.
        with open(data_path, "rb") as f:
            data = pickle.load(f)
        self._chords = [torch.LongTensor(d.chords) for d in data]
        self._events = [torch.LongTensor(d.events) for d in data]

    def __len__(self):
        """Return the number of chunks."""
        return len(self._chords)

    def __getitem__(self, idx):
        """Return the (chord tokens, event tokens) tensor pair of chunk `idx`."""
        return (
            self._chords[idx],
            self._events[idx],
        )

In [14]:
# Requires the cell defining MusicDataset to have been executed first.
dataset = MusicDataset("../pop909/pop909.pkl")

NameError: name 'MusicDataset' is not defined

In [None]:
# Number of chunks in the dataset.
len(dataset)

In [None]:
# Length of every event-token sequence (element 1 of each item), then the longest one.
seq_lens = [len(d[1]) for d in dataset]
print(max(seq_lens))

In [None]:
# Histogram of the event-sequence lengths across all chunks.
plt.hist(seq_lens, bins=25)
plt.show()