<a href="https://colab.research.google.com/github/RMYazdi/Graph_Neural_Networks/blob/main/GNN_Experiment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install dependencies

In [None]:
# Install required packages.
# !pip install -q torch-scatter -f https://data.pyg.org/whl/torch-1.10.0+cu113.html
# !pip install -q torch-sparse -f https://data.pyg.org/whl/torch-1.10.0+cu113.html
!pip install -q git+https://github.com/pyg-team/pytorch_geometric.git

In [None]:
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
# Helper function for visualization.
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import torch
from torch.nn import Linear
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.nn import SAGEConv
import numpy as np
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score,classification_report,confusion_matrix,log_loss
from sklearn.ensemble import RandomForestClassifier
from IPython.display import Javascript  # Restrict height of output cell.
display(Javascript('''google.colab.output.setIframeHeight(0, true, {maxHeight: 10})'''))





def visualize(h, color):
    """Scatter-plot a two-component t-SNE projection of ``h``.

    Args:
        h: Tensor to project; it is detached, moved to the CPU and converted
            to a NumPy array before being handed to t-SNE.
        color: Per-point values passed to matplotlib's ``c=`` argument and
            mapped through the "Set2" colormap.
    """
    points = h.detach().cpu().numpy()
    embedded = TSNE(n_components=2).fit_transform(points)

    _, ax = plt.subplots(figsize=(10, 10))
    # Hide the ticks on both axes.
    ax.set_xticks([])
    ax.set_yticks([])
    ax.scatter(embedded[:, 0], embedded[:, 1], s=70, c=color, cmap="Set2")
    plt.show()




# Cora Dataset
To demonstrate, we make use of the `Cora` dataset, which is a **citation network** where nodes represent documents.
Each node is described by a 1433-dimensional bag-of-words feature vector.
Two documents are connected if there exists a citation link between them.
The task is to infer the category of each document (7 in total). We can see that the `Cora` network holds 2,708 nodes and 10,556 edges, resulting in an average node degree of 3.9.

In [None]:
# Load the Cora Planetoid dataset (stored under data/Planetoid) with the
# NormalizeFeatures transform applied.
dataset = Planetoid(
    root='data/Planetoid',
    name='Cora',
    transform=NormalizeFeatures(),
)

# Dataset-level summary.
print(
    '',
    f'Dataset: {dataset}:',
    '======================',
    f'Number of graphs: {len(dataset)}',
    f'Number of features: {dataset.num_features}',
    f'Number of classes: {dataset.num_classes}',
    sep='\n',
)

data = dataset[0]  # Get the first graph object.

print(
    '',
    data,
    '===========================================================================================================',
    sep='\n',
)

# Graph-level statistics, one per line.
print(
    f'Number of nodes: {data.num_nodes}',
    f'Number of edges: {data.num_edges}',
    f'Average node degree: {data.num_edges / data.num_nodes:.2f}',
    f'Number of training nodes: {data.train_mask.sum()}',
    f'Training node label rate: {int(data.train_mask.sum()) / data.num_nodes:.2f}',
    f'Has isolated nodes: {data.has_isolated_nodes()}',
    f'Has self-loops: {data.has_self_loops()}',
    f'Is undirected: {data.is_undirected()}',
    sep='\n',
)

# Model Architecture

## MLP Architecture

In [None]:

class MLP(torch.nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> dropout -> Linear.

    Args:
        num_input: Size of each input feature vector.
        hidden_channels: Width of the hidden layer.
        num_classes: Number of output scores per row. When omitted, the
            value is read from the notebook-level ``dataset.num_classes``,
            which is what the class always did before this parameter existed.
    """

    def __init__(self, num_input, hidden_channels, num_classes=None):
        super().__init__()
        if num_classes is None:
            # Backward-compatible default: fall back to the global dataset.
            num_classes = dataset.num_classes
        # Fixed seed so the layers are initialised identically on every run.
        # Note that this reseeds torch's global RNG as a side effect.
        torch.manual_seed(12345)
        self.lin1 = Linear(num_input, hidden_channels)
        self.lin2 = Linear(hidden_channels, num_classes)

    def forward(self, x):
        """Return the unnormalised class scores for each row of ``x``."""
        x = self.lin1(x)
        x = x.relu()
        # Dropout is only applied while the module is in training mode.
        x = F.dropout(x, p=0.5, training=self.training)
        return self.lin2(x)


## GNN Architecture

In [None]:
class GCN(torch.nn.Module):
    """Two-layer graph network: GCNConv -> ReLU -> dropout -> GCNConv.

    Args:
        num_input: Size of each node's input feature vector.
        hidden_channels: Width of the hidden node representation.
        num_classes: Number of output scores per node. When omitted, the
            value is read from the notebook-level ``dataset.num_classes``,
            which is what the class always did before this parameter existed.
    """

    def __init__(self, num_input, hidden_channels, num_classes=None):
        super().__init__()
        if num_classes is None:
            # Backward-compatible default: fall back to the global dataset.
            num_classes = dataset.num_classes
        # Fixed seed so the layers are initialised identically on every run.
        # Note that this reseeds torch's global RNG as a side effect.
        torch.manual_seed(1234567)
        self.conv1 = GCNConv(num_input, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, num_classes)

    def forward(self, x, edge_index):
        """Return the unnormalised class scores for every node.

        Args:
            x: Node feature matrix.
            edge_index: Graph connectivity passed to both convolutions.
        """
        x = self.conv1(x, edge_index)
        x = x.relu()
        # Dropout is only applied while the module is in training mode.
        x = F.dropout(x, p=0.5, training=self.training)
        return self.conv2(x, edge_index)




# **Canonical Feature + GNN**

In [None]:
# Model input: the dataset's own node features, as float32.
input_data = data.x.float()

# Loss, model and optimiser for this experiment.
criterion = torch.nn.CrossEntropyLoss()
model = GCN(input_data.size(1), hidden_channels=16)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data, data.edge_index)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def train_test_acc():
    """Return ``(train_acc, test_acc)`` for the notebook-level ``model``.

    Each accuracy is the fraction of nodes in the corresponding mask whose
    arg-max prediction equals the label in ``data.y``.
    """
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data, data.edge_index).argmax(dim=1)

    def _accuracy(mask):
        # Correct predictions divided by the number of selected nodes.
        return int((pred[mask] == data.y[mask]).sum()) / int(mask.sum())

    return _accuracy(data.train_mask), _accuracy(data.test_mask)



# Per-epoch accuracy history.
train_acc_list, test_acc_list = [], []

# range(1, 120) covers epochs 1 through 119.
for epoch in range(1, 120):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')
    # Measure both accuracies after every optimisation step.
    train_acc, test_acc = train_test_acc()
    train_acc_list += [train_acc]
    test_acc_list += [test_acc]

# test_acc holds the value from the final epoch.
print(f'Test Accuracy: {test_acc:.4f}')

In [None]:
# Plot the model's per-node outputs, coloured by the true label.
# Set eval mode explicitly so dropout is off regardless of which cell ran
# last, and skip gradient tracking because this is inference only.
model.eval()
with torch.no_grad():
    out = model(data.x, data.edge_index)
visualize(out, color=data.y)

# **Canonical Feature + MLP**

In [None]:
# Train/test rows of the raw features and labels, selected by the dataset masks.
X_Train, X_Test = data.x[data.train_mask], data.x[data.test_mask]
Y_Train, Y_Test = data.y[data.train_mask], data.y[data.test_mask]

In [None]:
# Model input: the dataset's own node features, as float32.
input_data = data.x.float()

model = MLP(input_data.size(1), hidden_channels=16)

# Optimiser and loss criterion for this experiment.
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one full-batch optimisation step of the MLP and return the loss.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # Clear gradients.
    # Feed ``input_data`` (prepared at the top of this cell from ``data.x``)
    # so this cell follows the same pattern as the other experiments.
    out = model(input_data)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(out[data.train_mask], data.y[data.train_mask])
    loss.backward()  # Derive gradients.
    optimizer.step()  # Update parameters based on gradients.
    return loss

def test():
    """Return the accuracy of ``model`` on the nodes in ``data.test_mask``."""
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    # ``input_data`` is the float view of ``data.x`` prepared in this cell.
    with torch.no_grad():
        pred = model(input_data).argmax(dim=1)
    mask = data.test_mask
    n_correct = int((pred[mask] == data.y[mask]).sum())
    return n_correct / int(mask.sum())

# range(1, 120) covers epochs 1 through 119.
for epoch in range(1, 120):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

# Score once, after the last epoch.
test_acc = test()
print(f'MLP Accuracy: {test_acc:.4f}')

# **Canonical Feature + RandomForest**

In [None]:
# Random forests are stochastic; fix the seed so the reported accuracy is
# reproducible across runs (12345 is the seed the MLP class uses).
rf_clf = RandomForestClassifier(criterion='entropy', random_state=12345)
rf_clf.fit(X_Train, Y_Train)
y_predict = rf_clf.predict(X_Test)

print("RandromForest Accuracy:", accuracy_score(Y_Test,y_predict))


# **PCA + GNN**

In [None]:
# Reduce the node features to 15 principal components. The projection is
# fitted on every row of data.x (training and test nodes alike).
X = data.x
# scikit-learn may choose a randomized SVD solver for a matrix of this size,
# so pin the seed to keep the components reproducible between runs.
pca = PCA(n_components=15, random_state=12345)
data_PCA = pca.fit_transform(X)

In [None]:

# Model input: the PCA-reduced features as a float32 tensor.
input_data = torch.tensor(data_PCA).float()

# Loss, model and optimiser for this experiment.
criterion = torch.nn.CrossEntropyLoss()
model = GCN(input_data.size(1), hidden_channels=16)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data, data.edge_index)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def test():
    """Return the accuracy of ``model`` on the nodes in ``data.test_mask``."""
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data, data.edge_index).argmax(dim=1)
    mask = data.test_mask
    n_correct = int((pred[mask] == data.y[mask]).sum())
    return n_correct / int(mask.sum())


# range(1, 300) covers epochs 1 through 299.
for epoch in range(1, 300):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

# Score once, after the last epoch.
test_acc = test()
print(f'Test Accuracy: {test_acc:.4f}')

# **PCA + MLP**

In [None]:
# data_PCA is a NumPy array, so index it with NumPy boolean masks instead of
# relying on implicit conversion of the torch mask tensors.
train_mask_np = data.train_mask.cpu().numpy()
test_mask_np = data.test_mask.cpu().numpy()

X_Train = data_PCA[train_mask_np]
Y_Train = data.y[data.train_mask]
X_Test = data_PCA[test_mask_np]
Y_Test = data.y[data.test_mask]

In [None]:
# Model input: the PCA-reduced features as a float32 tensor.
input_data = torch.tensor(data_PCA).float()

model = MLP(input_data.size(1), hidden_channels=16)

# Optimiser and loss criterion for this experiment.
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def test():
    """Return the accuracy of ``model`` on the nodes in ``data.test_mask``."""
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data).argmax(dim=1)
    mask = data.test_mask
    n_correct = int((pred[mask] == data.y[mask]).sum())
    return n_correct / int(mask.sum())

# range(1, 300) covers epochs 1 through 299.
for epoch in range(1, 300):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

# Score once, after the last epoch.
test_acc = test()
print(f'MLP Accuracy: {test_acc:.4f}')

# **PCA + RandomForest**

In [None]:
# Random forests are stochastic; fix the seed so the reported accuracy is
# reproducible across runs (12345 is the seed the MLP class uses).
rf_clf = RandomForestClassifier(criterion='entropy', random_state=12345)
rf_clf.fit(X_Train, Y_Train)
y_predict = rf_clf.predict(X_Test)
print("RandromForest Accuracy:", accuracy_score(Y_Test,y_predict))




# **One-Hot + GNN**


In [None]:
# One feature per node: row i is the one-hot encoding of node index i.
# torch.arange yields int64 indices on every platform, which is the dtype
# one_hot requires; going through np.asarray(range(...)) inherits NumPy's
# platform-dependent default integer instead.
data_ind = torch.arange(data.x.shape[0])
data_one_hot = torch.nn.functional.one_hot(data_ind, num_classes=data_ind.shape[0])

In [None]:
# Model input: the one-hot node encodings as float32.
input_data = data_one_hot.float()

# Loss, model and optimiser for this experiment.
criterion = torch.nn.CrossEntropyLoss()
model = GCN(input_data.size(1), hidden_channels=16)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data, data.edge_index)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def test():
    """Return the accuracy of ``model`` on the nodes in ``data.test_mask``."""
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data, data.edge_index).argmax(dim=1)
    mask = data.test_mask
    n_correct = int((pred[mask] == data.y[mask]).sum())
    return n_correct / int(mask.sum())


# range(1, 300) covers epochs 1 through 299.
for epoch in range(1, 300):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

# Score once, after the last epoch.
test_acc = test()
print(f'Test Accuracy: {test_acc:.4f}')

# **One-Hot + MLP**

In [None]:
# Train/test rows of the one-hot features and labels, selected by the dataset masks.
X_Train, X_Test = data_one_hot[data.train_mask], data_one_hot[data.test_mask]
Y_Train, Y_Test = data.y[data.train_mask], data.y[data.test_mask]

In [None]:

# Model input: the one-hot node encodings as float32.
input_data = data_one_hot.float()

model = MLP(input_data.size(1), hidden_channels=16)

# Optimiser and loss criterion for this experiment.
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def test():
    """Return the accuracy of ``model`` on the nodes in ``data.test_mask``."""
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data).argmax(dim=1)
    mask = data.test_mask
    n_correct = int((pred[mask] == data.y[mask]).sum())
    return n_correct / int(mask.sum())

# range(1, 300) covers epochs 1 through 299.
for epoch in range(1, 300):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

# Score once, after the last epoch.
test_acc = test()
print(f'MLP Accuracy: {test_acc:.4f}')

# **One-Hot + Random Forest**

In [None]:
# Random forests are stochastic; fix the seed so the reported accuracy is
# reproducible across runs (12345 is the seed the MLP class uses).
rf_clf = RandomForestClassifier(criterion='entropy', random_state=12345)
rf_clf.fit(X_Train, Y_Train)
y_predict = rf_clf.predict(X_Test)


print("RandromForest Accuracy:", accuracy_score(Y_Test,y_predict))


# **Index + GNN**

In [None]:
# Single-column feature holding each node's own index: shape (num_nodes, 1).
# torch.arange gives int64 on every platform, whereas np.asarray(range(...))
# inherits NumPy's platform-dependent default integer dtype.
data_index = torch.arange(data.x.shape[0]).reshape(-1, 1)

In [None]:
# Sanity check: display the shape of the index feature matrix.
data_index.shape

In [None]:
# Model input: the node-index column as float32.
input_data = data_index.float()

# Loss, model and optimiser for this experiment.
criterion = torch.nn.CrossEntropyLoss()
model = GCN(input_data.size(1), hidden_channels=16)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data, data.edge_index)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def train_test_acc():
    """Return ``(train_acc, test_acc)`` for the notebook-level ``model``.

    Each accuracy is the fraction of nodes in the corresponding mask whose
    arg-max prediction equals the label in ``data.y``.
    """
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data, data.edge_index).argmax(dim=1)

    def _accuracy(mask):
        # Correct predictions divided by the number of selected nodes.
        return int((pred[mask] == data.y[mask]).sum()) / int(mask.sum())

    return _accuracy(data.train_mask), _accuracy(data.test_mask)



# Per-epoch accuracy history.
train_acc_list, test_acc_list = [], []

# range(1, 120) covers epochs 1 through 119.
for epoch in range(1, 120):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')
    # Measure both accuracies after every optimisation step.
    train_acc, test_acc = train_test_acc()
    train_acc_list += [train_acc]
    test_acc_list += [test_acc]

# test_acc holds the value from the final epoch.
print(f'Test Accuracy: {test_acc:.4f}')

# **Index + MLP**

In [None]:
# Train/test rows of the index feature and labels, selected by the dataset masks.
X_Train, X_Test = data_index[data.train_mask], data_index[data.test_mask]
Y_Train, Y_Test = data.y[data.train_mask], data.y[data.test_mask]

In [None]:
# Model input: the node-index column as float32.
input_data = data_index.float()

model = MLP(input_data.size(1), hidden_channels=16)

# Optimiser and loss criterion for this experiment.
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
criterion = torch.nn.CrossEntropyLoss()

def train():
    """Run one full-batch optimisation step and return the loss tensor.

    Operates on the notebook-level ``model``, ``optimizer``, ``criterion``,
    ``input_data`` and ``data``.
    """
    model.train()
    optimizer.zero_grad()  # discard gradients from the previous step
    logits = model(input_data)
    # Only nodes selected by train_mask contribute to the loss.
    loss = criterion(logits[data.train_mask], data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss

def test():
    """Return the accuracy of ``model`` on the nodes in ``data.test_mask``."""
    model.eval()  # training=False, so dropout is skipped
    # Evaluation needs no gradients; avoid building an autograd graph.
    with torch.no_grad():
        pred = model(input_data).argmax(dim=1)
    mask = data.test_mask
    n_correct = int((pred[mask] == data.y[mask]).sum())
    return n_correct / int(mask.sum())

# range(1, 120) covers epochs 1 through 119.
for epoch in range(1, 120):
    loss = train()
    print(f'Epoch: {epoch:03d}, Loss: {loss:.4f}')

# Score once, after the last epoch.
test_acc = test()
print(f'MLP Accuracy: {test_acc:.4f}')

# **Index + RandomForest**

In [None]:
# Random forests are stochastic; fix the seed so the reported accuracies are
# reproducible across runs (12345 is the seed the MLP class uses).
rf_clf = RandomForestClassifier(criterion='entropy', random_state=12345)
rf_clf.fit(X_Train, Y_Train)

# Predict on both splits so training and test accuracy can be compared.
y_train_prdict = rf_clf.predict(X_Train)
y_predict = rf_clf.predict(X_Test)

print("RandromForest Train Accuracy:", accuracy_score(Y_Train,y_train_prdict))
print("RandromForest Test Accuracy:", accuracy_score(Y_Test,y_predict))
