In [None]:

import json
import os
import re

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
from matplotlib import pyplot as plt
from sklearn import metrics
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import OrdinalEncoder
from torch_geometric_temporal.dataset import ChickenpoxDatasetLoader
from torch_geometric_temporal.nn.recurrent import DCRNN
from torch_geometric_temporal.signal import StaticGraphTemporalSignal
from torch_geometric_temporal.signal import temporal_signal_split
from tqdm import tqdm




In [None]:
class CreateGraphDataset(object):
    """Build a graph dataset of keypoint coordinates from DeepLabCut ``.csv`` files.

    Every ``.csv`` file found in ``path_to_dataset`` is read as one recording. The
    filename must start with the age group of the dog followed by an underscore
    (example: ``adult_1.csv``); that prefix becomes the label of every frame of
    the file.

    Layout expected by the parsing code (``pandas.read_csv`` consumes the first
    line of the file as header):

    * DataFrame row 0 holds the bodypart names, each name repeated for x and y,
    * DataFrame row 1 is skipped (presumably the x / y coordinate header),
    * every following row is one frame,
    * the first three columns are ignored, the remaining ones are alternating
      x / y coordinates.
    """
    def __init__(self, path_to_dataset : str)-> None:
        self.path_to_dataset = path_to_dataset
        self._get_files_and_fileapths()

    def _get_files_and_fileapths(self) -> None:
        """Collect the names and the full paths of the dataset's ``.csv`` files.

        ``os.listdir`` returns entries in arbitrary order and also lists
        non-dataset entries (checkpoint folders, notes, ...), so the listing is
        filtered to ``.csv`` files and sorted to make the frame order reproducible.
        """
        self.dataset_files = sorted(
            name for name in os.listdir(self.path_to_dataset)
            if name.lower().endswith(".csv")
        )
        self.dataset_files_path = [
            os.path.join(self.path_to_dataset, name) for name in self.dataset_files
        ]

    def _extract_features_and_timesteps(self) -> None:
        """Read every dataset file and store per-frame keypoints and labels.

        Sets ``self._features`` (one list of ``(x, y)`` tuples per frame, rounded
        to 4 decimals) and ``self._labels`` (one single-element list per frame
        holding the filename prefix before the first underscore).

        Raises:
            AssertionError: if a frame has an odd number of coordinate values or
                contains a NaN coordinate.
        """
        label_list = []
        data_list = []

        for file_name, file_path in zip(self.dataset_files, self.dataset_files_path):
            label = file_name.split("_")[0]
            labeled_data = pd.read_csv(file_path)
            # Rows 0 and 1 are header rows (bodyparts / coordinate names), frames start at row 2.
            for index, frame in labeled_data.iloc[2:].iterrows():
                coords = frame.values[3:]  # drop the three leading non-coordinate columns
                # index + 2 is the 1-based line number in the csv file (line 1 is the header).
                assert len(coords) % 2 == 0, f"Odd number of coordinates in row {index + 2}, maybe some coordinates are missing"
                keypoints = []
                for i in range(0, len(coords), 2):
                    x_coord = round(float(coords[i]), 4)
                    y_coord = round(float(coords[i + 1]), 4)
                    assert not np.isnan(x_coord), f"Nan coordinate x found in row: {index + 2}"
                    assert not np.isnan(y_coord), f"Nan coordinate y found in row: {index + 2}"
                    keypoints.append((x_coord, y_coord))
                data_list.append(keypoints)  # all keypoints of one timestep
                label_list.append([label])  # one label per frame
        self._features = data_list
        self._labels = label_list

    def _assign_bodypart_to_node(self) -> None:
        """Map every bodypart name to a node number and store it in ``self._node_ids``.

        The names are taken from the first dataset file; every name occupies two
        adjacent columns (x and y), so only every second column is read.
        """
        dataframe = pd.read_csv(self.dataset_files_path[0])  # any file of the dataset works
        bodypart_row = dataframe.iloc[0, 3:]  # row containing the bodypart names
        # .iloc keeps the access positional; plain [] on a label-indexed Series is deprecated.
        self._node_ids = {
            str(bodypart): node for node, bodypart in enumerate(bodypart_row.iloc[::2])
        }

    def _create_fc_graph_edges(self) -> None:
        """Create the edge list of a fully connected graph, self-loops included.

        Can only be called after the ``_node_ids`` attribute is assigned.
        """
        node_count = len(self._node_ids)
        self._edges = [(i, j) for i in range(node_count) for j in range(node_count)]

    def create_json_dataset(self, save_path :str) -> None:
        """Method to create .json file containing the dataset.

        The file holds the keys ``"edges"``, ``"node_ids"``, ``"features"`` and
        ``"labels"``. A missing parent directory of ``save_path`` is created.

            Args types:
            * **save_path** *(str)* - Path where the file should be saved (ending with filename.json).
        """
        self._extract_features_and_timesteps()
        self._assign_bodypart_to_node()
        self._create_fc_graph_edges()

        create_graph_ds = {
        "edges" : self._edges,
        "node_ids" : self._node_ids,
        "features" : self._features,
        "labels" : self._labels
        }

        target_dir = os.path.dirname(save_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        with open(save_path, "w") as outfile:
            json.dump(create_graph_ds, outfile)

In [None]:
class SkeleonDatasetLoader(object):
    """Creates a loader for Dog Age keypoints video dataset. User can specify the encoding
    of the labels (one hot, sparse or original). Sparse encoding assigns class numbers following the
    alphanumerical order. When both ``one_hot`` and ``sparse`` are set, one hot encoding is used.

       Args types:
        * **dataset_path** *(str)* - Path to the .json dataset file; it must contain the keys
          "edges", "features" and "labels".
        * **one_hot** *(bool)* - Encode the labels with ``OneHotEncoder``.
        * **sparse** *(bool)* - Encode the labels with ``OrdinalEncoder`` (ignored when ``one_hot`` is set).
    """
    def __init__(self,dataset_path: str, one_hot : bool = False, sparse : bool = True) -> None:
        self._dataset_path = dataset_path
        self._one_hot = one_hot
        self._sparse = sparse
        self._open_json_dataset()
        self._get_labels_and_features()

    def _open_json_dataset(self):
        """Load the .json dataset into ``self._dataset``."""
        # Context manager so the file handle is closed right after parsing.
        with open(self._dataset_path) as dataset_file:
            self._dataset = json.load(dataset_file)

    def _get_edges(self):
        """Store the edge list transposed, i.e. as an array with one column per edge."""
        self._edges = np.array(self._dataset["edges"]).T

    def _get_edge_weights(self):
        """Give every edge the weight 1. Requires ``_get_edges`` to have run."""
        self._edge_weights = np.ones(self._edges.shape[1])

    def _get_labels_and_features(self):
        """Store the features as an array and the labels in the requested encoding."""
        self._features = np.array(self._dataset["features"])
        labels = np.array(self._dataset["labels"])
        if self._one_hot is True:
            # scikit-learn 1.2 renamed `sparse` to `sparse_output` and 1.4 removed the
            # old name; fall back to the old keyword on releases that predate the rename.
            try:
                encoder = OneHotEncoder(sparse_output=False)
            except TypeError:
                encoder = OneHotEncoder(sparse=False)
            self._labels = encoder.fit_transform(labels)
        elif self._sparse is True:
            self._labels = OrdinalEncoder().fit_transform(labels)
        else:
            self._labels = labels

    def get_dataset(self):
        """Returning the Dog age video keypoints data iterator.

        Return types:
            * **dataset** *(StaticGraphTemporalSignal)* - The Dog Age Video dataset.
        """
        self._get_edges()
        self._get_edge_weights()
        self._get_labels_and_features()
        dataset = StaticGraphTemporalSignal(
            self._edges, self._edge_weights, self._features, self._labels
        )
        return dataset

In [None]:
class RecurrentGCN(torch.nn.Module):
    """Binary graph classifier: DCRNN layer -> ReLU -> flatten -> linear -> sigmoid.

    Args:
        node_features: Number of input features per node.
        num_nodes: Number of nodes in every graph snapshot. The linear layer is
            sized ``hidden_channels * num_nodes``, so ``forward`` only accepts
            inputs with exactly this many nodes.
        hidden_channels: Output size passed to the DCRNN layer.

    NOTE(review): ``forward`` does not pass a hidden state to ``self.recurrent``,
    so no state appears to be carried from one snapshot to the next - confirm
    that this is intended.
    """
    def __init__(self, node_features, num_nodes=4, hidden_channels=128):
        super().__init__()
        self.recurrent = DCRNN(node_features, hidden_channels, 1)
        self.fc1 = torch.nn.Linear(hidden_channels * num_nodes, 1)
        # start_dim=0 collapses the whole tensor into one vector.
        self.flatten = torch.nn.Flatten(start_dim=0)
        # Not used by forward(); kept so existing attribute access keeps working.
        self.squeeze = torch.squeeze

    def forward(self, x, edge_index, edge_weight):
        """Return a tensor with a single sigmoid output for one graph snapshot."""
        h = self.recurrent(x, edge_index, edge_weight)
        h = F.relu(h)
        h = self.flatten(h)
        h = self.fc1(h)
        h = torch.sigmoid(h)
        return h

In [None]:
dataset_creator = CreateGraphDataset("dataset")
dataset_creator.create_json_dataset("json_datasets\sample.json")

In [None]:
new_loader = SkeleonDatasetLoader("json_datasets\sample.json", sparse= True, one_hot=False)
my_dataset = new_loader.get_dataset()

In [None]:
def get_accuracy(y_true, y_prob, threshold=0.5):
    """Binary accuracy calculation.

    Args:
        y_true: Ground-truth class labels (0 / 1).
        y_prob: Predicted probabilities, same layout as ``y_true``.
        threshold: Probabilities strictly greater than this value are counted
            as class 1, all others as class 0.

    Returns:
        The value returned by ``sklearn.metrics.accuracy_score``.
    """
    y_prob = np.asarray(y_prob)
    # Single pass so the result stays correct for any threshold (a two-step
    # overwrite would re-classify the freshly written zeros when threshold < 0).
    # Values that satisfy neither comparison (NaN) are passed through unchanged.
    y_pred = np.where(y_prob > threshold, 1, np.where(y_prob <= threshold, 0, y_prob))

    accuracy = metrics.accuracy_score(y_true, y_pred)
    return accuracy

In [None]:
# Training configuration
NUM_EPOCHS = 500
LEARNING_RATE = 0.0001
SEED = 42

# Seed torch's RNG before the model is built so repeated runs start from the same state.
torch.manual_seed(SEED)

model = RecurrentGCN(node_features = 2)

loss_fn =  nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

model.train()

for epoch in tqdm(range(NUM_EPOCHS)):
    epoch_loss = 0.0
    preds = []
    ground_truth = []
    for snapshot in my_dataset:
        y_hat = model(snapshot.x, snapshot.edge_index, snapshot.edge_attr)
        ## loss
        loss = loss_fn(y_hat,snapshot.y)
        # .item() adds a plain float; summing the tensor itself would keep every
        # step's autograd graph alive until the end of the epoch.
        epoch_loss += loss.item()
        ## get preds & ground truth
        preds.append(y_hat.detach().numpy())
        ground_truth.append(snapshot.y.numpy())
        ## backprop (one optimizer step per snapshot)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    ## calculate acc
    acc = get_accuracy(ground_truth,preds)

    print(f'Epoch: {epoch}', f'Epoch accuracy: {acc}', f'Epoch loss: {epoch_loss}')


In [90]:

# NOTE(review): this loop iterates over `my_dataset`, the same snapshots the model
# was trained on, so the reported "test" numbers are training-set numbers.
# `temporal_signal_split` is imported but never used - consider splitting the
# signal before training and evaluating on the held-out part only.
model.eval()
eval_loss = 0.0
preds = []
ground_truth = []
# Inference only: no_grad() avoids building autograd graphs for every snapshot.
with torch.no_grad():
    for snapshot in my_dataset:
        y_hat = model(snapshot.x, snapshot.edge_index, snapshot.edge_attr)
        loss = loss_fn(y_hat,snapshot.y)
        eval_loss += loss.item()
        preds.append(y_hat.numpy())
        ground_truth.append(snapshot.y.numpy())

test_acc = get_accuracy(ground_truth,preds)
print( f'Test accuracy: {test_acc}', f'Test loss: {eval_loss}')

Test accuracy: 1.0 Test loss: 0.015035977237857878
