# CNA | Final Project | P1
### Mohsen Ebadpour | 400131080 | m.ebadpour@aut.ac.ir

In [None]:
import torch 
from torch import nn
from torch import optim
from torch.utils.data import DataLoader,Dataset
from torch.nn import functional as F

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 
from sklearn.metrics import confusion_matrix
from tqdm import tqdm


import torch_geometric 
from torch_geometric.datasets import Planetoid,DBLP,CoraFull
from torch_geometric import transforms as T
from torch_geometric.nn import GCNConv,Linear,GATConv,GATv2Conv

from torch_geometric.nn.conv import MessagePassing

# Report Dataset

In [None]:
def ReportDataset(name="PubMed"):
    """Collect basic statistics of a citation benchmark into a flat dict.

    ``name`` selects the dataset: ``"CoraFull"`` is loaded through
    ``CoraFull``, every other value through ``Planetoid``. The dataset is
    stored under ``./<name>``.

    The returned dict holds feature/class/node/edge counts plus the sizes of
    the train/test/validation masks. For ``"CoraFull"`` the three split sizes
    are reported as ``NaN`` instead of being read from masks.
    """
    root = "./{0}".format(name)
    is_cora_full = name == "CoraFull"
    if is_cora_full:
        _dataset = CoraFull(root)
    else:
        _dataset = Planetoid(name=name, root=root)
    graph = _dataset[0]

    data = {
        "Node`s Featrue": _dataset.num_node_features,
        "Edge`s Feature": _dataset.num_edge_features,
        "Classes": _dataset.num_classes,
        "Nodes": graph.num_nodes,
        "Edges": graph.num_edges,
    }

    # Column name -> mask attribute, in the order the columns appear in the report.
    split_columns = (
        ("Train Nodes", "train_mask"),
        ("Test Nodes", "test_mask"),
        ("Validation Nodes", "val_mask"),
    )
    for column, mask_name in split_columns:
        if is_cora_full:
            data[column] = np.nan
        else:
            # Number of feature rows selected by the boolean mask.
            data[column] = graph.x[getattr(graph, mask_name)].shape[0]
    return data
    
    
# Build one statistics row per benchmark and show them as a table indexed by
# dataset name (the trailing expression is what the notebook cell displays).
Names = ["Cora", "CiteSeer", "PubMed", "CoraFull"]
info = []
for name in Names:
    stats_row = ReportDataset(name)
    info.append(stats_row)

summary_table = pd.DataFrame(info)
summary_table.set_axis(Names, axis=0)

# Dataset Split

In [None]:
def GetDataset(name="Cora"):
    """Return the single graph of the requested benchmark.

    ``"CoraFull"`` is loaded through ``CoraFull`` and passed through
    ``T.RandomNodeSplit()`` (default settings), which attaches train/val/test
    masks; the split is random, so it differs between calls unless the RNG is
    seeded by the caller. Every other name is loaded through ``Planetoid`` and
    returned unchanged. Data is stored under ``./<name>``.

    The previous version additionally loaded CoraFull and ran a split on it
    for *every* call (even for Planetoid datasets) and discarded the result;
    that wasted load/transform has been removed.
    """
    root = "./{0}".format(name)
    if name == "CoraFull":
        # Masks are generated here; presumably CoraFull ships without
        # predefined splits — confirm against the dataset documentation.
        return T.RandomNodeSplit()(CoraFull(root)[0])
    return Planetoid(name=name, root=root)[0]


# Train Function

In [None]:
def Train(model,dataset,epach:int,lr=0.01, weight_decay=5e-4,show=True,dataset_name="Cora",loss_f="nll",coment=""):
    """Train ``model`` full-batch with Adam and report its accuracies.

    Parameters
    ----------
    model : module called as ``model(dataset)`` returning per-node scores;
        must expose ``.name`` when ``show`` is true.
    dataset : object with ``.to(device)``, ``.y`` and boolean
        ``train_mask`` / ``val_mask`` / ``test_mask`` attributes.
    epach : number of optimisation steps (one full-batch step per epoch).
    lr, weight_decay : Adam hyper-parameters.
    show : when true, a 2x2 report figure is written below ``./P1``.
    dataset_name, coment : only used in the figure title / file name.
    loss_f : ``"nll"`` selects ``F.nll_loss``; anything else ``F.cross_entropy``.

    Returns
    -------
    (test accuracy, validation accuracy, train accuracy) of the final model,
    each a float in [0, 1], evaluated on the CPU in eval mode.

    Notes
    -----
    Training runs on CUDA when available and falls back to the CPU otherwise.
    Per-epoch validation loss and accuracies come from a single eval-mode,
    gradient-free forward pass taken after the optimiser step. The model and
    dataset are moved to the CPU before returning.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    dataset = dataset.to(device)
    LOSE = F.nll_loss if loss_f == "nll" else F.cross_entropy
    opt = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    def _accuracy(pred, mask):
        # Fraction of nodes selected by ``mask`` whose prediction matches the label.
        correct = (pred[mask] == dataset.y[mask]).sum()
        return float(correct) / float(mask.sum())

    losses_train = []
    acc_train = []
    loss_val = []
    acc_val = []
    acc_test = []

    for _ in range(epach):
        # --- optimisation step (dropout active) ---
        model.train()
        opt.zero_grad()
        out = model(dataset)
        loss = LOSE(out[dataset.train_mask], dataset.y[dataset.train_mask])
        loss.backward()
        opt.step()
        losses_train.append(loss.item())

        # --- monitoring (dropout off, no autograd graph, one forward pass) ---
        model.eval()
        with torch.no_grad():
            out_eval = model(dataset)
            loss_valid = LOSE(out_eval[dataset.val_mask], dataset.y[dataset.val_mask])
            pred = out_eval.argmax(dim=1)
        loss_val.append(loss_valid.item())
        acc_train.append(_accuracy(pred, dataset.train_mask))
        acc_test.append(_accuracy(pred, dataset.test_mask))
        acc_val.append(_accuracy(pred, dataset.val_mask))

    # Final evaluation on the CPU so the predictions can be handed to numpy.
    model = model.to("cpu")
    dataset = dataset.to("cpu")
    model.eval()
    with torch.no_grad():
        pred = model(dataset).argmax(dim=1)
    acc = _accuracy(pred, dataset.test_mask)
    acc_ = _accuracy(pred, dataset.val_mask)
    acc_t = _accuracy(pred, dataset.train_mask)

    if show:
        import os

        sns.set_style("whitegrid")
        plt.rcParams['figure.figsize']= (10,10)
        h,w = 2,2

        plt.subplot(h,w,1)
        plt.plot(losses_train,label="Train loss")
        plt.plot(loss_val,label="Validation loss")
        plt.title("Loss Report | {0} | {1}".format(model.name,dataset_name))
        plt.xlabel("Epoch")
        plt.ylabel("Cross Entropy Loss")
        plt.legend()

        plt.subplot(h,w,2)
        plt.plot(acc_train,label="Train Accuracy")
        plt.plot(acc_val,label="Validation Accuracy")
        plt.title("Accuracy Report")
        plt.xlabel("Epoch")
        plt.legend()

        plt.subplot(h,w,3)
        y_true = dataset.y[dataset.test_mask].numpy()
        y_pred = pred[dataset.test_mask].numpy()
        cm = confusion_matrix(y_true, y_pred)
        sns.heatmap(cm,cmap="Blues",annot=True,cbar=False,fmt="g")
        plt.title("Confusion Matrix for TEST data | Accuracy:{0}%".format(round(acc*100,2)))
        # confusion_matrix(y_true, y_pred) puts true labels on the rows, which
        # the heatmap draws along the y axis.
        plt.ylabel("True Label")
        plt.xlabel("Predicted Label")

        plt.subplot(h,w,4)
        plt.plot(acc_test,label="Test Accuracy",color="green")
        plt.title("Test accuracy report during train")
        plt.xlabel("Epoch")
        plt.legend()

        plt.tight_layout()
        # savefig does not create missing directories.
        os.makedirs("./P1", exist_ok=True)
        plt.savefig("./P1/{1} | {0} | {2}".format(model.name,dataset_name,coment))
        plt.clf()

    return acc,acc_,acc_t
    
    

# Net-GCN 

In [None]:
class NetGCN(torch.nn.Module):
    """Stack of ``GCNConv`` layers with an optional fully connected head.

    Parameters
    ----------
    num_layer : accepted for backward compatibility; the architecture is
        derived from ``dim`` and ``linear`` only.
    data_set : graph object providing ``x`` (features) and ``y`` (labels); the
        input width is ``x.shape[1]`` and the class count is the number of
        unique labels.
    linear : ``None``/``0`` for a pure GCN whose last conv maps to the classes.
        Otherwise the GCN stops at the last entry of ``dim`` and a head of
        ``linear`` dense layers follows, using the first ``linear - 1`` entries
        of ``Linears`` as hidden widths.
    p : dropout probability applied after every hidden activation.
    Linears : candidate hidden widths of the dense head (never mutated).
    dim : hidden widths of the GCN layers (never mutated).

    ``forward`` always returns per-node log-probabilities, so the model works
    with ``F.nll_loss`` and with ``F.cross_entropy`` (log-softmax is
    idempotent). ``self.name`` is a human-readable architecture summary.
    """

    def __init__(self,num_layer=2,data_set=None,linear=None,p=0.6,Linears = [16,32,64,128,256,512],dim=[15]):
        super().__init__()
        self.p = p
        n_calsss = torch.unique(data_set.y).shape[0]
        n_feaute_in = data_set.x.shape[1]

        # One consistent switch: a falsy ``linear`` (None or 0) means no head.
        self.use_linear = bool(linear)

        dim = [n_feaute_in] + list(dim) + [n_calsss]
        # With a dense head the GCN stops at the last hidden width; the class
        # projection is then done by the head instead of a conv layer.
        conv_dims = dim[:-1] if self.use_linear else dim
        if len(conv_dims) < 2:
            raise ValueError("NetGCN needs at least one GCN layer; `dim` must not be empty when `linear` is set")

        self.name = "GCN-NET | Con:" + str(n_feaute_in)
        self.convs = nn.ModuleList()
        for width_in, width_out in zip(conv_dims[:-1], conv_dims[1:]):
            self.convs.append(GCNConv(width_in, width_out))
            self.name += " -> " + str(width_out)

        if self.use_linear:
            # Head widths: last GCN width -> selected hidden widths -> classes.
            head_dims = [dim[-2]] + list(Linears[0:linear - 1]) + [n_calsss]
            self.linears = nn.ModuleList()
            self.name += " | CLF: " + str(head_dims[0])
            for width_in, width_out in zip(head_dims[:-1], head_dims[1:]):
                self.linears.append(Linear(width_in, width_out))
                self.name += " -> " + str(width_out)

    def forward(self,dataset):
        """Return log-probabilities of shape (num_nodes, num_classes)."""
        x_in, edge_index = dataset.x, dataset.edge_index

        # Hidden GCN layers: conv -> ReLU -> dropout.
        for conv in self.convs[:-1]:
            x_in = conv(x_in, edge_index)
            x_in = F.relu(x_in)
            x_in = F.dropout(x_in, training=self.training, p=self.p)

        # The last conv is applied without activation in both configurations.
        x_in = self.convs[-1](x_in, edge_index)

        if self.use_linear:
            for lin in self.linears[:-1]:
                x_in = lin(x_in)
                x_in = F.relu(x_in)
                x_in = F.dropout(x_in, training=self.training, p=self.p)
            x_in = self.linears[-1](x_in)

        # log_softmax (not softmax): feeding probabilities into
        # F.cross_entropy would apply a second softmax and flatten gradients.
        return F.log_softmax(x_in, dim=1)
            


# Grid search over GCN widths/depths and optional fully connected heads on one
# dataset; one row of results is recorded per trained configuration.
name = "PubMed"
dataset = GetDataset(name)

# Hidden widths of the stacked GCN layers to try.
TestConvLayers = [
    [16], [16, 16], [16, 16, 16], [16, 16, 16, 16],
    [16, 32], [32, 16], [32, 32], [24, 16], [16, 24], [64, 128], [128, 64],
]

# Number of dense layers in the head (None = pure GCN) and the width pool.
TestFullyLayer = [None, 1, 2, 3, 4]
Linears = [64, 32, 16, 8]

NumLayers, LayersN = [], []
TrainAcc, TestAcc, ValidAcc = [], [], []
LenFLayer, layersF = [], []

for conv_layer in tqdm(TestConvLayers):
    for full_layer in tqdm(TestFullyLayer):
        if full_layer is None:
            # Pure GCN: the model returns log-probabilities, so use NLL.
            Linears_test, lo = np.nan, "nll"
            LenFLayer.append(np.nan)
            layersF.append(np.nan)
        else:
            Linears_test, lo = Linears[-full_layer:], "cross"
            LenFLayer.append(full_layer)
            # NOTE(review): NetGCN slices `Linears[0:linear-1]`, so the list
            # recorded here holds one more width than the head actually uses —
            # confirm which of the two is intended.
            layersF.append(Linears_test.copy())

        model = NetGCN(
            len(conv_layer),
            data_set=dataset,
            p=0.4,
            linear=full_layer,
            dim=conv_layer,
            Linears=Linears_test,
        )
        te, va, tr = Train(
            model=model,
            dataset=dataset,
            epach=1000,
            dataset_name=name,
            loss_f=lo,
            lr=0.0003,
            weight_decay=5e-4,
            coment="GCN Search",
        )
        # Store accuracies as percentages with two decimals.
        te, va, tr = (round(score * 100, 2) for score in (te, va, tr))
        TestAcc.append(te)
        ValidAcc.append(va)
        TrainAcc.append(tr)
        LayersN.append(conv_layer.copy())
        NumLayers.append(len(conv_layer))


# Columns are assembled as rows first and transposed into the final table.
labels = ["No. GCNs","GCNs","Train Acc","Validation Acc","Test Acc","No. FCs","FCs"]
Total = [NumLayers, LayersN, TrainAcc, ValidAcc, TestAcc, LenFLayer, layersF]
gcn_search = pd.DataFrame(Total).T.set_axis(labels=labels,axis=1)


In [None]:
# Persist the GCN search table next to the notebook, without the row index.
results_path = "./results.xlsx"
gcn_search.to_excel(results_path, index=False)

# Custom-Layer-GAT

In [None]:
class CustomLayerGAT(GATConv):
    """``GATConv`` variant that can merge attention heads with learned weights.

    With ``is_weighted=True`` and ``concat=True`` the per-head outputs are
    combined through a learnable ``(heads, 1)`` weight vector instead of being
    concatenated, so the layer emits ``out_channels`` features per node (and
    the bias is resized accordingly). In every other configuration the head
    handling is the parent's: concatenation when ``concat`` is true, mean
    otherwise.

    The ``forward`` body follows the structure of ``GATConv.forward`` and
    relies on parent attributes (``lin_src``, ``lin_dst``, ``att_src``,
    ``att_dst``, ``edge_updater``) — NOTE(review): these are specific to the
    installed torch_geometric release; confirm when upgrading.
    """

    def __init__(self, in_channels, out_channels: int, heads: int = 1, concat: bool = True, negative_slope: float = 0.2,is_weighted=False, dropout: float = 0, add_self_loops: bool = True, edge_dim = None, fill_value = 'mean', bias: bool = True, **kwargs):
        super().__init__(in_channels, out_channels, heads, concat, negative_slope, dropout, add_self_loops, edge_dim, fill_value, bias, **kwargs)

        from torch.nn.init import xavier_uniform_, zeros_

        self.is_weighted = is_weighted
        if is_weighted and concat:
            # Learnable mixing weights, one scalar per attention head.
            self._weight_heads = nn.Parameter(torch.Tensor(heads, 1))
            xavier_uniform_(self._weight_heads)
            if bias:
                # The parent sized the bias for the concatenated output
                # (heads * out_channels); the weighted merge emits out_channels.
                self.bias = nn.Parameter(torch.Tensor(out_channels))

        # ``bias=False`` leaves ``self.bias`` as None; only initialise a real one.
        if self.bias is not None:
            zeros_(self.bias)

    def forward(self, x, edge_index, edge_attr = None, size = None, return_attention_weights = None):
        """Run attention message passing and merge the heads.

        ``return_attention_weights`` is accepted for signature compatibility
        but ignored: only the node embeddings are returned.
        """
        H, C = self.heads, self.out_channels

        # Project the inputs and split the channels into (heads, out_channels).
        if isinstance(x, torch.Tensor):
            assert x.dim() == 2, "Static graphs not supported in 'GATConv'"
            x_src = x_dst = self.lin_src(x).view(-1, H, C)
        else:
            x_src, x_dst = x
            assert x_src.dim() == 2, "Static graphs not supported in 'GATConv'"
            x_src = self.lin_src(x_src).view(-1, H, C)
            if x_dst is not None:
                x_dst = self.lin_dst(x_dst).view(-1, H, C)

        x = (x_src, x_dst)

        # Per-node attention terms for the source and destination roles.
        alpha_src = (x_src * self.att_src).sum(dim=-1)
        alpha_dst = None if x_dst is None else (x_dst * self.att_dst).sum(-1)
        alpha = (alpha_src, alpha_dst)

        if self.add_self_loops:
            from torch_geometric.utils import add_self_loops, remove_self_loops
            from torch_geometric.typing import SparseTensor

            if isinstance(edge_index, torch.Tensor):
                num_nodes = x_src.size(0)
                if x_dst is not None:
                    num_nodes = min(num_nodes, x_dst.size(0))
                num_nodes = min(size) if size is not None else num_nodes
                # Drop existing self loops first so each node gets exactly one.
                edge_index, edge_attr = remove_self_loops(
                    edge_index, edge_attr)
                edge_index, edge_attr = add_self_loops(
                    edge_index, edge_attr, fill_value=self.fill_value,
                    num_nodes=num_nodes)
            elif isinstance(edge_index, SparseTensor):
                if self.edge_dim is None:
                    # Imported lazily: only the sparse path needs torch_sparse.
                    import torch_sparse
                    edge_index = torch_sparse.set_diag(edge_index)
                else:
                    raise NotImplementedError(
                        "The usage of 'edge_attr' and 'add_self_loops' "
                        "simultaneously is currently not yet supported for "
                        "'edge_index' in a 'SparseTensor' form")

        alpha = self.edge_updater(edge_index, alpha=alpha, edge_attr=edge_attr)
        out = self.propagate(edge_index, x=x, alpha=alpha, size=size)

        if self.concat:
            if self.is_weighted:
                # (N, H, C) -> (N, C, H) @ (H, 1) -> (N, C): weighted sum over heads.
                out = torch.transpose(out,1,2)
                out = (out@self._weight_heads).squeeze(dim=2)
            else:
                out = out.view(-1, self.heads * self.out_channels)
        else:
            out = out.mean(dim=1)

        if self.bias is not None:
            out = out + self.bias
        return out

# Net-GAT

In [None]:
class NetGAT(torch.nn.Module):
    """Two-layer GAT classifier built from ``CustomLayerGAT``.

    Parameters
    ----------
    heads : number of attention heads in the first layer.
    hidden_features : per-head output width of the first layer.
    dataset : graph object providing ``x`` and ``y``; the input width is
        ``x.shape[1]`` and the class count is the number of unique labels.
    is_concat, is_weighted : head-merging mode of the first layer. Its output
        width is ``hidden_features * heads`` for plain concatenation and
        ``hidden_features`` for the weighted merge or the mean.
    linear_dim : accepted for backward compatibility; not used.
    p : dropout probability (attention dropout in both layers and feature
        dropout between them).

    ``forward`` returns per-node log-probabilities.
    """

    def __init__(self,heads=1,hidden_features=64,dataset=None,is_concat=True,linear_dim=32,is_weighted=False,p=0.6):
        super().__init__()
        n_calsss = torch.unique(dataset.y).shape[0]
        n_feaute_in = dataset.x.shape[1]

        # Width actually produced by the first layer for the chosen merge mode.
        if is_concat and not is_weighted:
            hidden_out = hidden_features * heads
        else:
            hidden_out = hidden_features

        self.is_concat = is_concat
        self.hidden_features = hidden_features
        self.heads = heads
        self.p = p

        self.name= "GAT-Net | "
        self.name += "Head: "+ str(heads) +" | "
        self.name += "Weighted" if is_weighted else "Mean"

        self.is_wieghted = is_weighted
        self.show = True
        # The first layer must emit ``hidden_features`` per head: it previously
        # used the class count, which did not match the ``hidden_out`` inputs
        # expected by the second layer.
        self.gat_layer_1 = CustomLayerGAT(n_feaute_in,hidden_features,heads,concat=is_concat,is_weighted=is_weighted,dropout=p)
        self.gat_layer_2 = CustomLayerGAT(hidden_out,n_calsss,1,concat=False,is_weighted=False,dropout=p)

    def forward(self,dataset):
        """Return log-probabilities of shape (num_nodes, num_classes)."""
        x_in,edge_index = dataset.x,dataset.edge_index
        x_in = self.gat_layer_1(x_in,edge_index)

        x_in = F.dropout(x_in,training=self.training,p=self.p)
        x_in = F.elu(x_in)
        x_in = self.gat_layer_2(x_in,edge_index)
        return F.log_softmax(x_in,dim=1)

def plot_10_head_chart(head=11,dataset=None,p=0.6):
    """Train one mean-aggregated ``NetGAT`` per head count and plot test accuracy.

    Head counts ``1 .. head - 1`` are tried. Each run's report figure is saved
    by ``Train`` with the head count in its comment. The module-level ``name``
    is used as the dataset label of those figures.

    Returns ``(best_head, accuracy)`` where ``best_head`` is the head count
    (a plain ``int``) with the highest test accuracy and ``accuracy`` is the
    list of test accuracies in head-count order.

    Raises ``ValueError`` when ``head < 2`` (nothing to evaluate).
    """
    head_counts = list(range(1, head))
    if not head_counts:
        raise ValueError("head must be at least 2 so that one head count is evaluated")

    accuracy = []
    for i in tqdm(head_counts):
        model = NetGAT(heads=i,hidden_features=16,dataset=dataset,is_concat=False,is_weighted=False,p=p)
        # "{0}" was missing from the format string, so the head count never
        # reached the figure comment.
        a,v,t = Train(model=model,dataset=dataset,epach=1500,dataset_name=name,show=True,loss_f="cross",lr=0.0003,weight_decay=5e-3,coment="Heads:{0}".format(i))
        accuracy.append(a)

    # Plain int so it can be passed on as a layer hyper-parameter.
    best_head = int(np.argmax(accuracy)) + 1

    plt.rcParams['figure.figsize']= (5,5)
    sns.set_style("whitegrid")
    plt.plot(head_counts,accuracy,label="Test accuracy",color="red")
    plt.title("Heads count report | Best: {0} - {1}%".format(best_head,round(100*max(accuracy),4)))
    plt.xlabel("Heads No.")
    plt.ylabel("Accuracy")
    plt.xticks(head_counts)
    plt.legend()
    plt.show()
    return best_head,accuracy
    
    
# Pick the best head count with mean aggregation, then retrain that head count
# with the learned (weighted) head merge.
p = 0.6
best_head, accs = plot_10_head_chart(11, dataset=dataset, p=p)

model = NetGAT(
    heads=best_head,
    hidden_features=16,
    dataset=dataset,
    is_concat=True,
    is_weighted=True,
    p=p,
)
a, v, t = Train(
    model=model,
    dataset=dataset,
    epach=1500,
    dataset_name=name,
    show=True,
    loss_f="cross",
    lr=0.0003,
    weight_decay=5e-3,
    coment="Best Weighted Head",
)


# Custom GAT-v2

In [None]:
class CustomLayerGATv2(GATv2Conv):
    """``GATv2Conv`` whose attention input combines the two endpoints with a
    selectable element-wise operation.

    The required keyword ``aggregation_type`` chooses how the target (``x_i``)
    and source (``x_j``) features are combined before the attention score is
    computed: ``"min"``, ``"max"``, ``"hadamard"`` (product) or ``"concat"``
    (element-wise sum).

    Raises ``ValueError`` when ``aggregation_type`` is missing or unknown.

    NOTE(review): ``message`` overrides the parent hook with the signature
    ``(x_j, x_i, edge_attr, index, ptr, size_i)``; this presumably matches the
    torch_geometric release the notebook was written for — confirm on upgrade.
    """

    # Supported pairwise combinations of target/source features.
    _PAIR_OPERATIONS = {
        "min": torch.min,
        "max": torch.max,
        "hadamard": torch.multiply,
        "concat": torch.add,
    }

    def __init__(self, in_channels, out_channels: int, heads: int = 1, concat: bool = True, negative_slope: float = 0.2, 
                dropout: float = 0, add_self_loops: bool = True, edge_dim = None, fill_value = 'mean', bias: bool = True, 
                share_weights: bool = False, **kwargs):
        if "aggregation_type" not in kwargs:
            raise ValueError("Aggregation Type not passed")
        # Remove the custom option so it is not forwarded to the parent
        # constructor, which does not define it.
        aggregation_type = kwargs.pop("aggregation_type")
        if aggregation_type not in self._PAIR_OPERATIONS:
            raise ValueError(
                "Unknown aggregation type {0!r}; expected one of {1}".format(
                    aggregation_type, sorted(self._PAIR_OPERATIONS)))

        super().__init__(in_channels, out_channels, heads, concat, negative_slope, dropout, add_self_loops, edge_dim, fill_value, bias, share_weights, **kwargs)
        self.aggregation_type = aggregation_type

    def CustomOperation(self,h_i,h_j):
        """Combine target and source features with the configured operation."""
        return self._PAIR_OPERATIONS[self.aggregation_type](h_i, h_j)

    def message(self, x_j, x_i, edge_attr, index, ptr, size_i):
        """Compute attention-weighted source messages for every edge."""
        from torch_geometric.utils import softmax
        x = self.CustomOperation(x_i,x_j)

        if edge_attr is not None:
            if edge_attr.dim() == 1:
                edge_attr = edge_attr.view(-1, 1)
            assert self.lin_edge is not None
            edge_attr = self.lin_edge(edge_attr)
            edge_attr = edge_attr.view(-1, self.heads, self.out_channels)
            x = x + edge_attr

        x = F.leaky_relu(x, self.negative_slope)
        alpha = (x * self.att).sum(dim=-1)
        # Normalise the scores over the edges grouped by ``index``.
        alpha = softmax(alpha, index, ptr, size_i)
        self._alpha = alpha
        alpha = F.dropout(alpha, p=self.dropout, training=self.training)
        return x_j * alpha.unsqueeze(-1)


# Net-GATv2

In [None]:
class NetGATv2(torch.nn.Module):
    """Two-layer classifier built from ``CustomLayerGATv2``.

    Both layers use ``heads`` heads without concatenation, shared weights and
    the same ``aggregation_type``. The input width is ``dataset.x.shape[1]``
    and the class count is the number of unique labels in ``dataset.y``.
    ``forward`` returns per-node log-probabilities.
    """

    def __init__(self,heads=1,hidden_features=32,dataset=None,p=0.6,aggregation_type="max"):
        super().__init__()
        n_classes = torch.unique(dataset.y).shape[0]
        n_inputs = dataset.x.shape[1]

        self.p = p
        # Options common to both attention layers.
        shared = dict(
            heads=heads,
            concat=False,
            aggregation_type=aggregation_type,
            share_weights=True,
        )
        self.gat_1 = CustomLayerGATv2(in_channels=n_inputs, out_channels=hidden_features, **shared)
        self.gat_2 = CustomLayerGATv2(in_channels=hidden_features, out_channels=n_classes, **shared)
        self.name = "GATv2 | Opreation:" + aggregation_type

    def forward(self,dataset):
        """Apply layer 1, dropout, ELU and layer 2, then log-softmax over classes."""
        hidden = self.gat_1(dataset.x, dataset.edge_index)
        hidden = F.elu(F.dropout(hidden, training=self.training, p=self.p))
        scores = self.gat_2(hidden, dataset.edge_index)
        return F.log_softmax(scores, dim=1)
   

# Train one GATv2 model per pairwise operation and save its report figure.
for aggr_type in tqdm(["min","max","hadamard","concat"]):
    model = NetGATv2(
        heads=2,
        hidden_features=32,
        dataset=dataset,
        aggregation_type=aggr_type,
    )
    a, v, t = Train(
        model=model,
        dataset=dataset,
        epach=2000,
        dataset_name=name,
        show=True,
        loss_f="cross",
        lr=0.0002,
        weight_decay=5e-3,
        coment="Aggregation type",
    )