<a href="https://colab.research.google.com/github/MaximeSzymanski/GraphConvolutionalNetwork/blob/main/GCN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
import random

In [42]:
def networx_to_stucture_part(G : nx.Graph):
    """
    Converts a networkx graph to a tensor representation
    """
    adj = nx.adjacency_matrix(G).todense()
    identity = np.identity(adj.shape[0])
    adj_tild = adj + identity
    degrees = np.sum(adj_tild, axis=1)
    d_tild = np.diag(np.power(degrees, -0.5))
    stucture_part = np.matmul(np.matmul(d_tild, adj_tild), d_tild)
    return stucture_part

In [43]:
class GCN_layer(nn.Module):
    def __init__(self, in_features, out_features):
        super(GCN_layer, self).__init__()
        self.W = nn.Parameter(torch.zeros(in_features, out_features), requires_grad=True)
        self.b = nn.Parameter(torch.zeros(out_features), requires_grad=True)
        nn.init.xavier_uniform_(self.W.data)
        nn.init.zeros_(self.b.data)

    def forward(self,X,structure_part):
        """
        X: input features
        G: nx.Graph graph
        """

        """X = torch.tensor(X).float().requires_grad_(False)"""
        output = torch.matmul(structure_part,X)
        output = torch.matmul(output,self.W) + self.b

        return output




In [44]:
class GCN_network(nn.Module):
    def __init__(self, in_features, out_features,hidden_features=64, dropout=0.5,num_layers=2,device='cpu'):
        super(GCN_network, self).__init__()

        self.device=device
        self.layers = nn.ModuleList()  # Create a ModuleList to store GCN layers

        # Add GCN layers to the ModuleList
        self.layers.append(GCN_layer(in_features, hidden_features))
        for _ in range(num_layers - 1):  # Add (num_layers - 1) additional GCN layers
            self.layers.append(GCN_layer(hidden_features, hidden_features))
        # Output layer
        self.layer_out = GCN_layer(hidden_features, out_features)

        self.dropout = dropout
    def forward(self,X,G : nx.Graph):
        """
        X: input features
        G: nx.Graph graph
        """
        tensor_graph = torch.tensor(networx_to_stucture_part(G),dtype=torch.float32,device=self.device)

        X = torch.tensor(X,device=self.device).float()
        for layer in self.layers:
            X = layer(X, tensor_graph)
            X = F.relu(X)
            X = F.dropout(X, p=self.dropout)
        output = self.layer_out(X, tensor_graph)

        return output


In [45]:
def generate_graph(numb_nodes=20):
    random_graph = nx.path_graph(numb_nodes)
    # apply one dimension features to node, a random number between 0 and 1
    for node in random_graph.nodes:
        random_graph.nodes[node]['features'] = random.random()
    #print(random_graph.nodes.data())
    num_features = 1
    # plot the graph
    """nx.draw(random_graph, with_labels=True)"""
    # put all the features in a numpy array
    features_complete = np.zeros((len(random_graph.nodes),num_features))
    features_masked = np.zeros((len(random_graph.nodes),num_features))
    # get two random nodes in the graph
    node1 = random.choice(list(random_graph.nodes))
    node2 = random.choice(list(random_graph.nodes))
    # compute the shortest path between the two nodes
    shortest_path = nx.shortest_path(random_graph, node1, node2)
    #print(shortest_path)
    # put the features of the shortest path nodes to 1, as  a dict

    for node in random_graph.nodes:
        if node in shortest_path:
            features_complete[node] = 1
        else:
            features_complete[node] = 0
    # plot graph. Node that the shortest path nodes are colored in red
    for node in random_graph.nodes:
        if node in shortest_path:
            random_graph.nodes[node]['features'] = 1
        else:
            random_graph.nodes[node]['features'] = 0
    """nx.draw(random_graph, with_labels=True, node_color=[random_graph.nodes[node]['features'] for node in random_graph.nodes])"""
    #print(features_complete)
    # features masked has only the departure and arrival nodes of the shortest path
    features_masked[shortest_path[0]] = 1
    features_masked[shortest_path[-1]] = 1
    return features_complete, features_masked, random_graph


In [73]:
number_graph_instances = 1
numb_nodes = 200
generated_data = []

for _ in range(number_graph_instances):
    features_complete, features_masked, random_graph = generate_graph(numb_nodes)
    generated_data.append((features_complete, features_masked, random_graph))


num_features = 1
features_complete = np.array([generated_data[i][0] for i in range(number_graph_instances)])
features_masked = np.array([generated_data[i][1] for i in range(number_graph_instances)])
graphs = []
for i in range(len(generated_data)):
    graphs.append(generated_data[i][2])



In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = GCN_network(in_features=num_features, out_features=2, hidden_features=32, dropout=0.2,num_layers=20,device=device)

print(f"current device : {device}")
model = model.to(device)
with torch.no_grad():
    for graph, features in zip(graphs, features_masked):
        output = model(features, graph)



# remove the last dimension of features_complete and features_masked


In [75]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.00025)
criterion = nn.CrossEntropyLoss()

In [None]:
print(model.layers[0].W)

In [None]:
epoch = 10000

for i in range(epoch):
    curr_loss = 0
    for graph, features ,features_complete_unit in zip(graphs, features_masked,features_complete):
        features_complete_tensor = torch.tensor(features_complete_unit,dtype=torch.long,device=device).requires_grad_(False).view(-1)
        output = model(features, graph)

        loss = criterion(output,features_complete_tensor)
        loss.backward()
        optimizer.step()
        curr_loss += loss.item()
        optimizer.zero_grad()
    print(f"Epoch: {i} Loss: {curr_loss/len(graphs)}")


In [81]:
# plot the graph. The shortest path is colored in red
with torch.no_grad():
    accuracy = 0
    for graph, features ,features_complete_unit in zip(graphs, features_masked,features_complete):
        features_complete_tensor = torch.tensor(features_complete_unit,dtype=torch.long,device=device).requires_grad_(False).view(-1)
        output = model(features, graph)
        predicted = (torch.argmax(output, dim=1))
        accuracy += (torch.sum(predicted == features_complete_tensor).item()/len(predicted))
    print(f"Accuracy: {accuracy/len(graphs)}%")

Accuracy: 0.85%
