Preparing Data For Training (Rope Data)

In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt

In [6]:
def simulate_lorenz(initial_state, steps=500, dt=0.01, 
                    sigma=10.0, rho=28.0, beta=8.0/3.0):
    def lorenz_system(state):
        x, y, z = state
        dxdt = sigma * (y - x)
        dydt = x * (rho - z) - y
        dzdt = x * y - beta * z
        return np.array([dxdt, dydt, dzdt])

    state = np.array(initial_state)
    trajectory = []
    for _ in range(steps):
        trajectory.append(state.copy())
        # Runge-Kutta 4th order integration for better accuracy
        k1 = lorenz_system(state)
        k2 = lorenz_system(state + 0.5 * dt * k1)
        k3 = lorenz_system(state + 0.5 * dt * k2)
        k4 = lorenz_system(state + dt * k3)
        state += (dt / 6.0) * (k1 + 2*k2 + 2*k3 + k4)
    
    return np.array(trajectory)

Building A Graph Neural Network(GNN)

In [9]:
import torch
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [10]:
def create_sequence_data(trajectory):
    """Create sequences for training with next-step prediction"""
    X, y = [], []
    for i in range(len(trajectory) - 1):
        X.append(trajectory[i])
        y.append(trajectory[i+1])
    return np.array(X), np.array(y)

In [13]:
class ImprovedGNN(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(ImprovedGNN, self).__init__()
        # Add fully connected layers for comparison
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        
        # GNN layers (with fully connected graph)
        self.conv1 = GCNConv(input_dim, hidden_dim)
        self.conv2 = GCNConv(hidden_dim, output_dim)
        
        # Layer normalization for stability
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.norm2 = nn.LayerNorm(output_dim)

    def create_full_graph(self, x):
        """Create a fully connected graph"""
        n = x.size(0)
        edge_index = torch.stack([
            torch.repeat_interleave(torch.arange(n), n),
            torch.tile(torch.arange(n), (n,))
        ]).to(x.device)
        return edge_index

    def forward(self, x):
        # Ensure input is a tensor
        x = x.float()
        
        # Create fully connected graph
        edge_index = self.create_full_graph(x)
        
        # GNN path
        gnn_x = self.conv1(x, edge_index).relu()
        gnn_x = self.norm1(gnn_x)
        gnn_x = self.conv2(gnn_x, edge_index)
        gnn_x = self.norm2(gnn_x)
        
        # Fully connected path
        fc_x = self.fc1(x).relu()
        fc_x = self.fc2(fc_x).relu()
        fc_x = self.fc3(fc_x)
        
        # Combine GNN and FC outputs
        return (gnn_x + fc_x) / 2

In [16]:
class KoopmanModel(torch.nn.Module):
    def __init__(self, input_dim, koopman_dim):
        super(KoopmanModel, self).__init__()
        self.encoder = SimpleGNN(input_dim, koopman_dim)
        self.koopman_matrix = torch.nn.Parameter(torch.eye(koopman_dim).to(device))
        self.decoder = SimpleGNN(koopman_dim, input_dim)
    def forward(self, data):
        #print("Type of data in KoopmanModel:", type(data))
        koopman_space = self.encoder(data)
        #print("koopman_space shape:", koopman_space.shape)
        next_koopman_space = koopman_space @ self.koopman_matrix
        new_data = Data(x=next_koopman_space, edge_index=data.edge_index)
        next_state = self.decoder(new_data)
        #print("next_state shape:", next_state.shape)
        return next_state

In [18]:
from torch.optim import Adam

def train_model(model, dataset, epochs=(10)):
    optimizer = Adam(model.parameters(), lr=0.01)
    loss_fn = torch.nn.MSELoss()

    model = model.to(device)
    for epoch in range(epochs):
        total_loss = 0
        correct = 0
        total_samples = 0 
        for data in dataset:
            data = data.to(device)
            #print(type(data))

            optimizer.zero_grad()

            prediction = model(data)

            loss = loss_fn(prediction, data.x)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            threshold = 0.15
            correct += (torch.abs(prediction - data.x) < threshold).sum().item()
            total_samples += data.x.numel()  
        accuracy = (correct / total_samples) * 100 
        print(f"Epoch {epoch + 1}, Loss {total_loss/len(dataset)}, Accuracy{accuracy:.2f}")

In [20]:
dataset = [create_graph_lorenz(state) for state in data]
print(type(dataset[0]))
print(dataset[0].x)

NameError: name 'data' is not defined

In [22]:
model = KoopmanModel(input_dim=3, koopman_dim=4).to(device)

NameError: name 'SimpleGNN' is not defined

In [24]:
train_model(model, dataset, epochs=10)

NameError: name 'model' is not defined

In [None]:
import os

In [28]:
save_folder = "spring-koopman-models"
os.makedirs(save_folder, exist_ok = True)

In [30]:
save_path = os.path.join(save_folder, "spring-koopman-model-2.0.pth")
torch.save(model.state_dict(), save_path)

print(f"Model saved to {save_path}")

Model saved to spring-koopman-models\spring-koopman-model-2.0.pth


In [34]:
model = KoopmanModel(input_dim=2, koopman_dim=3).to(device)
model.load_state_dict(torch.load(save_path, weights_only=True))
model.eval()

KoopmanModel(
  (encoder): SimpleGNN(
    (conv1): GCNConv(2, 3)
    (conv2): GCNConv(3, 3)
  )
  (decoder): SimpleGNN(
    (conv1): GCNConv(3, 2)
    (conv2): GCNConv(2, 2)
  )
)