Import nella cella nascosta seguente

In [None]:
import torch.nn as nn
import torch.optim as optim
import torch
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.utils.data import WeightedRandomSampler
from tqdm import tqdm
import torch.nn.functional as F
from torchvision.models import resnet34
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
from torch.utils.data import WeightedRandomSampler
import pandas as pd
from collections import Counter
import matplotlib.pyplot as plt
import os
from torch.utils.data import Dataset
from skimage import io
from sklearn import preprocessing
import wandb

In [None]:
# Weights & Biases login.

# Optional setup, left disabled:
#os.environ["WANDB_API_KEY"] = ""
#os.environ["WANDB_MODE"] = "dryrun"
#!pip install wandb -qqq
# Authenticate with W&B before any run is initialised further down.
wandb.login()

In [None]:
# Reproducibility: seed NumPy and PyTorch with the same value and configure cuDNN.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
# Turn off cuDNN benchmarking and request deterministic cuDNN algorithms.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

Definizione Dataset e Trasformazioni nella cella nascosta seguente

In [None]:
class MyDataset(Dataset):
    """Image-classification dataset described by a header-less CSV file.

    Column 0 of the CSV holds the image file name (joined onto ``path``) and
    column 1 holds the class label, which is integer-encoded by a
    ``LabelEncoder`` fitted on that column.

    Args:
        path: Directory the image file names are relative to.
        csv_file: CSV file read with ``header=None``.
        transform: Optional callable applied to every loaded image.
        **kwargs: Accepted and ignored.
    """

    def __init__(self, path, csv_file, transform=None, **kwargs):
        self.path = path
        self.annotations = pd.read_csv(csv_file, header=None)
        self.transform = transform
        self.le = preprocessing.LabelEncoder()
        label_column = self.annotations.iloc[:, 1]
        self.labels = self.le.fit_transform(label_column)

    def __len__(self) -> int:
        """Return the number of rows in the annotation table."""
        return self.annotations.shape[0]

    def __getitem__(self, index):
        """Return ``(image, encoded_label)`` for row ``index``."""
        file_name = self.annotations.iloc[index, 0]
        image = io.imread(os.path.join(self.path, file_name))
        if self.transform:
            image = self.transform(image)
        return (image, self.labels[index])


class MapDataset(torch.utils.data.Dataset):
    """
    Given a dataset, creates a dataset which applies a mapping function
    to its items (lazily, only when an item is called).

    Note that data is not cloned/copied from the initial dataset.

    Args:
        dataset: Indexable dataset whose items start with ``(image, label)``.
        map_fn: Callable applied to the image of every requested item.
    """

    def __init__(self, dataset, map_fn):
        self.dataset = dataset
        self.map = map_fn

    def __getitem__(self, index):
        """Return ``(map_fn(image), label)`` for the wrapped item at ``index``."""
        # Fetch the wrapped item exactly once: indexing the underlying dataset
        # separately for the image and for the label would run its loading and
        # transform work twice per sample.
        item = self.dataset[index]
        return (self.map(item[0]), item[1])

    def __len__(self):
        """Return the length of the wrapped dataset."""
        return len(self.dataset)
    
    
class AddGaussianNoise(object):
    """Transform that adds Gaussian noise to a tensor.

    Calling the transform returns ``tensor + N(0, 1) * std + mean``, where the
    noise has the same size as ``tensor``.

    Args:
        mean: Constant offset added to every element.
        std: Scale applied to the standard-normal noise.
    """

    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        # Create the noise on the input's device so the transform also works
        # for tensors that do not live on the CPU.
        noise = torch.randn(tensor.size(), device=tensor.device)
        return tensor + noise * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


class RollTransformation(object):
    """Transform that rolls a tensor with ``torch.roll``.

    Args:
        shift: Passed to ``torch.roll`` as ``shifts``.
        dim: Passed to ``torch.roll`` as ``dims``.
    """

    def __init__(self, shift, dim):
        self.shift = shift
        self.dim = dim

    def __call__(self, tensor):
        return torch.roll(input=tensor, shifts=self.shift, dims=self.dim)

    def __repr__(self):
        # Report the attributes this class actually defines.
        return self.__class__.__name__ + '(shift={0}, dim={1})'.format(self.shift, self.dim)
    

Istanziazione di Dataset e DataLoader

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
batch_size = 32


# Base preprocessing applied to every image of every split.
image_transforms = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]
)

# Augmentation applied to the training split only (through MapDataset below).
train_map = transforms.Compose(
    [
        #transforms.RandomRotation(degrees=10),
        AddGaussianNoise(mean=0, std=0.05),
        #RollTransformation(shift=int((torch.rand(1)-0.5)*97), dim=1),
        #RollTransformation(shift=int((torch.rand(1)-0.5)*0.3*128), dim=0),
    ]
)

dataset = MyDataset(csv_file = "classes_denoised_small.csv", path = "Dataset_small", transform = image_transforms)
# Hold out 20% of the data for testing, then 20% of the remainder for validation.
testlen = int(len(dataset)*0.2)
temp, test_set = torch.utils.data.random_split(dataset,[len(dataset)-testlen, testlen])
vallen = int(len(temp)*0.2)
train_set_temp, val_set = torch.utils.data.random_split(temp,[len(temp)-vallen, vallen])
train_set = MapDataset(train_set_temp, train_map)
#train_set, val_set = torch.utils.data.random_split(temp,[len(temp)-vallen, vallen])


# Unweighted loaders (alternative, disabled)

#train_loader = DataLoader(dataset = train_set, batch_size = batch_size, shuffle = True)
#val_loader = DataLoader(dataset = val_set, batch_size = batch_size, shuffle = True)
#test_loader = DataLoader(dataset = test_set, batch_size = batch_size, shuffle = True)


# Weighted loaders

def _inverse_frequency_weights(targets, num_classes):
    """Return one sampling weight per target: 1 / (number of targets of its class)."""
    class_weight = np.zeros(num_classes)
    classes, counts = np.unique(targets, return_counts=True)
    class_weight[classes] = 1. / counts
    return torch.from_numpy(class_weight[targets])


# Reuse the encoder, table and encoded labels the dataset already built from
# the same CSV instead of reading and fitting a second time.
le = dataset.le
df = dataset.annotations
labels = dataset.labels
num_classes = len(le.classes_)

# `train_set_temp` and `val_set` are subsets of `temp`, so their `.indices`
# point into `temp`; map them through `temp.indices` to get positions in the
# full dataset before looking the labels up.
temp_indices = np.asarray(temp.indices)
val_targets = labels[temp_indices[np.asarray(val_set.indices)]]
#train_targets = labels[train_set.indices]
train_targets = labels[temp_indices[np.asarray(train_set_temp.indices)]]

# Each split is weighted by its own class counts.
val_weight = _inverse_frequency_weights(val_targets, num_classes)
train_weight = _inverse_frequency_weights(train_targets, num_classes)

train_sampler = WeightedRandomSampler(weights=train_weight,num_samples=len(train_set),replacement=True)
train_loader = DataLoader(dataset = train_set,batch_size = batch_size, sampler=train_sampler)#, shuffle = True) 
# NOTE(review): validation is also drawn with a class-balanced sampler with
# replacement, so an epoch may not visit every validation sample — confirm intended.
val_sampler = WeightedRandomSampler(weights=val_weight,num_samples=vallen,replacement=True)
val_loader = DataLoader(dataset = val_set, batch_size = batch_size, sampler=val_sampler) #, shuffle = True)
test_loader = DataLoader(dataset = test_set, batch_size = batch_size, shuffle = True)

Definizione modelli nella cella seguente

In [None]:
class CNN(nn.Module):
    """Three conv -> ReLU -> (BatchNorm) -> max-pool stages and two linear layers.

    Args:
        input_channels: Number of channels of the input images.
        out_dim: Output width of both linear layers.
        drop: When true, dropout (p=0.5) is applied after the first linear layer.
        batchnorm: When true, BatchNorm2d is applied after each ReLU.
    """

    def __init__(self, input_channels: int, out_dim: int, drop = False, batchnorm = False) -> None:
        super().__init__()
        self.drop = drop
        self.batchnorm = batchnorm
        # Sub-modules are registered in the same order as before so parameter
        # ordering and seeded initialisation are unaffected.
        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.batchnorm1 = nn.BatchNorm2d(32)
        self.batchnorm2 = nn.BatchNorm2d(64)
        self.batchnorm3 = nn.BatchNorm2d(128)
        self.dropout2 = nn.Dropout(0.5)
        # fc1 expects the flattened conv output to hold 128 * 12 * 4 values.
        self.fc1 = nn.Linear(128 * 12 * 4, out_dim)
        self.fc2 = nn.Linear(out_dim, out_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the output of the last linear layer for a batch ``x``."""
        stages = (
            (self.conv1, self.batchnorm1),
            (self.conv2, self.batchnorm2),
            (self.conv3, self.batchnorm3),
        )
        for conv, norm in stages:
            x = F.relu(conv(x))
            if self.batchnorm:
                x = norm(x)
            x = F.max_pool2d(x, kernel_size=2)

        # Flatten everything except the batch dimension.
        x = x.view(x.shape[0], -1)
        hidden = F.relu(self.fc1(x))
        if self.drop:
            hidden = self.dropout2(hidden)
        return self.fc2(hidden)
 
    
class Resnet_model(nn.Module):
    """ResNet-34 feature extractor followed by dropout and a linear classifier.

    Args:
        in_dim: Number of features per sample produced by the truncated ResNet
            (the width the classifier expects).
        out_dim: Number of outputs of the classifier.
    """

    def __init__(self, in_dim, out_dim):
        # Zero-argument super(): the previous explicit form named a class that
        # is not defined in this notebook and raised NameError.
        super().__init__()
        self.in_dim = in_dim
        self.dropout = nn.Dropout(p=0.5)
        self.dense_output = nn.Linear(in_dim, out_dim)
        self.resnet = resnet34(pretrained=True)
        # Every child of the ResNet except the last one.
        self.resnet_head = nn.Sequential(*list(self.resnet.children())[:-1])

    def forward(self, x):
        """Return the classifier output for a batch ``x``."""
        x = self.resnet_head(x)
        # Flatten to (-1, in_dim) using the stored width rather than a
        # notebook-level global.
        return self.dense_output(self.dropout(x.view(-1, self.in_dim)))
    
    
class CNN2(nn.Module):
    """Five conv -> BatchNorm -> ELU -> max-pool stages and three linear layers.

    Args:
        input_channels: Number of channels of the input images.
        out_dim: Number of outputs of the last linear layer.
        drop: Stored on the instance; ``forward`` does not consult it and
            always applies dropout.
        batchnorm: Stored on the instance; ``forward`` does not consult it and
            always applies batch normalisation.
    """

    def __init__(self, input_channels: int, out_dim: int, drop = False, batchnorm = False) -> None:
        super().__init__()
        self.drop = drop
        self.batchnorm = batchnorm
        # Sub-modules are registered in the same order as before so parameter
        # ordering and seeded initialisation are unaffected.
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=7, stride=2, padding=3)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(in_channels=512, out_channels=1024, kernel_size=3, padding=1)
        self.batchnorm1 = nn.BatchNorm2d(64)
        self.batchnorm2 = nn.BatchNorm2d(128)
        self.batchnorm3 = nn.BatchNorm2d(256)
        self.batchnorm4 = nn.BatchNorm2d(512)
        self.batchnorm5 = nn.BatchNorm2d(1024)
        self.dropout1 = nn.Dropout(0.5)
        # fc1 expects the flattened conv output to hold 1024 * 6 * 2 values.
        self.fc1 = nn.Linear(1024 * 6 * 2, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, out_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the output of the last linear layer for a batch ``x``."""
        stages = (
            (self.conv1, self.batchnorm1),
            (self.conv2, self.batchnorm2),
            (self.conv3, self.batchnorm3),
            (self.conv4, self.batchnorm4),
            (self.conv5, self.batchnorm5),
        )
        for conv, norm in stages:
            x = F.max_pool2d(F.elu(norm(conv(x))), kernel_size=2)

        # Flatten everything except the batch dimension.
        x = x.view(x.shape[0], -1)
        # Two hidden linear layers, each followed by ELU and the shared dropout.
        for hidden_layer in (self.fc1, self.fc2):
            x = self.dropout1(F.elu(hidden_layer(x)))
        return self.fc3(x)
    
class M5(nn.Module):
    """1-D CNN: four Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d stages and a linear layer.

    Args:
        n_output: Number of outputs of the final linear layer.
        n_input: Number of input channels of the first convolution.
        n_channel: Channel width of the first two stages (doubled afterwards).
    """

    def __init__(self,  n_output:int, n_input=1, n_channel=32):
        super().__init__()
        wide = 2 * n_channel
        # Sub-modules are registered in the same order as before so parameter
        # ordering and seeded initialisation are unaffected.
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=16)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(4)
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(4)
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Return raw scores; the result keeps a middle dimension of size 1.

        After global average pooling the tensor is permuted to
        ``(batch, 1, channels)`` before the linear layer, and no log-softmax
        is applied.
        """
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))
        # Average over the whole remaining length.
        x = F.avg_pool1d(x, x.shape[-1])
        return self.fc1(x.permute(0, 2, 1))

Definizione funzione fit

In [None]:
def _mean(values):
    """Arithmetic mean of ``values``; NaN for an empty list instead of ZeroDivisionError."""
    return sum(values) / len(values) if values else float("nan")


def _forward_batch(model, criterion, data, targets, channels):
    """Run one batch through ``model``; return ``(loss tensor, batch accuracy as float)``."""
    if channels == 3:
        # Concatenate three copies of the batch along dim 1
        # (presumably turning single-channel images into 3-channel input — confirm).
        data = torch.cat((data, data, data), 1)
    data = data.to(device=device)
    targets = targets.to(device=device)
    pred = model(data)
    loss = criterion(pred, targets.long())
    # Vectorised accuracy, converted to a plain float so that no device
    # tensors end up in the logged metrics.
    accuracy = (torch.argmax(pred, dim=1) == targets).float().mean().item()
    return loss, accuracy


def fit(model, train_loader, criterion, num_epochs=1, channels = 1, checkpoint_loc=None, checkpoint_name=None, epoch_start_scheduler=1):
    """Train ``model``, validate after every epoch and log the metrics to W&B.

    Besides its arguments this function reads the notebook-level names
    ``optimizer``, ``scheduler``, ``val_loader`` and ``device``, which must be
    defined before it is called.

    Args:
        model: Module to train.
        train_loader: Iterable of ``(data, targets)`` training batches.
        criterion: Loss called as ``criterion(pred, targets.long())``.
        num_epochs: Number of epochs to run.
        channels: When 3, every batch is concatenated with itself three times
            along dim 1 before being passed to the model.
        checkpoint_loc: Directory for checkpoints; created if missing.
        checkpoint_name: Checkpoint file name. A checkpoint is written only
            when both ``checkpoint_loc`` and ``checkpoint_name`` are given.
        epoch_start_scheduler: First epoch index at which ``scheduler.step()``
            is called.

    Logged ``*_loss`` / ``*_acc`` values are means over the per-batch values
    (NaN when the corresponding loader yields no batch).
    """
    save_checkpoints = checkpoint_name is not None and checkpoint_loc is not None
    if checkpoint_loc is not None:
        os.makedirs(checkpoint_loc, exist_ok=True)

    for epoch in tqdm(range(num_epochs), leave=False):
        model.train()
        train_losses, train_accs = [], []
        for data, targets in train_loader:
            train_loss, train_acc = _forward_batch(model, criterion, data, targets, channels)
            train_losses.append(train_loss.item())
            train_accs.append(train_acc)

            optimizer.zero_grad()
            train_loss.backward()
            optimizer.step()

        # Write the checkpoint once per epoch rather than after every batch;
        # the stored state is the one reached at the end of the epoch.
        if save_checkpoints:
            checkpoint_dict = {
                "parameters": model.state_dict(),
                "optimizer": optimizer.state_dict(),
                "epoch": epoch
            }
            torch.save(checkpoint_dict, os.path.join(checkpoint_loc, checkpoint_name))

        model.eval()
        val_losses, val_accs = [], []
        with torch.no_grad():
            for data, targets in val_loader:
                val_loss, val_acc = _forward_batch(model, criterion, data, targets, channels)
                val_losses.append(val_loss.item())
                val_accs.append(val_acc)

        if epoch >= epoch_start_scheduler:
            scheduler.step()

        avgtrainloss = _mean(train_losses)
        avgvalloss = _mean(val_losses)
        avgtrainacc = _mean(train_accs)
        avgvalacc = _mean(val_accs)
        wandb.log({
            "train_acc" : avgtrainacc,
            "val_acc" : avgvalacc,
            "train_loss": avgtrainloss,
            "val_loss": avgvalloss
        })

        print(f'Train accuracy at epoch {epoch} is {avgtrainacc}')
        print(f'Validation accuracy at epoch {epoch} is {avgvalacc}')

### Small CNN

In [None]:
# Learning rate and W&B run for the small CNN experiment.
learning_rate = 0.1
wandb.init(
    project="jupyter-project",
    config=dict(
        model="SimpleCNN",
        batch_size=batch_size,
        learning_rate=learning_rate,
        dropout=0.5,
    ),
)

In [None]:
# Build the small CNN with dropout and batch norm enabled, plus its optimiser
# and step scheduler (these notebook-level names are read by fit()).
input_channels, out_dim = 1, 397

model = CNN(
    input_channels=input_channels,
    out_dim=out_dim,
    drop=True,
    batchnorm=True,
).to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.75, step_size=3)

In [None]:
%%wandb

fit(model=model, train_loader=train_loader, criterion=nn.CrossEntropyLoss(), num_epochs=20, checkpoint_loc = "model_checkpoints", checkpoint_name="SimpleCNN_drop_noise", epoch_start_scheduler = 7)

### Resnet

In [None]:
# Learning rate and a fresh (non-resumed) W&B run for the ResNet experiment.
learning_rate = 0.1
wandb.init(
    project="jupyter-project",
    resume=False,
    config=dict(
        model="ResNet",
        batch_size=batch_size,
        learning_rate=learning_rate,
        dropout=0.5,
    ),
)

In [None]:
# Build the ResNet-based model; res_out is passed as the classifier's input width.
res_out, out_dim = 512, 397

model = Resnet_model(in_dim=res_out, out_dim=out_dim).to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.1, step_size=5)

In [None]:
%%wandb
fit(model=model, train_loader=train_loader, criterion=nn.CrossEntropyLoss(), num_epochs=15, channels=3)

### Big CNN

In [None]:
learning_rate = 1e-1
# Continue the existing W&B run "1iovfjai": the run id is passed through `id`,
# while `resume` only selects the resuming policy ("allow" resumes the run if
# it exists and starts a new one with that id otherwise).
wandb.init(project="jupyter-project", id="1iovfjai", resume="allow",
           config={
               "model": "BiggerCNN",
               "batch_size": batch_size,
               "learning_rate": learning_rate,
               "dropout": 0.5
           })

In [None]:
# Build the bigger CNN with its optimiser and step scheduler. The commented
# lines restore model and optimiser state from a saved checkpoint.
input_channels, out_dim = 1, 397

#checkpoint_dic = torch.load("model_checkpoints/BigCNN_drop_noise")
model = CNN2(
    input_channels=input_channels,
    out_dim=out_dim,
    drop=True,
    batchnorm=True,
)
#model.load_state_dict(checkpoint_dic["parameters"])
model = model.to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)
#optimizer.load_state_dict(checkpoint_dic['optimizer'])
scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.75, step_size=3)

In [None]:
# Train the bigger CNN for 20 epochs with checkpointing; the scheduler starts
# stepping at epoch index 2.
fit(
    model=model,
    train_loader=train_loader,
    criterion=nn.CrossEntropyLoss(),
    num_epochs=20,
    checkpoint_loc="model_checkpoints",
    checkpoint_name="BigCNN_drop_noise",
    epoch_start_scheduler=2,
)

### 1D CNN

In [None]:
learning_rate = 1e-1
# W&B run for the 1-D CNN experiment. M5 contains no dropout layer, so the
# logged dropout rate is 0.0 rather than the 0.5 used by the other models.
wandb.init(project="jupyter-project",
           config={
               "model": "CNN1D",
               "batch_size": batch_size,
               "learning_rate": learning_rate,
               "dropout": 0.0
           })

In [None]:
# Build the 1-D CNN with its optimiser and step scheduler. The commented lines
# restore model and optimiser state from a saved checkpoint.
# NOTE(review): M5 is made of Conv1d layers — confirm that the loaders used
# with it yield (batch, channels, length) tensors.
input_channels, out_dim = 1, 397

model = M5(n_output=out_dim, n_input=input_channels)
#dic = torch.load("model_checkpoints/1DCNN")
#model.load_state_dict(dic["parameters"])
model = model.to(device)
optimizer = optim.SGD(model.parameters(), lr=learning_rate)
#optimizer.load_state_dict(dic['optimizer'])
scheduler = optim.lr_scheduler.StepLR(optimizer, gamma=0.75, step_size=3)

In [None]:
# Train the 1-D CNN for 30 epochs with checkpointing; the scheduler starts
# stepping at epoch index 7.
fit(
    model=model,
    train_loader=train_loader,
    criterion=nn.CrossEntropyLoss(),
    num_epochs=30,
    checkpoint_loc="model_checkpoints",
    checkpoint_name="1DCNN_normalized2",
    epoch_start_scheduler=7,
)