# Exploring how the TorchData API works with TigerGraph data

### by Doris Voina (dorisvoina@gmail.com)

#### What is TorchData?
#### TorchData (or "torchdata") is a library released alongside PyTorch 1.11 for building data loading pipelines (see the [documentation](https://pytorch.org/data/main/tutorial.html)). According to the documentation,
>TorchData is a library of common modular data loading primitives for easily constructing flexible and performant data pipelines.

#### torchdata's modular data loading primitives (DataPipes) cover a variety of tasks, for example:
- FileLister: lists the files in a directory
- Filter: filters the elements of a DataPipe based on a given function
- FileOpener: consumes file paths and returns opened file streams
- Mapper: applies a function to each item of the source DataPipe

**This example is a tutorial that showcases how to use a TigerGraph dataset with a data loading implementation using PyTorch's torchdata in order to solve a Machine Learning problem with Cora, a well-known graph dataset of papers and their citations.**

Let's explore torchdata through a concrete problem: classifying the nodes of a graph. A common approach is to take the features of each node and feed them to a classifier such as a neural network. Besides the features provided in the dataset, we can enrich the input with properties derived from the graph structure, for instance pagerank, which scores how important a node is based on the number and quality of the connections it receives. The underlying assumption of pagerank is that more important nodes are likely to receive more connections from other nodes.
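To make the pagerank idea concrete, here is a minimal plain-Python sketch of the update rule documented for the `tg_pagerank` query in the Appendix: score = (1 - damping) + damping * sum(received scores). The function name `pagerank`, the toy edge list, and the treatment of edges as directed (citing paper, cited paper) pairs are assumptions made for illustration; this is not the GSQL implementation itself.

```python
def pagerank(edges, damping=0.85, max_iter=25, max_change=0.001):
    """Iterate score = (1 - damping) + damping * sum(received scores)."""
    nodes = sorted({n for edge in edges for n in edge})
    out_degree = {n: 0 for n in nodes}
    for src, _ in edges:
        out_degree[src] += 1

    scores = {n: 1.0 for n in nodes}  # every node starts with score 1
    for _ in range(max_iter):
        received = {n: 0.0 for n in nodes}
        for src, dst in edges:
            # each node splits its current score evenly among its out-links
            received[dst] += scores[src] / out_degree[src]
        new_scores = {n: (1 - damping) + damping * received[n] for n in nodes}
        max_diff = max(abs(new_scores[n] - scores[n]) for n in nodes)
        scores = new_scores
        if max_diff <= max_change:  # converged
            break
    return scores


# Hypothetical toy citation graph: (citing paper, cited paper)
toy_edges = [("A", "C"), ("B", "C"), ("C", "D"), ("A", "B")]
toy_scores = pagerank(toy_edges)
print(sorted(toy_scores, key=toy_scores.get, reverse=True))
```

Paper "A" is never cited, so it keeps only the baseline score of 1 - damping, while papers that receive citations from well-scored papers rank higher.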

**Summary**
- import the Cora dataset from TigerGraph using torchdata's HttpReader: the data consists of node features (occurrence of words in a paper), the pagerank feature, and labels
- build a data loader by shuffling, batching, and collating the data
- train a simple feedforward neural network on the data


#### Let's dig in!

In [1]:
import pyTigerGraph as tg

#import tgml

import torch

import torchdata
from torchdata.datapipes.iter import (
    IterableWrapper,
    IterKeyZipper,
    IterDataPipe,
    HttpReader,
    Zipper,
    Mapper,
    Shuffler,
    Sampler,
    Batcher,
    Collator
)

import random
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data

import json



## Get features: pagerank + other features...
### Read the pagerank feature with torchdata's HttpReader*

*see Appendix about the GSQL queries used with HttpReader

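set the parameters of the pagerank algorithm (listed here for reference; the request below encodes its own values in the URL query string)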

In [None]:
params = {'v_type': 'PAPER', 'e_type': 'CITE', 'max_change': 0.001, 'max_iter': 25, 'damping': 0.85,
         'top_k': 100, 'print_accum': True, 'result_attr':'pagerank','file_path':'','display_edges': False}

In [None]:
out_pr = HttpReader(IterableWrapper(["http://35.230.92.92:14240/restpp/query/Cora/tg_pagerank?v_type=Paper&e_type=Cite&max_change=0.001&max_iter=25&damping=0.85&top_k=2708&print_accum=True&result_attr=pagerank&display_edges=False"]))

In [None]:
@torch.utils.data.functional_datapipe('process_data')
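class HttpReader_processing(IterDataPipe):
    # A custom DataPipe that parses the JSON response fetched by HttpReader.
    def __init__(self, out: IterDataPipe):
        super().__init__()
        self.out = out

    def __iter__(self):
        # HttpReader yields (url, stream) pairs; readlines() turns each stream
        # into (url, line) pairs. This code assumes the whole JSON response
        # arrives on the first line, so only that line is read.
        reader_dp = self.out.readlines()
        path, line = next(iter(reader_dp))

        yield json.loads(line.decode("utf8"))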

In [None]:
out_pr = out_pr.process_data()
out_pagerank = next(iter(out_pr))
print(out_pagerank['results'][0]['@@top_scores_heap'][:5])

create a dictionary that maps each vertex ID to its pagerank score (a real scalar)

In [None]:
list_of_pr = out_pagerank['results'][0]['@@top_scores_heap']

dict_pr = {}
for d in list_of_pr:
    dict_pr[d["Vertex_ID"]] = d["score"]
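For readers without access to the TigerGraph server, the sketch below shows the response shape that the cells above rely on: a `results` list whose first element holds the `@@top_scores_heap` list of `Vertex_ID`/`score` entries. The payload is hypothetical (the IDs, scores, and the `error` field are made up for illustration), and `sample_dict_pr` is an illustrative name.

```python
import json

# Hypothetical response line; the values are illustrative, not real Cora results.
raw_line = (
    b'{"error": false, "results": [{"@@top_scores_heap": ['
    b'{"Vertex_ID": "12", "score": 3.5}, {"Vertex_ID": "7", "score": 1.25}]}]}'
)

# Same steps as in the notebook: decode the bytes, parse the JSON, index into it.
response = json.loads(raw_line.decode("utf8"))
heap = response["results"][0]["@@top_scores_heap"]

# Map each vertex ID to its pagerank score.
sample_dict_pr = {entry["Vertex_ID"]: entry["score"] for entry in heap}
print(sample_dict_pr)
```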

### Read other node features + labels with torchdata's HttpReader

In [None]:
out = HttpReader(IterableWrapper(["http://35.230.92.92:14240/restpp/query/Cora/vertex_hloader_x_y_train_mask_val_mask_test_mask"]))

In [None]:
out = out.process_data()
out_features = next(iter(out))
out_features = out_features["results"][0]["vertex_batch"]

create a pandas dataframe to be used in the neural network later...

In [None]:
df_td = pd.DataFrame(columns = ["v_id", "x", "y", "train_mask", "val_mask", "test_mask", "pagerank"])

for ind, v_id in enumerate(out_features):

    # each value has the form "vid,x,y,train_mask,val_mask,test_mask\n"
    # (see the query in the Appendix)
    split_line = out_features[v_id][:-1].split(",")
    x_line = split_line[1][:-1]  # drop the final character of the feature field
    x = [int(tok) for tok in x_line.split(" ")]
    y = int(split_line[2])
    train_mask = int(split_line[3])
    val_mask = int(split_line[4])
    test_mask = int(split_line[5])

    df_td.loc[ind] = [v_id, x, y, train_mask, val_mask, test_mask, dict_pr[v_id]]
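The parsing step can be tried in isolation on a single record. The sketch below assumes the field order emitted by the query in the Appendix (vertex ID, features, label, then the train, validation, and test masks) and assumes the masks are serialized as 0/1 integers, as the `int(...)` casts above imply. The helper name `parse_vertex_line` and the sample line are hypothetical.

```python
def parse_vertex_line(line):
    """Split 'vid,x,y,train_mask,val_mask,test_mask\\n' into typed fields."""
    fields = line.rstrip("\n").split(",")
    return {
        "v_id": fields[0],
        # str.split() with no argument also ignores a trailing space
        "x": [int(tok) for tok in fields[1].split()],
        "y": int(fields[2]),
        "train_mask": int(fields[3]),
        "val_mask": int(fields[4]),
        "test_mask": int(fields[5]),
    }


# Hypothetical record with five word-occurrence features and label 3
sample_line = "42,0 1 0 0 1 ,3,1,0,0\n"
parsed = parse_vertex_line(sample_line)
print(parsed)
```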

In [None]:
df_td.head()

In [None]:
df_paper = df_td

## Using torchdata for shuffling, batching, collating, etc.
### a specific example of using it

choose the fractions of the data used for training and validation; the remainder is used for testing.

In [None]:
train_perc = 0.7
valid_perc = 0.15

In [None]:
def coll_fn(batch):
    
    xs = [sample[0] for sample in batch]
    ys = [sample[1] for sample in batch] 
    
    return torch.tensor(xs), torch.tensor(ys)

In [None]:
def sample_fn(n):
    r = random.random()
    if r<=train_perc:
        return 0
    elif r<train_perc+valid_perc:
        return 1
    else:
        return 2
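As a quick sanity check of this random assignment, the sketch below draws many samples and counts how often each class comes up. It mirrors `sample_fn` but takes an explicit seeded generator so the result is reproducible; the names `classify`, `train_frac`, and `valid_frac` are illustrative stand-ins.

```python
import random
from collections import Counter

train_frac, valid_frac = 0.7, 0.15  # same values as train_perc and valid_perc


def classify(_item, rng):
    """Return 0 (train), 1 (validation), or 2 (test) at random."""
    r = rng.random()
    if r <= train_frac:
        return 0
    if r < train_frac + valid_frac:
        return 1
    return 2


rng = random.Random(0)  # fixed seed for reproducibility
n_draws = 10_000
counts = Counter(classify(i, rng) for i in range(n_draws))
fractions = {k: counts[k] / n_draws for k in (0, 1, 2)}
print(fractions)
```

The observed fractions should land close to 0.70, 0.15, and 0.15. Because the assignment is random per element, the exact split sizes vary from run to run unless the generator is seeded.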

Next, we create our own data_loader function that splits the data with a Demultiplexer and then applies a Batcher, a Collator, and optionally a Shuffler. Because it is built from these primitives, the data loader is easy to customize.

In [None]:
def data_loader(data_x, data_y, shuffle, batch_sz, collator_fn = coll_fn):

    df_x = IterableWrapper(data_x)
    df_y = IterableWrapper(data_y)
    data_xy = Zipper(df_x, df_y)

    # randomly route each (x, y) pair to the train, validation, or test pipe
    train_set, valid_set, test_set = data_xy.demux(num_instances=3, classifier_fn=sample_fn)

    # group samples into batches and convert each batch to tensors
    train_set = train_set.batch(batch_sz).collate(collate_fn=collator_fn)
    valid_set = valid_set.batch(batch_sz).collate(collate_fn=collator_fn)
    test_set = test_set.batch(batch_sz).collate(collate_fn=collator_fn)

    if shuffle:
        # shuffle() is applied after batch(), so it reorders whole batches
        train_set = train_set.shuffle()
        valid_set = valid_set.shuffle()
        test_set = test_set.shuffle()

    return train_set, valid_set, test_set
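To see what the batch and collate steps do to the data, here is a plain-Python analogue that needs no torchdata install. It is only a sketch of the behaviour, not torchdata's implementation: the helper names `batch` and `collate` are illustrative, and the collate step returns plain lists where `coll_fn` would return tensors.

```python
def batch(items, batch_size):
    """Group consecutive items into lists of at most batch_size elements."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == batch_size:
            yield chunk
            chunk = []
    if chunk:  # keep the final, possibly smaller, batch
        yield chunk


def collate(samples):
    """Turn a list of (x, y) pairs into a pair of lists (xs, ys)."""
    xs = [x for x, _ in samples]
    ys = [y for _, y in samples]
    return xs, ys


# Three hypothetical (features, label) pairs
pairs = list(zip([[0, 1], [1, 0], [1, 1]], [2, 0, 1]))
batches = [collate(b) for b in batch(pairs, batch_size=2)]
print(batches)
```

With three samples and a batch size of 2, the result is one full batch followed by one batch holding the single leftover sample.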

add pagerank feature

In [None]:
# build new feature lists (word features + pagerank) so that df_paper.x is left unmodified
# (in case we need it later); appending in place would also change the lists stored in df_paper
df_x = pd.DataFrame({'x': [x + [pr] for x, pr in zip(df_paper.x, df_paper.pagerank)]})

In [None]:
shuffle = True
batch_size = 5
train_set, valid_set, test_set = data_loader(df_x.x, df_paper.y, shuffle, batch_size)

# Train a simple feedforward neural network

### on data with graph feature pagerank

create a simple feedforward network with two linear layers (one hidden layer) and a ReLU non-linearity in between

In [None]:
class simple_NN(nn.Module):
    def __init__(self, input_size, hidden_dim, output_size):
        super(simple_NN, self).__init__()

        self.linear1 = nn.Linear(input_size, hidden_dim)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(hidden_dim, output_size)

    def forward(self, x):

        x = x.double()  # match the float64 parameters set by model.double()
        out = self.linear1(x)
        out = self.relu(out)
        out = self.linear2(out)

        return out
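The forward pass above is just linear, ReLU, linear. The following plain-Python sketch traces it by hand on a tiny example; the weights, biases, and helper names are made up for illustration and have nothing to do with the trained model.

```python
def linear(x, weights, bias):
    """Compute W @ x + b for a weight matrix given as a list of rows."""
    return [sum(w * xi for w, xi in zip(row, x)) + b
            for row, b in zip(weights, bias)]


def relu(values):
    """Clamp negative entries to zero."""
    return [max(0.0, v) for v in values]


def forward(x, w1, b1, w2, b2):
    """linear -> ReLU -> linear, the same order as simple_NN.forward."""
    return linear(relu(linear(x, w1, b1)), w2, b2)


# Hypothetical parameters: 2 inputs -> 2 hidden units -> 1 output
w1 = [[1.0, -1.0], [0.5, 0.5]]
b1 = [0.0, -2.0]
w2 = [[1.0, 2.0]]
b2 = [0.5]

hidden = relu(linear([2.0, 1.0], w1, b1))
logits = forward([2.0, 1.0], w1, b1, w2, b2)
print(hidden, logits)
```

The second hidden unit has a negative pre-activation (-0.5), so ReLU sets it to zero and it contributes nothing to the output.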


In [None]:
cuda = torch.cuda.is_available()

In [None]:
input_size = len(df_x.x.iloc[0])
hidden_dim = 500
output_size = len(df_paper.y.unique())

model = simple_NN(input_size, hidden_dim, output_size)

model.double()
if cuda:
    model.cuda()

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
cuda = torch.cuda.is_available()

seed = 15
torch.manual_seed(seed)
if cuda:
    torch.cuda.manual_seed(seed)

kwargs = {'num_workers': 1, 'pin_memory': True} if torch.cuda.is_available() else {}

In [None]:
"""
save_path = "models_and_results"
project = "cora_classification_w/_simple_net"

save_path =  '/'.join([save_path, ])
if not os.path.exists(save_path):
        os.makedirs(save_path)
"""

choose the optimizer (Adam) and the loss function (cross-entropy loss)

In [None]:
optimizer = optim.Adam(model.parameters())
error = nn.CrossEntropyLoss()
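Cross-entropy loss takes the raw network outputs (logits) and a target class index, and returns the negative log of the softmax probability assigned to the target. The sketch below computes this for a single sample in plain Python; the function name `cross_entropy` and the example logits are illustrative.

```python
import math


def cross_entropy(logits, target):
    """Return -log(softmax(logits)[target]) using a stable log-sum-exp."""
    m = max(logits)  # subtract the maximum to avoid overflow in exp
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]


# Equal logits over 3 classes: the loss equals log(3)
loss_uniform = cross_entropy([0.0, 0.0, 0.0], target=1)

# A large logit on the correct class: the loss is close to 0
loss_confident = cross_entropy([0.0, 10.0, 0.0], target=1)

print(loss_uniform, loss_confident)
```

Since the loss applies the softmax itself, the network does not need a softmax layer at its output.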

training and validation functions

In [None]:
def train(model, dataT, dataV, epochs):
    model.train()

    acc_test_duringL = []
    acc_train_duringL = []
    
    for epoch in range(epochs):
        for batch_idx, (data, target) in enumerate(dataT):
            if cuda:
                data, target = data.cuda(), target.cuda()

            optimizer.zero_grad()
            output = model(data)
            predicted = torch.max(output.data, 1)[1]
            loss = error(output, target)
            loss.backward()
            optimizer.step()
            acc_train_duringL.append(float((predicted.to(device) == target).sum())/len(target))
        
            if batch_idx % 10 == 0:
                print('Train Epoch: {} batch #: {}, accuracy: {:.6f}, Loss: {:.6f}'.format(
                    epoch, batch_idx, float((predicted.to(device) == target).sum())/len(target), loss.item()))
    
                #acc = validate(model, dataV)
                #acc_test_duringL.append(acc)
            
        
    return acc_train_duringL
    #, acc_test_duringL

In [None]:
def validate(model, dataV):

    model.eval()
    val_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():  # gradients are not needed for evaluation
        for data, target in dataV:

            if cuda:
                data, target = data.cuda(), target.cuda()
            output = model(data)
            # sum up batch loss; the model outputs raw logits, so use cross-entropy
            # (the same loss as in training) rather than nll_loss
            val_loss += F.cross_entropy(output, target, reduction='sum').item()

            pred = output.argmax(dim=1)
            correct += (pred == target).sum().item()
            total += len(target)

    val_loss /= total
    accuracy = 100. * correct / total
    print('\nValidation set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        val_loss, correct, total, accuracy))

    return accuracy

training our feedforward neural network and printing the accuracy on the test data at the end...

In [None]:
acc_train_duringL = train(model, train_set, valid_set, 1)
acc_test = validate(model, test_set)

In [None]:
print("final test accuracy is {:.1f}%!".format(acc_test))

# Appendix

### GSQL query: tg_pagerank

```
CREATE QUERY tg_pagerank (STRING v_type, STRING e_type,
 FLOAT max_change=0.001, INT max_iter=25, FLOAT damping=0.85, INT top_k = 100,
 BOOL print_accum = TRUE, STRING result_attr =  "", STRING file_path = "",
 BOOL display_edges = FALSE) SYNTAX V2 {
/*
 Compute the pageRank score for each vertex in the GRAPH
 In each iteration, compute a score for each vertex:
     score = (1-damping) + damping*sum(received scores FROM its neighbors).
 The pageRank algorithm stops when either of the following is true:
 a) it reaches max_iter iterations;
 b) the max score change for any vertex compared to the last iteration <= max_change.
 v_type: vertex types to traverse          print_accum: print JSON output
 e_type: edge types to traverse            result_attr: INT attr to store results to
 max_iter: max #iterations                 file_path: file to write CSV output to
 top_k: #top scores to output              display_edges: output edges for visualization
 max_change: max allowed change between iterations to achieve convergence
 damping: importance of traversal vs. random teleport

 This query supports only taking in a single edge for the time being (8/13/2020).
*/
TYPEDEF TUPLE<VERTEX Vertex_ID, FLOAT score> Vertex_Score;
HeapAccum<Vertex_Score>(top_k, score DESC) @@top_scores_heap;  
MaxAccum<FLOAT> @@max_diff = 9999;    # max score change in an iteration  
SumAccum<FLOAT> @sum_recvd_score = 0; # sum of scores each vertex receives FROM neighbors  
SumAccum<FLOAT> @sum_score = 1;           # initial score for every vertex is 1.  
SetAccum<EDGE> @@edge_set;             # list of all edges, if display is needed  
FILE f (file_path);  
# PageRank iterations
Start = {v_type};                     # Start with all vertices of specified type(s)
WHILE @@max_diff > max_change 
    LIMIT max_iter DO
        @@max_diff = 0;
    V = SELECT s
	FROM Start:s -(e_type:e)- v_type:t
	ACCUM 
            t.@sum_recvd_score += s.@sum_score/(s.outdegree(e_type)) 
	POST-ACCUM 
            s.@sum_score = (1.0-damping) + damping * s.@sum_recvd_score,
	    s.@sum_recvd_score = 0,
	    @@max_diff += abs(s.@sum_score - s.@sum_score');
END; # END WHILE loop

# Output
IF file_path != "" THEN
    f.println("Vertex_ID", "PageRank");
END;
V = SELECT s 
    FROM Start:s
    POST-ACCUM 
        IF result_attr != "" THEN 
            s.setAttr(result_attr, s.@sum_score) 
        END,
        IF file_path != "" THEN 
            f.println(s, s.@sum_score) 
        END,
        IF print_accum THEN 
            @@top_scores_heap += Vertex_Score(s, s.@sum_score) 
        END;

IF print_accum THEN
    PRINT @@top_scores_heap;
    IF display_edges THEN
        PRINT Start[Start.@sum_score];
        Start = SELECT s
                FROM Start:s -(e_type:e)- v_type:t
                ACCUM @@edge_set += e;
        PRINT @@edge_set;
    END;
END;
}
```

### GSQL query to load node features and labels: vertex_hloader_x_y_train_mask_val_mask_test_mask

```
CREATE QUERY vertex_hloader_x_y_train_mask_val_mask_test_mask(
    SET<VERTEX> input_vertices,
    INT num_batches=1, 
    BOOL shuffle=FALSE,
    STRING filter_by
){
    /*
    This query generates batches of vertices. If `input_vertices` is given, it will generate 
    a batch of those vertices. Otherwise, it will divide all vertices into `num_batches` 
    batches and return each batch separately.

    Parameters :
      input_vertices : What vertices to get.
      num_batches    : Number of batches to divide all vertices.
      shuffle        : Whether to shuffle vertices before collecting data.
      filter_by      : A Boolean attribute to determine which vertices are included.
                       Only effective when `input_vertices` is NULL.
    */
    INT num_vertices;
    SumAccum<INT> @tmp_id;

    # Shuffle vertex ID if needed
    start = {ANY};
    IF shuffle THEN
        num_vertices = start.size();
        res = SELECT s 
              FROM start:s
              POST-ACCUM s.@tmp_id = floor(rand()*num_vertices);
    ELSE
        res = SELECT s 
              FROM start:s
              POST-ACCUM s.@tmp_id = getvid(s);
    END;

    # Generate batches
    FOREACH batch_id IN RANGE[0, num_batches-1] DO
        MapAccum<VERTEX, STRING> @@v_batch;
        IF input_vertices.size()==0 THEN
            start = {ANY};
            IF filter_by IS NOT NULL THEN
                seeds = SELECT s 
                        FROM start:s 
                        WHERE s.getAttr(filter_by, "BOOL") and s.@tmp_id % num_batches == batch_id
                        POST-ACCUM @@v_batch += (s -> (int_to_string(getvid(s)) + "," + int_to_string(s.x)+","+int_to_string(s.y)+","+bool_to_string(s.train_mask)+","+bool_to_string(s.val_mask)+","+bool_to_string(s.test_mask) + "\n"));
            ELSE
                seeds = SELECT s 
                        FROM start:s 
                        WHERE s.@tmp_id % num_batches == batch_id
                        POST-ACCUM @@v_batch += (s -> (int_to_string(getvid(s)) + "," + int_to_string(s.x)+","+int_to_string(s.y)+","+bool_to_string(s.train_mask)+","+bool_to_string(s.val_mask)+","+bool_to_string(s.test_mask) + "\n"));
            END;
        ELSE
            start = input_vertices;
            seeds = SELECT s 
                    FROM start:s 
                    POST-ACCUM @@v_batch += (s -> (int_to_string(getvid(s)) + "," + int_to_string(s.x)+","+int_to_string(s.y)+","+bool_to_string(s.train_mask)+","+bool_to_string(s.val_mask)+","+bool_to_string(s.test_mask) + "\n"));
        END;
        # Add to response
        PRINT @@v_batch AS vertex_batch;  
    END;
}
```