## Train/validation split based on folders

In [13]:
import os
from sklearn.utils import shuffle
import pandas as pd
from sklearn.model_selection import KFold
import shutil

# Source folders holding one image per audio clip, grouped by class.
SMOOTH_IMAGE_PATH = "../audio-image/smooth/"
ROCK_IMAGE_PATH = "../audio-image/rock/"

SMOOTH_LABEL = "smooth"
ROCK_LABEL = "rock"

# Fixed seed so that re-running this cell reproduces exactly the same folds.
SPLIT_SEED = 42


def _list_image_files(folder):
    """Return the sorted names of the regular files in ``folder``.

    ``.DS_Store`` is skipped, and so is anything that is not a regular file
    (``shutil.copyfile`` cannot copy a directory). The names are sorted because
    ``os.listdir`` returns entries in arbitrary order, which would otherwise
    make the split depend on the file system.
    """
    return sorted(
        name
        for name in os.listdir(folder)
        if name != ".DS_Store" and os.path.isfile(os.path.join(folder, name))
    )


def _copy_subset(subset, source_dir_by_label, target_dir_by_label):
    """Copy every file listed in ``subset`` from its class source folder to its target folder."""
    for row in subset.itertuples(index=False):
        shutil.copyfile(
            source_dir_by_label[row.label] + row.filename,
            target_dir_by_label[row.label] + row.filename,
        )


SOURCE_DIR_BY_LABEL = {SMOOTH_LABEL: SMOOTH_IMAGE_PATH, ROCK_LABEL: ROCK_IMAGE_PATH}

filename_list = []
label_list = []
for label in (SMOOTH_LABEL, ROCK_LABEL):
    class_files = _list_image_files(SOURCE_DIR_BY_LABEL[label])
    filename_list.extend(class_files)
    label_list.extend([label] * len(class_files))

# Put into dataframe for shuffle and split
data = {"filename": filename_list, "label": label_list}
df = pd.DataFrame(data)

# Shuffle the rows (seeded), then do the K-fold split on the shuffled order
df = shuffle(df, random_state=SPLIT_SEED)

kf = KFold(n_splits=4)
split_index = 0

# exist_ok keeps the cell re-runnable after the folder has been created once
os.makedirs("../dataset/", exist_ok=True)
TRAIN_PATH = "train/"
VAL_PATH = "val/"
TRAIN_SMOOTH_PATH = "train/smooth/"
TRAIN_ROCK_PATH = "train/rock/"
VAL_SMOOTH_PATH = "val/smooth/"
VAL_ROCK_PATH = "val/rock/"

for train_index, val_index in kf.split(df):
    split_index += 1
    SPLIT_FOLDER_PATH = "../dataset/K" + str(split_index) + "/"

    # The fold folder is generated entirely by this cell. Rebuild it from
    # scratch so files from an earlier run can never end up in both the
    # training and the validation set of the same fold.
    if os.path.isdir(SPLIT_FOLDER_PATH):
        shutil.rmtree(SPLIT_FOLDER_PATH)
    for sub_path in (TRAIN_SMOOTH_PATH, TRAIN_ROCK_PATH, VAL_SMOOTH_PATH, VAL_ROCK_PATH):
        # makedirs also creates the intermediate train/ and val/ folders
        os.makedirs(SPLIT_FOLDER_PATH + sub_path)

    train_set = df.iloc[train_index]
    val_set = df.iloc[val_index]

    _copy_subset(
        train_set,
        SOURCE_DIR_BY_LABEL,
        {
            SMOOTH_LABEL: SPLIT_FOLDER_PATH + TRAIN_SMOOTH_PATH,
            ROCK_LABEL: SPLIT_FOLDER_PATH + TRAIN_ROCK_PATH,
        },
    )
    _copy_subset(
        val_set,
        SOURCE_DIR_BY_LABEL,
        {
            SMOOTH_LABEL: SPLIT_FOLDER_PATH + VAL_SMOOTH_PATH,
            ROCK_LABEL: SPLIT_FOLDER_PATH + VAL_ROCK_PATH,
        },
    )

0
20
40
30
10
30
10
30
10
30
10


## Put into dataloader

In [99]:
import torch
from torchvision import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader

# Constants shared by the data pipeline of every fold
INPUT_WIDTH = 128
INPUT_HEIGHT = 128
NUM_OF_SPLIT = 4
BATCH_SIZE = 1
# Base seed for the training-set shuffling, so the sample order is reproducible
LOADER_SEED = 42

# One DataLoader per fold; position i belongs to fold K(i + 1)
train_dataloader_list = []
val_dataloader_list = []

# Resize every image to the input resolution used by the model
resize_transform = transforms.Resize(size=(INPUT_HEIGHT, INPUT_WIDTH))

# Full preprocessing pipeline: resize, then convert the image to a tensor
custom_transform = transforms.Compose([resize_transform, transforms.ToTensor()])

for i in range(NUM_OF_SPLIT):
    index = i + 1
    TRAIN_ROOT_PATH = "../dataset/K" +  str(index) + "/train/"
    VAL_ROOT_PATH = "../dataset/K" + str(index) + "/val/"

    train_dataset = ImageFolder(root=TRAIN_ROOT_PATH, transform=custom_transform)
    val_dataset = ImageFolder(root=VAL_ROOT_PATH, transform=custom_transform)

    # ImageFolder lists its samples class folder by class folder. Without
    # shuffling, SGD with a tiny batch would see every image of one class and
    # then every image of the other in each epoch, so the training loader
    # must shuffle. A per-fold seeded generator keeps the order reproducible.
    shuffle_generator = torch.Generator().manual_seed(LOADER_SEED + i)
    train_dataloader_list.append(
        DataLoader(
            train_dataset,
            batch_size=BATCH_SIZE,
            shuffle=True,
            generator=shuffle_generator,
        )
    )
    # Validation order does not influence the metrics, so keep it fixed
    val_dataloader_list.append(DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False))

## Define CNN model

In [100]:
import torch.nn as nn
import torch.nn.functional as F


class CNNModel(nn.Module):
    """Small two-layer convolutional classifier for 3-channel 128x128 images.

    Architecture: conv(3->12) -> max-pool -> ReLU -> conv(12->24) -> max-pool
    -> ReLU -> flatten -> linear(32*32*24 -> 2) -> log-softmax.

    ``forward`` returns log-probabilities of shape ``(batch, 2)``.
    """

    def __init__(self):
        super().__init__()

        # Input images are RGB (3 channels); the first convolution applies 12
        # filters. kernel 3 / stride 1 / padding 1 keeps the spatial size.
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=3, stride=1, padding=1)

        # The second convolution maps the 12 feature maps to 24.
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=3, stride=1, padding=1)

        # Max pooling with kernel size 2 halves height and width; it is
        # applied once after each convolution.
        self.pool = nn.MaxPool2d(kernel_size=2)

        # Two poolings turn a 128x128 input into 32x32 (128 / 2 / 2), with 24
        # feature maps, so the flattened feature vector has 32 * 32 * 24 values.
        self.fc = nn.Linear(in_features=32 * 32 * 24, out_features=2)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of images.

        Args:
            x: image batch of shape ``(batch, 3, 128, 128)``.

        Returns:
            Tensor of shape ``(batch, 2)`` holding log-softmax scores.

        Raises:
            RuntimeError: if the spatial size of ``x`` does not produce the
                32 * 32 * 24 features the linear layer expects.
        """
        # Block 1: convolution, pooling, ReLU
        x = F.relu(self.pool(self.conv1(x)))

        # Block 2: convolution, pooling, ReLU
        x = F.relu(self.pool(self.conv2(x)))

        # Flatten everything except the batch dimension. Unlike
        # ``view(-1, 32 * 32 * 24)`` this can never fold surplus features of a
        # wrongly sized input into extra batch rows; such an input now fails
        # loudly in the linear layer instead.
        x = torch.flatten(x, start_dim=1)

        # Fully-connected layer produces one score per class
        x = self.fc(x)

        # Normalise the scores to log-probabilities
        return torch.log_softmax(x, dim=1)

# One independent, freshly initialised network per cross-validation fold
k1_cnn, k2_cnn, k3_cnn, k4_cnn = (CNNModel() for _ in range(4))


## Define loss function and optimizer

In [101]:
import torch.optim as optim

# A single loss object is shared by all folds
criterion = nn.CrossEntropyLoss()

# Each fold's network gets its own SGD optimizer with identical hyper-parameters
k1_optimizer, k2_optimizer, k3_optimizer, k4_optimizer = (
    optim.SGD(fold_model.parameters(), lr=0.0001, momentum=0.9)
    for fold_model in (k1_cnn, k2_cnn, k3_cnn, k4_cnn)
)


## Training model and print result

In [102]:
def train_model(cnn_model, optimizer, criterion_func, train_dataloader, val_dataloader, model_name):
    """Train ``cnn_model`` for one epoch, then report its validation accuracy.

    Args:
        cnn_model: network to train; it is updated in place.
        optimizer: optimizer bound to the parameters of ``cnn_model``.
        criterion_func: callable ``(outputs, labels) -> loss`` tensor.
        train_dataloader: iterable of ``(inputs, labels)`` training batches.
        val_dataloader: iterable of ``(images, labels)`` validation batches.
        model_name: label used in the progress message.

    Returns:
        The validation accuracy in percent, or ``nan`` when
        ``val_dataloader`` yields no samples.

    The model is left in evaluation mode when the function returns.
    """
    # --- one pass over the training data ---
    cnn_model.train()
    for inputs, labels in train_dataloader:
        # gradients accumulate by default, so clear them before every step
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = cnn_model(inputs)
        loss = criterion_func(outputs, labels)
        loss.backward()
        optimizer.step()

    # --- validation ---
    correct = 0
    total = 0

    cnn_model.eval()
    with torch.no_grad():
        for images, labels in val_dataloader:
            outputs = cnn_model(images)
            # the class with the highest score is the prediction
            _, predicted = torch.max(outputs, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # an empty validation set must not crash the epoch with a division by zero
    accuracy = 100 * correct / total if total else float("nan")
    print(f'Accuracy: {accuracy} %')

    print("Finish training " + model_name + " in one epoch")
    return accuracy

num_of_epoch = 20

# (model, optimizer, display name) for every fold, in fold order
fold_runs = (
    (k1_cnn, k1_optimizer, "K1"),
    (k2_cnn, k2_optimizer, "K2"),
    (k3_cnn, k3_optimizer, "K3"),
    (k4_cnn, k4_optimizer, "K4"),
)

for epoch in range(num_of_epoch):  # loop over the dataset multiple times
    # within an epoch every fold's model is trained once on its own data
    for fold_position, (fold_model, fold_optimizer, fold_name) in enumerate(fold_runs):
        train_model(
            fold_model,
            fold_optimizer,
            criterion,
            train_dataloader_list[fold_position],
            val_dataloader_list[fold_position],
            fold_name,
        )

Accuracy: 70.0 %
Finish training K1 in one epoch
Accuracy: 40.0 %
Finish training K2 in one epoch
Accuracy: 40.0 %
Finish training K3 in one epoch
Accuracy: 30.0 %
Finish training K4 in one epoch
Accuracy: 90.0 %
Finish training K1 in one epoch
Accuracy: 40.0 %
Finish training K2 in one epoch
Accuracy: 70.0 %
Finish training K3 in one epoch
Accuracy: 30.0 %
Finish training K4 in one epoch
Accuracy: 90.0 %
Finish training K1 in one epoch
Accuracy: 40.0 %
Finish training K2 in one epoch
Accuracy: 100.0 %
Finish training K3 in one epoch
Accuracy: 30.0 %
Finish training K4 in one epoch
Accuracy: 90.0 %
Finish training K1 in one epoch
Accuracy: 40.0 %
Finish training K2 in one epoch
Accuracy: 100.0 %
Finish training K3 in one epoch
Accuracy: 30.0 %
Finish training K4 in one epoch
Accuracy: 100.0 %
Finish training K1 in one epoch
Accuracy: 40.0 %
Finish training K2 in one epoch
Accuracy: 100.0 %
Finish training K3 in one epoch
Accuracy: 30.0 %
Finish training K4 in one epoch
Accuracy: 100.0 

## Analysis and result

In [103]:
from sklearn.metrics import confusion_matrix, accuracy_score, recall_score, precision_score
import numpy as np

def get_result(cnn_model, val_dataloader):
    """Collect the predictions of ``cnn_model`` together with the true labels.

    Args:
        cnn_model: trained network, called as ``cnn_model(images)``.
        val_dataloader: iterable of ``(images, labels)`` batches of any size.

    Returns:
        ``(pred, ans)``: two flat lists of ints with one entry per sample,
        holding the predicted and the true class index respectively. Both are
        empty when ``val_dataloader`` yields nothing.

    The model is switched to evaluation mode.
    """
    pred = []
    ans = []

    cnn_model.eval()
    with torch.no_grad():
        for images, labels in val_dataloader:
            outputs = cnn_model(images)
            # the class with the highest score is the prediction
            _, predicted = torch.max(outputs, 1)
            # tolist() handles batches of every size, whereas item() only
            # works for a batch that contains exactly one sample
            pred.extend(predicted.tolist())
            ans.extend(labels.tolist())

    return pred, ans


predict = []  # one list of predicted class indices per fold
answer = []   # one list of true class indices per fold

# Evaluate every fold's model on that fold's own validation data
for fold_position, fold_model in enumerate((k1_cnn, k2_cnn, k3_cnn, k4_cnn)):
    p, a = get_result(fold_model, val_dataloader_list[fold_position])
    predict.append(p)
    answer.append(a)

# Pin the label set so every fold yields a 2x2 matrix with the same row and
# column meaning. Without it, a fold whose labels and predictions contain only
# one class produces a smaller matrix, and its counts would be accumulated
# into the wrong cells.
CLASS_INDICES = [0, 1]
confusion_matrix_list = [
    confusion_matrix(answer[i], predict[i], labels=CLASS_INDICES)
    for i in range(len(predict))
]

# Element-wise mean over the folds; tolist() gives plain Python floats so the
# matrix prints without numpy scalar wrappers
result_confusion_matrix = (
    np.sum(confusion_matrix_list, axis=0) / len(confusion_matrix_list)
).tolist()

print(result_confusion_matrix)

[[5.0, 0.0], [0.0, 5.0]]


In [105]:
# Per-fold accuracy, recall and precision, followed by their averages
accuracy_list = []
recall_list = []
precision_list = []

for fold_answer, fold_predict in zip(answer, predict):
    # float() turns numpy scalars into plain floats so the lists print cleanly
    accuracy_list.append(float(accuracy_score(fold_answer, fold_predict)))
    recall_list.append(float(recall_score(fold_answer, fold_predict)))
    precision_list.append(float(precision_score(fold_answer, fold_predict)))

print(accuracy_list)
print(recall_list)
print(precision_list)

# Average over however many folds were evaluated instead of a hard-coded 4
fold_count = len(accuracy_list)
print("Average accuracy score: " + str(sum(accuracy_list) / fold_count))
print("Average recall score: " + str(sum(recall_list) / fold_count))
print("Average precision score: " + str(sum(precision_list) / fold_count))

[1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0]
Average accuracy score: 1.0
Average recall score: 1.0
Average precision score: 1.0
