In [None]:
import torch
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
from sklearn.calibration import LabelEncoder

In [None]:
import pandas as pd
# Quick look at the K2 planets-and-candidates table.
# A forward slash is used as the separator: the previous 'Data\k2pandc final.csv'
# contained the invalid escape sequence "\k" and only resolved on Windows.
# Note: `df` is rebound to a different dataset by the pipeline cell further down.
df = pd.read_csv('Data/k2pandc final.csv')
df

In [None]:
# -------------------------------
# 1. Load dataset
# -------------------------------
def load_dataset(path: str) -> pd.DataFrame:
    """Read a CSV file into a DataFrame and print its column summary.

    Parameters
    ----------
    path : str
        Location of the CSV file, passed straight to ``pd.read_csv``.

    Returns
    -------
    pd.DataFrame
        The parsed table, unmodified.
    """
    df = pd.read_csv(path)
    # DataFrame.info() writes the summary itself and returns None, so it must
    # not be wrapped in print() (that appended a stray "None" line).
    df.info()
    return df

In [None]:
# -------------------------------
# 2. Handle missing values
# -------------------------------
def fill_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with missing values imputed.

    float64/int64 columns are filled with their column median. Columns of
    dtype object, category or bool are filled with their most frequent value,
    falling back to the literal ``'Unknown'`` when a column has no mode
    (i.e. it contains no non-missing values). The input frame is not modified.
    """
    filled = df.copy()
    numeric_names = filled.select_dtypes(include=['float64', 'int64']).columns
    categorical_names = filled.select_dtypes(include=['object', 'category', 'bool']).columns

    # Numerical columns: impute with the per-column median.
    medians = filled[numeric_names].median()
    filled[numeric_names] = filled[numeric_names].fillna(medians)

    # Categorical columns: impute with the first mode, if there is one.
    for name in categorical_names:
        modes = filled[name].mode()
        if modes.empty:
            replacement = 'Unknown'
        else:
            replacement = modes[0]
        filled[name] = filled[name].fillna(replacement)

    return filled

In [None]:
# -------------------------------
# 3. Scale numerical features
# -------------------------------
def scale_numerical(df: pd.DataFrame) -> "tuple[pd.DataFrame, StandardScaler]":
    """Standardize the float64/int64 columns of ``df``.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is copied, not modified.

    Returns
    -------
    (pd.DataFrame, StandardScaler)
        The scaled copy and the scaler used. The scaler is fitted on the
        numeric columns of ``df``; when ``df`` has no float64/int64 columns the
        frame is returned unchanged together with an *unfitted* scaler.

    Notes
    -----
    The scaler is fitted on every row it is given. NOTE(review): when this is
    called before a train/test split, test-set statistics influence the
    scaling - confirm this is acceptable for the experiment.
    """
    df_scaled = df.copy()
    num_cols = df_scaled.select_dtypes(include=['float64', 'int64']).columns
    scaler = StandardScaler()
    # StandardScaler refuses a zero-feature input, so skip fitting when there
    # is nothing numeric to scale instead of raising.
    if len(num_cols) > 0:
        df_scaled[num_cols] = scaler.fit_transform(df_scaled[num_cols])
    return df_scaled, scaler

In [None]:
# -------------------------------
# 4. Encode categorical features
# -------------------------------

def encode_categorical(df: pd.DataFrame):
    """Label-encode every object/category/bool column of ``df``.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is copied, not modified.

    Returns
    -------
    (pd.DataFrame, dict)
        The encoded copy, and a dict mapping each encoded column name to the
        fitted ``LabelEncoder`` for that column (usable for
        ``inverse_transform``).

    Notes
    -----
    Any column of the selected dtypes is encoded, which includes a
    string-valued target column if one is present in ``df``.
    """
    # Import LabelEncoder from its public module. The notebook-level import
    # takes it from sklearn.calibration, which only exposes it as a side
    # effect of that module's own imports.
    from sklearn.preprocessing import LabelEncoder

    df_encoded = df.copy()
    cat_cols = df_encoded.select_dtypes(include=['object', 'category', 'bool']).columns

    le_dict = {}
    for col in cat_cols:
        le = LabelEncoder()
        # Remaining missing values become their own class, 'NaN_Label'.
        df_encoded[col] = le.fit_transform(df_encoded[col].fillna('NaN_Label'))
        le_dict[col] = le

    return df_encoded, le_dict

In [None]:
# -------------------------------
# 5. Feature selection (correlation + importance)
# -------------------------------
def select_features(df_encoded: pd.DataFrame, target_col: str, N: int = 20):
    """Select features by target correlation and random-forest importance.

    Parameters
    ----------
    df_encoded : pd.DataFrame
        Frame containing the features and a numeric ``target_col``.
    target_col : str
        Name of the target column.
    N : int, default 20
        How many top features to take from each of the two rankings.

    Returns
    -------
    list
        Unique feature names: the top-``N`` by absolute correlation with the
        target (strongest first), followed by any of the top-``N`` by forest
        importance not already listed. The order is deterministic, so
        downstream column order is identical from run to run.

    Notes
    -----
    The correlation ranking uses every row of ``df_encoded``; the forest is
    fitted on the training part of an internal ``train_test_split``.
    NOTE(review): if this runs before the final train/test split, the
    selection has seen test rows - confirm this is intended.
    """
    from sklearn.ensemble import RandomForestClassifier

    X = df_encoded.drop(columns=[target_col])
    y = df_encoded[target_col]

    X = pd.get_dummies(X)  # one-hot if needed
    X_train, _, y_train, _ = train_test_split(X, y, random_state=42)

    # Correlation. Constant columns produce NaN correlations; drop them so they
    # can never be padded into the top-N list.
    corr_with_target = (
        df_encoded.corr(numeric_only=True)[target_col].drop(target_col).abs().dropna()
    )
    top_corr = corr_with_target.sort_values(ascending=False).head(N).index.tolist()

    # Random Forest importance
    rf = RandomForestClassifier(random_state=42)
    rf.fit(X_train, y_train)
    importances = pd.Series(rf.feature_importances_, index=X.columns).sort_values(ascending=False)
    top_importance = importances.head(N).index.tolist()

    # Combine with an order-preserving de-duplication. list(set(...)) yields an
    # ordering that varies between interpreter sessions (string hash
    # randomisation), which changes the column order seen by later models.
    selected_features = list(dict.fromkeys(top_corr + top_importance))
    return selected_features

In [None]:
# -------------------------------
# 6. Build preprocessed dataframe
# -------------------------------
def build_preprocessed_df(df_encoded: pd.DataFrame, selected_features: list, target_col: str):
    """Assemble the modelling table: the selected features plus the target.

    The feature columns are taken from the one-hot-expanded version of
    ``df_encoded`` (target excluded), in the order given by
    ``selected_features``; the target column is appended last. A new frame is
    returned and ``df_encoded`` is left unchanged.
    """
    feature_frame = df_encoded.drop(columns=[target_col])
    expanded = pd.get_dummies(feature_frame)

    # Copy so that adding the target does not write into a view of `expanded`.
    result = expanded[selected_features].copy()
    result[target_col] = df_encoded[target_col]
    return result

In [None]:
# -------------------------------
# 7. Downsample classes
# -------------------------------
def downsample_classes(df: pd.DataFrame, target_col: str):
    """Balance ``df`` by downsampling every class to the rarest class's size.

    Each class found in ``target_col`` is sampled without replacement down to
    the smallest class count (fixed seed 42). The pieces are concatenated,
    shuffled (seed 42) and re-indexed from 0. ``df`` itself is not modified.
    """
    from sklearn.utils import resample

    working = df.copy()
    counts = working[target_col].value_counts()
    smallest = counts.min()

    # One equally-sized sample per class, in value_counts() order.
    balanced_parts = [
        resample(
            working[working[target_col] == label],
            replace=False,
            n_samples=smallest,
            random_state=42,
        )
        for label in counts.index
    ]

    combined = pd.concat(balanced_parts)
    shuffled = combined.sample(frac=1, random_state=42)
    return shuffled.reset_index(drop=True)

In [None]:
# Run the preprocessing pipeline end to end on the cumulative KOI table.
path = "Data/cumulative_2025.10.03_07.59.03.csv"
target_col = "koi_disposition"

# load -> impute -> scale numeric columns -> label-encode categorical columns
# NOTE(review): scaling and feature selection both run on the full table,
# before the train/test split made further down - confirm this is acceptable.
df = load_dataset(path)
df_filled = fill_missing_values(df)
df_scaled, scaler = scale_numerical(df_filled)
df_encoded, le_dict = encode_categorical(df_scaled)

# Keep the union of the top-20 features from each ranking, plus the target.
selected_features = select_features(df_encoded, target_col, N=20)
preprocessed_df = build_preprocessed_df(df_encoded, selected_features, target_col)

# Balance the classes by downsampling to the rarest one.
df_downsampled = downsample_classes(preprocessed_df, target_col)

print("Final preprocessed dataset shape:", df_downsampled.shape)
# info() prints its own summary and returns None; wrapping it in print()
# appended a stray "None" line to the output.
df_downsampled.info()

In [None]:
# Remove 'koi_pdisposition' from the modelling table.
# NOTE(review): presumably dropped because it is a near-duplicate of the target - confirm.
# The column is only present when feature selection happened to pick it, so
# errors='ignore' keeps this cell from raising KeyError when it is absent.
df = df_downsampled.drop(columns=['koi_pdisposition'], errors='ignore').copy()

In [None]:
# Hold out 20% of the rows for testing, stratified on the target so both
# parts keep the same class proportions; the seed fixes the partition.
x_train, x_test, y_train, y_test = train_test_split(
    df.drop(target_col, axis=1),  # feature matrix
    df[target_col],               # labels
    stratify=df[target_col],
    random_state=42,
    test_size=0.2,
)

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Baseline: a 100-tree random forest fitted on the training split.
# fit() returns the estimator itself, so construction and fitting are chained.
rf = RandomForestClassifier(random_state=42, n_estimators=100).fit(x_train, y_train)

# Score the baseline on the held-out split.
y_pred = rf.predict(x_test)
print("Test accuracy:", accuracy_score(y_true=y_test, y_pred=y_pred))


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.metrics import recall_score, roc_auc_score
from sklearn.preprocessing import label_binarize

# Turn the train/test split into tensors: float32 features, int64 class labels.
# np.asarray() accepts the pandas objects produced by train_test_split as well
# as CPU tensors. That matters because `y_train` / `y_test` are rebound to
# tensors below: with `.to_numpy()` a second run of this cell failed, since a
# Tensor has no such method.
X_train = torch.tensor(np.asarray(x_train), dtype=torch.float32)
y_train = torch.tensor(np.asarray(y_train), dtype=torch.long)
X_test = torch.tensor(np.asarray(x_test), dtype=torch.float32)
y_test = torch.tensor(np.asarray(y_test), dtype=torch.long)

# Create datasets
train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

# Data loaders: only the training data is shuffled.
batch_size = 32  # Adjust based on your data size and memory
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Inverse-frequency class weights, normalized to sum to 1.
# bincount indexes by label value, so entry i is the weight of class i.
class_counts = np.bincount(y_train.numpy())
class_weights = 1. / class_counts
class_weights = torch.tensor(class_weights / class_weights.sum(), dtype=torch.float32)  # Normalize

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ResidualBlock(nn.Module):
    """Linear -> LayerNorm -> SiLU -> Dropout, added to a skip connection.

    When ``in_features`` differs from ``out_features`` the skip path goes
    through its own linear projection so the two tensors can be summed;
    otherwise the input is passed through unchanged.
    """

    def __init__(self, in_features, out_features, dropout_rate=0.3):
        super().__init__()
        self.fc = nn.Linear(in_features, out_features)
        self.ln = nn.LayerNorm(out_features)
        self.dropout = nn.Dropout(dropout_rate)

        # Skip path: identity when shapes already match, projection otherwise.
        if in_features == out_features:
            self.proj = nn.Identity()
        else:
            self.proj = nn.Linear(in_features, out_features)

    def forward(self, x):
        shortcut = self.proj(x)
        activated = F.silu(self.ln(self.fc(x)))
        return self.dropout(activated) + shortcut


class ResidualMLP(nn.Module):
    """MLP classifier built from four residual blocks.

    Layout: Linear(input_size, 256) + SiLU, then residual blocks
    256->256, 256->128, 128->128, 128->64, a final dropout (p=0.2) and a
    linear head producing ``num_classes`` raw logits (no softmax applied).
    """

    def __init__(self, input_size=16, num_classes=3):
        super().__init__()
        # Entry layer lifts the raw features to the first hidden width.
        self.entry = nn.Linear(input_size, 256)

        # Residual trunk; the width halves at blocks 2 and 4.
        self.res_block1 = ResidualBlock(256, 256)
        self.res_block2 = ResidualBlock(256, 128)
        self.res_block3 = ResidualBlock(128, 128)
        self.res_block4 = ResidualBlock(128, 64)

        self.dropout_final = nn.Dropout(0.2)
        self.fc_out = nn.Linear(64, num_classes)

    def forward(self, x):
        hidden = F.silu(self.entry(x))

        trunk = (self.res_block1, self.res_block2, self.res_block3, self.res_block4)
        for block in trunk:
            hidden = block(hidden)

        # Logits output
        return self.fc_out(self.dropout_final(hidden))

In [None]:
# Report whether a CUDA device is visible to PyTorch and, if so, which one.
has_cuda = torch.cuda.is_available()
print("CUDA Available:", has_cuda)

if has_cuda:
    print("GPU Name:", torch.cuda.get_device_name(0))
    print("Number of GPUs:", torch.cuda.device_count())
    print("Current Device Index:", torch.cuda.current_device())

In [None]:
# torch is already imported (and therefore installed) by the cells above, so
# the unpinned mid-notebook `%pip install torch` was removed.
import torch
import torch.nn as nn
import numpy as np

# Device setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
input_size = X_train.shape[1]  # number of features
model = ResidualMLP(input_size=input_size, num_classes=3).to(device)

# Loss. `loss_fn` was used by the loop below but never defined, so a fresh
# kernel raised NameError. The model emits raw logits and the labels are class
# indices, hence cross-entropy; it is weighted with the normalized
# inverse-frequency `class_weights` tensor built in the data-loader cell.
loss_fn = nn.CrossEntropyLoss(weight=class_weights.to(device))

# Optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=0.005, weight_decay=1e-5)

# Learning rate scheduler: multiply the LR by 0.8 after 6 epochs without a
# new maximum of the monitored metric (test accuracy, hence mode='max').
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='max', factor=0.8, patience=6, min_lr=1e-6
)

# Early stopping setup
patience = 20
best_accuracy = 0.0
best_epoch = None  # 1-based epoch of the saved checkpoint; None until one is saved
epochs_no_improve = 0
best_model_path = "best_residual_mlp_model.pth"

# NOTE(review): the test split drives LR scheduling, checkpoint selection and
# early stopping, so the reported test accuracy is optimistically biased. A
# separate validation split would be needed for an unbiased estimate.

# Training loop
epochs = 300
for epoch in range(epochs):
    # --- Training ---
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0

    for batch_X, batch_y in train_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        optimizer.zero_grad()
        outputs = model(batch_X)

        loss = loss_fn(outputs, batch_y)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    # Loss is averaged over batches, accuracy over samples.
    avg_train_loss = train_loss / len(train_loader)
    train_accuracy = 100 * correct / total

    # --- Evaluation on test set ---
    model.eval()
    test_loss = 0.0
    correct_test = 0
    total_test = 0
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            outputs = model(batch_X)
            loss = loss_fn(outputs, batch_y)
            test_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total_test += batch_y.size(0)
            correct_test += (predicted == batch_y).sum().item()

    avg_test_loss = test_loss / len(test_loader)
    test_accuracy = 100 * correct_test / total_test

    # --- Scheduler step ---
    old_lr = optimizer.param_groups[0]['lr']
    scheduler.step(test_accuracy)
    new_lr = optimizer.param_groups[0]['lr']
    if new_lr < old_lr:
        print(f"📉 Learning rate reduced from {old_lr:.6f} to {new_lr:.6f}")

    # --- Save best model ---
    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        best_epoch = epoch + 1
        torch.save(model.state_dict(), best_model_path)
        print(f"✅ Saved new best model at epoch {epoch+1} with Test Accuracy: {best_accuracy:.2f}%")
        epochs_no_improve = 0  # reset patience counter
    else:
        epochs_no_improve += 1

    # --- Logging ---
    print(
        f"Epoch {epoch+1}/{epochs} | "
        f"Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}% | "
        f"Test Loss: {avg_test_loss:.4f}, Test Acc: {test_accuracy:.2f}%"
    )

    # --- Early stopping ---
    # The counter was maintained but never acted on, so all 300 epochs always ran.
    if epochs_no_improve >= patience:
        print(
            f"Early stopping at epoch {epoch+1}: "
            f"no test-accuracy improvement for {patience} epochs"
        )
        break


print(f"\n🎯 Training finished. Best Test Accuracy: {best_accuracy:.2f}%")

# Leave `model` holding the best checkpoint rather than the last-epoch weights,
# so that later cells evaluate the model that was actually reported as best.
if best_epoch is not None:
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    print(f"Restored best weights from epoch {best_epoch}")


In [None]:
from sklearn.metrics import classification_report

# Collect predictions of `model` (in its current state) over the whole test
# loader, then print per-class precision / recall / F1.
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X = batch_X.to(device)
        batch_y = batch_y.to(device)

        outputs = model(batch_X)
        # Predicted class = index of the largest logit in each row.
        predicted = outputs.max(dim=1).indices

        # Move to CPU before handing the values to scikit-learn.
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(batch_y.cpu().numpy())

# Generate report
print("\n📊 Classification Report:")
print(classification_report(all_labels, all_preds, digits=4))
