# Transfer learning practice

In [1]:
import os
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from PIL import Image
import shutil


In [2]:
# Define directory paths
data_dir = '' # root
train_dir = os.path.join(data_dir, 'train')
validation_dir = os.path.join(data_dir, 'test')


In [3]:
train_info = pd.read_csv('train.csv')

X = train_info[['filename', 'xmin', 'ymin', 'xmax', 'ymax']]
y = train_info['class_id']

data = pd.concat([X, y], axis=1)	

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

train_data = pd.concat([X_train, y_train], axis=1)
validation_data = pd.concat([X_val, y_val], axis=1)

In [4]:
def create_train_test_img_folder(train_df, test_df):
    splits = {'train': train_df, 'test': test_df}
    for split, df in splits.items():
        for i, row in df.iterrows():
            source_file = f'images/{row["filename"]}'
            class_name = 'pinguin' if row['class_id'] == 1 else 'turtle'
            destination_folder = f'data/{split}'
            os.makedirs(destination_folder, exist_ok=True)
            destination_file = os.path.join(destination_folder)
            shutil.copy(source_file, destination_file)


create_train_test_img_folder(train_data, validation_data)

In [5]:
root_dir = 'data'
train_dir = os.path.join(root_dir, 'train')
validation_dir = os.path.join(root_dir, 'validation')

In [16]:
class BoundingBoxDataset(Dataset):
    def __init__(self, data, root_dir, transform=None):
        self.data = data
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        img_name = os.path.join(self.root_dir, row['filename'])
        image = plt.imread(img_name)
        
        # Apply transformations if any
        if self.transform:
            image = self.transform(image)

        # Extract bounding box coordinates
        bbox = torch.tensor([row['xmin'], row['ymin'], row['xmax'], row['ymax']])
        
        # Extract class label
        label = torch.tensor(row['class_id'] - 1)  # Subtract 1 to make labels 0-based

        return image, label, bbox


In [17]:
# Data transformations
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.Resize((255, 255)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # Pretrained VGG-16 mean and std
])

validation_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((255, 255)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [18]:
# Load train and test data
train_dataset = BoundingBoxDataset(train_data, root_dir=train_dir, transform=train_transforms)
validation_dataset = BoundingBoxDataset(validation_data, root_dir=validation_dir, transform=validation_transforms)

# Data loaders
batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)


In [19]:
# Load pretrained VGG-16 model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
base_model = models.vgg16(pretrained=True)

# Freeze the base model layers
for param in base_model.parameters():
    param.requires_grad = False

# Modify the classifier for dual outputs
class MultiTaskVGG16(nn.Module):
    def __init__(self):
        super(MultiTaskVGG16, self).__init__()
        
        # Base feature extractor
        self.features = base_model.features
        self.avgpool = base_model.avgpool
        self.flatten = nn.Flatten()
        
        # Shared fully connected layers
        self.shared_fc = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        
        # Classification head
        self.classifier = nn.Linear(4096, 2)  # For 2 classes: penguin and turtle
        
        # Bounding box regression head
        self.regressor = nn.Linear(4096, 4)  # For 4 coordinates: xmin, ymin, xmax, ymax

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = self.flatten(x)
        x = self.shared_fc(x)
        
        class_output = self.classifier(x)
        bbox_output = self.regressor(x)
        
        return class_output, bbox_output

# Initialize model
model = MultiTaskVGG16().to(device)




In [20]:
# Loss functions
classification_criterion = nn.CrossEntropyLoss()
regression_criterion = nn.MSELoss()

# Optimizer (only fine-tuning the final layers)
optimizer = Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001)


In [21]:
# Training function
def train(model, train_loader, optimizer, device):
    model.train()
    running_loss = 0.0
    for images, labels, bboxes in tqdm(train_loader):
        images, labels, bboxes = images.to(device), labels.to(device), bboxes.to(device)
        
        optimizer.zero_grad()
        class_outputs, bbox_outputs = model(images)
        
        # Compute losses
        class_loss = classification_criterion(class_outputs, labels)
        bbox_loss = regression_criterion(bbox_outputs, bboxes.float())
        loss = class_loss + bbox_loss  # Combine losses
        
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * images.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    return epoch_loss

# Evaluation function
def evaluate(model, test_loader, device):
    model.eval()
    running_loss = 0.0
    all_preds, all_labels = [], []
    
    with torch.no_grad():
        for images, labels, bboxes in tqdm(test_loader):
            images, labels, bboxes = images.to(device), labels.to(device), bboxes.to(device)
            
            class_outputs, bbox_outputs = model(images)
            
            # Compute losses
            class_loss = classification_criterion(class_outputs, labels)
            bbox_loss = regression_criterion(bbox_outputs, bboxes.float())
            loss = class_loss + bbox_loss
            
            running_loss += loss.item() * images.size(0)
            
            # Store predictions and labels for accuracy calculation
            preds = class_outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    
    accuracy = accuracy_score(all_labels, all_preds)
    epoch_loss = running_loss / len(test_loader.dataset)
    return epoch_loss, accuracy


In [23]:
# Training loop
num_epochs = 12
best_accuracy = 0.0
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    
    train_loss = train(model, train_loader, optimizer, device)
    val_loss, val_accuracy = evaluate(model, validation_loader, device)
    
    print(f"Train Loss: {train_loss:.4f}")
    print(f"Validation Loss: {val_loss:.4f}")
    print(f"Validation Accuracy: {val_accuracy:.4f}")
    # Save the model if it has the best accuracy so far
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        torch.save(model.state_dict(), "best_model_full.pth")
        print("Saved Best Model!")



Epoch 1/12


  0%|          | 0/24 [00:02<?, ?it/s]


IndexError: Target -1 is out of bounds.

In [24]:
model.load_state_dict(torch.load("best_model_full.pth"))

# Evaluate on the test set
validation_loss, validation_accuracy = evaluate(model, validation_loader, device)

print(f"Test Loss: {validation_loss:.4f}")
print(f"Test Accuracy: {validation_accuracy:.4f}")


  0%|          | 0/6 [00:00<?, ?it/s]


FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\monte\\Documents\\Fundamentos de Analítica II\\FDA2_final\\data\\validation\\image_id_233_.jpg'

In [None]:
# # Inference and saving to CSV
# def predict_and_save(model, validation_loader, device, output_file="submission.csv"):
#     model.eval()
#     predictions = []
#     seen_filenames = set()  # Track unique filenames to avoid duplicates
    
#     with torch.no_grad():
#         for images, labels, bboxes in validation_loader:  # Accepts images, labels, and bboxes
#             images = images.to(device)
#             class_outputs, bbox_outputs = model(images)
            
#             # Get predicted class (1-based indexing)
#             class_preds = class_outputs.argmax(dim=1).cpu().numpy() + 1  
#             # Get bounding box predictions
#             bbox_preds = bbox_outputs.cpu().numpy()
            
#             # Assuming filenames are stored as part of the dataset's DataFrame
#             for filename, class_id, bbox_pred, label, bbox_true in zip(
#                 validation_loader.dataset.data['filename'], class_preds, bbox_preds, labels.cpu().numpy(), bboxes.cpu().numpy()
#             ):
#                 # Only add unique filenames
#                 if filename not in seen_filenames:
#                     predictions.append([filename, class_id, *bbox_pred, label, *bbox_true])
#                     seen_filenames.add(filename)  # Mark this filename as seen
    
#     # Save predictions to CSV
#     submission_df = pd.DataFrame(predictions, columns=[
#         "filename", "pred_class_id", "pred_xmin", "pred_ymin", "pred_xmax", "pred_ymax",
#         "true_class_id", "true_xmin", "true_ymin", "true_xmax", "true_ymax"
#     ])
#     submission_df.to_csv(output_file, index=False)
    
#     print(f"Saved predictions to {output_file}")

# # Run the prediction and save function
# predict_and_save(model, train_loader, device, output_file="submission_full_test_2.csv")



Saved predictions to submission_full_test_2.csv


# Test

In [25]:


# Custom dataset for validation/test set without labels
class TestDataset(Dataset):
    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        filename = self.data.iloc[idx]['filename']
        img_path = os.path.join(self.img_dir, filename)
        image = Image.open(img_path).convert("RGB")
        
        if self.transform:
            image = self.transform(image)
        
        return image, filename  # Only return image and filename


In [26]:
# Define transformations
data_transforms = transforms.Compose([
    transforms.Resize((255, 255)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Paths
csv_file = 'test.csv'
img_dir = 'images'

# Create the dataset and loader
test_dataset = TestDataset(csv_file, img_dir, transform=data_transforms)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [None]:
# # Inference and saving to CSV for unlabeled validation/test set
# def predict_and_save_unlabeled(model, loader, device, output_file="test_predictions.csv"):
#     model.eval()
#     predictions = []
#     seen_filenames = set()  # Track unique filenames to avoid duplicates
    
#     with torch.no_grad():
#         for images, filenames in loader:  # Accepts images and filenames only
#             images = images.to(device)
#             class_outputs, bbox_outputs = model(images)
            
#             # Get predicted class (1-based indexing)
#             class_preds = class_outputs.argmax(dim=1).cpu().numpy() + 1  
#             # Get bounding box predictions
#             bbox_preds = bbox_outputs.cpu().numpy()
            
#             # Save predictions for each file
#             for filename, class_id, bbox_pred in zip(filenames, class_preds, bbox_preds):
#                 # Only add unique filenames
#                 if filename not in seen_filenames:
#                     predictions.append([filename, class_id, *bbox_pred])
#                     seen_filenames.add(filename)  # Mark this filename as seen
    
#     # Save predictions to CSV
#     submission_df = pd.DataFrame(predictions, columns=[
#         "filename", "pred_class_id", "pred_xmin", "pred_ymin", "pred_xmax", "pred_ymax"
#     ])
#     submission_df.to_csv(output_file, index=False)
    
#     print(f"Saved predictions to {output_file}")

# # Run the prediction and save function for validation/test data
# predict_and_save_unlabeled(model, test_loader, device, output_file="test_predictions.csv")


Saved predictions to test_predictions.csv


## **CNN Personalizado**

A continuación, crea una clase personalizada para el dataset que cargará las imágenes y sus anotaciones

In [6]:
# Definición del Dataset

class CustomDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_name = os.path.join('data/train', self.dataframe.iloc[idx, 0])
        image = plt.imread(img_name)
        label = self.dataframe.iloc[idx, -1]  # class_id

        if self.transform:
            image = self.transform(image)

        return image, label


Define las transformaciones necesarias para la augmentación de datos y la normalización

In [7]:
#Definición de transformaciones 

transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),  # Cambiar el tamaño de las imágenes
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalización
])


Crea las instancias de tus conjuntos de datos y DataLoaders para el entrenamiento y la validación.

In [8]:
# Inicialización de datasets y dataloaders

train_dataset = CustomDataset(train_data, transform=transform)
validation_dataset = CustomDataset(validation_data, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False)


Define un modelo de red neuronal convolucional personalizado. Aquí tienes un ejemplo simple

In [9]:
# Definición del Modelo CNN Personalizado

class CustomCNN(nn.Module):
    def __init__(self):
        super(CustomCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 28 * 28, 128)  # Cambia las dimensiones según tu entrada
        self.fc2 = nn.Linear(128, 2)  # Salida para 2 clases (pingüinos y tortugas)

    def forward(self, x):
        x = self.pool(nn.functional.relu(self.conv1(x)))
        x = self.pool(nn.functional.relu(self.conv2(x)))
        x = self.pool(nn.functional.relu(self.conv3(x)))
        x = x.view(-1, 64 * 28 * 28)  # Aplanar
        x = nn.functional.relu(self.fc1(x))
        x = self.fc2(x)
        return x


Inicializa el modelo, la función de pérdida y el optimizador.

In [10]:
# Inicialización del Modelo, Pérdida y Optimizador 

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CustomCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=0.001)


Definicnión de la función de entrenamiento

Teniendo en cuenta que el Cross Entropy Loss es una función que espera que las clases estén codificados en 0 y 1, procedemos a realizar la codificaicón previa. 

In [11]:
import pandas as pd

# Supongamos que ya tienes tus dataframes 'test_data' y 'train_data'

# Cambiar los valores de 'class_id' en 'train_data'
train_data['class_id'] = train_data['class_id'].replace({1: 0, 2: 1})

# Cambiar los valores de 'class_id' en 'test_data'
validation_data['class_id'] = validation_data['class_id'].replace({1: 0, 2: 1})

# Verificar los cambios
print("Valores únicos en train_data['class_id']:", train_data['class_id'].unique())
print("Valores únicos en validation_data['class_id']:", validation_data['class_id'].unique())


Valores únicos en train_data['class_id']: [0 1]
Valores únicos en validation_data['class_id']: [1 0]


In [12]:
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()  # Modo de entrenamiento
    for epoch in range(num_epochs):
        running_loss = 0.0
        running_accuracy = 0.0
        
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()  # Zero gradients
            outputs = model(images)  # Forward pass
            
            loss = criterion(outputs, labels)  # Calculate loss
            loss.backward()  # Backward pass
            optimizer.step()  # Update weights
            
            # Calculando la precisión
            _, predicted = torch.max(outputs, 1)  # Obtener la clase predicha
            correct = (predicted == labels).sum().item()  # Contar las predicciones correctas
            accuracy = correct / labels.size(0)  # Calcular la precisión del lote
            
            running_loss += loss.item()
            running_accuracy += accuracy

        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = running_accuracy / len(train_loader)  # Calcular precisión promedio de la época
        
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}')

# Entrenar el modelo
train_model(model, train_loader, criterion, optimizer, num_epochs=10)


100%|██████████| 12/12 [00:11<00:00,  1.08it/s]


Epoch [1/10], Loss: 0.6726, Accuracy: 0.6304


100%|██████████| 12/12 [00:08<00:00,  1.37it/s]


Epoch [2/10], Loss: 0.4767, Accuracy: 0.7596


100%|██████████| 12/12 [00:08<00:00,  1.38it/s]


Epoch [3/10], Loss: 0.3810, Accuracy: 0.8127


100%|██████████| 12/12 [00:08<00:00,  1.39it/s]


Epoch [4/10], Loss: 0.2690, Accuracy: 0.8954


100%|██████████| 12/12 [00:08<00:00,  1.38it/s]


Epoch [5/10], Loss: 0.1955, Accuracy: 0.9209


100%|██████████| 12/12 [00:09<00:00,  1.29it/s]


Epoch [6/10], Loss: 0.1449, Accuracy: 0.9479


100%|██████████| 12/12 [00:08<00:00,  1.35it/s]


Epoch [7/10], Loss: 0.0750, Accuracy: 0.9735


100%|██████████| 12/12 [00:09<00:00,  1.30it/s]


Epoch [8/10], Loss: 0.0489, Accuracy: 0.9917


100%|██████████| 12/12 [00:08<00:00,  1.33it/s]


Epoch [9/10], Loss: 0.0205, Accuracy: 1.0000


100%|██████████| 12/12 [00:09<00:00,  1.32it/s]

Epoch [10/10], Loss: 0.0163, Accuracy: 0.9974





In [None]:
def evaluate_model(model, test_loader):
    model.eval()  
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for batch in test_loader:
            # Imprimir el tipo y la longitud del batch
            print(f'Batch: {batch}, Type: {type(batch)}, Length: {len(batch)}')
            
            # Desempaquetar el batch según su longitud
            if len(batch) == 2:
                images, labels = batch
            elif len(batch) > 2:
                images, labels = batch[0], batch[1]  
            else:
                print("Unexpected batch format:", batch)
                continue

            # Revisión de característica de images y labels 
            if isinstance(images, tuple) or isinstance(labels, tuple):
                print("Images or labels are tuples. Check your dataset.")
                continue
            
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    print(f'Accuracy: {accuracy:.4f}')

# Evaluación del modelo 

evaluate_model(model, test_loader)


Batch: [tensor([[[[2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          ...,
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489],
          [2.2489, 2.2489, 2.2489,  ..., 2.2489, 2.2489, 2.2489]],

         [[2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          ...,
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286],
          [2.4286, 2.4286, 2.4286,  ..., 2.4286, 2.4286, 2.4286]],

         [[2.6400, 2.6400, 2.6400,  ..., 2.6400, 2.6400, 2.6400],
          [2.6400, 2.6400, 2.6400,  ..., 2.6400, 2.6400, 2.6400],
          [2.6400, 2.6400, 2.6400,

  avg = a.mean(axis, **keepdims_kw)
  ret = ret.dtype.type(ret / rcount)


Hay que pegarle la cabeza de la clasificación a esta parte. 