In [None]:
# Optional: uncomment to install the PyTorch packages into this kernel's environment.
# %pip install torch torchvision torchaudio

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet50
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

In [None]:
# Image root directories handed to get_dataset further down:
# directory A is used for training, directory B for evaluation.
INPUT_DIR_A = "../data/model_input_a"
INPUT_DIR_B = "../data/model_input_b"

In [None]:
# Choose the compute backend by preference: CUDA first, then Apple MPS,
# falling back to the CPU when neither accelerator is available.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"

print(f"Device: {device}")

In [None]:
def get_dataset(data_dir, batch_size=1, random_perspective=0):
    """Build a shuffled DataLoader over an image folder.

    Args:
        data_dir: Root directory read by ``datasets.ImageFolder``.
        batch_size: Number of samples per batch produced by the loader.
        random_perspective: Probability ``p`` passed to
            ``transforms.RandomPerspective``.

    Returns:
        A ``(loader, classes)`` tuple: the ``DataLoader`` (created with
        ``shuffle=True``) and the ``classes`` attribute of the underlying
        ``ImageFolder``.
    """
    # Perspective augmentation runs first, then the image becomes a tensor.
    pipeline = transforms.Compose([
        transforms.RandomPerspective(p=random_perspective),
        transforms.ToTensor(),
    ])

    image_folder = datasets.ImageFolder(data_dir, transform=pipeline)
    loader = DataLoader(image_folder, batch_size=batch_size, shuffle=True)

    return loader, image_folder.classes



In [None]:
def prepare_model(name='resnet50', num_classes=2, linear_layer_size=2048):
    """Create a pretrained backbone with a fresh, trainable classification head.

    All pretrained parameters are frozen; only the replacement ``fc`` layer is
    given to the optimizer.

    Args:
        name: Architecture to load. Only ``'resnet50'`` is supported.
        num_classes: Number of outputs of the new ``fc`` layer.
        linear_layer_size: Number of inputs of the new ``fc`` layer.

    Returns:
        A ``(model, criterion, optimizer)`` tuple: the model moved to the
        module-level ``device``, an ``nn.CrossEntropyLoss`` instance, and an
        SGD optimizer (lr=0.001, momentum=0.9) over ``model.fc.parameters()``.

    Raises:
        ValueError: If ``name`` is not a supported architecture.
    """
    # Fail early with a clear message instead of crashing later on a None model.
    if name != 'resnet50':
        raise ValueError(
            f"Unsupported model name: {name!r} (only 'resnet50' is available)"
        )

    model = resnet50(weights='ResNet50_Weights.DEFAULT')

    # Freeze every pretrained parameter.
    for param in model.parameters():
        param.requires_grad = False

    # The new head is created after the freeze loop, so its parameters are
    # not touched by it and remain trainable.
    model.fc = nn.Linear(linear_layer_size, num_classes)

    model.to(device)

    # Define the loss function
    criterion = nn.CrossEntropyLoss()

    # Define the optimizer (only the new head's parameters are updated)
    optimizer = optim.SGD(model.fc.parameters(), lr=0.001, momentum=0.9)

    return model, criterion, optimizer

In [None]:
def training_loop(model, images, labels, optimizer, criterion, running_loss, total, correct):
    """Run one optimization step on a batch and update the running statistics.

    Args:
        model: Module called on the batch.
        images: Input batch; moved to the module-level ``device``.
        labels: Target batch; moved to the module-level ``device``.
        optimizer: Optimizer whose gradients are zeroed and which is stepped once.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        running_loss: Loss accumulated so far.
        total: Number of samples seen so far.
        correct: Number of correct predictions so far.

    Returns:
        The updated ``(running_loss, total, correct)`` tuple.
    """
    batch_x = images.to(device)
    batch_y = labels.to(device)

    # Clear stale gradients, then forward pass, backward pass and update.
    optimizer.zero_grad()
    logits = model(batch_x)
    batch_loss = criterion(logits, batch_y)
    batch_loss.backward()
    optimizer.step()

    running_loss += batch_loss.item()

    # Predicted class is the index of the largest output along dim 1,
    # taken from the forward pass made before the optimizer step.
    predicted = torch.max(logits.data, 1)[1]
    total += batch_y.size(0)
    correct += (predicted == batch_y).sum().item()

    return running_loss, total, correct

In [None]:
def evaluate_model(model, testloader, criterion):
    """Accumulate loss and prediction counts over a loader without gradients.

    The function does not switch the model to evaluation mode itself; it only
    disables gradient tracking while iterating.

    Args:
        model: Module called on each batch.
        testloader: Iterable yielding ``(images, labels)`` pairs.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.

    Returns:
        A ``(running_loss, total, correct)`` tuple: the sum of the per-batch
        loss values, the number of samples seen, and the number of correct
        predictions.
    """
    loss_sum = 0.0
    n_seen = 0
    n_correct = 0

    with torch.no_grad():
        for images, labels in testloader:
            images, labels = images.to(device), labels.to(device)

            outputs = model(images)
            loss_sum += criterion(outputs, labels).item()

            # Predicted class is the index of the largest output along dim 1.
            predicted = torch.max(outputs.data, 1)[1]
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()

    return loss_sum, n_seen, n_correct

In [None]:
# Build the loaders (training images get the random perspective transform with
# probability 0.5; the evaluation loader uses the default of 0) and the model bundle.
train_dataset, classes = get_dataset(INPUT_DIR_A, random_perspective=.5)
test_dataset, _ = get_dataset(INPUT_DIR_B)
model, criterion, optimizer = prepare_model()

# Upper bound on the number of epochs; the loop below may stop earlier.
num_epochs = 100

for epoch in range(num_epochs):
    # Training pass: switch to training mode and reset the per-epoch counters.
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for images, labels in train_dataset:
        running_loss, total, correct = training_loop(
            model, images, labels, optimizer, criterion, running_loss, total, correct
        )

    # Training accuracy for this epoch, in percent.
    accuracy = 100 * correct / total

    # Training summary; end=", " keeps the test result on the same output line.
    print(f"Epoch {epoch+1}/{num_epochs}: Loss = {running_loss:.4f}, Accuracy = {accuracy:.2f}%", end=", ")

    # Evaluation pass. Note that this overwrites running_loss/total/correct
    # with the evaluation values; the evaluation loss is not printed.
    model.eval()
    running_loss, total, correct = evaluate_model(model, test_dataset, criterion)

    # Evaluation accuracy for this epoch, in percent.
    test_accuracy = 100 * correct / total

    print(f"Test Accuracy = {test_accuracy:.2f}%")

    # Early stop once *training* accuracy exceeds 90%.
    # NOTE(review): confirm that test accuracy was not the intended criterion.
    if accuracy > 90:
        break
