In [None]:
import torch
# Use the first CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Dataset

In [None]:
# Put the path to your dataset here
path = "../data/IMAGE_TRAINING_SET"

In [None]:
import os 

def walk_through_dir(dir_path):
  """Print, for `dir_path` and every directory below it, how many sub-directories and files it holds.

  Every entry that `os.walk` reports as a file is counted as an image in the message.
  """
  for current_dir, subdirs, files in os.walk(dir_path):
    n_dirs, n_files = len(subdirs), len(files)
    print(f"There are {n_dirs} directories and {n_files} images in '{current_dir}'.")

# Print the directory / file counts for the whole dataset folder tree
walk_through_dir(path)

In [None]:
from pathlib import Path
# Locations of the two class folders inside the dataset root
# (judging by the variable names, "1" holds the fire images and "0" the others)
fire_dir = Path(f"{path}/1")
non_fire_dir = Path(f"{path}/0")

# Show both paths as the cell output
fire_dir, non_fire_dir


In [None]:
import torch
from torchvision import transforms
import albumentations

# Deterministic preprocessing applied to every image: resize to 256, take the
# central 224x224 crop, convert to a tensor and normalise each channel with the
# mean / std values used by torchvision's ImageNet-pretrained models.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Random augmentation: horizontal flip with probability 0.5, a random rotation
# with `degrees=30`, and a 3x3 Gaussian blur with sigma 0.1.
# NOTE(review): where this is composed after `preprocess`, it operates on the
# already normalised tensor rather than on the raw image.
augmentation = transforms.Compose([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=30),
    transforms.GaussianBlur(kernel_size = 3, sigma = 0.1),
])


# preprocess = albumentations.Sequential([
#     albumentations.Resize(height=256, width=256, always_apply=True),
#     albumentations.CenterCrop(height=224, width=224, always_apply=True),
#     albumentations.Normalize(mean=[0.485, 0.456, 0.406],
#                                 std=[0.229, 0.224, 0.225],
#                                 max_pixel_value=255.,
#                                 always_apply=True),
# ])

# augmentation = albumentations.Compose([
#     albumentations.HorizontalFlip(p=0.5), # Horizontal flip
#     albumentations.Rotate(p=0.8, limit=15), # Rotation
#     albumentations.Blur(p=0.3), # Blur the image
#     albumentations.Sharpen(p=0.4),  # Sharpen the edges
#     albumentations.RandomBrightnessContrast(p=0.2), # Change brightness and contrast
#     # albumentations.ChannelShuffle(p=0.2), # Shuffle the RGB channels
#     # albumentations.ColorJitter(p=0.2), # Change the colours
#     # albumentations.Downscale(p=0.2), # Reduce the resolution
#     # albumentations.Emboss(p=0.2), # Emboss
#     albumentations.HueSaturationValue(p=0.2), # Change hue, saturation and value
#     # albumentations.RandomFog(p=0.7), # Fog
#     # albumentations.RandomRain(p=0.3), # Rain
#     # albumentations.RandomShadow(p=0.5), # Shadow
#     # albumentations.Solarize(p=0.2), # Solarization
#     albumentations.UnsharpMask(p=0.2), # Unsharp mask
# ], p=.5)


In [None]:
from torchvision.datasets import ImageFolder

# define data root directory
root_dir = path

# the `transform` argument is applied to every image the dataset returns;
# here that is the preprocessing followed by the random augmentation, so
# each access to a sample yields a freshly augmented image
fire_dataset = ImageFolder(root_dir, transform=transforms.Compose([preprocess, augmentation]))

In [None]:
# the classes are the names of the sub-folders of the root folder
class_names = fire_dataset.classes
class_names

In [None]:
# mapping from class name to the integer label assigned by ImageFolder
class_dict = fire_dataset.class_to_idx
class_dict

In [None]:
from torch.utils.data import Subset, random_split

# Split the dataset into training, validation and testing.
# manual_seed needs an integer that fits in 64 bits, so draw 8 random bytes.
rand_bytes = os.urandom(8)
rand_int = int.from_bytes(rand_bytes, byteorder='big')
# Show the seed: without it this split (and therefore the test set a saved
# model was evaluated on) cannot be reproduced after a kernel restart.
print("split seed:", rand_int)
train_dataset, valid_dataset, test_dataset = random_split(fire_dataset, [0.75, 0.15, 0.10], generator=torch.Generator().manual_seed(rand_int))

# `fire_dataset` runs the random augmentation on every sample it returns, so
# the subsets above would also augment the validation and test images and
# make the metrics noisy. Re-point those two subsets (same indices) at a view
# of the same folder that only applies the deterministic preprocessing.
# ImageFolder enumerates the files in sorted order, so the indices match.
eval_dataset = ImageFolder(root_dir, transform=preprocess)
valid_dataset = Subset(eval_dataset, valid_dataset.indices)
test_dataset = Subset(eval_dataset, test_dataset.indices)

In [None]:
# number of samples in the training, validation and test splits
len(train_dataset), len(valid_dataset), len(test_dataset)

# Model

In [None]:
import torchvision.models as models
from torch.nn import Linear

def build_resnet18(num_outputs=1):
    """Build an ImageNet-pretrained ResNet-18 set up for fine-tuning.

    All backbone weights are frozen except the last two residual stages
    (`layer3`, `layer4`); the final fully connected layer is replaced by a
    new, trainable `Linear` layer.

    Args:
        num_outputs: number of output units of the new final layer.

    Returns:
        The configured torchvision ResNet-18 module.
    """
    # `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the
    # checkpoint that flag used to select.
    model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

    # freeze everything first ...
    for param in model.parameters():
        param.requires_grad = False

    # ... then unfreeze the last two residual stages
    for stage in (model.layer3, model.layer4):
        for param in stage.parameters():
            param.requires_grad = True

    # the replacement head is created after the freeze, so it is trainable
    model.fc = Linear(model.fc.in_features, num_outputs)

    return model

def get_trainable_params(model):
    """Return the parameters of the given model that require gradients.

    The name of every returned parameter is printed, one per line, in the
    order reported by `model.named_parameters()`.
    """
    trainable = [(name, param) for name, param in model.named_parameters() if param.requires_grad]
    for name, _ in trainable:
        print("\t", name)
    return [param for _, param in trainable]

def build_mobilenet(num_outputs=1):
    """Build an ImageNet-pretrained MobileNetV2 set up for fine-tuning.

    The whole network is frozen, the last classifier layer is replaced by a
    new `Linear` layer, and then the feature blocks from index 8 onwards plus
    the classifier are made trainable.

    Args:
        num_outputs: number of output units of the new classifier layer
            (defaults to 1, the value that used to be hard-coded).

    Returns:
        The configured torchvision MobileNetV2 module.
    """
    # `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the
    # checkpoint that flag used to select.
    mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.IMAGENET1K_V1)

    # freeze every pretrained weight
    for param in mobilenet.parameters():
        param.requires_grad = False

    # swap the final classifier layer for one with the requested output size
    mobilenet.classifier[1] = Linear(mobilenet.classifier[1].in_features, num_outputs)

    # fine-tune only the later feature blocks and the classifier
    for param in mobilenet.features[8:].parameters():
        param.requires_grad = True
    for param in mobilenet.classifier.parameters():
        param.requires_grad = True

    return mobilenet

    

# instantiate the MobileNetV2 variant; the bare name shows its architecture as the cell output
model = build_mobilenet()
model

# Training

In [None]:
from torch.utils.tensorboard import SummaryWriter
from tensorboard import notebook
import os

def start_tensorboard(log_dir):
  """Create a TensorBoard writer for a run and (re)start the TensorBoard UI in the notebook.

  Args:
    log_dir: name of the run; events are written under `runs/<log_dir>`.

  Returns:
    The `SummaryWriter` that logs to that directory.
  """
  writer = SummaryWriter(os.path.join("runs", log_dir))

  # run tensorboard in background
  # NOTE(review): the next three lines are IPython shell / line magics, so this
  # function only works when executed inside a notebook kernel; `killall` is
  # presumably not available on every platform — confirm.
  ! killall tensorboard
  %load_ext tensorboard
  %tensorboard --logdir ./runs

  notebook.list() # View open TensorBoard instances

  return writer

In [None]:
from torchvision.utils import make_grid
from tqdm import tqdm

def one_epoch(model, lossFunction, output_activation, optimizer, train_loader, val_loader, writer, epoch_num):
  """Train `model` for one pass over `train_loader`, then evaluate it on `val_loader`.

  Per-batch training loss / accuracy and the per-epoch validation loss /
  accuracy are written to `writer`; predictions are thresholded at 0.5.

  Args:
    model: network to train; it is moved to the module-level `device`.
    lossFunction: callable taking (activated outputs, float targets).
    output_activation: callable applied to the raw model outputs.
    optimizer: optimizer stepping the trainable parameters of `model`.
    train_loader: iterable with a length, yielding (inputs, labels) batches.
    val_loader: iterable yielding (inputs, labels) batches.
    writer: TensorBoard `SummaryWriter`-like logger.
    epoch_num: zero-based epoch index, used to compute the global step.

  Returns:
    (val_loss, val_accuracy): the mean of the per-batch validation losses and
    the fraction of correctly classified validation samples, as floats.
  """
  model.to(device)

  model.train()

  i_start = epoch_num * len(train_loader)
  # global step of the most recent training batch; stays at i_start if the
  # loader yields nothing, so the validation logging below never hits an
  # undefined loop variable
  step = i_start
  # wrap the loader itself (not the enumerate) so tqdm knows the batch count
  for i, (X, y) in enumerate(tqdm(train_loader, desc="epoch {} - train".format(epoch_num))):
    step = i_start + i
    if i == 0:
      writer.add_image('first_batch', make_grid(X))

    X = X.to(device)
    y = y.to(device).float()

    optimizer.zero_grad()

    # squeeze only the trailing output dimension: a bare squeeze() would also
    # drop the batch dimension of a 1-sample batch and make the loss fail on
    # the input/target shape mismatch
    o = output_activation(model(X)).squeeze(-1)
    l = lossFunction(o, y)

    l.backward()
    optimizer.step()

    acc = ((o.detach() > .5) == y).float().mean().item()

    writer.add_scalar('train/loss', l.item(), step)
    writer.add_scalar('train/acc', acc, step)

  model.eval()
  with torch.no_grad():
    val_loss = []
    val_corr_pred = []
    for X, y in tqdm(val_loader, desc="epoch {} - validation".format(epoch_num)):
      X = X.to(device)
      y = y.to(device).float()

      o = output_activation(model(X)).squeeze(-1)
      val_loss.append(lossFunction(o, y))
      val_corr_pred.append((o > .5) == y)

    # mean over batches for the loss, mean over samples for the accuracy
    val_loss = torch.stack(val_loss).mean().item()
    val_accuracy = torch.concatenate(val_corr_pred).float().mean().item()

    # logged at the step of the last training batch of this epoch
    writer.add_scalar('val/loss', val_loss, step)
    writer.add_scalar('val/acc', val_accuracy, step)
  return val_loss, val_accuracy

In [None]:
from torch.nn import BCELoss, CrossEntropyLoss, Sigmoid, Softmax

# learning parameters
lossFunction, output_activation = BCELoss(), Sigmoid()
batch_size = 256
lr = .001
momentum = .9  # not passed to the Adam optimizer below
lambda_reg = 0

epochs = 300
early_stopping_patience = 5

# only the parameters left trainable by the model builder are optimised
optimizer = torch.optim.Adam(get_trainable_params(model),
                          lr=lr,
                          weight_decay=lambda_reg)

# create output directory and logger
experiment_name = "image_train_mobilenet"
import os
# exist_ok=True: re-running this cell must not fail because the folder
# already exists from a previous run
os.makedirs(experiment_name, exist_ok=True)
writer = start_tensorboard(experiment_name)

In [None]:
from torch.utils.data import DataLoader
dataloader_params = {"batch_size": batch_size, "num_workers": 2, "pin_memory": True}
train_loader = DataLoader(train_dataset, shuffle=True, **dataloader_params)
val_loader = DataLoader(valid_dataset, shuffle=False, **dataloader_params)

# early stopping and best model saving
early_stopping_counter = early_stopping_patience
min_val_loss = float("inf")

# training and validation
val_losses = torch.zeros(epochs)
val_accuracies = torch.zeros(epochs)
for e in range(epochs):
  print("EPOCH {}".format(e))
  val_loss, val_accuracy = one_epoch(model, lossFunction, output_activation, optimizer, train_loader, val_loader, writer, e)

  # store the validation metrics (entries of epochs not yet run stay at 0)
  val_losses[e] = val_loss
  val_accuracies[e] = val_accuracy
  torch.save(val_losses, os.path.join(experiment_name,'val_losses.pth'))
  torch.save(val_accuracies, os.path.join(experiment_name,'val_accuracies.pth'))

  if val_loss < min_val_loss:
    # new best: save the model and restart the patience countdown
    min_val_loss = val_loss
    early_stopping_counter = early_stopping_patience
    torch.save(model.state_dict(), os.path.join(experiment_name,'best_model.pth'))
    print("- saved best model: val_loss =", val_loss, "val_accuracy =", val_accuracy)
  else:
    # Patience is measured against the best loss so far. Comparing with the
    # previous epoch only would reset the countdown on every small dip, so a
    # loss oscillating above the best would never stop the training.
    early_stopping_counter -= 1
    if early_stopping_counter <= 0:
      print("- early stopping: no improvement over val_loss =", min_val_loss)
      break

# Testing

In [None]:
from torch.utils.data import DataLoader
dataloader_params = {"batch_size": batch_size, "num_workers": 2, "pin_memory": True}
# dedicated name: reusing `val_loader` here would silently replace the
# validation loader for any cell executed afterwards
test_loader = DataLoader(test_dataset, shuffle=False, **dataloader_params)

# rebuild the architecture and restore the best checkpoint saved during training
model = build_mobilenet().to(device)
# map_location keeps the checkpoint loadable when it was written from another device
model.load_state_dict(torch.load(os.path.join(experiment_name,'best_model.pth'), map_location=device))

Y, Y_hat = [], []
model.eval()
with torch.no_grad():
  for X, y in test_loader:
    Y.append(y)
    # squeeze(-1) keeps the batch dimension even for a 1-sample batch (a bare
    # squeeze() would produce a 0-d tensor that cannot be concatenated);
    # predictions go straight to the CPU, where the labels already are
    Y_hat.append((output_activation(model(X.to(device))) > .5).squeeze(-1).cpu())

Y = torch.concatenate(Y).to("cpu")
Y_hat = torch.concatenate(Y_hat)
print("Test accuracy:", (Y==Y_hat).float().mean().item())


In [None]:
from sklearn.metrics import confusion_matrix, f1_score, precision_score, recall_score

# Rows are the true classes, columns the predicted ones. Printed explicitly
# because a notebook only displays the last expression of a cell.
print("Confusion matrix:")
print(confusion_matrix(Y, Y_hat))

# Compute precision, recall and F-score
precision = precision_score(Y, Y_hat)
recall = recall_score(Y, Y_hat)
# f1_score is the harmonic mean of precision and recall; unlike the manual
# formula it does not evaluate 0 / 0 when both are zero
f_score = f1_score(Y, Y_hat)

print("Precision: {:.4f}".format(precision))
print("Recall: {:.4f}".format(recall))
print("F-score: {:.4f}".format(f_score))

In [None]:
import matplotlib.pyplot as plt

def visualize_video_sample(X, y, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Show an image tensor (or a batch of them, tiled into a grid) with `y` as the title.

    Args:
        X: channel-first image tensor, or a batch of such tensors.
        y: value shown as the figure title (e.g. the label).
        mean, std: per-channel statistics used to undo the normalisation
            before plotting; the defaults are the values used by the
            `preprocess` pipeline of this notebook. Pass `None` for either
            one to plot the tensor as it is.
    """
    image = make_grid(X).cpu()
    if mean is not None and std is not None:
        # invert Normalize: x_norm = (x - mean) / std  =>  x = x_norm * std + mean
        mean_t = torch.as_tensor(mean, dtype=image.dtype).view(-1, 1, 1)
        std_t = torch.as_tensor(std, dtype=image.dtype).view(-1, 1, 1)
        image = image * std_t + mean_t
    # imshow expects float images in [0, 1]
    image = image.clamp(0, 1)

    plt.figure()
    # matplotlib wants channel-last (H, W, C) arrays
    plt.imshow(image.numpy().transpose((1, 2, 0)))
    plt.title(y)
    plt.show()

# analyze the false positives: samples labelled 0 that were predicted as 1
false_positive_indices = torch.nonzero((Y == 0) & (Y_hat == 1)).flatten().tolist()

for i in false_positive_indices:
  # fetch the sample once: every access to the dataset re-reads the image
  # from disk and re-runs the transform
  image, label = test_dataset[i]
  visualize_video_sample(image, label)