# Spatial Graph Neural Network (Spatial GNN)

Prediction of the intention of pedestrians to cross the street or not, using Graph Neural Networks and the coordinates of their skeleton that was previously generated using Openpose in the JAAD dataset.

**Input:** Pedestrian skeleton graph.

**Output:** Binary classification (crossing or not crossing the street).

In [None]:
import numpy as np

from torch import from_numpy
from torch import cuda
from torch import no_grad
from torch import optim

from torch_geometric.data import Data
from torch_geometric.loader import DataLoader

import matplotlib.pyplot as plt

import pandas as pd

%matplotlib inline

from Code.GNN import *
from Code.SkeletonsDataset import *
from Code.ModelTrainEvaluate import *
from Code.MetricsPlots import *

# Dataset

## Training dataset

In [None]:
train_dataset = SkeletonsDataset('Data/train_annotations_with_skeletons.csv', normalization='minmax', target='cross')

print('train_dataset len:', len(train_dataset))
print('Shape of each skeletons data (x):', train_dataset.data[0].x.shape)

train_dataset.loadedData

Important columns:

In [None]:
train_dataset.loadedData[['video','frame','decision_point','skeleton','skeleton_detected','cross','crossing']]

Number of elements per class:

In [None]:
totalRows = len(train_dataset.loadedData)
crossingRows = len(train_dataset.loadedData[train_dataset.loadedData['cross']=='crossing'])
nocrossingRows = len(train_dataset.loadedData[train_dataset.loadedData['cross']=='not-crossing'])

print('Training dataset total rows:', totalRows)
print('Training dataset crossing class samples:', crossingRows)
print('Training dataset not-crossing class samples:', nocrossingRows)

plt.figure(figsize=(10,10))
plt.bar(1, crossingRows, label='Crossing class')
plt.bar(0, nocrossingRows, label='Not-crossing class')
plt.legend(loc='upper left', prop={'size': 15})
plt.xticks([0, 1], size=15)
plt.yticks(size=15)
plt.xlabel('Class', size=15)
plt.ylabel('Number of samples', size=15)
plt.title('Dataset classes distribution', size=15)
plt.show()

In [None]:
skeleton = train_dataset.data[10].x[:, 0:2].tolist()

skeleton2 = []

for sk in skeleton:
    if(sk!=[0, 0]):
        skeleton2.append(sk)
        
skeleton2 = np.asarray(skeleton2)

plt.figure()
plt.scatter(skeleton2[:, 0], skeleton2[:, 1])
plt.show()

## Validation dataset

In [None]:
val_dataset = SkeletonsDataset('Data/val_annotations_with_skeletons.csv', normalization='minmax',
                               norm_precomputed_values = [train_dataset.xmax, train_dataset.xmin], target='cross')
                               # norm_precomputed_values = [train_dataset.xmean, train_dataset.xstd]

val_dataset.shuffle()

In [None]:
print('val_dataset len:', len(val_dataset))
print('Shape of each skeletons data (x):', val_dataset.data[0].x.shape)

val_dataset.loadedData

Important columns:

In [None]:
val_dataset.loadedData[['video','frame','decision_point','skeleton','skeleton_detected','cross','crossing']]

Number of elements per class:

In [None]:
totalRows = len(val_dataset.loadedData)
crossingRows = len(val_dataset.loadedData[val_dataset.loadedData['cross']=='crossing'])
nocrossingRows = len(val_dataset.loadedData[val_dataset.loadedData['cross']=='not-crossing'])

print('Validation dataset total rows:', totalRows)
print('Validation dataset crossing class samples:', crossingRows)
print('Validation dataset not-crossing class samples:', nocrossingRows)

plt.figure(figsize=(10,10))
plt.bar(1, crossingRows, label='Crossing class')
plt.bar(0, nocrossingRows, label='Not-crossing class')
plt.legend(loc='best', prop={'size': 15})
plt.xticks([0, 1], size=15)
plt.yticks(size=15)
plt.xlabel('Class', size=15)
plt.ylabel('Number of samples', size=15)
plt.title('Dataset classes distribution', size=15)
plt.show()

Since the classes of the two datasets are unbalanced, we cannot rely only on accuracy as our metric.

## Showing a skeleton

In [None]:
skeleton = val_dataset.data[10].x[:, 0:2].tolist()

skeleton2 = []

for sk in skeleton:
    if(sk!=[0, 0]):
        skeleton2.append(sk)
        
skeleton2 = np.asarray(skeleton2)

plt.figure()
plt.scatter(skeleton2[:, 0], skeleton2[:, 1])
plt.show()

# Training

In [None]:
numberOfClasses = 2

y = train_dataset.loadedData['cross'].to_numpy()
y = np.where(y=='crossing', 1, 0)
bc = np.bincount(y)

class_weights = len(train_dataset.loadedData) / (numberOfClasses * bc)
class_weights = torch.tensor(class_weights, dtype=torch.float)

print('class_weights:', class_weights)

In [None]:
train_dataset.shuffle()
test_dataset = train_dataset[:5000]
train_dataset = train_dataset[5000:]

len(train_dataset), len(val_dataset), len(test_dataset)

In [None]:
# First element of training subset:
t0 = train_dataset[0]

# Node features:
t1 = t0.x

# Number of nodes:
numberOfNodes = t1.shape[0]

# Number of dimensions of each node features:
embed_dim = t1.shape[1]

print('Number of nodes per skeleton:', numberOfNodes)
print('Number of features per node:', embed_dim)

In [None]:
num_epochs = 25
batch_size = 50

device = torch.device('cpu')
model = SpatialGNN(embed_dim, numberOfClasses, numberOfNodes).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
crit = torch.nn.BCELoss()#weight=class_weights)

train_loader = DataLoader(train_dataset, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

loss_values = []

metrics_train = []
metrics_val = []

for epoch in range(num_epochs):

    train_loss = train(model, train_loader, device, optimizer, crit)
    loss_values.append(train_loss)

    train_metrics = evaluate(model, train_loader, device, computed_loss=train_loss)
    val_metrics = evaluate(model, val_loader, device, loss_crit=crit)

    metrics_train.append(train_metrics)
    metrics_val.append(val_metrics)
    
    if num_epochs <= 25:
        
        print_evaluation_train_val(epoch, train_metrics, val_metrics)

## Model performance plots

In [None]:
plot_loss(num_epochs, loss_values, figsize=10, textsize=15)

In [None]:
plot_classification_metrics_train_val(num_epochs, metrics_train, metrics_val, figsize=10, textsize=15)

## ROC curves

### Train set

In [None]:
fpr, tpr, roc_auc = ROC(model, train_loader, device, numberOfClasses)

for plotclass in range(0, numberOfClasses):
    plot_ROC(plotclass, fpr, tpr, roc_auc)

### Validation set

In [None]:
fpr, tpr, roc_auc = ROC(model, val_loader, device, numberOfClasses)

for plotclass in range(0, numberOfClasses):
    plot_ROC(plotclass, fpr, tpr, roc_auc)

# Testing

In [None]:
test_loader = DataLoader(test_dataset, batch_size=batch_size)
test_metrics = evaluate(model, test_loader, device)


print_evaluation_test(test_metrics)