## Data Loaders

This notebook demonstrates the use of the neighbor loader in `pyTigerGraph`. A data loader pulls data from the TigerGraph database. Currently, the following data loaders are provided:
* EdgeLoader, which returns batches of edges.
* VertexLoader, which returns batches of vertices.
* GraphLoader, which returns randomly sampled (probably disconnected) subgraphs in pandas `dataframe`, `PyG` or `DGL` format.
* NeighborLoader, which returns subgraphs using neighbor sampling in `dataframe`, `PyG` or `DGL` format.

Every data loader above can either return all the batches in an HTTP response (default) or stream every batch through Kafka. The HTTP mechanism is fast and works well for testing with small graphs, but it is subject to a data size limit of 2GB. For large graphs, the HTTP channel will likely fail due to the size limit or network connectivity issues. Streaming via Kafka is offered for robustness and scalability. Kafka also excels at multi-consumer use cases, so it is efficient for model search or hyperparameter tuning, where multiple consumers read the same data.
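
As a rough illustration of the trade-off above, the sketch below picks a channel from an estimated payload size. The helper `channel_kwargs` and the constant `HTTP_LIMIT_BYTES` are hypothetical and not part of `pyTigerGraph`; only the `kafka_address` argument and the 2GB figure come from this notebook, and the exact byte count of the limit is an assumption.

```python
# Hypothetical helper, not part of pyTigerGraph.
# Assumption: the 2GB HTTP limit is taken as 2 * 1024**3 bytes.
HTTP_LIMIT_BYTES = 2 * 1024**3


def channel_kwargs(estimated_bytes, kafka_address=None):
    """Return extra loader keyword arguments for the chosen channel.

    An empty dict means "use the default HTTP channel"; otherwise the
    dict carries the `kafka_address` argument so that batches are streamed.
    """
    if estimated_bytes < HTTP_LIMIT_BYTES:
        return {}
    if kafka_address is None:
        raise ValueError("payload exceeds the HTTP limit; a Kafka address is needed")
    return {"kafka_address": kafka_address}
```

The returned dictionary could then be unpacked into a loader call, for example `conn.gds.neighborLoader(num_batches=10, **channel_kwargs(size, "127.0.0.1:9092"))`, where `size` is your own estimate of the payload.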

Note: For the data loaders to work, a few UDFs (User Defined Functions) have to be installed into the TigerGraph database.

### Connection to Database

The `TigerGraphConnection` class represents a connection to the TigerGraph database. Under the hood, it stores the necessary information to communicate with the database. It is able to perform quite a few database tasks. Please see its documentation for details.

In [1]:
from pyTigerGraph import TigerGraphConnection

In [2]:
conn = TigerGraphConnection(
    host="http://127.0.0.1", # Change the address to your database server's
    graphname="Cora",
    username="tigergraph",
    password="tigergraph",
    useCert=False
)

In [3]:
# Number of vertices for every vertex type
conn.getVertexCount('*')

{'Paper': 2708}

In [4]:
# Number of edges for every type
conn.getEdgeCount()

{'Cite': 10556}

### Neighbor Loader

NeighborLoader performs neighbor sampling as introduced in [Inductive Representation Learning on Large Graphs](https://arxiv.org/abs/1706.02216) and returns neighborhood subgraphs. Hence, the subgraphs from this loader are connected. 

Specifically, the loader first chooses `batch_size` vertices as seeds, then picks `num_neighbors` neighbors of each seed at random, then `num_neighbors` neighbors of each of those neighbors, and repeats this for `num_hops` hops. This generates one subgraph. As you loop through this data loader, every vertex will at some point be chosen as a seed and you will get the subgraph expanded from that seed. To limit seeds to certain vertices, pass the name of a boolean attribute to `filter_by`; it indicates which vertices can be included as seeds.
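
The sampling procedure described above can be sketched in plain Python on a toy adjacency list. This only illustrates the idea and is not the query that the loader installs in the database; the function `sample_neighborhood` and the toy graph are hypothetical, and as a simplification each vertex is expanded at most once.

```python
import random


def sample_neighborhood(adj, seeds, num_neighbors, num_hops, rng=None):
    """Expand `seeds` by sampling up to `num_neighbors` neighbors per hop.

    `adj` maps a vertex to a list of its neighbors. Returns the set of
    vertices and the set of (source, target) edges in the sampled subgraph.
    """
    rng = rng or random.Random()
    vertices = set(seeds)
    edges = set()
    frontier = set(seeds)
    for _ in range(num_hops):
        next_frontier = set()
        for v in frontier:
            neighbors = adj.get(v, [])
            k = min(num_neighbors, len(neighbors))
            for u in rng.sample(neighbors, k):
                edges.add((v, u))
                if u not in vertices:
                    # Simplification: only newly reached vertices are expanded.
                    vertices.add(u)
                    next_frontier.add(u)
        frontier = next_frontier
    return vertices, edges


# Toy graph (hypothetical data): vertex 0 is the only seed.
toy_adj = {0: [1, 2, 3], 1: [0, 4], 2: [0], 3: [0, 5, 6], 4: [1], 5: [3], 6: [3]}
sub_vertices, sub_edges = sample_neighborhood(toy_adj, [0], num_neighbors=2, num_hops=2)
```

Because every sampled edge starts at a vertex that is already in the subgraph, each sampled vertex is reachable from a seed, which is why the subgraphs from this kind of sampling are connected to their seeds.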

**Note**: The first time you initialize the loader on a graph in TigerGraph,
the initialization might take a minute because it installs the corresponding
query in the database and optimizes it. The query only needs to be installed
once, so initializing the loader on the same TigerGraph graph again takes
almost no time.

There are two ways to use the data loader. See the
[data loaders tutorial](https://github.com/TigerGraph-DevLabs/mlworkbench-docs/blob/main/tutorials/basics/2_dataloaders.ipynb)
for examples.
* First, it can be used as an iterable, which means you can loop through
  it to get every batch of data. If you load all data at once (`num_batches=1`),
  there will be only one batch (of all the data) in the iterator.
* Second, you can access the `data` property of the class directly. If there is
  only one batch of data to load, it will give you the batch directly instead
  of an iterator, which might make more sense in that case. If there are
  multiple batches of data to load, it will return the loader itself.
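
The two access patterns above can be mimicked with a small stand-in class. `ToyLoader` is hypothetical and only imitates the behavior described in the list (iteration over batches, and a `data` property that returns the single batch or the loader itself); it is not the `pyTigerGraph` implementation and assumes a non-empty list of items.

```python
class ToyLoader:
    """Hypothetical stand-in that imitates the two access patterns."""

    def __init__(self, items, num_batches=1):
        self.num_batches = num_batches
        size = -(-len(items) // num_batches)  # ceiling division
        self._batches = [items[i:i + size] for i in range(0, len(items), size)]

    def __iter__(self):
        # Pattern 1: loop through the loader to get every batch.
        return iter(self._batches)

    @property
    def data(self):
        # Pattern 2: a single batch is returned directly,
        # multiple batches return the loader itself.
        if self.num_batches == 1:
            return self._batches[0]
        return self


single = ToyLoader(list(range(6)), num_batches=1)
whole_batch = single.data          # the one batch itself

multi = ToyLoader(list(range(6)), num_batches=3)
for batch in multi.data:           # `data` is the loader, so it can be looped over
    pass
```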
    
Args:
* attributes (list, optional):
        Edge attributes to be included. Defaults to None.
* batch_size (int, optional):
        Number of vertices chosen as seeds in each batch.
        Defaults to None.
* num_batches (int, optional):
        Number of batches to split the vertices into.
        Defaults to 1.
* num_neighbors (int, optional):
        Number of neighbors to sample for each vertex.
        Defaults to 10.
* num_hops (int, optional):
        Number of hops to traverse when sampling neighbors.
        Defaults to 2.
* shuffle (bool, optional):
        Whether to shuffle the vertices before loading data.
        Defaults to False.
* filter_by (str, optional):
        A boolean attribute used to indicate which vertices can be included as seeds. Defaults to None.
* output_format (str, optional):
        Format of the output data of the loader. "dataframe",
        "PyG" and "DGL" are supported. Defaults to "dataframe".
* loader_id (str, optional):
        An identifier of the loader which can be any string. It is
        also used as the Kafka topic name. If `None`, a random string will be generated
        for it. Defaults to None.
* buffer_size (int, optional):
        Number of data batches to prefetch and store in memory. Defaults to 4.
* kafka_address (str, optional):
        Address of the Kafka broker. Defaults to None.
* kafka_max_msg_size (int, optional):
        Maximum size of a Kafka message in bytes.
        Defaults to 104857600.
* kafka_num_partitions (int, optional):
        Number of partitions for the topic created by this loader.
        Defaults to 1.
* kafka_replica_factor (int, optional):
        Number of replications for the topic created by this
        loader. Defaults to 1.
* kafka_retention_ms (int, optional):
        Retention time for messages in the topic created by this
        loader in milliseconds. Defaults to 60000.
* kafka_auto_del_topic (bool, optional):
        Whether to delete the Kafka topic once the
        loader finishes pulling data. Defaults to True.
* kafka_address_consumer (str, optional):
        Address of the Kafka broker that a consumer
        should use. Defaults to the same value as `kafka_address`.
* kafka_address_producer (str, optional):
        Address of the Kafka broker that a producer
        should use. Defaults to the same value as `kafka_address`.
* timeout (int, optional):
        Timeout value for GSQL queries, in ms. Defaults to 300000.
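
To make the relation between `batch_size` and `num_batches` concrete, here is a small arithmetic sketch using the 2708 `Paper` vertices counted earlier. The helpers are hypothetical, and the rules they encode (a given `batch_size` takes precedence, and seeds are split as evenly as possible) are assumptions about the loader rather than documented behavior.

```python
import math


def resolve_num_batches(num_seeds, batch_size=None, num_batches=1):
    """Assumed rule: a given batch_size determines the number of batches."""
    if batch_size is not None:
        return math.ceil(num_seeds / batch_size)
    return num_batches


def seeds_per_batch(num_seeds, num_batches):
    """Assumed rule: seeds are split as evenly as possible across batches."""
    return math.ceil(num_seeds / num_batches)


num_papers = 2708  # vertex count reported by conn.getVertexCount('*') above
batches_for_64 = resolve_num_batches(num_papers, batch_size=64)
seeds_for_10 = seeds_per_batch(num_papers, num_batches=10)
```

Under these assumptions, `num_batches=10` on this graph gives at most 271 seeds per batch. The batches printed below contain about 2000 vertices each because every seed is expanded to its sampled two-hop neighborhood.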

#### Get subgraphs through HTTP

In [7]:
%%time
neighbor_loader = conn.gds.neighborLoader(
    num_batches=10,
    num_neighbors=10,
    num_hops=2,
    v_in_feats=["x"],
    v_out_labels=["y"],
    v_extra_feats=["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format="PyG",
    shuffle=True,
    filter_by=None
)

Installing and optimizing queries. It might take a minute if this is the first time you use this loader.
Query installation finished.
CPU times: user 24.8 ms, sys: 8.31 ms, total: 33.1 ms
Wall time: 42.4 s


In [8]:
%%time
for i, batch in enumerate(neighbor_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 12710], edge_feat=[12710], is_train=[12710], is_val=[12710], x=[2055, 1433], y=[2055], train_mask=[2055], val_mask=[2055], test_mask=[2055], is_seed=[2055])
----Batch 1----
Data(edge_index=[2, 11142], edge_feat=[11142], is_train=[11142], is_val=[11142], x=[2035, 1433], y=[2035], train_mask=[2035], val_mask=[2035], test_mask=[2035], is_seed=[2035])
----Batch 2----
Data(edge_index=[2, 12686], edge_feat=[12686], is_train=[12686], is_val=[12686], x=[2105, 1433], y=[2105], train_mask=[2105], val_mask=[2105], test_mask=[2105], is_seed=[2105])
----Batch 3----
Data(edge_index=[2, 12012], edge_feat=[12012], is_train=[12012], is_val=[12012], x=[2072, 1433], y=[2072], train_mask=[2072], val_mask=[2072], test_mask=[2072], is_seed=[2072])
----Batch 4----
Data(edge_index=[2, 11406], edge_feat=[11406], is_train=[11406], is_val=[11406], x=[2047, 1433], y=[2047], train_mask=[2047], val_mask=[2047], test_mask=[2047], is_seed=[2047])
----Batch 5----
Data(edge_index=[2,

#### Get subgraphs through Kafka

In [9]:
%%time
neighbor_loader = conn.gds.neighborLoader(
    num_batches=10,
    num_neighbors=10,
    num_hops=2,
    v_in_feats=["x"],
    v_out_labels=["y"],
    v_extra_feats=["train_mask", "val_mask", "test_mask"],
    e_in_feats=["time"],
    e_out_labels=[],
    e_extra_feats=["is_train", "is_val"],
    output_format="PyG",
    shuffle=True,
    filter_by=None,
    kafka_address="127.0.0.1:9092"
)



Installing and optimizing queries. It might take a minute if this is the first time you use this loader.
Query installation finished.
CPU times: user 41.3 ms, sys: 10 ms, total: 51.3 ms
Wall time: 30.1 s


In [10]:
%%time
for i, batch in enumerate(neighbor_loader):
    print("----Batch {}----".format(i))
    print(batch)

----Batch 0----
Data(edge_index=[2, 11032], edge_feat=[11032], is_train=[11032], is_val=[11032], x=[2042, 1433], y=[2042], train_mask=[2042], val_mask=[2042], test_mask=[2042], is_seed=[2042])
----Batch 1----
Data(edge_index=[2, 11586], edge_feat=[11586], is_train=[11586], is_val=[11586], x=[2095, 1433], y=[2095], train_mask=[2095], val_mask=[2095], test_mask=[2095], is_seed=[2095])
----Batch 2----
Data(edge_index=[2, 13256], edge_feat=[13256], is_train=[13256], is_val=[13256], x=[2116, 1433], y=[2116], train_mask=[2116], val_mask=[2116], test_mask=[2116], is_seed=[2116])
----Batch 3----
Data(edge_index=[2, 12180], edge_feat=[12180], is_train=[12180], is_val=[12180], x=[2063, 1433], y=[2063], train_mask=[2063], val_mask=[2063], test_mask=[2063], is_seed=[2063])
----Batch 4----
Data(edge_index=[2, 11840], edge_feat=[11840], is_train=[11840], is_val=[11840], x=[2044, 1433], y=[2044], train_mask=[2044], val_mask=[2044], test_mask=[2044], is_seed=[2044])
----Batch 5----
Data(edge_index=[2,