In [None]:
from pathlib import Path

import numpy as np
from astropy.table import Table
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PolynomialFeatures

import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

import torch.nn as nn
import torch.optim as optim
import os
import math
from plot_utils import *

### Read in the cleaned APOGEE data and make a Kiel Diagram for the whole sample.

In [None]:
# Read the cleaned APOGEE catalogue from a local parquet file into a pandas DataFrame.
# The path is relative, so the notebook must be run from the directory that contains ./data.
apogee_path = Path("./data/apogee_cleaned.parquet")
apogee_cat = pd.read_parquet(apogee_path)


In [None]:
# Kiel diagram of the full sample: TEFF and LOGG as coordinates, FE_H as the third quantity.
kiel_teff = apogee_cat["TEFF"]
kiel_logg = apogee_cat["LOGG"]
kiel_feh = apogee_cat["FE_H"]
fig_kiel, ax_kiel = plot_kiel_scatter_density(kiel_teff, kiel_logg, kiel_feh)

### Train-test split and fit a simple linear model

In [None]:
def fourier_features(x, degree=1, add_bias=True):
    """
    Expand inputs into a truncated Fourier basis.

    For every harmonic d = 1..degree the columns sin(d * x) followed by
    cos(d * x) are appended (each keeps the input column order). If requested,
    a single column of ones is appended last.

    Args:
        x: Array-like of shape (n_samples,) or (n_samples, n_inputs).
            1-D input is treated as a single input column.
        degree: Highest harmonic to include.
        add_bias: If True, append a constant column of ones as the last column.

    Returns:
        np.ndarray of shape (n_samples, 2 * n_inputs * degree + add_bias).
    """
    # Accept lists / pandas objects as well as ndarrays.
    x = np.asarray(x)
    if x.ndim == 1:
        x = x[:, np.newaxis]

    n_samples = x.shape[0]
    columns = []
    for d in range(1, degree + 1):
        columns.extend((np.sin(d * x), np.cos(d * x)))
    if add_bias:
        columns.append(np.ones((n_samples, 1)))

    if not columns:
        # degree < 1 without a bias: np.concatenate would raise on an empty list.
        return np.empty((n_samples, 0))

    return np.concatenate(columns, axis=1)

In [None]:
# Build the inputs (TEFF, LOGG) and the target (FE_H), carve out small
# train / test subsets, and standardise the inputs.

X = apogee_cat[['TEFF', 'LOGG']]
y = apogee_cat['FE_H']

# First split: keep 0.1% of the rows for training, hold out the rest.
X_train, X_holdout, y_train, y_holdout = train_test_split(
    X, y, test_size=0.999, random_state=42
)

# Second split: the test set is a 0.2% subsample of the held-out rows.
_, X_test, _, y_test = train_test_split(
    X_holdout, y_holdout, test_size=0.002, random_state=42
)
# The full held-out frames are not needed any more.
del X_holdout, y_holdout

# Standardise the features with statistics from the training set only.
scaler = StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train).astype("float64")
X_test_scaled = scaler.transform(X_test).astype("float64")
y_train, y_test = y_train.astype("float64"), y_test.astype("float64")

# Report how many rows ended up in each subset.
print(f"Training set size: {X_train_scaled.shape[0]}")
print(f"Testing set size: {X_test_scaled.shape[0]}")

In [None]:
# Plot the Kiel diagram of the training set, using the catalogue [Fe/H] values
# (these are the true targets, not model predictions).
fig_kiel, ax_kiel = plot_kiel_scatter_density(
    X_train['TEFF'],
    X_train['LOGG'],
    y_train,
    title='Kiel Diagram - Train Set',
    colorbar_label='[Fe/H]',
    scatter=True
)


In [None]:
# import math
# n = 35
# k=2
# print(math.comb(n + k , k))

In [None]:
# # model = LinearRegression()
# model = Ridge()
# model.fit(X_train_scaled, y_train)
# # Predict FE_H values
# predicted_feh = model.predict(X_test_scaled)

# # Plot the kiel diagram for the predicted values (test set only)
# fig_kiel, ax_kiel = plot_kiel_scatter_density(
#     X_test['TEFF'],
#     X_test['LOGG'], 
#     predicted_feh,
#     title='Kiel Diagram - Linear model',
#     colorbar_label='Predicted [Fe/H]',
# )

# #Print the slope and intercept of the linear model
# print(f"Slope: {model.coef_}, Intercept: {model.intercept_}")

### Polynomial Regression (with Linear Algebra Optimizer)

In [None]:

def polynomial_regression_lin_alg(X_train_scaled, X_test_scaled, y_train, test_teff, test_logg, degree, use_fourier_features=False, use_pinv=False, y_test=None):
    """
    Fit a least-squares model on polynomial or Fourier features and score it.

    The inputs are expanded (PolynomialFeatures or fourier_features), the
    expanded columns are standardised with statistics from the training set,
    and a linear model is fitted either with the pseudo-inverse or with
    scikit-learn's LinearRegression. No plotting is done here.

    Args:
        X_train_scaled: Scaled training features (2-D, one row per sample).
        X_test_scaled: Scaled test features (2-D, one row per sample).
        y_train: Training target values.
        test_teff: Unused; kept so existing calls keep working.
        test_logg: Unused; kept so existing calls keep working.
        degree: Polynomial degree, or highest Fourier harmonic.
        use_fourier_features: Use fourier_features instead of PolynomialFeatures.
        use_pinv: Solve with np.linalg.pinv instead of LinearRegression.
        y_test: Test target values. If omitted, the notebook-level ``y_test``
            is used, which is what older calls implicitly relied on.

    Returns:
        tuple: (poly_model, predicted_feh_test, train_loss, test_loss).
        poly_model is the coefficient vector when use_pinv is True (the
        intercept, equal to the mean of y_train, is not part of it) and the
        fitted LinearRegression otherwise. The losses are mean squared errors.

    Raises:
        NameError: If y_test is not given and no notebook-level ``y_test`` exists.
    """
    if y_test is None:
        # Backward compatibility: look the notebook-level variable up explicitly
        # instead of silently depending on it inside the loss computation.
        if "y_test" not in globals():
            raise NameError("y_test was not passed and no notebook-level 'y_test' is defined")
        y_test = globals()["y_test"]

    # Plain float arrays keep the arithmetic below positional (no pandas index involved).
    y_train_values = np.asarray(y_train, dtype=float)
    y_test_values = np.asarray(y_test, dtype=float)

    if use_fourier_features:
        X_train_poly = fourier_features(X_train_scaled, degree=degree, add_bias=True)
        X_test_poly = fourier_features(X_test_scaled, degree=degree, add_bias=True)
    else:
        # Create polynomial features; the constant term is handled by the intercept
        poly = PolynomialFeatures(degree=degree, include_bias=False)
        X_train_poly = poly.fit_transform(X_train_scaled)
        X_test_poly = poly.transform(X_test_scaled)

    # Rescale after feature crafting, using training-set statistics only
    poly_scale = StandardScaler()
    poly_scale.fit(X_train_poly)
    X_train_poly_scaled = poly_scale.transform(X_train_poly)
    X_test_poly_scaled = poly_scale.transform(X_test_poly)

    if use_pinv:
        # Alternate fitting algorithm (minimum-norm least squares).
        # Every scaled column has zero training mean, and a constant bias column
        # is turned into zeros by the scaler, so the design matrix alone cannot
        # represent an intercept. Centre the target and add its mean back so this
        # branch matches LinearRegression(fit_intercept=True).
        y_offset = y_train_values.mean()
        beta_hat = np.linalg.pinv(X_train_poly_scaled) @ (y_train_values - y_offset)
        predicted_feh_train = X_train_poly_scaled @ beta_hat + y_offset
        predicted_feh_test = X_test_poly_scaled @ beta_hat + y_offset
        poly_model = beta_hat
    else:
        # Fit the polynomial regression model
        poly_model = LinearRegression(fit_intercept=True)
        # poly_model = Ridge(alpha = 5 ,fit_intercept=True)
        poly_model.fit(X_train_poly_scaled, y_train_values)
        # Predict FE_H values using the fitted model
        predicted_feh_test = poly_model.predict(X_test_poly_scaled)
        predicted_feh_train = poly_model.predict(X_train_poly_scaled)

    # Mean squared error on each subset
    train_loss = np.mean((predicted_feh_train - y_train_values) ** 2)
    test_loss = np.mean((predicted_feh_test - y_test_values) ** 2)

    return poly_model, predicted_feh_test, train_loss, test_loss



In [None]:
n_features = 1500

# Sweep every degree from 1 up to n_features (inclusive), one fit per degree
polynomial_degrees = list(range(1, n_features + 1))

# select N random integers instead of sequential degrees
# polynomial_degrees = np.random.choice(n_features, size=n_features, replace=False)
train_loss_list, test_loss_list = [], []

use_fourier_features = True

pbar = tqdm(polynomial_degrees)
for degree in pbar:
    pbar.set_description(f"Generating polynomial regression of degree {degree}...")
    # Fit with the pseudo-inverse solver and keep the train / test losses for this degree
    model, predictions, train_loss, test_loss = polynomial_regression_lin_alg(
        X_train_scaled,
        X_test_scaled,
        y_train,
        X_test["TEFF"],
        X_test["LOGG"],
        degree,
        use_fourier_features=use_fourier_features,
        use_pinv=True,
    )
    train_loss_list.append(train_loss)
    test_loss_list.append(test_loss)
    # Plot the kiel diagram for the predicted values (test set only)
# fig, ax = plot_kiel_scatter_density(
#     X_test["TEFF"],
#     X_test["LOGG"], 
#     predictions,
#     title=f'Polynomial regression of degree {degree} | Number of parameters: {len(model.coef_)}',
#     colorbar_label='Predicted [Fe/H]',
# )

In [None]:
# polynomial_degrees = [i+1 for i in range(70)]
# Number of fitted parameters for each model in the sweep (x-axis of the plot).
# Fourier: one sin and one cos column per input column per harmonic, plus the bias term.
# Polynomial: all monomials up to total degree n in the input columns, constant term included.
n_inputs = X_train_scaled.shape[1]
if use_fourier_features:
    num_features = [2 * n_inputs * n + 1 for n in polynomial_degrees]
else:
    num_features = [math.comb(n + n_inputs, n_inputs) for n in polynomial_degrees]

fig, ax = plt.subplots(1, 1, figsize=(5, 3))
ax.scatter(num_features, test_loss_list, label="Test Loss")
ax.scatter(num_features, train_loss_list, label="Train Loss")
# Parameters == training samples: beyond this line the model can fit the training set exactly
ax.axvline(len(X_train), ls="--", color="k", label="Interpolation Threshold")


ax.set_yscale("log")
ax.set_xscale("log")
# ax.set_ylim(ymax=1e5)
ax.set_xlabel("Number of parameters")
ax.set_ylabel("Loss Value")
ax.legend(frameon=False, loc="upper left")