In [1]:
import json
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from torchvision import models
from sklearn.metrics import mean_absolute_error

In [2]:
# Data loading and preprocessing
with open('data.json', 'r') as f:
    raw_data = json.load(f)

def convert_data_point(data_point):
    wl = np.array(data_point["wl"])
    r = np.array(data_point["r"])
    c = np.array([item[2] for item in data_point["l"]])
    return wl, r, c

X, y, wl = [], [], None
for data_point in raw_data:
    wl_, r, c = convert_data_point(data_point)
    X.append(r) 
    y.append(c)
    wl = wl_

X = np.array(X)
y = np.array(y)
X = preprocessing.normalize(X)
y = preprocessing.normalize(y)

len(X)

2000

In [3]:
def evaluate_model(model, dataloader):
    model.eval()
    all_targets = []
    all_predictions = []
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.unsqueeze(1)  # Add channel dimension
            outputs = model(inputs)
            all_targets.extend(targets.numpy())
            all_predictions.extend(outputs.numpy())
    mae = mean_absolute_error(all_targets, all_predictions)
    return mae

In [4]:
# Convert data to PyTorch tensors and add a channel dimension
X = torch.tensor(X, dtype=torch.float32).unsqueeze(1)  # Add channel dimension
y = torch.tensor(y, dtype=torch.float32)

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Create datasets and data loaders
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [5]:
# Load a pretrained ResNet18 model
model = torch.hub.load('pytorch/vision:v0.20.0', 'resnet18', pretrained=True)

# Modify the first convolutional layer to accept 1 input channel instead of 3
model.conv1 = nn.Conv2d(1, model.conv1.out_channels, kernel_size=model.conv1.kernel_size, 
                        stride=model.conv1.stride, padding=model.conv1.padding, bias=False)

# Modify the final layer to match the number of output classes
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, y.shape[1])

def mae(y_pred, y_true):
    """
    Calculates the Mean Absolute Error between predictions (y_pred) and ground truth (y_true).
    """
    return torch.nn.functional.l1_loss(y_pred, y_true) 

# Define loss function and optimizer
def RMSELoss(yhat,y):
    return torch.sqrt(torch.mean((yhat-y)**2))

criterion = RMSELoss
optimizer = optim.Adam(model.parameters(), lr=0.001)

#model.load_state_dict(torch.load('best_model.pth', weights_only=True))

Using cache found in /home/aniruth/.cache/torch/hub/pytorch_vision_v0.20.0


In [7]:
# Training loop
print("Starting training")
num_epochs = 100
patience = 5  # Number of epochs to wait for improvement
best_val_loss = float('inf')
epochs_no_improve = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        inputs = inputs.unsqueeze(1)  # Add channel dimension
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    
    # Validation loop
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs = inputs.unsqueeze(1)  # Add channel dimension
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_loss += loss.item()
    
    val_loss /= len(test_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}, Val Loss: {val_loss}")
    torch.save(model.state_dict(), 'last_model.pth')
    
    # Early stopping
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        epochs_no_improve = 0
        torch.save(model.state_dict(), 'best_model.pth')
    else:
        epochs_no_improve += 1
        print(f"Patience {epochs_no_improve}")
        if epochs_no_improve >= patience:
            print("Early stopping")
            break


Starting training
Epoch 1/100, Loss: 0.0710665138810873, Val Loss: 0.06629973011357444
Epoch 2/100, Loss: 0.08707395479083062, Val Loss: 0.7242446712085179
Patience 1
Epoch 3/100, Loss: 0.1005570637434721, Val Loss: 0.1551450171640941
Patience 2
Epoch 4/100, Loss: 0.07342067822813987, Val Loss: 0.04451176630599158
Epoch 5/100, Loss: 0.07616621024906635, Val Loss: 0.5513389536312648
Patience 1
Epoch 6/100, Loss: 0.08191115945577622, Val Loss: 0.12288133161408561
Patience 2
Epoch 7/100, Loss: 0.07235990270972252, Val Loss: 0.16936772423131125
Patience 3
Epoch 8/100, Loss: 0.08954766184091568, Val Loss: 0.1168310631598745
Patience 4
Epoch 9/100, Loss: 0.0733503219485283, Val Loss: 0.07181570678949356
Patience 5
Early stopping


In [10]:
model.load_state_dict(torch.load('best_model.pth', weights_only=True))
train_mae = evaluate_model(model, train_loader)
test_mae = evaluate_model(model, test_loader)
print(f"Best Model, Training MAE: {train_mae}, Testing MAE: {test_mae}")

Best Model, Training MAE: 0.03902892768383026, Testing MAE: 0.03909337893128395


In [9]:
model.load_state_dict(torch.load('last_model.pth', weights_only=True))
train_mae = evaluate_model(model, train_loader)
test_mae = evaluate_model(model, test_loader)
print(f"Last Model, Training MAE: {train_mae}, Testing MAE: {test_mae}")

Last Model, Training MAE: 0.06211673468351364, Testing MAE: 0.06134948879480362
