In [13]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Logistic Map Experiment

> _Authors:_ [Erfan Mirzaei](https://github.com/erfunmirzaei), [Giacomo Turri](https://github.com/g-turri), and [Pietro Novelli](https://pietronvll.github.io/)

In this notebook, we reproduce the **Logistic Map** experiment from {footcite:t}Kostic2023DPNets using the kooplearn library.
The experiment investigates the challenge of learning representations of the **noisy logistic map**, a one-dimensional dynamical system defined as

$$
    x_{t + 1} = (rx_{t}(1 - x_{t}) + \xi_{t}) \mod 1,
$$

where $\xi_t$ is an i.i.d. trigonometric noise term with density $\propto \cos^N(x)$ (for even integer $N$), and $r$ is a positive parameter controlling the map’s dynamics.
Here, we set $r = 4$, a case for which the exact solution is known.


The corresponding **transfer operator** has rank $N + 1$, is **non-normal**, and admits closed-form eigenvalues and eigenfunctions {footcite:t}Kostic2022. Since $\mathcal{T}$ is non-normal, learning its spectral decomposition is particularly challenging {footcite:t}Kostic2023SpectralRates.
This makes the logistic map a valuable benchmark for testing learned representations. Because the exact form of $\mathcal{T}$ is available, we can bypass operator regression and focus solely on evaluating the quality of the learned representation.

## Setup

In [3]:
import numpy as np
import torch

### Building the Dataset

To learn the logistic map, we first need to construct the dataset.
The process involves the following steps:

- Sample from the dynamical system to generate the training, validation, and test trajectories;
- Prepare a dataloaders for use with PyTorch.

In [4]:
# Defining the number of samples for each data split
n_train_samples = 10000
n_val_samples = 1000
n_test_samples = 1000

The ``kooplearn`` library provides the function [make_logistic_map](../generated/kooplearn.datasets.make_logistic_map.rst) to generate trajectories from the logistic map.
This function takes as input an initial condition, ``X0``, and the number of samples, ``num_samples``, and returns the trajectory starting from ``X0`` and evolving for the specified number of steps.
The resulting trajectory therefore has length ``num_samples + 1``.

Once the trajectory is generated, we can split it into training, validation, and test subsets for subsequent experiments.

In [8]:
from kooplearn.datasets import make_logistic_map

traj = make_logistic_map(
    X0 = 0.5,
    n_steps = n_train_samples + n_val_samples + n_test_samples,
    M = 20,
    random_state = 0) # Setting the random_state for reproducibility

dataset = {
    "train": traj[: n_train_samples],
    "validation": traj[n_train_samples : n_train_samples + n_val_samples],
    "test": traj[n_train_samples + n_val_samples :],
}

train_data = torch.from_numpy(dataset["train"].values).float()
val_data = torch.from_numpy(dataset["validation"].values).float()

In [21]:
from torch.utils.data import TensorDataset, DataLoader
batch_size = 128

# Creating PyTorch TensorDatasets
train_ds = TensorDataset(train_data[:-1], train_data[1:])
val_ds = TensorDataset(val_data[:-1], val_data[1:])

# Creating DataLoaders
train_dl = DataLoader(train_ds, batch_size = batch_size, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=len(val_ds), shuffle=False)

## Evolution operator training algorithms

In the next step, we implement the code needed to train the feature maps used to approximate the evolution (transfer) operator.

In [22]:
# Init report dict
dl_reports = {}
trained_models = {}

# Experiment hyperparameters
learning_rate = 1e-3
opt = torch.optim.Adam
num_epochs = 100
random_state = 42
layer_dims = [64, 128, 64]
metric_deformation_coeff = 1
latent_dim = 8

### Sinusoidal Embedding

The logistic map is defined on the interval $[0, 1]$ and is inherently periodic.
To account for this property, we featurize the signal using trigonometric functions, which naturally encode periodicity.
Specifically, we use $\sin(2\pi x)$ and $\cos(2\pi x)$ as the embedding features.

In [23]:
class SinusoidalEmbedding(torch.nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        # Assuming x is in [0, 1]
        x = 2 * torch.pi * x
        return torch.cat([torch.sin(x), torch.cos(x)], dim=-1)

### Creating an MLP using PyTorch

Next, we use the PyTorch library to define our neural network with the specified hyperparameters:

In [24]:
class SimpleMLP(torch.nn.Module):
    def __init__(
        self, latent_dim: int, layer_dims: list[int], activation=torch.nn.LeakyReLU
    ):
        super().__init__()
        self.activation = activation
        lin_dims = ([2] + layer_dims + [latent_dim])  # The 2 is for the sinusoidal embedding

        layers = []

        for layer_idx in range(len(lin_dims) - 2):
            layers.append(torch.nn.Linear(lin_dims[layer_idx], lin_dims[layer_idx + 1], bias=False))
            layers.append(activation())

        layers.append(torch.nn.Linear(lin_dims[-2], lin_dims[-1], bias=True))

        self.layers = torch.nn.ModuleList(layers)
        self.sin_embedding = SinusoidalEmbedding()

    def forward(self, x):
        # Sinusoidal embedding
        x = self.sin_embedding(x).float()
        # MLP
        for layer in self.layers:
            x = layer(x)
        return x

### Defining the ``FeatureMap``

In [27]:
from kooplearn.torch.utils import FeatureMapEmbedder
from kooplearn.linear_model import Ridge

device = "cuda" if torch.cuda.is_available() else "cpu"

class FeatureMap(torch.nn.Module):
    def __init__(self, latent_dim: int, normalize_latents: bool = True):
        super().__init__()
        self.normalize_latents = normalize_latents
        self.backbone = SimpleMLP(latent_dim=latent_dim, layer_dims=layer_dims)
        self.lin = torch.nn.Linear(latent_dim, latent_dim, bias=False)
    
    def forward(self, X, lagged:bool=False):
        z = self.backbone(X)
        if self.normalize_latents:
            z = torch.nn.functional.normalize(z, dim=-1)
        if lagged:
            z = self.lin(z)
        return z
    
def train_encoder_only(criterion: torch.nn.Module):
    torch.manual_seed(random_state)
    # Initialize model, loss and optimizer
    model = FeatureMap(latent_dim).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    def step(batch, is_train:bool = True):
        batch_X, batch_Y = batch
        batch_X, batch_Y = batch_X.to(device), batch_Y.to(device) 
        if is_train:
            optimizer.zero_grad()
        phi_X, phi_Y = model(batch_X), model(batch_Y, lagged=True)
        loss = criterion(phi_X, phi_Y)
        if is_train:
            loss.backward()
            optimizer.step()
        return loss.item()

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        train_loss = []
        for batch in train_dl:
            train_loss.append(step(batch))
        # Validation phase
        model.eval()
        val_loss = []
        with torch.no_grad():
            for batch in val_dl:
                val_loss.append(step(batch, is_train=False))

        if (epoch + 1)%5 == 0 or (epoch == 0):
            print(f"EPOCH {epoch + 1:>2}  Loss: {np.mean(train_loss):.2f} (train) -  {np.mean(val_loss):.2f} (val)")
    
    embedder = FeatureMapEmbedder(encoder=model)
    evolution_operator_model = Ridge(n_components=latent_dim).fit(embedder.transform(train_data), train_data.numpy(force=True))

    return {
        "model": evolution_operator_model,
        "embedder": embedder,
    }

In [28]:
from kooplearn.torch.nn import VampLoss, SpectralContrastiveLoss

for name, criterion in zip(["VAMPNets", "Spectral Contrastive Loss"], [ VampLoss(center_covariances=False), SpectralContrastiveLoss()]):
    print(f"Fitting {name}")
    trained_models[name] = train_encoder_only(criterion)

Fitting VAMPNets
EPOCH  1  Loss: -3.19 (train) -  -2.99 (val)
EPOCH  5  Loss: -3.40 (train) -  -3.02 (val)
EPOCH 10  Loss: -2.75 (train) -  -2.87 (val)
EPOCH 15  Loss: -2.61 (train) -  -2.62 (val)
EPOCH 20  Loss: -2.61 (train) -  -2.38 (val)
EPOCH 25  Loss: -2.50 (train) -  -2.44 (val)
EPOCH 30  Loss: -2.37 (train) -  -1.92 (val)
EPOCH 35  Loss: -2.83 (train) -  -2.82 (val)
EPOCH 40  Loss: -2.53 (train) -  -2.90 (val)
EPOCH 45  Loss: -2.39 (train) -  -2.45 (val)
EPOCH 50  Loss: -2.58 (train) -  -2.85 (val)
EPOCH 55  Loss: -2.44 (train) -  -2.88 (val)
EPOCH 60  Loss: -2.31 (train) -  -2.30 (val)
EPOCH 65  Loss: -2.37 (train) -  -2.34 (val)
EPOCH 70  Loss: -2.40 (train) -  -2.32 (val)
EPOCH 75  Loss: -2.40 (train) -  -2.40 (val)
EPOCH 80  Loss: -2.50 (train) -  -2.32 (val)
EPOCH 85  Loss: -2.41 (train) -  -2.60 (val)
EPOCH 90  Loss: -2.40 (train) -  -2.48 (val)
EPOCH 95  Loss: -2.37 (train) -  -2.01 (val)


The rank attribute has been updated to 5.
Consider decreasing the rank parameter.


EPOCH 100  Loss: -2.50 (train) -  -2.48 (val)
Fitting Spectral Contrastive Loss
EPOCH  1  Loss: -0.99 (train) -  -1.23 (val)
EPOCH  5  Loss: -2.08 (train) -  -2.19 (val)
EPOCH 10  Loss: -2.87 (train) -  -2.92 (val)
EPOCH 15  Loss: -3.20 (train) -  -3.23 (val)
EPOCH 20  Loss: -3.32 (train) -  -3.33 (val)
EPOCH 25  Loss: -3.45 (train) -  -3.49 (val)
EPOCH 30  Loss: -3.73 (train) -  -3.79 (val)
EPOCH 35  Loss: -4.00 (train) -  -4.05 (val)
EPOCH 40  Loss: -4.20 (train) -  -4.23 (val)
EPOCH 45  Loss: -4.35 (train) -  -4.37 (val)
EPOCH 50  Loss: -4.43 (train) -  -4.43 (val)
EPOCH 55  Loss: -4.43 (train) -  -4.43 (val)
EPOCH 60  Loss: -4.45 (train) -  -4.45 (val)
EPOCH 65  Loss: -4.48 (train) -  -4.50 (val)
EPOCH 70  Loss: -4.48 (train) -  -4.48 (val)
EPOCH 75  Loss: -4.48 (train) -  -4.44 (val)
EPOCH 80  Loss: -4.49 (train) -  -4.48 (val)
EPOCH 85  Loss: -4.51 (train) -  -4.47 (val)
EPOCH 90  Loss: -4.48 (train) -  -4.48 (val)
EPOCH 95  Loss: -4.51 (train) -  -4.49 (val)


The rank attribute has been updated to 7.
Consider decreasing the rank parameter.


EPOCH 100  Loss: -4.53 (train) -  -4.49 (val)


## Evaluation Metrics
Now it is time to evaluate the performance of model. To do this, for this specific example we will use two different metrics that are used in the paper: (i) the optimality gap, and (ii) the spectral error.

**Optimality Gap**

The definition of the optimality gap in the paper is $
\sum_{i = 1}^{3} \sigma_{i}^{2}(\tau) - \mathcal{P}^{0}(w)$, which informs on how close one is to capture the best rank-r approximation of the transfer operator, $\mathcal{T}$.

**Spectral Error**

The spectral error is calculated by $ max_i \ min_j \left | \lambda_i(\mathcal{P}_{\mathcal{H}}\tau_{|\mathcal{H}}) - \lambda_j(\tau) \right |$. This formula measures how well the true eigenbalues of $\mathcal{T}$ can be recovered within the representation space $\mathcal{H_w}$.


In order to calculate the above metrics for evaluation the performance of the learned representation we should first calculate populational covariances and cross-covariances for the learned feature map.


The population_covs function computes population covariance and cross-covariance matrices based on given feature and logistic maps. It begins by generating input data points and then calculates eigenvalues and left eigenfunctions of the logistic map. Subsequently, it normalizes the Perron eigenvector and evaluates the feature map on the input data to obtain a feature matrix. Covariance and cross-covariance matrices are then computed using the feature matrix and normalized Perron eigenvector.

On the other hand, the evaluate_representation function assesses the performance of the obtained representation. It computes the OLS estimator, estimates eigenvalues, and evaluates metrics such as the optimality gap and spectral error to provide insights into the model's quality and accuracy.

In [None]:
from kooplearn.abc import FeatureMap
from scipy.integrate import romb

def population_covs(feature_map: FeatureMap, logistic: LogisticMap, pow_of_two_k: int = 12):
    """Computes the population covariance and cross-covariance"""
    x = np.linspace(0, 1, 2**pow_of_two_k + 1)[:, None]
    vals, lv = logistic.eig(eval_left_on=x) # returns eigenvalues and left eigenfunctions
    perron_eig_idx = np.argmax(np.abs(vals))
    pi = lv[:, perron_eig_idx] #eigenfunction correspond with the max eigenvalue
    assert np.isreal(pi).all()
    pi = pi.real
    pi = pi / romb(pi, dx=1 / 2**pow_of_two_k)  # Normalization of π
    # Evaluating the feature map
    phi = feature_map(x)  # [2**pow_of_two_k + 1, d]
    # Covariance
    cov_unfolded = (
        phi.reshape(2**pow_of_two_k + 1, -1, 1)
        * phi.reshape(2**pow_of_two_k + 1, 1, -1)
        * pi.reshape(-1, 1, 1)
    )
    cov = romb(cov_unfolded, dx=1 / 2**pow_of_two_k, axis=0)
    # Cross-covariance
    alphas = np.stack(
        [logistic.noise_feature_composed_map(x, n) for n in range(logistic.N + 1)],
        axis=1,
    )
    betas = np.stack(
        [logistic.noise_feature(x, n) for n in range(logistic.N + 1)], axis=1
    )

    cov_alpha_unfolded = (
        phi.reshape(2**pow_of_two_k + 1, -1, 1)
        * alphas.reshape(2**pow_of_two_k + 1, 1, -1)
        * pi.reshape(-1, 1, 1)
    )
    cov_beta_unfolded = phi.reshape(2**pow_of_two_k + 1, -1, 1) * betas.reshape(
        2**pow_of_two_k + 1, 1, -1
    )

    cov_alpha = romb(cov_alpha_unfolded, dx=1 / 2**pow_of_two_k, axis=0)
    cov_beta = romb(cov_beta_unfolded, dx=1 / 2**pow_of_two_k, axis=0)

    cross_cov = cov_alpha @ (cov_beta.T)
    return cov, cross_cov

In [None]:
from kooplearn._src.metrics import directed_hausdorff_distance
from kooplearn._src.utils import topk

def evaluate_representation(feature_map: FeatureMap, logistic_map: LogisticMap):
    report = {}
    # Compute OLS estimator
    cov, cross_cov = population_covs(feature_map, logistic_map)
    OLS_estimator = np.linalg.solve(cov, cross_cov)

    # Eigenvalue estimation
    OLS_eigs = np.linalg.eigvals(OLS_estimator)
    top_eigs = topk(np.abs(OLS_eigs), 3)
    OLS_eigs = OLS_eigs[top_eigs.indices]

    true_eigs = logistic_map.eig()
    top_eigs = topk(np.abs(true_eigs), 3)
    true_eigs = true_eigs[top_eigs.indices]

    report["hausdorff-distance"] = directed_hausdorff_distance(OLS_eigs, true_eigs)
    # VAMP2-score
    M = np.linalg.multi_dot(
        [
            np.linalg.pinv(cov, hermitian=True),
            cross_cov,
            np.linalg.pinv(cov, hermitian=True),
            cross_cov.T,
        ]
    )
    feature_dim = cov.shape[0]
    report["optimality-gap"] = np.sum(logistic_map.svals()[:feature_dim] ** 2) - np.trace(M)

    # Feasibility
    # report["feasibility-gap"] = np.linalg.norm(cov - np.eye(feature_dim), ord=2)
    # report["estimator-eigenvalues"] = OLS_eigs
    # report["covariance-eigenvalues"] = np.linalg.eigvalsh(cov)
    return report

Now we can assess the performace of the learned representation based on optimality gap and spectral error or hausdorff distance:

In [None]:
evaluate_representation(dpnet_fmap, logmap)

As we saw the result is aligned with the reported results in the paper, however we only run for one seed. Therefore, to establish a more meaningful statistical comparison between different models which tries to learn the representation we run each models for 20 independent runs and report mean and standard deviation.

## Evaluating DPNet

Now we run the DPNet over 20 independent runs:

In [None]:
relaxed = False
report = []

for rng_seed in range(num_rng_seeds):

    trainer = lightning.Trainer(**trainer_kwargs)
    net_kwargs = {"feature_dim": feature_dim, "layer_dims": layer_dims}

    # Defining the model
    dpnet_fmap = NNFeatureMap(
        SimpleMLP,
        DPNets['loss_fn'],
        opt,
        trainer,
        encoder_kwargs = net_kwargs,
        loss_kwargs = DPNets['loss_kwargs'],
        optimizer_kwargs = {"lr": 5e-5},
        lagged_encoder = SimpleMLP,
        lagged_encoder_kwargs = net_kwargs,
        seed = rng_seed)

    # Init
    torch.manual_seed(rng_seed)
    kaiming_init(dpnet_fmap.lightning_module.encoder)
    kaiming_init(dpnet_fmap.lightning_module.lagged_encoder)

    dpnet_fmap.fit(train_dl)

    report.append(evaluate_representation(dpnet_fmap, logmap))

Now we calculate mean and standard deviation for each metrics:

In [None]:
def stack_reports(reports):
    stacked = {}
    for key in [
        "hausdorff-distance",
        "optimality-gap",
        # "feasibility-gap",
        # "fit-time",
        # "time_per_epoch",
    ]:
        if key in reports[0]:
            stacked[key] = np.mean([r[key] for r in reports])
            key_std = key + "_std"
            stacked[key_std] = np.std([r[key] for r in reports])
    # for key in ["estimator-eigenvalues", "covariance-eigenvalues"]:
    #     stacked[key] = np.stack([r[key] for r in reports])
    return stacked

In [None]:
full_DPNet_report = stack_reports(report)

In [None]:
full_DPNet_report

## Evaluating DPNet relaxed

Now we run the DPNet-relaxed over 20 independent runs. To do so we just need to change the flag that we pass to the DPNet module to whether use the relaxed version or not. For more information about the difference between the relaxed version of loss function with the original read the section 3 of the paper \cite{}.

In [None]:
DPNets = {'loss_fn': DPLoss, 'loss_kwargs': {'relaxed': True, 'metric_deformation': metric_deformation_coeff, 'center_covariances': False}}
report = []

for rng_seed in range(num_rng_seeds):

    trainer = lightning.Trainer(**trainer_kwargs)
    net_kwargs = {"feature_dim": feature_dim, "layer_dims": layer_dims}

    # Defining the model
    dpnet_relaxed_fmap = NNFeatureMap(
        SimpleMLP,
        DPNets['loss_fn'],
        opt,
        trainer,
        encoder_kwargs = net_kwargs,
        loss_kwargs = DPNets['loss_kwargs'],
        optimizer_kwargs = {"lr": 5e-5},
        lagged_encoder = SimpleMLP,
        lagged_encoder_kwargs = net_kwargs,
        seed = rng_seed)

    # Init
    torch.manual_seed(rng_seed)
    kaiming_init(dpnet_relaxed_fmap.lightning_module.encoder)
    kaiming_init(dpnet_relaxed_fmap.lightning_module.lagged_encoder)

    dpnet_relaxed_fmap.fit(train_dl)

    report.append(evaluate_representation(dpnet_relaxed_fmap, logmap))

In [None]:
full_DPNet_relaxed_report = stack_reports(report)

In [None]:
full_DPNet_relaxed_report

## Evaluating VAMPNet

Now we run the VAMPNet \cite{} over 20 independent runs. To do so we just need to import the VAMPNet class from the kooplearn's feature maps.

In [None]:
from kooplearn.nn import VAMPLoss

In [None]:
VAMPNets = {'loss_fn': VAMPLoss, 'loss_kwargs': {'schatten_norm': 2, 'center_covariances': False}}
report = []

for rng_seed in range(num_rng_seeds):

    trainer = lightning.Trainer(**trainer_kwargs)
    net_kwargs = {"feature_dim": feature_dim, "layer_dims": layer_dims}

    # Defining the model
    vampnet_fmap = NNFeatureMap(
        SimpleMLP,
        VAMPNets['loss_fn'],
        opt,
        trainer,
        encoder_kwargs = net_kwargs,
        loss_kwargs = VAMPNets['loss_kwargs'],
        optimizer_kwargs = {"lr": 5e-5},
        lagged_encoder = SimpleMLP,
        lagged_encoder_kwargs = net_kwargs,
        seed = rng_seed)

    # Init
    torch.manual_seed(rng_seed)
    kaiming_init(vampnet_fmap.lightning_module.encoder)
    kaiming_init(vampnet_fmap.lightning_module.lagged_encoder)

    vampnet_fmap.fit(train_dl)

    report.append(evaluate_representation(vampnet_fmap, logmap))

In [None]:
full_VAMPNet_report = stack_reports(report)

In [None]:
full_VAMPNet_report

## Evaluating Noise Kernel

In [None]:
import scipy.special
from kooplearn.models.feature_maps import ConcatenateFeatureMaps

In [None]:
def noise_feat(n, x):
    return logmap.noise_feature(x, n)

def NoiseKernel(order: int = 3):
    binom_coeffs = [scipy.special.binom(N_exp, i) for i in range(N_exp + 1)]
    sorted_coeffs = np.argsort(binom_coeffs)

    fn_list = [partial(noise_feat, n) for n in sorted_coeffs[:order]]

    return ConcatenateFeatureMaps(fn_list)

In [None]:
full_NoiseKernel_report = evaluate_representation(NoiseKernel(feature_dim), logistic_map=logmap)

In [None]:
full_NoiseKernel_report

## Evaluating Cheby-T Kernel

In [None]:
def scaled_chebyt(n, x):
    return scipy.special.eval_chebyt(n, 2 * x - 1)

def ChebyT(feature_dim: int = 3):

    fn_list = [partial(scaled_chebyt, n) for n in range(feature_dim)]
    return ConcatenateFeatureMaps(fn_list)

In [None]:
full_ChebyTKenel_report = evaluate_representation(ChebyT(feature_dim),logmap)

In [None]:
full_ChebyTKenel_report