# **Loss Functions for Synthetic Time Series**

First, we import both the project modules and the necessary libraries:

In [52]:
from src.ts_simulator.distributions import NormalDist, CauchyDist, GumbelDist, LogNormalDist, ParetoDist
from src.ts_simulator.distributions import IDistSimulator
from src.ts_simulator.simulator import TimeSeriesSimulator
from src.graphics.classes import PlotSimulatedTS
from src.dnn.utils import train_and_predict, create_sequences_from_series, l_infinity_loss, quantile_loss_fn

In [53]:
import os
import numpy as np
import matplotlib.pyplot as plt
import re
from typing import Dict
from sklearn.preprocessing import StandardScaler, MinMaxScaler

### Time Series Simulation

Fix number of points and random seed

In [54]:
# Number of time points in every simulated series.
n = 280
# Seed forwarded to `simulate_noise` whenever noise samples are drawn.
seed = 12

Create simulated time series with a specific functional form

In [55]:
# Time axis: n evenly spaced points covering [0, 1] (both ends included).
x: np.ndarray = np.linspace(0.0, 1.0, num=n)
# Deterministic component of every series: sin(12 * x + 1).
sin_ts: np.ndarray = np.sin(1 + 12 * x)

Define the error distributions used to add noise to the deterministic time series

In [56]:
# Noise distributions that will be added to the deterministic signal.
normal: IDistSimulator = NormalDist({"loc": 0, "scale": 0.25})
cauchy: IDistSimulator = CauchyDist({"loc": 0, "scale": 0.01})
gumbel: IDistSimulator = GumbelDist({"loc": 0, "scale": 1.1})
lognormal: IDistSimulator = LogNormalDist({"loc": 0, "scale": 0.65})
pareto: IDistSimulator = ParetoDist({"xm": 0.01, "alpha": 1.25})

# Display name -> distribution object. The insertion order drives the order
# in which the later cells simulate, train and plot.
errors: Dict[str, IDistSimulator] = {
    "Normal": normal,
    "Cauchy": cauchy,
    "Gumbel": gumbel,
    "LogNormal": lognormal,
    "Pareto": pareto,
}

# Reference statistics per distribution, all keyed like `errors`.
# `theory()` is indexed with "mean" and "median"; `quantile(p)` is called
# with p = 0.1 and p = 0.9.
# NOTE(review): the mean of a Cauchy distribution is mathematically
# undefined - confirm what `theory()["mean"]` returns for it.
mean_dist: Dict[str, float] = {
    label: dist.theory()["mean"] for label, dist in errors.items()
}

median_dist: Dict[str, float] = {
    label: dist.theory()["median"] for label, dist in errors.items()
}

q_1: Dict[str, float] = {
    label: dist.quantile(0.1) for label, dist in errors.items()
}

q_9: Dict[str, float] = {
    label: dist.quantile(0.9) for label, dist in errors.items()
}

Create a time series without noise and create image

In [57]:
# Folder that receives the raw time-series figures (created on demand).
output_dir = os.path.join(
    os.getcwd(), "experiments/sim_ts_errors/ts_plots")
os.makedirs(output_dir, exist_ok=True)

# Simulator and plotting helper; both are reused by the following cells.
simul = TimeSeriesSimulator()
plotter = PlotSimulatedTS()

# Registry of every simulated series, keyed by a display name.
dict_series: Dict[str, np.ndarray] = {}

# Baseline series: the sine signal combined with an all-zero noise vector.
noise_zero: np.ndarray = np.zeros(n)
ts_orig: np.ndarray = simul.get_ts(
    determ_series=sin_ts,
    noise_series=noise_zero,
    constant_determ=0,
    constant_noise=0,
)
dict_series["Original"] = ts_orig

# Plot the baseline, store the PNG, then close the figure to free memory.
fig = plotter.plot_sim_ts(
    x, ts_orig, sin_ts, errors=None, title="Time Series without Noise"
)
fig.savefig(os.path.join(output_dir, "series_no_noise.png"))
plt.close(fig)

Add different noises to the deterministic part of the time series and obtain figures for all noises:

In [58]:
# Build one noisy series per distribution, register it and save its plot.
for name, dist_sim in errors.items():

    # NOTE(review): the same `seed` is used for every distribution, so the
    # noise draws may be driven by the same random stream - confirm that
    # this is intended.
    noise = simul.simulate_noise(n=n, dist=dist_sim, seed=seed)

    # Combine the deterministic sine with the sampled noise.
    ts = simul.get_ts(
        determ_series=sin_ts,
        noise_series=noise
    )

    # Stored as a single-column 2-D array (the "Original" entry is stored
    # exactly as returned by `get_ts`, without this reshape).
    dict_series[name] = ts.reshape(-1, 1)

    # The reference curve is the signal shifted by the theoretical noise mean.
    fig = plotter.plot_sim_ts(
        x,
        ts,
        sin_ts + mean_dist[name],
        errors=noise,
        title=f"Time Series with {name} Noise"
    )
    fig.savefig(os.path.join(output_dir, f"series_{name}.png"))
    plt.close(fig)

### Train ANNs

Define loss functions and ANN parameters

In [59]:
# Hyper-parameters shared by every (series, loss) training run.
seq_length = 1        # window length used to build the input sequences
batch_size = 32
epochs = 250
learning_rate = 0.01

# Losses under comparison, keyed by the label used in result keys and file
# names. "mae"/"mse" are passed as strings (presumably resolved by the
# training backend - confirm); the others are callables from src.dnn.utils.
loss_functions = {
    "L_1": "mae",
    "L_2": "mse",
    "L_inf": l_infinity_loss,
    "Quantile_0.1": quantile_loss_fn(0.1),
    "Quantile_0.9": quantile_loss_fn(0.9),
}

# NOTE(review): not referenced by any cell visible in this notebook section.
scaler = StandardScaler()

# Result stores, both keyed by f"{series_name}_{loss_name}".
dict_predictions = {}
histories = {}

Train the different ANNs and obtain the predictions for each possible time series and each possible loss function

In [60]:
# Train one network per (series, loss) pair and keep predictions + history.
for series_name, series_data in dict_series.items():
    # Use the configured window length here as well; it was hard-coded to 1,
    # which would silently disagree with the reshape below and with
    # `train_and_predict` as soon as `seq_length` is changed.
    X, y = create_sequences_from_series(series=series_data, seq_length=seq_length)
    # Reshape the inputs to (samples, seq_length, 1).
    X = X.reshape(-1, seq_length, 1)
    for loss_name, loss_fn in loss_functions.items():
        run_key = f"{series_name}_{loss_name}"
        print(
            f"\n\nTraining {series_name} with {loss_name} loss function...\n\n")
        preds, history = train_and_predict(
            X=X,
            y=y,
            loss_function=loss_fn,
            seq_length=seq_length,
            batch_size=batch_size,
            epochs=epochs,
            learning_rate=learning_rate
        )

        # Both stores share the key format f"{series_name}_{loss_name}".
        dict_predictions[run_key] = preds
        histories[run_key] = history



Training Original with L_1 loss function...


Epoch 1/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.3860 - mae: 0.3860 - mape: 71.1447 - mse: 0.2535   
Epoch 2/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.1047 - mae: 0.1047 - mape: 40.8642 - mse: 0.0180 
Epoch 3/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.0772 - mae: 0.0772 - mape: 33.1893 - mse: 0.0106 
Epoch 4/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.0481 - mae: 0.0481 - mape: 52.9435 - mse: 0.0039  
Epoch 5/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 0.0305 - mae: 0.0305 - mape: 28.1287 - mse: 0.0017 
Epoch 6/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - loss: 0.0339 - mae: 0.0339 - mape: 19.6908 - mse: 0.0017 
Epoch 7/250
[1m9/9[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 

Create ANN learning curves for each time series with a specific noise:

In [61]:
# Root folder of this experiment, resolved against the current working
# directory, and the sub-folder that receives the learning-curve figures.
base_dir = os.path.join(os.getcwd(), "experiments", "sim_ts_errors")
plot_dir = os.path.join(base_dir, "learning_curves")
os.makedirs(plot_dir, exist_ok=True)  # no error if it already exists

def safe_name(s: str) -> str:
    """Return `s` as text that is safe to use in a file or folder name.

    The value is converted with `str()`; every run of characters other than
    ASCII letters, digits, `_`, `.` and `-` is collapsed into a single `-`.
    """
    text = str(s)
    return re.sub(r'[^A-Za-z0-9_.-]+', '-', text)

# Draw the loss axis on a log scale so slow late-epoch progress stays visible.
use_log_scale = True

# One figure per (series, loss) pair, grouped in one folder per series.
for series_name in dict_series:
    series_dir = os.path.join(plot_dir, safe_name(series_name))
    os.makedirs(series_dir, exist_ok=True)

    for loss_name in loss_functions:
        key = f"{series_name}_{loss_name}"
        if key not in histories:
            # Fallback: same key with the underscores stripped from the loss
            # label (e.g. "L_1" -> "L1").
            alt_key = f"{series_name}_{loss_name.replace('_', '')}"
            if alt_key in histories:
                key = alt_key
        if key not in histories:
            # Skip instead of raising KeyError when a run is missing
            # (e.g. training was interrupted before reaching it).
            print(f"[learning curves] no history stored for '{key}', skipping")
            continue

        hist = histories[key].history
        train = hist.get("loss")
        val = hist.get("val_loss")
        if train is None:
            # Nothing to draw without the training loss.
            print(f"[learning curves] history '{key}' has no 'loss' entry, skipping")
            continue

        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(train, label=f"{loss_name} (train)")
        if val is not None:
            ax.plot(val, label=f"{loss_name} (val)", linestyle="--")

        if use_log_scale:
            ax.set_yscale("log")

        ax.set_title(f"Learning Curve — Dist='{series_name}' — Loss='{loss_name}'")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Loss")
        ax.legend(fontsize="small")
        ax.grid(True)
        fig.tight_layout()

        fname = f"learning_curve_{safe_name(series_name)}_{safe_name(loss_name)}.png"
        out_path = os.path.join(series_dir, fname)
        fig.savefig(out_path, dpi=150)
        # Close explicitly: many figures are created inside this loop.
        plt.close(fig)

Create prediction plots for the time series with each type of noise (separately):

In [62]:
base_dir = os.path.join(os.getcwd(), "experiments", "sim_ts_errors")
plot_dir = os.path.join(base_dir, "prediction_plots")
os.makedirs(plot_dir, exist_ok=True)

# One figure per stored prediction: reference signal, the quantity the loss
# is compared against, the noisy series and the network prediction.
for key, pred_array in dict_predictions.items():
    # Expected key: f"{series_name}_{loss_name}"; split on the first "_" only
    # because loss labels contain underscores themselves.
    series_name, sep, loss_name = key.partition("_")
    if not sep:
        loss_name = "unknown"

    # The noise-free series has no noise statistics to compare against.
    if series_name == "Original":
        continue

    # Constant offset added to the sine to obtain the reference curve for
    # this loss.
    if loss_name == "L_1":
        offset = median_dist[series_name]
    elif loss_name == "L_2":
        offset = mean_dist[series_name]
    elif loss_name == "L_inf":
        # Mid-range of the stored series.
        # NOTE(review): this is computed on signal + noise and then added to
        # `sin_ts` again below - confirm that the noise-only mid-range was
        # not intended.
        series_values = dict_series[series_name]
        offset = (np.max(series_values) + np.min(series_values)) / 2
    elif loss_name == "Quantile_0.1":
        offset = q_1[series_name]
    elif loss_name == "Quantile_0.9":
        offset = q_9[series_name]
    else:
        # Previously there was no fallback: `quantity` stayed unbound (or
        # kept the value of the previous iteration) for an unknown loss.
        print(f"[prediction plots] no reference quantity for '{key}', skipping")
        continue
    quantity = np.repeat(offset, n)

    series_dir = os.path.join(plot_dir, safe_name(series_name))
    os.makedirs(series_dir, exist_ok=True)

    # Predictions start after the first `seq_length` points; same slicing
    # convention as the error-histogram cell.
    x_pred = x[seq_length : seq_length + len(pred_array)]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(
        x, sin_ts + mean_dist[series_name],
        label="True Signal",
        alpha=1, color="black", linestyle="--"
    )
    ax.plot(
        x, sin_ts + quantity,
        label="Approximated Quantity",
        alpha=1, color="green", linestyle=":"
    )
    ax.plot(
        x, dict_series[series_name],
        label=f"Time Series with {series_name} Noise",
        alpha=0.35, color="red"
    )
    ax.plot(
        x_pred, pred_array,
        label=f"Prediction ({loss_name})",
        linestyle='-', color="blue", alpha=0.8
    )
    ax.set_title(f"ANN Prediction — Noise: {series_name} | Loss: {loss_name}")
    ax.set_xlabel("Time")
    ax.set_ylabel("Value")
    ax.legend()
    ax.grid(True)
    fig.tight_layout()

    fname = f"pred_{safe_name(series_name)}_{safe_name(loss_name)}.png"
    out_path = os.path.join(series_dir, fname)
    fig.savefig(out_path, dpi=150)
    plt.close(fig)

Prediction Grids:

In [67]:
# ---------------------------------------------------------------------
# Prediction grids (sim_ts): L1/L2 on the left, quantiles 0.1/0.9 on the right
# ---------------------------------------------------------------------
# Experiment root (relative to the current working directory) and the
# folder that receives the two-panel prediction figures.
base_dir = os.path.join(os.getcwd(), "experiments", "sim_ts_errors")
grid_pred_dir = os.path.join(base_dir, "grid_predictions")
os.makedirs(grid_pred_dir, exist_ok=True)  # no error if it already exists

# NOTE(review): same definition as in the learning-curves cell; repeated here
# so this cell does not depend on that one having been run.
def safe_name(s: str) -> str:
    """Return `s` as text that is safe to use in a file or folder name.

    The value is converted with `str()`; every run of characters other than
    ASCII letters, digits, `_`, `.` and `-` is collapsed into a single `-`.
    """
    text = str(s)
    return re.sub(r'[^A-Za-z0-9_.-]+', '-', text)

def get_true_series(series_arr: np.ndarray) -> np.ndarray:
    """Return the given series as a flat 1-D NumPy array.

    Accepts anything `np.asarray` understands (e.g. an (n, 1) column array
    or a plain list) and flattens it in row-major order.
    """
    as_array = np.asarray(series_arr)
    return as_array.reshape(-1)

def align_pred_axis_sim(x_full: np.ndarray, y_true: np.ndarray, y_pred: np.ndarray):
    """Pair the true series and a prediction with matching x axes for plotting.

    Both `y_true` and `y_pred` are flattened to 1-D first.

    Returns:
        Tuple `(x_true, y_true, x_pred, y_pred)`:
        - equal lengths: both series use `x_full` unchanged;
        - prediction exactly one point shorter: the true series keeps
          `x_full` and the prediction is drawn on `x_full[1:]`;
        - any other length combination: all four arrays are cut to the first
          `min(len(y_true), len(y_pred))` points.
    """
    truth = np.asarray(y_true).ravel()
    preds = np.asarray(y_pred).ravel()
    n_true, n_pred = truth.size, preds.size

    # Same number of points: nothing to shift or trim.
    if n_pred == n_true:
        return x_full, truth, x_full, preds

    # One point fewer: the prediction starts at the second x value.
    if n_pred == max(n_true - 1, 0):
        return x_full, truth, x_full[1:1 + n_pred], preds

    # Fallback: keep only the leading points both series have.
    common = min(n_true, n_pred)
    return x_full[:common], truth[:common], x_full[:common], preds[:common]

def get_pred(series_name: str, suffix: str):
    """Look up the stored prediction for `series_name` and loss `suffix`.

    The lookup key is f"{series_name}_{suffix}" in the notebook-level
    `dict_predictions` store.

    Returns:
        `(size, y_pred)` where `y_pred` is the prediction flattened to 1-D
        and `size` is its number of elements, or `(None, None)` when no
        prediction is stored under that key.
    """
    key = f"{series_name}_{suffix}"
    if key in dict_predictions:
        flat = np.asarray(dict_predictions[key]).ravel()
        # The size is returned next to the vector so callers can align it later.
        return flat.size, flat
    return None, None

# One two-panel figure per series: L2/L1 predictions on the left, quantile
# predictions on the right, each drawn over the stored series.
for series_name, series_arr in dict_series.items():
    y_true = get_true_series(series_arr)  # flattened to 1-D

    # (panel title, [(loss suffix, legend label, colour), ...]) in drawing order.
    panels = [
        (
            f"Predictions — {series_name} (L1 vs L2)",
            [("L_2", "Prediction (L_2)", "red"),
             ("L_1", "Prediction (L_1)", "blue")],
        ),
        (
            f"Predictions — {series_name} (Quantiles)",
            [("Quantile_0.9", "Prediction (Quantile 0.9)", "orange"),
             ("Quantile_0.1", "Prediction (Quantile 0.1)", "purple")],
        ),
    ]

    # Predictions may be missing; `get_pred` yields None for those.
    preds_by_suffix = {
        suffix: get_pred(series_name, suffix)[1]
        for panel_title, curves in panels
        for suffix, curve_label, curve_color in curves
    }

    # Nothing to draw for this series (this also covers an untrained "Original").
    if all(pred is None for pred in preds_by_suffix.values()):
        continue

    series_dir = os.path.join(grid_pred_dir, safe_name(series_name))
    os.makedirs(series_dir, exist_ok=True)

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20,6), sharey=True)

    for col, (ax, (panel_title, curves)) in enumerate(zip(axes, panels)):
        ax.plot(x, y_true, label="True Signal", alpha=1, color="black", linestyle="--")
        for suffix, curve_label, curve_color in curves:
            y_hat = preds_by_suffix[suffix]
            if y_hat is None:
                continue
            # Only the aligned prediction axis and values are needed here.
            aligned = align_pred_axis_sim(x, y_true, y_hat)
            ax.plot(aligned[2], aligned[3], label=curve_label, color=curve_color, alpha=0.9)
        ax.set_title(panel_title)
        ax.set_xlabel("t")
        if col == 0:
            # The y axis is shared, so only the left panel carries the label.
            ax.set_ylabel("Value")
        ax.legend(fontsize="small")
        ax.grid(True, linestyle="--", alpha=0.3)

    plt.tight_layout()
    fname = f"grid_predictions_{safe_name(series_name)}.png"
    out_path = os.path.join(series_dir, fname)
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


Create grids of prediction-error histograms for every noise distribution:

In [64]:
# Experiment root (relative to the current working directory) and the
# folder that receives the error-histogram figures.
base_dir = os.path.join(os.getcwd(), "experiments", "sim_ts_errors")
hist_dir = os.path.join(base_dir, "error_histograms")
os.makedirs(hist_dir, exist_ok=True)  # no error if it already exists

# Histogram appearance: number of bins and bar transparency (two histograms
# are overlaid on each axis).
bins = 50
alpha = 0.5

def unique_preserve_order(xs):
    """Return the distinct items of `xs` as a list, in first-seen order.

    Items must be hashable. Dict keys keep insertion order, so building a
    dict from the items drops later duplicates while preserving order.
    """
    return list(dict.fromkeys(xs))

# One figure per series with up to two panels; each panel overlays the
# prediction-error histograms of two losses (L_inf is left out).
for series_name, original in dict_series.items():
    # Signed errors (prediction - target) per loss label for this series.
    # Deliberately not named `errors`: that name holds the noise
    # distributions, and rebinding it here would break a re-run of the
    # noise-simulation cell.
    pred_errors = {}
    for key, pred_array in dict_predictions.items():
        # Keys look like f"{series_name}_{loss_name}"; split on the first "_"
        # only. `partition` also copes with a key that has no "_".
        s, sep, loss_name = key.partition("_")
        if not sep or s != series_name:
            continue
        # Targets start after the first `seq_length` points of the series.
        y_true = original[seq_length : seq_length + len(pred_array)]
        pred_errors[loss_name] = np.ravel(pred_array) - np.ravel(y_true)

    candidates = [ln for ln in pred_errors if ln != 'L_inf']
    # Put L_1 and L_2 first. The tags must match the keys of
    # `loss_functions`; the former 'L1'/'L2' never matched anything.
    preferred = [tag for tag in ('L_1', 'L_2') if tag in candidates]
    others = [ln for ln in candidates if ln not in preferred]
    ordered = unique_preserve_order(preferred + others)

    selected = ordered[:4]

    # Group the selected losses two by two: one pair per panel.
    pairs = []
    if len(selected) >= 2:
        pairs.append((selected[0], selected[1]))
    if len(selected) >= 4:
        pairs.append((selected[2], selected[3]))
    if not pairs:
        # Fewer than two losses available: `plt.subplots(ncols=0)` would raise.
        continue

    ncols = len(pairs)
    fig, axes = plt.subplots(nrows=1, ncols=ncols, figsize=(20,6),
                             sharex=True, sharey=True)
    # A single panel comes back as a bare Axes; normalise to an array.
    axes = np.atleast_1d(axes)

    for i, (ax, (ln1, ln2)) in enumerate(zip(axes, pairs)):
        color1 = f"C{2*i}"
        color2 = f"C{2*i+1}"

        ax.hist(pred_errors[ln1], bins=bins, density=True, alpha=alpha, label=ln1, color=color1)
        ax.hist(pred_errors[ln2], bins=bins, density=True, alpha=alpha, label=ln2, color=color2)

        # Dotted vertical lines mark the mean error of each loss.
        mean1 = float(np.mean(pred_errors[ln1]))
        mean2 = float(np.mean(pred_errors[ln2]))
        ax.axvline(mean1, linestyle=':', linewidth=2, color=color1, label=f"{ln1} mean")
        ax.axvline(mean2, linestyle=':', linewidth=2, color=color2, label=f"{ln2} mean")

        ax.set_title(f"{series_name} — {ln1} vs {ln2}")
        ax.set_ylabel("Densidad")
        ax.legend(fontsize='small')
        ax.grid(True, linestyle='--', alpha=0.3)

    axes[-1].set_xlabel("Error (ŷₜ – yₜ)")

    fig.tight_layout()
    out_path = os.path.join(hist_dir, f"grid2_hist_errors_{series_name}.png")
    fig.savefig(out_path, dpi=150)
    plt.close(fig)