In [None]:
import matplotlib.pyplot as plt

def plot_trajectory_3(trajectory):
    """
    trajectory has shape (40,3,4) that is 40 time steps, 3 particles and 4 features (x, y, v_x, v_y)
    """
    # Smaller image
    plt.figure(figsize=(5,5))

    print(trajectory.shape)
    for i in range(trajectory.shape[1]):
        x = trajectory[:, i, 0]
        y = trajectory[:, i, 1]
        plt.plot(x, y)

    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

: 

In [None]:
import matplotlib.pyplot as plt

def plot_trajectory_1(trajectory):
    """
    trajectory has shape (40,4) that is 40 time steps, 4 features (x, y, v_x, v_y)
    """
    # Smaller image
    plt.figure(figsize=(5,5))

    for p in range(trajectory.shape[0]):
        x = trajectory[:, 0]
        y = trajectory[:, 1]
        plt.plot(x, y)

    plt.xlabel('x')
    plt.ylabel('y')
    plt.show()

: 

In [None]:
import numpy as np

def create_random_trajectory_1(plot=False):
    """
    Creates a random parabolic trajectory for 3 particles
    """

    # Generate random coefficients
    a1 = np.random.uniform(-0.05, 0.05)
    b1 = np.random.uniform(-1, 1)
    c1 = np.random.uniform(0, 20)

    # Generate random x values
    x = np.linspace(0, 20, 40)

    y1 = a1 * x**2 + b1 * x + c1

    # Ensure all y values are within the range 0 to 20
    y1 = np.clip(y1, 0, 20)

    # Calculate velocity
    velocity_x1 = np.gradient(x)
    velocity_y1 = np.gradient(y1)

    # Return numpy array of shape (40, 4)
    trajectory =  np.vstack((x, y1, velocity_x1, velocity_y1)).T

    # Return numpy array of shape (40, 3, 4)
    if plot:
        plot_trajectory_1(trajectory)
    
    return trajectory

create_random_trajectory_1(True)

: 

In [None]:
import numpy as np

def create_random_trajectory_3(plot=False):
    """
    Creates a random parabolic trajectory for 3 particles
    """

    # Generate random coefficients
    a1 = np.random.uniform(-0.05, 0.05)
    b1 = np.random.uniform(-1, 1)
    c1 = np.random.uniform(0, 20)

    a2 = np.random.uniform(-0.05, 0.05)
    b2 = np.random.uniform(-1, 1)
    c2 = np.random.uniform(0, 20)

    a3 = np.random.uniform(-0.05, 0.05)
    b3 = np.random.uniform(-1, 1)
    c3 = np.random.uniform(0, 20)

    # Generate random x values
    x = np.linspace(0, 20, 40)

    y1 = a1 * x**2 + b1 * x + c1
    y2 = a2 * x**2 + b2 * x + c2
    y3 = a3 * x**2 + b3 * x + c3

    # Ensure all y values are within the range 0 to 20
    y1 = np.clip(y1, 0, 20)
    y2 = np.clip(y2, 0, 20)
    y3 = np.clip(y3, 0, 20)

    # Calculate velocity
    velocity_x1 = np.gradient(x)
    velocity_y1 = np.gradient(y1)
    velocity_x2 = np.gradient(x)
    velocity_y2 = np.gradient(y2)
    velocity_x3 = np.gradient(x)
    velocity_y3 = np.gradient(y3)

    # Return numpy array of shape (40, 4)
    trajectory1 =  np.vstack((x, y1, velocity_x1, velocity_y1)).T
    trajectory2 =  np.vstack((x, y2, velocity_x2, velocity_y2)).T
    trajectory3 =  np.vstack((x, y3, velocity_x3, velocity_y3)).T

    # Return numpy array of shape (40, 3, 4)
    trajectory = np.array([trajectory1, trajectory2, trajectory3])
    trajectory = np.transpose(trajectory, (1, 0, 2))
    if plot:
        plot_trajectory_3(trajectory)
    
    return trajectory

create_random_trajectory_3(True)

: 

In [None]:
from torch.utils.data import Dataset, DataLoader
import torch

class TrajectoryDataset(Dataset):
    def __init__(self, num_trajectories):
        self.trajectories = [create_random_trajectory_3() for _ in range(num_trajectories)]
    
    def __len__(self):
        return len(self.trajectories)
    
    def __getitem__(self, idx):
        data = self.trajectories[idx]
        # Convert to torch tensor
        data = torch.tensor(data, dtype=torch.float32)

        X = data[0, :].unsqueeze(0)  # Shape: (1, 4)
        Y = data[1:, :]  # Shape: (39, 4)
        return X, Y

: 

In [None]:
# Create a dataset with 1000 trajectories
dataset = TrajectoryDataset(1000)
x, y = dataset[0]
x.shape, y.shape

: 

In [None]:
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
next(iter(dataloader))[0].shape, next(iter(dataloader))[1].shape

: 

In [None]:
import torch
import torch.nn as nn

class GRUModel(nn.Module):
    def __init__(self):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(4, 64, batch_first=True)  # GRU layer for sequence generation
        self.fc = nn.Linear(64, 4)                 # Fully connected layer for output

    def forward(self, x):
        batch_size, time_steps, num_particles, num_features = x.size()
        
        # Initial state for the 3 particles
        x = x[:, 0, :, :].view(batch_size * num_particles, 1, num_features)  # Shape: (batch_size * num_particles, 1, 4)
        
        outputs = []
        hidden = None

        # Generate the sequence of 39 steps
        for _ in range(39):
            out, hidden = self.gru(x, hidden)  # Shape: (batch_size * num_particles, 1, hidden_size)
            out = self.fc(out[:, -1, :])       # Shape: (batch_size * num_particles, num_features)
            outputs.append(out.view(batch_size, num_particles, num_features))  # Reshape to (batch_size, num_particles, num_features)
            x = out.unsqueeze(1)  # Prepare the output as the next input, shape: (batch_size * num_particles, 1, num_features)
        
        outputs = torch.stack(outputs, dim=1)  # Shape: (batch_size, 39, num_particles, num_features)
        
        return outputs


: 

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader

# Create dataset and dataloader
num_trajectories = 1000  # Number of trajectories in the dataset
dataset = TrajectoryDataset(num_trajectories)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Create the model
model = GRUModel()

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 50

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0
    for X_batch, Y_batch in dataloader:
        optimizer.zero_grad()
        output = model(X_batch)
        print(output.shape)
        loss = criterion(output, Y_batch)
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
    
    avg_loss = epoch_loss / len(dataloader)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}')

# Evaluate the model
model.eval()
with torch.no_grad():
    total_loss = 0
    for X_batch, Y_batch in dataloader:
        output = model(X_batch)
        loss = criterion(output, Y_batch)
        total_loss += loss.item()
    avg_loss = total_loss / len(dataloader)
    print(f'Evaluation Loss: {avg_loss:.4f}')

# Make predictions
with torch.no_grad():
    for X_batch, _ in dataloader:
        predictions = model(X_batch)
        break  # Just to get predictions for the first batch

# `predictions` shape will be (batch_size, 39, 3, 4)


: 

In [None]:
def plot_trajectories(X, Y, predictions):
    plt.figure(figsize=(10, 6))
    plt.plot(X[0, :, 0], X[0, :, 1], 'ro-', label='Input Trajectory')
    plt.plot(Y[0, :, 0], Y[0, :, 1], 'bo-', label='Ground Truth')
    plt.plot(predictions[0, :, 0], predictions[0, :, 1], 'go-', label='Predictions')
    plt.xlabel('x')
    plt.ylabel('y')
    plt.title('Trajectory Prediction')
    plt.legend()
    plt.show()

test_dataset = TrajectoryDataset(1)
test_loader = DataLoader(test_dataset, batch_size=1)

X, Y = next(iter(test_loader))

predictions = model(X)

true_trajectory = test_dataset.trajectories[0]

predicted_trajectory = torch.cat((X, predictions), dim=1).detach().squeeze(0).numpy()
plot_trajectory_3(true_trajectory)
print(predicted_trajectory.shape)
plot_trajectory_3(predicted_trajectory)


: 