In [1]:
#Dance of the Planets

import numpy as np

G = 6.67430e-11 #Gravitational constant(m^3 kg^-1 s^-2)

#things the user will provide

m_1 = float(input("Enter mass of body 1 (in kg): "))
m_2 = float(input("Enter mass of body 2 (in kg): "))
x_1_0 = np.array([float(input("Enter the initial x-coordinate of the body 1(in m): ")), 
               float(input("Enter the initial y-coordinate of the body 1(in m): "))])

x_2_0 = np.array([float(input("Enter the initial x-coordinate of the body 2(in m): ")), 
               float(input("Enter the initial y-coordinate of the body 2(in m): "))])
v_1_0 = np.array([float(input("Enter the initial x-velocity of the body 1 (in m/s): ")), 
               float(input("Enter the initial y-velocity of the body 1(in m/s): "))])

v_2_0 = np.array([float(input("Enter the initial x-velocity of the body 2(in m/s): ")), 
               float(input("Enter the initial y-velocity of the body 2(in m/s): "))])
t = float(input("Enter the time for prediction (seconds): "))

Enter mass of body 1 (in kg):  2
Enter mass of body 2 (in kg):  3
Enter the initial x-coordinate of the body 1(in m):  2
Enter the initial y-coordinate of the body 1(in m):  1
Enter the initial x-coordinate of the body 2(in m):  4
Enter the initial y-coordinate of the body 2(in m):  2
Enter the initial x-velocity of the body 1 (in m/s):  2
Enter the initial y-velocity of the body 1(in m/s):  5
Enter the initial x-velocity of the body 2(in m/s):  2
Enter the initial y-velocity of the body 2(in m/s):  3
Enter the time for prediction (seconds):  15


In [2]:
#calculating coordinates at T using ODE solvers
#I will use odeint for solving this
from scipy.integrate import odeint

#we need to define 2 functions for it

def two_body_ode(y, t, G, m1, m2): #defines the system of ODE which odeint will solve
    # y contains: x1_x, x1_y, v1_x, v1_y, x2_x, x2_y, v2_x, v2_y
    x1 = y[0:2]
    v1 = y[2:4]
    x2 = y[4:6]
    v2 = y[6:8]

    r = x2 - x1            # vector from body 1 to body 2
    dist_cubed = np.linalg.norm(r)**3

    dx1_dt = v1
    dv1_dt = G * m2 * r / dist_cubed
    dx2_dt = v2
    dv2_dt = G * m1 * (-r) / dist_cubed

    return np.concatenate([dx1_dt, dv1_dt, dx2_dt, dv2_dt])

def get_coordinates_at_t(t, y0, G, m1, m2): #uses odeint to integrate the ode over the specified time t
    t_interval = [0, t]
    sol = odeint(two_body_ode, y0, t_interval, args=(G, m1, m2))
    return sol[-1]  # returns an array of length 8

y0 = np.concatenate([x_1_0, v_1_0, x_2_0, v_2_0])

In [3]:
#Dataset generation
import pandas as pd
#for generating dataset I need to find the values at different times.
#for solving ODE I have used odeint.
no_points = 1000 # let dataset have 10000 points
dt = t/no_points # formula for finding dt

#lists for storing data for DataFrame
time_T = []
x_1 = []
y_1 = []
x_2 = []
y_2 = []
v_1_x = []
v_1_y = []
v_2_x = []
v_2_y = []

#making variable for loop
time_current = 0

while time_current < t :
    result = get_coordinates_at_t(time_current, y0, G, m_1, m_2)
    x_1_current, v_1_current, x_2_current, v_2_current = result[0:2], result[2:4], result[4:6], result[6:8]
    

    #store the data using .append()
    time_T.append(time_current)
    x_1.append(x_1_current[0])
    y_1.append(x_1_current[1])
    x_2.append(x_2_current[0])
    y_2.append(x_2_current[1])
    v_1_x.append(v_1_current[0])
    v_1_y.append(v_1_current[1])
    v_2_x.append(v_2_current[0])
    v_2_y.append(v_2_current[1])

    time_current += dt #update the time to be not stuck in infinite loop

#create dataframe
data = pd.DataFrame({
    'T': time_T,
    'x_1': x_1,
    'y_1': y_1,
    'x_2': x_2,
    'y_2': y_2,
    'v_1_x': v_1_x,
    'v_1_y': v_1_y,
    'v_2_x': v_2_x,
    'v_2_y': v_2_y
})

data

Unnamed: 0,T,x_1,y_1,x_2,y_2,v_1_x,v_1_y,v_2_x,v_2_y
0,0.000,2.00,1.000,4.00,2.000,2.0,5.0,2.0,3.0
1,0.015,2.03,1.075,4.03,2.045,2.0,5.0,2.0,3.0
2,0.030,2.06,1.150,4.06,2.090,2.0,5.0,2.0,3.0
3,0.045,2.09,1.225,4.09,2.135,2.0,5.0,2.0,3.0
4,0.060,2.12,1.300,4.12,2.180,2.0,5.0,2.0,3.0
...,...,...,...,...,...,...,...,...,...
995,14.925,31.85,75.625,33.85,46.775,2.0,5.0,2.0,3.0
996,14.940,31.88,75.700,33.88,46.820,2.0,5.0,2.0,3.0
997,14.955,31.91,75.775,33.91,46.865,2.0,5.0,2.0,3.0
998,14.970,31.94,75.850,33.94,46.910,2.0,5.0,2.0,3.0


In [4]:
#as we only need to predict x_1, y_1, x_2 and y_2 we can drop v_1_x, v_1_y, v_2_x and v_2_y

data = data.drop( ["v_1_x", "v_1_y", "v_2_x", "v_2_y"], axis = 1) #code to drop columns 

# Save the DataFrame to a CSV file
data.to_csv('data.csv', index=False)

data

Unnamed: 0,T,x_1,y_1,x_2,y_2
0,0.000,2.00,1.000,4.00,2.000
1,0.015,2.03,1.075,4.03,2.045
2,0.030,2.06,1.150,4.06,2.090
3,0.045,2.09,1.225,4.09,2.135
4,0.060,2.12,1.300,4.12,2.180
...,...,...,...,...,...
995,14.925,31.85,75.625,33.85,46.775
996,14.940,31.88,75.700,33.88,46.820
997,14.955,31.91,75.775,33.91,46.865
998,14.970,31.94,75.850,33.94,46.910


In [5]:
#Splitting the data

#as this has input only as time I'm not using train_test_split as it might lead to data leakage, instead I'm manually splitting the data
#could use TimeSeriesSplit as well
#refer source - https://tomerkatzav.medium.com/split-time-series-dataset-826b7dc39cd9
train_data_size = int(len(data)*0.7) #spliiting such that 70% is train data, 15% is validation data and rest is test data
val_data_size = int(len(data)*0.15)

train_data = data.iloc[ :train_data_size, :] #splitting
val_data = data.iloc[train_data_size:train_data_size + val_data_size, :]
test_data = data.iloc[train_data_size + val_data_size:, :]


In [6]:
#converting in numpy
train_data = train_data.values 
val_data = val_data.values
test_data = test_data.values


In [7]:
#creating neural model
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional  as F

class NN(nn.Module) :
    def __init__(self):
        super().__init__()   #this is how u write constructor in python 
        self.layers = nn.Sequential(
            nn.Linear(1,128),  #1 input layer
            nn.Tanh(),  #using a smooth activation function
            nn.Linear(128, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64,64),  #3 hidden layers activated with tanh as its smooth and differentiable
            nn.Tanh(),
            nn.Linear(64,4)   #4 output layers
        )

    def forward(self,T) :
        return self.layers(T)

model = NN()

In [8]:
#now first I need to convert the data into Pytorch tensors
train_data_tensor = torch.FloatTensor(train_data) 
val_data_tensor= torch.FloatTensor(val_data)
test_data_tensor = torch.FloatTensor(test_data)

loss_fn = F.mse_loss # defining loss function

opt = torch.optim.Adam(model.parameters(), lr = 1e-3) #using Adam(Adaptive Moment Estimation) optimizer as it 
                                                       #converges quickly and is widely used in deep learning.

In [9]:
#in this I will calculate ode residuals and ode loss
def ode_loss_fn(pred_denorm,t,m_1,m_2,G):
   
    #splitting
    x1_pred = pred_denorm[:,:2]
    x2_pred = pred_denorm[:,2:]
    if not t.requires_grad:
        t = t.requires_grad_(True) #enables to compute derivatives w.r.t. time


    
    #derived velocities
    v1_derived = torch.autograd.grad(x1_pred,t,grad_outputs=torch.ones_like(x1_pred),create_graph = True)[0] #this returns a tupple so I used [0] to get velocity
    v2_derived = torch.autograd.grad(x2_pred,t,grad_outputs=torch.ones_like(x1_pred),create_graph = True)[0]

    #derived accelerations
    a1_derived = torch.autograd.grad(v1_derived,t,grad_outputs=torch.ones_like(v1_derived),create_graph = True)[0]
    a2_derived = torch.autograd.grad(v2_derived,t,grad_outputs=torch.ones_like(v2_derived),create_graph = True)[0]

    #predicted relative poition between them 
    r = x2_pred - x1_pred
    r_mod = torch.norm(r,dim=1,keepdim = True)

    r_cube = r_mod**3 #cube

    #predicted acceleration
    a1_pred = G*m_2*r/r_cube
    a2_pred = G*m_1*(-r)/r_cube

    #residuals
    residual_1 = a1_derived - a1_pred
    residual_2 = a2_derived - a2_pred

    ode_loss = residual_1.pow(2).mean() + residual_2.pow(2).mean() #ode loss

    mean_ode_residual = (residual_1.norm(dim = 1).mean() + residual_2.norm(dim = 1).mean())/2 #mean ode residual

    return ode_loss,mean_ode_residual

In [10]:
#training of the data
no_epochs = 1500 #lets keep it 200
total_residual_train = 0.0 #calculating ode residuals
total_residual_val = 0.0

for epoch in range(no_epochs):
    model.train() #for setting the model in training mode
    
    #getting time
    time = train_data_tensor[:,:1]
    
    y_train = train_data_tensor[:,1:]
    time = time.clone().detach().requires_grad_(True) #so model will predict based on differentiabe time
    
    opt.zero_grad()  #apply zero grad condition
    pred_test = model(time)  #predictions
    mse_loss = loss_fn(pred_test,y_train) #mse loss
    ode_loss,mean_residual_train = ode_loss_fn(pred_test,time,m_1,m_2,G) #ode loss function
    total_residual_train += mean_residual_train
    total_loss = 0.6*ode_loss+ 0.4*mse_loss  #total weighted loss
    total_loss.backward()  #backward step
    opt.step() #used for adjusting weights 
    
    
    model.eval()
    #gettig time
    time_val = val_data_tensor[:,:1]
    y_val = val_data_tensor[:,1:]
    
    time_val = time_val.clone().detach().requires_grad_(True) #so model will predict based on differentiabe time
    
    val_pred = model(time_val)
    val_mse_loss = loss_fn(val_pred,y_val) #mse loss
    val_ode_loss,mean_residual_val = ode_loss_fn(val_pred,time_val,m_1,m_2,G) #ode loss and mean residuals
    total_residual_val += mean_residual_val
    val_total_loss = 0.6*val_ode_loss+0.4*val_mse_loss #total weighted loss
    #print statement
    if (epoch+1)%10 == 0 or epoch == 0 :
        print("Epoch [{}/{}], Loss : {:.4f}".format(epoch+1,no_epochs,val_total_loss))

Epoch [1/1500], Loss : 621.1776
Epoch [10/1500], Loss : 588.5578
Epoch [20/1500], Loss : 560.1760
Epoch [30/1500], Loss : 534.8855
Epoch [40/1500], Loss : 510.8741
Epoch [50/1500], Loss : 489.5933
Epoch [60/1500], Loss : 470.3182
Epoch [70/1500], Loss : 452.6034
Epoch [80/1500], Loss : 436.1937
Epoch [90/1500], Loss : 420.9111
Epoch [100/1500], Loss : 406.6222
Epoch [110/1500], Loss : 393.2247
Epoch [120/1500], Loss : 380.6378
Epoch [130/1500], Loss : 368.7957
Epoch [140/1500], Loss : 357.6434
Epoch [150/1500], Loss : 347.1334
Epoch [160/1500], Loss : 337.2239
Epoch [170/1500], Loss : 327.8777
Epoch [180/1500], Loss : 319.0620
Epoch [190/1500], Loss : 310.7630
Epoch [200/1500], Loss : 302.6259
Epoch [210/1500], Loss : 293.8206
Epoch [220/1500], Loss : 284.9972
Epoch [230/1500], Loss : 276.4988
Epoch [240/1500], Loss : 268.3252
Epoch [250/1500], Loss : 260.4834
Epoch [260/1500], Loss : 252.9632
Epoch [270/1500], Loss : 245.7520
Epoch [280/1500], Loss : 238.8363
Epoch [290/1500], Loss : 

In [11]:
#mean ode residuals
train_residual = total_residual_train/len(train_data_tensor)
val_residual = total_residual_val/len(val_data_tensor)
#print statement
print("Mean Residual across train data: {}".format(train_residual))
print("Mean Residual across val data: {}".format(val_residual))

Mean Residual across train data: 1.4273813962936401
Mean Residual across val data: 3.7749547958374023


In [14]:
from sklearn.metrics import mean_absolute_error
total_residual_test = 0.0
model.eval() #switch to evaluation mode

#getting time
time_test = test_data_tensor[:,:1]
y_test = test_data_tensor[:,1:]
time_test = time_test.clone().detach().requires_grad_(True)
y_test_pred = model(time_test) #predictions
ode_loss,mean_residual_test = ode_loss_fn(y_test_pred,time_test,m_1,m_2,G) #ode loss function
total_residual_test += mean_residual_test 

mae = mean_absolute_error(y_test.detach().numpy(),y_test_pred.detach().numpy()) #mae

#print step
print("Mae for test data is : {}".format(mae))
print("Residual across test data: {}".format(total_residual_test/len(test_data_tensor)))

Mae for test data is : 12.852261543273926
Residual across test data: 7.551054295618087e-05


In [15]:
#find coordinates at t
result = get_coordinates_at_t(t, y0, G, m_1, m_2)
x1, v1, x2, v2 = result[0:2], result[2:4], result[4:6], result[6:8]
#storing x1 and x2 in single numpy array
final_test_case_y = np.concatenate([x1,x2])

#predicting coordinates at time t
final_test_case_x_tensor = torch.tensor([t])

model.eval() #switch to evaluation mode

with torch.no_grad():
    final_test_case_y_pred_tensor = model(final_test_case_x_tensor) #predictions

#calculating mae for this
mae_final = mean_absolute_error(final_test_case_y_pred_tensor.numpy(), final_test_case_y) #calculate to numpy before

#print statement
print("MAE of final test case is: {}".format(mae_final))

MAE of final test case is: 16.249011993435253
