# Library Import

In [None]:
import copy
import json
import numpy as np

import torch
import torch.nn as nn

import networkx as nx
from networkx.readwrite import json_graph

import torch_geometric
from torch_geometric.utils.convert import from_networkx,to_networkx
from torch_geometric.loader import NeighborLoader
from torch_geometric.nn.models import GIN,GCN
from torch_geometric.explain.metric import groundtruth_metrics

import matplotlib.pyplot as plt

# Classi ausiliarie di utilità
Classe che permette di estrarre i grafi salvati su file e rappresentarli in formato Data in maniera opportuna, oltre a permettere di estrarre le maschere per training e test set.

In [None]:
class DatasetCreator():
    """Load graphs from JSON files and expose them as PyTorch Geometric ``Data`` objects.

    Each file is expected to contain a list of graphs in networkx node-link
    format. Every graph is converted with ``from_networkx``, its label attribute
    is moved to ``y``, its node features are min-max normalised and boolean
    ``train_mask`` / ``test_mask`` attributes are attached.
    """

    def __init__(self,paths,node_attrs,edge_attrs,label):
        '''
        Args:
            paths (list[str]): Paths of the files containing the graphs
            node_attrs (list[str]): Node attributes grouped into the feature matrix ``x``
            edge_attrs (list[str]): Edge attributes (stored on the instance; not
                passed to the conversion performed by this class)
            label (str): Name of the attribute holding the ground-truth labels
        '''
        self.node_attrs=node_attrs
        self.edge_attrs=edge_attrs
        self.label=label
        self.data=self._read_from_json_(paths)
        self._set_masks_()

    def _read_from_json_(self,paths):
        '''
        Extract the graphs contained in a list of files.
        Args:
            paths (list[str]): Paths of the files containing the graphs
        Returns:
            GS (list[Data]): Graphs contained in the files, already converted
        '''
        GS = []
        for path in paths:
            # The context manager closes the file; no explicit close() is needed.
            with open(path,'r') as file:
                json_gs = json.load(file)
            GS.extend(json_graph.node_link_graph(g) for g in json_gs)
        return self._prepair_data_(GS)

    def _prepair_data_(self,GS):
        '''
        Convert the graphs to pytorch_geometric format, setting node features
        and ground-truth labels.
        Args:
            GS (list[Graph]): Graphs in networkx format
        Returns:
            datas (list[Data]): Graphs in pytorch_geometric Data format
        '''
        datas = []
        for g in GS:
            tmp = from_networkx(g,group_node_attrs=self.node_attrs)
            # Move the label attribute to the conventional ``y`` slot.
            tmp.y = tmp[self.label]
            # NOTE(review): assigning None presumably drops the original
            # attribute from the Data object - confirm against PyG semantics.
            tmp[self.label] = None
            self._normalize_features_(tmp)
            datas.append(tmp)
        return datas

    def _normalize_features_(self,G):
        '''
        Min-max normalise the node features of a graph, column by column, into [0,1].

        Columns whose values are all equal are mapped to 0 instead of producing
        NaN from a 0/0 division. A graph without node features is left untouched.
        Args:
            G: graph object exposing a feature tensor ``x`` (modified in place)
        '''
        x = G.x
        if x.numel() == 0:
            # min()/max() over an empty dimension would raise.
            return
        min_vals = x.min(dim=0, keepdim=True).values
        max_vals = x.max(dim=0, keepdim=True).values
        span = max_vals - min_vals
        # A constant column has span 0; dividing by 1 instead yields 0 for it.
        span[span == 0] = 1
        G.x = (x - min_vals) / span

    def _set_masks_alt_(self,train_ratio=0.7): # NOTE (original author): problem here with the mask selection
        '''
        Assign train/test node masks with a plain random split (not class-balanced).
        Args:
            train_ratio (float): fraction of the nodes to put in the training set
        '''
        for g in self.data:
            num_nodes = g.x.shape[0]
            num_train = int(num_nodes * train_ratio)
            idx = list(range(num_nodes))

            np.random.shuffle(idx)
            train_mask = torch.full_like(g.y, False, dtype=bool)
            train_mask[idx[:num_train]] = True
            test_mask = torch.full_like(g.y, False, dtype=bool)
            test_mask[idx[num_train:]] = True
            g.train_mask = train_mask
            g.test_mask = test_mask

    def _set_masks_(self,train_ratio=0.7):
        """
        Assign train/test node masks, splitting each class separately so that
        both sets keep the class proportions.

        Args:
            train_ratio (float): Fraction of the nodes of every class to put in
                the training set (0 < train_ratio < 1).
        """
        for g in self.data:
            num_nodes = g.x.shape[0]

            # Both masks start as all-False.
            train_mask = torch.zeros(num_nodes, dtype=torch.bool)
            test_mask = torch.zeros(num_nodes, dtype=torch.bool)

            for cls in torch.unique(g.y):
                # Indices of the nodes belonging to the current class.
                class_indices = torch.nonzero(g.y == cls, as_tuple=True)[0]

                # Random order of the indices of this class.
                shuffled_indices = class_indices[torch.randperm(len(class_indices))]

                # The count is truncated, so the remainder goes to the test set.
                num_train = int(len(class_indices) * train_ratio)

                train_mask[shuffled_indices[:num_train]] = True
                test_mask[shuffled_indices[num_train:]] = True

            g.train_mask = train_mask
            g.test_mask = test_mask

    def get_masks(self,g):
        '''
        Return the train and test node masks of a graph.
        Args:
            g (int): index of the graph in the data list
        Returns:
            masks (tuple(Tensor,Tensor)): (train_mask, test_mask)
        '''
        G = self.data[g]
        return G.train_mask,G.test_mask

    def get_graph_info(self,idx):
        '''
        Print summary information about a stored graph.
        Args:
            idx (int): index of the graph in the data list
        '''
        graph = self.data[idx]
        print(f'Number of nodes: {graph.num_nodes}')
        print(f'Number of edges: {graph.num_edges}')
        # Ratio of edge count to node count.
        print(f'Average node degree: {graph.num_edges / graph.num_nodes:.2f}')
        print(f'Contains isolated nodes: {graph.has_isolated_nodes()}')
        print(f'Contains self-loops: {graph.has_self_loops()}')
        print(f'Is undirected: {graph.is_undirected()}')

    def get_data(self):
        '''
        Return the graphs in pytorch_geometric format.
        Returns:
            list[Data]: the stored graphs
        '''
        return self.data

Classe ausiliaria per il training e l'evaluation del modello.

In [None]:
class Trainer:
    """Mini-batch training and periodic evaluation of a node-level model.

    Every graph in ``data`` is sampled with ``NeighborLoader`` seeded on its
    ``train_mask`` (training) or ``test_mask`` (evaluation) nodes.
    """

    def __init__(self,model,data,criterion,optimizer,metrics):
        '''
        Args:
            model: callable invoked as ``model(x, edge_index, Weight)``
            data: iterable of graphs exposing ``train_mask`` and ``test_mask``
            criterion: loss called as ``criterion(output, target)``
            optimizer: optimizer updating the model parameters
            metrics: metric specification forwarded to ``groundtruth_metrics``
        '''
        self.model = model
        self.data = data
        self.criterion = criterion
        self.optimizer = optimizer
        self.metrics = metrics

    def train(self,num_epocs):
        '''
        Train for ``num_epocs`` epochs, evaluating each graph every second epoch.
        Args:
            num_epocs (int): number of passes over all the graphs
        '''
        for epoc in range(num_epocs):
            print(f'---- EPOCH {epoc} ----')
            train_total_loss = 0
            for g in self.data:
                train_loader = NeighborLoader(g,input_nodes=g.train_mask,num_neighbors=[8],batch_size=16,directed=False)
                loss_sum = 0.0
                num_batches = 0
                for batch in train_loader:
                    loss = self._train_step_(batch)
                    print(loss)
                    # Accumulate a plain float so no autograd graph is kept alive.
                    loss_sum += loss.item()
                    num_batches += 1
                if num_batches:
                    # A graph whose loader yields no batch contributes nothing.
                    train_total_loss += loss_sum / num_batches
                if (epoc+1) % 2 == 0:
                    self._evaluate_graph_(g)

            # max(..., 1) avoids a division by zero when there are no graphs.
            train_total_loss = train_total_loss / max(len(self.data), 1)
            print(f'Epoc {epoc} mean loss {train_total_loss}')

    def _evaluate_graph_(self,g):
        '''Evaluate one graph on its test nodes and print mean metric and loss.'''
        test_loader = NeighborLoader(g,input_nodes=g.test_mask,num_neighbors=[8],batch_size=16,directed=False)
        eval_total_loss = 0.0
        eval_metric_total = 0
        num_batches = 0
        for batch in test_loader:
            loss,metric = self._evaluation_step_(batch)
            eval_total_loss += loss.item()
            # NOTE(review): assumes `metric` is a single number, i.e. one metric
            # was requested - confirm against how `metrics` is configured.
            eval_metric_total += metric
            num_batches += 1
        if not num_batches:
            # Nothing to average: the loader produced no batch.
            return
        print("----- Evaluation -----")
        print(f'{self.metrics}: {eval_metric_total / num_batches}')
        print(f'loss {eval_total_loss / num_batches}')

    def _train_step_(self,batch):
        '''
        Run one optimisation step on a batch.
        Returns:
            Tensor: the batch loss, detached from the autograd graph
        '''
        self.model.train()
        # Clear gradients left by the previous step; otherwise they accumulate.
        self.optimizer.zero_grad()
        out = self.model(batch.x, batch.edge_index,batch.Weight)
        loss = self.criterion(out[batch.train_mask], batch.y[batch.train_mask])
        loss.backward()
        self.optimizer.step()
        return loss.detach()

    def _evaluation_step_(self,batch):
        '''
        Compute loss and metric on the test nodes of a batch without tracking gradients.
        Returns:
            tuple: (loss, metric)
        '''
        self.model.eval()
        with torch.no_grad():
            # Same inputs as the training step, including the `Weight` attribute.
            out = self.model(batch.x, batch.edge_index,batch.Weight)
            predictions = torch.argmax(out[batch.test_mask], dim=1)
            loss = self.criterion(out[batch.test_mask],batch.y[batch.test_mask])
        metric = groundtruth_metrics(predictions, batch.y[batch.test_mask], metrics=self.metrics)
        return loss, metric

In [None]:
def get_adj(G):
    """Build a dense adjacency matrix from a graph's ``edge_index``.

    Args:
        G: graph exposing ``num_nodes`` and a two-row ``edge_index``.
    Returns:
        Tensor: ``num_nodes x num_nodes`` matrix holding 1 at every
        (source, target) pair listed in ``edge_index`` and 0 elsewhere.
    """
    size = G.num_nodes
    src, dst = G.edge_index
    adjacency = torch.zeros((size, size))
    # Advanced indexing sets all listed (row, column) entries at once.
    adjacency[src, dst] = 1
    return adjacency

# Caricamento dataset

In [None]:
# File names noted by the author; only 'advanced mixed.json' is loaded below:
# 'preferential attachment net.json','mixed net.json', 'small world net.json','advanced mixed.json'
GC = DatasetCreator(
    paths=['advanced mixed.json'],
    node_attrs=['Score','Likes','Shares','Comments','Visuals','Dislikes'],
    edge_attrs=['Interactions'],
    label='Misinformative',
)

In [None]:
# The model definition was commented out, which left `model` undefined when the
# optimizer below is created (NameError on a fresh run).
# NOTE(review): confirm these hyper-parameters are the intended ones.
model = GCN(in_channels=GC.get_data()[0].num_features,hidden_channels=3,num_layers=2,out_channels=2,dropout=0.8,norm="layer",act_first=True,aggr="mean")
# NLLLoss is applied to log-probabilities.
criterion = nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=1e-4)  # Initialize the Adam optimizer.
optimizer.zero_grad() # Clear gradients.

def train(data,model):
    """Run one optimisation step on ``data`` and return the training loss.

    Uses the module-level ``criterion`` and ``optimizer``.

    Args:
        data: batch exposing ``x``, ``edge_index``, ``Weight``, ``y`` and ``train_mask``.
        model: callable invoked as ``model(x, edge_index, Weight)``.
    Returns:
        Tensor: loss on the nodes selected by ``train_mask``, detached from the
        autograd graph so it can be accumulated without retaining memory.
    """
    model.train()
    # Clear gradients from the previous step; otherwise backward() keeps adding to them.
    optimizer.zero_grad()
    out = model(data.x,data.edge_index,data.Weight)  # Perform a single forward pass
    # The criterion consumes log-probabilities over the class dimension.
    out = nn.LogSoftmax(dim=1)(out)
    loss = criterion(out[data.train_mask],data.y[data.train_mask])
    loss.backward()
    optimizer.step()
    return loss.detach()

def evaluate(data,model):
    """Evaluate ``model`` on the test nodes of ``data`` without tracking gradients.

    Uses the module-level ``criterion``.

    Args:
        data: batch exposing ``x``, ``edge_index``, ``Weight``, ``y`` and ``test_mask``.
        model: callable invoked as ``model(x, edge_index, Weight)``.
    Returns:
        tuple: (loss, metric), where metric is the ``f1_score`` result of
        ``groundtruth_metrics`` on the nodes selected by ``test_mask``.
    """
    model.eval()
    # No backward pass follows, so skip building the autograd graph.
    with torch.no_grad():
        out = model(data.x, data.edge_index,data.Weight)
        out = nn.LogSoftmax(dim=1)(out)
        predictions = torch.argmax(out[data.test_mask], dim=1)
        loss = criterion(out[data.test_mask],data.y[data.test_mask])
    metric = groundtruth_metrics(predictions, data.y[data.test_mask], metrics=["f1_score"])
    return loss, metric

gs = GC.get_data()

# NOTE(review): `k` is not used in this cell - kept in case a later cell reads it.
k = 0
for i in range(5):
    print(f'---EPOCH {i}----')
    for g in gs:
        # Mini-batches are seeded on the training nodes of the graph.
        train_loader = NeighborLoader(g,input_nodes=g.train_mask,num_neighbors=[8],batch_size=16,directed=False)
        loss_sum = 0.0
        c=0
        for batch in train_loader:
            loss= train(batch,model)
            # .item() gives a plain float, so no autograd graph is retained.
            loss_sum += loss.item()
            c+=1
        if c == 0:
            # The loader yielded no batch: there is nothing to average.
            continue
        graph_avg_loss = loss_sum/c
        print(graph_avg_loss)

In [None]:
# Rebind GC to the dataset read from 'advanced mixed test.json'.
GC = DatasetCreator(
    paths=['advanced mixed test.json'],
    node_attrs=['Score','Likes','Shares','Comments','Visuals','Dislikes'],
    edge_attrs=['Interactions'],
    label='Misinformative',
)

def draw_graph_with_label(G1,G2):
    """Draw two graphs side by side, labelling every node with its ``y`` attribute.

    The left panel is titled "original" and the right one "predicted". Both
    panels use one spring layout computed on ``G1``, so matching nodes appear
    at the same position.

    Args:
        G1 (Graph): graph drawn in the left panel; its nodes carry a ``y`` attribute.
        G2 (Graph): graph drawn in the right panel; its nodes must all have a
            position in the layout of ``G1``.
    """
    _, axes = plt.subplots(1, 2, figsize=(10, 4))
    pos = nx.spring_layout(G1)
    for ax, G, title in zip(axes, (G1, G2), ("original", "predicted")):
        labels = {node: attrs["y"] for node, attrs in G.nodes(data=True)}
        # Draw the graph belonging to this panel (previously G1 was drawn in both).
        nx.draw(G,pos,ax=ax,with_labels=False)
        nx.draw_networkx_labels(G, pos,ax=ax, labels=labels, font_size=12, font_color='black')
        ax.set_title(title)

    plt.tight_layout()
    plt.show()

def evaluate(data,model):
    """Evaluate ``model`` on every node of ``data`` and plot true vs predicted labels.

    Uses the module-level ``criterion``. Gradients are not tracked.

    Args:
        data: batch exposing ``x``, ``edge_index``, ``Weight`` and ``y``.
        model: callable invoked as ``model(x, edge_index, Weight)``.
    Returns:
        tuple: (loss, metric), where metric is the ``f1_score`` result of
        ``groundtruth_metrics`` over all nodes of ``data``.
    """
    model.eval()

    # No backward pass follows, so skip building the autograd graph.
    with torch.no_grad():
        out = model(data.x, data.edge_index,data.Weight)
        out = nn.LogSoftmax(dim=1)(out)
        predictions = torch.argmax(out, dim=1)
        loss = criterion(out,data.y)
    metric = groundtruth_metrics(predictions, data.y, metrics=["f1_score"])

    original = to_networkx(data,node_attrs= ['y'],to_undirected=True)
    # Work on a copy so the predicted labels do not overwrite `data.y`.
    tmp = copy.copy(data)
    tmp.y = predictions
    predicted = to_networkx(tmp,node_attrs= ['y'],to_undirected=True)
    draw_graph_with_label(original,predicted)
    return loss, metric


gs = GC.get_data()
for g in gs:
    # Mini-batches are seeded on the test nodes of the graph.
    test_loader = NeighborLoader(g,input_nodes=g.test_mask,num_neighbors=[8],batch_size=16,directed=False)
    loss_sum = 0.0
    c=0
    for batch in test_loader:
        c+=1
        batch_loss, _ = evaluate(batch,model)
        # .item() gives a plain float, so no autograd graph is retained.
        loss_sum += batch_loss.item()
    if c == 0:
        # The loader yielded no batch: there is nothing to average.
        continue
    graph_avg_loss = loss_sum / c
    print(graph_avg_loss)
        