# Evaluating Embedding Models

This notebook provide the code for evaluating the embedding model based on various criteria. The notebook must be rerun for each different embedding model, and the corresponding model architecture should be applied to each one.

## Setting up the model

### Importing and Arguments
This section sets up the arguments for each model. The parameters that need to be adjusted based on the embedding model used are training_h5_file, eval_h5_file, and n_embd. For the Repressilator model, the number of embeddings is set to 32 when using MLP, KAN, or MLP + KAN, and 64 when using (Encoder KAN + Decoder MLP) or (Encoder MLP + Decoder KAN).

In [3]:
import sys
import os
import logging
import h5py
import torch
import torch.nn as nn
import numpy as np

from typing import Dict, List, Tuple
# Torch imports
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
from torch.optim.lr_scheduler import ExponentialLR
# Trphysx imports
from trphysx.config import HfArgumentParser
from trphysx.config.args import ModelArguments, TrainingArguments, DataArguments, ArgUtils
from trphysx.embedding import EmbeddingModel
from trphysx.config.configuration_phys import PhysConfig
from trphysx.embedding import EmbeddingTrainingHead
from trphysx.embedding.training import EmbeddingParser, EmbeddingDataHandler, EmbeddingTrainer
from kan import KAN, KANLinear

Tensor = torch.Tensor
TensorTuple = Tuple[torch.Tensor]
FloatTuple = Tuple[float]

logger = logging.getLogger(__name__)




argv = []
argv = argv + ["--training_h5_file", "../../data/Repressilator/One_Parameter/Repressilator_training.hdf5"]
argv = argv + ["--eval_h5_file", "../../data/Repressilator/One_Parameter/Repressilator_validation.hdf5"]
argv = argv + ["--stride", "16"]
#argv = argv + ["--batch_size", "256"]
argv = argv + ["--block_size", "16"]
argv = argv + ["--n_train", "10240"]
argv = argv + ["--n_eval", "2048"]
argv = argv + ["--epochs", "300"]

# Setup logging
logging.basicConfig(
    filename='logging.log',
    filemode='a',
    format="%(asctime)s - %(levelname)s - %(name)s -   %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.DEBUG,
    force=True,)

args = EmbeddingParser().parse(argv)
if(torch.cuda.is_available()):
    use_cuda = "cuda"
args.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
logger.info("Torch device: {}".format(args.device))

In [4]:
class RepressilatorConfig(PhysConfig):
    """
    This is the configuration class for the modeling of the Repressilator system.
    """
    # same model as rossler
    #model_type = "rossler"
    model_type = "repressilator"

    def __init__(
        self,
        n_ctx=32,
        n_embd=32,
        n_layer=4,
        n_head=4, # n_head must be a factor of n_embd
        state_dims=[6],
        activation_function="gelu_new",
        initializer_range=0.02,
        **kwargs
    ):
        super().__init__(
            n_ctx=n_ctx,
            n_embd=n_embd,
            n_layer=n_layer,
            n_head=n_head,
            state_dims=state_dims,
            activation_function=activation_function,
            initializer_range=initializer_range,
            **kwargs
        )

### Data Loader

In [5]:
class RepressilatorDataHandler(EmbeddingDataHandler):
    """Embedding data handler for repressilator system.
    Contains methods for creating training and testing loaders,
    dataset class and data collator.
    """
    class RepressilatorDataset(Dataset):
        def __init__(self, examples):
            self.examples = examples

        def __len__(self):
            return len(self.examples)

        def __getitem__(self, i) -> Dict[str, torch.Tensor]:
            return {"states": self.examples[i]}

    class RepressilatorDataCollator:
        """
        Data collator for Repressilator embedding problem
        """
        # Default collator
        def __call__(self, examples:List[Dict[str, torch.Tensor]]) -> Dict[str, torch.Tensor]:

            x_data_tensor =  torch.stack([example['states'] for example in examples])
            return {"states": x_data_tensor}

    def createTrainingLoader(
        self,
        file_path: str,
        block_size: int,
        stride:int = 1,
        ndata:int = -1,
        batch_size:int = 32,
        shuffle=True,
    ) -> DataLoader:
        """Creating embedding training data loader for Repressilator system.
        For a single training simulation, the total time-series is sub-chunked into
        smaller blocks for training.

        Args:
            file_path (str): Path to HDF5 file with training data
            block_size (int): The length of time-series blocks
            stride (int): Stride of each time-series block
            ndata (int, optional): Number of training time-series. If negative, all of the provided
            data will be used. Defaults to -1.
            batch_size (int, optional): Training batch size. Defaults to 32.
            shuffle (bool, optional): Turn on mini-batch shuffling in dataloader. Defaults to True.

        Returns:
            (DataLoader): Training loader
        """
        logger.info('Creating training loader')
        assert os.path.isfile(file_path), "Training HDF5 file {} not found".format(file_path)

        examples = []
        with h5py.File(file_path, "r") as f:
            # Iterate through stored time-series
            samples = 0
            for key in f.keys():
                data_series = torch.Tensor(f[key])
                # Stride over time-series by specified block size
                for i in range(0,  data_series.size(0) - block_size + 1, stride):
                    examples.append(data_series[i : i + block_size].unsqueeze(0))

                samples = samples + 1
                if(ndata > 0 and samples > ndata): #If we have enough time-series samples break loop
                    break

        data = torch.cat(examples, dim=0)
        logger.info("Training data-set size: {}".format(data.size()))

        # Normalize training data
        # Normalize x and y with Gaussian, normalize z with max/min
        self.mu = torch.tensor([torch.mean(data[:,:,0]), torch.mean(data[:,:,1]), torch.mean(data[:,:,2]),torch.mean(data[:,:,3]),torch.mean(data[:,:,4]),torch.mean(data[:,:,5])])
        self.std = torch.tensor([torch.std(data[:,:,0]), torch.std(data[:,:,1]), torch.std(data[:,:,2]),torch.std(data[:,:,3]),torch.std(data[:,:,4]),torch.std(data[:,:,5])])

        # Needs to min-max normalization due to the reservoir matrix, needing to have a spectral density below 1
        if(data.size(0) < batch_size):
            logger.warn('Lower batch-size to {:d}'.format(data.size(0)))
            batch_size = data.size(0)

        dataset = self.RepressilatorDataset(data)
        data_collator = self.RepressilatorDataCollator()
        training_loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=data_collator, drop_last=True)
        return training_loader

    def createTestingLoader(self,
        file_path: str,
        block_size: int,
        ndata:int = -1,
        batch_size:int=32,
        shuffle=False
    ) -> DataLoader:
        """Creating testing/validation data loader for Repressilator system.
        For a data case with time-steps [0,T], this method extract a smaller
        time-series to be used for testing [0, S], s.t. S < T.

        Args:
            file_path (str): Path to HDF5 file with testing data
            block_size (int): The length of testing time-series
            ndata (int, optional): Number of testing time-series. If negative, all of the provided
            data will be used. Defaults to -1.
            batch_size (int, optional): Testing batch size. Defaults to 32.
            shuffle (bool, optional): Turn on mini-batch shuffling in dataloader. Defaults to False.

        Returns:
            (DataLoader): Testing/validation data loader
        """
        logger.info('Creating testing loader')
        assert os.path.isfile(file_path), "Testing HDF5 file {} not found".format(file_path)

        examples = []
        with h5py.File(file_path, "r") as f:
            # Iterate through stored time-series
            samples = 0
            for key in f.keys():
                data_series = torch.Tensor(f[key])
                # Stride over time-series
                x = 0
                for i in range(0,  data_series.size(0) - block_size + 1, block_size):  # Truncate in block of block_size
                    examples.append(data_series[i : i + block_size].unsqueeze(0))
                    x = x + 1

                samples = samples + 1
                if(ndata > 0 and samples >= ndata): #If we have enough time-series samples break loop
                    break

        # Combine data-series
        data = torch.cat(examples, dim=0)
        logger.info("Testing data-set size: {}".format(data.size()))

        if(data.size(0) < batch_size):
            logger.warn('Lower batch-size to {:d}'.format(data.size(0)))
            batch_size = data.size(0)

        #data = (data - self.mu.unsqueeze(0).unsqueeze(0)) / self.std.unsqueeze(0).unsqueeze(0)
        dataset = self.RepressilatorDataset(data)
        data_collator = self.RepressilatorDataCollator()
        testing_loader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, collate_fn=data_collator,drop_last=False)

        return testing_loader


In [6]:
print(args.n_eval)

data_handler = RepressilatorDataHandler()
# Set up data-loaders

training_loader = data_handler.createTrainingLoader(
    args.training_h5_file,
    block_size=args.block_size,
    stride=args.stride,
    ndata=args.n_train,
    batch_size=args.batch_size)

validation_loader = data_handler.createTestingLoader(
    "../../data/Repressilator/One_Parameter/Repressilator_validation.hdf5",
    block_size=1024,
    ndata=args.n_eval,
    batch_size=1)



2048


  data_series = torch.Tensor(f[key])


## Embedding Model Architecture

This section of the code needs to be modified for each embedding model. Navigate to the corresponding train_repressilator_enn_{desired_architecture}.py file and copy the RepressilatorEmbedding class.

In [7]:
"""## Embedding Neural Network Class"""

class RepressilatorEmbedding(EmbeddingModel):
    """Embedding model for the Repressilator ODE system

    Args:
        config (PhysConfig) Configuration class with transformer/embedding parameters
    """
    #model_name = "embedding_rossler"
    model_name = "embedding_repressilator"

    def __init__(self, config: PhysConfig) -> None:
        """Constructor method
        """
        super().__init__(config)

        hidden_states = int(abs(config.state_dims[0] - config.n_embd)/2) + 1
        hidden_states = 500

        self.observableNet = nn.Sequential(
            nn.Linear(config.state_dims[0], hidden_states),
            nn.ReLU(),
            nn.Linear(hidden_states, config.n_embd),
            nn.LayerNorm(config.n_embd, eps=config.layer_norm_epsilon),
            nn.Dropout(config.embd_pdrop)
        )

        self.recoveryNet = nn.Sequential(
            nn.Linear(config.n_embd, hidden_states),
            nn.ReLU(),
            nn.Linear(hidden_states, config.state_dims[0])
        )
        # Learned koopman operator
        # Learns skew-symmetric matrix with a diagonal
        self.obsdim = config.n_embd
        self.kMatrixDiag = nn.Parameter(torch.linspace(1, 0, config.n_embd))

        xidx = []
        yidx = []
        for i in range(1, 3):
            yidx.append(np.arange(i, config.n_embd))
            xidx.append(np.arange(0, config.n_embd-i))

        self.xidx = torch.LongTensor(np.concatenate(xidx))
        self.yidx = torch.LongTensor(np.concatenate(yidx))
        self.kMatrixUT = nn.Parameter(0.1*torch.rand(self.xidx.size(0)))
        # Normalization occurs inside the model
        self.register_buffer('mu', torch.tensor([0., 0., 0., 0., 0., 0.]))
        self.register_buffer('std', torch.tensor([1., 1., 1., 1., 1., 1.]))
        print('Number of embedding parameters: {}'.format( super().num_parameters ))

    def forward(self, x: Tensor) -> TensorTuple:
        """Forward pass

        Args:
            x (torch.Tensor): [B, 6] Input feature tensor

        Returns:
            (tuple): tuple containing:

                | (torch.Tensor): [B, config.n_embd] Koopman observables
                | (torch.Tensor): [B, 6] Recovered feature tensor
        """
        # Encode
        x = self._normalize(x)
        g = self.observableNet(x)
        # Decode
        out = self.recoveryNet(g)
        xhat = self._unnormalize(out)
        return g, xhat

    def embed(self, x: Tensor) -> Tensor:
        """Embeds tensor of state variables to Koopman observables

        Args:
            x (Tensor): [B, 6] input feature tensor

        Returns:
            (Tensor): [B, config.n_embd] Koopman observables
        """
        x = self._normalize(x)
        g = self.observableNet(x)
        return g

    def recover(self, g: Tensor) -> Tensor:
        """Recovers feature tensor from Koopman observables

        Args:
            g (Tensor): [B, config.n_embd] Koopman observables

        Returns:
            (Tensor): [B, 6] Physical feature tensor
        """
        out = self.recoveryNet(g)
        x = self._unnormalize(out)
        return x

    def koopmanOperation(self, g: Tensor) -> Tensor:
        """Applies the learned koopman operator on the given observables.

        Args:
            (Tensor): [B, config.n_embd] Koopman observables

        Returns:
            (Tensor): [B, config.n_embd] Koopman observables at the next time-step
        """
        # Koopman operator
        kMatrix = Variable(torch.zeros(self.obsdim, self.obsdim)).to(self.kMatrixUT.device)
        # Populate the off diagonal terms
        kMatrix[self.xidx, self.yidx] = self.kMatrixUT
        kMatrix[self.yidx, self.xidx] = -self.kMatrixUT

        # Populate the diagonal
        ind = np.diag_indices(kMatrix.shape[0])
        kMatrix[ind[0], ind[1]] = self.kMatrixDiag

        # Apply Koopman operation
        gnext = torch.bmm(kMatrix.expand(g.size(0), kMatrix.size(0), kMatrix.size(0)), g.unsqueeze(-1))
        self.kMatrix = kMatrix
        return gnext.squeeze(-1) # Squeeze empty dim from bmm

    @property
    def koopmanOperator(self, requires_grad: bool = True) -> Tensor:
        """Current Koopman operator

        Args:
            requires_grad (bool, optional): if to return with gradient storage, defaults to True
        """
        if not requires_grad:
            return self.kMatrix.detach()
        else:
            return self.kMatrix

    def _normalize(self, x: Tensor) -> Tensor:
        return (x - self.mu.unsqueeze(0))/self.std.unsqueeze(0)

    def _unnormalize(self, x: Tensor) -> Tensor:
        return self.std.unsqueeze(0)*x + self.mu.unsqueeze(0)

    @property
    def koopmanDiag(self):
        return self.kMatrixDiag


## Argument Utility and Loading the model
this part you just need to run it without modeification

### Arg Util

In [8]:
"""
=====
Distributed by: Notre Dame SCAI Lab (MIT Liscense)
- Associated publication:
url: https://arxiv.org/abs/2010.03957
doi:
github: https://github.com/zabaras/transformer-physx
=====
"""
import os
from dataclasses import dataclass, field
from typing import Optional, Tuple #Needs python 3.8 for literal

HOME = os.getcwd()
INITS = ['lorenz', 'cylinder', 'grayscott']

@dataclass
class ModelArguments:
    """
    Arguments pertaining to which model/config/tokenizer we are going to fine-tune, or train from scratch.
    """
    init_name: str = field(
        default='lorenz', metadata={"help": "Used as a global default initialization token for different experiments."}
    )
    model_name: str = field(
        default=None, metadata={"help": "The name model of the transformer model"},
    )
    config_name: str = field(
        default=None, metadata={"help": "Pretrained config name or path if not the same as model_name"}
    )
    embedding_name: str = field(
        default=None, metadata={"help": "Pretrained embedding model name"}
    )
    embedding_file_or_path: str = field(
        default=None, metadata={"help": "Pretrained embedding model path"}
    )
    transformer_file_or_path: str = field(
        default=None, metadata={"help": "Pretrained transformer model path"}
    )
    viz_name: str = field(
        default=None, metadata={"help": "Visualization class name"}
    )

@dataclass
class DataArguments:
    """
    Arguments pertaining to training and evaluation data.
    """
    n_train: int = field(
        default=2048, metadata={"help": "Number of training time-series to use"}
    )
    n_eval: int = field(
        default=256, metadata={"help": "Number of evaluation time-series to use"}
    )
    stride: int = field(
        default=32, metadata={"help": " Stride to segment the training data at"}
    )
    training_h5_file: str = field(
        default=None, metadata={"help": "File path to the training data hdf5 file"}
    )
    eval_h5_file: str = field(
        default=None, metadata={"help": "File path to the evaluation data hdf5 file"}
    )
    overwrite_cache: bool = field(
        default=False, metadata={"help": "Overwrite the cached training and evaluation sets"}
    )
    cache_path:str= field(
        default=None, metadata={"help": "File directory to write cache file to"}
    )

@dataclass
class TrainingArguments:
    """
    Arguments pertaining to what data we are going to input our model for training and eval.
    """
    block_size: int = field(
        default=-1,
        metadata={
            "help": "Optional input sequence length after tokenization."
            "The training dataset will be truncated in block of this size for training."
            "Default to the model max input length for single sentence inputs (take into account special tokens)."
        },
    )
    # Training paths for logging, checkpoints etc.
    exp_dir: str = field(
        default=None, metadata={"help": "Directory to store data related to the experiment"}
    )
    ckpt_dir: str = field(
        default=None, metadata={"help": "Directory to save model checkpoints during training"}
    )
    plot_dir: str = field(
        default=None, metadata={"help": "Directory to save plots during training"}
    )
    save_steps: int = field(
        default=25, metadata={"help": "Epoch stride to save checkpoints"}
    )
    eval_steps: int = field(
        default=25, metadata={"help": "Epoch stride to evaluate validation data-set"}
    )
    plot_max: int = field(
        default=3, metadata={"help": "Max number of eval cases to plot"}
    )

    epoch_start: int = field(
        default=0, metadata={"help": "Epoch to start training at"}
    )
    epochs: int = field(
        default=200, metadata={"help": "Number of epochs to train"}
    )

    # ===== Optimization parameters =====
    lr: float = field(
        default=0.001, metadata={"help": "Learning rate"}
    )
    max_grad_norm: float = field(
        default=0.1, metadata={"help": "Norm limit for clipping gradients"}
    )
    dataloader_drop_last: bool = field(
        default=True, metadata={"help": "Drop training cases no in a full mini-batch"}
    )
    gradient_accumulation_steps: int = field(
        default=int(1), metadata={"help": "How many mini-batches to compute before updating weights"}
    )

    # ===== Data loader parameters =====
    train_batch_size: int = field(
        default=256, metadata={"help": "Number of training cases in mini-batch"}
    )
    eval_batch_size: int = field(
        default=16, metadata={"help": "Number of evaluation cases in mini-batch"}
    )

    # ===== Parallel parameters =====
    local_rank: int = field(
        default=-1, metadata={"help": "Local rank of the CPU process, -1 means just use a single CPU"}
    )
    n_gpu: int = field(
        default=1, metadata={"help": "Number of GPUs per CPU"}
    )
    seed: int = field(
        default=12345, metadata={"help": "Random seed for reproducibility"}
    )
    notes: str = field(
        default=None, metadata={"help": "Notes that will be appended to experiment folder"}
    )


class ArgUtils:
    """Argument utility class for modifying particular arguments after initialization
    """
    @classmethod
    def config(
        cls,
        modelArgs: ModelArguments,
        dataArgs: DataArguments,
        trainingArgs: TrainingArguments,
        create_paths: bool = True
    ) -> Tuple[ModelArguments, DataArguments, TrainingArguments]:
        """Runs additional runtime configuration updates for argument instances

        Args:
            modelArgs (ModelArguments): Transformer model arguments
            dataArgs (DataArguments): Data loader/ data set arguments
            trainingArgs (TrainingArguments): Training arguments
            create_paths (bool, optional): Create training/testing folders. Defaults to True.

        Returns:
            Tuple[ModelArguments, DataArguments, TrainingArguments]: Updated argument instances
        """
        modelArgs = cls.configModelNames(modelArgs)

        if create_paths:
            modelArgs, dataArgs, trainingArgs = cls.configPaths(modelArgs, dataArgs, trainingArgs)

        trainingArgs = cls.configTorchDevices(trainingArgs)

        return modelArgs, dataArgs, trainingArgs

    @classmethod
    def configModelNames(cls, modelArgs: ModelArguments) -> ModelArguments:
        # Set up model, config, viz and embedding names
        if not modelArgs.init_name in INITS:
            logger.warn('Selected init name not in built-in models. Be careful.')

        attribs = ["model_name", "config_name", "embedding_name", "viz_name"]
        for attrib in attribs:
            if getattr(modelArgs, attrib) is None:
                setattr(modelArgs, attrib, modelArgs.init_name)

        return modelArgs

    @classmethod
    def configPaths(
        cls,
        modelArgs: ModelArguments,
        dataArgs: DataArguments,
        trainingArgs: TrainingArguments
    ) -> Tuple[ModelArguments, DataArguments, TrainingArguments]:
        """Sets up various folder path parameters

        Args:
            modelArgs (ModelArguments): Transformer model arguments
            dataArgs (DataArguments): Data loader/ data set arguments
            trainingArgs (TrainingArguments): Training arguments

        Returns:
            Tuple[ModelArguments, DataArguments, TrainingArguments]: Updated argument instances
        """
        if(trainingArgs.exp_dir is None):
            trainingArgs.exp_dir = os.path.join(HOME, 'outputs', 'transformer_{:s}'.format(modelArgs.config_name), \
                    'ntrain{:d}_epochs{:d}_batch{:d}'.format(dataArgs.n_train, trainingArgs.epochs, trainingArgs.train_batch_size))
            if trainingArgs.notes: # If notes add them to experiment folder name
                trainingArgs.exp_dir = os.path.join(os.path.dirname(trainingArgs.exp_dir), os.path.basename(trainingArgs.exp_dir)+'_{:s}'.format(trainingArgs.notes))

        if(trainingArgs.ckpt_dir is None):
            trainingArgs.ckpt_dir = os.path.join(trainingArgs.exp_dir, 'checkpoints')

        if(trainingArgs.plot_dir is None):
            trainingArgs.plot_dir = os.path.join(trainingArgs.exp_dir, 'viz')

        # Create directories if they don't exist already
        os.makedirs(trainingArgs.exp_dir, exist_ok=True)
        os.makedirs(trainingArgs.ckpt_dir, exist_ok=True)
        os.makedirs(trainingArgs.plot_dir, exist_ok=True)

        return modelArgs, dataArgs, trainingArgs

    @classmethod
    def configTorchDevices(cls, args: TrainingArguments) -> TrainingArguments:
        """Sets up device ids for training

        Args:
            args (TrainingArguments): Training arguments

        Returns:
            TrainingArguments: Updated argument instance
        """
        # Set up parallel PyTorch device(s)
        if(torch.cuda.device_count() > 1 and args.n_gpu > 1):
            if(torch.cuda.device_count() < args.n_gpu):
                args.n_gpu = torch.cuda.device_count()
            if(args.n_gpu < 1):
                args.n_gpu = torch.cuda.device_count()
            logging.info("Looks like we have {:d} GPUs to use. Going parallel.".format(args.n_gpu))
            args.device_ids = [i for i in range(0,args.n_gpu)]
            args.src_device = "cuda:{}".format(args.device_ids[0])
        # Set up parallel PyTorch single GPU device
        elif(torch.cuda.is_available()):
            logging.info("Using a single GPU for training.")
            args.device_ids = [0]
            args.src_device = "cuda:{}".format(args.device_ids[0])
            args.n_gpu = 1
        # CPU only
        else:
            logging.info("No GPUs found, will be training on CPU.")
            args.src_device = "cpu"

        return args

### Loading the model

In [9]:

# Parse arguments using the hugging face argument parser
parser = HfArgumentParser((ModelArguments, DataArguments, TrainingArguments))
model_args, data_args, training_args = parser.parse_args_into_dataclasses(argv)

model_args, data_args, training_args = ArgUtils.config(model_args, data_args, training_args)
# Rossler configuration
config = RepressilatorConfig()
# Load embedding model
embedding_model = RepressilatorEmbedding(config).to(training_args.src_device)


Number of embedding parameters: 39195


## Evaluation of Prediction and Reconstruction

In [8]:
def evaluate_reconstruction(eval_dataloader: DataLoader, embedding_model: EmbeddingModel):

    with torch.no_grad():
        test_loss = 0
        for states in eval_dataloader:

            states = states['states']
                
            embedding_model.eval()
            device = embedding_model.devices[0]

            mseLoss = nn.MSELoss()

            yTarget = states[:,:].to(device)
            xInput = states[:,:].to(device)
            yPred = torch.zeros(yTarget.size()).to(device) 
            # Test accuracy of one time-step
            for i in range(xInput.size(1)):
                xInput0 = xInput[:,i].to(device)
                g0 = embedding_model.embed(xInput0)
                yPred0 = embedding_model.recover(g0).detach()
                yPred[:,i] = yPred0.squeeze()
            
            loss = mseLoss(yTarget, yPred)/len(eval_dataloader)
            test_loss = test_loss + loss
            test_loss = test_loss 
    return test_loss



In [9]:
# evaluate the next prediction of the model
def evaluate_next_prediction(eval_dataloader: DataLoader, embedding_model: EmbeddingModel):

    with torch.no_grad():
        test_loss = 0
        for states in eval_dataloader:

            states = states['states']
            
                
            embedding_model.eval()
            device = embedding_model.devices[0]

            mseLoss = nn.MSELoss()

            yTarget = states[:,1:].to(device)
            xInput = states[:,:-1].to(device)
            yPred = torch.zeros(yTarget.size()).to(device) 
            # Test accuracy of one time-step
            for i in range(xInput.size(1)):
                xInput0 = xInput[:,i].to(device)
                g0 = embedding_model.embed(xInput0)
                g1 = embedding_model.koopmanOperation(g0)
                yPred0 = embedding_model.recover(g1).detach()
                yPred[:,i] = yPred0.squeeze()
            
            loss = mseLoss(yTarget, yPred) / len(eval_dataloader)
            test_loss = test_loss + loss
            test_loss = test_loss 
    return test_loss

 Write down the path for the checkpoints for the embedding model 

In [11]:
embedding_path_m = "/Users/mohammedahmed/Desktop/Project/PROJECT/transformer-physx-main/examples/Repressilator_with_6_dimensions/mlp/outputs/embedding_mlp/ntrain10240_epochs300_batch256/checkpoints"
mds = [25,50,75,100,125,150,175,200,225,250,275,300]
# use the embedding path and change the embedding model using mds and save the test loss of each model in a list
test_reconstruction_loss_list = []
test_pred_loss_list = []

for md in mds:
    embedding_model.load_model(os.path.join(embedding_path_m, f'embedding_repressilator{md}.pth'))
    test_r_loss = evaluate_reconstruction(validation_loader, embedding_model)
    test_pred_loss = evaluate_next_prediction(validation_loader, embedding_model)
    test_reconstruction_loss_list.append(test_r_loss)
    test_pred_loss_list.append(test_pred_loss)
    print('Test Reconstruction loss: {:.10f}'.format(test_r_loss))
    print('Test Prediction loss: {:.10f}'.format(test_pred_loss))


Test Reconstruction loss: 0.0274432618
Test Prediction loss: 0.4777341187
Test Reconstruction loss: 0.0314728431
Test Prediction loss: 0.4231641293
Test Reconstruction loss: 0.0290263481
Test Prediction loss: 0.1733724177
Test Reconstruction loss: 0.0157393701
Test Prediction loss: 0.1774320900
Test Reconstruction loss: 0.0110473009
Test Prediction loss: 0.1319861710
Test Reconstruction loss: 0.0158657823
Test Prediction loss: 0.1147892997
Test Reconstruction loss: 0.0057862415
Test Prediction loss: 0.0943471491
Test Reconstruction loss: 0.0049849944
Test Prediction loss: 0.0891392678
Test Reconstruction loss: 0.0039107669
Test Prediction loss: 0.0852381065
Test Reconstruction loss: 0.0053483546
Test Prediction loss: 0.0817463323
Test Reconstruction loss: 0.0023220070
Test Prediction loss: 0.0760205612
Test Reconstruction loss: 0.0018877045
Test Prediction loss: 0.0694253147


In [12]:
# move both losses to cpu and convert them to numpy arrays
test_reconstruction_loss_list = torch.tensor(test_reconstruction_loss_list).cpu().numpy()
test_pred_loss_list = torch.tensor(test_pred_loss_list).cpu().numpy()

print(f'reconstruction loss list mlp: {test_reconstruction_loss_list}')
print(f'prediction loss list mlp: {test_pred_loss_list}')


reconstruction loss list mlp: [0.02744326 0.03147284 0.02902635 0.01573937 0.0110473  0.01586578
 0.00578624 0.00498499 0.00391077 0.00534835 0.00232201 0.0018877 ]
prediction loss list mlp: [0.47773412 0.42316413 0.17337242 0.17743209 0.13198617 0.1147893
 0.09434715 0.08913927 0.08523811 0.08174633 0.07602056 0.06942531]


## Evaluating the Isotropic Measurements

In [10]:
embedding_model.load_model(os.path.join("./mlp/outputs/embedding_mlp/ntrain10240_epochs300_batch256/checkpoints", f'embedding_repressilator300.pth'))

  self.load_state_dict(torch.load(file_or_path_directory, map_location=lambda storage, loc: storage))


### Generate the Original data and the embedded data

In [12]:


embedding_model.eval()
device = embedding_model.devices[0]
with torch.no_grad():
    embedding = []
    true_data = []
    for states in validation_loader:
        states = states['states']
        for i in range(states.size(1)):
            xInput0 = states[:,i].to(device)
            g0 = embedding_model.embed(xInput0)
            g0 = g0.permute(1,0)
            embedding.append(g0)
            true_data.append(states[:,i])
    

In [13]:
# transform embedding into numpy array
embedding = torch.cat(embedding, dim=1).cpu().numpy()
embedding = embedding.T
print(embedding.shape)
true_data = torch.cat(true_data, dim=0).cpu().numpy()
print(true_data.shape)


(65536, 32)
(65536, 6)


### Cosine Similarity

In [14]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def average_cosine_similarity(data, num_samples=10000):
  """Calculates the average cosine similarity for a given number of random points.

  Args:
    data: A numpy array of shape (p, d) where p is the number of points and d is the dimension.
    num_samples: The number of random points to sample.

  Returns:
    The average cosine similarity between the sampled points.
  """

  p, d = data.shape
  if num_samples > p:
    raise ValueError("Number of samples cannot be greater than the number of points")

  # Sample random indices without replacement
  random_indices = np.random.choice(p, size=num_samples, replace=False)

  # Extract the corresponding points
  sampled_data = data[random_indices]

  # Calculate pairwise cosine similarities
  similarities = cosine_similarity(sampled_data)

  # Exclude self-similarities (diagonal elements)
  similarities = similarities[np.triu_indices_from(similarities, k=1)]

  # Calculate the average cosine similarity
  average_similarity = np.mean(similarities)

  return average_similarity

In [15]:
print(f'Average cosine similarity score: {average_cosine_similarity(embedding)}')

Average cosine similarity score: 0.8895577788352966


In [16]:
print(f'Average cosine similarity score: {average_cosine_similarity(true_data)}')

Average cosine similarity score: 0.6333780288696289


### Partition Score

#### for the embedding of the MLP, KAN and MLP +KAN I used the function below for calculating the exact partition score

In [11]:
def partition_score(points):
    """Partition score of isotropy taken from Mu et al. 2018.  
       1 is isotropic and 0 in anisotropic."""
    _, C = np.linalg.eig(np.matmul(points,points.T))
    scores = []
    for c in C:
        scores.append(np.sum(np.exp(c*points.T)))
    return min(scores)/max(scores) 

#### Given the substantial size of the embedding space, we opted to approximate rather than compute exact values for the (KAN encoder and MLP decoder) and (MLP encoder and MLP decoder) architectures. The use of exact calculations resulted in kernel crashes.For this reason, we employed the following function:

In [25]:
import numpy as np

def partition_score(data, num_eigenvalues=100):
  """Calculates an approximation of the partition score.

  Args:
    data: A numpy array of shape (num_points, dimensions).
    num_eigenvalues: The number of eigenvalues to consider for approximation.

  Returns:
    An approximation of the partition score.
  """
  data = np.array(data)- np.mean(data)
  # Calculate covariance matrix
  cov_matrix = np.cov(data.T)

  # Calculate eigenvalues and eigenvectors
  eigenvalues, eigenvectors = np.linalg.eig(cov_matrix)

  scores = []
  
  for i in range(len(eigenvalues)):
    score = np.sum(np.exp(eigenvalues[i] * data))
    scores.append(score)
  arranged_scores = np.sort(scores)
  return min(scores)/max(scores) 

In [12]:
print(f'partition score: {partition_score(embedding)}')

: 

### The VarEx Score

In [13]:
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

def varex_score(points, p=0.3):
  """Computes how uniformly the first p% of principal components of points are distributed.

  Args:
    points: A numpy array of shape (number of points, dimension) representing the points.
    p: The percentage of principal components to consider.

  Returns:
    The varex score, a measure of uniformity in variance distribution.
  """

  _, n = points.shape
  print(points.shape)
  #num_components = int(np.floor(n * p))
  num_components = 3

  scaler = StandardScaler()
  data_standardized = scaler.fit_transform(points)
  pca_model = PCA(n_components=num_components)
  pc_embed = pca_model.fit_transform(data_standardized)

  # Explained variance for each principal component
  explained_variance_ratio = pca_model.explained_variance_ratio_

  # Varex score (total variance explained)
  varex_score = np.sum(explained_variance_ratio)

  print(f"Explained Variance Ratio: {explained_variance_ratio}")
  print(f"Varex Score (Variance Explained): {varex_score}")

  return varex_score


In [14]:
print(f'varex score: {varex_score(embedding, p=0.2)}')

(65536, 32)
Explained Variance Ratio: [0.34522158 0.29615283 0.15111923]
Varex Score (Variance Explained): 0.7924936413764954
varex score: 0.7924936413764954


### IsoScore

In [None]:
from IsoScore import IsoScore
# transform the embedding to cpu
embedding = embedding
score = IsoScore.IsoScore(embedding)
print(score)

## UMAP Functions

In [17]:
import umap
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Example data:
# original_data: 6-dimensional data
np.random.seed(42)
labels = np.random.randint(0, 5, 65536)   # 5 classes for coloring



  from .autonotebook import tqdm as notebook_tqdm


In [18]:
# For original 6D space
umap_original = umap.UMAP(n_neighbors=64, min_dist=0.1, n_components=2)
original_data_2d = umap_original.fit_transform(true_data)

# For embedded 32D space
umap_embedded = umap.UMAP(n_neighbors=64, min_dist=0.05, n_components=2)
embedded_data_2d = umap_embedded.fit_transform(embedding)

OMP: Info #276: omp_set_nested routine deprecated, please use omp_set_max_active_levels instead.


In [19]:
# plot the original and embedded data in 2d
plt.figure(figsize=(14, 7))

# Plot original data
plt.subplot(1, 2, 1)
plt.scatter(original_data_2d[:, 0], original_data_2d[:, 1], c=labels, cmap='Spectral', s=5)
plt.title('UMAP Projection of Original 6D Data')

# Plot embedded data
plt.subplot(1, 2, 2)
plt.scatter(embedded_data_2d[:, 0], embedded_data_2d[:, 1], c=labels, cmap='Spectral', s=5)
plt.title('UMAP Projection of Embedded 64D Data')

plt.savefig('original_embedded_data_eKdM.png')

### Jaccard and Hit Rate Score

In [20]:
from sklearn.neighbors import NearestNeighbors
import numpy as np

# Number of neighbors to consider
n_neighbors = 10  # or any number appropriate for your analysis

# Calculate neighbors for original data
nbrs_original = NearestNeighbors(n_neighbors=n_neighbors).fit(true_data)
distances_original, indices_original = nbrs_original.kneighbors(true_data)

# Calculate neighbors for each embedding
nbrs_embedded_1 = NearestNeighbors(n_neighbors=n_neighbors).fit(embedding)
distances_embedded_1, indices_embedded_1 = nbrs_embedded_1.kneighbors(embedding)


print(f'Original Data Neighbors: {indices_original}')
print(f'Embedded Data Neighbors: {indices_embedded_1}')

Original Data Neighbors: [[    0 43009     1 ... 57345 43011 57346]
 [    1 43010     2 ... 43012 43008 57345]
 [    2     3 43011 ... 43013 43009     5]
 ...
 [65533 21270 28430 ...  7971 43894 39912]
 [65534 21271 28431 ... 16356  7972 39913]
 [65535 21272 28432 ... 16357 18328 39914]]
Embedded Data Neighbors: [[    0 43009 43008 ... 57344 57345     3]
 [    1 43010     2 ... 43008 43012 57345]
 [    2 43011     3 ... 43013 43009     0]
 ...
 [65533 21270 28430 ... 60210 20379 54177]
 [65534 21271 28431 ... 60211 54178 20380]
 [65535 21272 28432 ... 60212 54179 18328]]


In [21]:
def jaccard_index(list1, list2):
    intersection = len(set(list1).intersection(list2))
    union = len(set(list1).union(list2))
    return intersection / union

jaccard_scores_1 = []
for i in range(len(true_data)):
    jaccard = jaccard_index(indices_original[i], indices_embedded_1[i])
    jaccard_scores_1.append(jaccard)

In [22]:
mean_jaccard_1 = np.mean(jaccard_scores_1)
print(f"Mean Jaccard Index for Embedding 1: {mean_jaccard_1}")


Mean Jaccard Index for Embedding 1: 0.7553834268991722


In [23]:
def hit_rate(original_indices, embedded_indices):
    hits = 0
    for i in range(len(original_indices)):
        hits += len(set(original_indices[i]).intersection(embedded_indices[i]))
    return hits / (len(original_indices) * len(original_indices[0]))

hit_rate_1 = hit_rate(indices_original, indices_embedded_1)

print(f"Hit Rate for Embedding 1: {hit_rate_1}")


Hit Rate for Embedding 1: 0.8509124755859375
