In [None]:
import os, time
from PIL import Image
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler

# Run on the GPU when CUDA is visible to PyTorch, otherwise fall back to CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print("Using device:", device)

# Colab-only: mount Google Drive so the dataset folders used below are reachable.
from google.colab import drive
drive.mount('/content/drive')

Using device: cuda
Mounted at /content/drive


In [None]:
#%%
# 2) Use the exact Drive paths for your reef & sand folders
reef_dir = "/content/drive/MyDrive/Tamu(grad)/CSCE753 CVRP/project test model implementation/reef"
sand_dir = "/content/drive/MyDrive/Tamu(grad)/CSCE753 CVRP/project test model implementation/sand"

# Debug
print("reef_dir contents:", os.listdir(reef_dir))
print("sand_dir contents:", os.listdir(sand_dir))


def collect_png_files(root_dir):
    """Return every ``.png`` file found anywhere under ``root_dir``.

    The extension match is case-insensitive. The result is sorted because
    ``os.walk`` yields entries in filesystem order, which is not guaranteed
    to be stable; without sorting, the seeded train/val/test split below
    could differ from run to run even with a fixed ``random_state``.

    Args:
        root_dir: Directory to search recursively.

    Returns:
        Sorted list of full file paths.
    """
    return sorted(
        os.path.join(root, fname)
        for root, _, fnames in os.walk(root_dir)
        for fname in fnames
        if fname.lower().endswith(".png")
    )


# get images
reef_files = collect_png_files(reef_dir)
sand_files = collect_png_files(sand_dir)

print(f"Found {len(reef_files)} reef images and {len(sand_files)} sand images")

# Fail early with a clear message instead of training on a single class.
if not reef_files or not sand_files:
    raise FileNotFoundError(
        "Expected .png images under both reef_dir and sand_dir; "
        f"found {len(reef_files)} reef and {len(sand_files)} sand images."
    )

# Label convention used throughout: 0 = reef, 1 = sand.
all_files  = reef_files + sand_files
all_labels = [0]*len(reef_files) + [1]*len(sand_files)

#%%
# 3) Stratified 70/15/15 split. The first call holds out 30 % of the data;
#    the second call halves that hold-out into validation and test sets.
train_files, temp_files, train_labels, temp_labels = train_test_split(
    all_files,
    all_labels,
    test_size=0.30,
    random_state=42,
    stratify=all_labels,
)

val_files, test_files, val_labels, test_labels = train_test_split(
    temp_files,
    temp_labels,
    test_size=0.50,
    random_state=42,
    stratify=temp_labels,
)

split_sizes = (len(train_files), len(val_files), len(test_files))
print("Train:", split_sizes[0], "Val:", split_sizes[1], "Test:", split_sizes[2])

#%%
# 4) Transforms
# An earlier pipeline ("ver 01") placed RandomErasing before ToTensor; it was
# dropped because RandomErasing operates on tensors, not PIL images.
#
# RandomErasing now runs AFTER Normalize. With value='random' the erased patch
# is filled with random noise; applying Normalize afterwards would rescale
# that noise (divide by ~0.22) to values far outside the range of real
# normalized pixels. This ordering also matches the torchvision example.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(0.3, 0.3, 0.3, 0.15),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
    transforms.RandomErasing(
        p=0.5,
        scale=(0.02, 0.2),
        ratio=(0.3, 3.3),
        value='random'
    ),
])

# Deterministic pipeline shared by the validation and test datasets.
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD)
])


# 5) Dataset & DataLoader
class CustomImageDataset(Dataset):
    """Map-style dataset that yields ``(image, label)`` pairs from file paths.

    Args:
        files: Sequence of image file paths.
        labels: Sequence of labels, aligned index-by-index with ``files``.
        transform: Optional callable applied to the RGB PIL image.

    Raises:
        ValueError: If ``files`` and ``labels`` have different lengths.
    """
    def __init__(self, files, labels, transform=None):
        if len(files) != len(labels):
            raise ValueError(
                f"files and labels must have the same length, "
                f"got {len(files)} and {len(labels)}"
            )
        self.files, self.labels, self.transform = files, labels, transform

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        # The context manager closes the underlying file as soon as the pixel
        # data has been copied by convert(); otherwise each sample leaves a
        # handle open until garbage collection.
        with Image.open(self.files[idx]) as handle:
            img = handle.convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]

# Only the training split gets the augmenting transform; validation and test
# share the deterministic one.
train_ds = CustomImageDataset(train_files, train_labels, transform=train_transform)
val_ds = CustomImageDataset(val_files, val_labels, transform=val_transform)
test_ds = CustomImageDataset(test_files, test_labels, transform=val_transform)

# Settings common to all three loaders; only the training loader shuffles.
loader_kwargs = dict(batch_size=32, num_workers=2)
train_loader = DataLoader(train_ds, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **loader_kwargs)


# 6) Models
class FocalLoss(nn.Module):
    """Multi-class focal loss: ``w[y] * (1 - p_t) ** gamma * (-log p_t)``.

    ``p_t`` is the softmax probability of the true class. It is computed from
    the *unweighted* cross-entropy; the optional class ``weight`` only scales
    the final per-sample loss. (Deriving ``p_t`` from the weighted
    cross-entropy would give ``p_t ** w`` and distort the focal modulation.)

    Args:
        gamma: Focusing exponent; ``0`` reduces to (weighted) cross-entropy.
        weight: Optional 1-D tensor of per-class weights.
        reduction: ``"mean"`` averages over elements, ``"sum"`` sums them;
            any other value returns the unreduced per-element loss.
    """
    def __init__(self, gamma=2, weight=None, reduction="mean"):
        super().__init__()
        self.gamma, self.reduction = gamma, reduction
        self.ce = nn.CrossEntropyLoss(weight=weight, reduction="none")

    def forward(self, x, y):
        """Return the focal loss for logits ``x`` and class-index targets ``y``."""
        # Unweighted CE equals -log(p_t), so exp(-CE) recovers p_t exactly.
        ce_plain = nn.functional.cross_entropy(x, y, reduction="none")
        pt = torch.exp(-ce_plain)
        # Skip the second CE pass when no class weights were supplied.
        ce_weighted = ce_plain if self.ce.weight is None else self.ce(x, y)
        loss = ((1 - pt) ** self.gamma) * ce_weighted
        if self.reduction == "mean":
            return loss.mean()
        if self.reduction == "sum":
            return loss.sum()
        return loss

class ChannelAttention(nn.Module):
    """Channel gate built from globally avg- and max-pooled descriptors.

    Both pooled descriptors pass through the same two-layer 1x1-conv
    bottleneck (``c -> c // ratio -> c``); their sum is squashed by a sigmoid
    and multiplied onto the input.

    Args:
        c: Number of input (and output) channels.
        ratio: Reduction factor of the bottleneck.
    """
    def __init__(self, c, ratio=16):
        super().__init__()
        hidden = c // ratio
        self.avg = nn.AdaptiveAvgPool2d(1)
        self.max = nn.AdaptiveMaxPool2d(1)
        self.fc = nn.Sequential(
            nn.Conv2d(c, hidden, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(hidden, c, 1, bias=False),
        )
        self.sig = nn.Sigmoid()

    def forward(self, x):
        """Scale ``x`` by its per-channel attention gate."""
        avg_logits = self.fc(self.avg(x))
        max_logits = self.fc(self.max(x))
        gate = self.sig(avg_logits + max_logits)
        return x * gate

class SpatialAttention(nn.Module):
    """Spatial gate computed from channel-wise mean and max maps.

    The two single-channel maps are concatenated, passed through one
    ``k x k`` convolution (padded by ``(k - 1) // 2``) and a sigmoid, and the
    resulting map is multiplied onto the input.

    Args:
        k: Kernel size of the convolution.
    """
    def __init__(self, k=7):
        super().__init__()
        pad = (k - 1) // 2
        self.conv = nn.Conv2d(2, 1, k, padding=pad, bias=False)
        self.sig = nn.Sigmoid()

    def forward(self, x):
        """Scale ``x`` by its per-location attention gate."""
        mean_map = x.mean(1, keepdim=True)
        max_map = x.max(1, keepdim=True)[0]
        descriptor = torch.cat([mean_map, max_map], 1)
        gate = self.sig(self.conv(descriptor))
        return x * gate

class CBAM(nn.Module):
    """Channel attention followed by spatial attention.

    Args:
        c: Number of channels of the tensor being refined.
    """
    def __init__(self, c):
        super().__init__()
        self.ca = ChannelAttention(c)
        self.sa = SpatialAttention()

    def forward(self, x):
        """Apply the channel gate first, then the spatial gate."""
        channel_refined = self.ca(x)
        return self.sa(channel_refined)

def replace_relu6(module, new_act):
    """Recursively swap every ``nn.ReLU6`` under ``module``, in place.

    Args:
        module: Root module whose descendants are rewritten.
        new_act: Zero-argument callable returning a fresh activation module;
            it is called once per replaced layer.
    """
    for child_name, child in list(module.named_children()):
        if not isinstance(child, nn.ReLU6):
            # Not a ReLU6 itself: look for ReLU6 layers further down.
            replace_relu6(child, new_act)
            continue
        setattr(module, child_name, new_act())

def get_act(name):
    """Look up an activation factory by case-insensitive name.

    Args:
        name: One of ``"relu6"``, ``"leaky"``, ``"gelu"`` or ``"silu"``.

    Returns:
        A zero-argument callable that builds the activation module.

    Raises:
        KeyError: If ``name`` is not one of the supported names.
    """
    def _leaky():
        return nn.LeakyReLU(0.01, inplace=True)

    factories = {
        "relu6": nn.ReLU6,
        "leaky": _leaky,
        "gelu": nn.GELU,
        "silu": nn.SiLU,
    }
    key = name.lower()
    return factories[key]

class CustomMobileNetV2(nn.Module):
    """ImageNet-pretrained MobileNetV2 with a configurable activation and head.

    Args:
        num_classes: Number of output logits.
        activation: Name understood by ``get_act``; every ``nn.ReLU6`` in the
            backbone is replaced by this activation.
        use_cbam: If true, refine the backbone feature map with a CBAM block
            before global average pooling.
        extra_fc: If true, insert a hidden ``Linear(c, fc_dim)`` + activation
            in front of the final classification layer.
        dropout_rate: Dropout probability at the start of the classifier.
        fc_dim: Width of the hidden layer used when ``extra_fc`` is true.
    """
    def __init__(self, num_classes=2, activation="relu6",
                 use_cbam=False, extra_fc=False,
                 dropout_rate=0.2, fc_dim=128):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; IMAGENET1K_V1 is the
        # checkpoint that flag used to load, so the weights are unchanged.
        self.net = models.mobilenet_v2(
            weights=models.MobileNet_V2_Weights.IMAGENET1K_V1
        )
        act = get_act(activation)
        replace_relu6(self.net, act)
        c = self.net.last_channel
        if use_cbam:
            self.cbam = CBAM(c)

        # Layer positions match the original head, so state_dict keys
        # (classifier.1, classifier.3) are preserved.
        head = [nn.Dropout(dropout_rate)]
        if extra_fc:
            head += [nn.Linear(c, fc_dim), act(), nn.Linear(fc_dim, num_classes)]
        else:
            head.append(nn.Linear(c, num_classes))
        self.net.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return class logits for a batch of images."""
        x = self.net.features(x)
        # CBAM must see the feature map *before* pooling: on a pooled 1x1 map
        # the spatial gate collapses to a single scalar and the avg/max
        # channel descriptors are identical.
        if hasattr(self, "cbam"):
            x = self.cbam(x)
        x = x.mean([2, 3])  # global average pool over the two spatial dims
        return self.net.classifier(x)


# 7) optimizers, schedulers, scalers
num_epochs = 50
patience   = 5

# Seed PyTorch's global RNG so head initialisation, dropout masks and
# DataLoader shuffling are repeatable across fresh runs.
torch.manual_seed(42)

# One model per experiment variant; each gets its own optimizer, scheduler
# and gradient scaler below.
models_dict = {
    "Baseline_ReLU6": CustomMobileNetV2(activation="relu6"),
    "LeakyReLU":       CustomMobileNetV2(activation="leaky"),
    "GeLU":            CustomMobileNetV2(activation="gelu"),
    "SiLU":            CustomMobileNetV2(activation="silu"),
    "Extra_FFN":       CustomMobileNetV2(activation="relu6", use_cbam=True, extra_fc=True, fc_dim=256),
    "High_Dropout":    CustomMobileNetV2(activation="relu6", dropout_rate=0.7),
}

optimizers = {}
schedulers = {}
scalers    = {}
criterion  = FocalLoss(gamma=2)

# Loss scaling is only needed for CUDA mixed precision; on CPU the scaler is
# created disabled so its methods become pass-throughs.
use_amp = device.type == "cuda"

for name, m in models_dict.items():
    m.to(device)
    optimizers[name] = optim.AdamW(m.parameters(), lr=1e-4, weight_decay=1e-5)
    schedulers[name] = optim.lr_scheduler.CosineAnnealingLR(optimizers[name], T_max=num_epochs)
    # torch.cuda.amp.GradScaler() is deprecated; torch.amp.GradScaler("cuda")
    # is the supported spelling.
    scalers[name]    = torch.amp.GradScaler("cuda", enabled=use_amp)

# Early-stopping bookkeeping: lowest validation loss seen so far and the
# number of consecutive epochs without improvement, per model.
best_val = {n:1e9 for n in models_dict}
no_imp   = {n:0   for n in models_dict}

# Per-model, per-epoch metric curves.
history = {n:{'train_loss':[], 'train_acc':[], 'val_loss':[], 'val_acc':[]} for n in models_dict}

# 8) Train
# All models advance one epoch at a time in lock-step. `best_val`, `no_imp`
# and `patience` now drive per-model early stopping: a model is skipped once
# its validation loss has failed to improve for `patience` consecutive epochs.
# The best-scoring weights of each model are restored after training, so the
# test evaluation uses them rather than the last-epoch weights.
use_amp = device.type == "cuda"  # mixed precision is only enabled on CUDA
best_state = {}                  # model name -> snapshot of its best weights

for epoch in range(1, num_epochs+1):
    active = [n for n in models_dict if no_imp[n] < patience]
    if not active:
        print(f"All models early-stopped before epoch {epoch}.")
        break

    print(f"Epoch {epoch}/{num_epochs}")
    for name in active:
        model = models_dict[name]
        model.train()
        running_loss, total, correct = 0.0, 0, 0
        for xb, yb in train_loader:
            xb, yb = xb.to(device), yb.to(device)
            optimizers[name].zero_grad()
            # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
            with torch.autocast(device_type=device.type, enabled=use_amp):
                out  = model(xb)
                loss = criterion(out, yb)
            scalers[name].scale(loss).backward()
            scalers[name].step(optimizers[name])
            scalers[name].update()

            # Weight by batch size so a smaller last batch is averaged correctly.
            running_loss += loss.item() * xb.size(0)
            total       += yb.size(0)
            correct     += (out.argmax(1) == yb).sum().item()

        train_loss = running_loss / total
        train_acc  = correct / total

        # -=-=-=-=-=-=-= Validate -=-=-=-=-=-=-=-=-=-=-=
        model.eval()
        val_loss, vtotal, vcorrect = 0.0, 0, 0
        with torch.no_grad():
            for xb, yb in val_loader:
                xb, yb = xb.to(device), yb.to(device)
                out  = model(xb)
                loss = criterion(out, yb)

                val_loss += loss.item() * xb.size(0)
                vtotal   += yb.size(0)
                vcorrect += (out.argmax(1) == yb).sum().item()

        val_loss /= vtotal
        val_acc   = vcorrect / vtotal

        # -=-=-=-=-=-=-=-=-=-= history -=-=-=-=-=-=-=-=-=-=-=-
        history[name]['train_loss'].append(train_loss)
        history[name]['train_acc'].append(train_acc)
        history[name]['val_loss'].append(val_loss)
        history[name]['val_acc'].append(val_acc)

        print(f"  {name}: TrL={train_loss:.4f} TrA={train_acc:.4f}  ValL={val_loss:.4f} ValA={val_acc:.4f}")

        # -=-=-=-=-=-=-=-=-=-= scheduler -=-=-=-=-=-=-=-=-=-=-=-
        schedulers[name].step()

        # -=-=-=-=-=-=-=-=-=-= early stopping -=-=-=-=-=-=-=-=-=-=-=-
        if val_loss < best_val[name]:
            best_val[name] = val_loss
            no_imp[name]   = 0
            # Clone the tensors: state_dict() returns references that later
            # optimizer steps would otherwise overwrite.
            best_state[name] = {k: v.detach().clone() for k, v in model.state_dict().items()}
        else:
            no_imp[name] += 1
            if no_imp[name] >= patience:
                print(f"  {name}: early stop (no val-loss improvement for {patience} epochs, best ValL={best_val[name]:.4f})")

    print("-" * 60)

# Roll every model back to the weights that scored the lowest validation loss.
for name, state in best_state.items():
    models_dict[name].load_state_dict(state)

# 9) Inference: report focal loss and accuracy of every model on the test split
print("\nFinal Test Results:")
for name, model in models_dict.items():
    model.eval()
    loss_sum = 0.0
    n_seen = 0
    n_right = 0
    with torch.no_grad():
        for images, targets in test_loader:
            images = images.to(device)
            targets = targets.to(device)
            logits = model(images)
            # Weight the batch-mean loss by batch size so the final figure is
            # a per-sample average.
            loss_sum += criterion(logits, targets).item() * images.size(0)
            n_seen += targets.size(0)
            n_right += (logits.argmax(1) == targets).sum().item()
    print(f"{name}: TestL={loss_sum/n_seen:.4f}  TestA={n_right/n_seen:.4f}")

reef_dir contents: ['reef 64', 'reef 70']
sand_dir contents: ['sand 64', 'sand 70']
Found 861 reef images and 1504 sand images
Train: 1655 Val: 355 Test: 355




Epoch 1/50


  scalers[name]    = GradScaler()
  with autocast():


  Baseline_ReLU6: TrL=0.0769 TrA=0.8876  ValL=0.0108 ValA=0.9915
  LeakyReLU: TrL=0.0674 TrA=0.8900  ValL=0.0067 ValA=0.9859
  GeLU: TrL=0.1115 TrA=0.8254  ValL=0.0403 ValA=0.9606
  SiLU: TrL=0.1258 TrA=0.7861  ValL=0.0564 ValA=0.9380
  Extra_FFN: TrL=0.0658 TrA=0.8979  ValL=0.0121 ValA=0.9915
  High_Dropout: TrL=0.1121 TrA=0.8363  ValL=0.0072 ValA=0.9915
------------------------------------------------------------
Epoch 2/50
  Baseline_ReLU6: TrL=0.0320 TrA=0.9601  ValL=0.0116 ValA=0.9859
  LeakyReLU: TrL=0.0335 TrA=0.9553  ValL=0.0137 ValA=0.9915
  GeLU: TrL=0.0846 TrA=0.8598  ValL=0.0213 ValA=0.9859
  SiLU: TrL=0.0945 TrA=0.8508  ValL=0.0366 ValA=0.9549
  Extra_FFN: TrL=0.0314 TrA=0.9589  ValL=0.0118 ValA=0.9915
  High_Dropout: TrL=0.0462 TrA=0.9384  ValL=0.0077 ValA=0.9915
------------------------------------------------------------
Epoch 3/50
  Baseline_ReLU6: TrL=0.0285 TrA=0.9583  ValL=0.0169 ValA=0.9775
  LeakyReLU: TrL=0.0323 TrA=0.9571  ValL=0.0119 ValA=0.9944
  GeLU: TrL=0.0