<a href="https://colab.research.google.com/github/AhmedToto23/timetable-as-CSP/blob/main/part1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [6]:
"""
Timetable generator using Integer Linear Programming (PuLP).

- Install dependencies:
    pip install pandas pulp openpyxl
"""

'\nTimetable generator using Integer Linear Programming (PuLP).\n\n- Install dependencies:\n    pip install pandas pulp openpyxl\n'

In [7]:
#Import Libraries
import pandas as pd
import ast
import itertools
import logging
from dataclasses import dataclass
from typing import List, Dict, Tuple
import pulp
import sys

# Logging: show INFO and above, each record rendered as "LEVEL: message".
# NOTE(review): basicConfig() is a no-op when the root logger already has handlers,
# which may be the case in a hosted notebook kernel — confirm the format is applied.
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

In [8]:
# Path of the CSV file that save_solution() writes the final timetable to.
OUTPUT_CSV = "/content/sample_data/final_timetable.csv"

In [9]:
# Penalty weights (tuneable) used as objective coefficients in build_and_solve_ilp().
# Charged once per slot of an assignment that falls in the instructor's not-preferred slots.
W_NOT_PREFERRED_SLOT = 10
# Charged once when a session lists preferred instructors and the chosen one is not among them.
W_NOT_PREFERRED_INST = 5
# Intended as a proxy penalty to discourage distributing the same section over many days/slots.
# NOTE(review): as used in build_and_solve_ilp() it is scaled by 0.001 and added with the same
# coefficient to every assignment variable; since each session is assigned exactly once this
# only adds a constant to the objective and does not change which timetable is chosen.
W_GAP_PROXY = 1

In [10]:
# Data classes
@dataclass
class Course:
    """A course and the number of timetable slots its lecture and lab sessions occupy."""
    course_id: str  # 'CourseID' column, converted to str and stripped
    name: str  # 'CourseName' column
    lecture_slots: int  # 'Lecture' column; lecture sessions are generated only when > 0
    lab_slots: int  # 'Lab' column; lab sessions are generated only when > 0
    lab_type: str  # 'Lab_Type' column; compared with Room.type_of_space when choosing lab rooms

@dataclass
class Room:
    """A teaching space that sessions can be assigned to."""
    room_id: str  # 'RoomID' column, converted to str and stripped
    capacity: int  # 'Capacity' column; load_data stores 0 when the value cannot be parsed
    room_type: str  # 'Type' column; lectures with 75 or more students require the value "Lecture"
    type_of_space: str  # 'Type_of_Space' column; "Drawing Studio" and "Computer" are never used for lectures

@dataclass
class Instructor:
    """A person who can teach sessions of the courses they are qualified for."""
    instructor_id: str  # 'InstructorID' column, converted to str and stripped
    name: str  # 'Name' column; copied into the output CSV
    qualified_courses: set  # course IDs split from the comma-separated 'QualifiedCourses' column
    not_preferred_slots: set  # integer slot IDs from 'Not_PreferredSlots'; penalised in the objective, not forbidden

@dataclass
class TimeSlot:
    """One basic timetable slot as read from the time-slot table."""
    slot_id: int  # 'ID' column; IDs that differ by 1 on the same day are treated as back-to-back slots
    day: str  # 'Day' column
    start_time: str  # 'StartTime' column, kept as read (no parsing is done here)
    end_time: str  # 'EndTime' column, kept as read (no parsing is done here)

@dataclass
class Section:
    """A group of students that attends sessions together."""
    section_id: str  # 'SectionID' column, converted to str and stripped
    department: str  # matched against AvailableCourse.department
    level: int  # matched against AvailableCourse.level
    specialization: str  # matched against AvailableCourse.specialization unless that is 'Core'
    student_count: int  # summed when grouping sections into lectures and checked against room capacity

@dataclass
class AvailableCourse:
    """A course offering: which department/level/specialization takes which course."""
    department: str  # sections must have the same department to take this course
    level: int  # sections must have the same level to take this course
    specialization: str  # 'Core' matches sections of every specialization; otherwise must be equal
    course_id: str  # key into the courses dictionary; offerings with unknown IDs are ignored
    preferred_prof: str  # preferred lecturer, or None when the 'preferred_Prof' cell is empty
    preferred_assi: set  # preferred lab instructors split from the comma-separated 'preferred_Assi' cell

@dataclass
class Session:
    """One schedulable teaching event (a lecture for a group of sections, or a lab for one section)."""
    session_id: str  # generated as "S<n>" with n counting up from 1
    course_id: str  # course this session belongs to
    session_type: str  # "Lecture" or "Lab"
    duration_slots: int  # number of consecutive slots the session occupies
    sections: List[str]  # IDs of the sections attending; none of them may be double-booked
    total_students: int  # sum of the attending sections' student counts
    preferred_instructors: set  # compared against instructor IDs; empty set means no preference penalty applies

In [11]:
# load data
def load_data(data_dir="/content/sample_data"):
    """Read the six input tables and convert them into the scheduler's data classes.

    Parameters
    ----------
    data_dir : str or path-like, optional
        Directory containing Courses.csv, Rooms.csv, Instructors.csv,
        TimeSlots.csv, sections_data.xlsx and Avilable_Course.csv.
        Defaults to the location the notebook used originally.

    Returns
    -------
    tuple
        ``(courses, rooms, instructors, timeslots, sections, avail, timeslots_df)``
        where the first five are dictionaries keyed by ID, ``avail`` is a list
        of AvailableCourse and ``timeslots_df`` is the time-slot table sorted
        by its integer 'ID' column.

    Raises
    ------
    RuntimeError
        If the time-slot table has no 'ID' column.

    Notes
    -----
    Rows whose ID cell is empty are skipped. Empty 'Lecture'/'Lab' cells count
    as 0 slots, and empty text cells used for room matching become ''.
    """

    def _row_id(row, column):
        # Missing cells arrive as NaN/None; str() would turn them into the
        # truthy strings 'nan'/'None', so map them to '' explicitly.
        raw = row.get(column)
        if pd.isna(raw):
            return ""
        return str(raw).strip()

    def _text(value):
        # NaN is truthy and never equal to anything, which breaks the
        # "is a lab type set?" and equality checks done on these fields later.
        return "" if pd.isna(value) else value

    def _slot_count(value):
        # An empty cell means the course has no sessions of that kind.
        return 0 if pd.isna(value) else int(value)

    logging.info("Loading data files...")
    base = str(data_dir).rstrip("/")
    courses_df = pd.read_csv(f"{base}/Courses.csv")
    rooms_df = pd.read_csv(f"{base}/Rooms.csv")
    instr_df = pd.read_csv(f"{base}/Instructors.csv")
    times_df = pd.read_csv(f"{base}/TimeSlots.csv")
    sections_df = pd.read_excel(f"{base}/sections_data.xlsx")
    avail_df = pd.read_csv(f"{base}/Avilable_Course.csv")

    courses = {}
    for r in courses_df.to_dict('records'):
        cid = _row_id(r, 'CourseID')
        if not cid:
            continue
        courses[cid] = Course(
            course_id=cid,
            name=_text(r.get('CourseName', '')),
            lecture_slots=_slot_count(r.get('Lecture', 0)),
            lab_slots=_slot_count(r.get('Lab', 0)),
            lab_type=_text(r.get('Lab_Type', ''))
        )

    rooms = {}
    for r in rooms_df.to_dict('records'):
        rid = _row_id(r, 'RoomID')
        if not rid:
            continue
        try:
            cap = int(r.get('Capacity', 0))
        except (TypeError, ValueError, OverflowError):
            # Best effort: a room with unknown capacity is kept but can only
            # host sessions with zero students.
            logging.warning("Room %s has an unreadable capacity; using 0", rid)
            cap = 0
        rooms[rid] = Room(room_id=rid, capacity=cap, room_type=_text(r.get('Type', '')),
                          type_of_space=_text(r.get('Type_of_Space', '')))

    instructors = {}
    for r in instr_df.to_dict('records'):
        iid = _row_id(r, 'InstructorID')
        if not iid:
            continue
        raw_q = r.get('QualifiedCourses', '')
        if pd.isna(raw_q):
            qual = set()
        else:
            qual = {s.strip() for s in str(raw_q).split(',') if s.strip()}
        not_pref = set()
        raw_np = r.get('Not_PreferredSlots', '')
        if pd.notna(raw_np) and str(raw_np).strip():
            try:
                # literal_eval only accepts Python literals, e.g. "[1, 2]" or "3".
                parsed = ast.literal_eval(str(raw_np))
                if isinstance(parsed, (list, tuple, set)):
                    not_pref = set(int(x) for x in parsed if str(x).strip())
                else:
                    not_pref = {int(parsed)}
            except Exception:
                # Not a literal (e.g. "1, x, 3"): fall back to comma splitting
                # and keep whatever tokens are valid integers.
                for tok in str(raw_np).split(','):
                    tok = tok.strip()
                    if not tok:
                        continue
                    try:
                        not_pref.add(int(tok))
                    except ValueError:
                        logging.warning("Instructor %s: ignoring not-preferred slot %r", iid, tok)
        instructors[iid] = Instructor(instructor_id=iid, name=_text(r.get('Name', '')),
                                      qualified_courses=qual, not_preferred_slots=not_pref)

    timeslots = {}
    timeslots_df = times_df.copy()
    if 'ID' not in timeslots_df.columns:
        raise RuntimeError("TimeSlots file needs 'ID' column.")
    timeslots_df['ID'] = timeslots_df['ID'].astype(int)
    timeslots_df = timeslots_df.sort_values('ID').reset_index(drop=True)
    for r in timeslots_df.to_dict('records'):
        tid = int(r['ID'])
        timeslots[tid] = TimeSlot(slot_id=tid, day=r.get('Day', ''), start_time=r.get('StartTime', ''),
                                  end_time=r.get('EndTime', ''))

    sections = {}
    for r in sections_df.to_dict('records'):
        sid = _row_id(r, 'SectionID')
        if not sid:
            continue
        sections[sid] = Section(section_id=sid, department=r.get('Department', ''), level=int(r.get('Level', 0)),
                                specialization=r.get('Specialization', ''),
                                student_count=int(r.get('StudentCount', 0)))

    avail = []
    for r in avail_df.to_dict('records'):
        cid = _row_id(r, 'CourseID')
        if not cid:
            continue
        raw_prof = r.get('preferred_Prof')
        prof = raw_prof if pd.notna(raw_prof) else None
        raw_assi = r.get('preferred_Assi', '')
        assi = set()
        if pd.notna(raw_assi) and str(raw_assi).strip().lower() != 'nan':
            assi = set(a.strip() for a in str(raw_assi).split(',') if a.strip())
        avail.append(AvailableCourse(department=r.get('Department', ''), level=int(r.get('Level', 0)),
                                     specialization=r.get('Specialization', ''), course_id=cid,
                                     preferred_prof=(str(prof).strip() if prof else None), preferred_assi=assi))
    logging.info("Data loaded.")
    return courses, rooms, instructors, timeslots, sections, avail, timeslots_df

In [12]:
# Generate sessions
def generate_sessions(courses, sections, available_courses, max_group_capacity=75) -> List[Session]:
    """Expand every course offering into the lecture and lab sessions to be scheduled.

    For each offering whose course is known, the sections with the same
    department and level (and the same specialization, unless the offering is
    'Core') are selected. Lectures are shared: sections are taken in
    section-ID order and packed into groups whose combined student count stays
    within ``max_group_capacity`` (a single section always forms a group, even
    if it alone exceeds the limit). Labs are per section. Session IDs are
    "S1", "S2", ... in creation order.
    """
    created = []

    def add_session(course, kind, duration, members, preferred):
        # Exactly one session is appended per ID, so the next number is len + 1.
        created.append(Session(
            session_id=f"S{len(created) + 1}",
            course_id=course.course_id,
            session_type=kind,
            duration_slots=duration,
            sections=[member.section_id for member in members],
            total_students=sum(member.student_count for member in members),
            preferred_instructors=preferred,
        ))

    for req in available_courses:
        if req.course_id not in courses:
            continue
        course = courses[req.course_id]

        def takes_course(sec):
            if sec.department != req.department or sec.level != req.level:
                return False
            return req.specialization == 'Core' or req.specialization == sec.specialization

        eligible = list(filter(takes_course, sections.values()))
        if not eligible:
            continue

        if course.lecture_slots > 0:
            # Greedy packing in section-ID order.
            groups = []
            running_total = 0
            for sec in sorted(eligible, key=lambda item: item.section_id):
                starts_new_group = (not groups) or (running_total + sec.student_count > max_group_capacity)
                if starts_new_group:
                    groups.append([sec])
                    running_total = sec.student_count
                else:
                    groups[-1].append(sec)
                    running_total += sec.student_count
            for group in groups:
                lecturer_pref = {req.preferred_prof} if req.preferred_prof else set()
                add_session(course, "Lecture", course.lecture_slots, group, lecturer_pref)

        if course.lab_slots > 0:
            for sec in eligible:
                add_session(course, "Lab", course.lab_slots, [sec], set(req.preferred_assi))

    logging.info("Generated %d sessions", len(created))
    return created

In [13]:
# Build feasible assignment options (precompute consecutive sequences and valid room/instructor)
def build_options_for_sessions(sessions: List[Session], courses, rooms, instructors, timeslots_df: pd.DataFrame) -> Dict[str, List[Tuple[Tuple[int,...], str, str]]]:
    """
    Enumerate, per session, every candidate assignment.

    Returns a dict mapping session_id to a list of options, where
       option = (timeslot_sequence_tuple, room_id, instructor_id)
    and timeslot_sequence is a run of consecutive slot IDs on a single day,
    as long as the session's duration. A session may map to an empty list.
    """
    logging.info("Building feasible assignment options...")

    # day -> slot IDs of that day in ascending ID order
    slots_by_day = {}
    for record in timeslots_df.sort_values('ID').to_dict('records'):
        slots_by_day.setdefault(record['Day'], []).append(int(record['ID']))

    def consecutive_windows(length):
        # All same-day windows of `length` slots whose IDs increase by exactly 1.
        windows = []
        for day_slots in slots_by_day.values():
            for start in range(len(day_slots) - length + 1):
                window = tuple(day_slots[start:start + length])
                if all(nxt == cur + 1 for cur, nxt in zip(window, window[1:])):
                    windows.append(window)
        return windows

    def room_is_usable(room, sess, course):
        if room.capacity < sess.total_students:
            return False
        if sess.session_type == "Lab":
            # A lab with a declared lab type needs a room of exactly that space type.
            wrong_space = course and course.lab_type and room.type_of_space != course.lab_type
            return not wrong_space
        # Lectures never use special-purpose spaces ...
        if room.type_of_space in {"Drawing Studio", "Computer"}:
            return False
        # ... and big lectures must be in a proper lecture room.
        return sess.total_students < 75 or room.room_type == "Lecture"

    options_by_session = {}
    for sess in sessions:
        windows = consecutive_windows(sess.duration_slots)
        course = courses.get(sess.course_id) if sess.session_type == "Lab" else None
        room_ids = [room.room_id for room in rooms.values() if room_is_usable(room, sess, course)]
        # Only instructors qualified for the course may teach it.
        teacher_ids = [
            teacher.instructor_id
            for teacher in instructors.values()
            if sess.course_id in teacher.qualified_courses
        ]
        # Cartesian product, with the instructor varying fastest and the window slowest.
        options_by_session[sess.session_id] = list(itertools.product(windows, room_ids, teacher_ids))

    logging.info("Built options for %d sessions", len(options_by_session))
    return options_by_session

In [14]:
# ILP model building
def build_and_solve_ilp(sessions: List[Session], session_options: Dict[str, List[Tuple[Tuple[int,...], str, str]]],
                        instructors: Dict[str, Instructor], rooms: Dict[str, Room], timeslots: Dict[int, TimeSlot],
                        objective_weights=None, time_limit_seconds=None):
    """
    Build the PuLP ILP and solve it with CBC. Variables x[sess,opt_index] ∈ {0,1}.

    Constraints:
       - Each session is assigned exactly one of its options.
       - For each basic timeslot: an instructor, a room and a section are each used at most once.
    Objective: minimize the weighted sum of soft penalties:
       - W_NOT_PREFERRED_SLOT for every slot of an option that is in the instructor's not-preferred slots
       - W_NOT_PREFERRED_INST when the session has preferred instructors and the option uses another one
       - 0.001 * W_GAP_PROXY on every variable (a constant offset, see comment below)

    Parameters:
       sessions           sessions to schedule
       session_options    session_id -> list of (slot_sequence, room_id, instructor_id)
       instructors        instructor_id -> Instructor (read for not-preferred slots)
       rooms, timeslots   accepted for interface compatibility; not read here
       objective_weights  accepted for interface compatibility; not read here
       time_limit_seconds if truthy, CBC time limit in seconds (solver output is then shown)

    Returns:
       list of (session, slot_sequence, room_id, instructor_id), or None when some
       session has no options or the solver does not report a usable solution.
    """
    # A session without options makes the model infeasible. The previous code tried to
    # encode that as `prob += 0 == 1`, but that expression is the plain Python value
    # False, which PuLP refuses as a constraint; report the condition directly instead.
    missing = [sess.session_id for sess in sessions if not session_options.get(sess.session_id, [])]
    if missing:
        for session_id in missing:
            logging.warning("Session %s has NO options -> problem likely unsolvable", session_id)
        logging.error("%d session(s) have no feasible options; no solution is possible.", len(missing))
        return None

    logging.info("Building ILP model (PuLP)...")
    prob = pulp.LpProblem("Timetable_Optimization", pulp.LpMinimize)

    # Decision variables
    x = {}  # (session_id, opt_idx) -> var
    # Variables that occupy a given resource in a given basic slot
    inst_slot_uses = {}  # (inst_id, slot) -> list of vars
    room_slot_uses = {}  # (room_id, slot) -> list of vars
    section_slot_uses = {}  # (section_id, slot) -> list of vars

    penalty_terms = []  # (coefficient, var) pairs of the objective

    # Build variables and objective coefficients
    for sess in sessions:
        opts = session_options[sess.session_id]
        for j, (seq, r_id, inst_id) in enumerate(opts):
            var = pulp.LpVariable(f"x_{sess.session_id}_{j}", lowBound=0, upBound=1, cat='Binary')
            x[(sess.session_id, j)] = var

            # Penalty for not-preferred slot(s): one charge per affected slot
            inst = instructors.get(inst_id)
            slot_penalty = sum(W_NOT_PREFERRED_SLOT for s in seq if inst and s in inst.not_preferred_slots)

            # Penalty for not preferred instructor if session has preferred_instructors non-empty
            pref_penalty = 0
            if sess.preferred_instructors and inst_id not in sess.preferred_instructors:
                pref_penalty = W_NOT_PREFERRED_INST

            if slot_penalty + pref_penalty != 0:
                penalty_terms.append((slot_penalty + pref_penalty, var))

            # Register usage for slot conflicts
            for s in seq:
                inst_slot_uses.setdefault((inst_id, s), []).append(var)
                room_slot_uses.setdefault((r_id, s), []).append(var)
                for sec in sess.sections:
                    section_slot_uses.setdefault((sec, s), []).append(var)

    # Constraint 1: each session exactly once (sum over its options == 1)
    for sess in sessions:
        opts = session_options[sess.session_id]
        prob += (pulp.lpSum(x[(sess.session_id, j)] for j in range(len(opts))) == 1), f"assign_once_{sess.session_id}"

    # Constraint 2: no double booking for instructors, rooms, sections on each basic slot
    for (inst_id, slot), var_list in inst_slot_uses.items():
        prob += (pulp.lpSum(var_list) <= 1), f"inst_{inst_id}_slot_{slot}_once"
    for (room_id, slot), var_list in room_slot_uses.items():
        prob += (pulp.lpSum(var_list) <= 1), f"room_{room_id}_slot_{slot}_once"
    for (sec_id, slot), var_list in section_slot_uses.items():
        prob += (pulp.lpSum(var_list) <= 1), f"sec_{sec_id}_slot_{slot}_once"

    # Gap proxy: a tiny uniform cost on every assignment variable.
    # NOTE(review): because every session is assigned exactly once, this adds the same
    # constant to every feasible solution and therefore does not steer the solver; a real
    # compactness term would need per-section/day indicator variables.
    for var in x.values():
        penalty_terms.append((0.001 * W_GAP_PROXY, var))

    # Set objective
    prob += pulp.lpSum(coeff * var for coeff, var in penalty_terms), "Total_Penalty"

    # Solve. The solver is configured explicitly; the global pulp.LpSolverDefault is left untouched.
    logging.info("Starting solver...")
    if time_limit_seconds:
        solver = pulp.PULP_CBC_CMD(timeLimit=time_limit_seconds, msg=True)
    else:
        solver = pulp.PULP_CBC_CMD(msg=False)  # turn off solver messages by default

    result_status = prob.solve(solver)
    status = pulp.LpStatus[result_status]
    logging.info("Solver status: %s", status)

    if status != "Optimal" and status != "Feasible":
        logging.error("No feasible solution found by solver.")
        return None

    # Extract assignments: the single option per session whose variable is set to 1
    assignments = []
    for sess in sessions:
        opts = session_options[sess.session_id]
        for j, (seq, room_id, inst_id) in enumerate(opts):
            val = pulp.value(x[(sess.session_id, j)])
            # Compare against 0.5 rather than 1 to tolerate solver round-off.
            if val is not None and val > 0.5:
                assignments.append((sess, seq, room_id, inst_id))
                break
    logging.info("Extracted %d assignments", len(assignments))
    return assignments


In [15]:
# Save solution
def save_solution(assignments, timeslots, instructors, rooms):
    """Write the solved timetable to OUTPUT_CSV, one row per session.

    Parameters:
        assignments  iterable of (session, slot_sequence, room_id, instructor_id)
        timeslots    slot_id -> TimeSlot, used for the day and start/end times
        instructors  instructor_id -> Instructor, used for the instructor name
        rooms        accepted for interface compatibility; not read here

    Nothing is written (and a warning is logged) when there are no assignments.

    Rows are ordered by the ID of each session's first slot. The previous
    ordering sorted the Day and StartTime *strings*, which puts e.g. "Friday"
    before "Monday" and "10:00" before "9:00".
    NOTE(review): this assumes slot IDs increase chronologically, as the
    consecutive-ID rule for multi-slot sessions suggests — confirm against
    the time-slot table.
    """
    keyed_rows = []
    for sess, seq, r_id, inst_id in assignments:
        ts_first = timeslots[seq[0]]
        ts_last = timeslots[seq[-1]]
        keyed_rows.append((seq[0], {
            "SessionID": sess.session_id,
            "CourseID": sess.course_id,
            "Type": sess.session_type,
            "Day": ts_first.day,
            "StartTime": ts_first.start_time,
            "EndTime": ts_last.end_time,
            "Room": r_id,
            "InstructorID": inst_id,
            "InstructorName": instructors[inst_id].name if inst_id in instructors else "",
            "Sections": ", ".join(sess.sections),
            "StudentCount": sess.total_students
        }))
    if not keyed_rows:
        logging.warning("No assignments to save.")
        return
    # list.sort is stable, so sessions starting in the same slot keep their input order.
    keyed_rows.sort(key=lambda pair: pair[0])
    df = pd.DataFrame([row for _, row in keyed_rows])
    df.to_csv(OUTPUT_CSV, index=False)
    logging.info("✅ Saved timetable to %s", OUTPUT_CSV)

In [16]:
# Main routine
def main():
    """Run the pipeline end to end: load inputs, create sessions, enumerate options, solve, save.

    Exits the process with status 1 when no sessions can be generated, when any
    session has no feasible option, or when the solver returns no solution.
    """
    def abort(message, *args):
        # Log the reason and stop with a non-zero exit status.
        logging.error(message, *args)
        sys.exit(1)

    courses, rooms, instructors, timeslots, sections, avail, timeslots_df = load_data()

    sessions = generate_sessions(courses, sections, avail, max_group_capacity=75)
    if not sessions:
        abort("No sessions generated. Check input data.")

    session_options = build_options_for_sessions(sessions, courses, rooms, instructors, timeslots_df)
    without_options = sum(1 for sess in sessions if not session_options.get(sess.session_id))
    if without_options:
        abort("Found %d sessions with zero feasible options.", without_options)

    assignments = build_and_solve_ilp(sessions, session_options, instructors, rooms, timeslots)
    if assignments is None:
        abort("Solver failed to find a solution.")

    save_solution(assignments, timeslots, instructors, rooms)


if __name__ == "__main__":
    main()