# Import libraries

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset
import os

# Import raw data

In [2]:
#Defining function to check if directory exists, if not it generates it
def check_and_make_dir(dir):
    if not os.path.isdir(dir):os.mkdir(dir)
#Base directory 
base_dir = '/Users/samsonmercier/Desktop/Work/PhD/Research/Second_Generals/'
#File containing temperature values
raw_T_data = np.loadtxt(base_dir+'Data/bt-4500k/training_data_T.csv', delimiter=',')
#File containing pressure values
raw_P_data = np.loadtxt(base_dir+'Data/bt-4500k/training_data_P.csv', delimiter=',')
#Path to store model
model_save_path = base_dir+'Model_Storage/RNN/'
check_and_make_dir(model_save_path)
#Path to store plots
plot_save_path = base_dir+'Plots/RNN/'
check_and_make_dir(plot_save_path)

#Last 51 columns are the temperature/pressure values, 
#First 5 are the input values (H2 pressure in bar, CO2 pressure in bar, LoD in hours, Obliquity in deg, H2+Co2 pressure) but we remove the last one since it's not adding info.
raw_inputs = raw_T_data[:, :4]
raw_outputs_T = raw_T_data[:, 5:]
raw_outputs_P = raw_P_data[:, 5:]

#Storing useful quantitites
N = raw_inputs.shape[0] #Number of data points
D = raw_inputs.shape[1] #Number of features
O = raw_outputs_T.shape[1] #Number of outputs


# Define hyper-parameters

In [3]:
#Defining partition of data used for 1. training 2. validation and 3. testing
data_partitions = [0.7, 0.1, 0.2]

#Defining the device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
num_threads = 6
torch.set_num_threads(num_threads)
print(f"Using {device} device with {num_threads} threads")
torch.set_default_device(device)

#Defining the noise seed
partition_seed = 4
rng = torch.Generator(device=device)
rng.manual_seed(partition_seed)

#Neural network width and depth
nn_width = 100
nn_depth = 5

#Optimizer learning rate
learning_rate = 1e-5

#Batch size 
batch_size = 64

#Number of epochs 
n_epochs = 1000

#Define storage for losses
train_losses = []
eval_losses = []

Using cpu device with 6 threads


# Fitting the training data with a recurrent neural network

## First step : Define a training, validation, and testing set

In [4]:
#Splitting the data 

## Retrieving indices of data partitions
train_idx, valid_idx, test_idx = torch.utils.data.random_split(range(N), data_partitions, generator=rng)

## Generate the data partitions
### Training
train_inputs = torch.tensor(raw_inputs[train_idx], dtype=torch.float32)
train_outputs_T = torch.tensor(raw_outputs_T[train_idx], dtype=torch.float32)
### Validation
valid_inputs = torch.tensor(raw_inputs[valid_idx], dtype=torch.float32)
valid_outputs_T = torch.tensor(raw_outputs_T[valid_idx], dtype=torch.float32)
### Testing
test_inputs = torch.tensor(raw_inputs[test_idx], dtype=torch.float32)
test_outputs_T = torch.tensor(raw_outputs_T[test_idx], dtype=torch.float32)
test_outputs_P = torch.tensor(raw_outputs_P[test_idx], dtype=torch.float32)

##Generating data loaders
train_dataloader = DataLoader(TensorDataset(train_inputs,train_outputs_T), batch_size=64, generator=rng, shuffle=True)
eval_dataloader = DataLoader(TensorDataset(valid_inputs,valid_outputs_T), batch_size=64, generator=rng)

## Second step : Define the neural network

In [5]:
# ============================================================
# Variable-depth recurrent cell
# ============================================================
class RecurrentNeuralNetworkCell(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, depth):
        super().__init__()
        self.depth = depth

        # First layer: takes input + hidden state
        self.input_layer = nn.Linear(input_dim + hidden_dim, hidden_dim)

        # Create a list of additional hidden layers
        self.hidden_layers = nn.ModuleList([
            nn.Linear(hidden_dim, hidden_dim) for _ in range(depth - 1)
        ])

        # Output layer
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, h_prev):
        # Concatenate input and previous hidden state
        combined = torch.cat((x, h_prev), dim=1)

        # First layer
        h = torch.tanh(self.input_layer(combined))

        # Additional hidden layers
        for layer in self.hidden_layers:
            h = torch.tanh(layer(h))

        # Output
        y = self.output_layer(h)
        return y, h

# ============================================================
# Multi-step RNN wrapper
# ============================================================
class DeepRNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, depth):
        super().__init__()
        self.hidden_size = hidden_dim
        self.cell = RecurrentNeuralNetworkCell( input_dim, hidden_dim, output_dim, depth)

    def forward(self, x):
        """
        x: (batch_size, seq_len, input_size)
        Returns:
            y_seq: (batch_size, seq_len, output_size)
        """
        batch_size, seq_len, _ = x.shape
        device = x.device

        # Initialize hidden state
        h = torch.zeros(batch_size, self.hidden_size, device=device)

        outputs = []
        for t in range(seq_len):
            x_t = x[:, t, :]          # (batch_size, input_size)
            y_t, h = self.cell(x_t, h)
            outputs.append(y_t.unsqueeze(1))  # keep sequence dimension

        y_seq = torch.cat(outputs, dim=1)     # (batch_size, seq_len, output_size)
        return y_seq

In [6]:
model = DeepRNN(D, nn_width, O, nn_depth).to(device)
print(model)

DeepRNN(
  (cell): RecurrentNeuralNetworkCell(
    (input_layer): Linear(in_features=104, out_features=100, bias=True)
    (hidden_layers): ModuleList(
      (0-3): 4 x Linear(in_features=100, out_features=100, bias=True)
    )
    (output_layer): Linear(in_features=100, out_features=51, bias=True)
  )
)


## Fourth step : Define optimization functions

In [7]:
# --- Training loop ---
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    total_loss=0
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        total_loss+=loss.item()
        print(f"Train loss: {loss.item():>7f}  [{batch * batch_size + len(X):>5d}/{size:>5d}]")

    #Store loss
    train_losses.append(total_loss/len(dataloader))




# --- Evaluation loop ---
def eval_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    num_batches = len(dataloader)
    eval_loss = 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            eval_loss += loss_fn(pred, y).item()

    #Store loss
    eval_loss /= num_batches
    eval_losses.append(eval_loss)
    print(f"Eval loss={eval_loss:.5f}")


## Fifth step : Run optimization

In [8]:
# --- Loss and optimizer ---
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

# --- Loss and optimizer ---
for t in range(n_epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    eval_loop(eval_dataloader, model, loss_fn)
print("Done!")

#Save model 
torch.save(model.state_dict(), model_save_path + f'{n_epochs}epochs_{learning_rate}LR_{batch_size}BS.pth')

Epoch 1
-------------------------------


ValueError: not enough values to unpack (expected 3, got 2)

## Sixth step : Diagnostic plots

In [None]:
# Loss curves
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, gridspec_kw={'height_ratios':[3, 1]}, figsize=(10, 6))
ax1.plot(np.arange(n_epochs), train_losses, label="Train")
ax1.plot(np.arange(n_epochs), eval_losses, label="Validation")
ax2.plot(np.arange(n_epochs), np.array(train_losses) - np.array(eval_losses), label="Train")
ax1.set_yscale('log')
# ax2.set_yscale('log')
ax2.set_xlabel("Epoch")
ax1.set_ylabel("MSE Loss")
ax2.set_ylabel("Loss Diff.")
ax1.legend()
ax1.grid()
plt.subplots_adjust(hspace=0)
plt.savefig(plot_save_path+'/loss.pdf')

In [None]:
#Comparing predicted T-P profiles vs true T-P profiles with residuals
substep = 1000

#Converting tensors to numpy arrays if this isn't already done
if (type(test_outputs_T) != np.ndarray):
    test_outputs_T = test_outputs_T.detach().cpu().numpy()
    test_outputs_P = test_outputs_P.detach().cpu().numpy()

for test_idx, (test_input, test_output_T, test_output_P) in enumerate(zip(test_inputs, test_outputs_T, test_outputs_P)):

    #Retrieve prediction
    pred_output_T = model(test_input.reshape(1, 1, D)).detach().numpy()
    pred_output_T = pred_output_T.reshape(O)

    #Convert to numpy
    test_input = test_input.numpy()

    #Plotting
    if (test_idx % substep == 0):
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=[8, 6], sharey=True, gridspec_kw = {'width_ratios':[3, 1]})
        ax1.plot(test_output_T, np.log10(test_output_P/1000), '.', linestyle='-', color='blue', linewidth=2)
        ax1.plot(pred_output_T, np.log10(test_output_P/1000), color='green', linewidth=2)
        ax1.invert_yaxis()
        ax1.set_ylabel(r'log$_{10}$ Pressure (bar)')
        ax1.set_xlabel('Temperature (K)')
        ax2.plot(pred_output_T - test_output_T, np.log10(test_output_P/1000), '.', linestyle='-', color='green', linewidth=2)
        ax2.set_xlabel('Residuals (K)')
        plt.suptitle(rf'H$_2$O : {test_input[0]} bar, CO$_2$ : {test_input[1]} bar, LoD : {test_input[2]:.0f} days, Obliquity : {test_input[3]} deg')
        plt.tight_layout()
        plt.savefig(plot_save_path+f'/pred_vs_actual_n.{test_idx}.pdf')
    

In [None]:
#Plotting all residuals 

#Storage
residuals = np.zeros(test_outputs_T.shape,  dtype=object)

#Converting tensors to numpy arrays if this isn't already done
if (type(test_outputs_T) != np.ndarray):
    test_outputs_T = test_outputs_T.numpy()

for test_idx, (test_input, test_output_T) in enumerate(zip(test_inputs, test_outputs_T)):

    #Retrieve prediction
    residuals[test_idx] = model(test_input.reshape(1, 1, D)).detach().numpy().reshape(O) - test_output_T


fig, ax = plt.subplots(figsize=[8, 6])
ax.plot(residuals, color='green', alpha=0.2)
ax.axhline(0, color='black', linestyle='dashed')
plt.xlabel('Output dimension')
plt.ylabel('Temperature (K)')
plt.savefig(plot_save_path+f'/residuals.pdf')
print(f'Median: {np.median(residuals):.3f} K, Standard deviation: {np.std(residuals):.3f} K')