In [1]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Install necessary dependencies
!pip install torch torchvision tqdm

# Import libraries
import os
import random
import shutil
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torchvision.models import resnet50, resnet101, resnet152
from PIL import Image
from tqdm.notebook import tqdm
from torchvision.models.resnet import ResNet50_Weights, ResNet101_Weights, ResNet152_Weights

# Seed both Python's `random` module and PyTorch with one shared value
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

Mounted at /content/drive
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


<torch._C.Generator at 0x7fc718296590>

In [2]:
# Define variables
# --- Model selection ---
model_name = "egoist_V1"  # Prefix used for the checkpoint / weight files written during training
model_type = "101"        # ResNet depth; the model cell accepts '50', '101' or '152'

# --- Training hyper-parameters ---
train_percentage = 0.8    # Fraction of each folder's files copied into the training split
batch_size = 64
num_epochs = 30

# --- Resuming from a checkpoint ---
load_pretrained = False  # Change this to True if you want to load a pretrained model
pretrained_model_name = "egoist_V1_c14_m101"  # File stem of the checkpoint to load (without ".ckp")

# --- Folder layout (everything lives under the project folder on Drive) ---
project_folder = '/content/drive/MyDrive/colab_projects/ai_image_grouper'
original_folder = project_folder + '/data'
training_folder = project_folder + '/training'
validation_folder = project_folder + '/validation'
model_folder = project_folder + '/models'

In [3]:
# Define functions

def count_folders(directory):
    """Return how many immediate entries of ``directory`` are directories."""
    folder_total = 0
    for entry_name in os.listdir(directory):
        entry_path = os.path.join(directory, entry_name)
        if os.path.isdir(entry_path):
            folder_total += 1
    return folder_total

def remove_folder(directory):
    """Delete ``directory`` and everything below it; do nothing if the path does not exist."""
    if not os.path.exists(directory):
        return
    shutil.rmtree(directory)

# Number of immediate subfolders of the dataset root; each one is treated as a category.
num_categories = count_folders(original_folder)

In [9]:
# Drop any split left over from an earlier run before rebuilding it
for stale_split_folder in (training_folder, validation_folder):
    remove_folder(stale_split_folder)

# Load and preprocess data

print("\nConverting PNG images to RGBA")
# Recursively search for PNG images and rewrite, in place, any that are not already RGBA
for root, dirs, files in os.walk(original_folder):
    for file in files:
        # Case-insensitive match so files named e.g. "photo.PNG" are not skipped
        if not file.lower().endswith('.png'):
            continue

        image_path = os.path.join(root, file)
        converted = None
        # The context manager closes the file handle for every image, so a large
        # dataset does not accumulate open descriptors.
        with Image.open(image_path) as im:
            if im.format == 'PNG' and im.mode != 'RGBA':
                # convert() returns a new image, which is used after the source is closed
                converted = im.convert('RGBA')

        # Overwrite only after the source file has been closed
        if converted is not None:
            converted.save(image_path)

print("Conversion completed!\n")

print(f"Categories found: {num_categories}\n")

# Ensure the split folders and the model folder exist (existing ones are left alone)
for required_folder in (training_folder, validation_folder, model_folder):
    os.makedirs(required_folder, exist_ok=True)

# Randomly split images into training and validation folders
for root, dirs, files in os.walk(original_folder):
    # os.walk yields names in arbitrary filesystem order. Sorting the directory
    # list (in place, which also fixes the traversal order) and the file list
    # gives the shuffle below the same starting point on every run, so a fixed
    # random seed actually reproduces the same split.
    dirs.sort()
    files = sorted(files)

    # Get the relative path from the original folder
    relative_path = os.path.relpath(root, original_folder)

    # Mirror the current folder inside the "training" and "validation" directories
    training_dir = os.path.join(training_folder, relative_path)
    validation_dir = os.path.join(validation_folder, relative_path)
    os.makedirs(training_dir, exist_ok=True)
    os.makedirs(validation_dir, exist_ok=True)

    # Randomly shuffle the list of files
    random.shuffle(files)

    # Split the files based on the train_percentage; the first part goes to
    # training, the remainder to validation.
    train_size = int(len(files) * train_percentage)
    train_files = files[:train_size]
    validation_files = files[train_size:]

    # Copy (not move) the files so the original dataset stays untouched
    for split_files, split_dir in ((train_files, training_dir), (validation_files, validation_dir)):
        for file in split_files:
            src = os.path.join(root, file)
            dst = os.path.join(split_dir, file)
            shutil.copy(src, dst)


Converting PNG images to RGBA
Conversion completed!

Categories found: 14



In [4]:
# Define the model

# Supported depths, each paired with its constructor and default pretrained weights
resnet_variants = {
    '50': (resnet50, ResNet50_Weights.DEFAULT),
    '101': (resnet101, ResNet101_Weights.DEFAULT),
    '152': (resnet152, ResNet152_Weights.DEFAULT),
}

if model_type not in resnet_variants:
    raise ValueError("Invalid model_type. Please choose from '50', '101', or '152'.")

build_resnet, pretrained_weights = resnet_variants[model_type]
model = build_resnet(weights=pretrained_weights)

# Replace the final fully connected layer so it outputs one score per category
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, num_categories)

Downloading: "https://download.pytorch.org/models/resnet101-cd907fc2.pth" to /root/.cache/torch/hub/checkpoints/resnet101-cd907fc2.pth
100%|██████████| 171M/171M [00:02<00:00, 70.0MB/s]


In [5]:
# Define data transformations

# Training pipeline: aspect-preserving resize, random augmentation, then a
# 224x224 center crop and normalization.
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(degrees=45),
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Validation pipeline: same geometry as training (Resize(256) keeps the aspect
# ratio, CenterCrop(224) matches the training input size) but without the random
# augmentations. Previously validation images were squashed to 256x256 and never
# cropped, so the model was evaluated on a different scale and aspect ratio than
# it was trained on.
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [6]:
# Create data loaders

# Datasets: one ImageFolder per split, each paired with its own transform
training_dataset = ImageFolder(root=training_folder, transform=train_transform)
validation_dataset = ImageFolder(root=validation_folder, transform=val_transform)

# Loaders: training batches are reshuffled, validation batches keep dataset order
training_loader = DataLoader(dataset=training_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(dataset=validation_dataset, batch_size=batch_size, shuffle=False)

In [7]:
# Define loss function, optimizer, and scheduler

# Classification loss
criterion = nn.CrossEntropyLoss()

# Adam over all model parameters
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Plateau-based learning-rate scheduler; it is stepped with the validation loss in the training loop
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, patience=3)

In [8]:
# Training loop

# Resolve the device first: it is needed both to place the model and to remap
# checkpoint tensors when resuming on different hardware than they were saved on.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device}")
# Move the model before restoring optimizer state, so the restored state ends up
# on the same device as the parameters it belongs to.
model.to(device)

epoch_index = 0
best_val_accuracy = 0.0
# Work on a local copy: mutating `num_epochs` would extend the schedule again
# every time this cell is re-run.
total_epochs = num_epochs

# Output files: the latest checkpoint (for resuming) and the best weights so far
checkpoint_path = f'{model_folder}/{model_name}_c{num_categories}_m{model_type}.ckp'
best_model_path = f'{model_folder}/{model_name}_c{num_categories}_m{model_type}.pth'

# Check if a pretrained model should be loaded
if load_pretrained:
    # map_location lets a checkpoint saved on GPU be resumed on CPU (and vice versa)
    checkpoint = torch.load(f"{model_folder}/{pretrained_model_name}.ckp", map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    # Checkpoints written by older versions of this cell have no scheduler state
    if 'scheduler_state_dict' in checkpoint:
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
    epoch_index = checkpoint['epoch']
    # Train for `num_epochs` additional epochs on top of those already completed
    total_epochs = num_epochs + epoch_index
    # Only carry over the previous best score if the weights file it refers to
    # exists; otherwise start from zero so a best model is saved again.
    if os.path.exists(best_model_path):
        best_val_accuracy = checkpoint.get('best_val_accuracy', 0.0)

print("Training...\n")

for epoch in range(epoch_index, total_epochs):
    model.train()  # Set the model to training mode
    running_loss = 0.0

    progress_bar = tqdm(enumerate(training_loader), total=len(training_loader), desc=f"Epoch {epoch+1}/{total_epochs}", leave=False)

    for batch_idx, (images, labels) in progress_bar:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        progress_bar.set_description(f"Epoch {epoch+1}/{total_epochs}, Batch {batch_idx+1}/{len(training_loader)}")

    # Mean of the per-batch training losses for this epoch
    train_loss = running_loss / max(len(training_loader), 1)

    # Validation loop
    model.eval()  # Set the model to evaluation mode

    val_loss = 0.0
    val_correct = 0
    val_total = 0

    val_progress_bar = tqdm(validation_loader, desc=f"Epoch {epoch+1}/{total_epochs}, Validation", leave=False)

    with torch.no_grad():
        for val_batches_seen, (val_images, val_labels) in enumerate(val_progress_bar, start=1):
            val_images, val_labels = val_images.to(device), val_labels.to(device)
            val_outputs = model(val_images)
            val_batch_loss = criterion(val_outputs, val_labels)
            val_loss += val_batch_loss.item()

            # Predicted class = index of the largest output along the class dimension
            val_predicted = val_outputs.argmax(dim=1)
            val_total += val_labels.size(0)
            val_correct += (val_predicted == val_labels).sum().item()

            # Average over the batches processed so far, so the live figure is a
            # true running mean rather than a partial sum over the full batch count.
            val_progress_bar.set_postfix(loss=val_loss / val_batches_seen, accuracy=(val_correct / val_total) * 100)

    val_accuracy = (val_correct / val_total) * 100
    val_loss /= len(validation_loader)
    tqdm.write(f"Epoch {epoch+1}/{total_epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

    # Update the learning rate (before checkpointing, so the saved scheduler
    # state already reflects this epoch)
    scheduler.step(val_loss)

    # Check if the current model has the best validation accuracy
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        # Save the current best model
        torch.save(model.state_dict(), best_model_path)

    # Save everything needed to resume training after this epoch
    torch.save({
        'epoch': epoch+1,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'scheduler_state_dict': scheduler.state_dict(),
        'best_val_accuracy': best_val_accuracy,
        'loss': val_loss,
    }, checkpoint_path)

print(f"\nTraining completed. Best validation accuracy: {best_val_accuracy:.4f}%")

Using cpu
Training...



Epoch 1/30:   0%|          | 0/15 [00:00<?, ?it/s]

KeyboardInterrupt: ignored