# Imports

In [None]:
from __future__ import annotations

import os
from typing import Final
from urllib.error import HTTPError, URLError

import nltk
import numpy
import pandas
import seaborn
from matplotlib import pyplot
from pandas import DataFrame, Series
from rich.console import Console
from sklearn.linear_model import LinearRegression, Ridge, RidgeCV, SGDRegressor
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from ydata_profiling import ProfileReport

# Environment Setup

In [None]:
# Base URL (with trailing slash) of the raw GitHub location the datasets are downloaded from.
GITHOME: Final[str] = "https://raw.githubusercontent.com/ISIS3301-202510-G1/Laboratory-1/refs/heads/main/data/"

In [None]:
# Shared rich console used for all log output in this notebook (fixed 120-column width).
console: Console = Console(width=120)

# 1. Loading Data

In [None]:
# Load both datasets from the online source, falling back to local copies
# when the remote host cannot be reached or rejects the request.
# NOTE(review): GITHOME already ends in "data/", so the resolved URLs contain
# ".../data/data/<file>", and the remote file names ("*-data.csv") differ from
# the local ones ("*-dataset.csv") - confirm both against the repository layout.
base_url: str = GITHOME.rstrip("/")  # URLs always use "/", so avoid os.path.join
try:
    testing: DataFrame = pandas.read_csv(f"{base_url}/data/test-data.csv")
    training: DataFrame = pandas.read_csv(f"{base_url}/data/train-data.csv")
# URLError covers connection failures (offline, DNS, refused) as well as its
# subclass HTTPError (4xx/5xx responses), so every retrieval failure falls back.
except URLError as error:
    console.log("Online dataset could not be retrieved. Falling back to local files.")
    console.log(f"Reason: {error}")
    # Reload both datasets locally so they always come from the same source
    testing = pandas.read_csv("data/test-dataset.csv")
    training = pandas.read_csv("data/train-dataset.csv")

In [None]:
# Reaching this cell means the loading cell finished without raising
console.log("Testing and training datasets successfully loaded!")

In [None]:
# Fail fast when either dataset has no rows or no columns
datasets_populated: bool = not (testing.empty or training.empty)
assert datasets_populated, "Error: One or both datasets are empty!"
console.log("Dataset validation passed. Both datasets are loaded and non-empty.")

In [None]:
# Announce the previews rendered by the next two cells.
# At this stage the goal is only to confirm the data is available; inspection comes later.
console.log("Preview of datasets loaded:")

In [None]:
console.log("Testing dataset:")
testing  # Bare expression on the cell's last line: the notebook renders the DataFrame

In [None]:
console.log("Training dataset:")
training  # Bare expression on the cell's last line: the notebook renders the DataFrame

# 2. Data Understanding

In [None]:
# Optional: uncomment to generate a ydata-profiling report for the training set
# ProfileReport(training)

## 2.1. Dataset Overview

In [None]:
# Column names, non-null counts, dtypes and memory usage
training.info()

In [None]:
# Summary statistics (count, mean, std, min, quartiles, max) of the numeric columns
training.describe()

In [None]:
# First five rows, to see what individual records look like
training.head()

## 2.2. Data Quality Check

In [None]:
# Collect a description of every violated constraint so that all problems are
# reported together instead of stopping at the first one.
errors: list[str] = []

# Rows violating the constraint currently being checked (rebound by each check)
invalid: DataFrame

# NOTE(review): comparisons against NaN evaluate to False, so missing values
# pass the range checks below unnoticed - confirm whether that is acceptable.

# ra: 0 <= ra <= 360
if not (invalid := training[(training['ra'] < 0) | (training['ra'] > 360)]).empty:
    errors.append(f"Column 'ra' contains values outside the range [0, 360]. Invalid rows: {invalid[['ra']].values.tolist()}")

# dec: -90 <= dec <= 90
if not (invalid := training[(training['dec'] < -90) | (training['dec'] > 90)]).empty:
    errors.append(f"Column 'dec' contains values outside the range [-90, 90]. Invalid rows: {invalid[['dec']].values.tolist()}")

# Magnitudes (u, g, r, i, z): > 0
for feature in ('u', 'g', 'r', 'i', 'z'):
    if not (invalid := training[training[feature] <= 0]).empty:
        errors.append(f"Column '{feature}' contains non-positive values. Invalid rows: {invalid[[feature]].values.tolist()}")

# run: integer dtype and >= 0
if not (invalid := training[training['run'] < 0]).empty:
    errors.append(f"Column 'run' contains negative values. Invalid rows: {invalid[['run']].values.tolist()}")
# Comparing the dtype with the builtin `int` depends on the platform's default
# integer width; is_integer_dtype accepts every integer dtype.
if not pandas.api.types.is_integer_dtype(training['run']):
    errors.append("Column 'run' is not of type integer.")

# camcol: Valid values {1, 2, 3, 4, 5, 6}
if not (invalid := training[~training['camcol'].isin({1, 2, 3, 4, 5, 6})]).empty:
    errors.append(f"Column 'camcol' contains invalid values. Invalid rows: {invalid[['camcol']].values.tolist()}")

# field: >= 1
if not (invalid := training[training['field'] < 1]).empty:
    errors.append(f"Column 'field' contains invalid values. Invalid rows: {invalid[['field']].values.tolist()}")

# score: 0 <= score <= 1
if not (invalid := training[(training['score'] < 0) | (training['score'] > 1)]).empty:
    errors.append(f"Column 'score' contains invalid values. Invalid rows: {invalid[['score']].values.tolist()}")

# clean: Valid values {0, 1}
if not (invalid := training[~training['clean'].isin({0, 1})]).empty:
    errors.append(f"Column 'clean' contains invalid values. Invalid rows: {invalid[['clean']].values.tolist()}")

# class: Valid values {"STAR", "GALAXY", "QSO"}
if not (invalid := training[~training['class'].isin({"STAR", "GALAXY", "QSO"})]).empty:
    errors.append(f"Column 'class' contains invalid values. Invalid rows: {invalid[['class']].values.tolist()}")

# redshift: Must not contain missing values
if training['redshift'].isnull().any():
    errors.append("Column 'redshift' contains missing values.")

# mjd: > 0
if not (invalid := training[training['mjd'] <= 0]).empty:
    errors.append(f"Column 'mjd' contains non-positive values. Invalid rows: {invalid[['mjd']].values.tolist()}")

# rowv and colv: Must not contain missing values
for feature in ('rowv', 'colv'):
    if training[feature].isnull().any():
        errors.append(f"Column '{feature}' contains missing values.")

# Log all errors, one bullet per violated constraint
if errors:
    console.log("Data validation issues found:")
    console.log(*map("- {}".format, errors), sep="\n")
else:
    console.log("All data validation checks passed successfully.")


## 2.3. Exploratory Data Analysis

### 2.3.1. Numerical Features Analysis

In [None]:
# Continuous columns explored in this section
numerical: list[str] = [
    "ra", "dec", "u", "g", "r", "i", "z", "redshift", "mjd", "rowv", "colv",
]

# One histogram with a KDE overlay per column, each on its own figure
for feature in numerical:
    axes = pyplot.figure(figsize=(8, 4)).gca()
    seaborn.histplot(training[feature], kde=True, ax=axes)
    axes.set_title(f'Distribution of {feature}')
    axes.set_xlabel(feature)
    axes.set_ylabel('Frequency')
    pyplot.show()

### 2.3.2. Categorical Features Analysis

In [None]:
# Count plot for 'class', bars ordered from the most to the least frequent label
class_order = training['class'].value_counts().index
axes = pyplot.figure(figsize=(8, 4)).gca()
seaborn.countplot(data=training, x='class', order=class_order, ax=axes)
axes.set_title('Frequency Distribution of Class')
axes.set_xlabel('Class')
axes.set_ylabel('Count')
pyplot.show()

# Count plot for the 'clean' flag, in seaborn's default category order
axes = pyplot.figure(figsize=(8, 4)).gca()
seaborn.countplot(data=training, x='clean', ax=axes)
axes.set_title('Frequency Distribution of Clean')
axes.set_xlabel('Clean')
axes.set_ylabel('Count')
pyplot.show()


### 2.3.3. Target Variable Analysis

In [None]:
# Histogram (30 bins) with a KDE overlay for the target variable
axes = pyplot.figure(figsize=(8, 4)).gca()
seaborn.histplot(training['redshift'], kde=True, bins=30, ax=axes)
axes.set_title('Distribution of Redshift')
axes.set_xlabel('Redshift')
axes.set_ylabel('Frequency')
pyplot.show()

### 2.3.4. Relationships Between Features

In [None]:
# Pairwise correlations between the numerical features
numeric_view: DataFrame = training[numerical]
correlations: DataFrame = numeric_view.corr()

# Annotated heatmap of the matrix, two decimals per cell
axes = pyplot.figure(figsize=(10, 8)).gca()
seaborn.heatmap(correlations, annot=True, cmap='coolwarm', fmt='.2f', ax=axes)
axes.set_title('Correlation Matrix')
pyplot.show()

In [None]:
# Redshift against each of the five magnitude columns, one figure per column
for feature in ('u', 'g', 'r', 'i', 'z'):
    axes = pyplot.figure(figsize=(8, 4)).gca()
    seaborn.scatterplot(data=training, x=feature, y='redshift', ax=axes)
    axes.set_title(f'Relationship between {feature} and Redshift')
    axes.set_xlabel(feature)
    axes.set_ylabel('Redshift')
    pyplot.show()

In [None]:
# Distribution of redshift within each object class
axes = pyplot.figure(figsize=(10, 6)).gca()
seaborn.boxplot(data=training, x='class', y='redshift', ax=axes)
axes.set_title('Box Plot of Redshift by Class')
axes.set_xlabel('Class')
axes.set_ylabel('Redshift')
pyplot.show()

In [None]:
# Distribution of redshift for each value of the 'clean' flag
axes = pyplot.figure(figsize=(8, 4)).gca()
seaborn.boxplot(data=training, x='clean', y='redshift', ax=axes)
axes.set_title('Box Plot of Redshift by Clean Flag')
axes.set_xlabel('Clean')
axes.set_ylabel('Redshift')
pyplot.show()

# 3. Data Preparation

## 3.1. Removing Unnecessary Features

In [None]:
# Columns excluded from modeling; they are removed from `training` in place
unnecessary: list[str] = [
    'objid',
    'run',
    'camcol',
    'field',
    'mjd',
]
training.drop(labels=unnecessary, axis='columns', inplace=True)

## 3.2. Cleaning and Encoding Categorical Features

In [None]:
# Clean the 'class' column by snapping every raw value to the closest valid
# label (smallest edit distance). The labels are tried in a fixed order so that
# ties are always resolved the same way; iterating a set of strings does not
# guarantee a stable order between interpreter runs.
# Each distinct raw value is resolved once instead of once per row. Missing
# values are left untouched and therefore receive no indicator column below.
closest: dict[str, str] = {
    raw: min(("STAR", "GALAXY", "QSO"), key=lambda label: nltk.edit_distance(raw, label))
    for raw in training['class'].dropna().unique()
}
training['class'] = training['class'].map(closest)
# One-hot encode the cleaned 'class' column (replaces it with one column per label)
training = pandas.get_dummies(training, columns=['class'])

## 3.3. Transforming Binary Features

In [None]:
# Convert the 'clean' flag to boolean: zero becomes False, any other value becomes True.
# NOTE(review): missing values (NaN) would also be cast to True - confirm the column has none.
training['clean'] = training['clean'].astype(bool)

## 3.4. Feature Selection Based on Correlation

In [None]:
# Minimum absolute correlation with the target for a feature to be kept
threshold: float = 0.5

# Pairwise correlations between all remaining columns
correlations: DataFrame = training.corr()

# Absolute correlation of every column with the target ('redshift')
targets: Series[float] = correlations['redshift'].abs()
# 'redshift' correlates perfectly with itself, so the target is always part of
# the selection. Columns whose correlation is NaN are dropped.
features: pandas.Index = targets[targets > threshold].index

# Keep only the selected features and the target. The explicit copy makes
# `training` an independent frame, so later column assignments do not raise
# SettingWithCopyWarning.
training = training[features].copy()

## 3.5. Normalizing Numeric Features

In [None]:
# Numeric features that survived feature selection, in a fixed order
# (a set intersection would yield an arbitrary order)
numerics: list[str] = [
    name for name in ('u', 'g', 'r', 'i', 'z', 'rowv', 'colv') if name in training.columns
]

# Initialize scaler
scaler: StandardScaler = StandardScaler()

# Standardize the selected columns to zero mean and unit variance.
# StandardScaler rejects input without columns, so skip when nothing qualified.
# NOTE(review): the scaler is fitted before the train/test split further below,
# so statistics of the held-out rows influence the scaling - confirm this is intended.
if numerics:
    training[numerics] = scaler.fit_transform(training[numerics])
else:
    console.log("No numeric features left to normalize.")

## 3.6. Handling Outliers

In [None]:
def sanitize(dataset: DataFrame, columns: list[str], factor: float = 1.5) -> DataFrame:
    """Drop rows whose value in any of *columns* lies outside the IQR fences.

    The columns are processed one after another: the quartiles of each column
    are computed on the rows that survived the previous columns, so the result
    depends on the order of *columns*. Rows with a missing value in a processed
    column are dropped as well, because they never compare as inside the fences.

    Args:
        dataset: Frame to filter; it is not modified.
        columns: Names of the numeric columns to check, in processing order.
        factor: Multiple of the interquartile range added below Q1 and above
            Q3 to obtain the fences.

    Returns:
        An independent copy containing only the rows inside all fences, so the
        caller can assign to it without affecting *dataset*.
    """
    for col in columns:
        q1: float = dataset[col].quantile(0.25)
        q3: float = dataset[col].quantile(0.75)
        margin: float = factor * (q3 - q1)
        # `between` includes both bounds, i.e. lower <= value <= upper
        dataset = dataset[dataset[col].between(q1 - margin, q3 + margin)]
    # Copy so the result is never a slice of (or the same object as) the input
    return dataset.copy()

# Numeric columns still present in `training`, in a fixed order. The outlier
# filter processes the columns sequentially, so an arbitrary (set-derived)
# order would make the surviving rows differ from run to run.
numerics: list[str] = [
    name for name in ('u', 'g', 'r', 'i', 'z', 'rowv', 'colv') if name in training.columns
]

# Remove outliers
training = sanitize(training, numerics)


## 3.7. Transforming Skewed Features

In [None]:
# Skewness of every numeric feature, most right-skewed first
skewness = training[numerics].skew()
features: Series[float] = skewness.sort_values(ascending=False)

# Features whose absolute skewness exceeds 0.5
columns: list[str] = features[features.abs() > 0.5].index.tolist()

# Reduce skewness with `log1p`, but only for strictly positive columns.
# NOTE(review): these columns were standardized earlier in the notebook, so
# they normally contain non-positive values and the transformation is skipped.
for feature in columns:
    if not (training[feature] > 0).all():
        console.log(f"Skipping transformation for {feature} due to non-positive values.")
        continue
    training[feature] = numpy.log1p(training[feature])


## 3.8. Splitting the Data

In [None]:
# Separate the predictors from the target
x: DataFrame = training.drop(columns=['redshift'])
y: Series[float] = training['redshift']

x_train: DataFrame
y_train: Series[float]
x_test: DataFrame
y_test: Series[float]
# Fix the seed so the split (and every metric derived from it) is reproducible
# across runs; the default test fraction is kept.
x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=42)

console.log(f"Training Features Shape: {x_train.shape}")
console.log(f"Training Target Shape: {y_train.shape}")
console.log(f"Test Features Shape: {x_test.shape}")
console.log(f"Test Target Shape: {y_test.shape}")

# 4. Modeling

In [None]:
class ModelSelector:
    """Fit a fixed set of linear regressors and collect their evaluation metrics.

    Every model is trained on the same data by :meth:`fit` and evaluated on the
    same data by :meth:`test`. The metrics are stored per model name under the
    keys ``'MSE'``, ``'RMSE'`` and ``'R²'`` and exposed through :attr:`metrics`.
    """

    def __init__(self) -> None:
        """Create the unfitted candidate models and one empty metrics slot each."""
        self._models: dict[str, LinearRegression | Ridge | RidgeCV | SGDRegressor] = {
            # Ordinary least squares; no hyperparameters are set
            'linear-regression': LinearRegression(),

            # Ridge regression with a fixed regularization strength
            'ridge': Ridge(alpha=1.0, solver='auto', random_state=42),

            # Ridge regression that picks alpha from the candidates by 5-fold cross-validation
            'ridge-cv': RidgeCV(alphas=[0.1, 1.0, 10.0], cv=5, scoring='neg_mean_squared_error'),

            # Linear model trained with stochastic gradient descent
            'sgd-regressor': SGDRegressor(
                max_iter=1000,  # Upper bound on the number of passes over the data
                tol=1e-3,       # Stopping criterion
                penalty='l2',   # L2 regularization (ridge-like behavior)
                eta0=0.01,      # Initial learning rate
                learning_rate='adaptive',  # Adjust learning rate during training
                random_state=42
            )
        }

        # Derived from the models so the two dictionaries cannot drift apart
        self._metrics: dict[str, dict[str, float]] = {name: {} for name in self._models}

    @property
    def metrics(self) -> dict[str, dict[str, float]]:
        """Return a copy of the metrics gathered by :meth:`test`, keyed by model name."""
        return {name: dict(values) for name, values in self._metrics.items()}

    def fit(self, x: DataFrame, y: Series) -> None:
        """Train every candidate model on the features *x* and the target *y*."""
        for model in self._models.values():
            model.fit(x, y)

    def test(self, x: DataFrame, y: Series) -> None:
        """Evaluate every model on *x* against *y* and store MSE, RMSE and R².

        The models must have been trained with :meth:`fit` beforehand. Metrics
        from a previous call are overwritten.
        """
        for name, model in self._models.items():
            # Generate predictions
            prediction: numpy.ndarray = model.predict(x)

            # Calculate metrics
            mse: float = mean_squared_error(y, prediction)
            rmse: float = mse ** 0.5
            r2: float = r2_score(y, prediction)

            # Store metrics (updated in place, so existing references stay valid)
            self._metrics.setdefault(name, {}).update({'MSE': mse, 'RMSE': rmse, 'R²': r2})


# Train all candidate models on the training split, evaluate them on the test
# split and print the per-model metrics (MSE, RMSE, R²).
selector: ModelSelector = ModelSelector()
selector.fit(x_train, y_train)
selector.test(x_test, y_test)
print(selector._metrics)