<a href="https://colab.research.google.com/github/HyunLee103/Pytorch_practice/blob/master/timbreTron.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
from torch.utils.data import DataLoader
from torch.autograd import Variable
import numpy as np
import librosa
import os
import json

json_dir = '/content/drive/Shared drives/Music_Style_Transform/json_sample'
save_dir_b = '/content/drive/My Drive/ADV_Project_Music_style_transform/cqt_sample/bass'
save_dir_d = '/content/drive/My Drive/ADV_Project_Music_style_transform/cqt_sample/drum'
save_dir_o = '/content/drive/My Drive/ADV_Project_Music_style_transform/cqt_sample/other'

In [0]:
# json -> cqt


for i,(dirpath,dirnames,filenames) in enumerate(os.walk(json_dir)):
    if dirpath is not json_dir:

        genre = dirpath.split('/')[-1]
        print("\n Processing : {}".format(genre))
        
        for index, f in enumerate(filenames):
            file_path = os.path.join(dirpath,f)
            id = f[1:5]
            seg = f[6]
            
            print(index, f)

            with open(file_path) as fp:
                data = json.load(fp)
                try:
                    data = np.array(data)
                    data = data.reshape(480000)
                    all = np.abs(librosa.core.cqt(data, 32000))
                    all = all[:,:,np.newaxis]
                except:
                    pass


                if index % 2 != 0:
                    with open(file_path) as fp:
                        data = json.load(fp)
                        data = np.array(data['wave'])
                        for i in range(3):
                            if i == 0:
                                drum = data[i]
                                drum = drum.reshape(480000)
                                drum = np.abs(librosa.core.cqt(drum, 32000))
                                drum = drum[:,:,np.newaxis]
                                da = np.concatenate((drum,all),axis=2)
                                np.save(os.path.join(save_dir_d,'{}_{}_da_{}_cqt'.format(genre,id,seg)),da)
                            elif i == 1:
                                bass = data[i]
                                bass = bass.reshape(480000)
                                bass = np.abs(librosa.core.cqt(bass, 32000))
                                bass = bass[:,:,np.newaxis]
                                ba = np.concatenate((bass,all),axis=2)
                                np.save(os.path.join(save_dir_b,'{}_{}_ba_{}_cqt'.format(genre,id,seg)),ba)
                            elif i == 2:
                                other = data[i]
                                other = other.reshape(480000)
                                other = np.abs(librosa.core.cqt(other, 32000))
                                other = other[:,:,np.newaxis]
                                oa = np.concatenate((other,all),axis=2)
                                np.save(os.path.join(save_dir_o,'{}_{}_oa_{}_cqt'.format(genre,id,seg)),oa)



 Processing : jazz
0 00237_0_all.json
1 00237_0.json
2 00237_1_all.json
3 00237_1.json
4 00590_0_all.json
5 00590_0.json
6 00590_1_all.json
7 00590_1.json
8 00591_0_all.json
9 00591_0.json
10 00591_1_all.json
11 00591_1.json

 Processing : pop
0 04064_0_all.json
1 04064_0.json
2 04064_1_all.json
3 04064_1.json
4 04065_0_all.json
5 04065_0.json
6 04065_1_all.json
7 04065_1.json
8 04066_0_all.json
9 04066_0.json
10 04066_1_all.json
11 04066_1.json
12 04067_0_all.json
13 04067_0.json
14 04067_1_all.json
15 04067_1.json


# shape : (84, 938)

In [0]:
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    def __init__(self, in_features):
        super(ResidualBlock, self).__init__()

        conv_block = [  nn.ReflectionPad2d(1),
                        nn.Conv2d(in_features, in_features, 3),
                        nn.InstanceNorm2d(in_features),
                        nn.ReLU(inplace=True),
                        nn.ReflectionPad2d(1),
                        nn.Conv2d(in_features, in_features, 3),
                        nn.InstanceNorm2d(in_features)  ]

        self.conv_block = nn.Sequential(*conv_block)

    def forward(self, x):
        return x + self.conv_block(x)

class Generator(nn.Module):
    def __init__(self, input_nc, output_nc, n_residual_blocks=9):
        super(Generator, self).__init__()

        # Initial convolution block       
        model = [   nn.ReflectionPad2d(3),
                    nn.Conv2d(input_nc, 64, 7),
                    nn.InstanceNorm2d(64),
                    nn.ReLU(inplace=True) ]

        # Downsampling
        in_features = 64
        out_features = in_features*2
        for _ in range(2):
            model += [  nn.Conv2d(in_features, out_features, 3, stride=2, padding=1),
                        nn.InstanceNorm2d(out_features),
                        nn.ReLU(inplace=True) ]
            in_features = out_features
            out_features = in_features*2

        # Residual blocks
        for _ in range(n_residual_blocks):
            model += [ResidualBlock(in_features)]

        # Upsampling
        out_features = in_features//2
        for _ in range(2):
            model += [  nn.ConvTranspose2d(in_features, out_features, 3, stride=2, padding=1, output_padding=1),
                        nn.InstanceNorm2d(out_features),
                        nn.ReLU(inplace=True) ]
            in_features = out_features
            out_features = in_features//2

        # Output layer
        model += [  nn.ReflectionPad2d(3),
                    nn.Conv2d(64, output_nc, 7),
                    nn.Tanh() ]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        return self.model(x)

class Discriminator(nn.Module):
    def __init__(self, input_nc):
        super(Discriminator, self).__init__()

        # A bunch of convolutions one after another
        model = [   nn.Conv2d(input_nc, 64, 4, stride=2, padding=1),
                    nn.LeakyReLU(0.2, inplace=True) ]

        model += [  nn.Conv2d(64, 128, 4, stride=2, padding=1),
                    nn.InstanceNorm2d(128), 
                    nn.LeakyReLU(0.2, inplace=True) ]

        model += [  nn.Conv2d(128, 256, 4, stride=2, padding=1),
                    nn.InstanceNorm2d(256), 
                    nn.LeakyReLU(0.2, inplace=True) ]

        model += [  nn.Conv2d(256, 512, 4, padding=1),
                    nn.InstanceNorm2d(512), 
                    nn.LeakyReLU(0.2, inplace=True) ]

        # FCN classification layer
        model += [nn.Conv2d(512, 1, 4, padding=1)]

        self.model = nn.Sequential(*model)

    def forward(self, x):
        x =  self.model(x)
        # Average pooling and flatten
        return F.avg_pool2d(x, x.size()[2:]).view(x.size()[0], -1)

In [5]:
!pip install visdom

Collecting visdom
[?25l  Downloading https://files.pythonhosted.org/packages/c9/75/e078f5a2e1df7e0d3044749089fc2823e62d029cc027ed8ae5d71fafcbdc/visdom-0.1.8.9.tar.gz (676kB)
[K     |▌                               | 10kB 16.8MB/s eta 0:00:01[K     |█                               | 20kB 3.3MB/s eta 0:00:01[K     |█▌                              | 30kB 4.3MB/s eta 0:00:01[K     |██                              | 40kB 4.6MB/s eta 0:00:01[K     |██▍                             | 51kB 3.7MB/s eta 0:00:01[K     |███                             | 61kB 4.2MB/s eta 0:00:01[K     |███▍                            | 71kB 4.4MB/s eta 0:00:01[K     |███▉                            | 81kB 4.8MB/s eta 0:00:01[K     |████▍                           | 92kB 5.0MB/s eta 0:00:01[K     |████▉                           | 102kB 5.0MB/s eta 0:00:01[K     |█████▎                          | 112kB 5.0MB/s eta 0:00:01[K     |█████▉                          | 122kB 5.0MB/s eta 0:00:01[K 

In [0]:
import glob
import random
import os

from torch.utils.data import Dataset
from PIL import Image
import torchvision.transforms as transforms

class Dataset(torch.utils.data.Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transforms.Compose(transform_)
        #self.unaligned = unaligned # ??

        lst_data = os.listdir(self.data_dir)
        self.lst_data_jazz = [f for f in lst_data if f.startswith('j')]
        self.lst_data_pop = [f for f in lst_data if f.startswith('p')]

    def __len__(self):
        return max(len(self.lst_data_jazz), len(self.lst_data_jazz))

    def __getitem__(self, index):
        jazz = np.load(os.path.join(self.data_dir,self.lst_data_jazz[index]))
        pop = np.load(os.path.join(self.data_dir,self.lst_data_pop[index]))
        
        if self.transform:
            jazz = self.transform(jazz)
            pop = self.transform(pop)
        
        return {'jazz': jazz, 'pop': pop}

## 트렌스폼 구현하기
class ToTensor(object):
    def __call__(self, data):
        for key, value in data.items():
            value = value.transpose((2, 0, 1)).astype(np.float32)
            data[key] = torch.from_numpy(value)

        return data

class Normalization(object):
    def __init__(self, mean=0.5, std=0.5):
        self.mean = mean
        self.std = std

    def __call__(self, data):
        for key, value in data.items():
            data[key] = (value - self.mean) / self.std

        return data



# DCGAN에 사용할 selanA image data가 DCGAN 모델의 generator output인 64x64와 맞지 않으므로
# resize 해주는 transform class 선언
class Resize(object):
    def __init__(self,shape):
        self.shape = shape

    def __call__(self, data):
        for key, value in data.items():
            data[key] = resize(value, output_shape=(self.shape[0],self.shape[1],
                                                    self.shape[2]))
        return data

class RandomCrop(object):
  def __init__(self, shape):
      self.shape = shape

  def __call__(self, data):
    # input, label = data['input'], data['label']
    # h, w = input.shape[:2]

    h, w = data['label'].shape[:2]
    new_h, new_w = self.shape

    top = np.random.randint(0, h - new_h)
    left = np.random.randint(0, w - new_w)

    id_y = np.arange(top, top + new_h, 1)[:, np.newaxis]
    id_x = np.arange(left, left + new_w, 1)

    # input = input[id_y, id_x]
    # label = label[id_y, id_x]
    # data = {'label': label, 'input': input}

    # Updated at Apr 5 2020
    for key, value in data.items():
        data[key] = value[id_y, id_x]

    return data

In [0]:
data_dir = '/content/drive/My Drive/ADV_Project_Music_style_transform/timbreTron_cycleGAN/cqt_sample/bass'

In [0]:
transform_ = [transforms.ToTensor(),transforms.Normalize((0.5,0.5),(0.5,0.5))]

In [0]:
dataloader = DataLoader(Dataset(data_dir, transform=transform_),batch_size=3,shuffle=True)

In [0]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [16]:
for i, data in enumerate(dataloader):
    real_j = data['jazz'].to(device)
    real_p = data['pop'].to(device)
    print(real_p.shape,real_j.shape)

torch.Size([3, 2, 84, 938]) torch.Size([3, 2, 84, 938])
torch.Size([3, 2, 84, 938]) torch.Size([3, 2, 84, 938])


In [0]:
class DECBR2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True, norm="bnorm", relu=0.0,output_padding=0):
        super().__init__()

        layers = []
        layers += [nn.ConvTranspose2d(in_channels=in_channels, out_channels=out_channels,
                             kernel_size=kernel_size, stride=stride, padding=padding, output_padding = output_padding,
                             bias=bias)]

        if not norm is None:
            if norm == "bnorm":
                layers += [nn.BatchNorm2d(num_features=out_channels)]
            elif norm == "inorm":
                layers += [nn.InstanceNorm2d(num_features=out_channels)]

        if not relu is None and relu >= 0.0:
            layers += [nn.ReLU() if relu == 0 else nn.LeakyReLU(relu)]

        self.cbr = nn.Sequential(*layers)

    def forward(self, x):
        return self.cbr(x)

class CBR2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=True, norm="bnorm", relu=0.0):
        super().__init__()

        layers = []
        layers += [nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                             kernel_size=kernel_size, stride=stride, padding=padding,
                             bias=bias)]

        if not norm is None:
            if norm == "bnorm":
                layers += [nn.BatchNorm2d(num_features=out_channels)]
            elif norm == "inorm":
                layers += [nn.InstanceNorm2d(num_features=out_channels)]

        if not relu is None and relu >= 0.0:
            layers += [nn.ReLU() if relu == 0 else nn.LeakyReLU(relu)]

        self.cbr = nn.Sequential(*layers)

    def forward(self, x):
        return self.cbr(x)

In [0]:
class Pix2Pix_generator(nn.Module):
    def __init__(self, in_channels,out_channels,nker=64,norm='bnorm'):
        super(Pix2Pix_generator, self).__init__()

        # encoder
        # Leaky relu 사용, 첫번째 encoder는 batchnorm X
        self.enc1 = CBR2d(in_channels,1*nker,kernel_size=4, padding=1,stride=2,
        norm = None,relu=0.2)
        self.enc2 = CBR2d(1*nker,2*nker,kernel_size=4, padding=1,stride=2,
        norm = norm ,relu=0.2)
        self.enc3 = CBR2d(2*nker,4*nker,kernel_size=4, padding=1,stride=2,
        norm = norm,relu=0.2)
        self.enc4 = CBR2d(4*nker,8*nker,kernel_size=4, padding=1,stride=2,
        norm = norm,relu=0.2)
        self.enc5 = CBR2d(8*nker,8*nker,kernel_size=4, padding=1,stride=2,
        norm = norm,relu=0.2)
        self.enc6 = CBR2d(8*nker,8*nker,kernel_size=4, padding=1,stride=2,
        norm = norm,relu=0.2)


        # decoder, skip-connection 고려해서 input channel modeling
        self.dec1 = DECBR2d(8*nker, 8*nker, kernel_size=4, padding=1,
        norm = norm, relu=0.0, stride=2)
        self.drop1 = nn.Dropout2d(0.5)
        self.pad1 = nn.ReflectionPad2d((0,1,0,0)) # (left, right, top, bottom) 
        
        self.dec2 = DECBR2d(2 * 8 * nker, 8*nker, kernel_size=4, padding=1,
        norm = norm, relu=0.0, stride=2)
        self.drop2 = nn.Dropout2d(0.5)
        self.pad2 = nn.ReflectionPad2d((0,0,0,1))

        self.dec3 = DECBR2d(2*8*nker, 4*nker, kernel_size=4, padding=1,
        norm = norm, relu=0.0, stride=2)
        self.drop3 = nn.Dropout2d(0.5)
        self.pad3 = nn.ReflectionPad2d((1,0,0,0))

        self.dec4 = DECBR2d(2 * 4 *nker, 2*nker, kernel_size=4, padding=1,
        norm = norm, relu=0.0, stride=2)
        self.pad4 = nn.ReflectionPad2d((0,0,1,0))

        self.dec5 = DECBR2d(2*2*nker, 1*nker, kernel_size=4, padding=1,
        norm = norm, relu=0.0, stride=2)
        self.pad5 = nn.ReflectionPad2d((1,0,0,0))
        
        self.dec6 = DECBR2d(2*1*nker, out_channels, kernel_size=4, padding=1,
        norm = None, relu=None, stride=2)

    def forward(self, x):
        print(x.shape)
        enc1 = self.enc1(x)
        print(enc1.shape)
        enc2 = self.enc2(enc1)
        print(enc2.shape)
        enc3 = self.enc3(enc2)
        print(enc3.shape)
        enc4 = self.enc4(enc3)
        print(enc4.shape)
        enc5 = self.enc5(enc4)
        print(enc5.shape)
        enc6 = self.enc6(enc5)
        print(enc6.shape)
     

        dec1 = self.dec1(enc6)
        drop1 = self.drop1(dec1)
        pad1 = self.pad1(drop1)
        print(pad1.shape)

        cat2 = torch.cat((pad1,enc5),dim=1)
        dec2 = self.dec2(cat2)
        drop2 = self.drop2(dec2)
        pad2 = self.pad2(drop2)
        print(pad2.shape)

        cat3 = torch.cat((pad2,enc4),dim=1)
        dec3 = self.dec3(cat3)
        drop3 = self.drop3(dec3)
        pad3 = self.pad3(drop3)
        print(pad3.shape)

        cat4 = torch.cat((pad3,enc3),dim=1)
        print(cat4.shape)
        dec4 = self.dec4(cat4)
        pad4 = self.pad4(dec4)
        print(pad4.shape)

        cat5 = torch.cat((pad4,enc2),dim=1)
        dec5 = self.dec5(cat5)
        pad5 = self.pad5(dec5)
        print(pad5.shape)

        cat6 = torch.cat((pad5,enc1),dim=1)
        dec6 = self.dec6(cat6)
       
      
        x = torch.tanh(dec6)
        print(x.shape)

        return x



In [0]:
net = Pix2Pix_generator(2,2,norm='inorm').to(device)

In [212]:
out = net(real_j.float())

torch.Size([3, 2, 84, 938])
torch.Size([3, 64, 42, 469])
torch.Size([3, 128, 21, 234])
torch.Size([3, 256, 10, 117])
torch.Size([3, 512, 5, 58])
torch.Size([3, 512, 2, 29])
torch.Size([3, 512, 1, 14])
torch.Size([3, 512, 2, 29])
torch.Size([3, 512, 5, 58])
torch.Size([3, 256, 10, 117])
torch.Size([3, 512, 10, 117])
torch.Size([3, 128, 21, 234])
torch.Size([3, 64, 42, 469])
torch.Size([3, 2, 84, 938])


In [163]:
out.shape

torch.Size([3, 2, 84, 938])

In [0]:
# Networks
netG_A2B = Generator(2, 2).to(device)
netG_B2A = Generator(2, 2).to(device)
netD_A = Discriminator(2).to(device)
netD_B = Discriminator(2).to(device)

netG_A2B.apply(weights_init_normal)
netG_B2A.apply(weights_init_normal)
netD_A.apply(weights_init_normal)
netD_B.apply(weights_init_normal)

# Lossess
criterion_GAN = torch.nn.MSELoss().to(device)
criterion_cycle = torch.nn.L1Loss().to(device)
criterion_identity = torch.nn.L1Loss().to(device)

# Optimizers & LR schedulers
optimizer_G = torch.optim.Adam(itertools.chain(netG_A2B.parameters(), netG_B2A.parameters()),
                                lr=opt.lr, betas=(0.5, 0.999))
optimizer_D_A = torch.optim.Adam(netD_A.parameters(), lr=opt.lr, betas=(0.5, 0.999))
optimizer_D_B = torch.optim.Adam(netD_B.parameters(), lr=opt.lr, betas=(0.5, 0.999))

lr_scheduler_G = torch.optim.lr_scheduler.LambdaLR(optimizer_G, lr_lambda=LambdaLR(opt.n_epochs, opt.epoch, opt.decay_epoch).step)
lr_scheduler_D_A = torch.optim.lr_scheduler.LambdaLR(optimizer_D_A, lr_lambda=LambdaLR(opt.n_epochs, opt.epoch, opt.decay_epoch).step)
lr_scheduler_D_B = torch.optim.lr_scheduler.LambdaLR(optimizer_D_B, lr_lambda=LambdaLR(opt.n_epochs, opt.epoch, opt.decay_epoch).step)




NameError: ignored