In [302]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.metrics import (
    mean_squared_error,
    mean_absolute_error,
    r2_score,
    make_scorer
)
from sklearn.base import clone
from scipy.stats import randint, uniform
from sklearn.model_selection import RandomizedSearchCV

from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor


In [303]:
df = pd.read_csv('../data/E-INSPIRE_I_master_catalogue.csv')

# Display basic information about the dataset
print("Dataset Shape:", df.shape)
print("\nFeature Information:")
df.info()

# Show first few rows
df.head()


np.random.seed(42)


Dataset Shape: (430, 75)

Feature Information:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 430 entries, 0 to 429
Data columns (total 75 columns):
 #   Column                 Non-Null Count  Dtype  
---  ------                 --------------  -----  
 0   GALAXY ID              430 non-null    object 
 1   ra                     430 non-null    float64
 2   dec                    430 non-null    float64
 3   plate                  430 non-null    int64  
 4   mjd                    430 non-null    int64  
 5   fiberid                430 non-null    int64  
 6   objid                  430 non-null    float64
 7   deVRad_r               430 non-null    float64
 8   deVRadErr_r            430 non-null    float64
 9   expRad_r               430 non-null    float64
 10  expRadErr_r            430 non-null    float64
 11  z                      430 non-null    float64
 12  zErr                   430 non-null    float64
 13  SNR                    430 non-null    float64
 14  velDisp_SDS

In [304]:
def preprocess_data(df, selected_features, log_transform_features=None, one_hot_encode=False):
    X = df[selected_features].copy()
    
    # Store original MgFe for potential restoration
    original_mgfe = X['MgFe'].copy() if 'MgFe' in X.columns else None
    
    if one_hot_encode and 'MgFe' in X.columns:
        # Define bins and labels
        bins = [0.0, 0.1, 0.2, 0.3, 0.4, np.inf]
        mgfe_labels = ['MgFe_0.0', 'MgFe_0.1', 'MgFe_0.2', 'MgFe_0.3', 'MgFe_0.4']
        
        # Create binned version
        X['MgFe_binned'] = pd.cut(X['MgFe'], 
                                 bins=bins, 
                                 labels=mgfe_labels, 
                                 include_lowest=True, 
                                 right=False)
        
        # One-hot encode the binned column
        mgfe_encoded = pd.get_dummies(X['MgFe_binned'], prefix='', prefix_sep='')
        
        # Drop original and binned MgFe columns
        X = X.drop(['MgFe', 'MgFe_binned'], axis=1)
        
        # Add encoded columns
        X = pd.concat([X, mgfe_encoded], axis=1)
    
    # Log transform features
    if log_transform_features is None:
        log_transform_features = {
            'age_mean_mass': False,
            'velDisp_ppxf_res': False,
            '[M/H]_mean_mass': False
        }

    for feature, do_log in log_transform_features.items():
        if do_log and feature in X.columns:
            X[feature] = np.log10(X[feature] + 1e-10)
    
    return X, X.columns.tolist(), original_mgfe

def restore_original_mgfe(df, original_mgfe, mgfe_labels=None):
    if mgfe_labels is None:
        mgfe_labels = ['MgFe_0.0', 'MgFe_0.1', 'MgFe_0.2', 'MgFe_0.3', 'MgFe_0.4']
    
    # Remove one-hot columns
    df = df.drop(columns=[col for col in df.columns if col in mgfe_labels])
    
    # Restore original MgFe
    if original_mgfe is not None:
        df['MgFe'] = original_mgfe
    
    return df

In [305]:
def plot_feature_importance(model, feature_names):
    """Plot feature importances for models that support them."""
    if hasattr(model.named_steps['regressor'], 'feature_importances_'):
        importance = model.named_steps['regressor'].feature_importances_
        indices = np.argsort(importance)[::-1]

        plt.figure(figsize=(10, 6))
        plt.title("Feature Importances")
        plt.bar(range(len(importance)), importance[indices])
        plt.xticks(range(len(importance)), [feature_names[i] for i in indices], rotation=45)
        plt.tight_layout()
        plt.show()

        # Print numerical values
        for i in indices:
            print(f"{feature_names[i]}: {importance[i]:.4f}")
    else:
        print("This model doesn't support feature importances")

def plot_regression_results(y_true, y_pred, model_name):
    """Plot regression results with metrics."""
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.scatter(y_true, y_pred, alpha=0.5)

    # Diagonal line for perfect predictions
    min_val, max_val = 0, 1
    ax.plot([min_val, max_val], [min_val, max_val], 'r--', lw=2)

    # Calculate metrics
    r2 = r2_score(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)

    ax.set_title(f'{model_name} Regression Results')
    ax.set_xlabel('True Values')
    ax.set_ylabel('Predicted Values')

    text = (f'R² = {r2:.3f}\n'
            f'RMSE = {rmse:.3f}\n'
            f'MAE = {mae:.3f}')
    ax.text(0.05, 0.95, text, transform=ax.transAxes,
            verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.7))

    return fig

In [306]:
def comprehensive_regression_analysis(X, y, test_size=0.2, random_state=42, log_transform_features=None):
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    results = {}

    for name, model_info in models.items():
        print(f"\nEvaluating {name}...")
        
        # Create and fit scaler
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        if model_info['params']:
            best_score = float('-inf')
            best_params = None
            best_model = None
            
            # Manual grid search for XGBoost and Random Forest
            if name in ['XGBoost', 'Random Forest']:
                from itertools import product
                param_combinations = [dict(zip(model_info['params'].keys(), v)) 
                                   for v in product(*model_info['params'].values())]
                
                for params in param_combinations:
                    # Extract random_state from params
                    rand_state = params.pop('random_state', 42)
                    
                    if name == 'XGBoost':
                        model = XGBRegressor(random_state=rand_state, objective='reg:squarederror', **params)
                    else:  # Random Forest
                        model = RandomForestRegressor(random_state=rand_state, **params)
                    
                    # Put random_state back in params for recording purposes
                    params['random_state'] = rand_state
                    
                    model.fit(X_train_scaled, y_train)
                    score = -mean_squared_error(y_train, model.predict(X_train_scaled))
                    
                    if score > best_score:
                        best_score = score
                        best_params = params
                        best_model = model
            else:
                # Use GridSearchCV for other models
                grid_search = GridSearchCV(
                    estimator=model_info['model'],
                    param_grid=model_info['params'],
                    cv=5,
                    scoring='neg_mean_squared_error',
                    n_jobs=1
                )
                grid_search.fit(X_train_scaled, y_train)
                best_model = grid_search.best_estimator_
                best_params = grid_search.best_params_
            
            y_pred = best_model.predict(X_test_scaled)
            
            results[name] = {
                'best_params': best_params,
                'y_pred': y_pred,
                'r2_score': r2_score(y_test, y_pred),
                'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
                'mae': mean_absolute_error(y_test, y_pred)
            }
            
            print("Best Parameters:", best_params)
            
            if hasattr(best_model, 'feature_importances_'):
                importance = best_model.feature_importances_
                indices = np.argsort(importance)[::-1]
                
                plt.figure(figsize=(10, 6))
                plt.title("Feature Importances")
                plt.bar(range(len(importance)), importance[indices])
                plt.xticks(range(len(importance)), [X.columns[i] for i in indices], rotation=45)
                plt.tight_layout()
                plt.show()
                
                for i in indices:
                    print(f"{X.columns[i]}: {importance[i]:.4f}")
        else:
            # For models without parameters (like Linear Regression)
            model = clone(model_info['model'])
            model.fit(X_train_scaled, y_train)
            y_pred = model.predict(X_test_scaled)
            
            results[name] = {
                'y_pred': y_pred,
                'r2_score': r2_score(y_test, y_pred),
                'rmse': np.sqrt(mean_squared_error(y_test, y_pred)),
                'mae': mean_absolute_error(y_test, y_pred)
            }
        
        # Visualization
        plot_regression_results(y_test, results[name]['y_pred'], name)
        plt.show()
    
    return results

In [307]:
models = {
    'XGBoost': {
        'model': XGBRegressor(objective='reg:squarederror'),
        'params': {
            'n_estimators': [300, 350, 400],       # back to lower values
            'max_depth': [7, 8, 9],                # slightly lower
            'learning_rate': [0.2, 0.3, 0.4],      # back to lower values
            'subsample': [0.5, 0.6, 0.7],          # increase from 0.4
            'random_state': [42]
        }
    },
    'Random Forest': {
        'model': RandomForestRegressor(),
        'params': {
            'n_estimators': [125, 150, 175],       # around 150
            'max_depth': [15, 20, 25],             # around 20
            'min_samples_split': [2],              # keep at 2 since it's consistently best
            'max_features': [0.4, 0.5, 0.6],       # around 0.5
            'max_samples': [0.9, 0.95, 1.0],       # fine-tune near 1.0
            'random_state': [42]
        }
    },
    'Linear Regression': {
        'model': LinearRegression(),
        'params': {}
    },
}

In [308]:
# Select features for the analysis
selected_features = ['MgFe', '[M/H]_mean_mass', 'velDisp_ppxf_res', 'age_mean_mass']

# Configure log transformations
log_transform_config = {
    'age_mean_mass': True,
    'velDisp_ppxf_res': False,
    'MgFe': False,
    '[M/H]_mean_mass': False
}

# Preprocess data
one_hot_encode = True  # Set this to False if you don't want one-hot encoding

# Preprocess data
X, feature_names, original_mgfe = preprocess_data(
    df, 
    selected_features, 
    log_transform_features=log_transform_config,
    one_hot_encode=one_hot_encode
)
y = df['DoR'].values

# Run the analysis
results = comprehensive_regression_analysis(
    X,
    y,
    test_size=0.2,
    random_state=42,
    log_transform_features=log_transform_config
)

# Print final results comparison
print("\n--- Regression Model Comparison ---")
for name, result in results.items():
    print(f"\n{name}:")
    print(f"R² Score: {result['r2_score']:.4f}")
    print(f"RMSE: {result['rmse']:.4f}")
    print(f"MAE: {result['mae']:.4f}")
    if 'best_params' in result:
        print("Best Hyperparameters:", result['best_params'])
        
if one_hot_encode:
    X = restore_original_mgfe(X, original_mgfe)


Evaluating XGBoost...


KeyboardInterrupt: 