In [None]:
# Mount Google Drive
# The dataset directory used by the later cells lives under '/content/drive/My Drive',
# and the split CSVs and the checkpoint are written there too, so the drive has to be
# mounted before anything else runs.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Import libraries
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Subset, DataLoader, random_split, ConcatDataset
from torch.optim.lr_scheduler import CosineAnnealingLR
from torchvision.datasets import ImageFolder
from torchvision import transforms, models
from torchsummary import summary
import numpy as np
import pandas as pd
import os
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.utils.class_weight import compute_class_weight
from sklearn.manifold import TSNE
import seaborn as sns
import matplotlib.pyplot as plt
import copy

In [None]:
# Dataset Split

# Preprocessing pipeline applied to every image
transform = transforms.Compose([
    transforms.Resize((300, 300)),  # fixed 300x300 input size
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),  # ImageNet channel statistics
])

# Image dataset on Google Drive, one sub-folder per class ('good' and 'bad')
data_dir = '/content/drive/My Drive/capstone'
dataset = ImageFolder(root=data_dir, transform=transform)

print(f"Class mapping: {dataset.class_to_idx}")  # expected: 'good' -> 1, 'bad' -> 0

# Seeded generator so that the random split below is reproducible
seed = 42
generator = torch.Generator().manual_seed(seed)

# Partition the sample indices by whether the file path carries the '.aug' marker
aug_indices = [i for i, (p, _) in enumerate(dataset.samples) if '.aug' in p]
orig_indices = [i for i, (p, _) in enumerate(dataset.samples) if '.aug' not in p]

original_dataset = Subset(dataset, orig_indices)

# 70/15/15 split of the original images; the test split absorbs the rounding remainder
total_orig = len(original_dataset)
train_size = int(0.7 * total_orig)
val_size = int(0.15 * total_orig)
test_size = total_orig - (train_size + val_size)

split_lengths = [train_size, val_size, test_size]
train_orig_dataset, val_dataset, test_dataset = random_split(
    original_dataset, split_lengths, generator=generator
)

# Get image base name without augmentation suffix or file extension
def get_image_id(path):
    """Return the identifier shared by an original image and its augmented copies.

    The file name of ``path`` is cut at the first '.jpg', and what remains is
    then cut at the first '.aug'. File names containing neither marker are
    returned unchanged.
    """
    filename = os.path.basename(path)
    before_extension, _, _ = filename.partition('.jpg')
    image_id, _, _ = before_extension.partition('.aug')
    return image_id

# Image ids of every original picture that landed in the validation or test split.
# Subset.indices are positions inside original_dataset, hence the orig_indices lookup.
val_test_image_ids = {
    get_image_id(dataset.samples[orig_indices[pos]][0])
    for held_out in (val_dataset, test_dataset)
    for pos in held_out.indices
}

# Keep only the augmented images whose source picture is NOT in validation/test,
# so augmented copies of held-out images never enter the training data
filtered_aug_indices = [
    aug_idx
    for aug_idx in aug_indices
    if get_image_id(dataset.samples[aug_idx][0]) not in val_test_image_ids
]

filt_aug_dataset = Subset(dataset, filtered_aug_indices)

# Final training set = original training split + admissible augmented images
train_dataset = ConcatDataset([train_orig_dataset, filt_aug_dataset])

# DataLoaders (only the training loader shuffles)
batch_size = 32
loader_kwargs = dict(batch_size=batch_size, num_workers=2, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_kwargs)

print(f"Train size: {len(train_dataset)}, Validation size: {len(val_dataset)}, Test size: {len(test_dataset)}")


In [None]:
# Save split information into dataframes

# Get base dataset and indices from subset
def get_base_dataset(dataset):
    """Unwrap (possibly nested) ``Subset`` objects down to the underlying dataset.

    Args:
        dataset: A ``torch.utils.data.Subset`` (optionally wrapping further
            subsets) or a plain dataset.

    Returns:
        tuple: ``(base_dataset, indices)`` where ``indices`` lists, in the
        order of the input, the positions in ``base_dataset`` of the input's
        samples. For an input that is not a ``Subset`` the indices are
        ``list(range(len(dataset)))``.
    """
    indices = None
    while isinstance(dataset, Subset):
        if indices is None:
            # Outermost subset: its own indices are the starting point.
            indices = list(dataset.indices)
        else:
            # Translate positions in the child subset into positions in this one's parent.
            indices = [dataset.indices[i] for i in indices]
        dataset = dataset.dataset
    if indices is None:
        # Not a Subset at all: every sample of the dataset is selected.
        indices = list(range(len(dataset)))
    return dataset, indices

# Get subset filenames and labels
def get_subset_info(subset):
    """Return ``(filenames, labels, label_names)`` for the samples of ``subset``.

    The subset is resolved to its base dataset via ``get_base_dataset``; that
    base dataset must expose ``imgs``, ``targets`` and ``classes``.

    Returns:
        tuple of three lists, aligned element by element:
        file names without their directory, numeric labels, and the class
        name looked up for each numeric label.
    """
    base_dataset, indices = get_base_dataset(subset)

    filenames = []
    labels = []
    label_names = []
    for sample_idx in indices:
        sample_path = base_dataset.imgs[sample_idx][0]
        filenames.append(os.path.basename(sample_path))

        label = base_dataset.targets[sample_idx]
        labels.append(label)
        label_names.append(base_dataset.classes[label])

    return filenames, labels, label_names

def _split_frame(files, classes, labels):
    """Build one split table with the columns 'filename', 'class' and 'label'."""
    return pd.DataFrame({'filename': files, 'class': classes, 'label': labels})

# File names, numeric labels and class names of each split
train_files, train_labels, train_classes = get_subset_info(train_orig_dataset)
val_files, val_labels, val_classes = get_subset_info(val_dataset)
test_files, test_labels, test_classes = get_subset_info(test_dataset)
train_aug_files, train_aug_labels, train_aug_classes = get_subset_info(filt_aug_dataset)

# One dataframe per split
train_df = _split_frame(train_files, train_classes, train_labels)
val_df = _split_frame(val_files, val_classes, val_labels)
test_df = _split_frame(test_files, test_classes, test_labels)
train_aug_df = _split_frame(train_aug_files, train_aug_classes, train_aug_labels)

# The CSV files are stored next to the images
output_dir = data_dir

train_csv_path = os.path.join(output_dir, 'train_set.csv')
val_csv_path = os.path.join(output_dir, 'val_set.csv')
test_csv_path = os.path.join(output_dir, 'test_set.csv')
train_aug_csv_path = os.path.join(output_dir, 'train_aug_set.csv')

# Write every split without the dataframe index column
for split_df, csv_path in (
    (train_df, train_csv_path),
    (val_df, val_csv_path),
    (test_df, test_csv_path),
    (train_aug_df, train_aug_csv_path),
):
    split_df.to_csv(csv_path, index=False)


In [None]:
# Model definition

# Run on the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ImageNet-pretrained EfficientNet-B3 with a fresh 2-class head
model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.IMAGENET1K_V1)
num_features = model.classifier[1].in_features
model.classifier = nn.Sequential(
    nn.Dropout(p=0.3),
    nn.Linear(num_features, 2)
)
model = model.to(device)

# Train only the parameters whose name contains "features.6" or "classifier"; freeze the rest.
# NOTE(review): torchvision's EfficientNet appears to have further `features` stages after
# index 6, which stay frozen here - confirm that this is the intended "last block".
trainable_tags = ("features.6", "classifier")
for name, param in model.named_parameters():
    param.requires_grad = any(tag in name for tag in trainable_tags)

# Hyperparameters
learning_rate = 1e-3
start_epoch = 0
num_epochs = 30

# Only the unfrozen parameters are handed to the optimizer
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.AdamW(trainable_params, lr=learning_rate, weight_decay=1e-4)
scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-5)

# 'balanced' class weights computed over the full training set (original + augmented)
train_df_tot = pd.concat([train_df, train_aug_df])
all_labels = train_df_tot['label'].tolist()
classes = np.unique(all_labels)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=all_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Class-weighted cross entropy with label smoothing
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)

# Early-stopping bookkeeping
early_stopping_patience = 8
best_val_loss = float('inf')
no_improvement = 0

# Checkpoint file, stored next to the data
checkpoint_path = data_dir + '/checkpoint.pth'

# Layer-by-layer summary for a 3x300x300 input
summary(model, input_size=(3, 300, 300))


In [None]:
# Model training

# Each epoch runs one optimisation pass over train_loader followed by an evaluation on
# val_loader. The checkpoint is rewritten whenever the mean validation loss improves, and
# training stops once it has failed to improve for `early_stopping_patience` epochs in a row.
for epoch in range(start_epoch, num_epochs):
    model.train()
    running_loss = 0.0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Mean of the per-batch losses
    avg_train_loss = running_loss / len(train_loader)
    # Progress line printed before the validation pass starts
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}")

    # Validation
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            # Predicted class = index of the largest logit
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * correct / total

    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {avg_train_loss:.4f}, "
          f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%")

    # labels=[0, 1] pins both classes: the matrix is always 2x2 and the report cannot fail
    # on a target_names length mismatch when only one class shows up in labels/predictions.
    cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
    print("Confusion Matrix:\n", cm)

    # zero_division=0 reports 0 for undefined metrics without emitting a warning
    report = classification_report(
        all_labels, all_preds, labels=[0, 1], target_names=['Bad', 'Good'],
        digits=4, zero_division=0,
    )
    print("Classification Report:\n", report)

    # Save checkpoint and check early stopping
    if avg_val_loss < best_val_loss:
        no_improvement = 0
        best_val_loss = avg_val_loss
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': avg_val_loss,
        }, checkpoint_path)
        print(f"Checkpoint saved at epoch {epoch+1}")
    else:
        no_improvement += 1
        if no_improvement >= early_stopping_patience:
            print("Early stopping triggered.")
            break

    # Advance the learning-rate schedule once per completed epoch
    scheduler.step()

In [None]:
# Load model checkpoint

# Use GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# EfficientNet-B3 with the same 2-class head that was used for training.
# weights=None: load_state_dict below overwrites every parameter and buffer, so
# fetching the ImageNet weights first would be wasted work.
model = models.efficientnet_b3(weights=None)
num_features = model.classifier[1].in_features
model.classifier = nn.Sequential(
    nn.Dropout(p=0.3),
    nn.Linear(num_features, 2)
)

data_dir = '/content/drive/My Drive/capstone'
checkpoint_path = data_dir + '/checkpoint.pth'
# map_location lets a checkpoint written on a GPU runtime be loaded on a CPU-only one
checkpoint = torch.load(checkpoint_path, map_location=device)

model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device)

# Unfreeze only the parameters whose name contains "features.6" or "classifier"
for name, param in model.named_parameters():
    if "features.6" in name or "classifier" in name:
        param.requires_grad = True
    else:
        param.requires_grad = False

# Hyperparameters; training resumes at the epoch after the one stored in the checkpoint
learning_rate = 1e-3
num_epochs = 30
start_epoch = checkpoint['epoch'] + 1
best_val_loss = checkpoint['val_loss']

# Restore the optimizer state BEFORE building the scheduler, otherwise the learning rate
# set up by the scheduler would be overwritten by the loaded one.
optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate, weight_decay=1e-4)
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

# A scheduler built with last_epoch != -1 requires 'initial_lr' in every param group;
# supply it in case the stored optimizer state does not carry it.
for group in optimizer.param_groups:
    group.setdefault('initial_lr', learning_rate)

# Resume the cosine schedule instead of restarting it: the constructor performs one
# initial step, so last_epoch=start_epoch - 1 leaves the scheduler at start_epoch.
scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs, eta_min=1e-5, last_epoch=start_epoch - 1)

# Class weights for train set (original + augmented)
train_df_tot = pd.concat([train_df,train_aug_df])
all_labels = train_df_tot['label'].tolist()
classes = np.unique(all_labels)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=all_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Loss function
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)

# Early stopping (the no-improvement counter starts again from zero on resume)
early_stopping_patience = 8
no_improvement = 0

# checkpoint_path = data_dir + '/checkpoint2.pth' # new checkpoint


In [None]:
# Test

true_labels = []
pred_labels = []
features = []

# Copy of the model with the classifier removed: its output is exactly what the
# classifier head receives, and doubles as the feature vector for t-SNE.
feature_extractor_model = copy.deepcopy(model)
feature_extractor_model.classifier = torch.nn.Identity()  # remove classifier for feature extraction
feature_extractor_model.eval()
model.eval()

# Minimum softmax probability of class 1 required to predict class 1
threshold = 0.7

with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)

        # Run the backbone once and feed its output to the classifier head, instead of
        # pushing every batch through two full copies of the network.
        extracted_features = feature_extractor_model(images)
        outputs = model.classifier(extracted_features)  # classification
        features.append(extracted_features.cpu().numpy())

        probabilities = F.softmax(outputs, dim=1)
        prob = probabilities[:, 1]
        predicted = (prob >= threshold).int()

        true_labels.extend(labels.cpu().numpy())
        pred_labels.extend(predicted.cpu().numpy())

features = np.vstack(features)
true_labels = np.array(true_labels)
pred_labels = np.array(pred_labels)

# Positions (in test-set order) where prediction and ground truth disagree
misclassified_indices = np.flatnonzero(pred_labels != true_labels)

# test_loader does not shuffle, so predictions line up with the rows of test_df
test_df['prediction'] = pred_labels
test_csv_path = data_dir + '/test_results.csv'
test_df.to_csv(test_csv_path, index=False)

# Confusion matrix (labels pinned so it is 2x2 even if a class is absent)
cm = confusion_matrix(true_labels, pred_labels, labels=[0, 1])

plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Bad', 'Good'], yticklabels=['Bad', 'Good'])
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix')
plt.show()

# Classification report
report = classification_report(true_labels, pred_labels, labels=[0, 1], target_names=['Bad', 'Good'], digits=4)
print("Classification Report:\n", report)

# t-SNE; the perplexity has to stay below the number of samples
tsne_perplexity = min(30, len(features) - 1)
embedded_features = TSNE(n_components=2, perplexity=tsne_perplexity, random_state=42).fit_transform(features)
plt.figure(figsize=(8, 6))
scatter = plt.scatter(embedded_features[:, 0], embedded_features[:, 1], c=true_labels, cmap='RdYlGn', alpha=0.7)
misclassified_scatter = plt.scatter(embedded_features[misclassified_indices, 0], embedded_features[misclassified_indices, 1], c='black', marker='x', label='Misclassified', s=60)

# Build the legend after both scatters exist so the 'Misclassified' marker is listed too
class_handles, class_legend_labels = scatter.legend_elements()
plt.legend(class_handles + [misclassified_scatter], class_legend_labels + ['Misclassified'], title="Classes")

plt.title("t-SNE Visualization of Image Features")
plt.show()
