# Source:
- Pyg highlevel documentation:
    - https://pytorch-geometric.readthedocs.io/en/latest/advanced/remote.html#feature-store
<br><br>
- Pyg Test of feature store: 
    - https://github.com/pyg-team/pytorch_geometric/blob/901a255346009c7294fd3cc1e825aa441f1dbd4f/torch_geometric/testing/feature_store.py
<br><br>
- Youtube pyg batch video:
    - https://www.youtube.com/watch?v=mz9xYNg9Ofs
    
    
---- 
Actually 2.2.0 tagged version
https://github.com/pyg-team/pytorch_geometric/tree/2.2.0

In [15]:
import sys
import platform
import torch_geometric

print("Platform", platform.system(), platform.release())
print("Python version",sys.version)

print("torch",torch.__version__)
print("torch_geomeric",torch_geometric.__version__)



Platform Darwin 22.3.0
Python version 3.8.12 (default, Jul 12 2022, 16:17:42) 
[Clang 13.1.6 (clang-1316.0.21.2.5)]
torch 1.13.1
torch_geomeric 2.2.0


In [116]:
from typing import Dict, List, Optional, Tuple

import torch
from torch import Tensor

## FEATURE STORE
from torch_geometric.data.feature_store import FeatureStore, TensorAttr
from torch_geometric.typing import FeatureTensorType

## GRAPH STORE
from torch_geometric.data.graph_store import EdgeAttr, GraphStore
from torch_geometric.typing import EdgeTensorType

## SAMPLER 
#from torch_geometric.sampler import BaseSampler, NeighborSampler
#from torch_geometric.loader import NeighborSampler
# THERE EXISTS TWO FUCKING SAMPLERS
# torch_geometric.loader.NeighborSampler
# torch_geometric.sampler.NeighborSampler

## TEST DATA 
from torch_geometric.utils import erdos_renyi_graph
from torch_sparse import SparseTensor

## Implement custom feature and graph store as dictionaries

In [137]:

class MyFeatureStore(FeatureStore):
    def __init__(self):
        super().__init__()
        self.store: Dict[Tuple[str, str], Tensor] = {}

    @staticmethod
    def key(attr: TensorAttr) -> str:
        return (attr.group_name, attr.attr_name)

    def _put_tensor(self, tensor: FeatureTensorType, attr: TensorAttr) -> bool:
        index = attr.index

        # None indices define the obvious index:
        if index is None:
            index = torch.arange(0, tensor.shape[0])

        # Store the index:
        self.store[MyFeatureStore.key(attr)] = (index, tensor)

        return True

    def _get_tensor(self, attr: TensorAttr) -> Optional[FeatureTensorType]:
        index, tensor = self.store.get(MyFeatureStore.key(attr), (None, None))
        if tensor is None:
            return None

        # None indices return the whole tensor:
        if attr.index is None:
            return tensor

        # Empty slices return the whole tensor:
        if (isinstance(attr.index, slice)
                and attr.index == slice(None, None, None)):
            return tensor

        idx = (torch.cat([(index == v).nonzero() for v in attr.index]).view(-1)
               if attr.index.numel() > 0 else [])
        return tensor[idx]

    def _remove_tensor(self, attr: TensorAttr) -> bool:
        del self.store[MyFeatureStore.key(attr)]
        return True

    def _get_tensor_size(self, attr: TensorAttr) -> Tuple:
        return self._get_tensor(attr).size()

    def get_all_tensor_attrs(self) -> List[str]:
        return [TensorAttr(*key) for key in self.store.keys()]

    def __len__(self):
        # TODO
        return(1)


In [138]:

class MyGraphStore(GraphStore):
    def __init__(self):
        super().__init__()
        self.store: Dict[EdgeAttr, Tuple[Tensor, Tensor]] = {}

    @staticmethod
    def key(attr: EdgeAttr) -> str:
        return (attr.edge_type, attr.layout.value, attr.is_sorted, attr.size)

    def _put_edge_index(self, edge_index: EdgeTensorType,
                        edge_attr: EdgeAttr) -> bool:
        self.store[MyGraphStore.key(edge_attr)] = edge_index

    def _get_edge_index(self, edge_attr: EdgeAttr) -> Optional[EdgeTensorType]:
        return self.store.get(MyGraphStore.key(edge_attr), None)

    def get_all_edge_attrs(self):
        return [EdgeAttr(*key) for key in self.store]

In [200]:
class MySampler(torch_geometric.sampler.BaseSampler):
    r"""An abstract base class that initializes a graph sampler and provides
    :meth:`sample_from_nodes` and :meth:`sample_from_edges` routines.
    .. note ::
        Any data stored in the sampler will be *replicated* across data loading
        workers that use the sampler since each data loading worker holds its
        own instance of a sampler.
        As such, it is recommended to limit the amount of information stored in
        the sampler.
    """
    def sample_from_nodes(
        self,
        index: NodeSamplerInput,
        **kwargs,
    ) -> Union[HeteroSamplerOutput, SamplerOutput]:
        r"""Performs sampling from the nodes specified in :obj:`index`,
        returning a sampled subgraph in the specified output format.
        The :obj:`index` is a tuple holding the following information:
        1. The example indices of the seed nodes
        2. The node indices to start sampling from
        3. The timestamps of the given seed nodes (optional)
        Args:
            index (NodeSamplerInput): The node sampler input object.
        """
        raise NotImplementedError

    def sample_from_edges(
        self,
        index: EdgeSamplerInput,
        neg_sampling: Optional[NegativeSampling] = None,
    ) -> Union[HeteroSamplerOutput, SamplerOutput]:
        r"""Performs sampling from the edges specified in :obj:`index`,
        returning a sampled subgraph in the specified output format.
        The :obj:`index` is a tuple holding the following information:
        1. The example indices of the seed links
        2. The source node indices to start sampling from
        3. The destination node indices to start sampling from
        4. The labels of the seed links (optional)
        5. The timestamps of the given seed nodes (optional)
        Args:
            index (EdgeSamplerInput): The edge sampler input object.
            neg_sampling (NegativeSampling, optional): The negative sampling
                configuration. (default: :obj:`None`)
        """
        raise NotImplementedError

    @property
    def edge_permutation(self) -> Union[OptTensor, Dict[EdgeType, OptTensor]]:
        r"""If the sampler performs any modification of edge ordering in the
        original graph, this function is expected to return the permutation
        tensor that defines the permutation from the edges in the original
        graph and the edges used in the sampler. If no such permutation was
        applied, :obj:`None` is returned. For heterogeneous graphs, the
        expected return type is a permutation tensor for each edge type."""
        return None

NameError: name 'NodeSamplerInput' is not defined

### Actual tests
- Feature_store:
    - https://github.com/pyg-team/pytorch_geometric/blob/2.2.0/test/data/test_feature_store.py
- Graph_store:
    - https://github.com/pyg-team/pytorch_geometric/blob/2.2.0/test/data/test_graph_store.py

In [199]:
adj

SparseTensor(row=tensor([0, 1]),
             col=tensor([1, 2]),
             size=(2, 3), nnz=2, density=33.33%)

In [None]:
group_name = 'index'
attr_name = 'index'
index = torch.tensor([0, 1])
attr_graph = TensorAttr(group_name, attr_name, index)

graph_store.put_edge_index(edge_index, attr_graph)

True

In [255]:
# Feature store - heterogenous
feature_store = MyFeatureStore()
tensor_a = torch.Tensor([[0, 0, 0], [1, 1, 1], [2, 2, 2]])
tensor_b = torch.Tensor([[0, 0, 0], [1, 1, 1], [2, 2, 2]])


group_name_a = 'A'
attr_name_a = 'feat_a'
group_name_b = 'B'
attr_name_b = 'feat_b'

index_a = torch.tensor([0, 1, 2])
index_b = torch.tensor([0, 1, 2])
attr_a = TensorAttr(group_name_a, attr_name_a, index_a)
attr_b = TensorAttr(group_name_b, attr_name_b, index_b)


feature_store.put_tensor(tensor_a, attr_a)
feature_store.put_tensor(tensor_b, attr_b)

# Graph store - heterogenous
graph_store = MyGraphStore()
edge_index_ab = torch.LongTensor([(0, 1), (1, 2), (2,0),(0,2)])
#adj = SparseTensor(row=edge_index_ab[0], col=edge_index_ab[1])
coo = (edge_index_ab[0], edge_index_ab[1])


edge_attr_ab = torch_geometric.data.graph_store.EdgeAttr(
    edge_type = ("A","link_name_ab","B"),
    layout = "csr",
    is_sorted = False,
    size = (2,2))


graph_store.put_edge_index(edge_index = coo,
                           edge_type=('A', '1', 'B'),
                            layout='coo', 
                            size=(2, 2),
                           is_sorted=False
                          )


node_sampler = torch_geometric.sampler.NeighborSampler((feature_store,graph_store), num_neighbors=[1],input_type="csr")


loader = torch_geometric.loader.NodeLoader(
    data=(feature_store, graph_store),
    node_sampler=node_sampler,
    batch_size=1,
    input_nodes='A',
)

for batch in loader:
    pass

IndexError: phmap at(): lookup non-existent key

In [225]:
# Feature store
feature_store = MyFeatureStore()
tensor_a = torch.Tensor([[0, 0, 0], [1, 1, 1], [2, 2, 2]])
tensor_b = torch.Tensor([[0, 0, 0], [1, 1, 1], [2, 2, 2]])


group_name = 'A'
attr_name_a = 'feat_a'
attr_name_b = 'feat_b'

index_a = torch.tensor([0, 1, 2])
index_b = torch.tensor([0, 1, 2])
attr_a = TensorAttr(group_name, attr_name_a, index_a)
attr_b = TensorAttr(group_name, attr_name_b, index_b)


feature_store.put_tensor(tensor_a, attr_a)

# Graph store
graph_store = MyGraphStore()
edge_index = torch.LongTensor([(0, 1), (1, 2)])
adj = SparseTensor(row=edge_index[0], col=edge_index[1])


# likely only need one of the methods?
coo = adj.coo()[:-1]
csr = adj.csr()[:-1]
csc = adj.csc()[-2::-1] 

graph_store['edge', torch_geometric.data.graph_store.EdgeLayout.COO] = coo
graph_store['edge', 'csr'] = csr
graph_store['edge', 'csc'] = csc

# Nodesampler (can implement your own)
# It is likely heterogenous - whereas the graph is maybe homogenious 
node_sampler = torch_geometric.sampler.NeighborSampler((feature_store,graph_store), num_neighbors=[1],input_type='csc')


loader = torch_geometric.loader.NodeLoader(
    data=(feature_store, graph_store),
    node_sampler=node_sampler,
    batch_size=1,
    input_nodes='A',
)


# for batch in loader:
#     pass


In [219]:
graph_store.get_all_edge_attrs()

[EdgeAttr(edge_type='edge', layout=<EdgeLayout.COO: 'coo'>, is_sorted=False, size=None),
 EdgeAttr(edge_type='edge', layout=<EdgeLayout.CSR: 'csr'>, is_sorted=True, size=None),
 EdgeAttr(edge_type='edge', layout=<EdgeLayout.CSC: 'csc'>, is_sorted=True, size=None)]

In [None]:
for batch in loader

In [191]:
class GCN(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = GCNConv(dataset.num_node_features, 16)
        self.conv2 = GCNConv(16, dataset.num_classes)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index

        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)

        return F.log_softmax(x, dim=1)

In [192]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN().to(device)
data = loader.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

model.train()
for epoch in range(200):
    optimizer.zero_grad()
    out = model(data)
    loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()

AttributeError: 'NodeLoader' object has no attribute 'to'

In [150]:
assert torch.equal(store[group_name, attr_name, index], tensor)
assert torch.equal(store[group_name, attr_name, None], tensor)
assert torch.equal(store[group_name, attr_name, :], tensor)
assert torch.equal(store[group_name][attr_name][:], tensor)
assert torch.equal(store[group_name].feat[:], tensor)
assert torch.equal(store.view().A.feat[:], tensor)

In [146]:

# Generate edge and node features
edges_index = erdos_renyi_graph(num_nodes=10, edge_prob=0.5)

paper_features = torch.tensor([[0],[1],[2],[3],[4], [5],[6],[7],[8],[9]
                             ], dtype=torch.long)
author_features = torch.tensor([[3],[3],[3],[1],[1],[1],[1],[2],[2],[2]
                             ], dtype=torch.long)



# node_features = torch.tensor([[0],[1],[2],[3],[4], [5],[6],[7],[8],[9]
#                              ], dtype=torch.long)

# initiate stores
feature_store = MyFeatureStore()
graph_store = MyGraphStore()


# Add features and edges to stores
feature_store = MyFeatureStore()
feature_store['paper', 'x', None] = paper_features
feature_store['author', 'x', None] = author_features

assert torch.equal(feature_store['paper', 'x'], paper_features)
assert torch.equal(feature_store['paper'].x, paper_features)
assert torch.equal(feature_store['author', 'x', 0:5], author_features[0:5])


graph_store['edge','coo'] = edge_index

row, col = graph_store['edge', 'coo']

# checks out
assert torch.equal(row, edge_index[0]) 
assert torch.equal(col, edge_index[1])


# initiate sampler
#node_sampler = torch_geometric.sampler.NeighborSampler((feature_store,graph_store), num_neighbors=[3],input_type=['coo'])




TypeError: equal(): argument 'input' (position 1) must be Tensor, not AttrView

In [136]:
graph_store.

MyGraphStore()

In [115]:
torch.manual_seed(12345)
edge_index = erdos_renyi_graph(num_nodes=10, edge_prob=0.5)
E = edge_index.size(1)

loader = NeighborSampler(edge_index, sizes=[2, 4], batch_size=2)

In [118]:
NeighborSampler((feature_store,graph_store), sizes=[2])

AttributeError: 'tuple' object has no attribute 'to'

In [87]:
NeighborSampler?

[0;31mInit signature:[0m
[0mNeighborSampler[0m[0;34m([0m[0;34m[0m
[0;34m[0m    [0mdata[0m[0;34m:[0m [0mUnion[0m[0;34m[[0m[0mtorch_geometric[0m[0;34m.[0m[0mdata[0m[0;34m.[0m[0mdata[0m[0;34m.[0m[0mData[0m[0;34m,[0m [0mtorch_geometric[0m[0;34m.[0m[0mdata[0m[0;34m.[0m[0mhetero_data[0m[0;34m.[0m[0mHeteroData[0m[0;34m,[0m [0mTuple[0m[0;34m[[0m[0mtorch_geometric[0m[0;34m.[0m[0mdata[0m[0;34m.[0m[0mfeature_store[0m[0;34m.[0m[0mFeatureStore[0m[0;34m,[0m [0mtorch_geometric[0m[0;34m.[0m[0mdata[0m[0;34m.[0m[0mgraph_store[0m[0;34m.[0m[0mGraphStore[0m[0;34m][0m[0;34m][0m[0;34m,[0m[0;34m[0m
[0;34m[0m    [0mnum_neighbors[0m[0;34m:[0m [0mUnion[0m[0;34m[[0m[0mList[0m[0;34m[[0m[0mint[0m[0;34m][0m[0;34m,[0m [0mDict[0m[0;34m[[0m[0mTuple[0m[0;34m[[0m[0mstr[0m[0;34m,[0m [0mstr[0m[0;34m,[0m [0mstr[0m[0;34m][0m[0;34m,[0m [0mList[0m[0;34m[[0m[0mint[0m[0;34m][0m[0;34m][0m

AttrView(store=MyFeatureStore(), attr=TensorAttr(group_name='paper', attr_name=<FieldStatus.UNSET: 1>, index=<FieldStatus.UNSET: 1>))

In [95]:





loader = torch_geometric.loader.NodeLoader(
    data=(feature_store, graph_store),
    node_sampler=node_sampler,
    batch_size=20,
    input_nodes='paper',
)

# for batch in loader:
#     pass

NameError: name 'node_sampler' is not defined

In [92]:
torch_geometric.loader.NodeLoader?

[0;31mInit signature:[0m [0mtorch_geometric[0m[0;34m.[0m[0mloader[0m[0;34m.[0m[0mNodeLoader[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwds[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
A data loader that performs neighbor sampling from node information,
using a generic :class:`~torch_geometric.sampler.BaseSampler`
implementation that defines a :meth:`sample_from_nodes` function and is
supported on the provided input :obj:`data` object.

Args:
    data (torch_geometric.data.Data or torch_geometric.data.HeteroData):
        The :class:`~torch_geometric.data.Data` or
        :class:`~torch_geometric.data.HeteroData` graph object.
    node_sampler (torch_geometric.sampler.BaseSampler): The sampler
        implementation to be used with this loader. Note that the
        sampler implementation must be compatible with the input data
        object.
    input_nodes (torch.Tensor or str or Tuple[str, torch.Tensor]): The
        indic

# Training

In [41]:
from torch_geometric.datasets import Planetoid

from torch_geometric.nn import GCNConv
import torch.nn.functional as F


In [31]:
# Official toy dataset (a series of grpahs

dataset = Planetoid(root='/tmp/Cora', name='Cora')


In [32]:
class GCN(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = GCNConv(dataset.num_node_features, 16)
        self.conv2 = GCNConv(16, dataset.num_classes)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index

        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)

        return F.log_softmax(x, dim=1)

In [37]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN().to(device)
data = dataset[0].to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

model.train()
for epoch in range(200):
    optimizer.zero_grad()
    out = model(data)
    loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()

In [39]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN().to(device)
data = dataset[0].to(device)
batch_size = 216


optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

model.train()
for epoch in range(200):
    optimizer.zero_grad()
    out = model(data)
    loss = F.nll_loss(out[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()



NameError: name 'DataLoader' is not defined

In [38]:
model.eval()
pred = model(data).argmax(dim=1)
correct = (pred[data.test_mask] == data.y[data.test_mask]).sum()
acc = int(correct) / int(data.test_mask.sum())