In [None]:
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
import pandas as pd
import math
from functools import partial
from typing import Iterable, Callable, Optional
from numbers import Number

In [None]:
class PredictionTask:
    """One prediction problem: the information given and, optionally, the outcome.

    Attributes are set directly from the constructor arguments:
      A -- the `input` argument (whatever the predictor is allowed to see)
      B -- the `outcome` argument; None when no outcome was supplied
    """

    def __init__(self, input, outcome=None):
        # `input` shadows the builtin, but it is part of the public signature.
        self.A, self.B = input, outcome


class WeightedPredictionClass:
    """A finite collection of prediction tasks with weights normalised to sum to one.

    Args:
        tasks: Any iterable of task objects (including one-shot generators).
            Every task must have a known outcome, i.e. ``task.B is not None``.
        weights: Optional iterable with one weight per task. The weights are
            divided by their sum. When omitted, all tasks are weighted equally.

    Raises:
        ValueError: if a task has no outcome, if the number of weights differs
            from the number of tasks, or if the weights sum to zero.
    """

    def __init__(self, tasks: Iterable["PredictionTask"], weights: Optional[Iterable[Number]] = None):
        # Materialise once: the tasks are validated here, counted with len(),
        # and iterated again on every predictive_success() call, which a
        # generator or other one-shot iterable could not support.
        self.tasks = list(tasks)
        for task in self.tasks:
            if task.B is None:
                raise ValueError("All WeightedPredictionClass task outcomes must be known.")
        J = len(self.tasks)
        if weights is not None:
            # Same reasoning as for tasks: len() and the sum both need a sequence.
            weights = np.asarray(list(weights), dtype=float)
            if len(weights) != J:
                raise ValueError(f"len(weights), {len(weights)}, must equal len(tasks), {J}")
            total = weights.sum()
            if total == 0:
                # Normalising would otherwise silently produce NaN/inf weights.
                raise ValueError("weights must not sum to zero.")
            self.weights = weights / total
        else:
            self.weights = np.ones(J) / J

    def predictive_success(self, predicted_log_likelihood: Callable[["PredictionTask"], float]) -> float:
        """Return the weighted sum of ``predicted_log_likelihood(task)`` over the tasks.

        Tasks whose outcome is None are skipped; the constructor rejects such
        tasks, so this only matters if an outcome is cleared afterwards.
        """
        return sum(w * predicted_log_likelihood(task)
                   for w, task in zip(self.weights, self.tasks)
                   if task.B is not None)

In [None]:
def design_matrix(model_name, x):
    """Build the regression design matrix for a named polynomial model.

    Supported names and their columns:
      "LIN0"   -> [x]                    (no intercept)
      "LIN"    -> [1, x]
      "PAR"    -> [1, x, x^2]
      "CUBIC"  -> [1, x, x^2, x^3]
      "POLY-4" -> [1, x, x^2, x^3, x^4]

    `x` must support `len`, `**` and (for "LIN0") `.reshape`, as a 1-D numpy
    array does. Raises NotImplementedError for any other model name.
    """
    if model_name == "LIN0":
        # Only x as a feature (no intercept)
        return x.reshape(-1, 1)
    for name, degree in (("LIN", 1), ("PAR", 2), ("CUBIC", 3), ("POLY-4", 4)):
        if model_name == name:
            columns = [np.ones(len(x)), x] + [x**k for k in range(2, degree + 1)]
            return np.column_stack(columns)
    raise NotImplementedError(model_name)

def log_likelihood(y_pred, y, sigma2) -> float:
    """Average Gaussian log-likelihood of `y` given predictions `y_pred`.

    Computes -(1/2) * (ln(2*pi*sigma2) + MSE/sigma2), where MSE is the mean
    squared difference between `y_pred` and `y` and `sigma2` is the variance.
    """
    residuals = y_pred - y
    mean_squared_error = np.mean(residuals**2)
    log_normaliser = math.log(2 * math.pi * sigma2)
    return -0.5 * (log_normaliser + mean_squared_error / sigma2)


def predicted_ll(model_name, task: PredictionTask) -> float:
    """Score a model on one task by its average predictive log-likelihood.

    Fits ordinary least squares (statsmodels OLS) to the task's known points
    using the design matrix for `model_name`, predicts at the task's
    `x_to_predict` points, and evaluates `log_likelihood` of the task outcome
    `task.B` using the fit's residual mean square (`mse_resid`) as variance.
    """
    given = task.A
    x_train, y_train, x_new = given['x_known'], given['y_known'], given['x_to_predict']
    ols_fit = sm.OLS(y_train, design_matrix(model_name, x_train)).fit()
    predictions = ols_fit.predict(design_matrix(model_name, x_new))
    return log_likelihood(predictions, task.B, ols_fit.mse_resid)

In [None]:
def generate_y(x, noise_std=0.0):
    """Evaluate the curve 0.5 + 0.5*tanh(x - 2) at `x`, optionally with noise.

    When `noise_std` is truthy, independent Gaussian noise with mean 0 and
    standard deviation `noise_std` is drawn from numpy's global random state
    and added to each value.
    """
    signal = 0.5 + 0.5 * np.tanh(x - 2)
    if not noise_std:
        return signal
    signal += np.random.normal(0, noise_std, size=len(x))
    return signal

def generate_x(x_range, step=0.1, decimals=1):
    """Return the grid start, start+step, ... up to and including the range end.

    Args:
        x_range: Pair ``(start, end)``; both ends are meant to be on the grid.
        step: Grid spacing.
        decimals: Number of decimals the grid values are rounded to, so that
            equal grid points compare (and hash) equal across calls. The
            default suits ``step=0.1``; pass a larger value for finer steps.

    Returns:
        1-D numpy array of rounded grid points; empty if the end lies more
        than a rounding tolerance before the start.
    """
    start, stop = x_range[0], x_range[1]
    # np.arange(start, stop + step, step) is unreliable with a float step:
    # its length is ceil((stop + step - start) / step), and rounding error can
    # push that over by one (e.g. (3.5, 3.5) produced [3.5, 3.6]). Count whole
    # steps explicitly instead, with a small tolerance for ratios such as
    # 0.3 / 0.1 == 2.9999999999999996.
    n_steps = math.floor((stop - start) / step + 1e-9)
    if n_steps < 0:
        return np.array([], dtype=float)
    return np.round(start + step * np.arange(n_steps + 1), decimals=decimals)

In [None]:
# Named x-intervals as (start, end) pairs; generate_x expands each one
# into a grid of points.
x_ranges = dict(
    X0=(0, 3.5),
    Xtarg=(3.6, 5),
    Xall=(0, 5),
    Xcal=(0, 2.5),
    Xgen=(2.6, 3.5),
    Xcal1=(0, 2.3),
    Xgen1=(2.4, 3.5),
    Xcal2=(0, 2.4),
    Xgen2=(2.5, 3.5),
    Xcal3=(0, 2.6),
    Xgen3=(2.7, 3.5),
)

# Noise-free curve over the full range, plus a lookup table from each grid
# point to its y value (used to build tasks for any sub-range).
x_all = generate_x(x_ranges["Xall"])
y_true = generate_y(x_all)
f_dict = dict(zip(x_all, y_true))
#plt.figure(figsize=(12, 8))
#plt.plot(x_all, y_true, label="TRUE", color="blue", linewidth=2)

def task(in_range_name: str, out_range_name: str) -> PredictionTask:
    """Build a PredictionTask from two named ranges in `x_ranges`.

    The task input is a dict with the grid of the "in" range (`x_known`), the
    corresponding values looked up in `f_dict` (`y_known`), and the grid of
    the "out" range (`x_to_predict`). The task outcome is the array of
    `f_dict` values at the `x_to_predict` points.
    """
    def lookup(points):
        # Raises KeyError if a point is not on the grid stored in f_dict.
        return np.array([f_dict[x_i] for x_i in points])

    x_known = generate_x(x_ranges[in_range_name])
    x_to_predict = generate_x(x_ranges[out_range_name])
    given = {
        "x_known": x_known,
        "y_known": lookup(x_known),
        "x_to_predict": x_to_predict,
    }
    return PredictionTask(given, lookup(x_to_predict))

In [None]:
# Quick check: score the quartic model fitted on "Xcal" when predicting "Xgen".
predicted_ll("POLY-4", task("Xcal", "Xgen"))

In [None]:
# Scatter the known points of the X0 -> Xall task on fixed axes.
task_of_interest = task("X0", "Xall")
known = task_of_interest.A
fig, ax = plt.subplots()
ax.scatter(known['x_known'], known['y_known'], label="TRUE", color="blue")
ax.set_xlim(0, 5)
ax.set_ylim(0, 1)
plt.show()

In [None]:
def run_gen_simulation(range_names, models):
    """Score every model on every (known range, range to predict) pair.

    Args:
        range_names: Iterable of pairs of keys into `x_ranges`; element 0 is
            the range the model is fitted on, element 1 the range it predicts.
        models: Iterable of model names understood by `design_matrix`.

    Returns:
        Nested dict ``results[model_name]["<in>-><out>"]["ll_score"]`` holding
        the value of `predicted_ll` for that model and task.

    Side effect: displays a table of the scores (one row per model, one
    labelled column per range pair) via the notebook's `display`.
    """
    # Materialise both inputs: they are traversed more than once, which a
    # one-shot iterable would not survive.
    models = list(models)
    range_pairs = [(item[0], item[1]) for item in range_names]
    # Column labels are the same for every model, so build them once.
    names = [f"{in_name}->{out_name}" for in_name, out_name in range_pairs]

    results_dict = {}
    for model_name in models:
        results_dict[model_name] = {
            name: {"ll_score": predicted_ll(model_name, task(in_name, out_name))}
            for name, (in_name, out_name) in zip(names, range_pairs)
        }

    rows = [[model] + [results_dict[model][name]["ll_score"] for name in names] for model in models]
    # Label the columns so the table can be read without counting positions.
    display(pd.DataFrame(rows, columns=["model"] + names))
    return results_dict

# Models ordered from fewest to most design-matrix columns.
sim_models = ["LIN0", "LIN", "PAR", "CUBIC", "POLY-4"]
# (range to fit on, range to predict) pairs.
sim_range_names = [("Xcal", "Xgen"), ("X0", "Xtarg"), ("X0", "Xall")]
results_gen = run_gen_simulation(range_names=sim_range_names, models=sim_models)

In [None]:
# exp(average log-likelihood) per model (rows, in sim_models order) and
# range pair (columns, in the order they were run).
ll_scores_gen = np.array([
    [entry["ll_score"] for entry in results_gen[model].values()]
    for model in sim_models
])
temp0 = np.exp(ll_scores_gen)
temp0

In [None]:
# First range-pair column, expressed relative to the model in row 1
# (sim_models[1], i.e. "LIN").
temp0[:, 0]/temp0[1, 0]

In [None]:
# Sweep the split point: fit on (0, cal_end), predict the grid from the next
# point up to 3.5, for cal_end = 2.5, 2.6, ..., 3.4. The new ranges are
# registered in x_ranges under names derived from cal_end.
step = 0.1
explore_range_names = []
for i in range(10):
    cal_end = round(2.5 + step * i, 1)
    gen_start = round(cal_end + step, 1)
    cal_name, gen_name = f"Xcal{cal_end:.1f}", f"Xgen{cal_end:.1f}"
    x_ranges.update({cal_name: (0, cal_end), gen_name: (gen_start, 3.5)})
    explore_range_names.append((cal_name, gen_name))
results_gen2 = run_gen_simulation(explore_range_names, sim_models)

In [None]:
# Same sweep of split points as a single equally weighted prediction class:
# one task per cal_end = 2.5, 2.6, ..., 3.4.
step = 0.1
tasks = []
for i in range(10):
    cal_end = round(2.5 + step * i, 1)
    gen_start = round(cal_end + step, 1)
    cal_name, gen_name = f"Xcal{cal_end:.1f}", f"Xgen{cal_end:.1f}"
    x_ranges.update({cal_name: (0, cal_end), gen_name: (gen_start, 3.5)})
    tasks.append(task(cal_name, gen_name))
pclass = WeightedPredictionClass(tasks)
# exp of each model's weighted log-likelihood score over the class.
[math.exp(pclass.predictive_success(partial(predicted_ll, model_name))) for model_name in sim_models]

In [None]:
# exp of each model's mean log-likelihood score across the swept range pairs
# (one entry per model, in sim_models order).
ll_scores_gen2 = np.array([
    [entry["ll_score"] for entry in results_gen2[model].values()]
    for model in sim_models
])
temp = np.exp(ll_scores_gen2.mean(axis=1))
temp

In [None]:
# Each model's value relative to the model at index 1 (sim_models[1], i.e. "LIN").
temp/temp[1]

In [None]:
# Natural log of 10 -- presumably a reference value for reading differences in
# log-likelihood scores as a factor of 10; confirm intent.
math.log(10)

In [None]:
# Natural log of 2 -- presumably a reference value for reading differences in
# log-likelihood scores as a factor of 2; confirm intent.
math.log(2)

In [None]:
# Weighted log-likelihood score of each model over the prediction class
# (log scale, i.e. without the exp applied earlier).
[pclass.predictive_success(partial(predicted_ll, model_name)) for model_name in sim_models]

In [None]:
# Inspect the range registry, including the entries added by the sweep cells.
x_ranges

In [None]:
# Variant of the split-point sweep in which the predicted range starts at
# cal_end itself (rather than one step later), so the boundary point belongs
# to both the fitted and the predicted range. Ranges get an "orig" suffix.
step = 0.1
explore_range_names = []
for i in range(10):
    cal_end = round(2.5 + step * i, 1)
    gen_start = cal_end  # instead of round(cal_end + step, 1)
    cal_name, gen_name = f"Xcal{cal_end:.1f}orig", f"Xgen{cal_end:.1f}orig"
    x_ranges.update({cal_name: (0, cal_end), gen_name: (gen_start, 3.5)})
    explore_range_names.append((cal_name, gen_name))
results_gen3 = run_gen_simulation(explore_range_names, sim_models)