# Image Classification

In [None]:
# Essential packages
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from PIL import Image

# Torch packages
#!pip install torch
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

torch.set_float32_matmul_precision('medium')

# Torch Metrics
#!pip install torchmetrics
import torchmetrics

# Torch Vision
#!pip install torchvision
from torchvision import transforms

# Lightning
#!pip install pytorch-lightning
import pytorch_lightning as pl
from pytorch_lightning.loggers import TensorBoardLogger

# Optuna
#!pip install optuna
#!pip install optuna-integration[pytorch_lightning]
import optuna
from optuna.integration.pytorch_lightning import PyTorchLightningPruningCallback

# Scikit-Learn packages
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from sklearn.metrics import (
    roc_curve, auc, precision_recall_curve, roc_auc_score,
    precision_score, recall_score, f1_score, classification_report
)

# Load the arrays used throughout the notebook: training images with their
# labels, an extra training image array (no labels are loaded for it here),
# and the test images.
X_train, y_train, X_train_extra, X_test = (
    np.load(path)
    for path in (
        "./data/Xtrain1.npy",
        "./data/Ytrain1.npy",
        "./data/Xtrain1_extra.npy",
        "./data/Xtest1.npy",
    )
)

# Report the shape of every array that was just loaded.
for label, array in (
    ("X_train", X_train),
    ("y_train", y_train),
    ("X_train_extra", X_train_extra),
    ("X_test", X_test),
):
    print(f"{label} shape: {array.shape}")

## Data Exploration

### Print images with matplotlib

In [None]:
# Quick summary: number of rows, length of one flattened image, label count.
print(f"Number of images: {X_train.shape[0]}, Image size: {X_train.shape[1]}")
print(f"Number of classes: {np.unique(y_train).size}\n")

# To inspect a single image instead of the whole set:
#   plt.imshow(X_train[1, :].reshape((48, 48)), cmap='gray')

# Grid configuration: every figure shows up to `images_per_figure` images
# laid out on `cols` columns.
num_images = X_train.shape[0]
img_shape = (48, 48)

images_per_figure = 200
cols = 20
rows = -(-images_per_figure // cols)  # ceiling division

# Draw one figure per batch of images until the whole set has been shown.
for start_idx in range(0, num_images, images_per_figure):
    fig, axes = plt.subplots(rows, cols, figsize=(cols * 2, rows * 2))

    for img_idx, ax in enumerate(axes.ravel(), start=start_idx):
        if img_idx < num_images:
            ax.imshow(X_train[img_idx, :].reshape(img_shape), cmap='gray')
        # Cells past the end of the data stay empty; all cells hide their axes.
        ax.axis('off')

    plt.tight_layout()
    plt.show()

### Save images to PDF file

In [None]:
# Install necessary packages
!pip install PyPDF2 pillow

from PyPDF2 import PdfMerger
from PIL import Image
import os
import numpy as np
import matplotlib.pyplot as plt

# Export every training image, annotated with its index and label, to a
# single PDF. Pages are rendered as PNG grids, converted to one-page PDFs
# and merged; the intermediate files are removed afterwards.
X = np.array(X_train)  # Training images
y = np.array(y_train)  # Corresponding labels

num_images = X.shape[0]
img_shape = (48, 48)
cols = 10
images_per_batch = 100
rows = int(np.ceil(images_per_batch / cols))
batch_image_paths = []
batch_pdfs = []

final_pdf_filename = 'all_images_merged.pdf'
pdf_merger = PdfMerger()

try:
    # 1) Render one PNG grid per batch of images.
    for batch_start in range(0, num_images, images_per_batch):
        fig, axes = plt.subplots(rows, cols, figsize=(cols * 2, rows * 2))
        axes = axes.flatten()

        for i, ax in enumerate(axes):
            img_idx = batch_start + i
            if img_idx < num_images:
                img = X[img_idx, :].reshape(img_shape)
                ax.imshow(img, cmap='gray')
                ax.set_title(f'ID: {img_idx}, [{y[img_idx]}]')
            ax.axis('off')

        batch_filename = f'batch_{batch_start // images_per_batch}.png'
        # Register the path before writing so that cleanup also covers a
        # partially written file.
        batch_image_paths.append(batch_filename)
        plt.savefig(batch_filename, bbox_inches='tight', pad_inches=0.1, dpi=120)
        plt.close(fig)

    # 2) Convert every PNG to a half-size, single-page PDF.
    for batch_image in batch_image_paths:
        # The context manager closes the PNG file handle deterministically,
        # which matters for the os.remove() calls below.
        with Image.open(batch_image) as img:
            page = img if img.mode == 'RGB' else img.convert('RGB')
            img_resized = page.resize(
                (page.width // 2, page.height // 2), Image.Resampling.LANCZOS
            )
        pdf_filename = batch_image.replace('.png', '.pdf')
        batch_pdfs.append(pdf_filename)
        img_resized.save(pdf_filename, 'PDF', resolution=100.0, quality=100, optimize=True)

    # 3) Merge the per-batch PDFs into the final document.
    for pdf_file in batch_pdfs:
        pdf_merger.append(pdf_file)
    pdf_merger.write(final_pdf_filename)

    print(f"Final merged PDF saved as '{final_pdf_filename}'")
finally:
    # Always release the merger and delete the intermediates, even when a
    # step above failed, so no stray batch_*.png / batch_*.pdf files remain.
    pdf_merger.close()
    for leftover in batch_image_paths + batch_pdfs:
        if os.path.exists(leftover):
            os.remove(leftover)

    print("Intermediate batch images and PDF files deleted.")

## CNN Model Training

### Preprocessing Data

In [None]:
#!pip install imblearn
from imblearn.over_sampling import SMOTE

# Build a (num_samples, 1, 48, 48) view of the training images; its shape
# supplies the dimensions reused below.
X_train_reshaped = X_train.reshape(-1, 48, 48)
X_train_reshaped = np.expand_dims(X_train_reshaped, axis=1)

num_samples, channels, height, width = X_train_reshaped.shape
# SMOTE operates on a 2-D feature matrix, so use one flattened row per image.
X_train_flat = X_train.reshape(num_samples, -1)  # Shape: (num_samples, 48*48)

print(f"Class distribution before SMOTE: {np.bincount(y_train)}")

# Split into train and validation sets (no SMOTE applied to validation data)
X_train_split, X_val_split, y_train_split, y_val_split = train_test_split(
    X_train_flat, y_train, test_size=0.20, random_state=1337
)

# Oversample class 0 in the training split until it matches the class 1 count.
# The sampler is seeded (same seed as the split) so reruns produce the same
# synthetic images.
smote = SMOTE(
    sampling_strategy={0: int(y_train_split[y_train_split == 1].shape[0])},
    random_state=1337,
)
X_resampled, y_resampled = smote.fit_resample(X_train_split, y_train_split)

print(f"Class distribution after SMOTE: {np.bincount(y_resampled)}\n")

# Rows past the original training-split length are treated as the synthetic
# samples (assumes fit_resample appends new rows after the originals).
num_original_samples = len(X_train_split)
num_resampled_samples = len(X_resampled)

synthetic_samples = X_resampled[num_original_samples:]
synthetic_labels = y_resampled[num_original_samples:]

# Reshape the synthetic rows back to (num_synthetic, 1, 48, 48) for display
synthetic_samples_images = synthetic_samples.reshape(-1, channels, height, width)

# Preview up to 10 synthetic images; unused grid cells are left blank so the
# cell does not fail when fewer than 10 samples were generated.
fig, axes = plt.subplots(2, 5, figsize=(10, 5))
axes = axes.ravel()

for i, ax in enumerate(axes):
    if i < len(synthetic_samples_images):
        ax.imshow(synthetic_samples_images[i, 0], cmap='gray')
        ax.set_title(f'Class: {synthetic_labels[i]}')
    ax.axis('off')

plt.tight_layout()
plt.show()

In [None]:
from torchvision import transforms

# Custom dataset class
class CraterDataset(Dataset):
    """Dataset of grayscale images paired with their labels.

    Args:
        images: Indexable collection of image arrays; each item is squeezed
            to 2-D before being turned into a PIL image.
        labels: Indexable collection of labels aligned with ``images``.
        transform: Optional callable applied to the PIL image. When omitted,
            the PIL image itself is returned.
    """

    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, label)`` where ``label`` is a float32 tensor."""
        pixels = np.asarray(self.images[idx]).squeeze()
        label = self.labels[idx]

        if pixels.dtype != np.uint8:
            # With an explicit mode='L', PIL reads the buffer as raw bytes, so
            # non-uint8 data (e.g. a float array produced by concatenating
            # uint8 and float32 images) would decode as garbage.
            # NOTE(review): assumes intensities are on a 0-255 scale, as the
            # `/ 255.0` used for raw inputs elsewhere suggests - confirm.
            pixels = np.clip(pixels, 0, 255).astype(np.uint8)

        # Convert numpy image to PIL image for augmentation
        image = Image.fromarray(pixels, mode='L')  # 'L' for grayscale

        # Apply transformations (if any)
        if self.transform is not None:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.float32)

# Training pipeline. Only the tensor conversion is active; random flips and
# a 90-degree random rotation were tried as augmentations and are disabled.
train_transform = transforms.Compose([transforms.ToTensor()])

# Validation pipeline: tensor conversion only.
val_transform = transforms.Compose([transforms.ToTensor()])

# Restore the (num_samples, 1, 48, 48) image layout for each flattened array.
X_train_reshaped = np.reshape(X_resampled, (-1, 1, 48, 48))
X_val_reshaped = np.reshape(X_val_split, (-1, 1, 48, 48))
X_train_extra_reshaped = np.reshape(X_train_extra, (-1, 1, 48, 48))

# Sanity-check the resulting shapes.
for label, array in (
    ("X_train_reshaped", X_train_reshaped),
    ("X_val_reshaped", X_val_reshaped),
    ("X_train_extra_reshaped", X_train_extra_reshaped),
):
    print(f"{label} shape:", array.shape)

# Datasets: resampled data for training, untouched split for validation.
train_dataset = CraterDataset(X_train_reshaped, y_resampled, transform=train_transform)
val_dataset = CraterDataset(X_val_reshaped, y_val_split, transform=val_transform)

# Loaders: only the training loader shuffles.
batch_size = 32
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=batch_size)

# "Balanced" class weights computed on the resampled training labels; index 1
# is later used as the positive-class weight of the loss.
class_weights = class_weight.compute_class_weight(
    class_weight='balanced', classes=np.unique(y_resampled), y=y_resampled
)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32)
print(class_weights_tensor)

### Model Definition and Hyperparameter Search

In [6]:
# Define the model
class NewCNN(nn.Module):
    """Convolutional network producing one logit per single-channel image.

    Three convolutional stages (each ending in batch norm and 2x2 max
    pooling) are followed by global average pooling and a two-layer head.
    The output is a raw logit of shape ``(batch, 1)``; no sigmoid is applied.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 -> 32 -> 64 -> 64 channels
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(64)

        # Stage 2: 64 -> 128 -> 128 -> 256 channels
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(256)

        # Stage 3: 256 -> 512 channels
        self.conv7 = nn.Conv2d(256, 512, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(512)

        # Collapse each feature map to a single value
        self.gap = nn.AdaptiveAvgPool2d(1)

        # Classifier head
        self.fc1 = nn.Linear(512, 256)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(256, 1)  # single logit for binary classification

        # Shared non-linearity
        self.activation = nn.LeakyReLU(0.05)

    def forward(self, x):
        """Map a ``(batch, 1, H, W)`` tensor to ``(batch, 1)`` logits."""
        stages = (
            ((self.conv1, self.conv2, self.conv3), self.bn1),
            ((self.conv4, self.conv5, self.conv6), self.bn2),
            ((self.conv7,), self.bn3),
        )

        # Per stage: (conv -> activation) for each conv, then batch norm,
        # then 2x2 max pooling with stride 2.
        for convs, norm in stages:
            for conv in convs:
                x = self.activation(conv(x))
            x = F.max_pool2d(norm(x), kernel_size=2, stride=2)

        # Global average pooling, then flatten to (batch, 512)
        x = self.gap(x).flatten(1)

        # Hidden layer with dropout, followed by the output logit
        x = self.dropout(self.activation(self.fc1(x)))
        return self.fc2(x)

In [7]:
import torchmetrics

class ClassificationTask(pl.LightningModule):
    """Lightning wrapper for training a binary classifier that emits logits.

    Args:
        model: Network called on the inputs; its output is flattened to 1-D.
        criterion: Loss called as ``criterion(logits, targets)``.
        threshold: Cut-off applied to ``sigmoid(logits)`` to get hard labels.
        optimizer_type: Only ``'Adam'`` is accepted (it builds ``AdamW``).
        lr: Learning rate for the optimizer.
        scheduler_type: ``'ReduceLROnPlateau'``, ``'StepLR'`` or anything
            else for no scheduler.

    Per phase (``train``/``val``/``test``) the module accumulates batch
    losses, accuracy and macro F1 and logs them at epoch end as
    ``{phase}_loss_epoch``, ``{phase}_acc_epoch`` and ``{phase}_f1_epoch``.
    """

    def __init__(self, model=None, criterion=None, threshold=0.5, optimizer_type='Adam', lr=1e-5, scheduler_type=None):
        super().__init__()

        self.model = model
        self.criterion = criterion
        self.lr = lr
        self.optimizer_type = optimizer_type
        self.scheduler_type = scheduler_type
        self.threshold = threshold

        # One independent set of trackers per phase. They live in a plain
        # dict, so `_step` moves them to the module's device explicitly.
        self.metrics = {
            phase: self._new_phase_metrics()
            for phase in ('train', 'val', 'test')
        }

    def _new_phase_metrics(self):
        """Create fresh accuracy / F1 metrics and an empty loss list."""
        return {
            'acc': torchmetrics.Accuracy(task="binary", threshold=self.threshold),
            'f1': torchmetrics.F1Score(task="multiclass", num_classes=2, threshold=self.threshold, average="macro"),
            'losses': []
        }

    def forward(self, x):
        """Delegate to the wrapped model."""
        return self.model(x)

    def _step(self, batch, phase):
        """Run one batch, update the trackers of ``phase``, return the loss."""
        inputs, targets = batch
        logits = self(inputs).view(-1).to(self.device)  # Forward pass

        tracker = self.metrics[phase]

        # Keep the metric objects on the same device as the predictions
        tracker['acc'].to(self.device)
        tracker['f1'].to(self.device)

        hard_preds = torch.sigmoid(logits) >= self.threshold
        loss = self.criterion(logits, targets)

        # Accumulate loss and metric state for the epoch summary
        tracker['losses'].append(loss.item())
        tracker['acc'](hard_preds, targets)
        tracker['f1'](hard_preds, targets)

        return loss

    def training_step(self, batch, batch_idx):
        return self._step(batch, 'train')

    def validation_step(self, batch, batch_idx):
        return self._step(batch, 'val')

    def test_step(self, batch, batch_idx):
        return self._step(batch, 'test')

    def _epoch_end(self, phase):
        """Log the epoch averages of ``phase`` and reset its trackers."""
        tracker = self.metrics[phase]

        epoch_loss = torch.tensor(tracker['losses']).mean()
        epoch_acc = tracker['acc'].compute()
        epoch_f1 = tracker['f1'].compute()

        # Log the metrics
        for suffix, value in (('loss', epoch_loss), ('acc', epoch_acc), ('f1', epoch_f1)):
            self.log(f'{phase}_{suffix}_epoch', value, prog_bar=True)

        # Start the next epoch from a clean state
        tracker['losses'].clear()
        tracker['acc'].reset()
        tracker['f1'].reset()

    def on_train_epoch_end(self):
        self._epoch_end('train')

    def on_validation_epoch_end(self):
        self._epoch_end('val')

    def on_test_epoch_end(self):
        self._epoch_end('test')

    def configure_optimizers(self):
        """Build the optimizer and, if requested, its LR scheduler.

        Raises:
            ValueError: If ``optimizer_type`` is not ``'Adam'``.
        """
        if self.optimizer_type != 'Adam':
            raise ValueError(f"Unsupported optimizer type: {self.optimizer_type}")

        # Note: the 'Adam' option builds AdamW with a small weight decay.
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.lr, weight_decay=1e-5)

        if self.scheduler_type == 'ReduceLROnPlateau':
            # Halve the LR when the validation F1 stops improving.
            scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)
            return {'optimizer': optimizer, 'lr_scheduler': {'scheduler': scheduler, 'monitor': 'val_f1_epoch'}}

        if self.scheduler_type == 'StepLR':
            scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
            return {'optimizer': optimizer, 'lr_scheduler': {'scheduler': scheduler}}

        return optimizer

In [11]:
# Only show Optuna warnings and errors.
optuna.logging.set_verbosity(optuna.logging.WARNING)


class OptunaPruning(PyTorchLightningPruningCallback, pl.Callback):
    """Optuna pruning callback that also subclasses ``pl.Callback``.

    No behaviour is added; construction is inherited unchanged. The class
    exists as a workaround - presumably so the ``pytorch_lightning`` Trainer
    accepts the Optuna callback as a ``Callback`` instance (confirm).
    """

# Define Optuna's objective function, optimized for F1-score
def Objective(train_loader, val_loader, class_weights_tensor, trial):
    """Optuna objective: train one model and return its validation macro F1.

    Args:
        train_loader: Loader passed to the trainer for fitting.
        val_loader: Loader used for validation during fitting and for the
            final scoring pass.
        class_weights_tensor: Tensor whose element at index 1 is used as
            ``pos_weight`` of the BCE-with-logits loss.
        trial: Optuna trial supplying the hyperparameters.

    Returns:
        The macro F1 score (float) of the model as it stands after
        ``trainer.fit`` returns, computed on ``val_loader``.
    """
    # Hyperparameters under search
    lr = trial.suggest_float('lr', 1e-7, 1e-3, log=True)
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.7)
    optimizer_type = trial.suggest_categorical('optimizer_type', ['Adam'])
    threshold_opt = trial.suggest_float('threshold', 0.1, 0.9)

    # Fresh network with the dropout layer swapped for the suggested rate
    network = NewCNN()
    network.dropout = nn.Dropout(dropout_rate)

    classification_task = ClassificationTask(
        model=network,
        # Loss weighted by the positive-class weight
        criterion=nn.BCEWithLogitsLoss(pos_weight=class_weights_tensor[1]),
        threshold=threshold_opt,
        optimizer_type=optimizer_type,
        lr=lr,
        scheduler_type='ReduceLROnPlateau'
    )

    log_root = "lightning_logs/optuna_trials"
    trial_tag = f"trial_{trial.number}"
    monitored = 'val_f1_epoch'

    callbacks = [
        # Stop when the validation F1 has not improved for 10 epochs
        pl.callbacks.EarlyStopping(
            monitor=monitored,
            patience=10,
            verbose=True,
            mode='max'
        ),
        # Keep only the checkpoint with the best validation F1
        pl.callbacks.ModelCheckpoint(
            dirpath=f"{log_root}/{trial_tag}",
            filename=f"best_model_{trial_tag}",
            monitor=monitored,
            save_top_k=1,
            mode='max'
        ),
        # Let Optuna prune unpromising trials on the same metric
        OptunaPruning(trial, monitor="val_f1_epoch"),
    ]

    # TensorBoard logs go next to the checkpoints of this trial.
    # Optional quieter settings for the trainer: log_every_n_steps=0,
    # enable_progress_bar=False, enable_model_summary=False.
    trainer = pl.Trainer(
        max_epochs=100,
        logger=pl.loggers.TensorBoardLogger(log_root, name=trial_tag),
        accelerator="auto",
        strategy="auto",
        devices="auto",
        callbacks=callbacks,
    )

    trainer.fit(classification_task, train_loader, val_loader)

    # Score the trained model on the validation loader with a fresh metric.
    f1_metric = torchmetrics.F1Score(task="multiclass", num_classes=2, threshold=threshold_opt, average="macro")

    classification_task.eval()  # inference mode for dropout / batch norm

    with torch.no_grad():
        for inputs, targets in val_loader:
            scores = classification_task.model(inputs).flatten()
            f1_metric.update(torch.sigmoid(scores) >= threshold_opt, targets)

    # Macro-averaged F1 over the whole validation loader
    macro_f1_score = f1_metric.compute().item()

    print(f"Trial {trial.number}: Macro F1 Score = {macro_f1_score:.6f}")

    return macro_f1_score

In [None]:
#import logging
#log = logging.getLogger("pytorch_lightning")
#log.propagate = False
#log.setLevel(logging.ERROR)

# Hyperparameter search on the resampled training data: 50 trials, each
# returning a validation macro F1 that the study maximizes.
def _objective_on_resampled(trial):
    return Objective(train_loader, val_loader, class_weights_tensor, trial)


study = optuna.create_study(direction="maximize")
study.optimize(_objective_on_resampled, n_trials=50)

# Summarize the best trial: its score and the hyperparameters behind it.
best_trial = study.best_trial
print("Best trial:")
print(f"  F1 score: {best_trial.value}")
print("  Params: ")
for param_name, param_value in best_trial.params.items():
    print(f"    {param_name}: {param_value}")

### Load best model and predict

In [None]:
# Evaluate a saved checkpoint on the validation split: ROC / PR curves,
# threshold diagnostics, confusion matrix and classification metrics.

# The checkpoint path is hard-coded; to derive it from the Optuna study use:
#best_trial_number = best_trial.number
#best_model_checkpoint_path = f"lightning_logs/optuna_trials/trial_{best_trial_number}/best_model_trial_{best_trial_number}.ckpt"
best_model_checkpoint_path = "./best_model_trial_2.ckpt"
print(f"Loading the best model from: {best_model_checkpoint_path}")

# Decision threshold used for the hard predictions below.
# NOTE(review): presumably the `threshold` parameter of the trial that
# produced this checkpoint (best_trial.params['threshold']) - confirm.
decision_threshold = 0.8500936271227173

# Load the model from the checkpoint
task = ClassificationTask.load_from_checkpoint(
    checkpoint_path=best_model_checkpoint_path,
    criterion=nn.BCEWithLogitsLoss(pos_weight=class_weights_tensor[1]),
    model=NewCNN()
)
task.eval()

# Collect the raw logits for the whole validation loader
logits_list = []
with torch.no_grad():
    for batch in val_loader:
        images, _ = batch
        images = images.to(task.device)
        logits = task.model(images)
        logits_list.append(logits.cpu())

# Flatten the logits tensor and apply sigmoid to get probabilities
logits = torch.cat(logits_list).flatten().numpy()
probabilities = torch.sigmoid(torch.tensor(logits)).numpy()

# Validation labels as an integer numpy array (the name is historical; these
# are the labels of the validation split, not of a test set)
y_test_tensor = torch.tensor(y_val_split, dtype=torch.int).numpy()

# ---- ROC Curve and Youden's J Statistic ----
fpr, tpr, roc_thresholds = roc_curve(y_test_tensor, probabilities)
roc_auc = auc(fpr, tpr)

# Youden's J = TPR - FPR; the threshold maximizing it is reported only
youden_j = tpr - fpr
best_threshold_index = np.argmax(youden_j)
best_threshold1 = roc_thresholds[best_threshold_index]
print(f'Best threshold according to Youden\'s J: {best_threshold1:.4f}')

#final_predictions = (probabilities >= best_threshold1).astype(int)

# ---- ROC Curve ----
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='blue', label=f'ROC curve (AUC = {roc_auc:.2f})')
plt.plot([0, 1], [0, 1], linestyle='--', color='gray') # Diagonal
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.grid(True)
plt.show()

# ---- Precision-Recall Curve ----

# Calculate the precision-recall curve
precision, recall, thresholds = precision_recall_curve(y_test_tensor, probabilities)

# F1 at every point of the curve. Points where precision + recall == 0 get
# F1 = 0 instead of NaN (a NaN would win np.argmax).
pr_sum = precision + recall
f1_scores = np.divide(
    2 * precision * recall, pr_sum, out=np.zeros_like(pr_sum), where=pr_sum > 0
)

# `precision` and `recall` have one more entry than `thresholds` (the final
# point has no threshold), so that last entry is excluded from the search.
best_threshold_index = np.argmax(f1_scores[:-1])
best_threshold2 = thresholds[best_threshold_index]
print(f'Best threshold for maximizing F1 score: {best_threshold2}')

# Hard predictions at the fixed decision threshold
final_predictions = (probabilities >= decision_threshold).astype(int)

plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='green')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.grid(True)
plt.show()

# ---- Confusion Matrix ----
confmat = torchmetrics.ConfusionMatrix(task="binary")
confmat_result = confmat(torch.tensor(final_predictions), torch.tensor(y_test_tensor)).numpy()
print("Confusion Matrix:\n", confmat_result)

# Plot the confusion matrix using seaborn heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(confmat_result, annot=True, fmt='d', cmap='Blues', xticklabels=['No Crater', 'Crater'], yticklabels=['No Crater', 'Crater'])
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()


# ---- Compute F1-Score with torchmetrics ----
f1_metric = torchmetrics.F1Score(task="multiclass", num_classes=2, threshold=float(decision_threshold), average="macro")
f1_score_value = f1_metric(torch.tensor(final_predictions), torch.tensor(y_test_tensor)).item()

# Print the torchmetrics F1 score
print(f"\nMacro F1-Score using torchmetrics: {f1_score_value:.6f}")

# ---- Additional Metrics ----
# Precision, recall, F1 and accuracy of the hard predictions
report_dict = classification_report(y_test_tensor, final_predictions, output_dict=True, digits=6)
print(f"Macro Avg F1-Score: {report_dict['macro avg']['f1-score']:.6f}\n")
print(classification_report(y_test_tensor, final_predictions, output_dict=False, digits=6))

# AUC
roc_auc_score_value = roc_auc_score(y_test_tensor, probabilities)
print(f"\nAUC Score: {roc_auc_score_value:.6f}")

### Self-labeling

In [None]:
# Reload the checkpoint whose predictions will drive the self-labeling step.
# The path is hard-coded; `best_trial.number` could be used to build it.
best_model_checkpoint_path = "./best_model_trial_2.ckpt"
print(f"Loading the best model from: {best_model_checkpoint_path}")

# The constructor arguments are passed again here alongside the checkpoint.
task = ClassificationTask.load_from_checkpoint(
    checkpoint_path=best_model_checkpoint_path,
    model=NewCNN(),
    criterion=nn.BCEWithLogitsLoss(pos_weight=class_weights_tensor[1]),
)
task.eval()

In [None]:
# Pseudo-label the extra images: keep only samples for which the loaded
# model is highly confident, and label them from the predicted probability.
confidence_threshold_pos = 0.99  # High confidence for class 1
confidence_threshold_neg = 0.01  # High confidence for class 0

pseudo_labels = []  # Per-batch arrays of pseudo labels
high_confidence_samples = []  # Per-batch arrays of the selected samples

# Create a DataLoader for X_train_extra
X_train_extra_loader = DataLoader(X_train_extra_reshaped, batch_size=batch_size, shuffle=True)

with torch.no_grad():
    for raw_batch in X_train_extra_loader:
        # The model input is the raw batch converted to float and divided
        # by 255.
        batch = raw_batch.to(task.device).float()
        logits = task.model(batch / 255.0)
        probabilities = torch.sigmoid(logits).cpu().numpy().flatten()

        # Confident either way: very high (class 1) or very low (class 0)
        high_confidence_mask = np.logical_or(
            probabilities >= confidence_threshold_pos,
            probabilities <= confidence_threshold_neg,
        )
        if not np.any(high_confidence_mask):
            continue

        # Store the selected samples in their ORIGINAL dtype (not the float
        # copy fed to the model) so that concatenating them with the
        # training images does not change the dtype of the training set.
        high_confidence_samples.append(raw_batch.numpy()[high_confidence_mask])

        # Label as 1 if the probability is at least 0.5, otherwise 0
        pseudo_labels.append((probabilities[high_confidence_mask] >= 0.5).astype(int))

# Merge the per-batch results; fall back to empty arrays of the right shape
# when no sample passed the confidence thresholds (np.concatenate rejects
# an empty list).
if pseudo_labels:
    pseudo_labels = np.concatenate(pseudo_labels, axis=0)
    high_confidence_samples = np.concatenate(high_confidence_samples, axis=0)
else:
    pseudo_labels = np.empty((0,), dtype=int)
    high_confidence_samples = np.empty(
        (0,) + X_train_extra_reshaped.shape[1:], dtype=X_train_extra_reshaped.dtype
    )

# Output some diagnostics
print("Pseudo labels:", pseudo_labels)
print(f"Original size: {X_train_extra_reshaped.shape}")
print("High confidence pseudo-labeled samples:", high_confidence_samples.shape)
print("Pseudo labels for high-confidence samples:", pseudo_labels.shape)

In [None]:
# Append the high-confidence pseudo-labelled samples (and their labels) to
# the resampled training data, stacking along the sample axis.
X_combined = np.concatenate((X_train_reshaped, high_confidence_samples))
y_combined = np.concatenate((y_resampled, pseudo_labels))

print(f"Combined training set shape: {X_combined.shape}, Combined labels shape: {y_combined.shape}")

In [None]:
# Dataset and shuffling loader over the combined (resampled + pseudo-labelled)
# training data.
combined_dataset = CraterDataset(X_combined, y_combined, transform=train_transform)
combined_loader = DataLoader(combined_dataset, shuffle=True, batch_size=batch_size)

# Recompute the "balanced" class weights for the combined labels. This
# replaces the `class_weights_tensor` computed for the resampled data.
class_weights = class_weight.compute_class_weight(
    class_weight='balanced', classes=np.unique(y_combined), y=y_combined
)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32)
print(class_weights_tensor)


# Second, shorter hyperparameter search (10 trials) on the combined data,
# validated on the same validation loader as before.
def _objective_on_combined(trial):
    return Objective(combined_loader, val_loader, class_weights_tensor, trial)


study = optuna.create_study(direction="maximize")
study.optimize(_objective_on_combined, n_trials=10)

# Summarize the best trial of this study.
best_trial = study.best_trial
print("Best trial:")
print(f"  F1 score: {best_trial.value}")
print("  Params: ")
for param_name, param_value in best_trial.params.items():
    print(f"    {param_name}: {param_value}")

### Final prediction

In [None]:
class TestDataset(Dataset):
    """Label-free dataset yielding grayscale images one at a time.

    Args:
        images: Indexable collection of image arrays; each item is squeezed
            before conversion to a PIL image.
        transform: Optional callable applied to the PIL image.
    """

    def __init__(self, images, transform=None):
        self.images = images
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the (optionally transformed) image at ``idx``."""
        # Build a grayscale ('L') PIL image from the squeezed array
        pil_image = Image.fromarray(self.images[idx].squeeze(), mode='L')

        if not self.transform:
            return pil_image
        return self.transform(pil_image)

# Read the test images and give them the (num_samples, 1, 48, 48) layout.
X_test = np.load("./data/Xtest1.npy").reshape(-1, 1, 48, 48)

# Only the PIL -> tensor conversion is applied at test time.
test_transform = transforms.Compose([transforms.ToTensor()])

# Ordered (non-shuffled) loader so predictions line up with the test rows.
test_dataset = TestDataset(X_test, transform=test_transform)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

# Checkpoint used for the final predictions (hard-coded path).
best_model_checkpoint_path = "./best_model_trial_2.ckpt"
print(f"Loading the best model from: {best_model_checkpoint_path}")

task = ClassificationTask.load_from_checkpoint(
    checkpoint_path=best_model_checkpoint_path,
    model=NewCNN(),
    criterion=nn.BCEWithLogitsLoss(pos_weight=class_weights_tensor[1]),
)
task.eval()

# Run the model batch by batch and gather the logits as numpy arrays.
logits_list = []
with torch.no_grad():
    for batch in test_loader:
        inputs = batch.to(task.device).float()
        logits_list.append(task.model(inputs).cpu().numpy())

# Logits -> probabilities -> hard 0/1 predictions at the fixed threshold.
logits = np.concatenate(logits_list, axis=0)
probabilities = torch.sigmoid(torch.tensor(logits)).numpy()
predictions = (probabilities >= 0.8500936271227173).astype(int).flatten()

print(predictions)
# np.save appends the ".npy" extension, so this writes "y_pred.npy".
np.save("y_pred", predictions)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
!pip install PyPDF2
from PyPDF2 import PdfMerger
from PIL import Image
import os

# Export every test image, annotated with its index and predicted label, to
# a single PDF. Pages are rendered as PNG grids, converted to one-page PDFs
# and merged; the intermediate files are removed afterwards.

# Load the test data and give it the (num_samples, 1, 48, 48) layout
X_test = np.load("./data/Xtest1.npy")
X_test = X_test.reshape(-1, 1, 48, 48)

# Read back the predictions written by np.save("y_pred", ...), which stores
# them in "y_pred.npy". (No variable named `y_pred` is defined in the
# notebook.)
predictions = np.load("y_pred.npy")

num_images = X_test.shape[0]
img_shape = (48, 48)
cols = 10
images_per_batch = 100
rows = int(np.ceil(images_per_batch / cols))
batch_image_paths = []
batch_pdfs = []

final_pdf_filename = 'X_test_images_with_predictions.pdf'
pdf_merger = PdfMerger()

try:
    # 1) Render one PNG grid per batch of images.
    for batch_start in range(0, num_images, images_per_batch):
        fig, axes = plt.subplots(rows, cols, figsize=(cols * 2, rows * 2))
        axes = axes.flatten()

        for i, ax in enumerate(axes):
            img_idx = batch_start + i
            if img_idx < num_images:
                img = X_test[img_idx, :].reshape(img_shape)
                ax.imshow(img, cmap='gray')
                ax.set_title(f'ID: {img_idx}, Pred: {predictions[img_idx]}')
            ax.axis('off')

        batch_filename = f'batch_{batch_start // images_per_batch}.png'
        # Register the path before writing so that cleanup also covers a
        # partially written file.
        batch_image_paths.append(batch_filename)
        plt.savefig(batch_filename, bbox_inches='tight', pad_inches=0.1, dpi=120)
        plt.close(fig)

    # 2) Convert every PNG to a half-size, single-page PDF.
    for batch_image in batch_image_paths:
        # The context manager closes the PNG file handle deterministically,
        # which matters for the os.remove() calls below.
        with Image.open(batch_image) as img:
            page = img if img.mode == 'RGB' else img.convert('RGB')
            img_resized = page.resize(
                (page.width // 2, page.height // 2), Image.Resampling.LANCZOS
            )
        pdf_filename = batch_image.replace('.png', '.pdf')
        batch_pdfs.append(pdf_filename)
        img_resized.save(pdf_filename, 'PDF', resolution=100.0, quality=100, optimize=True)

    # 3) Merge all PDFs into a single document.
    for pdf_file in batch_pdfs:
        pdf_merger.append(pdf_file)
    pdf_merger.write(final_pdf_filename)

    print(f"Final merged PDF saved as '{final_pdf_filename}'")
finally:
    # Always release the merger and delete the intermediates, even when a
    # step above failed, so no stray batch_*.png / batch_*.pdf files remain.
    pdf_merger.close()
    for leftover in batch_image_paths + batch_pdfs:
        if os.path.exists(leftover):
            os.remove(leftover)

    print("Intermediate batch images and PDF files deleted.")

### Plots

In [None]:
# Notebook-only commands (IPython shell escape and magics, not plain Python):
# delete the ~/.tensorboard-info directory, reload the TensorBoard extension,
# then open TensorBoard on the Lightning log directory.
!rm -rf ~/.tensorboard-info
%reload_ext tensorboard
%tensorboard --logdir=lightning_logs/

In [None]:
from tensorboard.backend.event_processing import event_accumulator

def _plot_loss_and_accuracy(loss_values, acc_values, split_name):
    """Plot one split's loss and accuracy curves on twin y-axes and show them.

    Each series is plotted against its own 0-based index, so the two series
    do not need to have the same number of logged points.

    Args:
        loss_values: Sequence of loss values, one per logged epoch.
        acc_values: Sequence of accuracy values, one per logged epoch.
        split_name: Label used in the axis names and title
            (e.g. ``'Training'`` or ``'Validation'``).
    """
    fig, loss_ax = plt.subplots(figsize=(8, 6))

    # Loss on the left y-axis
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel(f'{split_name} Loss', color='tab:red')
    loss_ax.plot(range(len(loss_values)), loss_values,
                 color='tab:red', label=f'{split_name} Loss')
    loss_ax.tick_params(axis='y', labelcolor='tab:red')

    # Accuracy on a second y-axis sharing the same x-axis
    acc_ax = loss_ax.twinx()
    acc_ax.set_ylabel(f'{split_name} Accuracy', color='tab:blue')
    acc_ax.plot(range(len(acc_values)), acc_values,
                color='tab:blue', label=f'{split_name} Accuracy')
    acc_ax.tick_params(axis='y', labelcolor='tab:blue')

    plt.title(f'{split_name} Loss and Accuracy per Epoch')
    fig.tight_layout()
    plt.show()


# Specify the path to the TensorBoard log file
log_dir = f"lightning_logs/optuna_trials/trial_{best_trial_number}/version_0"

# Load TensorBoard event data
event_acc = event_accumulator.EventAccumulator(log_dir)
event_acc.Reload()

# Extract scalars for train and validation metrics
train_loss = event_acc.Scalars('train_loss_epoch')
train_acc = event_acc.Scalars('train_acc_epoch')
val_loss = event_acc.Scalars('val_loss_epoch')
val_acc = event_acc.Scalars('val_acc_epoch')

# Prepare metric values for train
train_loss_values = [event.value for event in train_loss]
train_acc_values = [event.value for event in train_acc]

# Prepare metric values for validation
val_loss_values = [event.value for event in val_loss]
val_acc_values = [event.value for event in val_acc]

# One figure per split. The x-axis is derived from each series' own length
# rather than from len(train_loss), so a validation series with a different
# number of logged points no longer makes matplotlib raise.
_plot_loss_and_accuracy(train_loss_values, train_acc_values, 'Training')
_plot_loss_and_accuracy(val_loss_values, val_acc_values, 'Validation')

### Clean-up

In [10]:
import os

def delete_files_in_directory(directory):
    """Delete everything inside ``directory`` while keeping the directory itself.

    Regular files and symbolic links are unlinked; sub-directories are removed
    recursively. Deletion is best-effort per top-level entry: a failure is
    printed and the remaining entries are still processed.

    Args:
        directory: Path of the directory to empty.

    Raises:
        OSError: If ``directory`` itself cannot be listed (for example, it
            does not exist).
    """
    import shutil  # local import: the surrounding cell only imports ``os``

    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        try:
            # Test for links first so a symlink to a directory is unlinked
            # instead of being followed.
            if os.path.isfile(file_path) or os.path.islink(file_path):
                os.remove(file_path)
            elif os.path.isdir(file_path):
                # rmtree unlinks symlinks nested in the tree; a manual
                # os.walk + os.rmdir fails on symlinked directories and
                # leaves the tree half-deleted.
                shutil.rmtree(file_path)
        except OSError as e:
            print(f"Failed to delete {file_path}. Reason: {e}")

# Wipe the per-trial Optuna logs. Skip the call when the directory is already
# gone so this cell can be re-run without raising FileNotFoundError.
directory = 'lightning_logs/optuna_trials'
if os.path.isdir(directory):
    delete_files_in_directory(directory)
else:
    print(f"Nothing to clean: '{directory}' does not exist.")

## SVC Model Training

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import f1_score
from sklearn.decomposition import PCA
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit

from sklearn import datasets
from matplotlib import cm

from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    roc_curve, auc, precision_recall_curve, roc_auc_score,
    precision_score, recall_score, f1_score, classification_report
)

# Train and evaluate an RBF-kernel SVC: hold out 20% of the labelled data,
# standardize the features, grid-search C and gamma on F1, then report the
# weighted F1 and confusion matrix on the hold-out split.

# Load data
X_train = np.load("./data/Xtrain1.npy")
y_train = np.load("./data/Ytrain1.npy")
X_train_extra = np.load("./data/Xtrain1_extra.npy")

print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"X_train_extra shape: {X_train_extra.shape}")

# NOTE: from here on X_test / y_test are the 20% hold-out split of the
# labelled training data, not the unlabelled Xtest1.npy set.
X_train, X_test, y_train, y_test = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Standardize features; the scaler is fitted on the training split only and
# then applied unchanged to the hold-out split.
# NOTE(review): the scaler is fitted before cross-validation, so the CV folds
# share scaling statistics. Wrapping scaler + SVC in a Pipeline would remove
# that leakage but changes the parameter names in the grid.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Define the parameter grid
C_range = [0.1, 1]
gamma_range = [0.1, 1]
param_grid = dict(C=C_range, gamma=gamma_range, kernel=['rbf'])

# Use StratifiedShuffleSplit for better handling of unbalanced data
cv = StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=42)

# Initialize the SVM classifier with class balancing
model = SVC(class_weight='balanced')

# Setup GridSearchCV with F1 scoring and refit based on F1 score
grid_search = GridSearchCV(
    estimator=model,
    param_grid=param_grid,
    scoring='f1',  # Use 'f1' for binary classification
    cv=cv,         # Use the stratified splitter built above (was an unused local)
    refit='f1',    # Refit on the model that maximizes the F1 score
    verbose=0
)

# Fit the grid search
grid_search.fit(X_train_scaled, y_train)

# Best parameters and score
print("Best Parameters:", grid_search.best_params_)
print("Best F1 Score:", grid_search.best_score_)

# Predict on the hold-out split using the best model. The model was fitted on
# standardized features, so it must be evaluated on the standardized hold-out
# data as well; feeding it the raw X_test gives meaningless predictions.
y_pred = grid_search.best_estimator_.predict(X_test_scaled)

report_dict = classification_report(y_test, y_pred, output_dict=True, digits=6, zero_division=1)
print(f"Weighted Avg F1-Score: {report_dict['weighted avg']['f1-score']}")

ConfusionMatrixDisplay.from_estimator(grid_search.best_estimator_, X_test_scaled, y_test)
plt.show()