# Modified Neural Network on Pedestrian Dynamics


###### This is a part of MLCMS Winter 2023-24 Final Project. Designed and Developed by Gaurav Vaibhav and team.

## 1. Importing required packages


We are importing the required packages and the dependencies.

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import os

from PedNeuralNetwork import PedNeuralNetwork
from torch.utils.data import  TensorDataset

from generated_data.K_10 import *

os.environ['KMP_DUPLICATE_LIB_OK']='True' # To prevent the kernel from dying.

# 2. Testing Training Data Preparation


In this section, we are defining a function which takes input arguments as datasets for training and testing along with train/test sizes. It returns training and testing dataset inputs(2*10+1) and targets (observed velocity).

In [2]:
import pandas as pd
import torch

def train_test_dataset(*args):

    dataset1, dataset2, train_size, test_size = args
    train_dataset = [dataset1[i] for i in torch.randperm(train_size)]
    test_dataset = [dataset2[i] for i in torch.randperm(test_size)]
    
    train_dataset = torch.stack([sample for sample in train_dataset])
    test_dataset = torch.stack([sample for sample in test_dataset])
    X_train = train_dataset[:,:-1]
    Y_train = train_dataset[:,-1]
    X_test = test_dataset[:,:-1]
    Y_test= test_dataset[:,-1]

    return X_train, X_test, Y_train, Y_test

# 3. Defining the model and its hyperparams



We are declaring hyperparameters for the model and assigning the values (after trial-and-error iteration) following by instantiating the model.

In [3]:
hparams = {"batch_size" : 10,
           "learning_rate" : 0.005,
           "decay" : 0.0001,
           "epochs" : 10}

input_size = 21
output_size = 1
hidden_sizes = [3]      #List of the dimensions of hidden layers

# Instantiate the model
model = PedNeuralNetwork(input_size, hidden_sizes, output_size)

# 4. Training the Model

We are training the model with three necessary steps: Forward, Backward, Updating the model weights. Scheduler is used to display the progress bar.

In [4]:
import torch.nn as nn
import numpy as np
from tqdm import tqdm


def create_tqdm_bar(iterable, desc):
    return tqdm(enumerate(iterable),total=len(iterable), ncols=150, desc=desc)

def train_model(model, train_loader, loss_func, epochs=10):
    
    optimizer = torch.optim.Adam(model.parameters(),lr=hparams["learning_rate"],
                                          weight_decay=hparams["decay"])
    
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=epochs * len(train_loader) / 5, gamma=0.7)

    for epoch in range(epochs):
        
        # Train
        training_loop = create_tqdm_bar(train_loader, desc=f'Training Epoch [{epoch}/{epochs}]')
        training_loss = 0
            
        for train_iteration, batch in training_loop:
            optimizer.zero_grad()
            data = batch[0]
            vel = batch[1]
            predicted_vel = model(data) #Forward().
            loss = loss_func(predicted_vel, vel) # Compute the loss over the predictions and the ground truth.
            loss.backward()  # Backward().
            optimizer.step() # Update the parameters.
            
            training_loss += loss.item()
            scheduler.step()
            training_loop.set_postfix(train_loss = "{:.8f}".format(training_loss / (train_iteration + 1)))
            # Update the progress bar.


# 5. Evaluating the Model

In this part, We have defined function for evaluating the model.

In [5]:
def evaluate_model(model, dataset):
    model.eval()
    criterion = torch.nn.MSELoss()

    dataloader = torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False)
    loss = 0
    pred_tensor = []
    target_tensor = []
    for batch in dataloader:
        inputs = batch[0]
        targets = batch[1]
        preds = model.forward(inputs)
        loss += criterion(
            torch.squeeze(targets), torch.squeeze(preds)
        ).item()
        pred_tensor.append(preds)
        target_tensor.append(targets)
    
    pred_tensor = torch.cat(pred_tensor, dim=0)
    target_tensor = torch.cat(target_tensor, dim=0)

    return pred_tensor, loss/(len(dataloader))



# 6. Losses for different training/testing datasets

We are determining the testing errors for 7 training/testing dataset combinations: R/R, B/B, R/B, B/R, R+B/R, R+B/B, R+B/R+B and plotting the same.

In [6]:
R_dataset = np.array(pd.read_csv("generated_data/K_10/R_230.csv", sep=',',header=1))        #Provide the suitable file for Ring scenario
B_dataset = np.array(pd.read_csv("generated_data/K_10/B_095.csv", sep=',',header=1))        #Provide the suitable file for Bottleneck scenario
R_dataset[:,:21] = R_dataset[:,:21]/100
B_dataset[:,:21] = B_dataset[:,:21]/100
R_dataset = torch.tensor(R_dataset, dtype=torch.float32)
B_dataset = torch.tensor(B_dataset, dtype=torch.float32)
RB_dataset = torch.concat([R_dataset, B_dataset])
RB_dataset = [RB_dataset[i] for i in torch.randperm(len(RB_dataset))]
RB_dataset = RB_dataset[:int(0.5*len(RB_dataset))]

train_list = [R_dataset, B_dataset, R_dataset, B_dataset, RB_dataset, RB_dataset, RB_dataset]
test_list =  [R_dataset, B_dataset, B_dataset, R_dataset, R_dataset, B_dataset, RB_dataset]
test_error = []
test_labels = ['R/R', 'B/B', 'R/B', 'B/R', 'R+B/R', 'R+B/B', 'R+B/R+B']

for i in range(len(train_list)):
    print('Training the model for ', test_labels[i])
    train_size = int(len(train_list[i]))
    test_size = int(len(test_list[i])*0.5)
    X_train, X_test, Y_train, Y_test = train_test_dataset(train_list[i], test_list[i], train_size, test_size)
    Y_train = Y_train.reshape(-1,1)
    Y_test = Y_test.reshape(-1,1)
    #print('Length of total dataset',len(train_list[i]))
    #print('Length of training dataset', X_train.shape)
    #print('Length of testing dataset',  X_test.shape)

    #Dropping the few last datasets after creating batches
    batch_size=hparams['batch_size']
    nsamples = len(X_train) // batch_size
    trim_length = nsamples * batch_size
    train_dataset = TensorDataset(X_train[:trim_length], Y_train[:trim_length])

    # Loading the training dataset into batches
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size, shuffle=True)
    epochs = hparams.get('epochs', 20)
    loss_func = nn.MSELoss()
    train_model(model, train_loader, loss_func, epochs=epochs)
    
    test_dataset = TensorDataset(X_test, Y_test)
    pred_tensor, test_score = evaluate_model(model, test_dataset)
    test_error.append(test_score)
    print("Testing Error for ", test_labels[i], " : ", test_error[i],"\n")

fig = plt.figure(figsize=(10, 6))
plt.plot(test_labels, test_error)
plt.ylabel('MSE')
plt.title('Testing errors for diff. datasets')

Training the model for  R/R


Training Epoch [0/10]:  53%|███████████████████████████████▌                            | 10683/20302 [00:52<00:46, 204.97it/s, train_loss=0.04994799]


KeyboardInterrupt: 

# 7. Ring and Bottleneck predicted speed trend for R+B/R+B

As given int the paper, we are evaluating the trend of predicted speeds for Ring and Bottleneck scenarios.

In [None]:
from utils import CustomDataset1, CustomDataset2

def mixed_dataset():

    R_dataset = np.array(pd.read_csv("generated_data/K_10/R_230.csv", sep=',',header=1))
    B_dataset = np.array(pd.read_csv("generated_data/K_10/B_095.csv", sep=',',header=1))
    R_dataset[:,:21] = R_dataset[:,:21]/100
    B_dataset[:,:21] = B_dataset[:,:21]/100
    R_dataset = torch.tensor(R_dataset, dtype=torch.float32)
    B_dataset = torch.tensor(B_dataset, dtype=torch.float32)

    # Combing both datasets with identifier (to identify particular dataset from NN output belongs to R or B)
    dataset1 = CustomDataset1(R_dataset)
    dataset2 = CustomDataset2(B_dataset)

    # Combine the datasets
    RB_dataset = dataset1 + dataset2
    RB_dataset = [RB_dataset[i] for i in torch.randperm(len(RB_dataset))]

    #Splitting 50% training and test data
    data = [data for data,_ in RB_dataset]
    data = torch.stack(data)
    split_length = int(len(data)*0.5)
    train_dataset = data[:split_length,:]
    test_dataset = data[split_length+1:,:]

    #Storing the identifiers R/B for combined datasets
    identifiers = [identifier for _,identifier in RB_dataset]
    identifiers = torch.tensor(identifiers)
    test_identifier = identifiers[split_length+1:]
    R_indices = [index for index, elem in enumerate(test_identifier) if elem==1]
    B_indices = [index for index, elem in enumerate(test_identifier) if elem==2]

    X_train = train_dataset[:,:-1]
    Y_train = train_dataset[:,-1]
    X_test = test_dataset[:,:-1]
    Y_test= test_dataset[:,-1]

    Y_train = Y_train.reshape(-1,1)
    Y_test = Y_test.reshape(-1,1)
    #print('Length of total dataset', train_size+test_size)
    #print('Length of training dataset', X_train.shape)
    #print('Length of testing dataset',  X_test.shape)

    #Dropping the few last datasets after creating batches
    batch_size=hparams['batch_size']
    nsamples = len(X_train) // batch_size
    trim_length = nsamples * batch_size
    train_dataset = TensorDataset(X_train[:trim_length], Y_train[:trim_length])

    # Loading the training dataset into batches
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size, shuffle=True)

    #Training and evaluating the model
    loss_func = nn.MSELoss()
    train_model(model, train_loader, loss_func, epochs=epochs)
    test_data = TensorDataset(X_test, Y_test)
    pred_vel, test_score = evaluate_model(model, test_data)

    #Concatenating the pred. speed with identifier
    R_predVel = torch.cat([test_dataset[R_indices,0].view(-1,1), pred_vel[R_indices,:]], dim=1)
    B_predVel = torch.cat([test_dataset[B_indices,0].view(-1,1), pred_vel[B_indices,:]], dim=1)
    R_predVel[:,0] /= R_predVel[:,0].mean()
    B_predVel[:,0] /= B_predVel[:,0].mean()

    return test_score, R_predVel, B_predVel

Fitting the predicted speed curve


In [None]:
#Generating results from R+B/R+B
test_score, R_predVel, B_predVel = mixed_dataset()

# Fitting a polynomial curve (adjust the degree as needed)
degree = 2
R_predVel = R_predVel.detach().numpy()
B_predVel = B_predVel.detach().numpy()
Rcoefficients = np.polyfit(R_predVel[:,0], R_predVel[:,1], degree)
Bcoefficients = np.polyfit(B_predVel[:,0], B_predVel[:,1], degree)
Rpoly = np.poly1d(Rcoefficients)
Bpoly = np.poly1d(Bcoefficients)

# Generating points for the fitted curve
Rx_fit = np.linspace(0.5, 3.5, 100)
Ry_fit = Rpoly(Rx_fit)
Bx_fit = np.linspace(0.5, 3.5, 100)
By_fit = Bpoly(Bx_fit)

# Plotting the scattered points and the fitted curve
fig = plt.figure(figsize=(10, 6))
plt.scatter(R_predVel[:,0], R_predVel[:,1])
plt.scatter(B_predVel[:,0], B_predVel[:,1])
plt.plot(Rx_fit, Ry_fit, 'b', label=f'Ring')
plt.plot(Bx_fit, By_fit, 'r', label=f'Bottleneck')

#axes titles
plt.xlim(0.5,3.5)
plt.ylim(0.0,1.0)
plt.legend()
plt.xlabel('Mean Spacing, m')
plt.ylabel('Speed, m/s')
plt.title('Pred. Velocity comparison for Ring & Bottleneck')
plt.show()


# 8. Testing errors for different model architectures

As mentioned in the paper, we are computing the testing errors against the model complexities. Though, it doesn't fully replicate the results of paper, it still convinces that the network of 1 hidden layer with 3 nodes is sufficient enough for this problem and two layers are not helping much to improve the efficiency.

In [None]:
hidden_sizes = [[1],[2],[3],[4,2],[6,3],[10,4]]      #List of the dimensions of hidden layers
hidden_sizes_labels = ['[1]', '[2]', '[3]', '[4,2]', '[6,3]', '[10,4]']
test_scores = []

for i in range(len(hidden_sizes)):
    model = PedNeuralNetwork(input_size, hidden_sizes[i], output_size)
    test_score, R_predVel, B_predVel = mixed_dataset()
    print('Testing error for ', hidden_sizes_labels[i],' : ', test_score,'\n')
    test_scores.append(test_score)

fig = plt.figure(figsize=(10, 6))
plt.plot(hidden_sizes_labels, test_scores)
plt.ylabel('MSE')
plt.title('Testing errors for diff. model network')
