# GNNs

## mass into account

In [26]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from torch_geometric.nn import radius_graph

def fully_connected(input_dim, hidden_dim, output_dim, num_layers):
    layers = []
    # Add hidden layers
    for _ in range(num_layers):
        layers.append(torch.nn.Linear(input_dim, hidden_dim))
        layers.append(torch.nn.ReLU())
        input_dim = hidden_dim
    # Add output layer
    layers.append(torch.nn.Linear(hidden_dim, output_dim))
    return torch.nn.Sequential(*layers)

# Function to create edge weights based on mass difference
def create_edge_weights_based_on_mass(data):
    row, col = data.edge_index  # Get indices of connected nodes
    # Get masses for each node in the connection
    mass_row = data.x[row][:, -1]  # Assume mass is the last feature in each node
    mass_col = data.x[col][:, -1]
    # Compute absolute mass difference to use as edge weight
    edge_weight = torch.abs(mass_row - mass_col)  # Shape: [num_edges]
    data.edge_weight = edge_weight
    return data

class ParticleGNN(torch.nn.Module):
    def __init__(self, input_dim, fc_dim, fc_layers, gnn_dim, message_passing_steps, output_dim):
        super(ParticleGNN, self).__init__()
        # Encoder for initial node features
        self.encoder = fully_connected(input_dim, fc_dim, gnn_dim, fc_layers)
        
        # GCN layers for message passing
        self.gnns = []
        for i in range(message_passing_steps):
            gnn = GCNConv(gnn_dim, gnn_dim)
            setattr(self, f'gnn_{i}', gnn)
            self.gnns.append(gnn)
        
        # Decoder to transform final node features to output
        self.decoder = fully_connected(gnn_dim, fc_dim, output_dim, fc_layers)

    def forward(self, data):
        x, edge_index, edge_weight = data.x, data.edge_index, data.edge_weight
        x = self.encoder(x)
        
        for gnn in self.gnns:
            x = F.relu(gnn(x, edge_index, edge_weight=edge_weight))
        
        x = self.decoder(x)
        return x

# Sample data for testing
'''
{'position': [0.1, 0.2, 0.3], 'velocity': [0.01, 0.02, 0.03], 'mass': 1.0},
    {'position': [0.4, 0.5, 0.6], 'velocity': [0.04, 0.05, 0.06], 'mass': 2.0},
    '''
particles = [
    {'position': torch.rand(3), 'velocity': torch.rand(3), 'mass': torch.rand(1)}
    for _ in range(1000)

    
    # Add more particles as needed
]

# Function to convert particles to a torch_geometric Data object
def transform_particles_to_graph_with_radius_and_mass_edge_weights(features, positions, radius):
    # Extract features for each particle: [x, y, z, vx, vy, vz, mass]

    
    # Create edges based on radius
    edge_index = radius_graph(positions, r=radius)
    
    # Create data object
    graph_data = Data(x=features, edge_index=edge_index)
    
    # Add edge weights based on mass
    graph_data = create_edge_weights_based_on_mass(graph_data)
    return graph_data



# Instantiate and test the model
model = ParticleGNN(input_dim=7, fc_dim=128, fc_layers=1, gnn_dim=128, message_passing_steps=5, output_dim=3)
print(model)    


ParticleGNN(
  (encoder): Sequential(
    (0): Linear(in_features=7, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=128, bias=True)
  )
  (gnn_0): GCNConv(128, 128)
  (gnn_1): GCNConv(128, 128)
  (gnn_2): GCNConv(128, 128)
  (gnn_3): GCNConv(128, 128)
  (gnn_4): GCNConv(128, 128)
  (decoder): Sequential(
    (0): Linear(in_features=128, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=3, bias=True)
  )
)


In [27]:
def euclidean_distance(a, b):
    return torch.sqrt(torch.sum((a - b)**2, dim=-1) + 1e-12)
def mean_distance(a, b):
    return torch.mean(euclidean_distance(a, b))

In [28]:
from datagen import generate_dataset

data = generate_dataset(window_size=0)

Generating dataset with 5 scenes...


100%|██████████| 5/5 [00:48<00:00,  9.75s/it]


In [31]:
import torch
def generate_graph_dataset(data, radius):
    graphs = []
    gt_acc = []
    for i in range(len(data)):
        masses = torch.tensor(data[i]['masses']).unsqueeze(-1)
        positions = torch.tensor(data[i]['pos'])
        velocities = torch.tensor(data[i]['vel'])
        features = torch.cat([positions, velocities, masses], dim=-1)
        graph_data = transform_particles_to_graph_with_radius_and_mass_edge_weights(features, positions, radius)
        graphs.append(graph_data)
        gt_acc.append(torch.tensor(data[i]['acc']))
    return graphs, gt_acc

In [32]:
Gs, accs = generate_graph_dataset(data, 2.0)

In [37]:
# create a dataloader for the dataset with Gs as input and accs as target
import torch_geometric.data as gd

model = ParticleGNN(input_dim=7, fc_dim=128, fc_layers=1, gnn_dim=128, message_passing_steps=5, output_dim=3)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
batch_size = 32
dataset = list(zip(Gs, accs))
# Training loop
model.train()
for epoch in range(10):
    epoch_losses = []
    for i in range(len(dataset) // batch_size):
        optimizer.zero_grad()
        data, target = zip(*dataset[i*batch_size:(i+1)*batch_size])
        losses = []
        for j in range(len(data)):
            output = model(data[j])
            l= mean_distance(output, target[j])
            losses.append(l)

        loss = sum(losses) / len(losses)
        loss.backward()
        optimizer.step()
        epoch_losses.append(loss.item())

    print(f'Epoch {epoch}, Loss: {sum(epoch_losses) / len(epoch_losses)}')







Epoch 0, Loss: 2.2612630808964753
Epoch 1, Loss: 2.085260417216863
Epoch 2, Loss: 2.027791576507764
Epoch 3, Loss: 1.9857922891775768


KeyboardInterrupt: 