#  Spectrogram CNN Model    
### Accuracy - 50%

In [None]:
import numpy as np
import torch
import torch.nn as nn

from torch.utils.data import Dataset, DataLoader

### Model Parameters 

In [None]:
learning_rate = 3e-4
training_epochs = 100
batch_size = 32
device = 'cpu'

### 데이터 로드

In [None]:
print('Loading Data...')
file = np.load('./dataset.npz')
print('Loading Finished')

### 데이터 할당

In [None]:
print('Allocate Data ...')
S_left = file['S_left']
S_right = file['S_right']
S_left_phase = file['S_left_phase']
S_right_phase = file['S_right_phase']

phi = file['phi'][0:1000]

### 데이터 전처리

In [None]:
print('Transforming ...')
S_left = np.transpose(S_left, (2, 0, 1)).reshape(1000, 1, 257, 382)
S_right = np.transpose(S_right, (2, 0, 1)).reshape(1000, 1, 257, 382)
S_left_phase = np.transpose(S_left_phase, (2, 0, 1)).reshape(1000, 1, 257, 382)
S_right_phase =np.transpose(S_right_phase, (2, 0, 1)).reshape(1000, 1, 257, 382)


for i in range(0, 1000):
    S_left[i] = (S_left[i] - np.mean(S_left[i])) / np.std(S_left[i])
    S_right[i] = (S_right[i] - np.mean(S_right[i])) / np.std(S_right[i])
    S_left_phase[i] = (S_left_phase[i] - np.mean(S_left_phase[i])) / np.std(S_left_phase[i])
    S_right_phase[i] = (S_right_phase[i] - np.mean(S_right_phase[i])) / np.std(S_right_phase[i])

    S_left[i] = (S_left[i] - np.min(S_left[i])) / (np.max(S_left[i]) - np.min(S_left[i]))
    S_right[i] = (S_right[i] - np.min(S_right[i])) / (np.max(S_right[i]) - np.min(S_right[i]))
    S_left_phase[i] = (S_left_phase[i] - np.min(S_left_phase[i])) / (np.max(S_left_phase[i]) - np.min(S_left_phase[i]))
    S_right_phase[i] = (S_right_phase[i] - np.min(S_right_phase[i])) / (np.max(S_right_phase[i]) - np.min(S_right_phase[i]))

S_left = np.log(np.add(S_left, 1))
S_right = np.log(np.add(S_right, 1))
S_left_phase = np.log(np.add(S_left_phase, 1))
S_right_phase = np.log(np.add(S_right_phase, 1))

phi = np.add(phi, 20)
phi[phi == 19] = 0
phi = np.divide(phi, 20)

phi = phi.astype(np.int16).reshape(1000,)
print('Transforming Finished')

### 데이터 병합

In [None]:
print('Merging Data ...')
input_data = np.concatenate((S_left, S_right, S_left_phase, S_right_phase), axis=1)
print('Merging Finished!')

### 훈련셋과 테스트셋으로 분할

In [None]:
# Divide datasets into training sets and evaluation sets.
def train_test_split(X, y, test_size=0.2, shuffle=True, random_state=None):
    test_num = int(X.shape[0] * test_size)
    train_num = X.shape[0] - test_num

    if shuffle:
        np.random.seed(random_state)
        shuffle = np.random.permutation(X.shape[0])
        X = X[shuffle, :]
        y = y[shuffle, ]
        X_train = X[:train_num]
        y_train = y[:train_num]
        X_test = X[train_num:]
        y_test = y[train_num:]
    else:
        X_train = X[:train_num]
        y_train = y[:train_num]
        X_test = X[train_num:]
        y_test = y[train_num:]

    return X_train, X_test, y_train, y_test
print('Dividing datasets ...')
X_train, X_test, y_train, y_test = train_test_split(input_data, phi, test_size=0.2, shuffle=True)
non1, X_val, non2, y_val = train_test_split(X_train, y_train, test_size=0.25, shuffle=True)

### PyTorch 데이터셋 구성

In [None]:
class Custom_Dataset(Dataset):

    def __init__(self, X, Y):
        self.X = torch.from_numpy(X).float()
        self.Y = torch.from_numpy(Y).long()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        X = self.X[index]
        Y = self.Y[index]
        return X, Y

train_set = Custom_Dataset(X=X_train, Y=y_train)
test_set = Custom_Dataset(X=X_test, Y=y_test)
valid_set = Custom_Dataset(X=X_val, Y=y_val)

### 데이터 로더

In [None]:
train_loader = DataLoader(dataset=train_set,
                          batch_size=batch_size,
                          shuffle=True,
                          drop_last=True)
test_loader = DataLoader(dataset=test_set,
                         batch_size=batch_size,
                         shuffle=True,
                         drop_last=False)
valid_loader = DataLoader(dataset=valid_set,
                         batch_size=batch_size,
                         shuffle=True,
                         drop_last=False)

####  메모리 부족으로 인한 문제 방지를 위해 변수 삭제

In [None]:
del file, S_left, S_right, S_left_phase, S_right_phase, input_data, phi, non1, non2

### CNN 모델 구성

In [None]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN,self).__init__()
        self.Features = nn.Sequential(
            nn.Conv2d(in_channels=4, out_channels=8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3),
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3)
        )
        self.FC1 = nn.Linear(in_features=768, out_features=768, bias=True)
        self.FC2 = nn.Linear(in_features=768, out_features=11, bias=True)
        torch.nn.init.kaiming_normal_(self.FC1.weight)
        torch.nn.init.kaiming_normal_(self.FC2.weight)
        self.Classifier = nn.Sequential(
            self.FC1,
            nn.Dropout(p=0.5),
            self.FC2
        )
    def forward(self, inputs):
        output = self.Features(inputs)
        output = output.view(output.size(0), -1)
        output = self.Classifier(output)

        return output

model = CNN().to(device)

### Loss function 구성

In [None]:
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### 학습

In [None]:
total_batch = len(train_loader)

print('Learning Started')
for epoch in range(training_epochs):
    avg_cost = 0

    for X, Y in train_loader:
        X = X.to(device)
        Y = Y.to(device)

        optimizer.zero_grad()
        hypothesis = model(X)
        cost = criterion(hypothesis, Y)
        cost.backward()
        optimizer.step()

        avg_cost += cost / total_batch

        with torch.no_grad():
            X_val = valid_set.X.view(200, 4, 257, 382).float().to(device)
            Y_val = valid_set.Y.to(device)

            X_test = test_set.X.view(200, 4, 257, 382).float().to(device)
            Y_test = test_set.Y.to(device)

            prediction = model(X_val)
            correct_prediction = torch.argmax(prediction, 1) == Y_val
            accuracy = correct_prediction.float().mean()

            test_prediction = model(X_test)
            test_correct_prediction = torch.argmax(test_prediction, 1) == Y_test
            test_accuracy = test_correct_prediction.float().mean()


    print('[Epoch : {:>4}] \n Cost = {:>.9} \t Val_Accuracy = {:>.4} \t Test Accuracy = {:>.4}'.format(epoch + 1, avg_cost, accuracy.item(), test_accuracy.item()))
print('Learning Finished')