Generation of data for training and testing the model using numerical solvers

In [1]:
import numpy as np
from scipy.integrate import solve_ivp

# Constants
G = 1.0  # gravitational constant

# arbitrarily user defined ranges for parameters
mass1_range = (0.1, 50.0)
mass2_range = (0.1, 50.0)
x_rel_range = (-50.0, 50.0)   # initial position of body 2 in COM frame
y_rel_range = (-50.0, 50.0)
vx_rel_range = (-25.0, 25.0)  # initial velocity of body 2 in COM frame
vy_rel_range = (-25.0, 25.0)
time_range = (0.1, 50.0)

# Number of trajectory samples and time discretization points
num_samples = 50000    # number of initial condition samples
time_points = 10    # number of time steps between 0 and t_final

# Total datapoints = num_samples * time_points
# Inputs: [m1, m2, x2_com, y2_com, vx2_com, vy2_com, t]
X = np.zeros((num_samples * time_points, 7))
# Outputs(relative to COM)
Y = np.zeros((num_samples * time_points, 2))
# True outputs(absolute positions)
true_Y = np.zeros((num_samples * time_points, 4))

# ODE definition
def two_body_ode(t, state, m1, m2):
    x1, y1, x2, y2, vx1, vy1, vx2, vy2 = state
    dx, dy = x2 - x1, y2 - y1
    r3 = (dx**2 + dy**2)**1.5 + 1e-6 #to prevent division by zero
    ax1 = G * m2 * dx / r3
    ay1 = G * m2 * dy / r3
    ax2 = -G * m1 * dx / r3
    ay2 = -G * m1 * dy / r3
    return [vx1, vy1, vx2, vy2, ax1, ay1, ax2, ay2]

idx = 0
for i in range(num_samples):
    # Sample each parameter from its range
    m1 = np.random.uniform(*mass1_range)
    m2 = np.random.uniform(*mass2_range)
    x2_com0 = np.random.uniform(*x_rel_range)
    y2_com0 = np.random.uniform(*y_rel_range)
    vx2_com0 = np.random.uniform(*vx_rel_range)
    vy2_com0 = np.random.uniform(*vy_rel_range)
    t_final = np.random.uniform(*time_range)

    # Compute initial state in COM frame and COM original pos/vel
    M = m1 + m2
    x1_0 = - (m2 / m1) * x2_com0
    y1_0 = - (m2 / m1) * y2_com0
    x2_0 =   x2_com0
    y2_0 =   y2_com0
    vx1_0 = - (m2 / m1) * vx2_com0
    vy1_0 = - (m2 / m1) * vy2_com0
    vx2_0 =   vx2_com0
    vy2_0 =   vy2_com0
    # COM original position and velocity
    com0_x = 0
    com0_y = 0
    comv_x = 0
    comv_y = 0

    state0 = [x1_0, y1_0, x2_0, y2_0, vx1_0, vy1_0, vx2_0, vy2_0]

    # Time points
    t_eval = np.linspace(0, t_final, time_points)

    # Solve the 2 body problem
    sol = solve_ivp(
        two_body_ode,
        (0, t_final),
        state0,
        args=(m1, m2),
        t_eval=t_eval,
        rtol=1e-6,
        atol=1e-9
    )

    # For each time point, store input and outputs
    for j, t_j in enumerate(t_eval):
        # Relative input and output
        X[idx] = [m1, m2, x2_com0, y2_com0, vx2_com0, vy2_com0, t_j]
        x2_t = sol.y[2, j]
        y2_t = sol.y[3, j]
        Y[idx] = [x2_t, y2_t]
        idx += 1

print("X shape:", X.shape)
print("Y shape:", Y.shape)
print("Sample input X[0]:", X[0])
print("Sample relative output Y[0]:", Y[0])

# Save to file
np.savez('two_body_data_comframe.npz', X=X, Y=Y)

X shape: (500000, 7)
Y shape: (500000, 2)
Sample input X[0]: [ 12.66734582   5.038968    27.59438829 -10.92439523  18.45669273
 -21.66841145   0.        ]
Sample relative output Y[0]: [ 27.59438829 -10.92439523]


Training of Model


In [6]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np

# Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparams
hidden_layers    = [128,64,32]
dropout_prob     = 0.1
lr               = 1e-3
weight_decay     = 1e-5
batch_size       = 256
num_epochs       = 50
validation_split = 0.2
lambda_ode       = 10   # weight for ODE loss
eps = 1e-3  # small time increment

G = 1.0  # used in acceleration calc

# Dataset
class TwoBodyDataset(Dataset):
    def __init__(self, path):
        data = np.load(path)
        self.X = torch.from_numpy(data['X']).float()
        self.Y = torch.from_numpy(data['Y']).float()
    def __len__(self): return len(self.X)
    def __getitem__(self, i): return self.X[i], self.Y[i]

# Data loaders
full_ds = TwoBodyDataset('two_body_data_comframe.npz')
val_n  = int(len(full_ds)*validation_split)
train_ds, val_ds = random_split(full_ds, [len(full_ds)-val_n, val_n])
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader   = DataLoader(val_ds,   batch_size=batch_size, shuffle=False, num_workers=2)

# MLP
class MLP(nn.Module):
    def __init__(self, in_dim, h_layers, out_dim, dp):
        super().__init__()
        layers=[]
        prev=in_dim
        for h in h_layers:
            layers += [nn.Linear(prev,h), nn.ReLU(inplace=True), nn.Dropout(dp)]
            prev=h
        layers += [nn.Linear(prev,out_dim)]
        self.net = nn.Sequential(*layers)
    def forward(self,x): return self.net(x)

model     = MLP(7, hidden_layers, 2, dropout_prob).to(device)
mae_loss  = nn.L1Loss()
mse_loss  = nn.MSELoss()
opt       = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

# Helper to compute true accel for body 2
def true_acceleration(xy, m1, m2):
    r2 = (xy**2).sum(dim=1, keepdim=True)
    r3 = (r2 + 1e-6)**1.5
    coeff = G*(m1+m2)/(m1*m2)
    return coeff * xy / r3

for epoch in range(1, num_epochs+1):
    model.train()
    train_mae = 0.0
    train_ode = 0.0
    for Xb, Yb in train_loader:
        Xb, Yb = Xb.to(device), Yb.to(device)
        # Splitting inputs
        m1 = Xb[:,0:1]; m2 = Xb[:,1:2]

        t_raw = Xb[:, 6:7]                     # original time slice [B,1]
        t       = t_raw.detach().clone().requires_grad_(True)
        t_eps   = (t + eps).detach().clone().requires_grad_(True)

        # 2) Forward passes at t and t+eps
        inp       = torch.cat([Xb[:, :6], t],      dim=1)
        inp_eps   = torch.cat([Xb[:, :6], t_eps],  dim=1)
        pred      = model(inp)
        pred_eps  = model(inp_eps)

        # 3) Velocities via auto-diff
        vel = torch.autograd.grad(
            outputs=pred,
            inputs=t,
            grad_outputs=torch.ones_like(pred),
            retain_graph=True
        )[0]

        vel_eps = torch.autograd.grad(
            outputs=pred_eps,
            inputs=t_eps,
            grad_outputs=torch.ones_like(pred_eps),
            retain_graph=True
        )[0]

        # 4) Acceleration
        acc = (vel_eps - vel) / eps

        # true accel
        true_acc = true_acceleration(pred, m1, m2)

        # Align shapes: expand acc to 2 dims
        ode_loss = mae_loss(acc.expand_as(true_acc), true_acc)
        loss_data = mae_loss(pred, Yb)

        loss = loss_data
        opt.zero_grad()
        loss.backward()
        opt.step()

        train_mae += loss_data.item() * Xb.size(0)
        train_ode += ode_loss.item() * Xb.size(0)
    train_mae /= len(train_ds)
    train_ode /= len(train_ds)

    # --- Validation ---
    model.eval()
    val_mae = 0.0
    val_ode = 0.0
    for Xb, Yb in val_loader:
        Xb, Yb = Xb.to(device), Yb.to(device)
        m1 = Xb[:,0:1]; m2 = Xb[:,1:2]
        t_raw = Xb[:, 6:7]                     # original time slice
        t       = t_raw.detach().clone().requires_grad_(True)
        t_eps   = (t + eps).detach().clone().requires_grad_(True)
        # 2) Forward passes at t and t+eps
        inp       = torch.cat([Xb[:, :6], t],      dim=1)
        inp_eps   = torch.cat([Xb[:, :6], t_eps],  dim=1)
        pred      = model(inp)
        pred_eps  = model(inp_eps)
        # 3) Velocities via auto-diff
        vel = torch.autograd.grad(
            outputs=pred,
            inputs=t,
            grad_outputs=torch.ones_like(pred),
            retain_graph=True
        )[0]
        vel_eps = torch.autograd.grad(
            outputs=pred_eps,
            inputs=t_eps,
            grad_outputs=torch.ones_like(pred_eps),
            retain_graph=True
        )[0]
        # 4) Acceleration via forward difference of velocity
        acc = (vel_eps - vel) / eps
        # true accel
        true_acc = true_acceleration(pred, m1, m2)
        l_ode = mae_loss(acc.expand_as(true_acc), true_acc)
        l_data = mae_loss(pred, Yb)
        val_mae += l_data.item() * Xb.size(0)
        val_ode += l_ode.item() * Xb.size(0)
        opt.zero_grad()
    val_mae /= len(val_ds)
    val_ode /= len(val_ds)

    if epoch % 5 == 0 or epoch == 1:
        print(f"Epoch {epoch}/{num_epochs} // Train MAE {train_mae:.6f}  // Train ODE {train_ode:.6f} // Val MAE {val_mae:.6f} // Val ODE {val_ode:.6f}")

# Save model
torch.save(model.state_dict(), "two_body_mlp_with_ode.pt")
print("Done. Model saved as two_body_mlp_with_ode.pt")


Epoch 1/50 // Train MAE 48.160664  // Train ODE 4083.366934 // Val MAE 21.746565 // Val ODE 2.873697
Epoch 5/50 // Train MAE 26.076961  // Train ODE 4739.859475 // Val MAE 10.237095 // Val ODE 9.149054
Epoch 10/50 // Train MAE 23.770549  // Train ODE 5361.082241 // Val MAE 10.515651 // Val ODE 14.266296
Epoch 15/50 // Train MAE 23.202536  // Train ODE 5669.385946 // Val MAE 11.558650 // Val ODE 15.177412
Epoch 20/50 // Train MAE 22.527735  // Train ODE 5870.526025 // Val MAE 13.622782 // Val ODE 17.834897
Epoch 25/50 // Train MAE 22.276503  // Train ODE 6039.984619 // Val MAE 11.329843 // Val ODE 17.997034
Epoch 30/50 // Train MAE 21.914989  // Train ODE 6151.930500 // Val MAE 14.348262 // Val ODE 20.772189
Epoch 35/50 // Train MAE 21.675660  // Train ODE 6243.158755 // Val MAE 14.825022 // Val ODE 22.303972
Epoch 40/50 // Train MAE 21.519232  // Train ODE 6217.790876 // Val MAE 16.792291 // Val ODE 21.792162
Epoch 45/50 // Train MAE 21.387516  // Train ODE 6171.451264 // Val MAE 17.51

User input cell

In [None]:
# Get input from the user
m1 = float(input("Enter mass of body 1 (m1): "))
m2 = float(input("Enter mass of body 2 (m2): "))
x1 = float(input("Enter initial x position of body 1 (x1): "))
y1 = float(input("Enter initial y position of body 1 (y1): "))
x2 = float(input("Enter initial x position of body 2 (x2): "))
y2 = float(input("Enter initial y position of body 2 (y2): "))
vx1 = float(input("Enter initial x velocity of body 1 (vx1): "))
vy1 = float(input("Enter initial y velocity of body 1 (vy1): "))
vx2 = float(input("Enter initial x velocity of body 2 (vx2): "))
vy2 = float(input("Enter initial y velocity of body 2 (vy2): "))
t = float(input("Enter time (t): "))

# Calculate Center of Mass (COM) initial position and velocity
M = m1 + m2
comx = (m1 * x1 + m2 * x2) / M
comy = (m1 * y1 + m2 * y2) / M
comvx = (m1 * vx1 + m2 * vx2) / M
comvy = (m1 * vy1 + m2 * vy2) / M

# Convert initial absolute positions and velocities to COM frame for body 2
x2_com0 = x2 - comx
y2_com0 = y2 - comy
vx2_com0 = vx2 - comvx
vy2_com0 = vy2 - comvy

# Prepare input for the model (relative to COM frame)
input_data = torch.tensor([[m1, m2, x2_com0, y2_com0, vx2_com0, vy2_com0, t]], dtype=torch.float32).to(device)

# Predict the relative position of body 2 at time t using the model
with torch.no_grad():
  predicted_y2_rel = model(input_data)

# predicted_y2_rel contains the predicted (x2_com(t), y2_com(t))
predicted_x2_com_t = predicted_y2_rel[0, 0].item()
predicted_y2_com_t = predicted_y2_rel[0, 1].item()

# Calculate the COM position at time t
com_pos_x_t = comx + comvx * t
com_pos_y_t = comy + comvy * t

#Calculate the absolute positions of bodies at time t
# Absolute position of body 2 = COM position at time t + relative position of body 2 in COM frame at time t
x2_t = com_pos_x_t + predicted_x2_com_t
y2_t = com_pos_y_t + predicted_y2_com_t

# Absolute position of body 1 can be found using the COM definition
# M * COM_t = m1 * pos1_t + m2 * pos2_t
# pos1_t = (M * COM_t - m2 * pos2_t) / m1
x1_t = (M * com_pos_x_t - m2 * x2_t) / m1
y1_t = (M * com_pos_y_t - m2 * y2_t) / m1

print(f"Predicted absolute position of body 1 at time {t}: ({x1_t:.6f}, {y1_t:.6f})")
print(f"Predicted absolute position of body 2 at time {t}: ({x2_t:.6f}, {y2_t:.6f})")