In [1]:
!pip install mido

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mido
  Downloading mido-1.2.10-py2.py3-none-any.whl (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.1/51.1 KB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mido
Successfully installed mido-1.2.10


## 모델구조
Seq2Seq 구조를 기본으로 하며, 몇가지 특이점이 있음.

#### Encoder : Bi-directional LSTM

context vector는 $h^{\leftarrow}_T$와 $h^{\rightarrow}_T$를 concatenate하여 사용한다.


#### Decoder : Hierarchical RNN

'Conductor'layer를 통해 root에 해당하는 sequence를 생성하고, root의 token값을 사용하여 subsequence를 생성한다. subsequence의 input으로 root token(conductor state)과 이전 cell의 output을 concat한 값 사용. subsequnence의 initial hidden state는 conductor state의 tanh값을 쓴다.

해당 구조를 통해 gradient vanishing 문제를 완화할 수 있음.

(디코더이므로) 단방향 LSTM 사용.

## 학습 및 추론
Variational Auto-Encoder 구조는 mini-batch 내 datapoint들의 평균과 표준편차(또는 분산)를 추론에 사용한다. (표준편차를 이용한 noise가 낀 latent vector를 복원하는 형태로 학습) 이를 통해 학습 시 latent space를 dense하게 구성할 수 있으며, 추론 시 출력물의 형태가 다양할 수 있다.

In [3]:
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F

import tensorflow as tf
import tensorflow_datasets as tfds

import mido

In [10]:
# input에 외부 feature(conductor vector)을 concat하여 input하는 구조를 구현하기 위해 custom LSTM cell 정의
class LSTMCell_ExternalFeat(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTMCell_ExternalFeat, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Input gate weights
        self.W_ix = nn.Linear(input_size, hidden_size)
        self.W_ih = nn.Linear(hidden_size, hidden_size)
        self.b_i = nn.Parameter(torch.zeros(hidden_size))

        # Forget gate weights
        self.W_fx = nn.Linear(input_size, hidden_size)
        self.W_fh = nn.Linear(hidden_size, hidden_size)
        self.b_f = nn.Parameter(torch.zeros(hidden_size))

        # Cell weights
        self.W_cx = nn.Linear(input_size, hidden_size)
        self.W_ch = nn.Linear(hidden_size, hidden_size)
        self.b_c = nn.Parameter(torch.zeros(hidden_size))

        # Output gate weights
        self.W_ox = nn.Linear(input_size, hidden_size)
        self.W_oh = nn.Linear(hidden_size, hidden_size)
        self.b_o = nn.Parameter(torch.zeros(hidden_size))

    def forward(self, x, hidden, ext):    # ext로 차원 맞춰야하니까 W_~x는 input size 두배여야 함.
        x = torch.cat([x, ext])           # conducter feature를 concat.
        h, c = hidden
        i = torch.sigmoid(self.W_ix(x) + self.W_ih(h) + self.b_i)
        f = torch.sigmoid(self.W_fx(x) + self.W_fh(h) + self.b_f)
        c = f * c + i * torch.tanh(self.W_cx(x) + self.W_ch(h) + self.b_c)
        o = torch.sigmoid(self.W_ox(x) + self.W_oh(h) + self.b_o)
        h = o * torch.tanh(c)

        return h, c

In [17]:
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)      # 드럼의 타입
        self.lstm1 = nn.LSTM(hidden_size, hidden_size, bidirectional=True)  # 2-Layer Bi-LSTM
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, bidirectional=True)
        self.out1  = nn.Linear(hidden_size*2, hidden_size)
        self.out2  = nn.Linear(hidden_size*2, hidden_size)

    # 다음 논문에서 제시된 방식으로 차원 유지 : Schuster, M. and Paliwal, K. K. Bidirectional recurrent neural networks. 1997.
    def forward(self, input):
        embedded = self.embedding(input)
        output, hidden = self.lstm1(embedded)
        output = self.out1(output)
        output, hidden = self.lstm1(output)
        hidden = self.out2(hidden)          # context vector : Bi-directional output
        return hidden, #output


class Reparameterize(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Reparameterize, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.Wmu = nn.Linear(input_size, hidden_size)
        self.Wstd = nn.Linear(input_size, hidden_size)

    def forward(self, context) :
        mu  = self.Wmu(context)                             # µ = W_hµ * h_T + b_µ
        std = torch.log(torch.exp(self.Wstd(context)) + 1)  # σ = log (exp(W_hσ * h_T + b_σ) + 1)
        eps = torch.randn_like(std)
        z = mu + eps*std                                    # ε ∼ N (0, I), z = µ + σ ⊙ ε
        return z


class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        self.lstm1 = nn.LSTM(hidden_size, hidden_size)
        self.lstm2_cell = LSTMCell_ExternalFeat(input_size, hidden_size)
        self.fc_init = nn.Linear(hidden_size, hidden_size)
        self.fc_out = nn.Linear(hidden_size, output_size)

        self.tanh = nn.Tanh()
        
    def forward(self, encoder_output, target_seq):      # target seq len == 4*conducter seq len
        batch_size, max_seq_len, _ = target_seq.shape
        output_seq = torch.zeros(batch_size, max_seq_len, self.output_size)     # 최종 output
        cu,_ = self.lstm1(self.input_size, self.hidden_size)         # 1st LSTM Layer : Conducter

        # 2nd LSTM Layer : Decoder (LSTM w/ External input)
        inits = self.tanh(self.fc_init(cu))
        cell_state = torch.zeros(batch_size, 1, self.hidden_size)
        output = torch.zeros(batch_size, 1, self.output_size)
        for c, i, u in zip(cu, inits, range(max_seq_len//4)) :      # 논문에선 16마디 제시. 과제 안내에서 4 마디 제시.
            hidden_state = i
            for t in range(4):            # 4 x u + t 에 넣게 바꾸자
                lstm_input = torch.cat([c, output], dim=1)  # Concatenate External Feature
                hidden_state, cell_state = self.lstm2_cell(lstm_input, (hidden_state, cell_state))  # LSTM Cell
                output = self.fc_out(hidden_state)          # Output Layer
                output_seq[:, 4*u + t, :] = output          # Save the Output
        
        return output_seq

def loss_function(self, x_recon, x, mu, logvar):
    BCE = F.binary_cross_entropy(x_recon, x.view(-1, "##input_size"), reduction='sum')
    KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return BCE + KLD        # E[log pθ(x|z)] − β KL(qλ(z|x)||p(z))

In [20]:
encoder = Encoder(input_size=2048 , hidden_size=512).to('cuda')
reparameterize = Reparameterize(input_size=512 , hidden_size=512).to('cuda')
decoder = Decoder(input_size=1024 , hidden_size=1024 , output_size=512).to('cuda')

In [4]:
# dataset 불러오기. binary MIDI 표현으로 되어있음.
dataset = tfds.load(
    name="groove/full-midionly",
    split=tfds.Split.TRAIN,
    try_gcs=True)



In [5]:
numpy_ds = dataset.as_numpy_iterator()
binary_codes = []
for beats in numpy_ds :
    binary_codes.append(beats['midi'])  # 딕셔너리 내에서 다른 정보 제외하고 binary MIDI 데이터만 추출

# mido 라이브러리로 다룰 수 있도록 전처리
midi_dats=[]
for b in binary_codes :
    with open('tmp.mid', 'wb') as f:
        f.write(b)
    midi_dats.append(mido.MidiFile('tmp.mid'))

In [26]:
# 데이터 확인
# 사용할 수 있는 feature : channel(악기), velocity(강도), note(음 높낮이)
# time값을 sequence 상의 위치로 사용.
for track in midi_dats[42].tracks:
    print(track.name)
    for msg in track:
        print(msg)

Midi Drums
MetaMessage('track_name', name='Midi Drums', time=0)
MetaMessage('instrument_name', name='Brooklyn', time=0)
MetaMessage('time_signature', numerator=4, denominator=4, clocks_per_click=24, notated_32nd_notes_per_beat=8, time=0)
MetaMessage('key_signature', key='C', time=0)
MetaMessage('smpte_offset', frame_rate=24, hours=33, minutes=0, seconds=0, frames=0, sub_frames=0, time=0)
MetaMessage('set_tempo', tempo=857143, time=0)
control_change channel=9 control=4 value=90 time=0
note_on channel=9 note=36 velocity=51 time=0
note_on channel=9 note=22 velocity=127 time=8
control_change channel=9 control=4 value=90 time=45
note_on channel=9 note=42 velocity=27 time=0
note_off channel=9 note=36 velocity=64 time=4
note_off channel=9 note=22 velocity=64 time=8
control_change channel=9 control=4 value=90 time=35
note_on channel=9 note=42 velocity=58 time=0
note_off channel=9 note=42 velocity=64 time=0
note_off channel=9 note=42 velocity=64 time=10
control_change channel=9 control=4 value=

In [77]:
# 드럼 비트를 이진수 형태로 변환
drum_beats=[]
max_time=[]
for msg in midi_dats[70].tracks :
    for i in msg : max_time.append(i.time)
max_time = max(max_time)

for msg in midi_dats[70].tracks:
    for beat in msg :
        if type(beat) != mido.messages.messages.Message :
            continue    # MetaMessage 건너뛰기
        if beat.type == 'note_on':
            # note_on 메시지에서 드럼 비트 정보 추출
            note = beat.note    # 음 높낮이
            time = beat.time    # 음이 찍힌 시간
            # 드럼 비트가 존재하는 경우에만 이진수로 변환
            drum_beat = np.zeros(max_time+1)
            drum_beat[time] = 1
            drum_beats.append(drum_beat)
# numpy 배열로 변환
drum_beats = sum(np.array(drum_beats))

In [78]:
drum_beats

array([1., 2., 0., 2., 0., 3., 0., 0., 0., 1., 0., 0., 0., 2., 0., 1., 0.,
       0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 2., 0., 0., 0., 0.,
       1., 0., 1., 2., 0., 0., 0., 0., 2., 1., 1., 0., 2., 1., 0., 1., 0.,
       0., 1., 0., 1., 1., 2., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1.,
       1., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0.