# FCN

In [1]:
# please upload data directory to Google drive
# use Google Drive to load data
from google.colab import drive, files
import zipfile

drive.mount('/content/drive')
filename = 'requirements.txt'
uploaded = files.upload()

Mounted at /content/drive


Saving requirements.txt to requirements.txt


In [2]:
!pip install -r "./requirements.txt"
!pip install torchinfo
!pip install torcheval

Collecting anyio==4.3.0 (from -r ./requirements.txt (line 1))
  Downloading anyio-4.3.0-py3-none-any.whl (85 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/85.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m85.6/85.6 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
Collecting arrow==1.3.0 (from -r ./requirements.txt (line 4))
  Downloading arrow-1.3.0-py3-none-any.whl (66 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/66.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.4/66.4 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting asttokens==2.4.1 (from -r ./requirements.txt (line 5))
  Downloading asttokens-2.4.1-py2.py3-none-any.whl (27 kB)
Collecting async-lru==2.0.4 (from -r ./requirements.txt (line 6))
  Downloading async_lru-2.0.4-py3-none-any.whl (6.1 kB)
Collecting colorama==0.4.6 (from -r ./requirements.tx

In [14]:
from torch.nn import (Module, Sequential, Conv2d, BatchNorm2d, ReLU, MaxPool2d,
                      Linear, AdaptiveAvgPool2d, BatchNorm1d, Sigmoid, Dropout)
import torch
from sklearn.model_selection import train_test_split
import os
import librosa
import numpy as np
from torchinfo import summary
from torch import nn
from torch.optim import Adam
from torcheval.metrics import MulticlassAUROC, MulticlassF1Score
from torch.utils.data import Dataset, DataLoader, TensorDataset
import matplotlib.pyplot as plt

# Add Methods

In [15]:
# Util
def create_dataloader(x, y, batch_size=64):
    x = torch.tensor(x, dtype=torch.float).cuda()
    y = torch.tensor(y, dtype=torch.long).cuda()
    data = TensorDataset(x, y)
    dataloader = DataLoader(data, batch_size=batch_size, shuffle=True)
    return dataloader


def k_fold_cross_validation(x, y, k):
    fold_size = x.shape[0] // k
    xs_train = []
    ys_train = []
    xs_valid = []
    ys_valid = []
    for i in range(k - 1):
        xs_valid.append(x[fold_size * i:fold_size * (i + 1)])
        ys_valid.append(y[fold_size * i:fold_size * (i + 1)])
        xs_train.append(np.concatenate([x[:fold_size * i], x[fold_size * (i + 1):]], axis=0))
        ys_train.append(np.concatenate([y[:fold_size * i], y[fold_size * (i + 1):]], axis=0))
    xs_valid.append(x[fold_size * (k - 1):])
    ys_valid.append(y[fold_size * (k - 1):])
    xs_train.append(x[:fold_size * (k - 1)])
    ys_train.append(y[:fold_size * (k - 1)])
    return xs_train, ys_train, xs_valid, ys_valid

def train(model, loss_function, opt, dataloaders_train, dataloaders_valid, k, epoch=10):
    epochs_loss = []
    epochs_accuracy = []
    for i in range(epoch):
        print("-------epoch  {} -------".format(i + 1))
        epoch_loss = 0
        epoch_accuracy = 0

        for j in range(k):
            print(f'fold {j + 1}:')

            # train
            loss_train = 0
            accuracy_train = 0
            train_size = 0
            for batch_idx, (data, target) in enumerate(dataloaders_train[j]):
                data = data.cuda()
                target = target.cuda()
                model.train()
                output = model(data)
                loss = loss_function(output, target)
                opt.zero_grad()
                loss.backward()
                opt.step()
                data_size = len(data)
                loss_train += loss.item() * data_size
                accuracy_train += (output.argmax(1) == target).sum()
                train_size += data_size
            print("train set loss: {}".format(loss_train / train_size))
            print("train set accuracy: {}".format(accuracy_train / train_size))

            # valid
            loss_valid = 0
            accuracy_valid = 0
            valid_size = 0
            for batch_idx, (data, target) in enumerate(dataloaders_valid[j]):
                data = data.cuda()
                target = target.cuda()
                model.eval()
                with torch.no_grad():
                    output = model(data)
                    loss = loss_function(output, target)
                    data_size = len(data)
                    loss_valid += loss.item() * data_size
                    accuracy_valid += (output.argmax(1) == target).sum()
                    valid_size += data_size
            print("valid set loss: {}".format(loss_valid / valid_size))
            print("valid set accuracy: {}".format(accuracy_valid / valid_size))
            epoch_loss += loss_valid / valid_size
            epoch_accuracy += accuracy_valid / valid_size
        epoch_loss = round(epoch_loss / k, 3)
        epoch_accuracy = round(float(epoch_accuracy) / k, 3)
        print(f"epoch loss: {epoch_loss}")
        print(f"epoch accuracy: {epoch_accuracy}")
        epochs_loss.append(epoch_loss)
        epochs_accuracy.append(epoch_accuracy)
    return {"loss": epochs_loss, "accuracy": epochs_accuracy}


def test(model, loss_function, dataloader_test):
    loss_test = 0
    accuracy_test = 0
    AUC_test = 0
    f1_score_test = 0
    test_size = 0
    for batch_idx, (data, target) in enumerate(dataloader_test):
        data = data.cuda()
        target = target.cuda()
        model.eval()
        with torch.no_grad():
            output = model(data)
            loss = loss_function(output, target)
            loss_test += loss.item() * len(data)
            accuracy_test += (output.argmax(1) == target).sum()
            test_size += len(data)
            auc = MulticlassAUROC(num_classes=10)
            auc.update(output, target)
            AUC_test += auc.compute() * len(data)
            auc.reset()
            f1 = MulticlassF1Score(num_classes=10)
            f1.update(output, target)
            f1_score_test += f1.compute() * len(data)
            f1.reset()
    loss = round(loss_test / test_size, 3)
    accuracy = round(float(accuracy_test) / test_size,3)
    AUC = round(float(AUC_test) / test_size, 3)
    f1 = round(float(f1_score_test) / test_size,3)
    print(f"test set loss: {loss}")
    print(f"test set accuracy: {accuracy}")
    print(f"test set AUC: {AUC}")
    print(f"test set f1-score: {f1}")
    return loss, accuracy, AUC, f1

# Read Data

In [16]:
root = '/content/drive/MyDrive/why/genres_original' # Change according path storing data
genres = os.listdir(root)
x = []
y = []
length = []
sr = 16*1000
for genre in genres:
    genre_root = os.path.join(root, genre)
    audios = os.listdir(genre_root)
    for audio in audios:
        audio_path = os.path.join(genre_root, audio)
        signal, sr = librosa.load(audio_path, sr=sr)
        x.append(signal)
        length.append(len(signal))
        y.append(genres.index(genre))
min_length = min(length)
print("Finsh reading data")

Finsh reading data


# Segment and Normalise

In [17]:
top_db = 80
for i in range(len(x)):
    signal = x[i][:min_length]
    mel_spect = librosa.feature.melspectrogram(y=signal,sr=sr,n_fft=1024) # convert signals to mel spectrogram
    mel_spect = librosa.power_to_db(mel_spect, ref=np.max, top_db=top_db) # log compression
    x[i] = mel_spect/-top_db # normalisation
print("finish conversion and compression")

finish conversion and compression


# Split Data

In [18]:
x = np.asarray(x)
x = x.transpose((0,2,1))
x = x.reshape(x.shape[0],1,x.shape[1],x.shape[2])
y = np.asarray(y)
print(x.shape)

(1000, 1, 936, 128)


In [19]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2,
                                                    stratify=y,shuffle=True)
# k-fold cross validation
k = 5
xs_train, ys_train, xs_valid, ys_valid = k_fold_cross_validation(x_train,y_train,k)
print("finish splitting data")

finish splitting data


# Create Dataloaders

In [20]:
batch_size = 64
dataloaders_train = []
dataloaders_valid = []
for i in range(k):
    dataloaders_train.append(create_dataloader(xs_train[i], ys_train[i], batch_size=batch_size))
    dataloaders_valid.append(create_dataloader(xs_valid[i], ys_valid[i], batch_size=batch_size))
dataloader_test = create_dataloader(x_test, y_test, batch_size=batch_size)
print("finish creating dataloaders")

finish creating dataloaders


# FCN Model

In [21]:
class ConvBlock(nn.Module):
    def __init__(self, in_filters, out_filters, stride=1):
        super(ConvBlock, self).__init__()
        self.conv = nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=stride, padding=1)
        self.bn = nn.BatchNorm2d(out_filters)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x

class FCN7(nn.Module):
    def __init__(self, class_num, pre_filter_size=7, in_channels=3):
        super(FCN7, self).__init__()

        self.conv1 = ConvBlock(1, 128)
        # self.mp1 = nn.MaxPool2d((2, 4))
        self.mp1 = nn.MaxPool2d((2, 2))


        self.conv2 = ConvBlock(128, 256)
        # self.mp2 = nn.MaxPool2d((2, 4))
        self.mp2 = nn.MaxPool2d((2, 2))

        self.conv3 = ConvBlock(256, 512)
        # self.mp3 = nn.MaxPool2d((2, 4))
        self.mp3 = nn.MaxPool2d((2, 2))

        self.conv4 = ConvBlock(512, 1024)
        # self.mp4 = nn.MaxPool2d((3, 5))
        self.mp4 = nn.MaxPool2d((2, 2))

        self.conv5 = ConvBlock(1024, 2048)
        # self.mp5 = nn.MaxPool2d((4, 4))
        self.mp5 = nn.MaxPool2d((2, 2))


        self.conv6 = nn.Conv2d(2048, 1024, kernel_size=1) # 1x1 convolutions
        self.conv7 = nn.Conv2d(1024, 1024, kernel_size=1) # additional 1x1 convolution as per FCN-7


        self.avg_pool = AdaptiveAvgPool2d((1,1))

        # Fully connected layer with batch normalization and sigmoid activation
        self.fc = nn.Sequential(
            nn.Linear(1024, class_num),
            nn.BatchNorm1d(class_num),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.mp1(self.conv1(x))
        print("After layer 1 & mp1:", x.shape)
        x = self.mp2(self.conv2(x))
        print("After layer 2 & mp2:", x.shape)
        x = self.mp3(self.conv3(x))
        print("After layer 3 & mp3:", x.shape)
        x = self.mp4(self.conv4(x))
        print("After layer 4 & mp4:", x.shape)
        x = self.mp5(self.conv5(x))
        print("After layer 5 & mp5:", x.shape)

        x = self.conv6(x)
        x = self.conv7(x)
        x = self.avg_pool(x)

        x = x.view(x.size(0), -1)  # Flatten the output
        print("Shape before FC layer:", x.shape)

        # Apply the fully connected layer
        x = self.fc(x)
        return x

# Model Construction

In [22]:
# x_tensor = torch.tensor(x).float().cuda()
# y_tensor = torch.tensor(y).long().cuda()

# input_shape = (1, x.shape[2],x.shape[3])
class_num = 10
model = FCN7(class_num=class_num).cuda()
loss_function = nn.CrossEntropyLoss()
opt = Adam(model.parameters(), lr=0.001)
summary(model, input_size=[batch_size,1,x.shape[2],x.shape[3]])

# # Initialize the model and move it to GPU
# model = FCN7(class_num=10).cuda()

# # Create a dummy input tensor that matches the input shape, including a batch dimension
# dummy_input = torch.randn(1, 1, x.shape[2],x.shape[3]).cuda()
# # Try a manual forward pass
# try:
#     model.eval()
#     with torch.no_grad():
#         dummy_output = model(dummy_input)
#         print("Forward pass successful. Output shape:", dummy_output.shape)
# except Exception as e:
#     print("Forward pass failed:", e)

After layer 1 & mp1: torch.Size([64, 128, 468, 64])
After layer 2 & mp2: torch.Size([64, 256, 234, 32])
After layer 3 & mp3: torch.Size([64, 512, 117, 16])
After layer 4 & mp4: torch.Size([64, 1024, 58, 8])
After layer 5 & mp5: torch.Size([64, 2048, 29, 4])
Shape before FC layer: torch.Size([64, 1024])


Layer (type:depth-idx)                   Output Shape              Param #
FCN7                                     [64, 10]                  --
├─ConvBlock: 1-1                         [64, 128, 936, 128]       --
│    └─Conv2d: 2-1                       [64, 128, 936, 128]       1,280
│    └─BatchNorm2d: 2-2                  [64, 128, 936, 128]       256
│    └─ReLU: 2-3                         [64, 128, 936, 128]       --
├─MaxPool2d: 1-2                         [64, 128, 468, 64]        --
├─ConvBlock: 1-3                         [64, 256, 468, 64]        --
│    └─Conv2d: 2-4                       [64, 256, 468, 64]        295,168
│    └─BatchNorm2d: 2-5                  [64, 256, 468, 64]        512
│    └─ReLU: 2-6                         [64, 256, 468, 64]        --
├─MaxPool2d: 1-4                         [64, 256, 234, 32]        --
├─ConvBlock: 1-5                         [64, 512, 234, 32]        --
│    └─Conv2d: 2-7                       [64, 512, 234, 32]        1,180,16

# Train

In [23]:
epoch = 10
history = train(model,loss_function,opt,dataloaders_train,dataloaders_valid,k,epoch=epoch)
print("finish training")

-------epoch  1 -------
fold 1:


OutOfMemoryError: CUDA out of memory. Tried to allocate 3.66 GiB. GPU 0 has a total capacity of 14.75 GiB of which 2.75 GiB is free. Process 2121 has 11.99 GiB memory in use. Of the allocated memory 5.68 GiB is allocated by PyTorch, and 6.18 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
plt.plot(range(epoch),history['loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [None]:
plt.plot(range(epoch), history['accuracy'])
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.show()

# Test

In [None]:
loss, acc, AUC, f1 = test(model,loss_function,dataloader_test)