In [4]:
import pickle
def load_trajectories(file_path):
    with open(file_path, "rb") as f:
        trajectories = pickle.load(f)
    return trajectories
trajectories = load_trajectories("./trajectories10.pkl")

For all trajectories together

In [None]:
"""
gives you
Overall reconstruction accuracy: 91.87%
using
    latent_dim = 64


or
Overall reconstruction accuracy: 88.42%
using
    latent_dim = 32

"""

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

class ResidualBlock(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(dim, dim),
            nn.LayerNorm(dim),
            nn.GELU(),
            nn.Dropout(0.05),
            nn.Linear(dim, dim),
            nn.LayerNorm(dim)
        )

    def forward(self, x):
        return x + self.fc(x)

class Encoder(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 1024),
            nn.LayerNorm(1024),
            nn.GELU(),
            ResidualBlock(1024),

            nn.Linear(1024, 512),
            nn.LayerNorm(512),
            nn.GELU(),
            ResidualBlock(512),

            nn.Linear(512, latent_dim),
        )

    def forward(self, x):
        return self.net(x)

class Decoder(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(latent_dim, 512),
            nn.LayerNorm(512),
            nn.GELU(),
            ResidualBlock(512),

            nn.Linear(512, 1024),
            nn.LayerNorm(1024),
            nn.GELU(),
            ResidualBlock(1024),

            nn.Linear(1024, input_dim)
        )

    def forward(self, x):
        return self.net(x)

def train_autoencoder(trajectories, latent_dim=64, batch_size=128, num_epochs=100, lr=1e-3):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    trajectory_lengths = [t.shape[0] for t in trajectories]
    input_dim = trajectories[0].shape[1]

    stacked_trajectories = torch.cat(trajectories, dim=0)

    # Create dataset and loader
    dataset = TensorDataset(stacked_trajectories)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Initialize models
    encoder = Encoder(input_dim, latent_dim).to(device)
    decoder = Decoder(input_dim, latent_dim).to(device)

    optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=lr)
    criterion = nn.MSELoss()
    best_loss = float('inf')

    for epoch in range(1, num_epochs + 1):
        encoder.train()
        decoder.train()
        epoch_loss = 0.0

        for (batch,) in loader:
            x = batch.to(device)

            # Forward pass
            z = encoder(x)
            x_recon = decoder(z)

            # Compute loss
            loss = criterion(x_recon, x)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item() * x.size(0)

        epoch_loss /= len(dataset)

        # Evaluation
        if epoch % 10 == 0 or epoch == num_epochs:
            encoder.eval()
            decoder.eval()

            with torch.no_grad():
                # Compute Euclidean distance 
                val_batch = stacked_trajectories.to(device)
                val_recon = decoder(encoder(val_batch))
                l2_dist = torch.norm(val_recon - val_batch, dim=1).mean().item()

                # Cosine similarity 
                cos_sim = nn.functional.cosine_similarity(val_recon, val_batch, dim=1).mean().item()

                print(f"Epoch {epoch:03d} | Loss: {epoch_loss:.6f} | L2 Dist: {l2_dist:.6f} | Cosine Sim: {cos_sim:.6f}")

            if epoch_loss < best_loss:
                best_loss = epoch_loss

    return encoder, decoder, trajectory_lengths

def encode_trajectories(encoder, trajectories, device=None):
    """
    Encode all trajectories to lower dimension while preserving trajectory structure
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    encoder.eval()
    trajectory_lengths = [t.shape[0] for t in trajectories]
    encoded_trajectories = []

    with torch.no_grad():
        for trajectory in trajectories:
            # Encode each trajectory
            encoded = encoder(trajectory.to(device))
            encoded_trajectories.append(encoded)

    return encoded_trajectories, trajectory_lengths

def decode_trajectories(decoder, encoded_trajectories, device=None):
    """
    Decode all encoded trajectories back to original dimension
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    decoder.eval()
    decoded_trajectories = []

    with torch.no_grad():
        for encoded in encoded_trajectories:
            # Decode each trajectory
            decoded = decoder(encoded.to(device))
            decoded_trajectories.append(decoded)

    return decoded_trajectories

def evaluate_reconstruction(original_trajectories, decoded_trajectories):
    """
    Evaluate reconstruction quality across all trajectories
    """
    total_l2_dist = 0
    total_cos_sim = 0
    total_points = 0

    for orig, recon in zip(original_trajectories, decoded_trajectories):
        if orig.device != recon.device:
            recon = recon.to(orig.device)

        # L2 distance
        l2_dist = torch.norm(recon - orig, dim=1).mean().item()

        # Cosine similarity
        cos_sim = nn.functional.cosine_similarity(recon, orig, dim=1).mean().item()

        # Weight by number of points in trajectory
        n_points = orig.shape[0]
        total_l2_dist += l2_dist * n_points
        total_cos_sim += cos_sim * n_points
        total_points += n_points

    avg_l2_dist = total_l2_dist / total_points
    avg_cos_sim = total_cos_sim / total_points

    return {
        "avg_l2_distance": avg_l2_dist,
        "avg_cosine_similarity": avg_cos_sim
    }


def calculate_reconstruction_accuracy(encoder, decoder, trajectories, device=None):
    """
    Calculate reconstruction accuracy as a percentage for the PyTorch autoencoder
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Get encoded and decoded trajectories
    encoded_trajectories, _ = encode_trajectories(encoder, trajectories, device)
    decoded_trajectories = decode_trajectories(decoder, encoded_trajectories, device)

    original_data = torch.cat(trajectories, dim=0).cpu().numpy()
    reconstructed_data = torch.cat(decoded_trajectories, dim=0).cpu().numpy()

    mse = np.mean(np.square(original_data - reconstructed_data))
    variance = np.var(original_data)
    accuracy_percentage = (1 - mse/variance) * 100

    # Calculate component-wise accuracy
    component_mse = np.mean(np.square(original_data - reconstructed_data), axis=0)
    component_var = np.var(original_data, axis=0)
    component_accuracy = np.mean((1 - component_mse/component_var) * 100)

    metrics = evaluate_reconstruction(trajectories, decoded_trajectories)

    print("\nReconstruction Accuracy Metrics:")
    print(f"Overall reconstruction accuracy: {accuracy_percentage:.2f}%")
    print(f"Average component-wise accuracy: {component_accuracy:.2f}%")
    print(f"Average L2 Distance: {metrics['avg_l2_distance']:.6f}")
    print(f"Average Cosine Similarity: {metrics['avg_cosine_similarity']:.6f}")

    return {
        "overall_accuracy": accuracy_percentage,
        "component_accuracy": (1 - component_mse/component_var) * 100,
        "l2_distance": metrics["avg_l2_distance"],
        "cosine_similarity": metrics["avg_cosine_similarity"],
        "mse": mse
    }


if __name__ == "__main__":
    # Hyperparameters
    latent_dim = 32  # Target reduced dimension
    batch_size = 64
    lr = 1e-3
    num_epochs = 32

    encoder, decoder, trajectory_lengths = train_autoencoder(
        trajectories=trajectories,
        latent_dim=latent_dim,
        batch_size=batch_size,
        num_epochs=num_epochs,
        lr=lr
    )



    encoded_trajectories, _ = encode_trajectories(encoder, trajectories)
    decoded_trajectories = decode_trajectories(decoder, encoded_trajectories)
    metrics = evaluate_reconstruction(trajectories, decoded_trajectories)

    print("\nFinal Evaluation:")
    print(f"Average L2 Distance: {metrics['avg_l2_distance']:.6f}")
    print(f"Average Cosine Similarity: {metrics['avg_cosine_similarity']:.6f}")

    accuracy_metrics = calculate_reconstruction_accuracy(encoder, decoder, trajectories)




Epoch 010 | Loss: 1.791686 | L2 Dist: 49.975746 | Cosine Sim: 0.867917
Epoch 020 | Loss: 1.176212 | L2 Dist: 40.038967 | Cosine Sim: 0.917338
Epoch 030 | Loss: 0.941497 | L2 Dist: 35.702042 | Cosine Sim: 0.935312
Epoch 032 | Loss: 0.912739 | L2 Dist: 35.106148 | Cosine Sim: 0.937644

Final Evaluation:
Average L2 Distance: 35.106148
Average Cosine Similarity: 0.937644

Reconstruction Accuracy Metrics:
Overall reconstruction accuracy: 88.42%
Average component-wise accuracy: 73.72%
Average L2 Distance: 35.106148
Average Cosine Similarity: 0.937644


In [None]:
"""
gives you

Overall reconstruction accuracy: 85%
using
    latent_dim = 32

"""
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, Model, optimizers, losses, metrics
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import matplotlib.pyplot as plt
from tqdm import tqdm

class ResidualBlock(layers.Layer):
    def __init__(self, dim, dropout_rate=0.05):
        super(ResidualBlock, self).__init__()
        self.layer_norm1 = layers.LayerNormalization(epsilon=1e-6)
        self.dense1 = layers.Dense(dim, activation=None)
        self.activation = layers.Activation('swish')  # Using Swish activation (SiLU)
        self.dropout = layers.Dropout(dropout_rate)
        self.layer_norm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dense2 = layers.Dense(dim, activation=None)

    def call(self, inputs, training=False):
        x = self.layer_norm1(inputs)
        x = self.dense1(x)
        x = self.activation(x)
        x = self.dropout(x, training=training)
        x = self.layer_norm2(x)
        x = self.dense2(x)
        return x + inputs  # Residual connection

class TFAutoencoder:
    def __init__(self, input_dim=1536, latent_dim=64):
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.encoder = self._build_encoder()
        self.decoder = self._build_decoder()
        self.model = self._build_autoencoder()

    def _build_encoder(self):
        inputs = layers.Input(shape=(self.input_dim,))

        # Initial projection
        x = layers.Dense(1024)(inputs)
        x = layers.LayerNormalization(epsilon=1e-6)(x)
        x = layers.Activation('swish')(x)

        # First residual block
        x = ResidualBlock(1024)(x)

        # Middle layers
        x = layers.Dense(512)(x)
        x = layers.LayerNormalization(epsilon=1e-6)(x)
        x = layers.Activation('swish')(x)

        # Second residual block
        x = ResidualBlock(512)(x)

        # Final projection to latent space
        encoded = layers.Dense(self.latent_dim)(x)

        return Model(inputs, encoded, name="encoder")

    def _build_decoder(self):
        encoded_inputs = layers.Input(shape=(self.latent_dim,))

        # Initial projection
        x = layers.Dense(512)(encoded_inputs)
        x = layers.LayerNormalization(epsilon=1e-6)(x)
        x = layers.Activation('swish')(x)

        # First residual block
        x = ResidualBlock(512)(x)

        # Middle layers
        x = layers.Dense(1024)(x)
        x = layers.LayerNormalization(epsilon=1e-6)(x)
        x = layers.Activation('swish')(x)

        # Second residual block
        x = ResidualBlock(1024)(x)

        # Final projection back to original space
        decoded = layers.Dense(self.input_dim)(x)

        return Model(encoded_inputs, decoded, name="decoder")

    def _build_autoencoder(self):
        inputs = layers.Input(shape=(self.input_dim,))
        encoded = self.encoder(inputs)
        decoded = self.decoder(encoded)
        return Model(inputs, decoded, name="autoencoder")

    def compile(self, learning_rate=1e-3):
        lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
            initial_learning_rate=learning_rate,
            decay_steps=10000
        )

        optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)

        def combined_loss(y_true, y_pred):
            # MSE 
            mse = tf.reduce_mean(tf.square(y_true - y_pred))

            # Cosine similarity 
            y_true_norm = tf.nn.l2_normalize(y_true, axis=1)
            y_pred_norm = tf.nn.l2_normalize(y_pred, axis=1)
            cosine_loss = 1 - tf.reduce_mean(tf.reduce_sum(y_true_norm * y_pred_norm, axis=1))

            return mse + 0.2 * cosine_loss

        self.model.compile(
            optimizer=optimizer,
            loss=combined_loss,
            metrics=[
                'mse',  # Mean Squared Error
                tf.keras.metrics.CosineSimilarity(axis=1)  # Cosine similarity
            ]
        )

    def train(self, trajectories, batch_size=128, epochs=100, validation_split=0.1):
        # Stack all trajectories while keeping track of lengths
        trajectory_lengths = [t.shape[0] for t in trajectories]
        stacked_data = np.vstack([t.numpy() if isinstance(t, tf.Tensor) else t for t in trajectories])

        stacked_tensor = tf.convert_to_tensor(stacked_data, dtype=tf.float32)

        callbacks = [
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
        ]

        # Train the model
        history = self.model.fit(
            stacked_tensor, stacked_tensor,  # Autoencoder: input = target
            batch_size=batch_size,
            epochs=epochs,
            validation_split=validation_split,
            callbacks=callbacks,
            shuffle=True
        )

        return history, trajectory_lengths

    def encode_trajectories(self, trajectories):
        """Encode each trajectory individually while preserving structure"""
        encoded_trajectories = []
        for trajectory in trajectories:
            if not isinstance(trajectory, (np.ndarray, tf.Tensor)):
                trajectory = trajectory.numpy()

            if not isinstance(trajectory, tf.Tensor):
                trajectory = tf.convert_to_tensor(trajectory, dtype=tf.float32)

            # Encode 
            encoded = self.encoder.predict(trajectory, verbose=0)
            encoded_trajectories.append(encoded)

        return encoded_trajectories

    def decode_trajectories(self, encoded_trajectories):
        """Decode each encoded trajectory back to original dimensionality"""
        decoded_trajectories = []
        for encoded in encoded_trajectories:
            if not isinstance(encoded, tf.Tensor):
                encoded = tf.convert_to_tensor(encoded, dtype=tf.float32)

            # Decode 
            decoded = self.decoder.predict(encoded, verbose=0)
            decoded_trajectories.append(decoded)

        return decoded_trajectories

    def evaluate_reconstruction(self, original_trajectories, decoded_trajectories=None):
        """Evaluate reconstruction quality across all trajectories"""
        if decoded_trajectories is None:
            # If decoded trajectories not provided, encode and decode originals
            encoded = self.encode_trajectories(original_trajectories)
            decoded_trajectories = self.decode_trajectories(encoded)

        total_mse = 0
        total_cosine_sim = 0
        total_points = 0

        for i, (orig, recon) in enumerate(zip(original_trajectories, decoded_trajectories)):
            if isinstance(orig, tf.Tensor):
                orig = orig.numpy()
            if isinstance(recon, tf.Tensor):
                recon = recon.numpy()
            if not isinstance(orig, np.ndarray):
                orig = orig.numpy()
            if not isinstance(recon, np.ndarray):
                recon = recon.numpy()

            # Calculate MSE
            mse = np.mean(np.square(orig - recon))

            # Calculate cosine similarity
            orig_norm = orig / np.linalg.norm(orig, axis=1, keepdims=True)
            recon_norm = recon / np.linalg.norm(recon, axis=1, keepdims=True)
            cosine_sim = np.mean(np.sum(orig_norm * recon_norm, axis=1))

            # Weight by trajectory length
            n_points = orig.shape[0]
            total_mse += mse * n_points
            total_cosine_sim += cosine_sim * n_points
            total_points += n_points

        avg_mse = total_mse / total_points
        avg_cosine_sim = total_cosine_sim / total_points

        return {
            "mse": avg_mse,
            "cosine_similarity": avg_cosine_sim
        }

def prepare_tf_data(trajectories):
    """Convert PyTorch trajectories to TensorFlow tensors if needed"""
    tf_trajectories = []
    for trajectory in trajectories:
        if not isinstance(trajectory, (np.ndarray, tf.Tensor)):
            # Convert PyTorch tensor to numpy
            trajectory = trajectory.cpu().numpy()
        tf_trajectories.append(trajectory)
    return tf_trajectories

def process_trajectories(trajectories):
    """Main function to process trajectories with TensorFlow autoencoder"""
    tf_trajectories = prepare_tf_data(trajectories)

    input_dim = tf_trajectories[0].shape[1]
    latent_dim = 32  # Target reduced dimension

    autoencoder = TFAutoencoder(input_dim=input_dim, latent_dim=latent_dim)
    autoencoder.compile(learning_rate=1e-3)

    # Train the autoencoder
    print("Training autoencoder...")
    history, trajectory_lengths = autoencoder.train(
        tf_trajectories,
        batch_size=64,
        epochs=32,
        validation_split=0.1
    )

    print("Encoding trajectories...")
    encoded_trajectories = autoencoder.encode_trajectories(tf_trajectories)
    print("Decoding trajectories...")
    decoded_trajectories = autoencoder.decode_trajectories(encoded_trajectories)

    print("Evaluating reconstruction quality...")
    metrics = autoencoder.evaluate_reconstruction(tf_trajectories, decoded_trajectories)

    print("\nFinal Evaluation:")
    print(f"Average MSE: {metrics['mse']:.6f}")
    print(f"Average Cosine Similarity: {metrics['cosine_similarity']:.6f}")

    return autoencoder, encoded_trajectories, metrics


def test_component_reconstruction(autoencoder, test_trajectories):
    if not isinstance(test_trajectories[0], (np.ndarray, tf.Tensor)):
        test_trajectories = prepare_tf_data(test_trajectories)

    encoded = autoencoder.encode_trajectories(test_trajectories)
    reconstructed = autoencoder.decode_trajectories(encoded)

    all_orig = np.vstack([t if isinstance(t, np.ndarray) else t.numpy() for t in test_trajectories])
    all_recon = np.vstack([r if isinstance(r, np.ndarray) else r.numpy() for r in reconstructed])

    # Calculate MSE 
    component_mse = np.mean(np.square(all_orig - all_recon), axis=0)

    # Summary statistics
    avg_mse = np.mean(component_mse)

    print(f"Overall MSE: {avg_mse:.6f}")

    return component_mse

def calculate_reconstruction_percentage(autoencoder, test_trajectories):
    if not isinstance(test_trajectories[0], (np.ndarray, tf.Tensor)):
        test_trajectories = prepare_tf_data(test_trajectories)

    encoded = autoencoder.encode_trajectories(test_trajectories)
    reconstructed = autoencoder.decode_trajectories(encoded)

    all_orig = np.vstack([t if isinstance(t, np.ndarray) else t.numpy() for t in test_trajectories])
    all_recon = np.vstack([r if isinstance(r, np.ndarray) else r.numpy() for r in reconstructed])

    mse = np.mean(np.square(all_orig - all_recon))
    variance = np.var(all_orig)
    accuracy_percentage = (1 - mse/variance) * 100

    # Calculate component-wise accuracy
    component_mse = np.mean(np.square(all_orig - all_recon), axis=0)
    component_var = np.var(all_orig, axis=0)
    component_accuracy = (1 - component_mse/component_var) * 100

    print(f"Overall reconstruction accuracy: {accuracy_percentage:.2f}%")
    print(f"Average component-wise accuracy: {np.mean(component_accuracy):.2f}%")
    print(f"Median component-wise accuracy: {np.median(component_accuracy):.2f}%")

    return {
        "overall_accuracy": accuracy_percentage,
        "component_accuracy": component_accuracy
    }

if __name__ == "__main__":
    autoencoder, encoded_trajectories, metrics = process_trajectories(trajectories)
    test_component_reconstruction(autoencoder, trajectories)
    calculate_reconstruction_percentage(autoencoder, trajectories)

Training autoencoder...
Epoch 1/32
[1m256/256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 42ms/step - cosine_similarity: 0.6532 - loss: 3.9791 - mse: 3.9098 - val_cosine_similarity: 0.7582 - val_loss: 2.9131 - val_mse: 2.8647 - learning_rate: 9.9838e-04
Epoch 2/32
[1m256/256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 11ms/step - cosine_similarity: 0.7787 - loss: 2.7283 - mse: 2.6841 - val_cosine_similarity: 0.7927 - val_loss: 2.5412 - val_mse: 2.4997 - learning_rate: 9.9355e-04
Epoch 3/32
[1m256/256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 10ms/step - cosine_similarity: 0.8141 - loss: 2.3399 - mse: 2.3027 - val_cosine_similarity: 0.8169 - val_loss: 2.2861 - val_mse: 2.2495 - learning_rate: 9.8552e-04
Epoch 4/32
[1m256/256[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 11ms/step - cosine_similarity: 0.8341 - loss: 2.1229 - mse: 2.0897 - val_cosine_similarity: 0.8275 - val_loss: 2.1740 - val_mse: 2.1395 - learning_rate: 9.7435e-04
Epoch 5/32
