### Import các module cần thiết

In [154]:
import sqlite3, itertools, random
from collections import defaultdict
from datetime import date, timedelta
from math import log10, ceil

### Cấu hình tổng quát & Các tham số

In [155]:
# Siêu tham số cho ACO
ALPHA = 1.0     # Ảnh hưởng của pheromone trong xác suất chọn (τ^α)
BETA  = 3.5     # Ảnh hưởng của heuristic trong xác suất chọn (η^β)
RHO   = 0.22    # Tốc độ bay hơi pheromone ở cập nhật toàn cục (global)
XI    = 0.1     # Tốc độ cập nhật pheromone cục bộ (local) sau mỗi quyết định
Q     = 500     # Hệ số lắng pheromone khi cộng vào lời giải tốt nhất
TAU0  = 0.5     # Giá trị pheromone khởi tạo (đồng đều)

NUM_ANTS    = 35   # Số lượng "kiến" mỗi vòng lặp
MAX_ITERS   = 30   # Số vòng lặp tối đa
EARLY_STOP  = 10   # Dừng sớm nếu không cải thiện trong chừng đó vòng
LOCAL_MOVES = 200  # Số bước thử local search cho mỗi lời giải

### Tạo danh sách ngày thi và ca thi

In [156]:
SCHEDULE_ON_SATURDAY = 1  # Có tổ chức thi vào Thứ bảy không?
SCHEDULE_ON_SUNDAY   = 0  # Có tổ thức thi vào Chủ nhật không?
SLOTS_PER_DAY        = 4  # Số ca thi mỗi ngày
DATE_START = date(2026, 1, 5)  # inclusive
DATE_END   = date(2026, 2, 1)  # exclusive
TIMESLOTS: list[tuple[int, int]] = []
slot = -SLOTS_PER_DAY
for day in range((DATE_END - DATE_START).days):
  slot += SLOTS_PER_DAY
  weekday = (DATE_START + timedelta(days=day)).weekday()
  if (weekday == 5 and not SCHEDULE_ON_SATURDAY) \
      or (weekday == 6 and not SCHEDULE_ON_SUNDAY):
    continue
  for i in range(slot, slot + SLOTS_PER_DAY):
    TIMESLOTS.append((day, slot + i))

### Load data

In [157]:
with sqlite3.connect("database.db") as conn:
  cursor = conn.cursor()
  courses_by_student: defaultdict[str, set[str]] = defaultdict(set)
  sections_by_student: defaultdict[str, set[str]] = defaultdict(set)
  students_by_course: defaultdict[str, set[str]] = defaultdict(set)
  for student_id, section_code, course_code in cursor.execute("""
      SELECT e.student_id, e.section_code, s.course_code
      FROM enrolments e JOIN sections s ON e.section_code = s.section_code
  """):
    courses_by_student[student_id].add(course_code)
    sections_by_student[student_id].add(section_code)
    students_by_course[course_code].add(student_id)

### Xây dựng đồ thị xung đột
- Mỗi `course_code` là một đỉnh.
- Có cạnh giữa hai `course_code` nếu tồn tại ít nhất một sinh viên thi cả hai môn đó.
- Trọng số của cạnh là số sinh viên cùng thi hai môn đó.

In [158]:
conflict: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
for courses in courses_by_student.values():
  for course1, course2 in itertools.combinations(courses, 2):
    conflict[course1][course2] += 1
    conflict[course2][course1] += 1
# Đảm bảo các course không có xung đột vẫn xuất hiện trong đồ thị,
# chỉ là có thể không có cạnh nào nối tới nó.
for course in students_by_course:
  conflict.setdefault(course, defaultdict(int))

### Heuristic: Tạo thứ tự xếp lịch thi
~~Môn nào "khó" (nhiều xung đột, quy mô lớn) được gán trước để giảm bế tắc.~~  
Thay vì duyệt theo `courses_order` cố định, mỗi bước chọn đỉnh có ít slot khả dụng nhất.

In [159]:
# courses_order = sorted(
#   conflict.keys(),
#   key=lambda c: (len(conflict[c]), len(students_by_course[c])),
#   reverse=True
# )

def get_next_course(schedule: dict[str, int]) -> str:
  def n_avail(c: str) -> int:
    forbidden_slots: set[int] = {schedule[n] for n in conflict[c] if n in schedule}
    return len(TIMESLOTS) - len(forbidden_slots)
  remain_courses = [c for c in students_by_course if c not in schedule]
  return min(remain_courses, key=n_avail) if remain_courses else ""

### "Vòng quay xổ số"
Chọn một phần tử trong `items` theo phân phối tỉ lệ `weights`.

In [160]:
def roulette(items: list[int], weights: list[float]):
  total = sum(weights)
  if total <= 0:
    return random.choice(items)
  r = random.random() * total
  accumulate = 0
  for it, w in zip(items, weights):
    accumulate += w
    if accumulate >= r:
      return it
  return items[-1]

### Xây đường đi (lịch thi) cho một con "kiến"
- Duyệt theo thứ tự `courses_order`.
- Với mỗi course: xác định tập lịch khả thi (không trùng lịch với course xung đột).
- Tính trọng số chọn lịch: $\tau^\alpha \cdot \eta^\beta$, với $\eta = \frac{1}{1+\Delta_\text{soft}}$
- Chọn lịch thi theo bánh xe xổ số, cập nhật pheromone cục bộ rồi tiếp tục.

In [161]:
def construct_ant(tau: dict[tuple[str, int], float]) -> dict[str, int] | None:
  schedule: dict[str, int] = dict()
  students_by_day: dict[int, dict[str, int]] = defaultdict(lambda: defaultdict(int))
  while course := get_next_course(schedule):
  # for course in courses_order:
    forbidden_slots: set[int] = set(schedule[nb] for nb in conflict[course] if nb in schedule)
    feasible_slots: list[int] = [ts for ts in range(len(TIMESLOTS)) if ts not in forbidden_slots]
    # Nếu không có slot khả thi, con kiến này "die"
    if not feasible_slots:
      return None
    weights: list[float] = []
    for timeslot in feasible_slots:
      # Heuristic cục bộ: Ước lượng nhanh "phạt tăng thêm" nếu đặt `course`
      # vào ngày thứ `day` dựa trên lịch tạm thời `partial_schedule`
      # (chỉ xét theo ngày: với mỗi sinh viên, đếm số ca thi khác trong ngày của họ).
      day, _ = TIMESLOTS[timeslot]
      delta_soft = sum(students_by_day[day][sid] for sid in students_by_course[course])
      eta = 1 / (1 + delta_soft)
      w = (tau[(course, timeslot)] ** ALPHA) * (eta ** BETA)
      weights.append(w)
    timeslot_chosen = roulette(feasible_slots, weights)
    schedule[course] = timeslot_chosen
    day_chosen, _ = TIMESLOTS[timeslot_chosen]
    for student_id in students_by_course[course]:
      students_by_day[day_chosen][student_id] += 1
    # Local pheromone update: τ ← (1-ξ)τ + ξτ0
    tau[(course, timeslot_chosen)] = (1 - XI) * tau[(course, timeslot_chosen)] + XI * TAU0
  return schedule 

### Local search: Tối ưu cục bộ đơn giản
- Local search: Lặp `LOCAL_MOVES` lần: chọn ngẫu nhiên một course, thử chuyển sang một timeslot hợp lệ khác.
- Chấp nhận nước đi nếu giảm chi phí; nếu không thì hoàn tác.

Dựa theo bài báo _Solving a Practical Examination Timetabling Problem: A Case Study_ (8/2007), viết bởi Masri Ayob, Ariff Md Ab Malik, Salwani Abdullah, Abdul Razak Hamdan, Graham Kendall, và Rong Qu, [Link researchgate.net](https://www.researchgate.net/publication/221432805_Solving_a_Practical_Examination_Timetabling_Problem_A_Case_Study), có hàm tính chi phí của cả lịch thi như sau:
$$F = \frac{\sum_{i=1}^{N-1}\sum_{j=i+1}^{N} c_{ij}\cdot 2^{(15-\Delta t_{ij})(3-\Delta d_{ij})}}{M}$$
với:
- $N$ là số môn thi,
- $M$ là số sinh viên,
- $c_{ij}$ là số sinh viên cùng thi hai môn $E_i$ và $E_j$ ($c_{ij}=0$ với $i=j$),
- $\Delta t_{ij} = |t_i - t_j|$ là khoảng cách ca thi giữa hai môn $E_i$ và $E_j$,
- $\Delta d_{ij} = |d_i - d_j|$ là khoảng cách ngày thi giữa hai môn $E_i$ và $E_j$.

Vì $F$ là hàm mũ, giá trị có thể rất lớn, lên đến $2^{45}$ nên khi cập nhật pheromone ta sẽ lấy $\log_{10}(1 + F)$ (cộng $1$ vào vì không tính được $\log 0$).

In [162]:
M_SLOTS = 15
M_DAYS  = 3

def power2(ts1: int, ts2: int) -> int:
  day1, slot1 = TIMESLOTS[ts1]
  day2, slot2 = TIMESLOTS[ts2]
  delta_slot = abs(slot1 - slot2)
  delta_day = abs(day1 - day2)
  if delta_slot > M_SLOTS or delta_day > M_DAYS:
    return 0
  tmp = (M_SLOTS - delta_slot) * (M_DAYS - delta_day)
  return 2 ** tmp

def local_search(schedule: dict[str, int]) -> float:
  cost = 0
  for course1, course2 in itertools.combinations(conflict, 2):
    cost += conflict[course1][course2] * power2(schedule[course1], schedule[course2])
  loops = 0
  while loops < LOCAL_MOVES:
    course = random.choice(tuple(conflict))
    cur_timeslot = schedule[course]
    forbidden_slots: set[int] = set(schedule[nb] for nb in conflict[course] if nb in schedule)
    forbidden_slots.add(cur_timeslot)
    feasible_slots: list[int] = [ts for ts in range(len(TIMESLOTS)) if ts not in forbidden_slots]
    if not feasible_slots:
      continue
    loops += 1
    new_schedule = random.choice(feasible_slots)
    delta = 0
    for nb in conflict[course]:
      coef = power2(new_schedule, schedule[nb]) - power2(schedule[course], schedule[nb])
      delta += conflict[course][nb] * coef
    if delta < 0:
      cost += delta
      schedule[course] = new_schedule
  cost /= len(courses_by_student)
  return cost

### Cập nhật pheromone toàn cục
- Evaporate: $\tau \leftarrow (1-\rho)\tau$ cho mọi cung `(course, timeslot)`.
- Deposit: cộng $\frac{\rho \times Q}{1 + log_{10}(1 + \text{best\_cost})}$ lên các cặp `(course, timeslot)` thuộc `best_schedule`.

Giới hạn $\tau$ trong khoảng $[\tau_{\min}, \tau_{\max}]$ để tránh một vài đường đi bị quá ưu tiên hoặc bị hội tụ sớm.

In [163]:
TAU_MIN = 0.01
TAU_MAX = 10.0

def evaporate_and_deposit(
    tau: dict[tuple[str, int], float],
    best_schedule: dict[str, int],
    best_cost: float
  ) -> None:
  # Bốc hơi pheromone
  for key in tau:
    tau[key] *= (1 - RHO)
  # Đổ pheromone lên best schedule
  deposit = (RHO * Q) / (1 + log10(best_cost))
  for key in best_schedule.items():
    tau[key] = min(max(tau[key] + deposit, TAU_MIN), TAU_MAX)

### Main algorithm
Ant colony optimizaztion + Local search

In [164]:
best_schedule: dict[str, int] = {}
best_cost = 10**63
no_improvement = 0

tau: dict[tuple[str, int], float] = {
  (course, ts): TAU0 for course in conflict for ts in range(len(TIMESLOTS))
}

for it in range(1, MAX_ITERS + 1):
  ants: list[dict[str, int]] = []
  costs: list[float] = []
  while len(ants) < NUM_ANTS:
    schedule = construct_ant(tau)
    if schedule is None:
      continue
    cost = local_search(schedule)
    ants.append(schedule)
    costs.append(cost)
  i_best = min(range(NUM_ANTS), key=lambda i: costs[i])
  if costs[i_best] < best_cost:
    best_schedule = ants[i_best]
    best_cost = costs[i_best]
    no_improvement = 0
  else:
    no_improvement += 1
  evaporate_and_deposit(tau, best_schedule, best_cost)
  print(f"[Iter {it:3d}] best_cost={best_cost:.2f} (iter_best={costs[i_best]:.2f})")
  if no_improvement >= EARLY_STOP:
    print(f"No improvement for {EARLY_STOP} iterations — stop.")
    break

[Iter   1] best_cost=16331197775.62 (iter_best=16331197775.62)
[Iter   2] best_cost=16077616195.02 (iter_best=16077616195.02)
[Iter   3] best_cost=15765398962.09 (iter_best=15765398962.09)
[Iter   4] best_cost=14389836926.87 (iter_best=14389836926.87)
[Iter   5] best_cost=14389836926.87 (iter_best=16780013227.43)
[Iter   6] best_cost=14389836926.87 (iter_best=15287360871.39)
[Iter   7] best_cost=11726562759.70 (iter_best=11726562759.70)
[Iter   8] best_cost=11726562759.70 (iter_best=16897242832.61)
[Iter   9] best_cost=11726562759.70 (iter_best=13921547576.51)
[Iter  10] best_cost=11726562759.70 (iter_best=14419108720.76)
[Iter  11] best_cost=11726562759.70 (iter_best=15921459356.40)
[Iter  12] best_cost=11726562759.70 (iter_best=15404438555.20)
No improvement for 5 iterations — stop.


### Đánh giá độ tốt của lịch thi cuối cùng

In [165]:
counter: dict[str, int] = {k: 0 for k in [
  "same_day_same_slot",  # Cùng ngày cùng ca -> phải bằng 0
  "same_day_adj",        # Thi cùng ngày, 2 ca liền kề
  "same_day_nonadj",     # Thi cùng ngày nhưng không phải 2 ca liền kề
  "overnight_adj",       # Thi ca cuối hôm trước và ca đầu ngày hôm sau
  "1_day",               # Hai ca thi ở hai ngày liên tiếp
  "2_days",              # Hai ca thi cách nhau 1 ngày
  "3_days",              # Hai ca thi cách nhau 2 ngày
  "other"                # Các trường hợp còn lại coi như là đẹp
]}
for course1, course2 in itertools.combinations(conflict, 2):
  mutual = conflict[course1][course2]
  if mutual == 0:
    continue
  day1, slot1 = TIMESLOTS[schedule[course1]]
  day2, slot2 = TIMESLOTS[schedule[course2]]
  delta_day = abs(day1 - day2)
  delta_slot = abs(slot1 - slot2)
  if delta_day == 0:
    if delta_slot == 0:
      counter["same_day_same_slot"] += mutual
    elif delta_slot == 1:
      counter["same_day_adj"] += mutual
    else:
      counter["same_day_nonadj"] += mutual
  elif delta_day == 1:
    if delta_slot == 1:
      counter["overnight_adj"] += mutual
    else:
      counter["1_day"] += mutual
  elif delta_day == 2:
    counter["2_days"] += mutual
  elif delta_day == 3:
    counter["3_days"] += mutual
  else:
    counter["other"] += mutual

print("best_cost:", best_cost)
counter

best_cost: 11726562759.704855


{'same_day_same_slot': 0,
 'same_day_adj': 47,
 'same_day_nonadj': 47,
 'overnight_adj': 0,
 '1_day': 8669,
 '2_days': 5892,
 '3_days': 6572,
 'other': 79058}

### Gán phòng thi và cán bộ coi thi
Sau khi đã có lịch thi tối ưu nhất, ta sẽ gán phòng thi và cán bộ coi thi.  
Đối với mỗi ca thi $i$, gọi $n$ là số lượng sinh viên thi ca này. Giả sử mỗi phòng có tối đa $40$ sinh viên và mỗi phòng thi cần đúng $2$ cán bộ coi thi, ta cần $r = \lceil \frac{n}{40} \rceil$ phòng thi và $t = 2r$ cán bộ coi thi.  
Về vấn đề phòng thi, ta có thể xếp môn thi vào bất cứ phòng nào miễn là phòng đó không được sử dụng bởi môn thi khác cùng ca thi.  
Về vấn đề cán bộ coi thi, ta mong muốn các cán bộ coi thi được phân việc đồng đều nhất (không có cán bộ nào phải đi coi quá nhiều ca thi trong khi có cán bộ coi ít ca thi hơn).

In [166]:
N_SUPERVISORS = 140
N_ROOMS       = 70
ROOM_MAX_CAPACITY = 40

def get_section_of_course(student_id: str, course_code: str) -> str:
  for section_code in sections_by_student[student_id]:
    if section_code.replace(' ', '').startswith(course_code):
      return section_code
  assert False   # không thể xảy ra trường hợp này

schedule_rows: list[tuple[str, str, int, int, int, int, int]] = []
count_timeslots_of_supervior = [0] * N_SUPERVISORS
schedule_by_timeslot: dict[int, list[str]] = defaultdict(list)
for course, ts in best_schedule.items():
  schedule_by_timeslot[ts].append(course)
for ts, courses in schedule_by_timeslot.items():
  supervisors = sorted(range(N_SUPERVISORS), key=lambda sp: count_timeslots_of_supervior[sp])
  day, timeslot = TIMESLOTS[ts]
  room_id = 0
  for course in courses:
    # Sort theo mã lớp môn học vì ta muốn có nhiều sinh viên học cùng lớp thì thi cùng phòng nhất chứ không phải xếp ngẫu nhiên
    students = sorted(students_by_course[course], key=lambda sid: get_section_of_course(sid, course))
    n_rooms = ceil(len(students) / ROOM_MAX_CAPACITY)
    idx = 0
    for i in range(n_rooms):
      # Tránh trường hợp có phòng maximum sinh viên trong khi có phòng có đúng 1 sinh viên, ta mong muốn mỗi phòng có số sinh viên bằng nhau
      # Ví dụ có 10 sv và 3 phòng thi thì có 1 phòng 4 sv và 2 phòng 3 sv
      capacity = len(students) // n_rooms + (i < len(students) % n_rooms)
      sp1 = supervisors[room_id * 2]
      sp2 = supervisors[room_id * 2 + 1]
      count_timeslots_of_supervior[sp1] += 1
      count_timeslots_of_supervior[sp2] += 1
      for j in range(capacity):
        schedule_rows.append((students[idx], course, day, timeslot % SLOTS_PER_DAY, room_id, sp1, sp2))
        idx += 1
      room_id += 1

### Lưu lịch thi vào database

In [167]:
with sqlite3.connect("database.db") as conn:
  cursor = conn.cursor()
  cursor.executescript("""
    DROP TABLE IF EXISTS schedule;
    CREATE TABLE schedule (
      student_id    TEXT,
      course_code   TEXT,
      day           INT,
      slot          INT,
      room          INT,
      supervisor1   INT,
      supervisor2   INT
    )
  """)
  cursor.executemany("INSERT INTO schedule VALUES (?,?,?,?,?,?,?)", schedule_rows)

len(schedule_rows)

38752