## Data Loaders

This notebook demonstrates the use of the **data loaders** in `pyTigerGraph`. A data loader pulls data from the TigerGraph database. Currently, the following data loaders are provided:
* EdgeLoader, which returns batches of edges.
* VertexLoader, which returns batches of vertices.
* GraphLoader, which returns randomly sampled (possibly disconnected) subgraphs in pandas `dataframe`, `PyG` or `DGL` format.
* NeighborLoader, which returns subgraphs using neighbor sampling in `dataframe`, `PyG` or `DGL` format.
* EdgeNeighborLoader, which returns subgraphs using neighbor sampling from edges in `dataframe`, `PyG` or `DGL` format.

Every data loader above can either fetch all batches in an HTTP response (the default) or stream each batch through Kafka. The HTTP mechanism is fast and works well for testing with small graphs, but it is subject to a 2 GB data size limit. For large graphs, the HTTP channel will likely fail because of the size limit or network connectivity issues, so streaming via Kafka is offered for robustness and scalability. Kafka also excels at multi-consumer use cases, which makes it efficient for model search or hyperparameter tuning, where multiple consumers read the same data.

The data loaders support both homogeneous and heterogeneous graphs. By default, they load from all vertex and edge types and treat the graph as homogeneous. Users can also specify which vertex and edge types to load from and which attributes to load from each type, in which case the output is a heterogeneous graph.
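The difference between list and dict attribute specifications can be sketched in plain Python. This is a minimal, hypothetical illustration of the selection rules described for the loader arguments (a list applies to every type and each attribute must exist in all of them; a dict picks the types and the attributes per type), not pyTigerGraph's own code. `SCHEMA` and `normalize_spec` are made-up names.

```python
# Hypothetical schema: vertex type -> attribute names defined on that type.
SCHEMA = {
    "v0": {"x", "y", "train_mask"},
    "v1": {"x"},
    "v2": {"x"},
}


def normalize_spec(spec, schema):
    """Expand a list or dict attribute spec into {type: [attributes]}."""
    if spec is None:
        return {}
    if isinstance(spec, list):
        # A list applies to every type, so every type must define each attribute.
        for vtype, attrs in schema.items():
            missing = [a for a in spec if a not in attrs]
            if missing:
                raise ValueError(f"type {vtype!r} lacks attributes {missing}")
        return {vtype: list(spec) for vtype in schema}
    if isinstance(spec, dict):
        # A dict selects only the listed types, each with its own attributes.
        for vtype, attrs in spec.items():
            if vtype not in schema:
                raise ValueError(f"unknown type {vtype!r}")
            missing = [a for a in attrs if a not in schema[vtype]]
            if missing:
                raise ValueError(f"type {vtype!r} lacks attributes {missing}")
        return {vtype: list(attrs) for vtype, attrs in spec.items()}
    raise TypeError("spec must be a list, a dict, or None")


print(normalize_spec(["x"], SCHEMA))          # list: every vertex type
print(normalize_spec({"v0": ["y"]}, SCHEMA))  # dict: only v0
```

In this sketch, `normalize_spec(["y"], SCHEMA)` raises `ValueError` because `v1` and `v2` have no `y`; selecting `y` only for `v0` requires the dict form.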



### Connection to Database

The `TigerGraphConnection` class represents a connection to the TigerGraph database. Under the hood, it stores the information needed to communicate with the database, and it can perform many database operations. Please see its [documentation](https://docs.tigergraph.com/pytigergraph/current/intro/) for details.

In [1]:
import json

from pyTigerGraph import TigerGraphConnection
from pyTigerGraph.datasets import Datasets
from pyTigerGraph.visualization import drawSchema

# Read in DB configs
with open("../../config.json", "r") as config_file:
    config = json.load(config_file)

# Connect to the database
conn = TigerGraphConnection(
    host=config["host"],
    username=config["username"],
    password=config["password"]
)

# Download the Cora dataset (if needed) and ingest it into the database
dataset = Datasets("Cora")
conn.ingestDataset(dataset, getToken=config["getToken"])

# Visualize the graph schema
drawSchema(conn.getSchema(force=True))

A folder with name Cora already exists in ./tmp. Skip downloading.
---- Checking database ----
A graph with name Cora already exists in the database. Skip ingestion.
Graph name is set to Cora for this connection.
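The connection cell reads `config.json` and expects the keys `host`, `username`, `password`, and `getToken` (presumably a flag for whether token authentication is needed). As a hedged sketch, the hypothetical helper `load_config` below reads such a file and fails early with a clear error if a key is missing. The values are placeholders, not real credentials, and the file is written to a temporary directory only so the example runs on its own.

```python
import json
import os
import tempfile

REQUIRED_KEYS = {"host", "username", "password", "getToken"}


def load_config(path):
    """Read a JSON config file and check for the keys the connection cell uses."""
    with open(path, "r") as config_file:
        config = json.load(config_file)
    missing = REQUIRED_KEYS - set(config)
    if missing:
        raise KeyError(f"config file is missing keys: {sorted(missing)}")
    return config


# Placeholder values (hypothetical); replace them with your own instance's details.
example = {
    "host": "https://your-instance.example.com",
    "username": "your_username",
    "password": "your_password",
    "getToken": True,
}
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "config.json")
    with open(path, "w") as f:
        json.dump(example, f)
    config = load_config(path)
print(sorted(config))
```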



### Edge Neighbor Loader

`EdgeNeighborLoader` performs neighbor sampling from all edges in the graph in batches in the following manner:

* It chooses a specified number (`batch_size`) of edges as seeds. The number of batches is the total number of edges divided by the batch size.
  * If you specify the number of batches (`num_batches`) instead, `batch_size` is calculated by dividing the total number of edges by the number of batches.
  * If you specify both parameters, `batch_size` takes priority.
* Starting from the vertices attached to the seed edges, it randomly picks a specified number (`num_neighbors`) of neighbors of each vertex.
* It then picks the same number of neighbors for each of those neighbors, and repeats this process until it has performed the specified number of hops (`num_hops`).
        
This generates one subgraph. As you loop through this data loader, every edge will at some point be chosen as a seed, and you will get the subgraph expanded from those seeds. To limit seeds to certain edges, pass the name of a boolean edge attribute to `filter_by`; only edges for which that attribute is true can be chosen as seeds. To load from certain types of vertices and edges, use `dict` input for `v_in_feats`, `v_out_labels`, `v_extra_feats`, `e_in_feats`, `e_out_labels`, and `e_extra_feats`, where the keys of each dict are the vertex or edge types to select and the values are lists of attributes to collect from those types.
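The seed filtering, batching, and sampling steps described above can be sketched in plain Python. This is a minimal, hypothetical illustration on an in-memory adjacency list, not pyTigerGraph's implementation (the loader installs and runs a query on the database instead). The helper names `select_seed_edges`, `make_seed_batches`, and `sample_subgraph` are made up for this example. It assumes the batch size is the number of seed edges divided by `num_batches` and rounded up, so the last batch may be smaller, and it only expands vertices that are reached for the first time in each hop.

```python
import math
import random


def select_seed_edges(edges, edge_attrs, filter_by=None):
    """Keep only edges whose boolean attribute `filter_by` is True."""
    if filter_by is None:
        return list(edges)
    return [e for e in edges if edge_attrs[e].get(filter_by, False)]


def make_seed_batches(edges, batch_size=None, num_batches=1, shuffle=False, rng=None):
    """Split seed edges into batches; batch_size takes priority over num_batches."""
    edges = list(edges)
    if not edges:
        return []
    if shuffle:
        (rng or random.Random()).shuffle(edges)
    if batch_size is None:
        batch_size = math.ceil(len(edges) / num_batches)
    return [edges[i:i + batch_size] for i in range(0, len(edges), batch_size)]


def sample_subgraph(seed_edges, adj, num_neighbors=10, num_hops=2, rng=None):
    """Expand seed edges by uniformly sampling neighbors for num_hops hops."""
    rng = rng or random.Random()
    sampled_edges = set(seed_edges)
    frontier = {v for edge in seed_edges for v in edge}
    visited = set(frontier)
    for _ in range(num_hops):
        next_frontier = set()
        for v in sorted(frontier):  # sorted so a seeded rng gives repeatable results
            neighbors = adj.get(v, [])
            for u in rng.sample(neighbors, min(num_neighbors, len(neighbors))):
                sampled_edges.add((v, u))
                if u not in visited:  # only newly reached vertices expand next hop
                    visited.add(u)
                    next_frontier.add(u)
        frontier = next_frontier
    return visited, sampled_edges


# Toy undirected graph (hypothetical data): adjacency list plus edge attributes.
adj = {0: [1, 2], 1: [0, 2, 3], 2: [0, 1], 3: [1, 4], 4: [3]}
edges = [(0, 1), (1, 2), (1, 3), (3, 4), (0, 2)]
edge_attrs = {e: {"is_train": e != (3, 4)} for e in edges}

seeds = select_seed_edges(edges, edge_attrs, filter_by="is_train")
batches = make_seed_batches(seeds, num_batches=2)
for batch in batches:
    vertices, sub_edges = sample_subgraph(batch, adj, num_neighbors=1, num_hops=2,
                                          rng=random.Random(0))
    print(sorted(vertices), sorted(sub_edges))
```

Here the edge `(3, 4)` is excluded as a seed by `filter_by="is_train"`, and the four remaining seeds are split into two batches of two. Sampled edges are stored as directed `(vertex, neighbor)` pairs, which is a simplification of this sketch.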

NOTE: When you initialize the loader on a graph for the first time,
the initialization might take a minute as it installs the corresponding
query on the database. However, the query only needs to be installed
once, so initializing the loader on the same graph again will be much faster.

    
Args:
* v_in_feats (list or dict, optional):
        Vertex attributes to be used as input features. 
        If it is a list, then the attributes
        in the list from all vertex types will be selected. An error will be thrown if
        an attribute doesn't exist in all vertex types. If it is a dict, keys of the
        dict are vertex types to be selected, and values are lists of attributes to be 
        selected for each vertex type.
        Only numeric and boolean attributes are allowed. The type of an attribute 
        is automatically determined from the database schema. Defaults to None.
* v_out_labels (list or dict, optional):
        Vertex attributes to be used as labels for prediction. 
        If it is a list, then the attributes
        in the list from all vertex types will be selected. An error will be thrown if
        an attribute doesn't exist in all vertex types. If it is a dict, keys of the
        dict are vertex types to be selected, and values are lists of attributes to be 
        selected for each vertex type.
        Only numeric and boolean attributes are allowed. Defaults to None.
* v_extra_feats (list or dict, optional):
        Other attributes to get such as indicators of train/test data. 
        If it is a list, then the attributes
        in the list from all vertex types will be selected. An error will be thrown if
        an attribute doesn't exist in all vertex types. If it is a dict, keys of the
        dict are vertex types to be selected, and values are lists of attributes to be 
        selected for each vertex type. 
        All types of attributes are allowed. Defaults to None.
* e_in_feats (list or dict, optional):
        Edge attributes to be used as input features. 
        If it is a list, then the attributes
        in the list from all edge types will be selected. An error will be thrown if
        an attribute doesn't exist in all edge types. If it is a dict, keys of the
        dict are edge types to be selected, and values are lists of attributes to be 
        selected for each edge type.
        Only numeric and boolean attributes are allowed. The type of an attribute
        is automatically determined from the database schema. Defaults to None.
* e_out_labels (list or dict, optional):
        Edge attributes to be used as labels for prediction. 
        If it is a list, then the attributes in the list from all edge types will be 
        selected. An error will be thrown if an attribute doesn't exist in all
        edge types. If it is a dict, keys of the dict are edge types to be selected, 
        and values are lists of attributes to be selected for each edge type.
        Only numeric and boolean attributes are allowed. Defaults to None.
* e_extra_feats (list or dict, optional):
        Other edge attributes to get such as indicators of train/test data. 
        If it is a list, then the attributes in the list from all edge types will be 
        selected. An error will be thrown if an attribute doesn't exist in all
        edge types. If it is a dict, keys of the dict are edge types to be selected, 
        and values are lists of attributes to be selected for each edge type.
        All types of attributes are allowed. Defaults to None.
* batch_size (int, optional):  
        Number of edges in each batch.  
        Defaults to None.  
* num_batches (int, optional):  
        Number of batches to split the edges.  
        Defaults to 1.  
* num_neighbors (int, optional):
        Number of neighbors to sample for each vertex.
        Defaults to 10.
* num_hops (int, optional):
        Number of hops to traverse when sampling neighbors.
        Defaults to 2.
* shuffle (bool, optional):  
        Whether to shuffle the edges before loading data.  
        Defaults to False.  
* filter_by (str, optional):
        A boolean edge attribute used to indicate which edges can be chosen as seeds. Defaults to None.
* output_format (str, optional):
        Format of the output data of the loader. Can be "dataframe",
        "PyG", or "DGL". Defaults to "dataframe".
* loader_id (str, optional):
        An identifier of the loader which can be any string. It is
        also used as the Kafka topic name. If `None`, a random string will be generated
        for it. Defaults to None.
* buffer_size (int, optional):
        Number of data batches to prefetch and store in memory. Defaults to 4.
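* timeout (int, optional):
        Timeout value for GSQL queries, in ms. Defaults to 300000.
* callback_fn (callable, optional):
        A function applied to each batch before it is returned. The loader yields
        the function's return value instead of the raw batch.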
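`buffer_size` bounds how many batches are prefetched into memory. One common way to implement such a bound is a producer thread feeding a `queue.Queue(maxsize=buffer_size)`, sketched below. This is an illustrative assumption, not pyTigerGraph's internal code, and `prefetch` is a hypothetical name. The sketch also applies an optional callback to each batch before yielding it, mirroring how `callback_fn` is passed to the loaders in the test cases.

```python
import queue
import threading


def prefetch(batch_source, buffer_size=4, callback_fn=None):
    """Yield batches from a background producer, buffering at most buffer_size."""
    buf = queue.Queue(maxsize=buffer_size)

    def producer():
        try:
            for batch in batch_source:
                buf.put(("batch", batch))  # blocks while the buffer is full
        except Exception as exc:  # forward producer errors to the consumer
            buf.put(("error", exc))
        finally:
            buf.put(("done", None))

    threading.Thread(target=producer, daemon=True).start()
    while True:
        kind, item = buf.get()
        if kind == "done":
            return
        if kind == "error":
            raise item
        # Apply the optional per-batch callback before handing the batch out.
        yield callback_fn(item) if callback_fn is not None else item


# Usage: ten toy "batches", each transformed by a callback.
results = list(prefetch(range(10), buffer_size=2, callback_fn=lambda b: b * 10))
print(results)
```

Because there is a single producer and a FIFO queue, batches come out in the order they were produced. If the consumer stops early, the daemon producer thread simply stays blocked until the program exits, which is acceptable for a sketch.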

# Testcase1: Using edgeNeighborLoader with callback_fn to get batches of data (homogeneous graph)
## Results: Ran successfully; all data was loaded.

In [3]:
# Identity callback: return each batch unchanged
def process_batch(batch):
    return batch

neighbor_loader2 = conn.gds.edgeNeighborLoader(
    num_batches=10,
    num_neighbors=10,
    num_hops=2,
    v_in_feats=["x"],
    v_out_labels=["y"],
    v_extra_feats=["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format="PyG",
    shuffle=False,
    filter_by=None,
    callback_fn=process_batch
)
for i, batch in enumerate(neighbor_loader2):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 8273], edge_feat=[8273], is_train=[8273], is_val=[8273], is_seed=[8273], x=[2412, 1433], y=[2412], train_mask=[2412], val_mask=[2412], test_mask=[2412])
----Batch 1----
Data(edge_index=[2, 8882], edge_feat=[8882], is_train=[8882], is_val=[8882], is_seed=[8882], x=[2492, 1433], y=[2492], train_mask=[2492], val_mask=[2492], test_mask=[2492])
----Batch 2----
Data(edge_index=[2, 8866], edge_feat=[8866], is_train=[8866], is_val=[8866], is_seed=[8866], x=[2468, 1433], y=[2468], train_mask=[2468], val_mask=[2468], test_mask=[2468])
----Batch 3----
Data(edge_index=[2, 8926], edge_feat=[8926], is_train=[8926], is_val=[8926], is_seed=[8926], x=[2473, 1433], y=[2473], train_mask=[2473], val_mask=[2473], test_mask=[2473])
----Batch 4----
Data(edge_index=[2, 8770], edge_feat=[8770], is_train=[8770], is_val=[8770], is_seed=[8770], x=[2454, 1433], y=[2454], train_mask=[2454], val_mask=[2454], test_mask=[2454])
----Batch 5----
Data(edge_index=[2, 8167], edge_feat=[8
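Each batch above carries an `is_seed` mask alongside `edge_index`, marking the seed edges the subgraph was expanded from. A callback can use it to isolate those seeds, for example as prediction targets. The sketch below uses plain lists in a `SimpleNamespace` as a stand-in for the PyG `Data` object so it runs without PyTorch; `keep_seed_edges` is a hypothetical name. With real tensors, and assuming `is_seed` is a boolean tensor, the equivalent selection would be `batch.edge_index[:, batch.is_seed]`.

```python
from types import SimpleNamespace


def keep_seed_edges(batch):
    """Hypothetical callback: keep only the edges whose is_seed flag is True."""
    keep = [i for i, is_seed in enumerate(batch.is_seed) if is_seed]
    src, dst = batch.edge_index
    return SimpleNamespace(
        edge_index=[[src[i] for i in keep], [dst[i] for i in keep]],
        is_seed=[True] * len(keep),
    )


# Stand-in for a PyG batch: a 2 x num_edges edge_index and a per-edge is_seed mask.
batch = SimpleNamespace(edge_index=[[0, 1, 2, 3], [1, 2, 3, 0]],
                        is_seed=[True, False, True, False])
seed_only = keep_seed_edges(batch)
print(seed_only.edge_index)
```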

# Testcase3: Using edgeNeighborLoader with callback_fn to get batches of data (heterogeneous graph)
## Case details: Use edgeNeighborLoader without callback_fn first, then use edgeNeighborLoader with the same params but set a callback_fn to get part of the data.
## Results: Ran successfully; all data was loaded.

Since `Cora` is a homogeneous graph, we will connect to a different graph to demonstrate the use case of heterogeneous graphs.

In [4]:
conn.graphname="hetero"

# COMMENT OUT THE LINE BELOW if you are NOT using a graph that requires token authentication
conn.getToken(conn.createSecret())

('9jluampn6n064l74qkf8jop2mvhi5bfr', 1675161000, '2023-01-31 10:30:00')

In [5]:
neighbor_loader3 = conn.gds.edgeNeighborLoader(
    v_in_feats={"v0": ["x"],
                "v1": ["x"],
                "v2": ["x"]},
    v_out_labels={"v0": ["y"]},
    v_extra_feats={"v0": ["train_mask", "val_mask", "test_mask"]},
    e_extra_feats={"v0v0": ["is_train"], 
                   "v2v0": ["is_train"],
                   "v1v2": ["is_train"]},
    num_batches=16,
    num_neighbors=10,
    num_hops=2,
    shuffle=False,
    output_format="PyG",
    add_self_loop=False,
    loader_id=None,
    buffer_size=4,
)
for i, batch in enumerate(neighbor_loader3):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
HeteroData(
  v0={
    x=[76, 231],
    y=[76],
    train_mask=[76],
    val_mask=[76],
    test_mask=[76]
  },
  v1={ x=[42, 171] },
  v2={ x=[99, 144] },
  (v0, v0v0, v0)={
    edge_index=[2, 660],
    is_train=[660],
    is_seed=[660]
  },
  (v1, v1v2, v2)={
    edge_index=[2, 369],
    is_train=[369],
    is_seed=[369]
  },
  (v2, v2v0, v0)={
    edge_index=[2, 872],
    is_train=[872],
    is_seed=[872]
  }
)
----Batch 1----
HeteroData(
  v0={
    x=[76, 231],
    y=[76],
    train_mask=[76],
    val_mask=[76],
    test_mask=[76]
  },
  v1={ x=[50, 171] },
  v2={ x=[100, 144] },
  (v0, v0v0, v0)={
    edge_index=[2, 662],
    is_train=[662],
    is_seed=[662]
  },
  (v1, v1v2, v2)={
    edge_index=[2, 429],
    is_train=[429],
    is_seed=[429]
  },
  (v2, v2v0, v0)={
    edge_index=[2, 888],
    is_train=[888],
    is_seed=[888]
  }
)
----Batch 2----
HeteroData(
  v

In [6]:
# Callback that keeps only the "v0" vertex data from each batch
def process_batch(batch):
    return {"v0": batch["v0"]}
neighbor_loader4 = conn.gds.edgeNeighborLoader(
    v_in_feats={"v0": ["x"],
                "v1": ["x"],
                "v2": ["x"]},
    v_out_labels={"v0": ["y"]},
    v_extra_feats={"v0": ["train_mask", "val_mask", "test_mask"]},
    e_extra_feats={"v0v0": ["is_train"], 
                   "v2v0": ["is_train"],
                   "v1v2": ["is_train"]},
    num_batches=16,
    num_neighbors=10,
    num_hops=2,
    shuffle=False,
    output_format="PyG",
    add_self_loop=False,
    loader_id=None,
    buffer_size=4,
    callback_fn=process_batch
)
for i, batch in enumerate(neighbor_loader4):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
{'v0': {'x': tensor([[-0.1881, -0.3013,  1.2804,  ...,  0.3575, -1.1039, -1.0695],
        [ 0.2924,  0.9473, -1.0976,  ...,  0.7368, -0.8270, -0.8999],
        [-1.3932, -0.8158,  0.5243,  ...,  0.4610,  1.2749,  0.7621],
        ...,
        [-0.3560, -0.7603,  0.5492,  ...,  0.7666, -1.6951,  0.6725],
        [ 0.1390, -1.1011,  0.4353,  ...,  0.8184,  0.4806,  0.8013],
        [ 0.3864,  0.4018, -0.0926,  ...,  0.3499, -1.7556,  1.1516]],
       dtype=torch.float64), 'y': tensor([1, 3, 4, 1, 4, 9, 5, 7, 7, 5, 8, 8, 7, 0, 1, 4, 6, 7, 4, 8, 7, 1, 1, 1,
        9, 1, 5, 0, 4, 6, 9, 7, 0, 7, 6, 5, 3, 3, 8, 4, 8, 9, 4, 1, 7, 7, 1, 6,
        1, 9, 7, 2, 4, 1, 6, 5, 9, 2, 9, 8, 2, 6, 4, 9, 8, 1, 3, 8, 1, 2, 4, 9,
        7, 0, 9, 2]), 'train_mask': tensor([False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, False,
        False, False, False, False, False, False, False, False, False, 
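The `process_batch` above hard-codes the `v0` vertex type. A small factory can build such callbacks for any list of types, which avoids redefining `process_batch` for each loader. The sketch below is a hypothetical helper, `make_type_filter`, and a plain `dict` stands in for the PyG `HeteroData` batch; it assumes every requested type is present in the batch, as `v0` is in the output above.

```python
def make_type_filter(node_types):
    """Build a callback that keeps only the given node types of a batch."""
    wanted = tuple(node_types)

    def callback(batch):
        # batch[t] mirrors the batch["v0"] lookup used in process_batch.
        return {t: batch[t] for t in wanted}

    return callback


# Stand-in heterogeneous batch (hypothetical values) keyed by vertex type.
batch = {"v0": {"y": [1, 3]}, "v1": {"x": [0.5]}, "v2": {"x": [0.2]}}
keep_v0_v2 = make_type_filter(["v0", "v2"])
print(keep_v0_v2(batch))
```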

# Testcase4: Using edgeNeighborLoader with callback_fn to load data (via Kafka)
## Results: Ran successfully; all data was loaded.

**Note**: Kafka streaming function is only available for the Enterprise Edition. You need to activate the Enterprise Edition to use it. 

In [7]:
conn.graphname="Cora"
# COMMENT OUT THE LINE BELOW if you are NOT using a graph that requires token authentication
conn.getToken(conn.createSecret())

('sr06met5bsgsnbdctmtr1v92gjqrvu0s', 1675161004, '2023-01-31 10:30:04')

#### Configure Kafka
Set up Kafka here. Once configured, the settings are shared with all newly created data loaders, so there is no need to set up Kafka for each loader. Please see the official [documentation](https://docs.tigergraph.com/pytigergraph/current/gds/gds#_configurekafka) for detailed settings.

In [8]:
# Replace the placeholder with the address of your Kafka cluster
conn.gds.configureKafka(kafka_address="your_Kafka_address")

#### Get batches of subgraphs

In [9]:
# Identity callback: return each batch unchanged
def process_batch(batch):
    return batch

neighbor_loader5 = conn.gds.edgeNeighborLoader(
    num_batches=10,
    num_neighbors=10,
    num_hops=2,
    v_in_feats=["x"],
    v_out_labels=["y"],
    v_extra_feats=["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format="PyG",
    shuffle=True,  # shuffle the edges before loading data
    filter_by=None,
    callback_fn=process_batch
)
for i, batch in enumerate(neighbor_loader5):
    print("----Batch {}----".format(i))
    print(batch)



----Batch 0----
Data(edge_index=[2, 8246], edge_feat=[8246], is_train=[8246], is_val=[8246], is_seed=[8246], x=[2422, 1433], y=[2422], train_mask=[2422], val_mask=[2422], test_mask=[2422])
----Batch 1----
Data(edge_index=[2, 8823], edge_feat=[8823], is_train=[8823], is_val=[8823], is_seed=[8823], x=[2452, 1433], y=[2452], train_mask=[2452], val_mask=[2452], test_mask=[2452])
----Batch 2----
Data(edge_index=[2, 8889], edge_feat=[8889], is_train=[8889], is_val=[8889], is_seed=[8889], x=[2434, 1433], y=[2434], train_mask=[2434], val_mask=[2434], test_mask=[2434])
----Batch 3----
Data(edge_index=[2, 8723], edge_feat=[8723], is_train=[8723], is_val=[8723], is_seed=[8723], x=[2455, 1433], y=[2455], train_mask=[2455], val_mask=[2455], test_mask=[2455])
----Batch 4----
Data(edge_index=[2, 8535], edge_feat=[8535], is_train=[8535], is_val=[8535], is_seed=[8535], x=[2431, 1433], y=[2431], train_mask=[2431], val_mask=[2431], test_mask=[2431])
----Batch 5----
Data(edge_index=[2, 8062], edge_feat=[8