In [3]:
# https://arxiv.org/pdf/1802.04208.pdf

https://www.youtube.com/watch?v=BA-Z0KJIyJs

In [4]:
# Environment setup: install the pescador package (imported further down).
! pip install pescador

# Download and unpack three audio archives (drums, piano, sc09).
# `tar -xvf` is verbose and lists every extracted file, which is why this
# cell's output is so long.
# NOTE(review): the training cell reads from /content/piano/train -- confirm
# that this is where the mancini_piano archive actually unpacks to.
!wget http://deepyeti.ucsd.edu/cdonahue/wavegan/data/drums.tar.gz 
!tar -xvf drums.tar.gz
!wget http://deepyeti.ucsd.edu/cdonahue/wavegan/data/mancini_piano.tar.gz
!tar -xvf mancini_piano.tar.gz
!wget http://deepyeti.ucsd.edu/cdonahue/wavegan/data/sc09.tar.gz
!tar -xvf sc09.tar.gz

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
sc09/train/Eight_b4aa9fef_nohash_2.wav
sc09/train/Eight_b4aa9fef_nohash_3.wav
sc09/train/Eight_b4aa9fef_nohash_4.wav
sc09/train/Eight_b4bef564_nohash_0.wav
sc09/train/Eight_b52bd596_nohash_0.wav
sc09/train/Eight_b544d4fd_nohash_0.wav
sc09/train/Eight_b5552931_nohash_0.wav
sc09/train/Eight_b5552931_nohash_1.wav
sc09/train/Eight_b3bb4dd6_nohash_0.wav
sc09/train/Eight_b5552931_nohash_2.wav
sc09/train/Eight_b71ebf79_nohash_0.wav
sc09/train/Eight_b8872c20_nohash_0.wav
sc09/train/Eight_b9f46737_nohash_1.wav
sc09/train/Eight_baeac2ba_nohash_1.wav
sc09/train/Eight_bd061bef_nohash_0.wav
sc09/train/Eight_be91a00a_nohash_0.wav
sc09/train/Eight_c0445658_nohash_2.wav
sc09/train/Eight_c120e80e_nohash_5.wav
sc09/train/Eight_c1e0e8e3_nohash_1.wav
sc09/train/Eight_c4500713_nohash_1.wav
sc09/train/Eight_c71e3acc_nohash_1.wav
sc09/train/Eight_b5552931_nohash_3.wav
sc09/train/Eight_b5552931_nohash_4.wav
sc09/train/Eight_b575b5fb_nohash_0.wav

In [5]:
import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
import torch.utils.data
import torch.optim as optim
import soundfile as sf
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if (torch.cuda.is_available()) else "cpu")

In [6]:
import os
import time
import math

import random
import librosa
import librosa.display
import numpy as np

import matplotlib
import matplotlib.pyplot as plt
import glob
import pescador

from torch.autograd import Variable

# **GENERATOR MODEL**


In [7]:
class Transpose1dLayer(nn.Module):
    """Upsampling block used in place of a transposed 1-D convolution.

    The input is first stretched along the time axis by nearest-neighbour
    interpolation, then zero-padded by ``kernel_size // 2`` on both sides,
    convolved and batch-normalised.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by the convolution.
        kernel_size: width of the convolution kernel.
        stride: stride of the convolution.
        padding: accepted for interface compatibility; not used.
        output_padding: accepted for interface compatibility; not used.
        upsample: factor by which the time axis is stretched before the conv.
    """

    def __init__(self,in_channels,out_channels, kernel_size, stride, padding=11, output_padding=1,upsample=4):
        super().__init__()
        self.upsample = upsample

        # Build the convolution first so its weights can be re-drawn from
        # N(0, 0.02) before it is wrapped into the sequential pipeline.
        conv = nn.Conv1d(in_channels, out_channels, kernel_size, stride)
        conv.weight.data.normal_(0.0, 0.02)

        self.transpose_ops = nn.Sequential(
            nn.ConstantPad1d(kernel_size // 2, value=0),
            conv,
            nn.BatchNorm1d(out_channels),
        )

    def forward(self, x):
        """Stretch ``x`` in time, then apply pad -> conv -> batch norm."""
        stretched = F.interpolate(x, scale_factor=self.upsample, mode="nearest")
        return self.transpose_ops(stretched)

class Generator(nn.Module):
    """Maps a latent noise vector to a waveform squashed into [-1, 1] by tanh.

    A linear layer projects the noise to a ``(32 * model_size, 16)`` feature
    map, which six ``Transpose1dLayer`` blocks then stretch by 4x each
    (16 -> 65536 samples) while narrowing the channels down to ``num_channel``.

    Args:
        noise_latent_dim: size of the input noise vector.
        model_size: base channel multiplier (``d``).
        num_channel: number of output audio channels (``c``).
        slice_len: accepted for interface compatibility; not used here.
    """

    def __init__(self,noise_latent_dim, model_size=64,num_channel=1,slice_len=65536):
        super().__init__()
        self.model_size = model_size  # d
        self.num_channel = num_channel  # c
        self.dim_mul = 32

        # Channel schedule: 32d -> 16d -> 8d -> 4d -> 2d -> d -> c
        widest = self.dim_mul * model_size
        widths = [widest // (2 ** step) for step in range(5)]
        widths += [model_size, num_channel]
        self.deconv_list = nn.ModuleList([
            Transpose1dLayer(n_in, n_out, 25, stride=1, upsample=4)
            for n_in, n_out in zip(widths[:-1], widths[1:])
        ])

        self.fc1 = nn.Linear(noise_latent_dim, 4 * 4 * model_size * self.dim_mul)
        self.bn1 = nn.BatchNorm1d(num_features=model_size * self.dim_mul)

    def forward(self, x):
        """Generate a batch of waveforms from a batch of noise vectors."""
        hidden = self.fc1(x).view(-1, self.dim_mul * self.model_size, 16)
        hidden = F.relu(self.bn1(hidden))

        # Every block but the last is followed by ReLU; the last one by tanh.
        *inner_blocks, output_block = self.deconv_list
        for block in inner_blocks:
            hidden = F.relu(block(hidden))
        return torch.tanh(output_block(hidden))



def initialize_model_weights(nn_model):
    """Re-initialise the weights of every sub-module of ``nn_model`` in place.

    Convolution, transposed-convolution and linear weights are drawn with
    Kaiming-normal initialisation; BatchNorm1d scale weights are drawn from a
    zero-centred normal with standard deviation 0.02. Biases are left as-is.
    """
    kaiming_types = (nn.Conv1d, nn.ConvTranspose1d, nn.Linear)
    for module in nn_model.modules():
        if isinstance(module, kaiming_types):
            nn.init.kaiming_normal_(module.weight.data)
            continue
        if isinstance(module, nn.BatchNorm1d):
            # zero-centred normal distribution, as the author attributes to the paper
            nn.init.normal_(module.weight.data, 0.0, 0.02)


# **DISCRIMINATOR MODEL**

In [8]:
class Conv1D(nn.Module):
    """Discriminator block: Conv1d -> BatchNorm1d -> LeakyReLU(0.2) -> phase shuffle.

    Args:
        input_channels: channels of the incoming feature map.
        output_channels: channels produced by the convolution.
        kernel_size: width of the convolution kernel.
        stride: stride of the convolution.
        padding: zero padding applied on both sides by the convolution.
        shift_factor: maximum phase-shuffle shift; ``0`` disables the shuffle.
        drop_out: accepted for interface compatibility; not used.
    """

    def __init__(self,input_channels,output_channels, kernel_size, stride=4,padding=11,shift_factor=2,drop_out=0):
        super(Conv1D, self).__init__()
        self.conv1d = nn.Conv1d(input_channels, output_channels, kernel_size, stride=stride, padding=padding)
        self.batch_norm = nn.BatchNorm1d(output_channels)
        self.phase_shuffle = PhaseShuffle(shift_factor)
        # Shuffle only when a non-zero shift was requested. The flag used to be
        # `shift_factor == 0`, which enabled the shuffle exactly when it is an
        # identity and skipped it whenever a real shift was configured.
        self.use_phase_shuffle = shift_factor != 0

    def forward(self, x):
        """Apply the block to ``x`` and return the resulting feature map."""
        x = self.conv1d(x)
        x = self.batch_norm(x)
        x = F.leaky_relu(x, 0.2)
        if self.use_phase_shuffle:
            x = self.phase_shuffle(x)
        return x


class PhaseShuffle(nn.Module):
    """Randomly shifts each sample of a batch along its last axis.

    For every sample an integer shift is drawn uniformly from
    ``[-shift_factor, shift_factor]``. The sample is moved by that many
    positions and the gap is filled by reflection, so the length is unchanged.

    Adapted from https://github.com/jtcramer/wavegan/blob/master/wavegan.py#L8
    """

    def __init__(self, shift_factor):
        super().__init__()
        self.shift_factor = shift_factor

    def forward(self, x):
        """Return a shuffled copy of ``x`` (or ``x`` itself when disabled)."""
        radius = self.shift_factor
        if radius == 0:
            return x

        # One integer shift per sample, uniform over [-radius, radius].
        draws = torch.Tensor(x.shape[0]).random_(0, 2 * radius + 1) - radius

        # Bucket sample indices by shift so each distinct shift is padded once.
        rows_by_shift = {}
        for row, shift in enumerate(draws.long().tolist()):
            rows_by_shift.setdefault(shift, []).append(row)

        shuffled = x.clone()
        for shift, rows in rows_by_shift.items():
            picked = x[rows]
            if shift > 0:
                # Move right: drop the tail, reflect-pad on the left.
                shuffled[rows] = F.pad(picked[..., :-shift], (shift, 0), mode="reflect")
            else:
                # Move left (or not at all): drop the head, reflect-pad on the right.
                shuffled[rows] = F.pad(picked[..., -shift:], (0, -shift), mode="reflect")
        return shuffled

class Discriminator(nn.Module):
    """Critic that maps a waveform to a single unbounded score.

    Six stride-4 ``Conv1D`` blocks widen the channels from ``num_channel`` up
    to ``32 * model_size``; the result is flattened to ``512 * model_size``
    features and passed through one linear layer. Phase shuffle is requested
    on every block except the last.

    Args:
        model_size: base channel multiplier (``d``).
        num_channel: number of input audio channels (``c``).
        shift_factor: phase-shuffle radius handed to the first five blocks (``n``).
        slice_len: accepted for interface compatibility; not used here.
    """

    def __init__(self, model_size=64, num_channel=1, shift_factor=2, slice_len=65536):
        super().__init__()

        self.model_size = model_size  # d
        self.num_channel = num_channel  # c
        self.shift_factor = shift_factor  # n

        # Channel schedule: c -> d -> 2d -> 4d -> 8d -> 16d -> 32d
        widths = [num_channel] + [model_size * (2 ** step) for step in range(6)]
        last_block = len(widths) - 2
        blocks = []
        for position, (n_in, n_out) in enumerate(zip(widths[:-1], widths[1:])):
            block_shift = 0 if position == last_block else shift_factor
            blocks.append(
                Conv1D(n_in, n_out, kernel_size=25, stride=4, padding=11, shift_factor=block_shift)
            )

        self.fully_connected_in_size = 512 * model_size
        self.dense_layer = nn.Sequential(*blocks)
        self.fully_connected = nn.Linear(self.fully_connected_in_size, 1)

    def forward(self, x):
        """Score a batch of waveforms; returns one value per flattened row."""
        features = self.dense_layer(x)
        flat = features.view(-1, self.fully_connected_in_size)
        return self.fully_connected(flat)

# **HYPER PARAMETERS**

In [9]:
# Adam learning rates for the generator and the discriminator (critic).
learning_rate_g = 0.0001
learning_rate_d = 0.0003
# Adam beta coefficients shared by both optimizers.
beta1 = 0.5
beta2 = 0.9
# NOTE(review): the training cell reassigns n_epochs and drives its loop with
# its own iteration count, so this value has no effect on training.
n_epochs = 20
num_channel = 1  # audio channels generated / scored
# Passed to both model constructors, neither of which reads it.
slice_len = 65536
model_size = 64  # base channel multiplier of both networks
# NOTE(review): misspelled duplicate of noise_latent_dim below; nothing in the
# visible notebook reads this name.
noise_laten_dim = 100
shift_factor =2  # phase-shuffle radius handed to the discriminator
critic_iter = 5  # critic updates per generator update
# NOTE(review): not referenced in the visible notebook; the critic loss uses a
# gradient penalty rather than weight clipping.
weight_clip = 0.01
# NOTE(review): not referenced in the visible notebook; save_sounds defaults
# to a sample rate of 16000 instead.
sampling_rate = 14000


lambda_ = 10  # weight of the gradient penalty in the critic loss
batch_size = 10  # number of noise vectors drawn per step
noise_latent_dim = 100  # size of the sampling noise

# **INSTANTIATE GENERATOR AND DISCRIMINATOR**

In [10]:
# Build the generator on the selected device and re-initialise its weights.
gen_model = Generator(noise_latent_dim, model_size=model_size,num_channel=num_channel,slice_len=slice_len).to(device)
initialize_model_weights(gen_model)

# Adam optimizer for the generator.
gen_optimizer = optim.Adam(gen_model.parameters(),lr = learning_rate_g,betas=(beta1,beta2))
# gen_optimizer = optim.RMSprop(gen_model.parameters(),lr = learning_rate_g)

# Build the discriminator (critic) the same way, with its own Adam optimizer
# and learning rate.
dis_model = Discriminator(model_size=model_size, num_channel=num_channel, shift_factor=shift_factor, slice_len=slice_len).to(device)
initialize_model_weights(dis_model)
dis_optimizer = optim.Adam(dis_model.parameters(),lr = learning_rate_d,betas=(beta1,beta2))
# dis_optimizer = optim.RMSprop(dis_model.parameters(),lr = learning_rate_d)

In [22]:
dis_model

Discriminator(
  (dense_layer): Sequential(
    (0): Conv1D(
      (conv1d): Conv1d(1, 64, kernel_size=(25,), stride=(4,), padding=(11,))
      (batch_norm): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (phase_shuffle): PhaseShuffle()
    )
    (1): Conv1D(
      (conv1d): Conv1d(64, 128, kernel_size=(25,), stride=(4,), padding=(11,))
      (batch_norm): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (phase_shuffle): PhaseShuffle()
    )
    (2): Conv1D(
      (conv1d): Conv1d(128, 256, kernel_size=(25,), stride=(4,), padding=(11,))
      (batch_norm): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (phase_shuffle): PhaseShuffle()
    )
    (3): Conv1D(
      (conv1d): Conv1d(256, 512, kernel_size=(25,), stride=(4,), padding=(11,))
      (batch_norm): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (phase_shuffle): PhaseShuffle()

In [21]:
gen_model

Generator(
  (deconv_list): ModuleList(
    (0): Transpose1dLayer(
      (transpose_ops): Sequential(
        (0): ConstantPad1d(padding=(12, 12), value=0)
        (1): Conv1d(2048, 1024, kernel_size=(25,), stride=(1,))
        (2): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (1): Transpose1dLayer(
      (transpose_ops): Sequential(
        (0): ConstantPad1d(padding=(12, 12), value=0)
        (1): Conv1d(1024, 512, kernel_size=(25,), stride=(1,))
        (2): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (2): Transpose1dLayer(
      (transpose_ops): Sequential(
        (0): ConstantPad1d(padding=(12, 12), value=0)
        (1): Conv1d(512, 256, kernel_size=(25,), stride=(1,))
        (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
    )
    (3): Transpose1dLayer(
      (transpose_ops): Sequential(
        (0): ConstantPad1d(padding=(

Helper functions to save the generated sounds and to compute the gradient penalty for the critic

In [11]:
def make_dir(path):
    """Create ``path`` (and any missing parents) if needed and return it.

    Uses ``exist_ok=True`` rather than checking ``os.path.isdir`` first, so
    there is no gap between the check and the creation in which another
    process could create the directory and make the call fail.

    Args:
        path: directory to create.

    Returns:
        The same ``path``, so the call can be used inline.

    Raises:
        FileExistsError: if ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)
    return path

def save_sounds(fake_batch,epoch,samplerate=16000):
    """Write every waveform in ``fake_batch`` to its own ``.wav`` file.

    Files go to ``<output_wave_dir>/<epoch + 1>/`` and are numbered from 1.
    Only the first channel (index 0) of each waveform is written.

    Args:
        fake_batch: iterable of waveforms, each indexable by channel.
        epoch: zero-based step index; the folder is named ``epoch + 1``.
        samplerate: sample rate handed to the sound-file writer.
    """
    target_dir = make_dir(os.path.join(output_wave_dir, str(epoch+1)))
    for number, wave in enumerate(fake_batch, start=1):
        wav_path = os.path.join(target_dir, "{}.wav".format(number))
        sf.write(wav_path, wave[0], samplerate)

def gradient_penalty(dis_model,real,fake):
    """Gradient penalty: mean squared deviation of the critic's gradient norm from 1.

    One mixing coefficient is drawn per example, the critic is evaluated on
    the resulting mix of ``real`` and ``fake``, and the L2 norm of the
    critic's gradient with respect to that mix is pushed towards 1.

    Args:
        dis_model: callable critic mapping a batch to per-example scores.
        real: batch of real examples, batch dimension first.
        fake: batch of generated examples, same shape as ``real``.

    Returns:
        Scalar tensor ``mean((||grad||_2 - 1) ** 2)``. The gradient is taken
        with ``create_graph=True`` so the penalty itself can be
        back-propagated.
    """
    n_examples = real.shape[0]
    # One epsilon per example, created directly on the inputs' device and
    # dtype (no dependence on a module-level `device`); the trailing singleton
    # axes let it broadcast over channels and time without a `repeat`.
    eps_shape = (n_examples,) + (1,) * (real.dim() - 1)
    epsilon = torch.rand(eps_shape, device=real.device, dtype=real.dtype)
    interpolated_waves = real * epsilon + fake * (1 - epsilon)
    # autograd.grad needs an input that tracks gradients; this is only not the
    # case when neither `real` nor `fake` does (e.g. a detached fake batch).
    if not interpolated_waves.requires_grad:
        interpolated_waves.requires_grad_(True)

    scores = dis_model(interpolated_waves)

    gradient = torch.autograd.grad(
        inputs=interpolated_waves,
        outputs=scores,
        grad_outputs=torch.ones_like(scores),
        create_graph=True,
        retain_graph=True,
    )[0]
    gradient = gradient.reshape(n_examples, -1)
    l2_norm = gradient.norm(2, dim=1)
    return torch.mean((l2_norm - 1) ** 2)

In [12]:
# ! pip install torchaudio --quiet
# import torchaudio
#  train_data = torchaudio.datasets.SPEECHCOMMANDS(root="./piano",download=True)
# train_loader = torch.utils.data.DataLoader(train_data,batch_size=batch_size)
# ! git clone https://github.com/Srigowri/audioDataloader.git --quiet

# from audioDataloader.dataloader import AudioDataset
# from audioDataloader.transforms import mulawnEncode,array2tensor,dic2tensor,mulaw
# import os

In [13]:
# Destination folders for generated audio and for model checkpoints.
output_wave_dir = "/content/results"
output_model_dir = "/content/models"

# Create both folders up front so later writes cannot fail on a missing path.
make_dir(output_wave_dir)
make_dir(output_model_dir)

'/content/models'

In [14]:
# File-name template for checkpoints; the placeholder takes the step index.
wave_gan_name = "wavegan_{}.tar"
# A fixed batch of noise, reused for every sample dump so outputs from
# different training steps come from the same latent vectors.
fixed_noise = torch.randn(batch_size,noise_latent_dim).to(device)

# Dump samples from the untrained generator as a baseline. Passing -1 makes
# save_sounds write into the folder named "0".
# The generator stays in eval mode until something calls .train() again.
gen_model.eval()
with torch.no_grad():
    fake_waves = gen_model(fixed_noise).detach().cpu().numpy()
save_sounds(fake_waves, -1)

In [15]:
# from IPython.display import Audio
# Audio(url=output_wave_dir+"/0/2.wav")

# from google.colab import output
# output.eval_js('new Audio("output_wave_dir"+"/0/2.wav").play()')

In [16]:
# Fetch the repository that provides the WavDataLoader used for training.
# Re-running this cell fails harmlessly once the folder already exists.
!git clone https://github.com/Srigowri/audio_loader_pytorch.git

fatal: destination path 'audio_loader_pytorch' already exists and is not an empty directory.


In [17]:
# Change into the cloned repository so its top-level `utils` module is
# importable. The working directory stays changed for all later cells.
%cd audio_loader_pytorch/
! ls
from utils import WavDataLoader

/content/audio_loader_pytorch
LICENSE  output  params.py  __pycache__  Readme.md  requirements.txt  utils.py


In [18]:
# ! rm models/*

0 1
1 2
101 10
171 10(1)
171 9(1)

In [None]:
# Training loop: `critic_iter` critic updates, then one generator update, per
# iteration.
gen_model.train()
dis_model.train()
gen_losses,dis_losses,wassertein_distance = [],[],[]
n_epochs=5  # kept for compatibility; the loop below is driven by n_iterations
n_iterations = 10000  # number of generator updates actually performed
log_every = 10  # log metrics and dump samples every this many iterations

train_loader = WavDataLoader(os.path.join("/content/piano", "train"), "wav")
for epoch in range(n_iterations):

    # ---- critic updates ----
    for _ in range(critic_iter):
        # NOTE(review): `real` is used exactly as the loader yields it --
        # presumably a batch tensor already on `device` and matching `fake`'s
        # shape; confirm against WavDataLoader.
        real = next(train_loader)
        noise = torch.randn(batch_size,noise_latent_dim).to(device)

        fake = gen_model(noise)
        critic_for_real = dis_model(real).reshape(-1)
        critic_for_fake = dis_model(fake).reshape(-1)
        # Negated critic gap: minimising it maximises real-minus-fake scores.
        wass_dist = -(torch.mean(critic_for_real) - torch.mean(critic_for_fake))
        penalty = gradient_penalty(dis_model,real,fake)
        critic_loss = wass_dist + lambda_* penalty #maximize is same as minimizing the negative of the loss
        dis_model.zero_grad()
        critic_loss.backward(retain_graph=True)  #to reutilize the same fake for generator retain the graph
        dis_optimizer.step()

    # ---- generator update (reuses the last `fake` batch and its graph) ----
    gen_output = dis_model(fake).reshape(-1)
    gen_loss = -torch.mean(gen_output)
    gen_model.zero_grad()
    gen_loss.backward()
    gen_optimizer.step()

    if epoch % log_every == 0:
        # NOTE(review): the recorded generator/critic values and the printed
        # ones use opposite signs; left as-is, but worth unifying.
        gen_losses.append(gen_loss.detach().item() * -1)
        dis_losses.append(critic_loss.detach().item())
        wassertein_distance.append(wass_dist.detach().item() * -1)
        # The total shown is the real loop length; it used to print n_epochs
        # (5) while the loop ran 10000 iterations.
        print(f"Epoch {epoch}/{n_iterations}  \t \
        Generator loss: {gen_loss.detach().item():.4f} \t \
        Discriminator loss: {-critic_loss.detach().item():.4f}\t\
        Wassertein distance: {-1*wass_dist.detach().item():.4f}")

        # Sample from the fixed noise. Only `fake_waves` is bound here, so the
        # `fake` tensor is no longer replaced by a NumPy array.
        with torch.no_grad():
            fake_waves = gen_model(fixed_noise).detach().cpu().numpy()
        save_sounds(fake_waves, epoch)
        # saving_dict = {
        #         "generator": gen_model.state_dict(),
        #         "discriminator": dis_model.state_dict(),
        #         "epochs": epoch,
        #         "optimizer_d": dis_optimizer.state_dict(),
        #         "optimizer_g": gen_optimizer.state_dict(),
        #         "discriminator_cost": dis_losses,
        #         "wassertine_distance": wassertein_distance,
        #         "generator_cost": gen_losses,
        #     }
        # torch.save(saving_dict,output_model_dir+"/"+wave_gan_name.format(epoch) )

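The generated audio is written to the `/content/results` folder: one sub-folder per logged training step (named step + 1), each containing one numbered `.wav` file per fixed-noise sample.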