<a href="https://colab.research.google.com/github/Nicola-Ibrahim/Pareto-Optimization/blob/main/notebooks/02_pareto_interpolation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Autoencoder Formulation for Pareto Front Analysis

Some of considerd limitations:
1. Defining the optimal point (energy/time -> acceleration) at the beginning of the system. The user has the ability to select that.

2. All data features should be taken into account, e.g. (Decision variables + problem parameters + objectives)

In [2]:
import numpy as np
import pandas as pd

import torch; torch.manual_seed(0)
import torch.nn as nn
import torch.nn.functional as F
import torch.utils
import torch.distributions
import torchvision
import numpy as np
import matplotlib.pyplot as plt; plt.rcParams['figure.dpi'] = 200
from torch.utils.data import DataLoader
from tqdm import tqdm



import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LayerNormalization, MultiHeadAttention
from tensorflow.keras.models import Model


from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split





## Date reading

**Input Data Structure**:
- Let $X = \{\mathbf{x}_i\}_{i=1}^N \subset \mathbb{R}^2$ be the set of Pareto-optimal solutions
- Each solution $\mathbf{x}_i = (f_1^{(i)}, f_2^{(i)}, ...,f_n^{(i)})$ represents a trade-off between:
  - $f_1$: Travel time (minutes)
  - $f_2$: Energy consumption (kWh)
  - $f_n$: Other data variables

Then, filter out the needed data for trainig purpose

In [30]:
pareto_data = pd.read_pickle("./pareto_data.pkl")
print(pareto_data)

     accel_phase1  accel_phase2  travel_time_min  energy_consumption_kWh
0        0.320633      0.320324       111.746856                0.111468
1        3.999286      3.997122        40.836702                0.220684
2        0.475870      0.459985        93.265481                0.148673
3        0.369715      0.362149       105.101609                0.123231
4        0.300027      3.913528        77.547350                0.152288
..            ...           ...              ...                     ...
195      0.334531      3.990467        74.527048                0.157926
196      0.654983      3.994694        59.414347                0.210287
197      0.673767      3.990258        58.882757                0.213357
198      0.398851      3.970569        70.094236                0.168436
199      0.500042      3.989768        64.902046                0.184970

[200 rows x 4 columns]



## Data preparation
**Normalization**: Scale objectives to $[0,1]$ range for training stability:
$$
\hat{f}_k = \frac{f_k - f_{k}^{min}}{f_{k}^{max} - f_{k}^{min}}, \quad \text{for } k=1,2
$$

**Standardization**:
$$
\hat{f}_k^{(i)} = \frac{f_k^{(i)} - \mu_k}{\sigma_k}, \quad \text{for } k=1,2
$$
where $\mu_k$, $\sigma_k$ are the mean and standard deviation of each objective.


In [37]:
# Check if the the data need to be normalized
pareto_data.aggregate({'travel_time_min': ['min', 'max'], 'energy_consumption_kWh': ['min', 'max']})


Unnamed: 0,travel_time_min,energy_consumption_kWh
min,40.836702,0.111468
max,111.746856,0.220684


In [38]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

# Normalize to [0, 1]
scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(pareto_data[['travel_time_min', 'energy_consumption_kWh']])

# Split data (80% train, 20% validation)
X_train, X_val = train_test_split(X_normalized, test_size=0.2, random_state=42)

## Conditional VAE Architecture

### Why to use?
In traditional autoencoders, inputs are mapped deterministically to a latent vector
$z=e(x)$. In variational autoencoders, inputs are mapped to a probability distribution over latent vectors, and a latent vector is then sampled from that distribution. The decoder becomes more robust at decoding latent vectors as a result.

### Variational Autoencoder (VAE) Latent Space Mapping

Instead of directly mapping the input **$ x $** to a latent vector **$ z = e(x) $**, we instead map it to:

- **Mean vector**: **$ \mu(x) $**
- **Standard deviation vector**: **$ \sigma(x) $**

These parameters define a **diagonal Gaussian distribution** $ N(\mu(x), \sigma(x)) $, from which we sample the latent vector $ z $:

$$
z \sim N(\mu(x), \sigma(x))
$$

This formulation allows the model to learn a probabilistic latent space representation where each input $ x $ defines its own distribution over latent codes rather than a single deterministic point.


### Encoder Network (Compression)
$$
\mathbf{z} = g_\phi(\mathbf{x}) = \text{LeakyReLU}(\mathbf{W}_2 \cdot \text{ELU}(\mathbf{W}_1\mathbf{x} + \mathbf{b}_1) + \mathbf{b}_2)
$$

### Decoder Network (Reconstruction)
$$
\hat{\mathbf{x}} = f_\theta(\mathbf{z}) = \text{Sigmoid}(\mathbf{W}_4 \cdot \text{ELU}(\mathbf{W}_3\mathbf{z} + \mathbf{b}_3) + \mathbf{b}_4)
$$

**Dimensionality**:
- Input/Output: $\mathbb{R}^2$ (normalized objectives)
- Latent space: $\mathbb{R}^1$ (bottleneck)
- Hidden layers: 32 neurons with ELU activation


## Architecture
Sigmoid ensures outputs stay in normalized $[0,1]$ range

### Encoder
Maps 2D Pareto solutions to a 1D latent space:
$$
\mathbf{z} = \text{Encoder}(\mathbf{x}) = \sigma(\mathbf{W}_2 \cdot \text{ReLU}(\mathbf{W}_1\mathbf{x} + \mathbf{b}_1) + \mathbf{b}_2)
$$

Where:
- $\mathbf{W}_1 \in \mathbb{R}^{h \times 2}$, $\mathbf{W}_2 \in \mathbb{R}^{1 \times h}$ are weight matrices
- $\mathbf{b}_1 \in \mathbb{R}^h$, $\mathbf{b}_2 \in \mathbb{R}^1$ are bias terms
- $h$ is hidden layer size
- $\sigma$ is sigmoid activation

### Decoder
Reconstructs solutions from latent space:
$$
\hat{\mathbf{x}} = \text{Decoder}(\mathbf{z}) = \sigma(\mathbf{W}_4 \cdot \text{ReLU}(\mathbf{W}_3\mathbf{z} + \mathbf{b}_3) + \mathbf{b}_4)
$$

With:
- $\mathbf{W}_3 \in \mathbb{R}^{h \times 1}$, $\mathbf{W}_4 \in \mathbb{R}^{2 \times h}$
- $\mathbf{b}_3 \in \mathbb{R}^h$, $\mathbf{b}_4 \in \mathbb{R}^2$


### Activation function

### Loss Function
Mean Squared Error (MSE) reconstruction loss:
$$
\mathcal{L}_{recon} = \frac{1}{N}\sum_{i=1}^N \|\mathbf{x}_i - \hat{\mathbf{x}}_i\|^2_2
$$


### Input features

[
    target_distance,        # Scalar (e.g., 7.5 km)
    time_weight,            # 0.0 (min) ↔ 1.0 (max)
    energy_weight,          # 0.0 (min) ↔ 1.0 (max)
    constraints_vector      # [max_jerk, battery_limit, ...]
]

### Output Features
[
    acceleration_profile,   # Time-series (100 steps)
    deceleration_profile    # Time-series (100 steps)
]


In [None]:
# Example Training Batch
batch = {
    'input_conditions': torch.tensor([
        # [distance, time_weight, energy_weight, max_jerk]
        [7.5, 0.9, 0.1, 0.3],    # Minimum time
        [10.0, 0.5, 0.5, 0.3],    # Balanced
        [15.0, 0.1, 0.9, 0.3]     # Minimum energy
    ], dtype=torch.float32),

    'output_profiles': torch.tensor([
        # [accel_profile (100 steps), decel_profile (100 steps)]
        [0.8, 0.82, ..., 1.2, 0.5, 0.48, ..., 0.1],  # Aggressive accel
        [0.5, 0.52, ..., 0.6, 0.4, 0.38, ..., 0.2],  # Moderate
        [0.3, 0.31, ..., 0.4, 0.6, 0.62, ..., 0.8]   # Conservative
    ], dtype=torch.float32)
}


### Loss function

loss = reconstruction_loss + α*physics_loss + β*constraint_penalty


### Training process
Filter out the non-dominated solutions as resemble pareto front



In [None]:

class Encoder(nn.Module):
    def __init__(self, input_dim, latent_dim, condition_dim):
        super().__init__()
        self.input_dim = input_dim
        self.condition_dim = condition_dim
        self.shared_encoder = nn.Sequential(
            nn.Linear(input_dim + condition_dim, 512),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2)
        )
        self.mu = nn.Linear(256, latent_dim)
        self.logvar = nn.Linear(256, latent_dim)

    def forward(self, x, conditions):
        x = torch.cat([x, conditions], dim=1)
        h = self.shared_encoder(x)
        return self.mu(h), self.logvar(h)

class Decoder(nn.Module):
    def __init__(self, output_dim, latent_dim, condition_dim):
        super().__init__()
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim + condition_dim, 256),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
            nn.Linear(256, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, output_dim),
            nn.Tanh()  # Constrain output to [-1,1] for stability
        )

    def forward(self, z, conditions):
        z = torch.cat([z, conditions], dim=1)
        return self.decoder(z)

class PhysicsModel(nn.Module):
    def __init__(self):
        super().__init__()
        # Replace with actual physics simulation layers
        self.simulation_network = nn.Sequential(
            nn.Linear(200, 128),  # Input: 100-step accel + 100-step decel
            nn.LeakyReLU(),
            nn.Linear(128, 2)  # Output: time, energy
        )

    def calculate_jerk(self, profiles):
        # Implement actual jerk calculation
        return torch.norm(profiles[:,1:] - profiles[:,:-1], dim=1)

    def forward(self, profiles):
        simulated = self.simulation_network(profiles)
        jerk = self.calculate_jerk(profiles)
        return simulated[:,0], simulated[:,1], jerk

class CVAE(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.encoder = Encoder(config['input_dim'],
                             config['latent_dim'],
                             config['condition_dim'])
        self.decoder = Decoder(config['output_dim'],
                             config['latent_dim'],
                             config['condition_dim'])
        self.physics_model = PhysicsModel()
        self.device = torch.device(config['device'])

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def forward(self, x, conditions):
        mu, logvar = self.encoder(x, conditions)
        z = self.reparameterize(mu, logvar)
        return self.decoder(z, conditions), mu, logvar

    def loss_function(self, recon_x, x, mu, logvar, conditions):
        # Reconstruction loss
        BCE = F.mse_loss(recon_x, x, reduction='sum')

        # KL divergence
        KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

        # Physics constraints
        time_pred, energy_pred, jerk = self.physics_model(recon_x)

        # Time and energy targets from conditions
        time_target = conditions[:,0] * MAX_TIME
        energy_target = conditions[:,1] * MAX_ENERGY

        # Physics-based losses
        time_loss = F.l1_loss(time_pred, time_target)
        energy_loss = F.l1_loss(energy_pred, energy_target)
        jerk_loss = F.relu(jerk - MAX_JERK).mean()

        return {
            'loss': BCE + KLD + time_loss + energy_loss + jerk_loss,
            'BCE': BCE,
            'KLD': KLD,
            'time_loss': time_loss,
            'energy_loss': energy_loss,
            'jerk_loss': jerk_loss
        }

def train(model, dataloader, config):
    model.to(config['device'])
    model.train()

    optimizer = torch.optim.AdamW(model.parameters(),
                                lr=config['lr'],
                                weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min')

    for epoch in range(config['epochs']):
        progress = tqdm(dataloader, desc=f"Epoch {epoch+1}")
        total_loss = 0

        for batch in progress:
            x = batch['profile'].to(config['device'])
            conditions = batch['conditions'].to(config['device'])

            optimizer.zero_grad()

            recon_x, mu, logvar = model(x, conditions)
            losses = model.loss_function(recon_x, x, mu, logvar, conditions)

            losses['loss'].backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_loss += losses['loss'].item()
            progress.set_postfix(loss=losses['loss'].item()/x.size(0))

        scheduler.step(total_loss)
        print(f"Epoch {epoch+1} Avg Loss: {total_loss/len(dataloader.dataset)}")

def get_device():
    return torch.device('cuda' if torch.cuda.is_available() else 'cpu')




In [None]:
class TransformerEncoder(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.norm = LayerNormalization()
        self.dense = Dense(embed_dim, activation='gelu')

    def call(self, inputs):
        attn_output = self.attention(inputs, inputs)
        x = self.norm(inputs + attn_output)
        return self.dense(x)

class TransformerDecoder(tf.keras.layers.Layer):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.norm = LayerNormalization()
        self.dense = Dense(embed_dim, activation='gelu')

    def call(self, inputs, context):
        attn_output = self.attention(inputs, context)
        x = self.norm(inputs + attn_output)
        return self.dense(x)





# Custom loss with reconstruction and latent regularization
def total_loss(y_true, y_pred):
    recon_loss = tf.keras.losses.mean_squared_error(y_true, y_pred)
    latent_reg = tf.reduce_mean(tf.square(latent))
    return recon_loss + 0.1 * latent_reg

# Transformer Autoencoder
input_dim = 2
latent_dim = 1
embed_dim = 32
num_heads = 2

# Encoder
inputs = Input(shape=(input_dim,))
x = Dense(embed_dim)(inputs)
x = TransformerEncoder(embed_dim, num_heads)(x)
latent = Dense(latent_dim, name='latent')(x)

# Decoder
decoder_input = Input(shape=(latent_dim,))
x = Dense(embed_dim)(decoder_input)
x = TransformerDecoder(embed_dim, num_heads)(x, x)
outputs = Dense(input_dim, activation='sigmoid')(x)

# Models
encoder = Model(inputs, latent)
decoder = Model(decoder_input, outputs)
autoencoder = Model(inputs, decoder(encoder(inputs)))


autoencoder.compile(optimizer=tf.keras.optimizers.Adam(0.001), loss=hybrid_loss)

history = autoencoder.fit(
    X_train, X_train,
    epochs=500,
    batch_size=16,
    validation_data=(X_val, X_val),
    callbacks=[
        tf.keras.callbacks.EarlyStopping(patience=20, restore_best_weights=True)
    ]
)

1. The `call()` method of your layer may be crashing. Try to `__call__()` the layer eagerly on some test input first to see if it works. E.g. `x = np.random.random((3, 4)); y = layer(x)`
2. If the `call()` method is correct, then you may need to implement the `def build(self, input_shape)` method on your layer. It should create all variables used by the layer (e.g. by calling `layer.build()` on all its children layers).
Exception encountered: ''Exception encountered when calling Softmax.call().

[1mtuple index out of range[0m

Arguments received by Softmax.call():
  • inputs=tf.Tensor(shape=(None, 2), dtype=float32)
  • mask=None''


IndexError: Exception encountered when calling TransformerEncoder.call().

[1mCould not automatically infer the output shape / dtype of 'transformer_encoder' (of type TransformerEncoder). Either the `TransformerEncoder.call()` method is incorrect, or you need to implement the `TransformerEncoder.compute_output_spec() / compute_output_shape()` method. Error encountered:

Exception encountered when calling Softmax.call().

[1mtuple index out of range[0m

Arguments received by Softmax.call():
  • inputs=tf.Tensor(shape=(None, 2), dtype=float32)
  • mask=None[0m

Arguments received by TransformerEncoder.call():
  • args=('<KerasTensor shape=(None, 32), dtype=float32, sparse=False, name=keras_tensor_1>',)
  • kwargs=<class 'inspect._empty'>

In [None]:
# Training

# Configuration Example
config = {
    'input_dim': 200,       # 100-step accel + 100-step decel
    'output_dim': 200,
    'latent_dim': 32,
    'condition_dim': 4,     # [distance, time_w, energy_w, max_jerk]
    'device': get_device(),
    'epochs': 100,
    'lr': 1e-4,
    'batch_size': 64
}

cvae = CVAE(config)

# Create synthetic dataset
train_data = [...]  # Implement proper Dataset class
dataloader = DataLoader(train_data,
                        batch_size=config['batch_size'],
                        shuffle=True)

# Train model
train(cvae, dataloader, config)

# Save model
torch.save(cvae.state_dict(), 'monocap_cvae.pth')


## Interpolation in Latent Space

Since the autoencoder help to transform the space into more representive, less feature complexity, and clustering the paretor solutions depending on similarity. We can utilize it to query new pareto solution for arabitrary defined distance by the user (7.5km)
Since the optimum pareto solution could not be the best choice we can interpolate it for better result


### Interploation tactics:
- Linear Interploation:
    1. search for the optimum solution in the closest cluster, e.g. if the request is something like (14Km, 0.8 time, 0.2 energy), then the autoencoder should know to narrow down the search for better pareto front accessibility.

    2. apply linear interpolation, taking the one of the optimal point in the pareto front. e.g if the time is more improtant then we have to shift the searching to area wher the time is more important and then select the opmtimum point, then we apply the interpolation on it.

    3. decoded the point to get the corrosponding $u_t$ input

    4. check the feasibilty and validity of the generated new input and deploy to MonoCap
    
- Linear Interpolation
For solutions $\mathbf{x}_A$, $\mathbf{x}_B$:
    1. Encode two solutions:
    $$
    \mathbf{z}_A = \text{Encoder}(\mathbf{x}_A), \quad \mathbf{z}_B = \text{Encoder}(\mathbf{x}_B)
    $$

    2. Linear interpolation:
    $$
    \mathbf{z}_{new} = \alpha\mathbf{z}_A + (1-\alpha)\mathbf{z}_B, \quad \alpha \in [0,1]
    $$

    3. Decode to generate new solution:
    $$
    \mathbf{x}_{new} = \text{Decoder}(\mathbf{z}_{new})
    $$


- Geodesic Interpolation
    For solutions $\mathbf{x}_A$, $\mathbf{x}_B$:
    1. Encode: $\mathbf{z}_A = g_\phi(\mathbf{x}_A)$, $\mathbf{z}_B = g_\phi(\mathbf{x}_B)$
    2. Spherical interpolation:
    $$
    \mathbf{z}_{\text{new}} = \frac{\sin[(1-\alpha)\Omega]}{\sin\Omega}\mathbf{z}_A + \frac{\sin[\alpha\Omega]}{\sin\Omega}\mathbf{z}_B
    $$
    where $\Omega = \arccos(\mathbf{z}_A^\top \mathbf{z}_B)$

    3. Decode:
    $$
    \mathbf{x}_{\text{new}} = f_\theta(\mathbf{z}_{\text{new}})
    $$


In [None]:
# Interpolation with geodesic sampling
def geodesic_interpolate(z1, z2, alpha):
    omega = np.arccos(np.clip(np.dot(z1.T, z2)[0,0], -1, 1))
    return (np.sin((1-alpha)*omega)/np.sin(omega))*z1 + (np.sin(alpha*omega)/np.sin(omega))*z2

z_A = encoder.predict(X_train[0:1])
z_B = encoder.predict(X_train[1:2])

for alpha in np.linspace(0, 1, 5):
    z_new = geodesic_interpolate(z_A, z_B, alpha)
    J_new = decoder.predict(z_new)
    J_original = scaler.inverse_transform(J_new)
    print(f"α={alpha:.1f}: Time={J_original[0,0]:.2f}s, Energy={J_original[0,1]:.2f}kWh")


In [None]:
# Interpolation with linear sampling

# Encode two Pareto solutions
z_A = encoder.predict(X_train[0:1])  # Solution A
z_B = encoder.predict(X_train[1:2])  # Solution B

# Linear interpolation
alpha = 0.5
z_new = alpha * z_A + (1 - alpha) * z_B

# Decode to generate new solution
J_new = decoder.predict(z_new)

# Denormalize
J_new_original = scaler.inverse_transform(J_new)
print(f"Interpolated Solution: Time = {J_new_original[0, 0]:.2f} s, Energy = {J_new_original[0, 1]:.2f} kWh")

## Testing

In [None]:
class MonocapOptimizer:
    def __init__(self, vae_model):
        self.model = vae_model
        self.scaler = load_scaler()  # Trained data normalizer

    def query(self, distance_km, time_preference=0.5, energy_preference=0.5):
        # Normalize inputs
        conditions = self.scaler.transform([
            [distance_km, time_preference, energy_preference, 0.3]
        ])

        # Generate latent sample
        z = torch.randn(1, 32)

        # Decode with conditions
        with torch.no_grad():
            profile = self.model.decoder(torch.cat([
                z,
                torch.tensor(conditions, dtype=torch.float32)
            ]))

        # Post-process
        return self._clamp_profile(profile.numpy())

In [None]:
optimizer = MonocapOptimizer(trained_vae)

# Get minimum time solution for 10km
fast_profile = optimizer.query(
    distance_km=10,
    time_preference=0.9,
    energy_preference=0.1
)

# Get balanced solution for 15km
balanced_profile = optimizer.query(
    distance_km=15,
    time_preference=0.5,
    energy_preference=0.5
)

# Get energy-efficient solution for 20km
efficient_profile = optimizer.query(
    distance_km=20,
    time_preference=0.2,
    energy_preference=0.8
)

## Solution Validation Protocol

### Dominance Verification
$$
\mathbf{x}_{\text{new}} \text{ is non-dominated iff } \nexists \mathbf{x}_i \in X :
\begin{cases}
f_1^{(i)} \leq f_1^{\text{(new)}} \\
f_2^{(i)} \leq f_2^{\text{(new)}} \\
\|\mathbf{x}_i - \mathbf{x}_{\text{new}}\|_2 > \delta
\end{cases}
$$

### Feasibility Check
$$
\mathbf{x}_{\text{new}} \in \mathcal{F} \iff
\begin{cases}
g_1(\mathbf{x}_{\text{new}}) \leq 0 \\
g_2(\mathbf{x}_{\text{new}}) \leq 0 \\
\vdots \\
g_k(\mathbf{x}_{\text{new}}) \leq 0
\end{cases}
$$

Ensure:
$$
f_1^{new} \geq 0, \quad f_2^{new} \geq 0
$$
And any problem-specific constraints (e.g., $g(\mathbf{x}_{new}) \leq 0$)

### Check mapping back to decision space (acceleratio, decelereation)

In [None]:
# Create Pareto Validator class


# Resources

[1] [Autoencoder implementation in Pytorch](https://avandekleut.github.io/vae/)

