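## Baseline Model
With Local Binary Pattern (LBP) preprocessing, MobileNetV3-Small with one added layer, flat architecture.

*Note: the code in this notebook builds a hierarchical ResNet-18 classifier and reads the original (non-LBP) image folder, so the description above may be out of date.*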

### Imports

In [None]:
import os
import pandas as pd
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from PIL import Image
from skimage.feature import local_binary_pattern
from skimage import color
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report

from google.colab import drive
# Mount Google Drive under /content/drive; the data paths used later in this
# notebook are rooted there. This call only works inside Google Colab.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### LBP Transform

In [None]:
class LBPTransform:
    """Callable that converts an image into a 3-channel Local Binary Pattern map.

    Args:
        radius: Radius passed to ``local_binary_pattern``.
        n_points: Number of sampling points; defaults to ``8 * radius`` when
            omitted (or falsy).
        method: LBP method name passed to ``local_binary_pattern``.

    Calling the instance with a PIL image or an array returns a float array of
    shape ``(H, W, 3)``: the LBP map min-max normalised to roughly ``[0, 1]``
    and repeated over three identical channels.
    """

    def __init__(self, radius=3, n_points=None, method='uniform'):
        self.radius = radius
        self.n_points = n_points if n_points else 8 * radius
        self.method = method

    @staticmethod
    def _to_gray_uint8(img):
        """Return ``img`` (2-D grayscale or 3-D colour array) as 2-D uint8."""
        if img.ndim == 3:
            if img.shape[-1] == 4:
                # rgb2gray only accepts 3 channels; ignore the alpha channel.
                img = img[..., :3]
            gray = color.rgb2gray(img)
        else:
            gray = img

        if np.issubdtype(gray.dtype, np.integer):
            # Integer grayscale data is assumed to already be on the 0-255
            # scale. Multiplying it by 255 (as was done before) wraps around
            # in uint8 arithmetic and scrambles the intensities.
            return np.clip(gray, 0, 255).astype(np.uint8)

        # Float data (e.g. the output of rgb2gray) is on a 0-1 scale.
        return (gray * 255).astype(np.uint8)

    def __call__(self, img):
        # np.asarray handles PIL images as well as arrays without extra copies.
        gray = self._to_gray_uint8(np.asarray(img))

        lbp = local_binary_pattern(gray, self.n_points, self.radius, self.method)

        # Min-max normalise; the epsilon avoids 0/0 on a constant LBP map.
        lbp = (lbp - lbp.min()) / (lbp.max() - lbp.min() + 1e-7)

        # Replicate to three channels so the result looks like an RGB image.
        return np.stack([lbp, lbp, lbp], axis=-1)

### Make LBP Images

In [None]:
def make_lbp_images(input_folder, output_folder, lbp_transformer):
    """Write an LBP version of every ``.png`` in ``input_folder``.

    Args:
        input_folder: Directory scanned (non-recursively) for ``.png`` files.
        output_folder: Directory that receives the results under the same file
            names; created if missing.
        lbp_transformer: Callable taking an RGB PIL image and returning an
            array that is multiplied by 255 and cast to uint8 before saving.

    Files that already exist in ``output_folder`` are left untouched, so the
    function can be re-run to resume an interrupted conversion.
    """
    os.makedirs(output_folder, exist_ok=True)
    png_names = list(filter(lambda name: name.endswith('.png'), os.listdir(input_folder)))

    print(f"Processing {len(png_names)} images from {input_folder}...")
    for png_name in png_names:
        target = os.path.join(output_folder, png_name)
        if not os.path.exists(target):
            source = os.path.join(input_folder, png_name)
            rgb = Image.open(source).convert('RGB')
            scaled = lbp_transformer(rgb) * 255
            Image.fromarray(scaled.astype(np.uint8)).save(target)

    print("LBP preprocessing complete.")

### Dataset Class
Class for processing data and combining images with labels

In [None]:
class PGCDataset(Dataset):
    """Dataset pairing label rows with ``<id>.png`` images in a folder.

    Args:
        labels_df: DataFrame with one row per sample.
        img_folder: Directory containing the ``.png`` images.
        id_col: Column whose value is the image file name without ``.png``.
        label_col: Column holding the class label (converted with ``int``).
        transform: Optional callable applied to the loaded RGB PIL image.

    Rows whose image is not present in ``img_folder`` are dropped at
    construction time. Each item is ``(image, label, image_id)`` where
    ``label`` is a 0-d ``torch.long`` tensor.
    """

    _SUFFIX = '.png'

    def __init__(self, labels_df, img_folder, id_col='PGCname', label_col='T', transform=None):
        self.img_folder = img_folder
        self.id_col = id_col
        self.label_col = label_col
        self.transform = transform

        # Strip only the trailing extension. str.replace would also remove a
        # '.png' occurring inside the stem and produce ids that __getitem__
        # cannot map back to a file.
        available_imgs = {
            fname[:-len(self._SUFFIX)]
            for fname in os.listdir(img_folder)
            if fname.endswith(self._SUFFIX)
        }
        frame = labels_df.reset_index(drop=True)
        self.labels_df = frame[frame[id_col].isin(available_imgs)].reset_index(drop=True)

        print(f"Dataset created with {len(self.labels_df)} imgs")

    def __len__(self):
        return len(self.labels_df)

    def __getitem__(self, idx):
        row = self.labels_df.iloc[idx]

        img_id = row[self.id_col]
        img_path = os.path.join(self.img_folder, f"{img_id}{self._SUFFIX}")
        # convert() returns an independent image, so the file handle can be
        # closed as soon as the with-block ends.
        with Image.open(img_path) as handle:
            img = handle.convert('RGB')

        label = torch.tensor(int(row[self.label_col]), dtype=torch.long)

        if self.transform:
            img = self.transform(img)

        return img, label, img_id

### Dataset creation

In [None]:
# --- Locations and column names ---------------------------------------------
path = '/content/drive/Othercomputers/My laptop/Thesis/Galaxy-Classifier/'
img_folder = path + '/images'
lbp_img_folder = path + '/lbp_images'

id_col = 'PGCname'
label_col = 'T'

img_size = 224

lbp_trans = LBPTransform(5)

# --- Labels -------------------------------------------------------------------
labels_df = pd.read_csv(path + 'EFIGI_attributes.txt', sep=r'\s+', comment='#')

# The merges are applied one after another, in this order. Each dict is a
# simultaneous replacement, so within a single step a value is rewritten at
# most once.
_label_merge_steps = [
    {-3: -2, -1: -2},                                   # S0
    {0: 1, 2: 1},                                       # Sa
    {3: 4},                                             # Sb
    {5: 6},                                             # Sc
    {8: 7, 9: 7},                                       # Sd
    {10: 11},                                           # Irr
    {-6: 0, -5: 1, -4: 2, -2: 3, 1: 4, 4: 5, 11: 8},    # Adjust to 0 - 8
]
for _merge_step in _label_merge_steps:
    labels_df[label_col] = labels_df[label_col].replace(_merge_step)

# --- Stratified train / validation / test split -------------------------------
# 20% is held out for testing; 12.5% of the remaining 80% (10% of the total)
# becomes the validation set.
train_df, test_df = train_test_split(
    labels_df, test_size=0.2, random_state=0, stratify=labels_df[label_col]
)
train_df, val_df = train_test_split(
    train_df, test_size=0.125, random_state=0, stratify=train_df[label_col]
)

# TODO (original note): use stratify sampling in training - write to csv file - - tocsv.pandas

# --- Image transforms ---------------------------------------------------------
# Resize / tensor conversion / normalisation shared by training and evaluation.
# NOTE(review): the mean/std values look like the usual ImageNet statistics - confirm.
_to_model_input = [
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
]

# Training additionally applies random rotation and flips before resizing.
train_transform = transforms.Compose([
    transforms.RandomRotation(180),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    *_to_model_input,
])

test_transform = transforms.Compose(list(_to_model_input))

# Disabled LBP pre-processing step (was kept in the cell as an inert string):
#
#   lbp_params = {'radius': 3, 'n_points': 24, 'method': 'uniform'}
#   lbp_transform = LBPTransform(**lbp_params)
#   make_lbp_images(img_folder, lbp_img_folder, lbp_transform)


# --- Datasets -------------------------------------------------------------------
def _build_dataset(frame, transform):
    """Create a PGCDataset over ``img_folder`` for one split."""
    return PGCDataset(
        labels_df=frame,
        img_folder=img_folder,
        id_col=id_col,
        label_col=label_col,
        transform=transform,
    )


train_dataset = _build_dataset(train_df, train_transform)
val_dataset = _build_dataset(val_df, test_transform)
test_dataset = _build_dataset(test_df, test_transform)

Dataset created with 3120 imgs
Dataset created with 446 imgs
Dataset created with 892 imgs


### Data loader
loads data in batches

In [None]:
# Per-sample weights derived from 'balanced' class weights of the training
# labels. They are only needed by the (currently disabled) weighted sampler.
labels = train_df[label_col].values
classes = np.unique(labels)
class_weights = compute_class_weight('balanced', classes=classes, y=labels)

# `classes` is sorted and contains every label, so searchsorted yields the
# position of each label's class in `class_weights`.
sample_weights = torch.from_numpy(class_weights[np.searchsorted(classes, labels)]).float()

# sampler = torch.utils.data.WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True) # worse for underrepresented classes and overall

# Training batches are reshuffled every epoch; evaluation keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0)

_eval_loader_kwargs = dict(batch_size=32, shuffle=False, num_workers=0)
val_loader = DataLoader(val_dataset, **_eval_loader_kwargs)
test_loader = DataLoader(test_dataset, **_eval_loader_kwargs)

### Hierarchical model
using pretrained resnet18

In [None]:
class HierarchicalResNet(nn.Module):
    """ResNet-18 feature extractor with one coarse head and one fine head per coarse class.

    Args:
        num_coarse: Number of coarse classes; must equal ``len(fine_per_coarse)``.
        fine_per_coarse: Sequence giving the number of fine classes in each
            coarse class. Global fine ids are assigned consecutively in this order.
        freeze_backbone: When true, all backbone parameters get
            ``requires_grad = False``.

    Attributes:
        fine_to_coarse: ``{global_fine_id: (coarse_id, local_fine_id)}``.
        coarse_to_fine: ``{coarse_id: [global_fine_id, ...]}``.
        total_fine_classes: ``sum(fine_per_coarse)``.

    Raises:
        ValueError: If ``num_coarse`` and ``fine_per_coarse`` disagree.
    """

    def __init__(self, num_coarse, fine_per_coarse, freeze_backbone=False):
        super(HierarchicalResNet, self).__init__()
        if num_coarse != len(fine_per_coarse):
            raise ValueError(
                f"num_coarse ({num_coarse}) must equal len(fine_per_coarse) "
                f"({len(fine_per_coarse)})"
            )

        resnet = models.resnet18(weights='IMAGENET1K_V1')
        # NOTE(review): freezing only disables gradients. Any normalisation
        # layers in the backbone still follow model.train()/model.eval().
        for param in resnet.parameters():
            param.requires_grad = not freeze_backbone

        # Everything except the final fully connected layer.
        self.backbone = nn.Sequential(*list(resnet.children())[:-1])

        in_features = resnet.fc.in_features

        self.num_coarse = num_coarse
        self.fine_per_coarse = fine_per_coarse

        # Classifier heads: one over coarse classes, one per coarse class over
        # that class's fine classes.
        self.coarse_classifier = nn.Sequential(nn.Linear(in_features, num_coarse))
        self.fine_classifiers = nn.ModuleList(
            [nn.Linear(in_features, num_fine) for num_fine in fine_per_coarse]
        )

        # Global fine id <-> (coarse id, index inside that coarse class).
        self.fine_to_coarse = {}
        self.coarse_to_fine = {}
        fine_id = 0
        for coarse_id, num_fine in enumerate(fine_per_coarse):
            self.coarse_to_fine[coarse_id] = []
            for local_fine_id in range(num_fine):
                self.fine_to_coarse[fine_id] = (coarse_id, local_fine_id)
                self.coarse_to_fine[coarse_id].append(fine_id)
                fine_id += 1

        self.total_fine_classes = sum(fine_per_coarse)

    def forward(self, x, return_type='train'):
        """Run the network.

        Args:
            x: Input batch for the backbone.
            return_type: ``'train'`` returns ``(coarse_logits,
                [fine_logits per coarse class], features)``; ``'test'`` returns
                the result of :meth:`joint_prob`.

        Raises:
            ValueError: For any other ``return_type`` (previously this
                silently returned ``None``).
        """
        if return_type not in ('train', 'test'):
            raise ValueError(
                f"Unknown return_type {return_type!r}; expected 'train' or 'test'"
            )

        features = torch.flatten(self.backbone(x), 1)

        coarse_output = self.coarse_classifier(features)
        fine_output = [classifier(features) for classifier in self.fine_classifiers]

        if return_type == 'train':
            return coarse_output, fine_output, features
        return self.joint_prob(coarse_output, fine_output)

    def joint_prob(self, coarse_output, fine_output):
        """Turn logits into probabilities.

        Args:
            coarse_output: Coarse logits, one column per coarse class.
            fine_output: List with one logits tensor per coarse class.

        Returns:
            ``(coarse_probs, fine_probs, fine_cond_probs)`` where
            ``fine_cond_probs[c]`` is the softmax of head ``c`` and
            ``fine_probs`` holds ``coarse_probs[:, c] * fine_cond_probs[c]``
            laid out in global fine-id order.
        """
        coarse_probs = F.softmax(coarse_output, dim=1)
        fine_cond_probs = [F.softmax(logits, dim=1) for logits in fine_output]

        # Concatenating keeps the dtype and device of the inputs, unlike
        # filling a freshly allocated default-dtype buffer.
        fine_probs = torch.cat(
            [
                coarse_probs[:, coarse_id:coarse_id + 1] * cond_probs
                for coarse_id, cond_probs in enumerate(fine_cond_probs)
            ],
            dim=1,
        )

        return coarse_probs, fine_probs, fine_cond_probs


## Hierarchical Loss
using cross entropy

In [None]:
class HierarchicalLoss(nn.Module):
    """Cross-entropy on the coarse head plus cross-entropy on the matching fine head.

    Args:
        fine_per_coarse: Sequence giving the number of fine classes in each
            coarse class; global fine ids are numbered consecutively in this order.
        fine_weights: Optional 1-D tensor of per-fine-class weights indexed by
            global fine id. It is sliced into one weight vector per fine head.
        coarse_weights: Optional per-coarse-class weights for the coarse loss.

    Attributes:
        fine_to_coarse: ``{global_fine_id: (coarse_id, local_fine_id)}``.
        loss_coarse: ``nn.CrossEntropyLoss`` used for the coarse head.
        loss_fine: ``nn.ModuleList`` with one ``nn.CrossEntropyLoss`` per
            coarse class; its ``weight`` is used for that head.
    """

    def __init__(self, fine_per_coarse, fine_weights=None, coarse_weights=None):
        super(HierarchicalLoss, self).__init__()
        self.fine_per_coarse = fine_per_coarse

        self.loss_coarse = nn.CrossEntropyLoss(weight=coarse_weights)

        if fine_weights is not None:
            self.loss_fine = nn.ModuleList()
            start_id = 0
            for num_fine in fine_per_coarse:
                # Weights of the fine classes that belong to this coarse class.
                head_weights = fine_weights[start_id:start_id + num_fine]
                self.loss_fine.append(nn.CrossEntropyLoss(weight=head_weights))
                start_id += num_fine
        else:
            self.loss_fine = nn.ModuleList([nn.CrossEntropyLoss() for _ in fine_per_coarse])

        # Mapping global fine id -> (coarse id, local fine id), kept both as a
        # dict (read by the training/evaluation helpers) and as lookup tensors
        # so a whole batch of labels can be translated with one indexing op.
        self.fine_to_coarse = {}
        coarse_lookup, local_lookup = [], []
        fine_id = 0
        for coarse_id, num_fine in enumerate(fine_per_coarse):
            for local_fine_id in range(num_fine):
                self.fine_to_coarse[fine_id] = (coarse_id, local_fine_id)
                coarse_lookup.append(coarse_id)
                local_lookup.append(local_fine_id)
                fine_id += 1
        self._coarse_lookup = torch.tensor(coarse_lookup, dtype=torch.long)
        self._local_lookup = torch.tensor(local_lookup, dtype=torch.long)

    def forward(self, coarse_logits, fine_logits, fine_labels):
        """Compute the hierarchical loss for one batch.

        Args:
            coarse_logits: Logits of the coarse head, one row per sample.
            fine_logits: List with one logits tensor per coarse class, each
                with one row per sample.
            fine_labels: Long tensor of global fine ids, one per sample.

        Returns:
            ``(loss, coarse_loss, fine_loss)`` with ``loss = coarse_loss + fine_loss``.

        The fine loss is the weighted mean over the batch, where each sample
        is scored only by the head of its true coarse class:
        ``sum_i w[y_i] * CE_i / sum_i w[y_i]``. Without weights this is the
        plain mean over the batch. Scoring samples one at a time with a
        mean-reduced weighted ``CrossEntropyLoss`` (as was done before) makes
        every weight divide out, so the class weights had no effect.
        """
        device = fine_labels.device
        coarse_labels = self._coarse_lookup.to(device)[fine_labels]
        local_fine = self._local_lookup.to(device)[fine_labels]

        coarse_loss = self.loss_coarse(coarse_logits, coarse_labels)

        weighted_sum = coarse_logits.new_zeros(())
        weight_total = coarse_logits.new_zeros(())
        for coarse_id, head_logits in enumerate(fine_logits):
            in_group = coarse_labels == coarse_id
            targets = local_fine[in_group]
            if targets.numel() == 0:
                continue

            head_weight = self.loss_fine[coarse_id].weight
            weighted_sum = weighted_sum + F.cross_entropy(
                head_logits[in_group], targets, weight=head_weight, reduction='sum'
            )
            if head_weight is None:
                weight_total = weight_total + targets.numel()
            else:
                weight_total = weight_total + head_weight[targets].sum()

        fine_loss = weighted_sum / weight_total

        loss = coarse_loss + fine_loss

        return loss, coarse_loss, fine_loss


## Calculate class weights

In [None]:
def calculate_class_weights(labels, fine_per_coarse, device='cpu'):
    """Compute 'balanced' class weights for the fine and the coarse level.

    Args:
        labels: Iterable of global fine class ids (one per sample).
        fine_per_coarse: Number of fine classes in each coarse class; global
            fine ids are numbered consecutively in this order.
        device: Device on which the returned tensors are created.

    Returns:
        ``(fine_weights, coarse_weights)``: float tensors of length
        ``sum(fine_per_coarse)`` and ``len(fine_per_coarse)``. Classes that do
        not occur in ``labels`` keep a weight of 1.
    """
    labels = np.array(labels)

    # Global fine id -> coarse id.
    coarse_of_fine = dict(enumerate(
        coarse_id
        for coarse_id, num_fine in enumerate(fine_per_coarse)
        for _ in range(num_fine)
    ))

    def _balanced_weights(y, num_classes):
        """Balanced weights for classes present in ``y``; ones elsewhere."""
        present = np.unique(y)
        weights = compute_class_weight('balanced', classes=present, y=y)
        full = torch.ones(num_classes, dtype=torch.float, device=device)
        for class_id, weight in zip(present, weights):
            full[class_id] = weight
        return full

    fine_weights_full = _balanced_weights(labels, sum(fine_per_coarse))

    coarse_labels = np.array([coarse_of_fine[label] for label in labels])
    coarse_weights_full = _balanced_weights(coarse_labels, len(fine_per_coarse))

    return fine_weights_full, coarse_weights_full

## Train/test model methods

In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module whose ``model(x, return_type='train')`` returns
            ``(coarse_logits, fine_logits, features)`` and which provides
            ``joint_prob(coarse_logits, fine_logits)``.
        dataloader: Iterable of ``(images, fine_labels, ids)`` batches.
        criterion: Callable returning ``(loss, coarse_loss, fine_loss)`` and
            exposing ``fine_to_coarse`` (``{fine_id: (coarse_id, local_id)}``
            with fine ids ``0..n-1``).
        optimizer: Optimizer stepping the trainable parameters.
        device: Device the batches are moved to.

    Returns:
        Dict with per-batch mean ``total_loss``, ``coarse_loss``, ``fine_loss``
        and the percentage accuracies ``fine_acc`` and ``coarse_acc``.

    Accuracy is computed from the logits of the training forward pass. A
    separate forward pass in train mode would double the backbone cost and
    update normalisation statistics a second time per batch.
    """
    model.train()
    running_loss, running_coarse, running_fine = 0.0, 0.0, 0.0
    correct_coarse, correct_fine = 0, 0
    total = 0

    # Fine id -> coarse id as a tensor, so a whole batch is mapped at once.
    coarse_of_fine = torch.tensor(
        [criterion.fine_to_coarse[fine_id][0] for fine_id in range(len(criterion.fine_to_coarse))],
        dtype=torch.long,
        device=device,
    )

    for img, labels, _ids in dataloader:
        img = img.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)
        optimizer.zero_grad()

        coarse_logits, fine_logits, _ = model(img, return_type='train')

        loss, coarse_loss, fine_loss = criterion(coarse_logits, fine_logits, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        running_coarse += coarse_loss.item()
        running_fine += fine_loss.item()

        with torch.no_grad():
            coarse_probs, fine_probs, _ = model.joint_prob(coarse_logits, fine_logits)
            predict_coarse = coarse_probs.argmax(1)
            predict_fine = fine_probs.argmax(1)
            coarse_labels = coarse_of_fine[labels]

        total += labels.size(0)
        correct_fine += (predict_fine == labels).sum().item()
        correct_coarse += (predict_coarse == coarse_labels).sum().item()

        # One dot per batch as a lightweight progress indicator.
        print(".", end="")
    print("")

    num_batches = len(dataloader)
    return {
        'total_loss': running_loss / num_batches,
        'coarse_loss': running_coarse / num_batches,
        'fine_loss': running_fine / num_batches,
        'fine_acc': 100.0 * correct_fine / total,
        'coarse_acc': 100.0 * correct_coarse / total
    }

def valid(model, dataloader, device, loss_fn, use_weighted=False):
    """Evaluate ``model`` on ``dataloader`` without updating it.

    Args:
        model: Module whose ``model(x, return_type='train')`` returns
            ``(coarse_logits, fine_logits, features)`` and which provides
            ``joint_prob(coarse_logits, fine_logits)``.
        dataloader: Iterable of ``(images, fine_labels, ids)`` batches with a
            ``dataset`` attribute supporting ``len``.
        device: Device the batches are moved to.
        loss_fn: Callable returning ``(loss, coarse_loss, fine_loss)`` and
            exposing ``fine_to_coarse`` (``{fine_id: (coarse_id, local_id)}``
            with fine ids ``0..n-1``).
        use_weighted: Accepted for interface compatibility; not used.

    Returns:
        Dict with the per-batch mean ``total_loss``, percentage ``fine_acc`` /
        ``coarse_acc`` and a ``predictions`` dict holding the per-sample
        ``fine``, ``coarse``, ``labels_fine`` and ``labels_coarse`` lists.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0

    correct_fine, correct_coarse = 0, 0

    all_preds_fine = []
    all_preds_coarse = []
    all_labels_fine = []
    all_labels_coarse = []

    # Fine id -> coarse id as a tensor, so a whole batch is mapped at once.
    coarse_of_fine = torch.tensor(
        [loss_fn.fine_to_coarse[fine_id][0] for fine_id in range(len(loss_fn.fine_to_coarse))],
        dtype=torch.long,
        device=device,
    )

    with torch.no_grad():
        for X, y, _ in dataloader:
            X, y = X.to(device), y.to(device)

            # One forward pass yields both the loss and the probabilities.
            coarse_logits, fine_logits, _ = model(X, return_type='train')

            loss, _, _ = loss_fn(coarse_logits, fine_logits, y)
            test_loss += loss.item()

            coarse_probs, fine_probs, _ = model.joint_prob(coarse_logits, fine_logits)

            predicted_coarse = coarse_probs.argmax(1)
            predicted_fine = fine_probs.argmax(1)
            coarse_labels = coarse_of_fine[y]

            correct_fine += (predicted_fine == y).sum().item()
            correct_coarse += (predicted_coarse == coarse_labels).sum().item()

            all_preds_fine.extend(predicted_fine.cpu().numpy())
            all_preds_coarse.extend(predicted_coarse.cpu().numpy())
            all_labels_fine.extend(y.cpu().numpy())
            all_labels_coarse.extend(coarse_labels.cpu().numpy())

    test_loss /= num_batches
    correct_fine /= size
    correct_coarse /= size

    return {
        'total_loss': test_loss,
        'fine_acc': correct_fine * 100,
        'coarse_acc': correct_coarse * 100,
        'predictions': {
            'fine': all_preds_fine,
            'coarse': all_preds_coarse,
            'labels_fine': all_labels_fine,
            'labels_coarse': all_labels_coarse
        }
    }


def test(model, dataloader, device, loss_fn, use_weighted=False):
    """Evaluate ``model`` and print classification reports for both levels.

    The evaluation itself is delegated to ``valid`` (same arguments), which
    avoids keeping a second copy of the evaluation loop. This function adds
    the per-class reports for fine and coarse predictions.

    Args:
        model: Model to evaluate.
        dataloader: Loader yielding ``(images, fine_labels, ids)`` batches.
        device: Device the batches are moved to.
        loss_fn: Loss callable exposing ``fine_to_coarse``.
        use_weighted: Accepted for interface compatibility; not used.

    Returns:
        Dict with ``total_loss``, ``fine_acc`` and ``coarse_acc`` (accuracies
        in percent).
    """
    metrics = valid(model, dataloader, device, loss_fn, use_weighted=use_weighted)
    predictions = metrics['predictions']

    print("Fine classes")
    print(classification_report(predictions['labels_fine'], predictions['fine'], digits=4))

    print("Coarse classes")
    print(classification_report(predictions['labels_coarse'], predictions['coarse'], digits=4))

    return {
        'total_loss': metrics['total_loss'],
        'fine_acc': metrics['fine_acc'],
        'coarse_acc': metrics['coarse_acc']
    }


## Train model

In [None]:
# --- Configuration --------------------------------------------------------------
model_name = 'parent_base.pth'
num_coarse = 4
num_fine = 9

# Number of fine classes inside each of the four coarse classes (sums to 9).
fine_per_coarse = [3, 1, 4, 1]

# --- Model and device -----------------------------------------------------------
model = HierarchicalResNet(num_coarse, fine_per_coarse, freeze_backbone=True)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
torch.backends.cudnn.benchmark = True
print("Using device:", device)

# --- Losses, optimiser, scheduler -------------------------------------------------
# Class weights are passed to the training criterion only; the evaluation
# criterion is unweighted.
fine_weights, coarse_weights = calculate_class_weights(train_df[label_col], fine_per_coarse, device)

train_criterion = HierarchicalLoss(fine_per_coarse, fine_weights, coarse_weights)
test_criterion = HierarchicalLoss(fine_per_coarse)

# Only parameters that are still trainable are handed to the optimiser.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=0.001)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3)

# --- Training loop ----------------------------------------------------------------
best_acc = 0.0

epochs = 50
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")

    train_metrics = train_one_epoch(model, train_loader, train_criterion, optimizer, device)
    print(f"Train - Loss: {train_metrics['total_loss']:.4f}, "
          f"Fine Acc: {train_metrics['fine_acc']:.2f}%, "
          f"Coarse Acc: {train_metrics['coarse_acc']:.2f}%")

    valid_metrics = valid(model, val_loader, device, test_criterion)
    print(f"Valid - Loss: {valid_metrics['total_loss']:.4f}, "
          f"Fine Acc: {valid_metrics['fine_acc']:.2f}%, "
          f"Coarse Acc: {valid_metrics['coarse_acc']:.2f}%")

    # The scheduler watches the validation loss.
    scheduler.step(valid_metrics['total_loss'])

    # Checkpoint whenever the validation fine accuracy reaches a new best.
    improved = valid_metrics['fine_acc'] > best_acc
    if improved:
        best_acc = valid_metrics['fine_acc']
        torch.save(model.state_dict(), path + model_name)


Using device: cuda
Epoch 1/50
.................................................
Train - Loss: 2.1482, Fine Acc: 19.33%, Coarse Acc: 53.37%
Valid - Loss: 1.7117, Fine Acc: 27.58%, Coarse Acc: 69.51%
Epoch 2/50
.................................................
Train - Loss: 1.7622, Fine Acc: 31.35%, Coarse Acc: 66.54%
Valid - Loss: 1.6215, Fine Acc: 31.39%, Coarse Acc: 69.28%
Epoch 3/50
.................................................
Train - Loss: 1.6361, Fine Acc: 36.83%, Coarse Acc: 68.11%
Valid - Loss: 1.4929, Fine Acc: 37.67%, Coarse Acc: 71.75%
Epoch 4/50
.................................................
Train - Loss: 1.5711, Fine Acc: 39.42%, Coarse Acc: 70.93%
Valid - Loss: 1.4063, Fine Acc: 40.13%, Coarse Acc: 73.99%
Epoch 5/50
.................................................
Train - Loss: 1.5326, Fine Acc: 40.77%, Coarse Acc: 70.58%
Valid - Loss: 1.4546, Fine Acc: 40.13%, Coarse Acc: 69.96%
Epoch 6/50
.................................................
Train - Loss: 1.5034, Fin

KeyboardInterrupt: 

## Test model

In [None]:
# Rebuild the architecture and restore the checkpoint written by the training loop.
model = HierarchicalResNet(num_coarse, fine_per_coarse, freeze_backbone=True)

# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU runtime also loads on a CPU-only runtime.
model.load_state_dict(torch.load(path + model_name, map_location=device))
model.to(device)
model.eval()

# Prints the fine/coarse classification reports and returns the summary metrics.
test_metrics = test(model, test_loader, device, test_criterion)


Fine classes
              precision    recall  f1-score   support

           0     0.7500    0.7500    0.7500         4
           1     0.7213    0.9778    0.8302        45
           2     0.8333    0.5556    0.6667         9
           3     0.7500    0.8411    0.7930       107
           4     0.7938    0.5704    0.6638       135
           5     0.7157    0.7374    0.7264       198
           6     0.6543    0.7067    0.6795       150
           7     0.8642    0.7735    0.8163       181
           8     0.7500    0.9048    0.8201        63

    accuracy                         0.7489       892
   macro avg     0.7592    0.7575    0.7495       892
weighted avg     0.7555    0.7489    0.7466       892

Coarse classes
              precision    recall  f1-score   support

           0     0.7778    0.9655    0.8615        58
           1     0.7563    0.8411    0.7965       107
           2     0.9872    0.9322    0.9589       664
           3     0.7568    0.8889    0.8175       