In [19]:
import os
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import models, transforms
from torch.utils.tensorboard import SummaryWriter
import pandas as pd
import datetime
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import numpy as np
from transformers import ViTModel, ViTConfig

# setting device on GPU if available, else CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))
    print('Memory Usage:')
    print('Allocated:', round(torch.cuda.memory_allocated(0)/1024**3,1), 'GB')
    print('Cached:   ', round(torch.cuda.memory_reserved(0)/1024**3,1), 'GB')

model_name = "VIT B16"

Using device: cuda
NVIDIA GeForce RTX 3090
Memory Usage:
Allocated: 0.0 GB
Cached:    0.0 GB


In [20]:
# Define the data transformations
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Adjust size for ViT
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Set the paths for the datasets
base_folder = "../datasets/Vision_data"
train_folder = os.path.join(base_folder, "train")
test_folder = os.path.join(base_folder, "test")
validation_folder = os.path.join(base_folder, "validation")

# Create datasets
train_dataset = ImageFolder(root=train_folder, transform=transform)
test_dataset = ImageFolder(root=test_folder, transform=transform)
validation_dataset = ImageFolder(root=validation_folder, transform=transform)

# Create DataLoaders
batchsize = 32
numworkers = 6
train_loader = DataLoader(train_dataset, batch_size=batchsize, shuffle=True, num_workers=numworkers)
test_loader = DataLoader(test_dataset, batch_size=batchsize, shuffle=False, num_workers=numworkers)
validation_loader = DataLoader(validation_dataset, batch_size=batchsize, shuffle=False, num_workers=numworkers)


In [21]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score

def train_one_epoch(model, data_loader, criterion, optimizer, device, epoch, num_epochs):
    model.train()
    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    all_labels = []
    all_predictions = []

    # Update the tqdm description to show current epoch
    for inputs, labels in tqdm(data_loader, desc=f"Epoch {epoch}/{num_epochs} - Training"):
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)
        
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

    avg_loss = total_loss / len(data_loader.dataset)
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='macro', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_predictions, average='macro', zero_division=0)

    return avg_loss, accuracy, precision, recall, f1

In [22]:
def evaluate(model, data_loader, criterion, device, epoch, num_epochs, phase='Validation'):
    model.eval()
    total_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    all_labels = []
    all_predictions = []

    # Update the tqdm description to show current epoch and phase (Validation or Testing)
    for inputs, labels in tqdm(data_loader, desc=f"Epoch {epoch}/{num_epochs} - {phase}"):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
            
        total_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
            
        correct_predictions += (predicted == labels).sum().item()
        total_predictions += labels.size(0)
            
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

    avg_loss = total_loss / len(data_loader.dataset)
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='macro', zero_division=0)
    recall = recall_score(all_labels, all_predictions, average='macro', zero_division=0)
    f1 = f1_score(all_labels, all_predictions, average='macro', zero_division=0)

    return avg_loss, accuracy, precision, recall, f1

In [23]:
class EarlyStopping:
    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Args:
            patience (int): How many epochs to wait after last time validation loss improved.
                            Default: 7
            verbose (bool): If True, prints a message for each validation loss improvement. 
                            Default: False
            delta (float): Minimum change in the monitored quantity to qualify as an improvement.
                           Default: 0
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = float('inf')
        self.delta = delta

    def __call__(self, val_loss, model, model_path):
        score = -val_loss

        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model, model_path)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model, model_path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, model_path):
        """Saves model when validation loss decrease."""
        if val_loss < self.val_loss_min:
            if self.verbose:
                print(f'Validation loss decreased ({self.val_loss_min:.6f} to {val_loss:.6f}). Saving model ...')
            torch.save(model.state_dict(), model_path)
            self.val_loss_min = val_loss

In [24]:
# Initialize the model
num_classes = len(train_dataset.classes)
print(num_classes)
model = models.vit_b_16(weights=models.ViT_B_16_Weights.IMAGENET1K_V1)
if hasattr(model, 'heads'):  # Check if the model has the attribute 'heads'
    in_features = model.heads.in_features  # Get the number of input features from the current head
    model.heads = nn.Linear(in_features, num_classes)  # Replace it with a new Linear layer with the correct number of output classes
else:
    print("The model does not have a 'heads' attribute. Please check the model architecture.")

model.to(device)

35


AttributeError: 'Sequential' object has no attribute 'in_features'

In [None]:
# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

# Initialize EarlyStopping
early_stopping = EarlyStopping(patience=5, verbose=True, delta=0.001)
model_path = "../output/vitb16/ViT_B_16_best.pth"  # Adjust as needed

# Initialize TensorBoard
writer = SummaryWriter('../output/vitb16_experiment')  # Adjust as needed

In [None]:
# DataFrame to store metrics
columns = pd.DataFrame(columns=[
    'Epoch', 'Training Loss', 'Validation Loss', 'Test Loss',
    'Training Accuracy', 'Validation Accuracy', 'Test Accuracy',
    'Training Precision', 'Training Recall', 'Training F1-Score',
    'Validation Precision', 'Validation Recall', 'Validation F1-Score',
    'Test Precision', 'Test Recall', 'Test F1-Score'
])

df = pd.DataFrame(columns=columns)

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    train_loss, train_accuracy, train_prec, train_rec, train_f1 = train_one_epoch(
        model, train_loader, criterion, optimizer, device, epoch, num_epochs)
    
    val_loss, val_accuracy, val_prec, val_rec, val_f1 = evaluate(
        model, validation_loader, criterion, device, epoch, num_epochs)

    # Early stopping and scheduler
    early_stopping(val_loss, model, model_path)
    if early_stopping.early_stop:
        print("Early stopping")
        break
    scheduler.step()

    # Collect data for DataFrame
    df_metrics = df_metrics.append({
        'Epoch': epoch + 1,
        'Training Loss': train_loss, 'Validation Loss': val_loss, 'Test Loss': None,
        'Training Accuracy': train_accuracy, 'Validation Accuracy': val_accuracy, 'Test Accuracy': None,
        'Training Precision': train_prec, 'Training Recall': train_rec, 'Training F1-Score': train_f1,
        'Validation Precision': val_prec, 'Validation Recall': val_rec, 'Validation F1-Score': val_f1,
        'Test Precision': None, 'Test Recall': None, 'Test F1-Score': None
    }, ignore_index=True)

# Final evaluation on the test set
test_loss, test_accuracy, test_prec, test_rec, test_f1 = evaluate(
    model, test_loader, criterion, device, epoch, num_epochs, phase='Testing')

# Update DataFrame with test results
df_metrics.loc[df_metrics['Epoch'] == num_epochs, ['Test Loss', 'Test Accuracy', 'Test Precision', 'Test Recall', 'Test F1-Score']] = \
    [test_loss, test_accuracy, test_prec, test_rec, test_f1]

# Save DataFrame to CSV
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
csv_filename = f"ViT_B_16_training_results_{timestamp}.csv"
df_metrics.to_csv(csv_filename, index=False)

In [None]:
import matplotlib.pyplot as plt
# Plotting
plt.figure(figsize=(10, 5))
plt.plot(df_metrics['Epoch'], df_metrics['Training Loss'], label='Training Loss')
plt.plot(df_metrics['Epoch'], df_metrics['Validation Loss'], label='Validation Loss')
plt.plot(df_metrics['Epoch'], df_metrics['Test Loss'], label='Test Loss', linestyle='--')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Loss over Epochs')
plt.legend()
plt.show()

plt.figure(figsize=(10, 5))
plt.plot(df_metrics['Epoch'], df_metrics['Training Accuracy'], label='Training Accuracy')
plt.plot(df_metrics['Epoch'], df_metrics['Validation Accuracy'], label='Validation Accuracy')
plt.plot(df_metrics['Epoch'], df_metrics['Test Accuracy'], label='Test Accuracy', linestyle='--')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Accuracy over Epochs')
plt.legend()
plt.show()
