In [None]:
import math
import os

import dgl
import dgl.function as fn
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from dgl.nn import HeteroGraphConv, GATConv
from dgl.nn.functional import edge_softmax


In [None]:
# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Location of the serialized heterogeneous graph. The OULAD_GRAPH_PATH
# environment variable overrides the default, so the notebook can run on a
# machine other than the one it was written on without editing this cell.
GRAPH_PATH = os.environ.get(
    "OULAD_GRAPH_PATH",
    'C:/Users/suman/OneDrive/Bureau/Case_Study/prj_Graphtransformers/prj_Graphtransformers/src/data/processed/oulad_graph_with_features.bin',
)

# dgl.load_graphs returns a (graph_list, labels_dict) pair; this notebook
# works with the first graph stored in the file.
loaded_graphs, loaded_graph_labels = dgl.load_graphs(GRAPH_PATH)
if not loaded_graphs:
    raise ValueError(f"No graphs found in {GRAPH_PATH!r}")

# The graph is kept on the CPU here; it is not moved to `device` in this cell.
graph = loaded_graphs[0]

# Check the graph structure
print("Graph info:")
print(graph)


In [None]:
# Student nodes are expected to carry their identifier in the 'id_student'
# node feature; show a small sample so it can be eyeballed against the CSV.
if 'id_student' in graph.nodes['student'].data:
    # .cpu() keeps the conversion working even if the graph lives on a GPU.
    graph_student_ids = graph.nodes['student'].data['id_student'].cpu().numpy()
    print("Sample student IDs from the graph:", graph_student_ids[:10])
else:
    # Define the name in both branches so a re-run never leaves a stale value
    # from an earlier execution behind.
    graph_student_ids = None
    print("No 'id_student' found in graph! Ensure the graph contains student IDs.")


In [None]:
# Load student labels from studentInfo.csv. The OULAD_STUDENT_INFO_PATH
# environment variable overrides the default location.
STUDENT_INFO_PATH = os.environ.get(
    "OULAD_STUDENT_INFO_PATH",
    "C:/Users/suman/OneDrive/Bureau/Case_Study/prj_Graphtransformers/prj_Graphtransformers/data/raw/studentInfo.csv",
)
student_info = pd.read_csv(STUDENT_INFO_PATH)

# Convert 'final_result' into numerical labels.
# "Pass" and "Distinction" deliberately share class 2, giving three classes.
result_mapping = {"Withdrawn": 0, "Fail": 1, "Pass": 2, "Distinction": 2}
student_info["labels"] = student_info["final_result"].map(result_mapping)

# .map() yields NaN for every value missing from result_mapping. Casting NaN
# to an integer tensor would silently create meaningless labels, so fail
# loudly instead.
unmapped_rows = student_info["labels"].isna()
if unmapped_rows.any():
    unknown_values = student_info.loc[unmapped_rows, "final_result"].unique().tolist()
    raise ValueError(
        f"Unmapped 'final_result' values in studentInfo.csv: {unknown_values}"
    )

# Convert labels to tensor
labels_tensor = torch.tensor(student_info["labels"].to_numpy(), dtype=torch.long)

# Ensure label tensor matches student node count
if labels_tensor.shape[0] != graph.num_nodes('student'):
    print("⚠️ Mismatch between student nodes and labels! Check data order.")
else:
    # Labels are attached by row position, so CSV row i must describe student
    # node i. When both sides expose 'id_student', compare them and warn if
    # the order differs; the labels are still assigned as before.
    if 'id_student' in graph.nodes['student'].data and 'id_student' in student_info.columns:
        graph_ids = graph.nodes['student'].data['id_student'].cpu().numpy().reshape(-1)
        csv_ids = student_info['id_student'].to_numpy()
        if graph_ids.shape != csv_ids.shape or not (graph_ids == csv_ids).all():
            print("⚠️ Student ID order in studentInfo.csv differs from the graph; labels may be misaligned.")

    # Assign labels to student nodes in the graph
    graph.nodes['student'].data['labels'] = labels_tensor
    print("✅ Labels successfully added to student nodes!")

# Verify update
print("Updated keys for student nodes:", graph.nodes['student'].data.keys())


In [None]:
# Define HGTLayer: Responsible for performing multi-head attention over different node types and edges in your heterogeneous graph

class HGTLayer(nn.Module):
    """One heterogeneous graph transformer layer with multi-head attention.

    For every canonical edge type the layer projects source nodes to keys and
    values and destination nodes to queries with node-type-specific linear
    maps, transforms keys and values with relation-specific matrices,
    normalises the attention scores over each destination node's incoming
    edges, and aggregates the attention-weighted values. Results coming from
    different edge types are averaged, passed through a node-type-specific
    output projection, and blended with the layer input through a learnable
    gated skip connection.

    Args:
        in_dim: Size of the incoming node features.
        out_dim: Size of the produced node features. Must be divisible by
            ``n_heads``. The skip connection adds the input to the output, so
            ``in_dim`` is expected to equal ``out_dim``.
        node_dict: Mapping from node type name to a unique integer index.
        edge_dict: Mapping from edge type name to a unique integer index.
        n_heads: Number of attention heads.
        dropout: Dropout probability applied to the output projection.
        use_norm: When true, apply a per-node-type ``LayerNorm`` to the output.

    Raises:
        ValueError: If ``out_dim`` is not divisible by ``n_heads``.
    """

    def __init__(
        self,
        in_dim,
        out_dim,
        node_dict,
        edge_dict,
        n_heads,
        dropout=0.2,
        use_norm=False,
    ):
        super().__init__()

        # Each head owns an equal slice of the output features; an uneven
        # split would make the later .view(-1, n_heads, d_k) fail or scramble
        # features across nodes.
        if out_dim % n_heads != 0:
            raise ValueError(
                f"out_dim ({out_dim}) must be divisible by n_heads ({n_heads})"
            )

        self.in_dim = in_dim
        self.out_dim = out_dim
        self.node_dict = node_dict
        self.edge_dict = edge_dict
        self.num_types = len(node_dict)
        self.num_relations = len(edge_dict)
        self.n_heads = n_heads
        self.d_k = out_dim // n_heads
        self.sqrt_dk = math.sqrt(self.d_k)

        # Per-node-type projections: key / query / value, output ("a"), and
        # optional layer normalisation. Indexed by node_dict[ntype].
        self.k_linears = nn.ModuleList()
        self.q_linears = nn.ModuleList()
        self.v_linears = nn.ModuleList()
        self.a_linears = nn.ModuleList()
        self.norms = nn.ModuleList()
        self.use_norm = use_norm

        for _ in range(self.num_types):
            self.k_linears.append(nn.Linear(in_dim, out_dim))
            self.q_linears.append(nn.Linear(in_dim, out_dim))
            self.v_linears.append(nn.Linear(in_dim, out_dim))
            self.a_linears.append(nn.Linear(out_dim, out_dim))
            if use_norm:
                self.norms.append(nn.LayerNorm(out_dim))

        # Relation-specific parameters, indexed by edge_dict[etype]:
        #   relation_pri - per-head scalar that scales the attention score
        #   relation_att - per-head d_k x d_k matrix applied to the keys
        #   relation_msg - per-head d_k x d_k matrix applied to the values
        self.relation_pri = nn.Parameter(torch.ones(self.num_relations, n_heads))
        self.relation_att = nn.Parameter(
            torch.Tensor(self.num_relations, n_heads, self.d_k, self.d_k)
        )
        self.relation_msg = nn.Parameter(
            torch.Tensor(self.num_relations, n_heads, self.d_k, self.d_k)
        )
        # Per-node-type skip gate; passed through a sigmoid in forward().
        self.skip = nn.Parameter(torch.ones(self.num_types))
        self.drop = nn.Dropout(dropout)

        # torch.Tensor(...) allocates uninitialised memory, so initialise it.
        nn.init.xavier_uniform_(self.relation_att)
        nn.init.xavier_uniform_(self.relation_msg)

    def forward(self, G, h):
        """Run one round of heterogeneous attention message passing.

        Args:
            G: Heterogeneous DGL graph whose node and edge type names are keys
                of ``node_dict`` / ``edge_dict``.
            h: Dict mapping node type name to that type's feature tensor.

        Returns:
            Dict mapping every node type in ``G.ntypes`` to its updated
            feature tensor with ``out_dim`` columns.
        """
        # local_scope: features written to G below are discarded on exit.
        with G.local_scope():
            node_dict, edge_dict = self.node_dict, self.edge_dict

            # Phase 1: per edge type, compute attention weights on the edges
            # and stash the transformed values on the source nodes.
            for srctype, etype, dsttype in G.canonical_etypes:
                sub_graph = G[srctype, etype, dsttype]

                k_linear = self.k_linears[node_dict[srctype]]
                v_linear = self.v_linears[node_dict[srctype]]
                q_linear = self.q_linears[node_dict[dsttype]]

                # Split the projected features into heads.
                k = k_linear(h[srctype]).view(-1, self.n_heads, self.d_k)
                v = v_linear(h[srctype]).view(-1, self.n_heads, self.d_k)
                q = q_linear(h[dsttype]).view(-1, self.n_heads, self.d_k)

                e_id = edge_dict[etype]

                relation_att = self.relation_att[e_id]
                relation_pri = self.relation_pri[e_id]
                relation_msg = self.relation_msg[e_id]

                # Apply the relation-specific per-head matrices.
                k = torch.einsum("bij,ijk->bik", k, relation_att)
                v = torch.einsum("bij,ijk->bik", v, relation_msg)

                sub_graph.srcdata["k"] = k
                sub_graph.dstdata["q"] = q
                # Values are stored under a relation-specific name so that
                # several relations sharing a source node type do not
                # overwrite each other before aggregation.
                sub_graph.srcdata["v_%d" % e_id] = v

                # Per-head dot product of destination query and source key,
                # scaled by the relation prior and sqrt(d_k).
                sub_graph.apply_edges(fn.v_dot_u("q", "k", "t"))
                attn_score = (
                    sub_graph.edata.pop("t").sum(-1) * relation_pri
                ) / self.sqrt_dk
                # Normalise over the incoming edges of each destination node.
                attn_score = edge_softmax(sub_graph, attn_score, norm_by="dst")

                sub_graph.edata["t"] = attn_score.unsqueeze(-1)

            # Phase 2: aggregate attention-weighted values. Each relation must
            # read its OWN "v_<e_id>" field, so iterate over (etype, e_id)
            # pairs rather than reusing the e_id left over from the loop above.
            G.multi_update_all(
                {
                    etype: (
                        fn.u_mul_e("v_%d" % e_id, "t", "m"),
                        fn.sum("m", "t"),
                    )
                    for etype, e_id in edge_dict.items()
                },
                cross_reducer="mean",
            )

            # Phase 3: output projection plus gated skip connection.
            # NOTE(review): a node type that is never the destination of any
            # edge type would presumably have no "t" feature here - confirm
            # that every node type receives messages in the target graph.
            new_h = {}
            for ntype in G.ntypes:
                n_id = node_dict[ntype]
                alpha = torch.sigmoid(self.skip[n_id])
                # Merge the heads back into a single out_dim-wide vector.
                t = G.nodes[ntype].data["t"].view(-1, self.out_dim)
                trans_out = self.drop(self.a_linears[n_id](t))
                trans_out = trans_out * alpha + h[ntype] * (1 - alpha)
                if self.use_norm:
                    new_h[ntype] = self.norms[n_id](trans_out)
                else:
                    new_h[ntype] = trans_out
            return new_h


In [None]:
# Define HGT model
class HeteroGraphTransformer(nn.Module):
    """Stack of HGT layers followed by a two-layer MLP classifier on student nodes.

    Args:
        hidden_dim: Feature size consumed and produced by every HGT layer.
            The ``'inp'`` node features read in ``forward`` are fed straight
            into the first layer, so they are expected to have this size.
        num_layers: Number of stacked ``HGTLayer`` modules.
        num_heads: Number of attention heads in every layer.
        node_dict: Mapping from node type name to integer index.
        edge_dict: Mapping from edge type name to integer index.
        dropout: Dropout probability used inside the HGT layers and before
            the final linear layer.
        num_classes: Number of logits produced per student node. The default
            of 4 preserves the original output width; the original inline
            comment described three classes (dropout, fail, pass), so pass
            ``num_classes=3`` if that is what the labels contain.
    """

    def __init__(
        self,
        hidden_dim,
        num_layers,
        num_heads,
        node_dict,
        edge_dict,
        dropout=0.2,
        num_classes=4,
    ):
        super().__init__()

        self.node_dict = node_dict
        self.edge_dict = edge_dict
        self.num_layers = num_layers
        self.num_classes = num_classes

        # Holds HGTLayer modules; the attribute keeps its historical name so
        # existing checkpoints (state_dict keys) stay loadable.
        self.gat_layers = nn.ModuleList(
            [
                HGTLayer(
                    hidden_dim,
                    hidden_dim,
                    node_dict,
                    edge_dict,
                    num_heads,
                    dropout=dropout,
                )
                for _ in range(num_layers)
            ]
        )

        # Final MLP for classification
        self.fc1 = nn.Linear(hidden_dim, 64)  # Fully connected layer
        self.fc2 = nn.Linear(64, num_classes)  # One logit per class

        self.dropout = nn.Dropout(dropout)

    def forward(self, graph):
        """Compute class logits for every student node.

        Args:
            graph: Heterogeneous graph whose node types each carry an
                ``'inp'`` feature and which contains a ``'student'`` node type.

        Returns:
            Tensor with one row per student node and ``num_classes`` columns
            of unnormalised logits.
        """
        # Initial representation of every node type is its 'inp' feature.
        h = {ntype: graph.nodes[ntype].data['inp'] for ntype in graph.ntypes}

        for layer in self.gat_layers:
            h = layer(graph, h)

        # Final classification on student nodes
        out = self.fc1(h['student'])
        out = self.dropout(F.relu(out))
        out = self.fc2(out)

        return out
