In [10]:
import torch
import torchvision.models as models
from torchvision.models import Inception_V3_Weights
import torch.nn as nn


class InceptionTrainer:
    def __init__(self, num_classes=10, pretrained=True, freeze_features=True):
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._build_model(pretrained, freeze_features).to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = None  # To be set later

    def _build_model(self, pretrained, freeze_features):
        # Use the new weights parameter
        weights = Inception_V3_Weights.IMAGENET1K_V1 if pretrained else None
        # Load the model with aux_logits set to True
        model = models.inception_v3(weights=weights, aux_logits=True)
        if freeze_features:
            for param in model.parameters():
                param.requires_grad = False
        # Modify the final classification layer
        model.fc = nn.Linear(model.fc.in_features, self.num_classes)

        return model

    def set_optimizer(self, lr=0.001, optimizer_type='SGD', momentum=0.9):
        """Sets the optimizer."""
        if optimizer_type == 'SGD':
            self.optimizer = torch.optim.SGD(
                filter(lambda p: p.requires_grad, self.model.parameters()),
                lr=lr,
                momentum=momentum
            )
        elif optimizer_type == 'Adam':
            self.optimizer = torch.optim.Adam(
                filter(lambda p: p.requires_grad, self.model.parameters()),
                lr=lr
            )
        else:
            raise ValueError("Unsupported optimizer type. Choose 'SGD' or 'Adam'.")

    def train(self, trainloader, epochs=5):
        """Trains the model."""
        self.model.train()
        for epoch in range(epochs):
            running_loss = 0.0
            correct = 0
            total = 0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)  # Outputs logits and aux_logits

                # Compute the loss using both primary and auxiliary logits
                main_loss = self.criterion(outputs.logits, labels)
                aux_loss = self.criterion(outputs.aux_logits, labels)
                loss = main_loss + 0.4 * aux_loss

                loss.backward()
                self.optimizer.step()

                running_loss += loss.item()

                # Get predictions from primary logits for accuracy calculation
                _, predicted = torch.max(outputs.logits, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

            print(f"Epoch {epoch + 1}/{epochs}, Loss: {running_loss / len(trainloader):.4f}, "
                  f"Accuracy: {100 * correct / total:.2f}%")

    def evaluate(self, testloader):
        """Evaluates the model."""
        self.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in testloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)  # Outputs logits and aux_logits
                
                # Get predictions from primary logits
                _, predicted = torch.max(outputs.logits, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f"Test Accuracy: {100 * correct / total:.2f}%")


In [11]:
import torch
import torchvision.models as models
from torchvision.models import Inception_V3_Weights
import torch.nn as nn


class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, reduction='mean'):
        """
        Custom implementation of focal loss.
        alpha: weight factor for the class (default is balanced across classes).
        gamma: focusing parameter that reduces the loss for well-classified examples.
        reduction: specifies the reduction to apply to the output ('none', 'mean', or 'sum').
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        ce_loss = nn.CrossEntropyLoss(reduction='none')(inputs, targets)  # Cross-entropy per sample
        pt = torch.exp(-ce_loss)  # Probability of the true class
        focal_loss = self.alpha * ((1 - pt) ** self.gamma) * ce_loss
        
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss


class InceptionTrainer:
    def __init__(self, num_classes=10, pretrained=True, freeze_features=True, loss_type='cross_entropy', focal_params=None):
        """
        Initialize InceptionTrainer.
        
        num_classes: number of output classes.
        pretrained: whether to use pre-trained weights.
        freeze_features: whether to freeze feature extraction layers.
        loss_type: specify 'cross_entropy' or 'focal' for the loss function.
        focal_params: dictionary to configure FocalLoss (e.g., {'alpha': 1, 'gamma': 2}).
        """
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._build_model(pretrained, freeze_features).to(self.device)
        self.optimizer = None
        self.loss_type = loss_type.lower()
        self.focal_params = focal_params or {}

        # Set loss function
        if self.loss_type == 'cross_entropy':
            self.criterion = nn.CrossEntropyLoss()
        elif self.loss_type == 'focal':
            self.criterion = FocalLoss(**self.focal_params)
        else:
            raise ValueError("Invalid loss_type. Choose 'cross_entropy' or 'focal'.")

    def _build_model(self, pretrained, freeze_features):
        weights = Inception_V3_Weights.IMAGENET1K_V1 if pretrained else None
        model = models.inception_v3(weights=weights, aux_logits=True)
        if freeze_features:
            for param in model.parameters():
                param.requires_grad = False

        model.fc = nn.Sequential(
            nn.Linear(model.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, self.num_classes)
        )

        return model

    def set_optimizer(self, lr=0.001, optimizer_type='SGD', momentum=0.9):
        """Sets the optimizer."""
        if optimizer_type == 'SGD':
            self.optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, self.model.parameters()), lr=lr, momentum=momentum)
        elif optimizer_type == 'Adam':
            self.optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, self.model.parameters()), lr=lr)
        else:
            raise ValueError("Unsupported optimizer type. Choose 'SGD' or 'Adam'.")

    def train(self, trainloader, epochs=5):
        """Trains the model."""
        self.model.train()
        for epoch in range(epochs):
            running_loss = 0.0
            correct = 0
            total = 0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                outputs = outputs.logits
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

            print(f"Epoch {epoch + 1}/{epochs}, Loss: {running_loss/len(trainloader):.4f}, Accuracy: {100 * correct / total:.2f}%")

    def evaluate(self, testloader):
        """Evaluates the model."""
        self.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in testloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                outputs = outputs.logits

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        print(f"Test Accuracy: {100 * correct / total:.2f}%")


In [12]:
class HybridTrainer:
    def __init__(self, num_classes=10, pretrained=True, freeze_features=True):
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._build_model(pretrained, freeze_features).to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = None  # To be set later

    def _build_model(self, pretrained, freeze_features):
        weights_resnet = ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
        resnet = models.resnet18(weights=weights_resnet)
        if freeze_features:
            for param in resnet.parameters():
                param.requires_grad = False

        inception = models.inception_v3(weights=None, aux_logits=False)  # Use inception without weights or aux_logits
        resnet.fc = nn.Identity()  # Remove fully connected layer in ResNet18
        hybrid_model = nn.Sequential(
            resnet,
            nn.Flatten(),  # Flatten output of ResNet for inception input
            nn.Linear(resnet.fc.in_features, 512),
            nn.ReLU(),
            inception.fc,  # Reuse inception's FC layer
        )
        return hybrid_model

    def set_optimizer(self, lr=0.001, optimizer_type='SGD', momentum=0.9):
        if optimizer_type == 'SGD':
            self.optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, self.model.parameters()), lr=lr, momentum=momentum)
        elif optimizer_type == 'Adam':
            self.optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, self.model.parameters()), lr=lr)
        else:
            raise ValueError("Unsupported optimizer type. Choose 'SGD' or 'Adam'.")

    def train(self, trainloader, epochs=5):
        self.model.train()
        for epoch in range(epochs):
            running_loss = 0.0
            correct = 0
            total = 0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)
            print(f"Epoch {epoch + 1}/{epochs}, Loss: {running_loss/len(trainloader):.4f}, Accuracy: {100 * correct / total:.2f}%")

    def evaluate(self, testloader):
        self.model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in testloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        print(f"Test Accuracy: {100 * correct / total:.2f}%")

In [1]:
import torch
import torchvision.models as models
from torch.utils.data import DataLoader
import torch.nn as nn

class ResNet18Trainer:
    def __init__(self, num_classes=10, pretrained=True, freeze_features=True):
        """
        Initializes the ResNet18 model trainer.
        Args:
        - num_classes: Number of output classes.
        - pretrained: Whether to use pretrained weights.
        - freeze_features: Whether to freeze feature extractor layers.
        """
        self.num_classes = num_classes
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = self._build_model(pretrained, freeze_features).to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = None  # To be set later

    def _build_model(self, pretrained, freeze_features):
        """
        Builds the ResNet18 model with options to load pretrained weights and freeze features.
        """
        try:
            # Attempt to use the new weights API for torchvision
            weights = models.ResNet18_Weights.IMAGENET1K_V1 if pretrained else None
            model = models.resnet18(weights=weights)
        except AttributeError:
            # Fallback for older torchvision versions
            model = models.resnet18(pretrained=pretrained)
        
        if freeze_features:
            for param in model.parameters():
                param.requires_grad = False

        model.fc = nn.Sequential(
            nn.Linear(model.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, self.num_classes)
        )
        return model

    def set_optimizer(self, lr=0.001, optimizer_type='SGD', momentum=0.9):
        """
        Sets the optimizer for training.
        Args:
        - lr: Learning rate.
        - optimizer_type: Type of optimizer ('SGD' or 'Adam').
        - momentum: Momentum factor (for SGD only).
        """
        if optimizer_type == 'SGD':
            self.optimizer = torch.optim.SGD(filter(lambda p: p.requires_grad, self.model.parameters()), 
                                             lr=lr, momentum=momentum,weight_decay=0.0001)
        elif optimizer_type == 'Adam':
            self.optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, self.model.parameters()), 
                                              lr=lr, weight_decay=0.0001)
        else:
            raise ValueError("Unsupported optimizer type. Choose 'SGD' or 'Adam'.")

    def train(self, trainloader, epochs=5):
        """
        Trains the model.
        Args:
        - trainloader: Dataloader for the training dataset.
        - epochs: Number of epochs to train.
        """
        self.model.train()
        for epoch in range(epochs):
            running_loss = 0.0
            correct = 0
            total = 0

            for inputs, labels in trainloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()

                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()

In [3]:
#import auxiliaries as aux
import os 
import torch
import torch.nn as nn
from torchvision import transforms
import os
from torch.utils.data import DataLoader, Subset
import gc
from tqdm import tqdm
#from torchinfo import summary
import time
import auxiliaries as aux

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  

# Define parameters
classification_type = 'make'  # Make or model
num_epochs = 25
batch_size = 32
learning_rate = 1e-4
splits_folder='train_test_split_full_make_100_80_20_0'  # Folder which contains two files train.txt and test.txt with a list of images to use for train and test
#model_name='inceptionmodified'  # Model, for now between inception, resnet18 and resnet-simple                     
loss_name='focal'   # Loss, for now between focal and cross-entropy
patience = 4    # For early stopping
progressive = 5 # Used for differentiating between runs with same parameters
use_data_augmentation=False

image_type=splits_folder.split('_')[3]

# Name the file where we save the model with the parameters used for better readability
#model_save_name=f'model_{model_name}_{classification_type}_{batch_size}_{loss_name}_{progressive}.pt'

# Define root path (where the images are) and train path (where the list of images to use is)
root_dir = os.path.join(os.getcwd(), f'../CompCars/data/{'cropped_image' if image_type=='full' else 'part'}')
file_paths_train = os.path.join(os.getcwd(), f'../CompCars/data/splits/{splits_folder}/classification/train.txt')
root_dir = os.path.join(os.getcwd(),
                          f"C:\\Users\\Utilisateur\\Desktop\\DL\\datasets\\data\\data\\{'cropped_image' if image_type=='full' else 'part'}")     # Root path of where the images are
file_paths_train = os.path.join(os.getcwd(),
                           f"C:\\Users\\Utilisateur\\Desktop\\DL\\datasets\\data\\data\\splits\\{splits_folder}\\classification\\train.txt")  

# Set number of classes
if classification_type == 'make':
    if image_type=='full':
        num_classes = 163
    elif image_type=='part':
        num_classes = 123
elif classification_type == 'model':
    if image_type == 'full':    
        num_classes = 1716
    elif image_type == 'part':
        num_classes = 956
else:
    print('Wrong classification type') 


# Define transformations (resize was arbitrary, normalize was requested from pytorch)
if use_data_augmentation==False:
    transform = transforms.Compose([
        transforms.Resize((299, 299)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
else:
    transform = transforms.Compose([
        transforms.Resize((299, 299)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05),
        transforms.RandomPerspective(distortion_scale=0.2, p=0.5),
        transforms.ToTensor(),
        transforms.RandomErasing(p=0.3, scale=(0.02, 0.1), ratio=(0.3, 3.3)),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

# Create custom dataset
dataset = aux.ClassificationImageDataset(root_dir=root_dir, file_paths=file_paths_train, 
                                 classification_type=classification_type, 
                                 transform=transform, train=True, validation_split=0.25)

# Create training and validation subsets using the indices calculated during dataset.__init__()
train_subset = Subset(dataset, dataset.train_indices)
val_subset = Subset(dataset, dataset.val_indices)

# Create dataloaders for training and validation
train_loader = DataLoader(train_subset, batch_size=batch_size)
#valid_loader = DataLoader(val_subset, batch_size=batch_size)
#testloader = DataLoader(testset, batch_size=32, shuffle=False)

In [None]:
import torch
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Instantiate the model trainer
inception_trainer = InceptionTrainer(num_classes=num_classes, pretrained=True, freeze_features=True)

# Set up optimizer
inception_trainer.set_optimizer(lr=0.001, optimizer_type='Adam')

# Train the model
inception_trainer.train(train_loader, epochs=5)

# Evaluate the model
#inception_trainer.evaluate(testloader)

Epoch 1/5, Loss: 4.3090, Accuracy: 5.88%


In [None]:
import torch
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# Instantiate the model trainer
Res_trainer = ResNet18Trainer(num_classes=num_classes, pretrained=True, freeze_features=True)

# Set up optimizer
Res_trainer.set_optimizer(lr=0.001, optimizer_type='Adam')

# Train the model
Res_trainer.train(train_loader, epochs=5)

# Evaluate the model
#Res_trainer.evaluate(testloader)