### Model CNN

In [34]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

import matplotlib.pyplot as plt
import time
import copy
import json

# Detect if we have a GPU available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

##### Importem el dataloader

In [37]:
# Path al fitxer amb les dades
train_path = "train.json"
test_path = "test.json"
validation_path = "validation.json"

# Carguem les dades d'entrenament
with open(train_path, "r") as fp:
    data_train = json.load(fp)

train_x = np.array(data_train["mfcc"], dtype=np.float32)
train_y = np.array(data_train["labels"], dtype=np.float32)

# Carguem les dades de validation
with open(validation_path, "r") as fp:
    data_validation = json.load(fp)

val_x = np.array(data_validation["mfcc"], dtype=np.float32)
val_y = np.array(data_validation["labels"], dtype=np.float32)

# Carguem les dades de test
with open(test_path , "r") as fp:
    data_test = json.load(fp)

test_x = np.array(data_test["mfcc"], dtype=np.float32)
test_y = np.array(data_test["labels"], dtype=np.float32)




#### Definim Model CNN

In [28]:
class CNNModel(nn.Module):
    def __init__(self, input_shape):
        super(CNNModel, self).__init__()

        # 1st conv layer
        self.conv1 = nn.Conv2d(in_channels=input_shape[2], out_channels=32, kernel_size=(3, 3), padding=1)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=(3, 3), stride=(2, 2), padding=1)
        self.batchnorm1 = nn.BatchNorm2d(32)

        # 2nd conv layer
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(3, 3), padding=1)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=(3, 3), stride=(2, 2), padding=1)
        self.batchnorm2 = nn.BatchNorm2d(32)

        # 3rd conv layer
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(2, 2), padding=0)
        self.relu3 = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0)
        self.batchnorm3 = nn.BatchNorm2d(32)

        # flatten output and feed it into dense layer
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(32 * (input_shape[0] // 8) * (input_shape[1] // 8), 64)
        self.relu4 = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

        # output layer
        self.fc2 = nn.Linear(64, 10)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.maxpool1(x)
        x = self.batchnorm1(x)

        x = self.conv2(x)
        x = self.relu2(x)
        x = self.maxpool2(x)
        x = self.batchnorm2(x)

        x = self.conv3(x)
        x = self.relu3(x)
        x = self.maxpool3(x)
        x = self.batchnorm3(x)

        x = self.flatten(x)
        x = self.fc1(x)
        x = self.relu4(x)
        x = self.dropout(x)

        x = self.fc2(x)
        output = self.softmax(x)

        return output

Funció d'entrenament

In [29]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
    since = time.time()

    acc_history = {"train": [], "val": []}
    losses = {"train": [], "val": []}

    # we will keep a copy of the best weights so far according to validation accuracy
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    losses[phase].append(loss)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            
            acc_history[phase].append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, acc_history, losses

Executem el model:

In [38]:
# Generem dataloader

class CustomDataset(Dataset):
    def __init__(self, train_x, train_y):
        self.train_x = train_x
        self.train_y = train_y
    
    def __len__(self):
        return len(self.train_x)
    
    def __getitem__(self, idx):
        x = self.train_x[idx]
        y = self.train_y[idx]
        return x, y

training_data = CustomDataset(train_x, train_y)
val_data = CustomDataset(val_x, val_y)

batch_size = 8
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_data, batch_size=batch_size, shuffle=True)

dataloaders_dic = { 'train': train_dataloader, 'val':val_dataloader}

In [39]:
# Batch size for training (change depending on how much memory you have)


entrada_model = ( train_x.shape[1], train_x.shape[2], 1)
model = CNNModel( entrada_model )
num_epochs = 50

model = model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
loss = nn.CrossEntropyLoss()

# Train and evaluate
model, hist, losses = train_model(model, dataloaders_dic, loss, optimizer, num_epochs=num_epochs)

Epoch 0/49
----------


RuntimeError: Given groups=1, weight of size [32, 1, 3, 3], expected input[1, 8, 259, 13] to have 1 channels, but got 8 channels instead