
Import the necessary modules

In [595]:
import random as rd
import numpy as np
import itertools
import utils
import time
import sys

from heapq import heappop, heappush
from utils import pretty_print_timetable
from check_constraints import check_mandatory_constraints

In [596]:
class Day:
    def __init__(self, name):
        self.name = name

In [597]:
class Interval:
    def __init__(self, interval):
        self.interval = interval

In [598]:
class Subject:
    def __init__(self, name, capacity):
        self.name = name
        self.capacity = capacity
        self.occupied = 0
    
    def is_covered(self):
        return self.occupied >= self.capacity

    def update(self, lecture_hall):
        self.occupied += lecture_hall.capacity

    def restart(self):
        self.occupied = 0

In [599]:
class Constraints:
    def __init__(self, constraints):
        days = ["Luni", "Marti", "Miercuri", "Joi", "Vineri", "Sambata", "Duminica"]
        self.prefferable_days = []
        self.prefferable_intervals = []
        self.c_pauses = []
        
        for c in constraints:
            if c in days:
                self.prefferable_days.append(c)
            
            if "-" in c and not "!" in c:
                self.prefferable_intervals.append(c)

            if "Pauza" in c:
                self.c_pauses.append(c)

In [600]:
class Teacher:
    def __init__(self, name, constraints, subjects):
        self.name = name
        self.num_intervals = 0
        self.constraints = Constraints(constraints)
        self.subjects = subjects

    def is_available(self):
        return self.num_intervals < 7

    def is_specialized(self, s):
        """
            Tells if the teacher may teach the subject s received as a parameter
        """
        return s in self.subjects

    def update(self):
        self.num_intervals += 1

    def restart(self):
        self.num_intervals = 0

In [601]:
class LectureHall:
    def __init__(self, name, capacity, subjects):
        self.name = name
        self.capacity = capacity
        self.subjects = subjects

In [602]:
class Slot:
    def __init__(self, day, interval, lecture_hall):
        self.day = day
        self.interval = interval
        self.lecture_hall = lecture_hall

        self.teacher = None
        self.subject = None

    def is_available(self):
        return (self.subject is None) or (self.teacher is None)

    def matches_requirements(self, lecture_hall, teacher, slots):
        return self.lecture_hall.name == lecture_hall.name and\
            not any(slot.teacher == teacher for slot in [s for s in slots if s.interval == self.interval and s.day == self.day])

    def update(self, subject, teacher):
        self.subject = subject
        self.teacher = teacher

    def restart(self):
        self.subject = None
        self.teacher = None

    def swap(self, other):
        self.subject, other.subject = other.subject, self.subject
        self.teacher, other.teacher = other.teacher, self.teacher

    def can_swap(self, other, timetable):
        def can_swap_helper(s1, s2, timetable):
            # Take all the slots from the same day and intervals
            same_interval_slots = [s for s in timetable["timetable"].slots\
                if s.day == s1.day and s.interval == s1.interval and s != s1]

            # Check if the s2 teacher has another class
            teacher_cond = any(s2.teacher == s.teacher for s in same_interval_slots)

            # Check if the s2 lecture hall is used in that interval
            lecture_hall_cond = any(s2.lecture_hall == s.lecture_hall for s in same_interval_slots)

            return teacher_cond and lecture_hall_cond
        
        return can_swap_helper(self, other, timetable) and can_swap_helper(other, self, timetable)


In [603]:
class Timetable:
    def __init__(self, days, interval, lecture_halls):
        self.slots = []
        for d in days:
            for i in interval:
                for l in lecture_halls:
                    self.slots.append(Slot(d, i, l))

In [604]:
def process_data(data):
    # Initialize Days and Intervals
    days = [Day(day_name) for day_name in data["Zile"]]
    intervals = [Interval(interval) for interval in data["Intervale"]]

    # Initialize subjects
    subjects = [Subject(name, capacity) for name, capacity in data["Materii"].items()]
    
    # Initialize lecture halls
    lecture_halls = [LectureHall(name, details["Capacitate"], [s for s in subjects if s.name in details["Materii"]]) for name, details in data["Sali"].items()]

    # Create empty timetable
    timetable = Timetable(days, intervals, lecture_halls)

    # Initialize teachers
    teachers = [Teacher(name, details["Constrangeri"], [s for s in subjects if s.name in details["Materii"]]) for name, details in data["Profesori"].items()]
    
    return {
        "timetable": timetable,
        "days": days,
        "intervals": intervals,
        "subjects": subjects,
        "lecture_halls": lecture_halls,
        "teachers": teachers
    }

In [605]:
def print_data(data):
    print("Timetable Slots:")
    for slot in data["timetable"].slots:
        slot_details = f"{slot.day.name} {slot.interval.interval}, Subject: {slot.subject.name if slot.subject else 'None'}, "
        slot_details += f"Teacher: {slot.teacher.name if slot.teacher else 'None'}, "
        slot_details += f"Hall: {slot.lecture_hall.name if slot.lecture_hall else 'None'}"
        print(slot_details)

    print("\nSubjects:")
    for subject in data["subjects"]:
        print(f"{subject.name}, Capacity: {subject.capacity}, Occupied: {subject.occupied}")

    print("\nLecture Halls:")
    for hall in data["lecture_halls"]:
        subjects = ', '.join([s.name for s in hall.subjects])
        print(f"{hall.name}, Capacity: {hall.capacity}, Subjects: {subjects}")

    print("\nTeachers:")
    for teacher in data["teachers"]:
        subjects = ', '.join([s.name for s in teacher.subjects])
        constraints = f"Days: {', '.join(teacher.constraints.prefferable_days)}, Intervals: {', '.join(teacher.constraints.prefferable_intervals)}, Pauses: {', '.join(teacher.constraints.c_pauses)}"
        print(f"{teacher.name}, Subjects: {subjects}, Constraints: {constraints}")

In [606]:
def get_interval_tuple(interval, delimiter=", "):
    if delimiter == "-":
        splitted = interval.split(delimiter)
    else:
        splitted = interval[1:len(interval) - 1].split(delimiter)
    
    first = int(splitted[0])
    second = int(splitted[1])
    return (first, second)

In [607]:
def save_timetable(data, file_path):
    processed_data = {}

    for d in data["days"]:
        processed_data[d.name] = {}
        for i in data["intervals"]:
            processed_data[d.name][get_interval_tuple(i.interval)] = {}
            
    for slot in data["timetable"].slots:
        processed_data[slot.day.name][get_interval_tuple(slot.interval.interval)][slot.lecture_hall.name] = (slot.teacher.name, slot.subject.name) if slot.teacher else None
    
    pretty_data = pretty_print_timetable(processed_data, file_path)

    with open(f"outputs/{file_path.split('/')[1].split('.')[0]}.txt", "w") as f:
        f.writelines(pretty_data)
    

Function to run the algorithm


In [608]:
def run_algorithm(file_path, func, h=None) -> None:
    data = process_data(utils.read_yaml_file(file_path))

    if h is not None:
        data["h"] = h

    # TODO run the algorithm
    func(data, file_path)

In [609]:
def is_solution(timetable, specs):
    return not check_mandatory_constraints(timetable.state, specs)

In [610]:
def astar(data):
    h = data["h"]
    
    # Initialize empty timetable
    start_timetable = Timetable(0, h(data), h, data)
    
    OPEN = []
    CLOSED = set()
    heappush(OPEN, (start_timetable.f, start_timetable))
    
    while OPEN:
        _, curr = heappop(OPEN)

        if is_solution(curr, data):
            return curr

        CLOSED.add(curr)
        for SUCC in curr.generate_successors():
            if SUCC not in CLOSED:
                heappush(OPEN, (SUCC.f, SUCC))
    
    
    

In [611]:
def h1():
    pass

In [612]:
class RandomRestartHillClimbing():
    def __all_subjects_covered(subjects):
        return all(s.is_covered() for s in subjects)
    
    def __interval_constraint_breach(interval, searched_intervals):
        return not any(interval[0] >= i[0] and interval[1] <= i[1] for i in searched_intervals)
    
    def generate_initial_state_restart(data, restarts):
        for slot in data["timetable"].slots:
            slot.restart()

        for s in data["subjects"]:
            s.restart()
    
        for t in data["teachers"]:
            t.restart()

        restarts += 1

        sys.stdout.write(f"\rRestarts: {restarts}")
        sys.stdout.flush()

        return data, restarts
    
    def generate_initial_state(data):
        restarts = 0
        print("Starting to generate the initial state...")

        while not RandomRestartHillClimbing.__all_subjects_covered(data["subjects"]):
            # Choose a random subject
            s = rd.choice([s for s in data["subjects"] if not s.is_covered()])

            while not s.is_covered():
                # Choose a random lecture hall
                try:
                    l = rd.choice([l for l in data["lecture_halls"] if s in l.subjects])
                except IndexError:
                    # if there is an IndexError, it means that the choice() method was applied on an empty list
                    # therefore there are no lecture halls available so the generating algorithm has to be restarted
                    data, restarts = RandomRestartHillClimbing.generate_initial_state_restart(data, restarts)
                    break

                # Choose a random teacher
                try:
                    t = rd.choice([t for t in data["teachers"] if t.is_available() and t.is_specialized(s)])
                except IndexError:
                    # if there is an IndexError, it means that the choice() method was applied on an empty list
                    # therefore there are no teachers available so the generating algorithm has to be restarted
                    data, restarts = RandomRestartHillClimbing.generate_initial_state_restart(data, restarts)
                    break

                # Choose a random slot
                try:
                    slot = rd.choice([slot for slot in data["timetable"].slots\
                        if slot.is_available() and slot.matches_requirements(l, t, data["timetable"].slots)])
                except IndexError:
                    # if there is an IndexError, it means that the choice() method was applied on an empty list
                    # therefore there are no slots available so the generating algorithm has to be restarted
                    data, restarts = RandomRestartHillClimbing.generate_initial_state_restart(data, restarts)
                    break
            
                slot.update(s, t)
                s.update(l)
                t.update()
        print("\nGenerating the initial state - DONE")
        return data
    
    def eval(timetable):
        """
            This is the method used for evaluating the cost of the current state/timetable.
            It is used in Hill Climbing algorithm for choosing the next state.
            The cost consists in the number of constraints that are not breached.

            Used primarily for:
                if Eval(S') > Eval(S) return S, where S' is max(s for s in SUCC(S))
        """
        cost = 0
        slots = timetable["timetable"].slots
    
        # Evaluate each slot in the timetable
        for slot in slots:
            # If the slot is not available, it means there are a teacher and a subject allocated for the slot
            if not slot.is_available():
                # Day constraint breach
                if slot.day.name not in slot.teacher.constraints.prefferable_days:
                    cost += 1
                
                # Interval constraint breach
                slot_interval = get_interval_tuple(slot.interval.interval)
                prefferable_intervals = [get_interval_tuple(interval, "-") for interval in slot.teacher.constraints.prefferable_intervals]
                if RandomRestartHillClimbing.__interval_constraint_breach(slot_interval, prefferable_intervals):
                    cost += 1

        return cost

    def generate_successor(S):
        """
            Generate all successors from the current timetable.
            These successors are similar to the latter timetable,
            as there is just a swap taking place, interchanging two slots
            for example.
            This approach is useful for reducing the number of constraints.
            Choose the best one based on the eval() method.
        """
        best_swap = None
        best_cost = RandomRestartHillClimbing.eval(S)
        slot_swaps = list(itertools.combinations(S["timetable"].slots, 2))

        # Keep the same variable/timetable to avoid deep copy and duplicating overhead
        for s1, s2 in slot_swaps:
            # Check if the swap will cause breaches among the hard constraints
            if not s1.can_swap(s2, S):
                continue

            # Realize the swap
            s1.swap(s2)

            # If the new state's cost is better than the best cost, update the best swap
            new_cost = RandomRestartHillClimbing.eval(S)
            if new_cost > best_cost:
                best_swap = (s1, s2)
                best_cost = new_cost

            # Undo the swap to proceed with the next ones
            s1.swap(s2)

        if best_swap:
            s1, s2 = best_swap[0], best_swap[1]
            s1.swap(s2)
            return S, False
        else:
            # No better state found, should stop
            return S, True

In [613]:
def random_restart_hill_climbing(data, file_path):
    # Generate the initial state
    S = RandomRestartHillClimbing.generate_initial_state(data)

    """
        Generate the best successor.
        If the best one is no better than the current state, it should stop.
    """
    stop = False
    while not stop:
        S, stop = RandomRestartHillClimbing.generate_successor(S)
    
    save_timetable(S, file_path)
    

Run the algorithm for "dummy.yaml"

In [614]:
run_algorithm("inputs/dummy.yaml", random_restart_hill_climbing)

Starting to generate the initial state...

Generating the initial state - DONE


KeyError: (8, 10)

Run the algorithm for "orar_constrans_incalcat.yaml"

In [None]:
run_algorithm("inputs/orar_constrans_incalcat.yaml", h=h1)

TypeError: run_algorithm() missing 1 required positional argument: 'func'

Run the algorithm for "orar_mic_exact.yaml"

In [None]:
run_algorithm("inputs/orar_mic_exact.yaml", h=h1)

Run the algorithm for "orar_mediu_relaxat.yaml"

In [None]:
run_algorithm("inputs/orar_mediu_relaxat.yaml", h=h1)

Run the algorithm for "orar_mare_relaxat.yaml"

In [None]:
run_algorithm("inputs/orar_mare_relaxat.yaml", h=h1)

Run the algorithm for all files

In [None]:
input_files = [
    "inputs/dummy.yaml",
    "inputs/orar_bonus_exact.yaml",
    "inputs/orar_constrans_incalcat.yaml",
    "inputs/orar_mare_relaxat.yaml",
    "inputs/orar_mediu_relaxat.yaml",
    "inputs/orar_mic_exact.yaml",
]

for file in input_files:
    run_algorithm(file, random_restart_hill_climbing)

Starting to generate the initial state...
Generating the initial state - DONE
Starting to generate the initial state...
Restarts: 4Generating the initial state - DONE
Starting to generate the initial state...
Generating the initial state - DONE
Starting to generate the initial state...
Generating the initial state - DONE
Starting to generate the initial state...
Generating the initial state - DONE
Starting to generate the initial state...
Generating the initial state - DONE
