In [None]:
! pip install -q kaggle
from google.colab import files
files.upload()
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle competitions download -c state-farm-distracted-driver-detection

In [None]:
# Importamos las librerias necesarias
import torch
import torchvision
import torch.nn as nn
import os
import glob as gb
import cv2
import matplotlib.pyplot as plt
import numpy as np
import random 
from PIL import Image
from skimage import io
from skimage.transform import resize
from tqdm import tqdm
import pandas as pd 
import albumentations as A
from scipy.io import loadmat
# from pathlib import Path

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
device

In [None]:
# Unpack the downloaded competition archive into /content/conductores.
import zipfile

archive_path = '/content/state-farm-distracted-driver-detection.zip'
extract_dir = '/content/conductores'

with zipfile.ZipFile(archive_path, mode='r') as archive:
    archive.extractall(path=extract_dir)

In [None]:
import os 

# Root folder of the training images; the name of each entry in it is used
# as a class name by the cells below.
PATH = '/content/conductores/imgs/train'

# NOTE(review): os.listdir returns entries in arbitrary order, so the integer
# index assigned to each class may differ between environments — confirm
# whether a fixed (e.g. sorted) order is required.
classes = os.listdir(PATH)
classes, len(classes) # class names and how many there are (presumably the c0..c9 driver classes — confirm)

In [None]:
# Build two parallel lists: the path of every .jpg training image and the
# integer index of the class folder it was found in.
imgs, labels = [], []

for i, lab in enumerate(classes):
  class_dir = f'{PATH}/{lab}'
  names = os.listdir(class_dir)
  # The count is printed before filtering, so it includes non-jpg entries.
  print(f'Categoría: {lab}. Imágenes: {len(names)}')
  paths = [name for name in names if name.endswith("jpg")]
  imgs.extend(f'{class_dir}/{name}' for name in paths)
  labels.extend([i] * len(paths))

In [None]:
# Copy the path and label lists into imgsN / labelsN, which the training
# dataset is built from.
# NOTE(review): the original comment says this verifies that every image has
# 3 channels, but no image is opened or checked here — it is a plain copy.
imgsN = list(imgs)
labelsN = [labels[i] for i in range(len(imgs))]

In [None]:
# Read one training image from disk (index 100 of the path list) and display
# it together with its array shape.
img = plt.imread(imgsN[100])
plt.imshow(img), img.shape

In [None]:
# Number of training image paths and number of labels (should be equal)
len(imgsN), len(labelsN)

In [None]:
# Dataset class that loads and prepares the samples
class Dataset(torch.utils.data.Dataset):
  """Map-style dataset that reads images from disk on demand.

  Args:
    X: sequence of image file paths.
    y: sequence of integer class labels, parallel to `X`.
    trans: callable invoked as `trans(image=array)` that returns a mapping
      with the transformed array under the key 'image'.
    device: stored on the instance; not used by the dataset itself.
  """

  def __init__(self, X, y, trans, device):
    self.X = X
    self.y = y
    self.trans = trans
    self.device = device

  def __len__(self):
    return len(self.X)

  @staticmethod
  def _to_rgb(img):
    """Return `img` as an (H, W, 3) array.

    Grayscale (H, W), single-channel / gray+alpha (H, W, 1|2) and RGBA
    (H, W, 4) inputs would otherwise fail in `permute(2, 0, 1)` or reach the
    network with the wrong number of channels.
    """
    if img.ndim == 2:
      return np.stack((img,) * 3, axis=-1)
    if img.ndim == 3:
      channels = img.shape[-1]
      if channels in (1, 2):
        # Replicate the luminance channel, dropping alpha if present.
        return np.repeat(img[..., :1], 3, axis=-1)
      if channels > 3:
        # Drop alpha (and any further) channels.
        return img[..., :3]
    return img

  def __getitem__(self, ix):
    """Return `(image, label)`: a float32 CHW tensor in [0, 1] and an int64 label."""
    img = self._to_rgb(io.imread(self.X[ix]))
    label = self.y[ix]
    img = self.trans(image=img)['image']
    # HWC uint8 -> CHW float32 scaled to [0, 1]
    return torch.from_numpy(img / 255.).permute(2,0,1).float(), torch.tensor(label).long()

In [None]:
# Build the path / label lists used for the 'test' dataset.
# NOTE(review): PATH points at the same training folder that the training
# lists were built from, so imgsE / labelsE contain the same files as the
# training set — confirm whether a separate hold-out set was intended.
PATH = '/content/conductores/imgs/train'

classesE = os.listdir(PATH)

imgsE, labelsE = [], []

for i, lab in enumerate(classesE):
  class_dir = f'{PATH}/{lab}'
  names = os.listdir(class_dir)
  # The count is printed before filtering, so it includes non-jpg entries.
  print(f'Categoría: {lab}. Imágenes: {len(names)}')
  paths = [name for name in names if name.endswith("jpg")]
  imgsE.extend(f'{class_dir}/{name}' for name in paths)
  labelsE.extend([i] * len(paths))


In [None]:
# Number of class folders found for the evaluation listing
len(classesE)

In [None]:
# Create the datasets used for training and evaluation.
# Every image is resized to 224x224.
trans = A.Compose([
    A.Resize(224, 224)
])

# imgsE / labelsE are listed from the same folder as imgsN / labelsN, so using
# them as the 'test' set would evaluate the model on the exact files it was
# trained on. Instead, hold out a fixed fraction of the training list.
# NOTE(review): this is a random per-image split; frames of the same driver
# may land on both sides — a split grouped by driver would be stricter.
VAL_FRACTION = 0.2
SPLIT_SEED = 42

_order = list(range(len(imgsN)))
# A private Random instance keeps the split reproducible without touching
# the global random state used elsewhere in the notebook.
random.Random(SPLIT_SEED).shuffle(_order)
_n_val = int(len(_order) * VAL_FRACTION)
if len(_order) > 1:
    _n_val = max(1, _n_val)
_val_idx, _train_idx = _order[:_n_val], _order[_n_val:]

dataset = {
    'train': Dataset([imgsN[i] for i in _train_idx], [labelsN[i] for i in _train_idx], trans, device),
    'test': Dataset([imgsN[i] for i in _val_idx], [labelsN[i] for i in _val_idx], trans, device)
}

# The two splits must not share any file.
assert not set(dataset['train'].X) & set(dataset['test'].X), "train/test overlap"

len(dataset['train']), len(dataset['test'])

In [None]:
# Display a row of randomly chosen training images, each titled with the
# first six characters of its class name.
r, c = 1, 8
fig, axes = plt.subplots(r, c, figsize=(2*c, 2*r), squeeze=False)
for ax in axes.flat:
    ix = random.randint(0, len(dataset['train'])-1)
    img, label = dataset['train'][ix]
    # The dataset yields CHW tensors; imshow expects HWC.
    ax.imshow(img.permute(1,2,0))
    ax.set_title(f'{classes[label][:6]}', color="black")
    ax.axis("off")
plt.tight_layout()
plt.show()

In [None]:
class Model(torch.nn.Module):
  """ResNet-34 backbone followed by a new linear classification layer.

  Args:
    n_classes: number of output classes. The default, `len(classes)`, is
      evaluated once, when this class definition is executed.
    pretrained: forwarded to `torchvision.models.resnet34`.
      NOTE(review): recent torchvision releases prefer the `weights=`
      argument over `pretrained=` — confirm against the installed version.
    freeze: when True, the backbone parameters are excluded from gradient
      computation so only the new linear layer is trained.
  """

  def __init__(self, n_classes=len(classes), pretrained=False, freeze=False ):
    super().__init__()
    # download / build resnet
    resnet = torchvision.models.resnet34(pretrained=pretrained)
    # Width of the features fed to the original classifier; read from the
    # backbone instead of hard-coding it.
    in_features = resnet.fc.in_features
    # keep every layer except the last one (the original classifier)
    self.resnet = torch.nn.Sequential(*list(resnet.children())[:-1])
    if freeze:
      for param in self.resnet.parameters():
        param.requires_grad=False
    # new linear layer that performs the classification
    self.fc = torch.nn.Linear(in_features, n_classes)

  def forward(self, x):
    """Return the class logits for a batch of images."""
    x = self.resnet(x)
    # collapse everything after the batch dimension
    x = torch.flatten(x, 1)
    return self.fc(x)

  def unfreeze(self):
    """Make every backbone parameter trainable again."""
    for param in self.resnet.parameters():
        param.requires_grad=True

In [None]:
dataset['train'][100][0].shape  # Shape of one image tensor after the resize transform

In [None]:
# Smoke test: push a random batch of 64 images of size 3x224x224 through an
# untrained model and look at the output shape (one score per class expected).
model = Model()
outputs = model(torch.randn(64, 3, 224, 224))
outputs.shape

In [None]:
# Batch iterators: shuffled batches of 64 for training, ordered batches of
# 256 for evaluation.
dataloader = {
    'train': torch.utils.data.DataLoader(dataset['train'], batch_size=64, shuffle=True, pin_memory=True), 
    'test': torch.utils.data.DataLoader(dataset['test'], batch_size=256, shuffle=False)
}
# Shape of one training batch.
# Dedicated names are used so the path / label lists called `imgs` and
# `labels` built earlier in the notebook are not overwritten by tensors.
batch_imgs, batch_labels = next(iter(dataloader['train']))
batch_imgs.shape

In [None]:
from tqdm import tqdm
import numpy as np

def _run_epoch(model, loader, criterion, optimizer=None, prefix=""):
    """Run one pass over `loader` and return `(mean_loss, accuracy)`.

    When `optimizer` is given the model is put in training mode and updated
    after every batch; otherwise it is put in eval mode and gradient tracking
    is disabled. Both metrics are averaged per sample, so a shorter final
    batch does not weigh as much as a full one.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    bar = tqdm(loader)
    with torch.set_grad_enabled(training):
        for X, y in bar:
            X, y = X.to(device), y.to(device)
            if training:
                optimizer.zero_grad()
            y_hat = model(X)
            loss = criterion(y_hat, y)
            if training:
                loss.backward()
                optimizer.step()
            n_batch = len(y)
            # criterion returns the batch mean; scale back to a per-sample sum
            loss_sum += loss.item() * n_batch
            n_correct += (y == torch.argmax(y_hat, dim=1)).sum().item()
            n_seen += n_batch
            bar.set_description(
                f"{prefix}loss {loss_sum / max(n_seen, 1):.5f} "
                f"{prefix}acc {n_correct / max(n_seen, 1):.5f}")
    if n_seen == 0:
        # Empty loader: nothing to average.
        return float("nan"), float("nan")
    return loss_sum / n_seen, n_correct / n_seen


def fit(model, dataloader, epochs=5, lr=1e-2):
    """Train `model` with SGD and cross-entropy, evaluating after each epoch.

    Args:
        model: module to train; it is moved to the global `device`.
        dataloader: mapping with a 'train' and a 'test' iterable of
            `(inputs, labels)` batches.
        epochs: number of passes over the training data.
        lr: SGD learning rate.

    Returns:
        dict with the per-epoch lists 'loss', 'acc', 'val_loss' and 'val_acc'.
    """
    model.to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()
    history = {"loss": [], "acc": [], "val_loss": [], "val_acc": []}
    for epoch in range(1, epochs+1):
        train_loss, train_acc = _run_epoch(model, dataloader['train'], criterion, optimizer)
        val_loss, val_acc = _run_epoch(model, dataloader['test'], criterion, prefix="val_")
        history["loss"].append(train_loss)
        history["acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)
        print(f"Epoch {epoch}/{epochs} loss {train_loss:.5f} val_loss {val_loss:.5f} acc {train_acc:.5f} val_acc {val_acc:.5f}")
    return history

In [None]:
# Fine-tuning: start from pretrained weights with every layer trainable
# (freeze=False) and train for 5 epochs.
model = Model(pretrained=True, freeze=False)
fit(model, dataloader, epochs=5)

In [None]:
# # Fine-tuning, second trial
# model2 = Model(pretrained=True, freeze=False)
# fit(model2, dataloader, epochs=10)

In [None]:
# Evaluate the model on the 'test' dataloader.
model.to(device)
model.eval()
test_acc = []                # per-batch accuracies, kept for inspection
n_correct, n_seen = 0, 0
with torch.no_grad():
  # Loop names are distinct from `imgs` so the image-path list built earlier
  # in the notebook is not overwritten by a tensor.
  for batch_imgs, batch_labs in tqdm(dataloader['test']):
    batch_imgs, batch_labs = batch_imgs.to(device), batch_labs.to(device)
    y_hat = model(batch_imgs)
    hits = (torch.argmax(y_hat, dim=1) == batch_labs).sum().item()
    test_acc.append(hits / len(batch_labs))
    n_correct += hits
    n_seen += len(batch_labs)
# Accuracy over all samples; a mean of per-batch accuracies would give a
# shorter final batch the same weight as a full one.
print(f' acc {n_correct / max(n_seen, 1):.5f}')

In [None]:
# Predict random evaluation images and show them in a grid.
# Each title reads "true label/predicted label": green when they match,
# red otherwise. The number of correct predictions is printed at the end.
r, c = 5, 10
fig = plt.figure(figsize=(2*c, 2*r))
checks = 0 
# Inference only: set eval mode explicitly (do not rely on an earlier cell)
# and skip gradient tracking.
model.eval()
with torch.no_grad():
    for _r in range(r):
        for _c in range(c):
            ax = plt.subplot(r, c, _r*c + _c + 1)
            ix = random.randint(0, len(dataset['test'])-1)
            img, label = dataset['test'][ix]
            # add the batch dimension expected by the model
            y_hat = model(img.unsqueeze(0).to(device))
            lab = torch.argmax(y_hat, axis=1)[0].item()
            plt.imshow(img.permute(1,2,0))
            plt.title(f'{label}/{lab}', color="red" if label != lab else "green"  )
            if label == lab:
              checks += 1
            plt.axis("off")
plt.tight_layout()
plt.show()
print(checks)

IOU-

In [None]:
# 'c0': 'Safe driving', 
#             'c1': 'Texting - right', 
#             'c2': 'Talking on the phone - right', 
#             'c3': 'Texting - left', 
#             'c4': 'Talking on the phone - left', 
#             'c5': 'Operating the radio', 
#             'c6': 'Drinking', 
#             'c7': 'Reaching behind', 
#             'c8': 'Hair and makeup', 
#             'c9': 'Talking to passenger'

In [None]:
class Model_1(torch.nn.Module):
  """ResNet-50 backbone followed by a new linear classification layer.

  Args:
    n_classes: number of output classes. The default, `len(classes)`, is
      evaluated once, when this class definition is executed.
    pretrained: forwarded to `torchvision.models.resnet50`.
      NOTE(review): recent torchvision releases prefer the `weights=`
      argument over `pretrained=` — confirm against the installed version.
    freeze: when True, the backbone parameters are excluded from gradient
      computation so only the new linear layer is trained.
  """

  def __init__(self, n_classes=len(classes), pretrained=False, freeze=False ):
    super().__init__()
    # download / build resnet
    resnet = torchvision.models.resnet50(pretrained=pretrained)
    # Width of the features fed to the original classifier; read from the
    # backbone instead of hard-coding it.
    in_features = resnet.fc.in_features
    # keep every layer except the last one (the original classifier)
    self.resnet = torch.nn.Sequential(*list(resnet.children())[:-1])
    if freeze:
      for param in self.resnet.parameters():
        param.requires_grad=False
    # new linear layer that performs the classification
    self.fc = torch.nn.Linear(in_features, n_classes)

  def forward(self, x):
    """Return the class logits for a batch of images."""
    x = self.resnet(x)
    # collapse everything after the batch dimension
    x = torch.flatten(x, 1)
    return self.fc(x)

  def unfreeze(self):
    """Make every backbone parameter trainable again."""
    for param in self.resnet.parameters():
        param.requires_grad=True

In [None]:
# Smoke test: push a random batch of 64 images of size 3x224x224 through an
# untrained Model_1 and look at the output shape (one score per class expected).
model = Model_1()
outputs = model(torch.randn(64, 3, 224, 224))
outputs.shape

In [None]:
from tqdm import tqdm
import numpy as np

def _run_epoch(model, loader, criterion, optimizer=None, prefix=""):
    """Run one pass over `loader` and return `(mean_loss, accuracy)`.

    When `optimizer` is given the model is put in training mode and updated
    after every batch; otherwise it is put in eval mode and gradient tracking
    is disabled. Both metrics are averaged per sample, so a shorter final
    batch does not weigh as much as a full one.
    """
    training = optimizer is not None
    model.train(training)
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    bar = tqdm(loader)
    with torch.set_grad_enabled(training):
        for X, y in bar:
            X, y = X.to(device), y.to(device)
            if training:
                optimizer.zero_grad()
            y_hat = model(X)
            loss = criterion(y_hat, y)
            if training:
                loss.backward()
                optimizer.step()
            n_batch = len(y)
            # criterion returns the batch mean; scale back to a per-sample sum
            loss_sum += loss.item() * n_batch
            n_correct += (y == torch.argmax(y_hat, dim=1)).sum().item()
            n_seen += n_batch
            bar.set_description(
                f"{prefix}loss {loss_sum / max(n_seen, 1):.5f} "
                f"{prefix}acc {n_correct / max(n_seen, 1):.5f}")
    if n_seen == 0:
        # Empty loader: nothing to average.
        return float("nan"), float("nan")
    return loss_sum / n_seen, n_correct / n_seen


def fit(model, dataloader, epochs=5, lr=1e-2):
    """Train `model` with SGD and cross-entropy, evaluating after each epoch.

    Args:
        model: module to train; it is moved to the global `device`.
        dataloader: mapping with a 'train' and a 'test' iterable of
            `(inputs, labels)` batches.
        epochs: number of passes over the training data.
        lr: SGD learning rate.

    Returns:
        dict with the per-epoch lists 'loss', 'acc', 'val_loss' and 'val_acc'.
    """
    model.to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)
    criterion = torch.nn.CrossEntropyLoss()
    history = {"loss": [], "acc": [], "val_loss": [], "val_acc": []}
    for epoch in range(1, epochs+1):
        train_loss, train_acc = _run_epoch(model, dataloader['train'], criterion, optimizer)
        val_loss, val_acc = _run_epoch(model, dataloader['test'], criterion, prefix="val_")
        history["loss"].append(train_loss)
        history["acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)
        print(f"Epoch {epoch}/{epochs} loss {train_loss:.5f} val_loss {val_loss:.5f} acc {train_acc:.5f} val_acc {val_acc:.5f}")
    return history

In [None]:
# Fine-tuning: start from pretrained weights with every layer trainable
# (freeze=False) and train for 5 epochs.
model = Model_1(pretrained=True, freeze=False)
fit(model, dataloader, epochs=5)

In [None]:
# Evaluate the model on the 'test' dataloader.
model.to(device)
model.eval()
test_acc = []                # per-batch accuracies, kept for inspection
n_correct, n_seen = 0, 0
with torch.no_grad():
  # Loop names are distinct from `imgs` so the image-path list built earlier
  # in the notebook is not overwritten by a tensor.
  for batch_imgs, batch_labs in tqdm(dataloader['test']):
    batch_imgs, batch_labs = batch_imgs.to(device), batch_labs.to(device)
    y_hat = model(batch_imgs)
    hits = (torch.argmax(y_hat, dim=1) == batch_labs).sum().item()
    test_acc.append(hits / len(batch_labs))
    n_correct += hits
    n_seen += len(batch_labs)
# Accuracy over all samples; a mean of per-batch accuracies would give a
# shorter final batch the same weight as a full one.
print(f' acc {n_correct / max(n_seen, 1):.5f}')