In [1]:
import copy
import sys
from pathlib import Path

import numpy as np
import pandas as pd
import torch

sys.path.append("..")
from NGS.data import NGSDataset, add_noise, preprocess
from NGS.ema import EMA
from NGS.experiment import rollout
from NGS.hyperparameter import HyperParameter
from path import DATA_DIR, RESULT_DIR
from rossler.model import RosslerModel

device = torch.device("cuda")

In [2]:
def get_distance(trajectory1: np.ndarray, trajectory2: np.ndarray) -> np.ndarray:
    """
    Compute distance between two trajectories in 3D Euclidean space

    trajectory1, trajectory2: [S+1, N, 3]

    Return: [S, ]
    """
    delta = trajectory1 - trajectory2
    distance = np.sqrt((delta**2.0).sum(axis=-1))
    mean_distance = distance.mean(axis=1)
    return mean_distance


def store_trajectory(result_dir: Path, file_name: str, model: RosslerModel, ema: EMA) -> None:
    # Load data
    data_df = pd.read_pickle(DATA_DIR / f"{file_name}.pkl")
    _, test = preprocess(data_df)
    dataset = NGSDataset(**test, window=-1)

    # Rollout
    with ema():
        pred_trajectories, nfevs, runtimes = rollout(model, dataset, device)

    # Store result
    pd.DataFrame(
        {"trajectories": pred_trajectories, "runtime": runtimes, "nfev": nfevs}
    ).to_pickle(result_dir / f"{file_name}.pkl")

def store_lyapunov(result_dir: Path, hp: HyperParameter, model: RosslerModel, ema: EMA) -> None:
    # Load train data
    train_df = pd.read_pickle(DATA_DIR / "rossler_train.pkl")
    train, _ = preprocess(train_df, val_ratio=0.2)
    true_trajectories = train["trajectories"]
    eval_times = [np.insert(np.cumsum(dt), 0, 0.0) for dt in train["dts"]]

    # Result
    exponents = {}
    for n in [0.0001, 0.001, 0.01, 0.1]:
        # Create noisy data with given noise level
        rng = np.random.default_rng(hp.seed)
        _, seed_noise = rng.integers(42, size=(2,))
        noisy_train = copy.deepcopy(train)
        add_noise(noisy_train, n, seed_noise)
        noisy_dataset = NGSDataset(**noisy_train, window=-1)

        # Rollout
        with ema():
            trajectories_from_noise, *_ = rollout(model, noisy_dataset, device)

        # Measure Lyapunov Exponents
        distances = [
            get_distance(ngs, true)
            for ngs, true in zip(trajectories_from_noise, true_trajectories)
        ]
        exponents[n] = [    # lin-log fit to measure exponent
            np.polyfit(eval_time[11:], np.log10(distance[11:]), deg=1)[0]
            for eval_time, distance in zip(eval_times, distances)
        ]

    # Store result
    pd.DataFrame(exponents).to_pickle(result_dir / "lyapunov.pkl")

def evaluate(missing: float, noise: float) -> None:
    exp_id = f"rossler_p{missing}_s{noise}"
    result_dir = RESULT_DIR / exp_id

    # Check validity of result directory
    hp = HyperParameter.from_yaml(result_dir / "hyperparameter.yaml")
    assert hp.missing == missing
    assert hp.noise == noise

    # Load model
    checkpoint = torch.load(result_dir / "checkpoint.pth", map_location="cpu")
    model = RosslerModel(hp.emb_dim, hp.depth, hp.dropout).to(device)
    ema = EMA(model)
    ema.load_state_dict(checkpoint["ema"])

    # NGS-predicted Trajectories
    for file_name in ["rossler_train", "rossler_test_int", "rossler_test_ext"]:
        store_trajectory(result_dir, file_name, model, ema)

    # Lyapunov Exponents of NGS
    store_lyapunov(result_dir, hp, model, ema)


In [3]:
missing, noise = 0.1, 0.001
evaluate(missing, noise)
