In [None]:
# Import.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.decomposition import PCA
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Setup plotting params: set matplotlib's default figure size to (14, 10) and
#     apply seaborn's default theme.
plt.rcParams['figure.figsize'] = (14, 10)
sns.set_theme()

# Notebook magic: show matplotlib figures inline, in the cell output.
%matplotlib inline

In [None]:
# Constants.
# Directory holding the input table; all paths are relative.
DATA_PATH  = "../data/"
# CSV table loaded into `data_astn` below.
ASTN_FILE = DATA_PATH + "table1.csv"
# Prefix prepended to each row's "filename" when `get_row(...)` opens a spectrum.
SPECTRA_PATH = "../data/spectra/"
# File `grid_search(...)` writes its results to. NOTE: this name is reassigned
#     in the cell that selects the wings, before the grid search is run.
RESULTS_CSV = "linreg_Teff_results.csv"
# Column names given to the (header-less) spectrum files read by `get_row(...)`.
COL_NAMES = ["wavelength", "flux", "flux_err"]

# File list: one row per spectrum. `get_row(...)` reads the "filename" and
#     "temperature" columns of each row.
data_astn = pd.read_csv(ASTN_FILE, sep=',')

In [None]:
# Locate the two wings of a spectrum. Both are wavelength intervals placed sym-
#     metrically around the wavelength of minimum flux (the "center"):
#         left wing  : [center - W_RMAX, center - W_RMIN]
#         right wing : [center + W_RMIN, center + W_RMAX]
#     `df` needs a "wavelength" and a "flux" column. Returns the tuple
#     ((left_low, left_high), (right_low, right_high)).
def get_wings(df, W_RMIN = 4, W_RMAX = 10):
    # The center is simply the sampled wavelength where the flux is lowest.
    # NOTE. This could be improved with a Gaussian or Laplacian fit.
    center = df["wavelength"][df["flux"].idxmin()]

    # Both wings span the same distances from the center, mirrored.
    left_wing  = (center - W_RMAX, center - W_RMIN)
    right_wing = (center + W_RMIN, center + W_RMAX)

    return (left_wing, right_wing)

# Get and store the target temperature and the fluxes of one spectrum that lie
#     inside the limits defined by W_RMIN and W_RMAX. The way these parameters
#     are used is described in the `get_wings(...)` function. `in_row` is a row
#     of the spectra table and must provide "filename" and "temperature".
#     L_WING and R_WING set the wings to be used:
#
#     L_WING | R_WING || resulting operation
#     True   | True   || Return data from both wings.
#     True   | False  || Only return data from left wing.
#     False  | True   || Only return data from right wing.
#     False  | False  || Return None.
#
#     Returns a dict with the keys "filename", "temperature" and one
#     "flux NNN" key per retained flux, numbered from 0 in file order.
def get_row(in_row, W_RMIN=4, W_RMAX=10, L_WING=True, R_WING=True):
    # Return None if no wing is set.
    if not L_WING and not R_WING:
        return None

    # Open dataframe. The separator is a regular expression, so it is written
    #     as a raw string: in a plain literal "\s" is an invalid escape sequence.
    df = pd.read_csv(
        SPECTRA_PATH + in_row["filename"],
        sep=r"\s+", header=None, names=COL_NAMES
    )

    # Get wings and flag the rows lying outside of every selected wing. A row
    #     sitting exactly on a wing limit counts as inside.
    (l_lims, r_lims) = get_wings(df, W_RMIN, W_RMAX)
    wavelength = df["wavelength"]
    outside = pd.Series(True, index=df.index)
    if L_WING:
        outside &= (wavelength < l_lims[0]) | (wavelength > l_lims[1])
    if R_WING:
        outside &= (wavelength < r_lims[0]) | (wavelength > r_lims[1])

    # Keep the fluxes inside the wings, renumbered from 0 so that the position
    #     of a flux within the wings becomes its column name.
    wing_fluxes = df.loc[~outside, "flux"].reset_index(drop=True)

    # TODO. Scale importance of each parameter based on its error. We should
    #       prioritize entries with a lower flux_err. Then, include the strength
    #       of this scaling as a parameter.

    # Create the output row.
    out_row = {
        "filename"    : in_row["filename"],
        "temperature" : in_row["temperature"]
    }

    # Place the ordered fluxes in.
    for i, flux in wing_fluxes.items():
        out_row["flux %03d" % i] = flux

    return out_row

# Min-max normalize x to the [0, 1] range. Returns the tuple
#     (x_scaled, x_min, x_max); the last two are what `descale(...)` needs to
#     undo the operation. For a dataframe every column is scaled on its own.
#     Constant data (x_max == x_min) is mapped to 0 instead of NaN/inf.
def scale(x):
    x_min = x.min()
    x_max = x.max()

    # A zero range would turn the division into 0/0. Dividing by 1 instead maps
    #     constant data to 0, which `descale(...)` sends back to x_min.
    x_range = x_max - x_min
    if np.ndim(x_range) == 0:
        # Series or array input: min and max are scalars.
        if x_range == 0:
            x_range = 1
    else:
        # Dataframe input: one range per column.
        x_range = x_range.where(x_range != 0, 1)

    return ((x - x_min)/x_range, x_min, x_max)

# Undo a min-max normalization: map x_scaled from [0, 1] back to the range
#     [x_min, x_max] that `scale(...)` reported.
def descale(x_scaled, x_min, x_max):
    x_range = x_max - x_min
    return x_range * x_scaled + x_min

# Project the train and test sets onto PCA_NCOMPS principal components. The
#     PCA is fitted on X_train only and then applied, as is, to X_test. With
#     `debug` set, the explained variance ratio of each component is printed.
#     Returns the tuple (X_train_projected, X_test_projected).
def get_pca_decomposition(X_train, X_test, PCA_NCOMPS=5, debug=False):
    decomposer = PCA(n_components = PCA_NCOMPS)
    train_proj = decomposer.fit_transform(X_train)
    test_proj  = decomposer.transform(X_test)

    if debug:
        print("PCA explained variance ratios:")
        for i, ratio in enumerate(decomposer.explained_variance_ratio_):
            print("  * PCA param %2d: %6.4f" % (i, ratio))

    return (train_proj, test_proj)

# Draw the real test targets and the model predictions as two lines on the same
#     axes. Both inputs are expected in scaled units and are brought back to
#     temperatures with `descale(...)`, using y_min and y_max. `y_test` must
#     expose `.values` (e.g. a pandas Series).
def plot_test_set(y_pred, y_test, y_min, y_max):
    (real_color, pred_color) = ("#96D5A4", "#FA9A58")

    # Undo the scaling of both curves before plotting.
    real_T = descale(y_test.values, y_min, y_max)
    pred_T = descale(y_pred, y_min, y_max)

    # Make lineplots: real values first, predictions on top.
    sns.lineplot(real_T, color = real_color, label = "Real values")
    sns.lineplot(
        pred_T,
        color = pred_color, label = "Linear regression", dashes = (2,2)
    )

    # Set labels and such. The y range is fixed to 5000-7500.
    plt.ylabel("T_eff")
    plt.legend(loc = "upper right")
    plt.ylim(5000, 7500)

    # To keep the figure, call plt.savefig("NAME.png") before plt.show().
    print("\nLinear regression result:")
    plt.show()

In [None]:
# Run a linear regression model on a PCA decomposition of the data points on the
#     wings of a set of flux spectra files. The spectra are the ones listed in
#     the global `data_astn`, and the first 80% of its rows (in table order) are
#     used for training. The process uses three hyperparameters:
#       * W_RMIN      : Wing minimum distance from distribution minimum.
#       * W_RMAX      : Wing maximum distance from distribution minimum.
#       * PCA_NCOMPS  : Number of PCA components to extract. If it is not
#                       positive, the regression runs on the scaled fluxes.
# L_WING and R_WING select the wings to use, `plot` draws the test set against
#     the predictions, and `debug` prints the PCA explained variance ratios.
# Returns a tuple with mean absolute error, mean squared error, and R2 score,
#     all computed on the scaled temperatures. Returns None if W_RMIN > W_RMAX
#     or if no wing is selected.
def linreg_Teff(
    W_RMIN=4, W_RMAX=10, PCA_NCOMPS=5, L_WING=True, R_WING=True,
    plot=False, debug=False
):
    # Check input data validity.
    if (W_RMIN > W_RMAX):
        return None
    if not L_WING and not R_WING:
        return None

    # Create input dataframe containing flux measurements on the wings, filename
    #     and target variable (temperature). All rows are built first and the
    #     dataframe is created once; spectra with fewer points in the wings
    #     leave NaNs in the trailing flux columns.
    records = [
        get_row(in_row, W_RMIN, W_RMAX, L_WING, R_WING)
        for (_, in_row) in data_astn.iterrows()
    ]
    df = pd.DataFrame(records, index=data_astn.index)

    # Remove columns with NaN values, i.e. keep the fluxes every spectrum has.
    df = df.dropna(axis = 1)

    # --+ Prepare X_train, X_test, y_train, y_test
    # Get X and y.
    X = df.drop(["filename", "temperature"], axis=1)
    y = df["temperature"]

    # Scale.
    # NOTE(review): minima and maxima are taken over the whole set, so the
    #     test rows take part in the scaling of the train rows.
    (X_scaled, X_min, X_max) = scale(X)
    (y_scaled, y_min, y_max) = scale(y)

    # Split 80:20.
    cutoff = int(.80 * X_scaled.shape[0])

    (X_train, X_test) = (X_scaled[:cutoff], X_scaled[cutoff:])
    (y_train, y_test) = (y_scaled[:cutoff], y_scaled[cutoff:])

    # Perform PCA, forwarding `debug` so that the flag actually has an effect.
    if PCA_NCOMPS > 0:
        (X_train, X_test) = get_pca_decomposition(
            X_train, X_test, PCA_NCOMPS, debug
        )

    # Fit and predict w/ linear regression.
    lr = LinearRegression()
    lr.fit(X_train, y_train)
    y_pred = lr.predict(X_test)

    if plot:
        plot_test_set(y_pred, y_test, y_min, y_max)

    return (
        mean_absolute_error(y_test, y_pred),
        mean_squared_error( y_test, y_pred),
        r2_score(           y_test, y_pred)
    )

In [None]:
# Perform grid search through three arrays, one for each hyperparameter, calling
#     `linreg_Teff(...)` on every combination with the given L_WING and R_WING.
#     Combinations for which it returns None or raises ValueError are skipped.
#     Returns a dataframe with one row per evaluated combination and the columns
#     "R_min", "R_max", "N_comps", "mae", "mse" and "R2". The same dataframe is
#     written to the file named by the global RESULTS_CSV.
def grid_search(ARR_W_RMIN, ARR_W_RMAX, ARR_PCA_NCOMPS, L_WING, R_WING):
    columns = ["R_min", "R_max", "N_comps", "mae", "mse", "R2"]

    # Results are collected in a list and turned into a dataframe once, instead
    #     of concatenating a one-row dataframe per combination.
    records = []
    n_steps = len(ARR_W_RMIN)

    for (step, W_RMIN) in enumerate(ARR_W_RMIN):
        # Print progress: one filled square per finished W_RMIN value.
        print("■" * step + "□" * (n_steps - step))

        for W_RMAX in ARR_W_RMAX:
            for PCA_NCOMPS in ARR_PCA_NCOMPS:
                # Evaluate the model with the set hyperparameters.
                try:
                    res = linreg_Teff(
                        W_RMIN, W_RMAX, PCA_NCOMPS, L_WING, R_WING, False, False
                    )
                # On extreme cases, we won't get enough data points for PCA.
                except ValueError:
                    continue
                if res is None:
                    continue

                # Store a row with the result.
                records.append({
                    "R_min"   : W_RMIN,
                    "R_max"   : W_RMAX,
                    "N_comps" : PCA_NCOMPS,
                    "mae"     : res[0],
                    "mse"     : res[1],
                    "R2"      : res[2]
                })

    # Passing the columns keeps them in place even if nothing was evaluated.
    res_df = pd.DataFrame(records, columns=columns)

    # Save to a csv to avoid re-running if unnecessary.
    res_df.to_csv(RESULTS_CSV, sep=',')

    return res_df

In [None]:
# Define which wings will be used.
L_WING = False
R_WING = True

# Write filename. It carries one letter per wing (left, then right): 'T' if the
#     wing is used and 'F' if it is not.
wing_tag = ('T' if L_WING else 'F') + ('T' if R_WING else 'F')
RESULTS_CSV = "linreg_Teff.wingtest_" + wing_tag + ".csv"

# Obtain a results dataframe: reuse the csv of a previous run if there is one,
#     otherwise run the grid search (which also writes the csv).
try:
    res_df = pd.read_csv(RESULTS_CSV, sep=',')
except FileNotFoundError: # results csv doesn't exist, create it.
    res_df = grid_search(
        np.linspace(0, 10, 21),   # W_RMIN values.
        np.linspace(2, 12, 21),   # W_RMAX values.
        np.arange(1, 21, 1),      # PCA_NCOMPS values.
        L_WING, R_WING
    )

In [None]:
# Check best performing PCA numbers: average the results over every other
#     hyperparameter for each number of components, then plot one panel per
#     metric against the number of components.
res_df_npc = res_df.groupby("N_comps").mean().reset_index()

fig, axes = plt.subplots(3, 1, sharey = False, sharex = True)
plt.rcParams['figure.figsize'] = (14, 10)

metrics = ("mae", "mse", "R2")

for (ax, metric) in zip(axes, metrics):
    ax.plot(res_df_npc["N_comps"], res_df_npc[metric])
    ax.set_title(metric)

plt.tight_layout()
plt.show()

In [None]:
# For each metric, keep the best row of every (R_max, R_min) pair found in the
#     results: highest value for "R2", lowest for "mae" and "mse". The pairs
#     are taken from `res_df` itself, so this does not need a copy of the grids
#     used for the search. The metric names are spelled out here because a
#     later cell rebinds `metrics`.
best_results = {}

for metric in ("mae", "mse", "R2"):
    # Rows without a value for this metric cannot be the best of their pair.
    scored = res_df.dropna(subset=[metric])
    if scored.empty:
        best_results[metric] = res_df.iloc[0:0].copy()
        continue

    # Index label of the best row of each pair, sorted by R_max, then R_min.
    by_pair = scored.groupby(["R_max", "R_min"])[metric]
    best_idx = by_pair.idxmax() if metric == "R2" else by_pair.idxmin()

    best_results[metric] = res_df.loc[best_idx.to_numpy()].reset_index(drop=True)

In [None]:
# Define gradient: three color stops, placed at 0, 0.8 and 1.
from matplotlib.colors import LinearSegmentedColormap
gradient_stops = [
    (0,    '#fdf6e3'),
    (0.8, '#fbe3da'),
    (1,    '#df69ba')
]
custom_cmap = LinearSegmentedColormap.from_list("custom_hsv", gradient_stops, N=256)

# Choose which metrics to plot.
metrics = ["R2"]

# One letter per wing (left, then right): 'T' if used, 'F' if not.
wing_suffix = ('T' if L_WING else 'F') + ('T' if R_WING else 'F')

with sns.axes_style("white"):
    plt.rcParams['figure.figsize'] = (14, 10)

    for metric in metrics:
        # Make plot: R_max along the rows, R_min along the columns, and the
        #     best value of the metric in each cell.
        heatmap_data = best_results[metric].pivot(
            index="R_max", columns="R_min", values=metric
        )
        sns.heatmap(
            heatmap_data,
            linewidth=.5, annot=True, annot_kws={"size": 9}, cmap=custom_cmap
        )
        plt.title(metric)

        # Write filename, save figure, and show.
        filename = ("heatmap_%s." % metric) + wing_suffix + ".pdf"

        plt.savefig(filename)
        plt.show()

In [None]:
# For the best model, see how robust it is if we change the train set.
# TODO. Then, test different train set sizes.
# Best TT model (R2 = 0.992).
#   * R_min   = 0.0,
#   * R_max   = 6.5,
#   * N_comps = 8.
R2_list = []

W_RMIN  = 0.0
W_RMAX  = 6.5
N_COMPS = 8

# `linreg_Teff(...)` takes neither a dataset nor a seed: it reads the global
#     `data_astn` and trains on its first 80% of rows. To change the train set,
#     the table is therefore shuffled (and re-indexed from 0) before each call.
#     The original table is put back afterwards, even if a run fails.
data_astn_ordered = data_astn
try:
    # dtype is spelled out because the seeds go up to 2**32 - 1.
    for seed in np.random.randint(1, high=2**32, size=1000, dtype=np.int64):
        data_astn = data_astn_ordered.sample(
            frac=1, random_state=int(seed)
        ).reset_index(drop=True)

        result = linreg_Teff(
            W_RMIN, W_RMAX, N_COMPS, True, True,
            False, False
        )
        R2_list.append(result[2])
finally:
    data_astn = data_astn_ordered

print("R2 = %5.4f +- %5.4f" % (np.mean(R2_list), np.std(R2_list)))