In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet18
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import roc_auc_score, roc_curve, auc, accuracy_score
import numpy as np
import glob
import os
import torchvision.transforms as T
from PIL import Image
from torchvision.models import resnet18, ResNet18_Weights
import matplotlib.pyplot as plt
from sklearn.preprocessing import label_binarize

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

In [None]:
class PhysicsDataset(Dataset):
    def __init__(self, root, transform=None):
        self.data = []
        self.labels = []
        self.transform = transform
        classes = {'no': 0, 'sphere': 1, 'vort': 2}
        for label, idx in classes.items():
            files = glob.glob(os.path.join(root, label, '*.npy'))
            for file in files:
                self.data.append(file)
                self.labels.append(idx)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = np.load(self.data[idx])

        if img.ndim == 2:
            img = np.stack([img]*3, axis=-1)
        elif img.shape[-1] == 1:
            img = np.concatenate([img]*3, axis=-1)
        else:
            img = img.transpose(1, 2, 0)

        img = np.squeeze(img)
        img = Image.fromarray((img*255).astype(np.uint8)).convert('RGB')

        if self.transform:
            img = self.transform(img)

        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return img, label

In [None]:
dataset = PhysicsDataset('data/train', transform=T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
]))

loader = DataLoader(dataset, batch_size=64, shuffle=False)

mean = torch.zeros(3)
std = torch.zeros(3)

print("Calculating mean and std...")

for images, _ in loader:
    mean += images.mean([0, 2, 3])
    std += images.std([0, 2, 3])

mean /= len(loader)
std /= len(loader)

print(f"Mean: {mean}")
print(f"Std: {std}")

In [None]:
transform = T.Compose([
    T.Resize((224, 224)),
    T.ToTensor(),
    T.Normalize(mean=mean[0], std=std[0])
])

In [None]:
train_data = PhysicsDataset('data/train', transform=transform)
val_data = PhysicsDataset('data/val', transform=transform)

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32)

In [None]:
class PhysicsLayer(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(512, 128)

    def forward(self, x):
        x = self.fc(x)
        beta = x[:, :64]
        theta = x[:, 64:]
        alpha = theta - beta
        return torch.cat([beta, alpha], dim=1)

In [None]:
class LensPINN(nn.Module):
    def __init__(self, num_classes=3):
        super().__init__()
        self.features = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
        self.features.fc = nn.Identity()
        self.physics = PhysicsLayer()
        self.classifier = nn.Sequential(
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.physics(x)
        return self.classifier(x)

In [None]:
model = LensPINN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
train_losses, test_accuracies = [], []

for epoch in range(20):
    model.train()
    total_loss = 0
    for imgs, labels in train_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
    avg_loss = total_loss / len(train_loader)
    train_losses.append(avg_loss)

    model.eval()
    correct = total = 0
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            _, preds = torch.max(outputs, 1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total
    test_accuracies.append(accuracy)
    print(f"Epoch {epoch+1}, Loss: {avg_loss:.4f}, Test Accuracy: {accuracy:.4f}")

In [None]:
model.eval()
y_true, y_scores = [], []

with torch.no_grad():
    for imgs, labels in val_loader:
        imgs = imgs.to(device)
        outputs = model(imgs)
        probs = torch.softmax(outputs, dim=1)
        y_true.extend(labels.cpu().numpy())
        y_scores.extend(probs.cpu().numpy())

y_true = np.array(y_true)
y_scores = np.array(y_scores)
y_pred = np.argmax(y_scores, axis=1)

acc = accuracy_score(y_true, y_pred)
print(f"Accuracy: {acc:.4f}")

classes = ['no', 'sphere', 'vort']
y_true_bin = label_binarize(y_true, classes=[0, 1, 2])

plt.figure(figsize=(10, 8))
for i, class_name in enumerate(classes):
    fpr, tpr, _ = roc_curve(y_true_bin[:, i], y_scores[:, i])
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, lw=2, label=f'ROC curve {class_name} (area = {roc_auc:.2f})')

plt.plot([0, 1], [0, 1], 'k--')

plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve for Multi-class Classification')
plt.legend(loc='lower right')
plt.grid()
plt.show()

In [None]:
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(test_accuracies)+1), test_accuracies, marker='o')
plt.xlabel('Epoch')
plt.ylabel('Validation Accuracy')
plt.title('Validation Accuracy over Epochs')
plt.grid()
plt.show()

In [None]:
torch.save(model.state_dict(), 'models/pinn_model.pth')

In [None]:
'''
# test model
model = LensPINN().to(device)
model.load_state_dict(torch.load('models/pinn_model.pth'))
model.eval()
'''