In [1]:
import math

import torch
import torch.nn as nn
from torch import optim
from torch.autograd import Variable

In [2]:
class DelayedRNN(nn.Module):
    """One residual layer made of three GRU stacks that exchange information.

    The layer keeps three hidden streams:

    * a "time-delayed" stream ``h_t`` of shape (batch, time, freq, num_hidden),
      refined by three GRUs: one scanning along axis 1 and two scanning along
      axis 2 in opposite directions;
    * a "centralized" stream ``h_c`` of shape (batch, time, num_hidden),
      refined by a GRU scanning along axis 1;
    * a "frequency-delayed" stream ``h_f`` of shape (batch, time, freq,
      num_hidden), refined by a GRU scanning along axis 2 after the other two
      streams have been added to it.

    Every stream is updated residually: ``output = input + Linear(GRU(...))``.

    The axis names (time / freq) follow how this notebook feeds the model;
    the code itself only relies on the axis positions.
    """

    def __init__(self, num_hidden):
        """Create the GRUs and projection layers; all of them are ``num_hidden`` wide."""
        super(DelayedRNN, self).__init__()

        # Time-delayed stack: x scans axis 1, y scans axis 2 forwards,
        # z scans axis 2 backwards.
        self.t_delay_RNN_x = nn.GRU(input_size=num_hidden, hidden_size=num_hidden, batch_first=True)
        self.t_delay_RNN_y = nn.GRU(input_size=num_hidden, hidden_size=num_hidden, batch_first=True)
        self.t_delay_RNN_z = nn.GRU(input_size=num_hidden, hidden_size=num_hidden, batch_first=True)

        # Projects the concatenated x/y/z outputs back to num_hidden.
        self.W_t = nn.Linear(3 * num_hidden, num_hidden)

        # Centralized stack.
        self.c_RNN = nn.GRU(input_size=num_hidden, hidden_size=num_hidden, batch_first=True)
        self.W_c = nn.Linear(num_hidden, num_hidden)

        # Frequency-delayed stack.
        self.f_delay_RNN = nn.GRU(input_size=num_hidden, hidden_size=num_hidden, batch_first=True)
        self.W_f = nn.Linear(num_hidden, num_hidden)

    @staticmethod
    def _scan_axis1(rnn, hidden):
        """Run ``rnn`` along axis 1 of a (batch, time, freq, width) tensor.

        Every (batch, freq) pair is treated as an independent sequence, so all
        of them are folded into the GRU batch dimension and processed in one
        call instead of one call per frequency bin.
        """
        batch, steps, bins, width = hidden.shape
        folded = hidden.permute(0, 2, 1, 3).reshape(batch * bins, steps, width)
        scanned, _ = rnn(folded)
        return scanned.reshape(batch, bins, steps, width).permute(0, 2, 1, 3)

    @staticmethod
    def _scan_axis2(rnn, hidden):
        """Run ``rnn`` along axis 2 of a (batch, time, freq, width) tensor.

        Every (batch, time) pair is an independent sequence; they are folded
        into the GRU batch dimension and processed in one call.
        """
        batch, steps, bins, width = hidden.shape
        scanned, _ = rnn(hidden.reshape(batch * steps, bins, width))
        return scanned.reshape(batch, steps, bins, width)

    def forward(self, input_h_t, input_h_f, input_h_c):
        """Apply one layer update to the three hidden streams.

        Args:
            input_h_t: time-delayed stream, (batch, time, freq, num_hidden).
            input_h_f: frequency-delayed stream, same shape as ``input_h_t``.
            input_h_c: centralized stream, (batch, time, num_hidden).

        Returns:
            Tuple ``(output_h_t, output_h_f, output_h_c)`` with the same shapes
            as the corresponding inputs.
        """
        # --- time-delayed stack -------------------------------------------
        h_t_x = self._scan_axis1(self.t_delay_RNN_x, input_h_t)
        h_t_y = self._scan_axis2(self.t_delay_RNN_y, input_h_t)
        # Backward scan: flip axis 2, scan, then flip the result back so that
        # position j again refers to bin j.
        h_t_z = torch.flip(
            self._scan_axis2(self.t_delay_RNN_z, torch.flip(input_h_t, dims=[2])),
            dims=[2],
        )

        h_t_concat = torch.cat([h_t_x, h_t_y, h_t_z], dim=3)
        output_h_t = torch.add(input_h_t, self.W_t(h_t_concat))

        # --- centralized stack --------------------------------------------
        h_c_rnn, _ = self.c_RNN(input_h_c)
        output_h_c = torch.add(input_h_c, self.W_c(h_c_rnn))

        # --- frequency-delayed stack --------------------------------------
        # The centralized state has no freq axis; unsqueeze lets broadcasting
        # share it across however many bins the input has (previously this was
        # hard-coded to 32 via repeat()).
        h_c_expand = output_h_c.unsqueeze(2)
        h_f_sum = torch.add(torch.add(input_h_f, output_h_t), h_c_expand)

        h_f_rnn = self._scan_axis2(self.f_delay_RNN, h_f_sum)
        output_h_f = torch.add(input_h_f, self.W_f(h_f_rnn))

        return output_h_t, output_h_f, output_h_c

In [3]:
class MelNet(nn.Module):
    """Stack of ``DelayedRNN`` layers trained with a Gaussian-mixture likelihood.

    The model takes a tensor of shape (batch, time, freq, 1), lifts it into
    three hidden streams, runs them through ``num_layer`` ``DelayedRNN``
    layers and predicts, for every (batch, time, freq) element, the parameters
    of a ``K``-component 1-D Gaussian mixture. ``forward`` returns the mean
    negative log-likelihood of the input under that mixture.
    """

    def __init__(self, num_hidden, num_layer, K, num_freq=32):
        """Build the network.

        Args:
            num_hidden: width of every hidden stream and of each DelayedRNN.
            num_layer: number of stacked DelayedRNN layers.
            K: number of mixture components per output element.
            num_freq: size of axis 2 of the input, consumed by the centralized
                input projection. Defaults to 32, the value that used to be
                hard-coded.
        """
        super(MelNet, self).__init__()

        self.W_t_0 = nn.Linear(1, num_hidden)
        self.W_f_0 = nn.Linear(1, num_hidden)
        self.W_c_0 = nn.Linear(num_freq, num_hidden)

        # Layers must share the width of the input projections above.
        self.module_list = nn.ModuleList([DelayedRNN(num_hidden) for _ in range(num_layer)])

        self.W_theta = nn.Linear(num_hidden, 3 * K)
        # No longer used by forward() (mixture weights are handled in log
        # space); kept so the attribute remains available to existing code.
        self.pi_softmax = nn.Softmax(dim=3)
        self.K = K

    def forward(self, input_tensor, loss_mask=None):
        """Return the mean negative log-likelihood of ``input_tensor``.

        Args:
            input_tensor: tensor of shape (batch, time, freq, 1).
            loss_mask: optional tensor of shape (batch, time, freq). When
                given, the loss is averaged only over elements weighted by the
                mask (e.g. 1 for real frames, 0 for padding). When omitted,
                every element counts equally.

        Returns:
            Scalar tensor holding the (optionally masked) mean loss.
        """
        h_t = self.W_t_0(input_tensor)
        h_f = self.W_f_0(input_tensor)
        # The centralized stream sees the whole freq axis of each frame at once.
        h_c = self.W_c_0(input_tensor[:, :, :, 0])

        for layer in self.module_list:
            h_t, h_f, h_c = layer(h_t, h_f, h_c)

        theta_hat = self.W_theta(h_f)

        # Split the 3*K outputs into means, log standard deviations and
        # unnormalised mixture logits. Use self.K, not a notebook global.
        K = self.K
        mu = theta_hat[:, :, :, :K]
        log_std = theta_hat[:, :, :, K:2 * K]
        log_pi = torch.log_softmax(theta_hat[:, :, :, 2 * K:], dim=3)

        # log N(x | mu, std) computed directly in log space. Together with
        # logsumexp this equals -log(sum_k pi_k * N_k) but does not underflow
        # to log(0) when the target lies far from every component.
        z = (input_tensor - mu) * torch.exp(-log_std)
        log_normal = -0.5 * z ** 2 - log_std - 0.5 * math.log(2.0 * math.pi)
        loss = -torch.logsumexp(log_pi + log_normal, dim=3)

        if loss_mask is None:
            return torch.mean(loss)

        mask = loss_mask.to(loss.dtype)
        # clamp avoids 0/0 when the mask is entirely zero.
        return torch.sum(loss * mask) / torch.clamp(torch.sum(mask), min=1.0)


In [4]:
# Build the model: 512-wide hidden streams, 3 stacked layers, 10 mixture components.
net = MelNet(num_hidden=512, num_layer=3, K=10)

In [5]:
# net.parameters

<bound method Module.parameters of MelNet(
  (W_t_0): Linear(in_features=1, out_features=512, bias=True)
  (W_f_0): Linear(in_features=1, out_features=512, bias=True)
  (W_c_0): Linear(in_features=32, out_features=512, bias=True)
  (module_list): ModuleList(
    (0): DelayedRNN(
      (t_delay_RNN_x): GRU(512, 512, batch_first=True)
      (t_delay_RNN_y): GRU(512, 512, batch_first=True)
      (t_delay_RNN_z): GRU(512, 512, batch_first=True)
      (W_t): Linear(in_features=1536, out_features=512, bias=True)
      (c_RNN): GRU(512, 512, batch_first=True)
      (W_c): Linear(in_features=512, out_features=512, bias=True)
      (f_delay_RNN): GRU(512, 512, batch_first=True)
      (W_f): Linear(in_features=512, out_features=512, bias=True)
    )
    (1): DelayedRNN(
      (t_delay_RNN_x): GRU(512, 512, batch_first=True)
      (t_delay_RNN_y): GRU(512, 512, batch_first=True)
      (t_delay_RNN_z): GRU(512, 512, batch_first=True)
      (W_t): Linear(in_features=1536, out_features=512, bias=Tru

In [16]:
import librosa
import scipy as sp
import numpy as np
import os
import glob
import matplotlib.pyplot as plt
import IPython.display as ipd
from tqdm import tqdm_notebook as tqdm
import random
import copy

In [7]:
# Front-end and model constants shared by the cells below.

# STFT segment length in samples; passed as both `nperseg` and `n_fft`.
nsc = 6 * 256
# Step between successive STFT segments, in samples.
hop = 256
# Overlap between successive segments; this is what is passed as `noverlap`.
nov = nsc - hop
# Number of mel filter-bank channels.
n_mels = 256
# Sample rate given to the mel filter bank. Note that `/` makes this the float 22050.0.
fs = 44100/2
# Hidden width. NOTE(review): the model cell passes the literal 512 rather than this name.
num_hidden = 512
# Mixture-component count. NOTE(review): the model cell passes the literal 10 as K;
# the two values are set independently and must be kept in sync by hand.
K = 10
# Lower bound applied to magnitudes before taking log10.
eps = 1e-8
# Log-magnitudes (in dB) are rescaled as (x + db_ref) / db_ref.
db_ref = 160

# Mel filter bank matching the STFT settings above.
mel_filters = librosa.filters.mel(sr=fs, n_fft=nsc, n_mels=n_mels)

In [8]:
# Transcript file: one '|'-separated record per line, the first field being the wav name.
meta_path = "D:/korean-single-speaker-speech-dataset/transcript.v.1.2.txt"
with open(meta_path, encoding='utf-8') as transcript_file:
    metadata = np.array([record.strip().split('|') for record in transcript_file])

# Keep only the first column of every record.
wave_name_list = [row[0] for row in metadata]

In [9]:
# Convert every wav listed in `wave_name_list` into a normalised log-mel
# spectrogram cached as .npy under <data_folder>/MelNet. Files that already
# exist are skipped, so re-running this cell is cheap.
data_folder = "D:/korean-single-speaker-speech-dataset/kss"

mel_path_list = list()
# NOTE(review): shapes are only recorded for files created in THIS run, so on
# a fully cached re-run this list stays empty and does not line up with
# `mel_path_list`.
mel_shape_list = list()

# The filter bank depends only on notebook-level constants; build it once
# instead of once per file.
mel_filters = librosa.filters.mel(sr=fs, n_fft=nsc, n_mels=n_mels)

for wav_name in tqdm(wave_name_list):

    npy_name = wav_name.replace('.wav', '.npy')
    wav_path = os.path.join(data_folder, wav_name)
    save_path = os.path.join(data_folder + '/MelNet', npy_name)
    mel_path_list.append(save_path)

    if os.path.isfile(save_path):
        continue

    # librosa.load is called without `sr`, so the rate it returns is
    # librosa's default. NOTE(review): the mel filter bank above is built for
    # `fs`; confirm both rates agree.
    y, sr = librosa.load(wav_path)
    f, t, Zxx = sp.signal.stft(y, fs=sr, nperseg=nsc, noverlap=nov)
    Sxx = np.maximum(np.abs(Zxx), eps)

    mel_specgram = np.matmul(mel_filters, Sxx)

    log_mel_specgram = 20 * np.log10(np.maximum(mel_specgram, eps))
    norm_log_mel_specgram = (log_mel_specgram + db_ref) / db_ref

    mel_shape_list.append(norm_log_mel_specgram.shape)

    # np.save does not create missing directories (wav names may also carry
    # sub-folders), so make sure the target directory exists first.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    np.save(save_path, norm_log_mel_specgram)

HBox(children=(IntProgress(value=0, max=12853), HTML(value='')))




In [10]:
# data_dir = "D:/korean-single-speaker-speech-dataset/kss"

# file_list = glob.glob(data_dir + '/*')
# y, fs = librosa.core.load(file_list[0])

# f, t, Sxx = scipy.signal.stft(y, fs=fs, window='hann', nperseg=nsc, noverlap=nov)
# # Sxx = Sxx[1:, :]
# Zxx = np.abs(Sxx)
# log_spectrogram = 20 * np.log10(np.maximum(Zxx, 1e-8))
# log_spectrogram_norm = (log_spectrogram + 160) / 160

# mel_spectrogram = np.matmul(mel_filters, Zxx)
# log_mel_spectrogram = 20 * np.log10(np.maximum(mel_spectrogram, 1e-8))
# mel_input = (log_mel_spectrogram + 160) / 160

# Tier6 = mel_input[::2, :]
# Tier6_not = mel_input[1::2, :]

# Tier5 = Tier6_not[:, ::2]
# Tier5_not = Tier6_not[:, 1::2]

# Tier4 = Tier5_not[::2, :]
# Tier4_not = Tier5_not[1::2, :]

# Tier3 = Tier4_not[:, ::2]
# Tier3_not = Tier4_not[:, 1::2]

# Tier2 = Tier3_not[::2, :]
# Tier1 = Tier3_not[1::2, :]

# Tiers = [Tier1, Tier2, Tier3, Tier4, Tier5, Tier6]

In [11]:
# tensor = torch.tensor(Tier1.T)
# input_tensor = tensor.view([1, tensor.shape[0], tensor.shape[1], 1])

NameError: name 'Tier1' is not defined

In [12]:
def split_tier(batched_tensor):
    """Split a 3-D batch into six interleaved sub-tensors ("tiers").

    The tensor is halved five times, alternating between axis 2 and axis 1
    (starting with axis 2). At each step the even-indexed half becomes a tier
    and the odd-indexed half is split further; the final odd-indexed half is
    the smallest tier.

    Args:
        batched_tensor: 3-D tensor; in this notebook it is laid out as
            (batch, time, mel).

    Returns:
        List ``[Tier1, Tier2, Tier3, Tier4, Tier5, Tier6]`` ordered from the
        last split to the first.
    """
    collected = []
    remainder = batched_tensor

    for axis in (2, 1, 2, 1, 2):
        if axis == 2:
            even_part, odd_part = remainder[:, :, ::2], remainder[:, :, 1::2]
        else:
            even_part, odd_part = remainder[:, ::2, :], remainder[:, 1::2, :]
        collected.append(even_part)
        remainder = odd_part

    # What is left after the last split is Tier1; reversing puts Tier1 first.
    collected.append(remainder)
    collected.reverse()

    return collected

def find_next_multiple(num, target):
    """Return the smallest multiple of ``num`` that is greater than or equal to ``target``.

    Uses ceiling division on the operands directly instead of ``np.int`` /
    ``np.ceil``: the ``np.int`` alias no longer exists in current NumPy
    releases, and integer arithmetic avoids float rounding for large values.

    Args:
        num: the step whose multiple is wanted (non-zero).
        target: the value to round up.

    Returns:
        ``num * ceil(target / num)``.
    """
    # -(-a // b) is ceil(a / b) without going through floating point.
    return num * int(-(-target // num))

In [13]:
class Batch_Loader():
    def __init__(self, mel_path_list, batch_size):
        super(Batch_Loader).__init__()
        self.mel_path_list = mel_path_list
        self.total_num_input = len(mel_path_list)
        self.tensor_input_list = [None] * self.total_num_input
        self.shuffle_step = 20
        self.loading_sequence = None
        self.end_flag = True
        self.batch_size = batch_size
        self.tensor_length_list = list()
    
    def load(self, i):
        norm_log_mel_specgram = np.load(self.mel_path_list[i])
        input_spectrogram = norm_log_mel_specgram.T
        tensor_input = torch.tensor(input_spectrogram).view(1, input_spectrogram.shape[0], input_spectrogram.shape[1])
        self.tensor_input_list[i] = tensor_input
        
    def get(self, i):
        if type(self.tensor_input_list[i]) == type(None):
            self.load(i)
        return self.tensor_input_list[i]  
    
    def load_all(self):
        for i in tqdm(range(len(self.mel_path_list))):
            self.tensor_length_list.append(self.get(i).shape[1])
        
        self.tensor_length_list = np.asarray(self.tensor_length_list)
    
    def initialize_batch(self):
        if len(self.tensor_length_list) == 0:
            self.load_all()
        loading_sequence = np.argsort(self.tensor_length_list)
#       print(loading_sequence)
#       print(type(loading_sequence))
        bundle = np.stack([self.tensor_length_list[loading_sequence], loading_sequence])
        
        for seq_len in range(self.shuffle_step, np.max(self.tensor_length_list), self.shuffle_step):
            idxs = np.where((bundle[0, :] > seq_len) & (bundle[0, :] <= seq_len + self.shuffle_step))[0]
            idxs_origin = copy.deepcopy(idxs)
            random.shuffle(idxs)
            bundle[:, idxs_origin] = bundle[:, idxs]
            
        loading_sequence = bundle[1, :]
        
        self.loading_sequence = loading_sequence
        self.current_loading_index = 0
        self.end_flag = False
        
        return
    
    def get_batch(self):
        
        tensor_list = list()
        tensor_size_list = list()
        
        count = 0
        max_seq_len = 0
        
        for i in range(self.batch_size):
            
            if self.current_loading_index >= self.total_num_input:
                self.end_flag = True
                break
            
            tensor = self.get(self.loading_sequence[self.current_loading_index])
            tensor_list.append(tensor)
            tensor_size_list.append(tensor.shape[1])
             
            if (tensor.shape[1] > max_seq_len):
                max_seq_len = tensor.shape[1] 
            
            self.current_loading_index += 1
            count += 1
            
        batch_len = find_next_multiple(8, max_seq_len)
            
        batched_tensor = torch.zeros(count, batch_len, n_mels)
        batched_loss_mask = torch.zeros(count, batch_len, n_mels)
        
        for order in range(count):
            batched_tensor[order, :tensor_size_list[order], :] = tensor_list[order]
            batched_loss_mask[order, :tensor_size_list[order], :] = torch.ones(tensor_list[order].shape)
        
        
        
        return batched_tensor, batched_loss_mask

In [14]:
# Loader over the cached spectrogram files, serving 4 items per batch.
batch_loader = Batch_Loader(mel_path_list=mel_path_list, batch_size=4)

In [17]:
# Train on the first tier only, with plain SGD.
optimizer = optim.SGD(net.parameters(), lr=0.01)

EPOCH = 1

for epoch in range(EPOCH):
    # Builds a fresh shuffled order for this pass; one call per epoch is
    # enough (it was previously also called once before the loop).
    batch_loader.initialize_batch()

    while not batch_loader.end_flag:
        input_tensor, loss_mask = batch_loader.get_batch()

        # The loader can hand back a batch with zero items at the end of a
        # pass; there is nothing to train on in that case.
        if input_tensor.shape[0] == 0:
            break

        input_tensor_tier = split_tier(input_tensor)
        # NOTE(review): the mask tiers are computed but never applied, so
        # zero-padded frames currently contribute to the loss.
        loss_mask_tier = split_tier(loss_mask)

        tier_1 = input_tensor_tier[0]

        # Add the trailing feature axis the model expects: (batch, time, freq, 1).
        input_tensor = tier_1.unsqueeze(-1)

        optimizer.zero_grad()
        loss = net(input_tensor)
        loss.backward()
        optimizer.step()

        print(loss)

tensor(0.7507, grad_fn=<MeanBackward0>)
tensor(0.5077, grad_fn=<MeanBackward0>)
tensor(1.8073, grad_fn=<MeanBackward0>)
tensor(0.6928, grad_fn=<MeanBackward0>)
tensor(1.5934, grad_fn=<MeanBackward0>)


KeyboardInterrupt: 