In [1]:

import torch
import os
import random
from torch.utils.data import Dataset
from torch.utils.data import DataLoader, TensorDataset

import librosa
import librosa.display
import numba.decorators
import numpy as np
import matplotlib.pyplot as plt
from numba.decorators import jit as optional_jit


from google.colab import auth
auth.authenticate_user()

from google.colab import drive
drive.mount('/content/gdrive')


#PATH = 'C://Projects//keras_talk//keras//intern//dataset//'
PATH = '/content/gdrive/My Drive/dataset/'

train_size = 800
test_size = 200

EPOCHS = 10
BATCH_SIZE = 40


def Y_DATA(y_data):
    for idx in range(y_data.shape[0]):
        y = y_data[idx]
        if y < 0:  y_data[idx] = 10
        else:      y_data[idx] = (y//20)
    return y_data

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


##### data normalization
###### 

In [2]:

import numpy as np


dataset_dict = { 0 : 'S_left',        1 : 'S_left_phase',
                 2 : 'S_right',       3 : 'S_right_phase',
                 4 : 'clean_left',    5 : 'clean_left_phase',
                 6 : 'clean_right',   7 : 'clean_right_phase',
                 8 : 'idx_drone_end', 9 : 'idx_voice_end',
                10 : 'idx_voice_start'}


x_data_list = [0,2,1,3]


numpy_dict = dict()
for n in x_data_list:
    numpy_name    = dataset_dict[n]
    numpy_dict[n] = np.load( PATH + numpy_name + '.npy' )
    


'''    x_data,       y_data '''
'''(1000,6,257,382), (1000,)'''

x_data = []
for idx in range(1000):
    x_element = []

    for n in x_data_list:
        x_element.append( numpy_dict[n][:,:,idx] )

   
    
    # log scale 변환 [dB]
    x_L = numpy_dict[0][:,:,idx]
    x_R = numpy_dict[2][:,:,idx]

    x_L = 20*np.log10( np.abs(x_L) + np.finfo(np.float32).eps )
    x_R = 20*np.log10( np.abs(x_R) + np.finfo(np.float32).eps )

    # even mode, odd mode 
    #x_even = (x_L + x_R)/2
    x_odd  = (x_L - x_R)/2


    #x_element.append(x_even)
    x_element.append(x_odd)
    x_element = np.asarray( x_element )



    # normalization
    for k in range(len(x_element)):
        x_mean = x_element[k].mean()
        x_stdv = x_element[k].std()
        x_element[k] = ( (x_element[k] - x_mean ) / x_stdv)
    

    x_data.append( x_element )


x_data = np.asarray(x_data)
y_data = Y_DATA( np.load(PATH + 'angle.npy') )
print('done..')

done..


In [3]:

#x_data = x_data.reshape()
#y_data = y_data.reshape()



x_data = torch.from_numpy( x_data ).float().to('cuda')
y_data = torch.from_numpy( y_data ).long().to('cuda')

full_dataset = TensorDataset( x_data, y_data )


train_dataset, valid_dataset = torch.utils.data.random_split( full_dataset, [train_size, test_size])
train_dataset = DataLoader( dataset=train_dataset, batch_size = BATCH_SIZE, shuffle=True, drop_last=True)
valid_dataset = DataLoader( dataset=valid_dataset, batch_size = BATCH_SIZE, shuffle=True, drop_last=True)


#### DenseNet

In [4]:
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
from torch import Tensor



#### ResNet

In [9]:

def conv3x3(in_planes, out_planes, stride=1, groups=1, dilation=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
                     padding=dilation, groups=groups, bias=False, dilation=dilation)


def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)





class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None, groups=1,
                 base_width=64, dilation=1, norm_layer=None):
        super(Bottleneck, self).__init__()
        
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        
        width = int(planes * (base_width / 64.)) * groups
        # Both self.conv2 and self.downsample layers downsample the input when stride != 1
        
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        identity = x

        out = self.conv1(x) # 1x1 stride = 1
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out) # 3x3 stride = stride
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out) # 1x1 planes, plaines * self.expansion
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out



class ResNet(nn.Module):

    def __init__(self, block, layers, num_classes=11, zero_init_residual=False,
                 groups=1, width_per_group=64, replace_stride_with_dilation=None,
                 norm_layer=None):
        
        super(ResNet, self).__init__()
        
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        self.inplanes = 64
        self.dilation = 1
        
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        
        if len(replace_stride_with_dilation) != 3:
            raise ValueError("replace_stride_with_dilation should be None "
                             "or a 3-element tuple, got {}".format(replace_stride_with_dilation))
        
        

        self.groups = groups
        self.base_width = width_per_group

        # input : 4 x 257 x 382
        self.conv1 = nn.Conv2d(5, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        
        # output = self.conv1( input )
        # output : 64 x 129 x 191
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)

        # input : 64 x 65 x 86
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # output : 64*65*86

        #[layers] = [3,4,6,3]
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)


    def _make_layer(self, block, planes, blocks, stride=1, dilate=False):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample, self.groups,
                            self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, groups=self.groups,
                                base_width=self.base_width, dilation=self.dilation,
                                norm_layer=norm_layer))

        return nn.Sequential(*layers)




    def _forward_impl(self, x):
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    def forward(self, x):
        return self._forward_impl(x)



def _resnet(arch, block, layers, pretrained, progress, **kwargs):
    model = ResNet(block, layers, **kwargs)
    if pretrained:
        state_dict = load_state_dict_from_url(model_urls[arch], progress=progress)
        model.load_state_dict(state_dict)
    return model



def resnet_custom(pretrained=False, progress=True, **kwargs):

    return _resnet('resnet101', Bottleneck, [3, 5, 5, 3], pretrained, progress, **kwargs)

##### model train

In [10]:
torch.manual_seed(1)


#model = densenet_custom().to('cuda')
model = resnet_custom().to('cuda')

criterion = nn.CrossEntropyLoss().to('cuda')
#optimizer = torch.optim.SGD(model.parameters(), lr=0.00001, weight_decay=0.9)
optimizer = torch.optim.Adagrad(model.parameters(), lr=0.00001)

total=0
corr=0
model.train()
for epoch in range(50):
    print('epoch' + str(epoch+1))
    
    for i, (data, label) in enumerate(train_dataset):
        (data, label) = (data.to('cuda'), label.to('cuda'))

        optimizer.zero_grad()
        output = model(data)
  
        loss = F.nll_loss(output, label.reshape(BATCH_SIZE))
        loss.backward()
        optimizer.step()
        
        preds = output.data.max(1)[1]
        total += BATCH_SIZE
        corr  += (preds==label.reshape(BATCH_SIZE)).sum().item()
        

        if i%4 == 0: print('\tLoss: {:.3f}'.format(loss.item()))
    print('   Acc: {:.3f}'.format( 100*corr/total ))
        

epoch1
	Loss: 0.068
	Loss: -0.069
	Loss: -0.058
	Loss: -0.093
	Loss: -0.071
   Acc: 9.375
epoch2
	Loss: -0.114
	Loss: -0.078
	Loss: -0.144
	Loss: -0.070
	Loss: -0.182
   Acc: 9.938
epoch3
	Loss: -0.175
	Loss: -0.217
	Loss: -0.152
	Loss: -0.188
	Loss: -0.207
   Acc: 10.292
epoch4
	Loss: -0.347
	Loss: -0.159
	Loss: -0.436
	Loss: -0.323
	Loss: -0.301
   Acc: 10.719
epoch5
	Loss: -0.266
	Loss: -0.388
	Loss: -0.322
	Loss: -0.332
	Loss: -0.241
   Acc: 11.025
epoch6
	Loss: -0.340
	Loss: -0.434
	Loss: -0.427
	Loss: -0.427
	Loss: -0.385
   Acc: 11.167
epoch7
	Loss: -0.369
	Loss: -0.403
	Loss: -0.484
	Loss: -0.321
	Loss: -0.344
   Acc: 11.500
epoch8
	Loss: -0.431
	Loss: -0.453
	Loss: -0.472
	Loss: -0.462
	Loss: -0.437
   Acc: 11.688
epoch9
	Loss: -0.395
	Loss: -0.499
	Loss: -0.411
	Loss: -0.537
	Loss: -0.542
   Acc: 11.889
epoch10
	Loss: -0.577
	Loss: -0.477
	Loss: -0.572
	Loss: -0.473
	Loss: -0.618
   Acc: 12.025
epoch11
	Loss: -0.574
	Loss: -0.695
	Loss: -0.554
	Loss: -0.550
	Loss: -0.602
   A

In [13]:
model.eval()

with torch.no_grad():
    correct = 0
    total = 0
    for data, label in valid_dataset:
        output = model(data)
        preds  = torch.max(output.data, 1)[1]
        total   += len(label)
        
        label = label.reshape(BATCH_SIZE)
        correct += (preds==label).sum().item()
      
    print('Test Accuracy: ', 100.*correct/total)

Test Accuracy:  7.0
