In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

In [None]:
# Load Data
raw_dataset = pd.read_csv("data/asteroids.csv")

In [None]:
raw_dataset["diameter_missing"] = raw_dataset["diameter"].isna().astype(int)
raw_dataset["pha"] = raw_dataset["pha"].map({"Y": 1.0, "N" : 0.0})
raw_dataset = raw_dataset[raw_dataset["pha"].isin([1.0,0.0])]

In [None]:
dataset = raw_dataset[raw_dataset["neo"] == "Y"]

features = ["H", "epoch", "diameter_missing", "epoch_mjd", "e", "a", "q", "i", "om", "w", "rms", "per", "per_y"]
target = "pha"

In [None]:
# Analysis

# print(dataset[target].info())
# print(dataset.describe())
# print(dataset["pha"].value_counts())

# dataset["pha"].value_counts().plot(kind='bar')
# plt.title("PHA vs Non-PHA Asteroids")
# plt.xlabel("PHA")
# plt.ylabel("Count")

# X.hist(bins=30, figsize=(10,8))
# plt.suptitle("Feature Distributions")
# plt.show()

# import seaborn as sns

# for col in ["diameter", "albedo", "H", "e", "a"]:
#     sns.boxplot(data=dataset, x="pha", y=col)
#     plt.title(f"{col} vs PHA")
#     plt.show()

# sns.pairplot(dataset, vars=["diameter", "H", "albedo"], hue="pha")
# plt.show()

In [None]:
# Split Data

train_df, test_df = train_test_split(dataset, test_size=0.4, random_state=42)
test_df, val_df = train_test_split(train_df, test_size=0.5, random_state=42)


In [303]:
# Impute Missing Values

imputer = SimpleImputer(strategy='median')

train_df[features] = imputer.fit_transform(train_df[features])
val_df[features]    = imputer.transform(val_df[features])
test_df[features]   = imputer.transform(test_df[features])

In [304]:
# Scale Features

scaler = StandardScaler()

train_df[features] = scaler.fit_transform(train_df[features])
val_df[features] = scaler.transform(val_df[features])
test_df[features] = scaler.transform(test_df[features])

In [305]:
# Dataset & DataLoader

class AsteroidDataset(Dataset):
    def __init__(self, df, feature_cols, target_col):
        self.X = torch.tensor(df[feature_cols].values, dtype=torch.float32)
        self.y = torch.tensor(df[target_col].values, dtype=torch.float32)
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
train_dataset = AsteroidDataset(train_df, features, target)
test_dataset = AsteroidDataset(test_df, features, target)
val_dataset = AsteroidDataset(val_df, features, target)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)
val_loader = DataLoader(val_dataset, batch_size=32)

In [306]:
# Model

class ResidualBlock(nn.Module):
    def __init__(self, dim, dropout=0.1):
        super().__init__()
        self.fc = nn.Linear(dim, dim)
        self.bn = nn.LayerNorm(dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(dim, dim)
        self.bn2 = nn.LayerNorm(dim)
    
    def forward(self, x):
        out = self.fc(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)
        return self.bn2(out + x)

class Network(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, num_blocks=3, output_dim=1, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.bn = nn.LayerNorm(hidden_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.blocks = nn.Sequential(*[ResidualBlock(hidden_dim, dropout) for _ in range(num_blocks)])
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        
        
    def forward(self, x):
        out = self.fc1(x)
        out = self.bn(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.blocks(out)
        out = self.fc2(out)
        return out

In [307]:
# Model Setup

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = Network(len(features)).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)


In [308]:
pos_weight = train_df[train_df["pha"] == 0.0].size / train_df[train_df["pha"] == 1.0].size
pos_weight = torch.tensor([pos_weight]).to(device)
# print(train_df)
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

In [309]:
# Validation

def validate():
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for X, y in val_loader:
            X, y = X.to(device), y.to(device)
            logits = model(X)
            loss = criterion(logits.squeeze(), y)
            total_loss += loss.item() * X.size(0)

            preds = (torch.sigmoid(logits) > 0.5).float()
            correct += (preds.squeeze() == y).sum().item()
            total += y.size(0)

    print(f"Validation Loss: {total_loss/total:.4f} | Validation Accuracy: {correct/total:.4f}")

In [310]:
def train(epochs=20):
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        correct = 0
        total = 0

        for X,y in train_loader:
            X, y = X.to(device), y.to(device)

            optimizer.zero_grad()
            logits = model(X)
            loss = criterion(logits.squeeze(), y)
            loss.backward()
            optimizer.step()

            # Tracking
            total_loss += loss.item() * X.size(0)
            preds = (torch.sigmoid(logits) > 0.5).float()
            correct += (preds.squeeze() == y).sum().item()
            total += y.size(0)

        print(f"Epoch {epoch}: Train Loss = {total_loss/total:.4f} | Train Acc: {correct/total:.4f}")
        validate()

train()

ValueError: Target size (torch.Size([1])) must be the same as input size (torch.Size([]))

In [None]:
def test():
    tp = 0
    tn = 0
    fp = 0
    fn = 0
    total_loss = 0
    with torch.no_grad():
        for X, y in test_loader:
            X, y = X.to(device), y.to(device)
            logits = model(X)
            loss = criterion(logits.squeeze(), y)
            total_loss += loss.item()
            
            preds = (torch.sigmoid(logits) > 0.5).float()
            
            tp += torch.logical_and(y, preds.squeeze()).sum().item()
            tn += torch.logical_not(torch.logical_or(y, preds.squeeze())).sum().item()
            fp += torch.logical_and(preds.squeeze(), torch.logical_not(y)).sum().item()
            fn += torch.logical_and(y, torch.logical_not(preds.squeeze())).sum().item()
            
            print(f"Test Metrics: Accuracy={(tp + tn)/(tp + tn + fp + fn):.4f} | Precision={(tp/(tp + fp)):.4f} | Recall={(tp/(fn + tp)):.4f} | F1={tp/(tp + (.5 * (tp + fn))):.4f}")
            
test()


## Handling null values
- Create a flag column for diameter missing asteroids
- Remove diameter attribute and other columns with null values
- dataset.isnull().sum()
- diameter, albedo, diameter_sigma

## Handling imbalanced target
- Try weighting 'Y' class more
- Afterwards, observe recall & F1