In [None]:
# Full image classification pipeline using transfer learning and CNN

In [None]:
# %pip install torch
# %pip install torchvision

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import time
import copy

In [None]:
# Path to your dataset.
# Set the LICHEN_DATA_DIR environment variable before starting the kernel to
# point the notebook at a different location; when it is unset, the original
# hard-coded path is used unchanged.
data_dir = os.environ.get(
    "LICHEN_DATA_DIR",
    "/Users/eabowman/Dropbox/LichenProject/dataset",
)

In [None]:
# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Check for valid image files inside subfolders
# Reuse data_dir so the dataset location is configured in exactly one place.
base_dir = data_dir
classes = os.listdir(base_dir)

# Keep only real directories *before* slicing: stray files in the dataset root
# (e.g. .DS_Store) would otherwise use up some of the five preview slots.
# Sorting makes the preview stable, since os.listdir order is arbitrary.
class_dirs = sorted(
    entry for entry in classes
    if os.path.isdir(os.path.join(base_dir, entry))
)

for cls in class_dirs[:5]:  # Check first 5 folders
    cls_path = os.path.join(base_dir, cls)
    print(f"\n{cls} contains:")
    print(os.listdir(cls_path))

In [None]:
# Show which class folders contain no usable image files.
# The extension list mirrors the one accepted by is_valid_image() (defined in a
# later cell), so a folder holding only .bmp or .webp files is not misreported
# as empty.
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff', '.webp')

for cls in os.listdir(base_dir):
    cls_path = os.path.join(base_dir, cls)
    if os.path.isdir(cls_path):
        files = os.listdir(cls_path)
        if not any(f.lower().endswith(image_extensions) for f in files):
            print(f"⚠️ No valid images in: {cls_path}")

In [None]:
# Extension-based predicate passed to ImageFolder to decide which files to load
def is_valid_image(filename):
    """Return True if `filename` ends with a supported image extension.

    The comparison is case-insensitive, so "photo.JPG" and "photo.jpg" are
    treated alike. Only the name is inspected; the file is never opened.
    """
    lowered = filename.lower()
    for extension in ('.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff', '.webp'):
        if lowered.endswith(extension):
            return True
    return False

In [None]:
# Load the dataset
# `transform` was referenced but never defined, so the cell raised NameError.
# It is built here with the same resize / crop / normalise steps as the
# `preprocess` pipeline used for single-image inference at the end of this
# notebook, so training and prediction see identically prepared inputs.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

# NOTE(review): `allow_empty` is only accepted by recent torchvision releases —
# confirm the installed version supports it.
dataset = datasets.ImageFolder(
    root=data_dir,
    transform=transform,
    is_valid_file=is_valid_image,  # optional, but robust
    allow_empty=True  # prevents crashing due to empty folders
)

In [None]:
# Split dataset
# 80% training and 20% validation
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Seed the split so the same images land in train/val on every run; without a
# fixed generator the membership changes each time the cell is executed, which
# makes validation accuracy incomparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=split_generator
)

In [None]:
# Create dataloaders
# Both loaders share batch size and worker count; only the training one shuffles.
loader_options = {"batch_size": 32, "num_workers": 4}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)

In [None]:
# Class names
# Label list exposed by the ImageFolder dataset; its length is later used to
# size the replacement classifier layer (model.fc).
class_names = dataset.classes
print(f"Found {len(class_names)} species (classes).")

In [None]:
# Model using transfer learning on top of a CNN backbone
# resnet18 and ResNet18_Weights are reached through the already-imported
# `torchvision.models` module; the bare names were never imported and raised
# NameError.
weights = models.ResNet18_Weights.DEFAULT  # Use the latest available weights
model = models.resnet18(weights=weights)

In [None]:
# Sanity checks after loading: first the network architecture, then the
# metadata attached to the selected pretrained weights.
for summary in (model, weights.meta):
    print(summary)

In [None]:
# Swap the pretrained classification head for one sized to our own label set.
# The new layer keeps the backbone's feature width as its input size.
num_classes = len(class_names)
head_in_features = model.fc.in_features
model.fc = nn.Linear(head_in_features, num_classes)

In [None]:
# Check final layer: print the replaced head so its output size can be compared
# against the number of classes reported above.
print(model.fc)

In [None]:
# Check if PyTorch detects a GPU
# (`iprint` was a typo for `print` and raised NameError.)
cuda_available = torch.cuda.is_available()
print("CUDA available:", cuda_available)
# Only query the device name when CUDA is present; get_device_name(0) is not
# called on CPU-only machines.
print("GPU name:", torch.cuda.get_device_name(0) if cuda_available else "No GPU found")

In [None]:
# Move the model to the right device (GPU when available, else CPU)
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
model = model.to(device)

In [None]:
# Set up loss function and optimizer:
# cross-entropy over the model outputs, and SGD with momentum over every
# parameter of the model.
criterion = nn.CrossEntropyLoss()
sgd_settings = {"lr": 0.001, "momentum": 0.9}
optimizer = optim.SGD(model.parameters(), **sgd_settings)

In [None]:
# Learning rate scheduler
# StepLR lives in torch.optim.lr_scheduler, not directly on torch.optim
# (`optim.StepLR` raised AttributeError). With these settings the learning rate
# is multiplied by 0.1 after every 7 calls to exp_lr_scheduler.step().
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
# Set the data directory paths
# NOTE(review): these assume train/ and val/ subfolders inside data_dir, whereas
# the ImageFolder built earlier in this notebook reads class folders directly
# from data_dir — confirm which layout actually exists on disk.
train_dir = f'{data_dir}/train'
val_dir = f'{data_dir}/val'

In [None]:
# Load datasets
# `data_transforms` was referenced but never defined, so the cell raised
# NameError. It is built here: random crop + horizontal flip for training,
# deterministic resize + centre crop for validation, and for both the same
# mean/std normalisation used by the inference `preprocess` pipeline at the end
# of this notebook.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        normalize,
    ]),
}

image_datasets = {
    'train': datasets.ImageFolder(train_dir, data_transforms['train']),
    'val': datasets.ImageFolder(val_dir, data_transforms['val'])
}

In [None]:
# Create dataloaders
# One loader per split, built from the matching entry of image_datasets;
# only the training split is shuffled.
dataloaders = {
    split: DataLoader(
        image_datasets[split],
        batch_size=32,
        shuffle=(split == 'train'),
        num_workers=2,
    )
    for split in ('train', 'val')
}

# And you can get the class names like this:
# NOTE(review): this overwrites the class_names that was used earlier to size
# model.fc — confirm both datasets list the same classes in the same order.
class_names = image_datasets['train'].classes

In [None]:
# Define training model
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, device='cpu'):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluation mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Forward
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

            # Deep copy the model if it’s the best so far
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print(f"\nTraining complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")
    print(f"Best val Acc: {best_acc:.4f}")

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Start training loop or call train_model() function
num_epochs = 25  # or however many you want

model = train_model(model, dataloaders, criterion, optimizer, exp_lr_scheduler, num_epochs=num_epochs)

In [None]:
# Check model
# Load a local image and predict
# Replace with your image path
img_path = "/Users/eabowman/Dropbox/LichenProject/test_images/tcm-23467-acarospora_rosulata.jpeg"
# `Image` (PIL) was never imported, so Image.open raised NameError. Use the
# loader torchvision's ImageFolder uses by default instead: it opens the file
# and returns an RGB PIL image, so inference decodes files the same way the
# training datasets do.
img = datasets.folder.default_loader(img_path)

# Preprocess it
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
input_tensor = preprocess(img)
input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

# Move to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
input_batch = input_batch.to(device)

# Run the model
# eval() puts layers such as BatchNorm/Dropout into inference behaviour; set it
# explicitly so this cell does not depend on whichever mode training left behind.
model.eval()
with torch.no_grad():
    output = model(input_batch)
    probabilities = torch.nn.functional.softmax(output[0], dim=0)

# Get top 5 predictions (fewer if the model has fewer than 5 output classes,
# since topk raises when k exceeds the number of elements).
top_k = min(5, probabilities.size(0))
top5_prob, top5_catid = torch.topk(probabilities, top_k)
for i in range(top5_prob.size(0)):
    # The classifier head was replaced with one sized from class_names, so the
    # output indices map to class_names — not to ImageNet labels
    # (`imagenet_classes` was also never defined).
    print(f"{class_names[top5_catid[i].item()]}: {top5_prob[i].item():.4f}")
