In [1]:
import torch
import numpy as np
import gudhi

from torch_geometric.loader import DataLoader
from torch_geometric.utils import degree
from torch_geometric.transforms import FaceToEdge, ToUndirected
import torchvision.transforms as transforms

from mantra.simplicial import SimplicialDataset
from validation.validate_homology import validate_betti_numbers
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv, GATConv
from torch_geometric.nn import global_mean_pool


import k_simplex2vec as ks2v



In [2]:
int(True)

1

In [3]:
class DegreeTransform(object):
    def __call__(self, data):
        deg = degree(data.edge_index[0], dtype=torch.float)
        data.x = deg.view(-1, 1)
        return data

class TriangulationToFaceTransform(object):
    def __call__(self, data):
        data.face = torch.tensor(data.triangulation).T - 1
        data.triangulation = None
        return data

class OrientableToClassTransform(object):
    def __call__(self, data):
        data.y = int(data.orientable)
        return data

class SetNumNodes:
    def __call__(self,data):
        data.num_nodes = data.n_vertices
        return data


class Simplex2Vec:
    def __call__(self,data):
        st = gudhi.SimplexTree()

        ei = [[edge[0],edge[1]] for edge in data.edge_index.T.tolist() if edge[0] < edge[1]]
        data.edge_index = torch.tensor(ei).T
        # Say hi to bad programming
        for edge in ei:
            st.insert(edge)
        st.expansion(3)

        p1 = ks2v.assemble(cplx =st, k= 1, scheme = "uniform", laziness =None)
        P1 = p1.toarray()

        Simplices = list()
        for simplex in st.get_filtration():
            if simplex[1]!= np.inf:
                Simplices.append(simplex[0])
            else: 
                break  

        ## Perform random walks on the edges
        L = 20
        N = 40
        Walks = ks2v.RandomWalks(walk_length=L, number_walks=N, P=P1,seed = 3)
        # to save the walks in a text file 
        ks2v.save_random_walks(Walks,'RandomWalks_Edges.txt')

        ## Embed the edges 
        Emb = ks2v.Embedding(Walks = Walks, emb_dim = 20 , epochs = 5 ,filename ='k-simplex2vec_Edge_embedding.model')
        data.edge_attr = torch.tensor(Emb.wv.vectors)
        toundirected = ToUndirected()
        data = toundirected(data)
        return data



tr = transforms.Compose(
            [
                TriangulationToFaceTransform(),
                SetNumNodes(),
                FaceToEdge(remove_faces=False),
                DegreeTransform(),
                OrientableToClassTransform(),
                Simplex2Vec(),
                ]
        )

dataset = SimplicialDataset(root="./data",pre_transform=tr)


print(dataset[0])

# print()
# print(f'Dataset: {dataset}:')
# print('====================')
# print(f'Number of graphs: {len(dataset)}')
# print(f'Number of features: {dataset.num_features}')
# print(f'Number of classes: {dataset.num_classes}')

# data = dataset[0]  # Get the first graph object.

# print()
# print(data)
# print('=============================================================')

# # Gather some statistics about the first graph.
# print(f'Number of nodes: {len(data.x)}')
# print(f'Number of edges: {data.num_edges}')
# print(f'Average node degree: {data.num_edges / len(data.x):.2f}')
# print(f'Has isolated nodes: {data.has_isolated_nodes()}')
# print(f'Has self-loops: {data.has_self_loops()}')
# print(f'Is undirected: {data.is_undirected()}')

# print('=============================================================')
# print(f'Number of orientable Manifolds: {sum(dataset.orientable)}')
# print(f'Number of non-orientable Manifolds: {len(dataset) - sum(dataset.orientable)}')
# print(f'Percentage: {sum(dataset.orientable) / len(dataset):.2f}, {(len(dataset) - sum(dataset.orientable)) / len(dataset):.2f}')

Data(dimension=[1], n_vertices=[1], torsion_coefficients=[3], betti_numbers=[3], orientable=[1], genus=[1], name='S^2', face=[3, 4], edge_index=[2, 12], x=[4, 1], y=[1], edge_attr=[12, 20], num_nodes=4)


In [4]:
dataset[0].face

tensor([[0, 0, 0, 1],
        [1, 1, 2, 2],
        [2, 3, 3, 3]])

In [5]:

dataset = dataset.shuffle()

train_dataset = dataset[:150]
test_dataset = dataset[150:]

print(f'Number of training graphs: {len(train_dataset)}')
print(f'Number of test graphs: {len(test_dataset)}')

train_loader = DataLoader(train_dataset)
test_loader = DataLoader(test_dataset)





Number of training graphs: 150
Number of test graphs: 562


In [6]:
for batch in train_loader:
    break

print(batch.x.shape)
print(batch.edge_index.min())


torch.Size([9, 1])
tensor(0)


In [7]:
class GCN(torch.nn.Module):
    def __init__(self, hidden_channels):
        super(GCN, self).__init__()
        torch.manual_seed(12345)
        self.conv1 = GATConv(1, hidden_channels)
        self.conv2 = GATConv(hidden_channels, hidden_channels)
        self.conv3 = GATConv(hidden_channels, hidden_channels)
        self.lin = Linear(hidden_channels, 2)

    def forward(self, x, edge_index, edge_attr, batch):
        # 1. Obtain node embeddings 
        x = self.conv1(x, edge_index,edge_attr)
        x = x.relu()
        x = self.conv2(x, edge_index,edge_attr)
        x = x.relu()
        x = self.conv3(x, edge_index,edge_attr)

        # 2. Readout layer
        x = global_mean_pool(x, batch)  # [batch_size, hidden_channels]

        # 3. Apply a final classifier
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.lin(x)
        
        return x

model = GCN(hidden_channels=64)
print(model)

GCN(
  (conv1): GATConv(1, 64, heads=1)
  (conv2): GATConv(64, 64, heads=1)
  (conv3): GATConv(64, 64, heads=1)
  (lin): Linear(in_features=64, out_features=2, bias=True)
)


In [10]:
model = GCN(hidden_channels=64)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = torch.nn.CrossEntropyLoss()

def train():
    model.train()

    for data in train_loader:  # Iterate in batches over the training dataset.
         out = model(data.x, data.edge_index, data.edge_attr, data.batch)  # Perform a single forward pass.
         loss = criterion(out, data.y)  # Compute the loss.
         loss.backward()  # Derive gradients.
         optimizer.step()  # Update parameters based on gradients.
         optimizer.zero_grad()  # Clear gradients.

def test(loader):
     model.eval()

     correct = 0
     for data in loader:  # Iterate in batches over the training/test dataset.
         out = model(data.x, data.edge_index, data.edge_attr,data.batch)  
         pred = out.argmax(dim=1)  # Use the class with highest probability.
         correct += int((pred == data.y).sum())  # Check against ground-truth labels.
     return correct / len(loader.dataset)  # Derive ratio of correct predictions.


for epoch in range(1, 171):
    train()
    train_acc = test(train_loader)
    test_acc = test(test_loader)
    print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

Epoch: 001, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 002, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 003, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 004, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 005, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 006, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 007, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 008, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 009, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 010, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 011, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 012, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 013, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 014, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 015, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 016, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 017, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 018, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 019, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 020, Train Acc: 0.6467, Test Acc: 0.7509
Epoch: 021, Train Acc: 0.6467, Test Acc:

KeyboardInterrupt: 