In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import h5py
from torchvision import transforms
from tqdm import tqdm
import numpy as np

# Check for CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Custom HDF5 Dataset Class
class HDF5Dataset(data.Dataset):
    def __init__(self, hdf5_path):
        self.hdf5_path = hdf5_path
        with h5py.File(hdf5_path, 'r') as f:
            self.length = len(f['image'])
        self.transform = transforms.Compose([
            transforms.Resize((64, 64)),
            transforms.ToTensor(),
        ])
    
    def __len__(self):
        return self.length
    
    def __getitem__(self, idx):
        with h5py.File(self.hdf5_path, 'r') as f:
            img = torch.tensor(f['image'][idx], dtype=torch.float32)
            photo = torch.tensor([
                f['g_cmodel_mag'][idx], f['r_cmodel_mag'][idx], f['i_cmodel_mag'][idx],
                f['z_cmodel_mag'][idx], f['y_cmodel_mag'][idx],
                f['g_ellipticity'][idx], f['r_ellipticity'][idx], f['i_ellipticity'][idx],
                f['z_ellipticity'][idx], f['y_ellipticity'][idx],
                f['g_sersic_index'][idx], f['r_sersic_index'][idx], f['i_sersic_index'][idx],
                f['z_sersic_index'][idx], f['y_sersic_index'][idx],
            ], dtype=torch.float32)
            redshift = torch.tensor(f['specz_redshift'][idx], dtype=torch.float32)
        return photo, img, redshift

# Photometric Feature Extractor
class PhotometricMLP(nn.Module):
    def __init__(self, input_size):
        super(PhotometricMLP, self).__init__()
        self.fc1 = nn.Linear(input_size, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.dropout = nn.Dropout(0.3)
        self.gelu = nn.GELU()
    
    def forward(self, x):
        x = self.gelu(self.bn1(self.fc1(x)))
        x = self.dropout(self.gelu(self.bn2(self.fc2(x))))
        return x

# CNN for Image Feature Extraction
class ImageCNN(nn.Module):
    def __init__(self):
        super(ImageCNN, self).__init__()
        self.conv1 = nn.Conv2d(5, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))
        self.gelu = nn.GELU()
        self.fc = nn.Linear(128 * 4 * 4, 128)
    
    def forward(self, x):
        x = self.pool(self.gelu(self.bn1(self.conv1(x))))
        x = self.pool(self.gelu(self.bn2(self.conv2(x))))
        x = self.pool(self.gelu(self.bn3(self.conv3(x))))
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.gelu(self.fc(x))
        return x

# Hybrid Model with Attention-based Fusion
class HybridRedshiftModel(nn.Module):
    def __init__(self, photometric_input):
        super(HybridRedshiftModel, self).__init__()
        self.photo = PhotometricMLP(photometric_input).to(device)
        self.image = ImageCNN().to(device)
        
        # Attention-based Fusion Layer
        self.attn = nn.MultiheadAttention(embed_dim=128, num_heads=4, batch_first=True)
        
        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 1)
        self.gelu = nn.GELU()
    
    def forward(self, photo, img):
        photo_feat = self.photo(photo).unsqueeze(1)  # Add sequence dimension
        img_feat = self.image(img).unsqueeze(1)
        
        combined, _ = self.attn(photo_feat, img_feat, img_feat)
        combined = combined.squeeze(1)
        
        fused = self.gelu(self.fc1(combined))
        output = self.fc2(fused).squeeze(1)
        return output

# Instantiate Model, Optimizer, Loss
model = HybridRedshiftModel(photometric_input=15).to(device)
optimizer = optim.RMSprop(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-5)
criterion = nn.SmoothL1Loss()

# Load Dataset & DataLoader
dataset = HDF5Dataset("D:/Galaxy Datasets/temp_training.hdf5")
dataloader = data.DataLoader(dataset, batch_size=32, shuffle=True)

# Training Function with Early Stopping
def train_model(model, dataloader, optimizer, criterion, num_epochs=10, patience=3, save_path='28_mar_model.pth'):
    model.train()
    best_loss = np.inf
    early_stop_counter = 0
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)
    
    for epoch in range(num_epochs):
        epoch_loss = 0.0
        with tqdm(dataloader, unit="batch") as tepoch:
            for photo, img, labels in tepoch:
                tepoch.set_description(f"Epoch {epoch+1}")
                photo, img, labels = photo.to(device), img.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(photo, img)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
                tepoch.set_postfix(loss=loss.item())
        
        avg_loss = epoch_loss / len(dataloader)
        print(f"Epoch {epoch+1} Loss: {avg_loss}")
        scheduler.step(avg_loss)
        
        if avg_loss < best_loss:
            best_loss = avg_loss
            early_stop_counter = 0
            torch.save(model.state_dict(), save_path)
            print(f"Model saved at epoch {epoch+1} with loss {best_loss}")
        else:
            early_stop_counter += 1
            if early_stop_counter >= patience:
                print("Early stopping triggered.")
                break

# Train Model
train_model(model, dataloader, optimizer, criterion, num_epochs=10, patience=3, save_path='28_mar_model.pth')

Epoch 1:   1%|          | 58/6393 [00:05<07:33, 13.98batch/s, loss=0.797] 

In [3]:
model.load_state_dict(torch.load('24_mar_model_better.pth'))
model.eval()

# Evaluate Model
mse_loss = 0.0
absolute_error = 0.0
num_samples = len(dataset)
with torch.no_grad():
    for photo, img, label in dataloader:
        photo, img, label = photo.to(device), img.to(device), label.to(device)
        predictions = model(photo, img)
        mse_loss += nn.MSELoss()(predictions, label).item()
        absolute_error += torch.abs(predictions - label).sum().item()

mse_loss /= num_samples
mae_loss = absolute_error / num_samples
print(f"Mean Squared Error (MSE): {mse_loss}")
print(f"Mean Absolute Error (MAE): {mae_loss}")

Mean Squared Error (MSE): 0.0011970361350641455
Mean Absolute Error (MAE): 0.08259364907729065


In [None]:
percentage_accuracy = 100 * (1 - mae_loss / torch.mean(torch.tensor(dataset[:][2])).item())
print(f"Percentage Accuracy: {percentage_accuracy}%")

In [9]:
for i in range (0,10):
    print(f"Original value: {label[i]:.1f}      Predicted value: {predictions[i]:.1f}")

Original value: 0.2      Predicted value: 0.4
Original value: 0.3      Predicted value: 0.3
Original value: 0.2      Predicted value: 0.2
Original value: 0.7      Predicted value: 0.6
Original value: 0.6      Predicted value: 0.6
Original value: 0.2      Predicted value: 0.2
Original value: 0.6      Predicted value: 0.7
Original value: 0.1      Predicted value: 0.2
Original value: 0.1      Predicted value: 0.1
Original value: 0.1      Predicted value: 0.2
