In [119]:
import pandas as pd
import torch
import torch.nn as nn
import random
from pathlib import Path
DATA_FILES = {
    "courses": "courses(ÙˆØ±Ù‚Ø©1) (1).csv",
    "prereqs": "course_prereq.csv",
    "topics": "topics(ÙˆØ±Ù‚Ø©1).csv",
    "course_topics": "course_topics(ÙˆØ±Ù‚Ø©1) (1).csv",
    "students": "students(ÙˆØ±Ù‚Ø©1).csv",
    "history": "student_course_history(ÙˆØ±Ù‚Ø©1).csv"
}

def read_auto(path):
    try:
        df = pd.read_csv(path)
        if df.shape[1] == 1: df = pd.read_csv(path, sep=";")
    except:
        df = pd.read_csv(path, sep=";")
    return df
dfs = {name: read_auto(path) for name, path in DATA_FILES.items()}
for name in dfs:
    df = dfs[name]
    df.columns = df.columns.str.strip()
    cols_to_fix = [c for c in df.columns if "id" in c.lower() or "course" in c.lower()]
    for col in cols_to_fix:
        df[col] = df[col].astype(str).str.strip().str.replace(r"\.0$", "", regex=True)

print("Data Loaded and Cleaned.")
print(f"Students: {len(dfs['students'])} | Courses: {len(dfs['courses'])}")

Data Loaded and Cleaned.
Students: 15 | Courses: 75


In [120]:
from torch_geometric.data import HeteroData

def build_graph(dfs):
    data = HeteroData()

    c_map = {id: i for i, id in enumerate(dfs['courses']['course_id'].unique())}
    t_map = {id: i for i, id in enumerate(dfs['topics']['topic_id'].unique())}
    s_map = {id: i for i, id in enumerate(dfs['students']['student_id'].unique())}

    data["course"].num_nodes = len(c_map)
    data["topic"].num_nodes = len(t_map)
    data["student"].num_nodes = len(s_map)
    def get_edges(df, src_col, dst_col, src_map, dst_map):
        src = df[src_col].map(src_map)
        dst = df[dst_col].map(dst_map)
        mask = src.notna() & dst.notna()
        return torch.tensor([src[mask].values, dst[mask].values], dtype=torch.long)
    data["course", "requires", "course"].edge_index = get_edges(dfs['prereqs'], 'course_id', 'prereq_course_id', c_map, c_map)
    data["course", "covers", "topic"].edge_index = get_edges(dfs['course_topics'], 'course_id', 'topic_id', c_map, t_map)
    data["student", "took", "course"].edge_index = get_edges(dfs['history'], 'student_id', 'course_id', s_map, c_map)
    data["course", "rev_took", "student"].edge_index = data["student", "took", "course"].edge_index.flip(0)
    data["topic", "rev_covers", "course"].edge_index = data["course", "covers", "topic"].edge_index.flip(0)
    data["course", "is_required_by", "course"].edge_index = data["course", "requires", "course"].edge_index.flip(0)

    return data, c_map, t_map, s_map

data, c_map, t_map, s_map = build_graph(dfs)
print("Heterogeneous Knowledge Graph Built.")
print(data)

Heterogeneous Knowledge Graph Built.
HeteroData(
  course={ num_nodes=75 },
  topic={ num_nodes=46 },
  student={ num_nodes=14 },
  (course, requires, course)={ edge_index=[2, 44] },
  (course, covers, topic)={ edge_index=[2, 6] },
  (student, took, course)={ edge_index=[2, 34] },
  (course, rev_took, student)={ edge_index=[2, 34] },
  (topic, rev_covers, course)={ edge_index=[2, 6] },
  (course, is_required_by, course)={ edge_index=[2, 44] }
)


In [121]:
from torch_geometric.nn import HeteroConv, SAGEConv
import torch.nn.functional as F

class GNNRecommender(nn.Module):
    def __init__(self, data, hidden=64):
        super().__init__()
        self.emb = nn.ModuleDict({
            nt: nn.Embedding(data[nt].num_nodes, hidden) for nt in data.node_types
        })
        # Message passing layers
        self.conv1 = HeteroConv({rel: SAGEConv((-1, -1), hidden) for rel in data.edge_types}, aggr='sum')
        self.conv2 = HeteroConv({rel: SAGEConv((-1, -1), hidden) for rel in data.edge_types}, aggr='sum')

    def forward(self, edge_index_dict):
        x_dict = {nt: self.emb[nt].weight for nt in self.emb}
        x_dict = self.conv1(x_dict, edge_index_dict)
        x_dict = {nt: x.relu() for nt, x in x_dict.items()}
        x_dict = self.conv2(x_dict, edge_index_dict)
        return {nt: F.normalize(x, p=2, dim=-1) for nt, x in x_dict.items()}
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GNNRecommender(data).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
data = data.to(device)

for epoch in range(1, 51):
    model.train()
    optimizer.zero_grad()
    
    out = model(data.edge_index_dict)

    pos_edges = data["student", "took", "course"].edge_index
    pos_scores = (out["student"][pos_edges[0]] * out["course"][pos_edges[1]]).sum(dim=-1)

    neg_courses = torch.randint(0, data["course"].num_nodes, (pos_edges.size(1),), device=device)
    neg_scores = (out["student"][pos_edges[0]] * out["course"][neg_courses]).sum(dim=-1)
    
    loss = -torch.mean(torch.log(torch.sigmoid(pos_scores - neg_scores)))
    loss.backward()
    optimizer.step()
    
    if epoch % 10 == 0:
        print(f"Epoch {epoch:03d} | Loss: {loss.item():.4f}")

print(" Training Complete.")

Epoch 010 | Loss: 0.2304
Epoch 020 | Loss: 0.1965
Epoch 030 | Loss: 0.2445
Epoch 040 | Loss: 0.1634
Epoch 050 | Loss: 0.2276
 Training Complete.


In [122]:
def recommend_for_student(student_id, top_n=5):

    if student_id not in s_map:
        print(f" Error: Student ID '{student_id}' not found in the system.")
        print("Please check your students.csv or ensure you re-ran the build_graph cell.")
        return # Stop the function early
        
    # --- STEP 1: GNN SCORE CALCULATION ---
    model.eval()
    with torch.no_grad():
        out = model(data.edge_index_dict)
   
        s_idx = s_map[student_id]
        stu_emb = out["student"][s_idx]
 
        all_course_embs = out["course"] 
        scores = torch.matmul(all_course_embs, stu_emb).cpu().numpy()

  
    student_info = dfs["students"][dfs["students"]["student_id"] == student_id].iloc[0]
    cohort_year = int(student_info['cohort'])
    current_year = 2026 
   
    years_in_uni = (current_year - cohort_year) + 1 
    
 
    target_min_level = years_in_uni * 100
    
    absolute_min_level = max(200, target_min_level - 100) 


    history_df, prereq_df = dfs["history"], dfs["prereqs"]
    courses_df = dfs["courses"]
    taken = set(history_df[history_df["student_id"] == student_id]["course_id"].tolist())
    
    recommendations = []
    for cid, idx in c_map.items():
        if cid in taken: 
            continue
        
    
        level = int(courses_df[courses_df["course_id"] == cid]['level'].iloc[0])
        reqs = set(prereq_df[prereq_df["course_id"] == cid]["prereq_course_id"].tolist())
        
      
        if reqs.issubset(taken) and level >= absolute_min_level:
            recommendations.append((cid, scores[idx], level))

  
    recommendations.sort(key=lambda x: x[1], reverse=True)
    
 
    course_names = dict(zip(dfs['courses']['course_id'], dfs['courses']['course_name']))
    print(f"\n---  Logical Recommendations for {student_id} (Year {years_in_uni-1}) ---")
    if not recommendations:
        print("No valid courses found. Check if prerequisites are missing for higher-level courses.")
    for cid, score, lvl in recommendations[:top_n]:
        print(f"[{cid}] {course_names.get(cid)} (Level: {lvl}, Score: {score:.4f})")

In [123]:

target_student = "20230643" 
recommend_for_student(target_student)


--- ðŸŽ“ Logical Recommendations for 20230643 (Year 3) ---
[11313] Algorithm Analysis and Design (Level: 300, Score: 0.9730)
[11335] Operating Systems (Level: 300, Score: 0.9722)
[14330] Artificial Intelligence (Level: 300, Score: 0.9422)
[11323] Database Systems (Level: 300, Score: 0.9313)
[14331] Artificial Intelligence Lab (Level: 300, Score: 0.9056)
