## Configuration

In [None]:
! pip install pytorch-lightning wandb

In [None]:
import numpy as np
import wandb
import subprocess
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import torch
from torch import nn
from sklearn.metrics import mean_squared_error
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
import pytorch_lightning as pl

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Hyperparameters

In [None]:
MLP = True
ATN = False
RNN = False

nine_six = False

tsteps = 3

MAX_EPOCHS = 3000

## Helpers

In [None]:

if MLP:
  # Define the MLP model
  class Generator(nn.Module):
      def __init__(self, input_size, hidden_size, output_size):
          super(Generator, self).__init__()
          self.layers = nn.Sequential(
              nn.Linear(input_size, hidden_size),
              nn.ReLU(),
              nn.Linear(hidden_size, hidden_size),
              nn.ReLU(),
              nn.Linear(hidden_size, output_size)
          )

      def forward(self, x):
          return self.layers(x)

if ATN:
  class Generator(nn.Module):
      def __init__(self, input_size, hidden_size, output_size, nhead=16, num_layers=3):
          super(Generator, self).__init__()

          self.embedding = nn.Linear(input_size, hidden_size)
          self.transformer = nn.Transformer(hidden_size, nhead, num_layers)
          self.fc = nn.Linear(hidden_size, output_size)

      def forward(self, src, tgt):
        src = self.embedding(src)
        tgt = self.embedding(tgt)
        x = self.transformer(src, tgt)
        x = self.fc(x)
        return x

if RNN:
  class LSTMGenerator(nn.Module):
      def __init__(self, input_size, hidden_size, output_size, num_layers=1):
          super(LSTMGenerator, self).__init__()
          self.hidden_size = hidden_size
          self.num_layers = num_layers

          self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
          self.fc = nn.Linear(hidden_size, output_size)

      def forward(self, x, hidden=None):
          if hidden is None:
              hidden = self.init_hidden(x.size(0))

          out, hidden = self.lstm(x, hidden)
          out = self.fc(out)

          return out, hidden

      def init_hidden(self, batch_size):
          h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size)
          c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size)
          return (h0, c0)

if MLP or RNN:
  class LorenzSystemModule(pl.LightningModule):
      def __init__(self, model, learning_rate=1e-3):
          super().__init__()
          self.model = model
          self.learning_rate = learning_rate
          self.loss_function = nn.MSELoss()

      def forward(self, x):
          return self.model(x)

      def training_step(self, batch, batch_idx):
          x, y = batch
          y_pred = self(x)
          loss = self.loss_function(y_pred, y)
          self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
          wandb.log({"train_loss": loss})
          return loss

      def validation_step(self, batch, batch_idx):
          x, y = batch
          y_pred = self(x)
          loss = self.loss_function(y_pred, y)
          self.log("val_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
          wandb.log({"val_loss": loss})
          return loss

      def configure_optimizers(self):
          optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
          return optimizer

if ATN:
  class LorenzSystemModule(pl.LightningModule):
      def __init__(self, model, learning_rate=1e-3):
          super().__init__()
          self.model = model
          self.learning_rate = learning_rate
          self.loss_function = nn.MSELoss()

      def forward(self, x, y):
          return self.model(x, y)

      def training_step(self, batch, batch_idx):
          x, y = batch
          y_pred = self(x, y)
          print("REAL")
          print(y)
          print("FAKE")
          print(y_pred)
          loss = self.loss_function(y_pred, y)
          self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
          wandb.log({"train_loss": loss})
          return loss

      def validation_step(self, batch, batch_idx):
          x, y = batch
          y_pred = self(x, y)
          loss = self.loss_function(y_pred, y)
          self.log("val_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
          wandb.log({"val_loss": loss})
          return loss

      def configure_optimizers(self):
          optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)
          return optimizer

## Experiment

In [None]:
from scipy.integrate import odeint
from sklearn.preprocessing import MinMaxScaler

def execute_experiment(experiment_config):
  experiment_title = experiment_config[0]
  Sparsity = experiment_config[1]
  wandb.init(name=experiment_title, project="chaotic-rnn", entity="patrickallencooper")

  hidden = 64

  USE_SPARSE = True

  if nine_six:
    def lorenz_system(T, dt, N=3, F=20, init_condition=None):
      
      def lorenz_96_deriv(X, t0, N=N, F=F):
          dXdt = np.zeros(N)
          for i in range(N):
              dXdt[i] = (X[(i+1) % N] - X[(i-2) % N]) * X[(i-1) % N] - X[i] + F
          return dXdt

      if init_condition is None:
          init_condition = [1.0, 1.0, 1.0]

      time_points = np.arange(0, T, dt)
      trajectory = odeint(lorenz_96_deriv, init_condition, time_points)
      return trajectory.T

  else:
    def lorenz_system(T, dt, init_condition=None):
        sigma, beta, rho = 10, 2.667, 28

        def lorenz_deriv(X, t0, sigma=sigma, beta=beta, rho=rho):
            x, y, z = X
            return [sigma * (y - x), x * (rho - z) - y, x * y - beta * z]

        if init_condition is None:
            init_condition = [1.0, 1.0, 1.0]

        time_points = np.arange(0, T, dt)
        trajectory = odeint(lorenz_deriv, init_condition, time_points)
        x, y, z = trajectory.T
        return x, y, z

  def generate_initial_conditions(n_points, x_range, y_range, z_range):
      x0 = np.random.uniform(x_range[0], x_range[1], n_points)
      y0 = np.random.uniform(y_range[0], y_range[1], n_points)
      z0 = np.random.uniform(z_range[0], z_range[1], n_points)
      return np.array([x0, y0, z0]).T

  # Generate the Lorenz system data
  n_points = 1
  x_range = [-20.0, 20.0]
  y_range = [-20.0, 20.0]
  z_range = [-20.0, 20.0]
  T, dt = 100, 0.005

  print("Generating Initial Conditions:")
  initial_conditions = generate_initial_conditions(n_points, x_range, y_range, z_range)

  print("Integrating trajectories:")
  data_list = []
  for init_condition in initial_conditions:
      x, y, z = lorenz_system(T, dt, init_condition)
      data = np.array([x[:-1], y[:-1], z[:-1], x[1:], y[1:], z[1:]]).T
      data_list.append(data)

  data = np.concatenate(data_list, axis=0)

  scaler = MinMaxScaler()
  data = scaler.fit_transform(data)

  print("Pre Sparsity Length: " + str(len(data)))
  if USE_SPARSE:
    data = data[::Sparsity]
    print("Post Sparsity Length: " + str(len(data)))
  train_data_pre = torch.Tensor(data)

  train_data, test_data = train_data_pre[:int(len(train_data_pre) * 0.8)], train_data_pre[int(len(train_data_pre) * 0.8):]
  train_data_sparse = train_data
  test_data_sparse = test_data

  # Split the data into training and test datasets
  train_dataset_sparse = TensorDataset(torch.Tensor(train_data_sparse[:, :tsteps]), torch.Tensor(train_data_sparse[:, tsteps:]))
  test_dataset_sparse = TensorDataset(torch.Tensor(test_data_sparse[:, :tsteps]), torch.Tensor(test_data_sparse[:, tsteps:]))

  train_loader = DataLoader(train_dataset_sparse, batch_size=2048, shuffle=True)
  val_loader = DataLoader(test_dataset_sparse, batch_size=2048, shuffle=False)

  input_size, hidden_size, output_size = tsteps, hidden, tsteps
  model = Generator(input_size, hidden_size, output_size)
  lorenz_system_module = LorenzSystemModule(model)

  # Train the model and compute validation loss
  trainer = pl.Trainer(max_epochs=MAX_EPOCHS)
  trainer.fit(lorenz_system_module, train_loader, val_loader)

  model_scripted = torch.jit.script(model)
  model_scripted.save(experiment_title + '.pt')

  model_cloud = '! cp ' + experiment_title + '.pt drive/MyDrive/"Colab Notebooks"/Classes/"Chaotic Dynamics"/"Models"'

  subprocess.run(model_cloud, stdout=subprocess.PIPE, shell=True)

  # Test the model
  model.eval()
  with torch.no_grad():
      initial_state = torch.Tensor(train_data[0, :tsteps]).unsqueeze(0)
      predicted_trajectory = [initial_state]
      for _ in range(len(train_data) - 1):
        if MLP:
          prediction = model(predicted_trajectory[-1])
        else:
          prediction = model(predicted_trajectory[-1], predicted_trajectory[0])
        predicted_trajectory.append(prediction)

  predicted_trajectory = torch.cat(predicted_trajectory).detach().numpy()
  #predicted_trajectory = scaler.inverse_transform(predicted_trajectory)

  # Plot the predicted and true Lorenz system trajectories
  fig = plt.figure(figsize=(12, 6))

  ax1 = fig.add_subplot(121, projection='3d')
  ax1.plot(x, y, z, linewidth=1, color='red')
  ax1.set_title("True Lorenz System Trajectory")
  ax1.set_xlabel("X")
  ax1.set_ylabel("Y")
  ax1.set_zlabel("Z")

  ax2 = fig.add_subplot(122, projection='3d')
  ax2.plot(predicted_trajectory[:, 0], predicted_trajectory[:, 1], predicted_trajectory[:, 2], linewidth=1, color='blue')
  ax2.set_title("Predicted Lorenz System Trajectory")
  ax2.set_xlabel("X")
  ax2.set_ylabel("Y")
  ax2.set_zlabel("Z")

  plt.savefig(experiment_title + ".png", dpi=300)

  image_cloud = '! cp ' + experiment_title + '.png drive/MyDrive/"Colab Notebooks"/Classes/"Chaotic Dynamics"/"Images"'

  subprocess.run(image_cloud, stdout=subprocess.PIPE, shell=True)

  # Test the model with extended timesteps
  model.eval()
  with torch.no_grad():
      initial_state = torch.Tensor(test_data[0, :tsteps]).unsqueeze(0)
      predicted_trajectory_test = [initial_state]
      for _ in range(len(test_data) - 1):
        if MLP:
          prediction = model(predicted_trajectory_test[-1])
        else:
          prediction = model(predicted_trajectory_test[-1], predicted_trajectory_test[0])
        predicted_trajectory_test.append(prediction)

  predicted_trajectory_test = torch.cat(predicted_trajectory_test).detach().numpy()
  #predicted_trajectory_test = scaler.inverse_transform(predicted_trajectory_test)

  # Plot the predicted and true Lorenz system trajectories with extended timesteps
  fig = plt.figure(figsize=(18, 6))

  ax1 = fig.add_subplot(131, projection='3d')
  ax1.plot(test_data[:, 0], test_data[:, 1], test_data[:, 2], linewidth=1, color='red')
  ax1.set_title("True Lorenz System Trajectory (Extended)")
  ax1.set_xlabel("X")
  ax1.set_ylabel("Y")
  ax1.set_zlabel("Z")

  ax2 = fig.add_subplot(132, projection='3d')
  ax2.plot(predicted_trajectory[:, 0], predicted_trajectory[:, 1], predicted_trajectory[:, 2], linewidth=1, color='blue')
  ax2.set_title("Predicted Lorenz System Trajectory")
  ax2.set_xlabel("X")
  ax2.set_ylabel("Y")
  ax2.set_zlabel("Z")

  ax3 = fig.add_subplot(133, projection='3d')
  ax3.plot(predicted_trajectory_test[:, 0], predicted_trajectory_test[:, 1], predicted_trajectory_test[:, 2], linewidth=1, color='green')
  ax3.set_title("Predicted Lorenz System Trajectory (Extended)")
  ax3.set_xlabel("X")
  ax3.set_ylabel("Y")
  ax3.set_zlabel("Z")

  plt.savefig(experiment_title + "_extended.png", dpi=300)

  image_exd_cloud = '! cp ' + experiment_title + '_extended.png drive/MyDrive/"Colab Notebooks"/Classes/"Chaotic Dynamics"/"Images"'

  subprocess.run(image_exd_cloud, stdout=subprocess.PIPE, shell=True)

  # calculate the mse for the rk method and push to log file
  #rk_mse = torch.nn.functional.mse_loss(predicted_trajectory_test, test_data)
  #rk_mse_short = torch.nn.functional.mse_loss(predicted_trajectory_test[:1000], test_data[:1000])
  #print(rk_mse)
  #print(rk_mse_short)

  # Test the model against MTU
  truth_comparison = train_data[100:1100, :tsteps].numpy()
  model.eval()
  with torch.no_grad():
      initial_state = torch.Tensor(train_data[100, :tsteps]).unsqueeze(0)
      predicted_trajectory_test_mse = [initial_state]
      for _ in range(1000 - 1):
        if MLP:
          prediction = model(predicted_trajectory_test_mse[-1])
        else:
          prediction = model(predicted_trajectory_test_mse[-1], predicted_trajectory_test_mse[0])
        predicted_trajectory_test_mse.append(prediction)

  predicted_trajectory_test_mse = torch.cat(predicted_trajectory_test_mse).detach().numpy()
  #predicted_trajectory_test_mse = scaler.inverse_transform(predicted_trajectory_test_mse)
  # Calculate the mean squared error (MSE) for each entry
  print(predicted_trajectory_test_mse.shape)
  print(truth_comparison.shape)
  mse = (predicted_trajectory_test_mse - truth_comparison) ** 2

  #mse = np.cumsum(mse)

  # Find the indices where the error grows above 0.3
  error_above_03_indices = np.nonzero(mse > 0.3)

  plt.figure()
  fig, ax = plt.subplots()

  # Plot the data
  ax.plot(range(len(mse)), mse, label=['X','Y','Z'])

  # Set custom X tick positions and labels
  custom_xticks_positions = [200, 400, 600, 800, 1000]
  custom_xticks_labels = ['1', '2', '3', '4', '5']
  ax.set_xticks(custom_xticks_positions)
  ax.set_xticklabels(custom_xticks_labels)

  # Plot the MSE values
  ax.set_xlabel('MTU')
  ax.set_ylabel('MSE')

  ax.axhline(y=0.3, linestyle='--', color='r')

  # Set the legend and display the plot
  plt.legend()
  
  plt.savefig(experiment_title + "_mse.png", dpi=300)

  image_mse_cloud = '! cp ' + experiment_title + '_mse.png drive/MyDrive/"Colab Notebooks"/Classes/"Chaotic Dynamics"/"Images"'

  subprocess.run(image_mse_cloud, stdout=subprocess.PIPE, shell=True)

  # DO SO FOR SUMMED ARRAY
  mse = (np.sum(predicted_trajectory_test_mse, axis=1) - np.sum(truth_comparison, axis=1)) ** 2

  # Find the indices where the error grows above 0.3
  error_above_03_indices = np.nonzero(mse > 0.3)

  plt.figure()
  fig, ax = plt.subplots()

  # Plot the data
  ax.plot(range(len(mse)), mse, label=['Trajectory MSE'])

  # Set custom X tick positions and labels
  custom_xticks_positions = [200, 400, 600, 800, 1000]
  custom_xticks_labels = ['1', '2', '3', '4', '5']
  ax.set_xticks(custom_xticks_positions)
  ax.set_xticklabels(custom_xticks_labels)

  # Plot the MSE values
  ax.set_xlabel('MTU')
  ax.set_ylabel('MSE')

  ax.axhline(y=0.3, linestyle='--', color='r')

  # Highlight the points where the error grows above 0.3
  #plt.scatter(error_above_03_indices, mse[error_above_03_indices], color='red', label='Error > 0.3')

  # Set the legend and display the plot
  plt.legend()
  
  plt.savefig(experiment_title + "_mse_sum.png", dpi=300)

  image_mse_cloud = '! cp ' + experiment_title + '_mse_sum.png drive/MyDrive/"Colab Notebooks"/Classes/"Chaotic Dynamics"/"Images"'

  subprocess.run(image_mse_cloud, stdout=subprocess.PIPE, shell=True)


## Control

In [None]:
if MLP:
  experiment_configuration = [("final_CMLP_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_1", 1),
                              ("final_CMLP_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_10", 2),
                              ("final_CMLP_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_20", 5),
                              ("final_CMLP_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_40", 10),
                              ("final_CMLP_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_60", 15),
                              ("final_CMLP_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_80", 20)]

if ATN:
  experiment_configuration = [("ATN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_1", 1),
                              ("ATN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_10", 2),
                              ("ATN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_20", 5),
                              ("ATN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_40", 10),
                              ("ATN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_60", 15),
                              ("ATN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_80", 20)]

if RNN:
  experiment_configuration = [("RNN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_1", 1),
                              ("RNN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_10", 2),
                              ("RNN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_20", 5),
                              ("RNN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_40", 10),
                              ("RNN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_60", 15),
                              ("RNN_epoch_" + str(MAX_EPOCHS) + "_hidden_64_lr_1e-3_dt_0.01_sparse_80", 20)]

for experiment_config in experiment_configuration:
  execute_experiment(experiment_config)

The research question here, is how well does the neural network generalize to unseen points within a chaotic system, in this particular case Lorenz, given a very sparse sampling.

This is not strictly speaking a 'fair' comparison as one has access to historical data about the system, but you can imagine a scenario where you have specific guarantees about a system up until a point and you would like to extrapolate beyond, can a neural network go further while preserving the dynamics of a system. In terms of utility this may be a bit contrived, but I think it does provide some sense the ability to model a chaotic system as a timeseries.

Step 1: Train a model on very very few points using dense RK as baseline.

Step 2: Run RK on a Comparable number of points.

Step 3: Compare results.

Chart outcomes, explain under what conditions such a model could be useful.