# Tema 1 IA - Class Scheduler
### Alexandru LICURICEANU - 332CD

In [461]:
# Prerequisites
import utils
import math
import time
from copy import copy, deepcopy
from heapq import heappop, heappush

from utils import ZILE as DAYS
from utils import INTERVALE as INTERVALS
from utils import SALI as CLASSROOMS
from utils import MATERII as SUBJECTS
from utils import PROFESORI as TEACHERS

CONSTRAINTS = 'Constrangeri'
PREFERRED = 'Preferred'
NOT_PREFERRED = 'Not_preferred'
CAPACITY = 'Capacitate'
MAX_INTERVALS = 7

PREFERRED_DAYS = 'Preferred_days'
NOT_PREFERRED_DAYS = 'Not_preferred_days'
PREFERRED_INTERVALS = 'Preferred_intervals'
NOT_PREFERRED_INTERVALS = 'Not_preferred_intervals'

# A* Algorithm

`State` data structure used by the A* implementation.

In [462]:
class State:
    def __init__(self, cost, state):
        self.state = state
        self.cost = cost

    def __lt__(self, other):
        return self.cost < other.cost

`init_empty_timetable()` Returns an empty timetable.



In [463]:
def init_empty_timetable(data):
    timetable = {day: {interval: {classroom: None for classroom in data[CLASSROOMS]} for interval in data[INTERVALS]} for day in data[DAYS]}
    return timetable

`is_covered()` Checks the timetable to see if the subject is fully covered.

In [464]:
def is_covered(state, subject, data):
    student_count = 0

    # Sum the capacity of all assigned classrooms where the subject is taught. 
    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:
                if state[day][interval][classroom] is not None and state[day][interval][classroom][1] == subject:
                    student_count += data[CLASSROOMS][classroom][CAPACITY]

    return student_count >= data[SUBJECTS][subject]

`count_teacher_slots()` Counts the number of slots assigned to a given teacher in the timetable.

In [465]:
def count_teacher_slots(state, teacher):
    teacher_count = 0

    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:
                if state[day][interval][classroom] is not None and state[day][interval][classroom][0] == teacher:
                    teacher_count += 1

    return teacher_count

`choose_teacher()` Chooses the teacher that has the least amount of classed for this subject assigned to him.

In [466]:
def choose_teacher(state, subject, data):
    teacher_counts = {teacher: 0 for teacher in data[TEACHERS] if subject in data[TEACHERS][teacher][SUBJECTS]}

    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:
                if state[day][interval][classroom] is not None and state[day][interval][classroom][1] == subject:
                    teacher_counts[state[day][interval][classroom][0]] += 1

    # Filter out teachers that have already reached the maximum number of classes.
    teacher_counts = {teacher: count for teacher, count in teacher_counts.items() if count_teacher_slots(state, teacher) < MAX_INTERVALS}

    return min(teacher_counts, key=teacher_counts.get)


`check_hard_constraints()` Calculates the hard cost based on how many hard constraints have been violated in the state.

In [467]:
def check_hard_constraints(state, data):
    hard_cost = 0
    
    # Check if a teacher is assigned to more than one class at the same time.
    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:

                if state[day][interval][classroom] is None:
                    continue

                teacher = state[day][interval][classroom][0]

                for other_classroom in state[day][interval]:
                    if other_classroom == classroom:
                        continue

                    if state[day][interval][other_classroom] is not None and state[day][interval][other_classroom][0] == teacher:
                        hard_cost += 1

    # Check if a subject cannot be taught in the assigned classroom.
    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:
                if state[day][interval][classroom] is None:
                    continue
                
                subject = state[day][interval][classroom][1]

                if subject not in data[CLASSROOMS][classroom][SUBJECTS]:
                    hard_cost += 1
            
    return hard_cost

`check_soft_constraints_slot()` Calculates how many soft constraints have been violated in a specific time slot. 

In [468]:
def check_soft_constraints_slot(state, day, interval, classroom, data):
    soft_cost = 0

    if state[day][interval][classroom] is None:
        return soft_cost

    teacher = state[day][interval][classroom][0]

    # Check if the teacher prefers the assigned interval.
    if interval in data[TEACHERS][teacher][NOT_PREFERRED]:
        soft_cost += 1

    # Check if the teacher prefers the assigned day.
    if day in data[TEACHERS][teacher][NOT_PREFERRED]:
        soft_cost += 1

    return soft_cost

`check_soft_constraints()` Calculates how many soft constraints have been violated for the timetable.

In [469]:
def check_soft_constraints(state, data):
    soft_cost = 0
    
    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:
                if state[day][interval][classroom] is None:
                    continue

                soft_cost += check_soft_constraints_slot(state, day, interval, classroom, data)

    return soft_cost

`is_final()` Checks if a state is final (if the state covers all subjects).

In [470]:
def is_final(state, data):
    return all(is_covered(state, subject, data) for subject in data[SUBJECTS])

`generate_next_states()` Generates the neighbors of the current state.
    
Identify all empty slots in the timetable where a subject can be assigned.
For each empty slot, try assigning all possible combinations of teachers and subjects that satisfy the hard constraints.
Check each assignment to ensure that it doesn't violate any of the hard constraints.
If a valid assignment is found, create a new state representing the timetable with the assignment made.
Repeat this process for all empty slots, generating a list of all possible neighbor states.

In [471]:
def generate_next_states(state, data):    
    next_states = []

    for subject in data[SUBJECTS]:

        # Check if this subject is covered.
        if is_covered(state, subject, data):
            continue

        for day in state:
            for interval in state[day]:
                for classroom in state[day][interval]:

                    # Empty slot is found.
                    if state[day][interval][classroom] is None:
                        new_state = deepcopy(state)

                        # Choose a teacher for this subject.
                        teacher = choose_teacher(state, subject, data)
                        
                        new_state[day][interval][classroom] = (teacher, subject)

                        # Verify hard constraints.
                        if check_hard_constraints(new_state, data) == 0:
                            next_states.append(new_state)

    return next_states

`count_unassigned_slots()` Counts how many time slots in the timetable have nothing assigned. \
`heuristic()` Heuristic function to evaluate a state. 

In [472]:
def count_unassigned_slots(state):
    slots_count = 0
    for day in state:
        for interval in state[day]:
            for classroom in state[day][interval]:
                if state[day][interval][classroom] is None:
                    slots_count += 1
    
    return slots_count

def heuristic(state, data):
    return count_unassigned_slots(state) + check_soft_constraints(state, data)

`astar()` A* algorithm.

In [473]:
def astar(initial_state, heuristic, generate_next_states, is_final, data):
    '''A* algorithm.'''

    #initial_state = generate_initial_state(initial_state, data)

    frontier = []
    heappush(frontier, State(heuristic(initial_state, data), initial_state))

    while frontier:
        current = heappop(frontier)

        current_state = current.state
        current_cost = current.cost

        if is_final(current_state, data):
            return current_state

        for next_state in generate_next_states(current_state, data):
            next_cost = heuristic(next_state, data)
            heappush(frontier, State(next_cost, next_state))
    

    return initial_state

`parse_input_file` Parses constraints from the input files. \
`run_astar()` Runs the A* algorithm for the given input file. \
`run_csp()` Runs the CSP algorithm for the given input file.


In [474]:
def parse_input_file(input_file):
    data = utils.read_yaml_file(input_file)

    # Sort subjects by number of students.
    #data[SUBJECTS] = dict(sorted(data[SUBJECTS].items(), key=lambda item: item[1]))

    preferred = {}
    not_preferred = {}

    # Process the soft constraints for each teacher.
    for teacher in data[TEACHERS]:
        preferred = [constraint for constraint in data[TEACHERS][teacher][CONSTRAINTS] if not constraint.startswith('!')]
        not_preferred = [constraint.strip('!') for constraint in data[TEACHERS][teacher][CONSTRAINTS] if constraint.startswith('!')]


        # Convert intervals from '8-12' to (8, 10) and (10, 12).
        preferred_intervals = [tuple(map(int, interval.split('-'))) for interval in preferred if interval[0].isdigit()]
        not_preferred_intervals = [tuple(map(int, interval.split('-'))) for interval in not_preferred if interval[0].isdigit()]

        # Convert intervals from (8, 14) to [(8, 10), (10, 12), (12, 14)]. 
        preferred_intervals_expanded = []
        not_preferred_intervals_expanded = []

        for interval in preferred_intervals:
            start = interval[0]
            end = interval[1]

            if start != end - 2:
                intervals = [str((i, i + 2)) for i in range(start, end, 2)]
                preferred_intervals_expanded.extend(intervals)
            else:
                preferred_intervals_expanded.append(str(interval))

        for interval in not_preferred_intervals:
            start = interval[0]
            end = interval[1]

            if start != end - 2:
                intervals = [str((i, i + 2)) for i in range(start, end, 2)]
                not_preferred_intervals_expanded.extend(intervals)
            else:
                not_preferred_intervals_expanded.append(str(interval))
            

        # Create the preferred and not preferred days.
        preferred_days = [day for day in preferred if not day[0].isdigit()]
        not_preferred_days = [day.strip('!') for day in not_preferred if not day[0].isdigit()]
        
        data[TEACHERS][teacher][PREFERRED] = preferred_days + preferred_intervals_expanded
        data[TEACHERS][teacher][NOT_PREFERRED] = not_preferred_days + not_preferred_intervals_expanded

        data[TEACHERS][teacher][PREFERRED_DAYS] = preferred_days
        data[TEACHERS][teacher][NOT_PREFERRED_DAYS] = not_preferred_days
        data[TEACHERS][teacher][PREFERRED_INTERVALS] = preferred_intervals_expanded
        data[TEACHERS][teacher][NOT_PREFERRED_INTERVALS] = not_preferred_intervals_expanded

    return data

def run_astar(input_file):
    # Read input data.
    data = parse_input_file(input_file)

    # Initialize the empty timetable.
    initial_state = init_empty_timetable(data)

    # Run the A* algorithm.
    start = time.time()
    result = astar(initial_state, heuristic, generate_next_states, is_final, data)
    end = time.time()

    print(f'[A*] {input_file} Execution time: {round(end - start, 2)} seconds. Soft constraints violated: {check_soft_constraints(result, data)}')
    
    return result

`transform_intervals()` Function that transforms the intervals of the timetable from strings to tuples,
in order to work with the utils.pretty_print function.

In [475]:
def transform_intervals(timetable):
    new_timetable = {}

    for day in timetable:
        new_timetable[day] = {}

        for interval in timetable[day]:
            temp = interval.strip('()').split(',')
            new_interval = (int(temp[0]), int(temp[1]))
            new_timetable[day][new_interval] = timetable[day][interval]

    return new_timetable                      

`test_astar()` Runs and times the A* algorithm for the input files. \
`test_csp()` Runs and times the CSP algorithm for the input files.

In [476]:
input_files = {'dummy': 0, 'orar_mic_exact': 0, 'orar_mediu_relaxat': 0, 'orar_mare_relaxat': 0, 'orar_constrans_incalcat': 0}#, 'orar_bonus_exact': 0}


In [477]:
def test_astar():
    for input_file in input_files:
        result = run_astar('inputs/' + input_file + '.yaml')
        transformed = transform_intervals(result)
        
        with open('outputs/' + input_file + '.txt', 'w') as f:
           f.write(utils.pretty_print_timetable(transformed, 'inputs/' + input_file + '.yaml'))

        print(utils.pretty_print_timetable(transformed, 'inputs/' + input_file + '.yaml'), end='\n')


        #print(utils.pretty_print_timetable(transformed, 'inputs/' + input_file + '.yaml'), end='\n')

#test_astar()

In [478]:
def transform_solution(solution, data):
        
        # Transform the solution to: dict -> day -> interval -> classroom -> (teacher, subject)
        new_solution = init_empty_timetable(data)

        for var in solution:
            day = var[0]
            interval = var[1]
            classroom = var[2]

            teacher = solution[var][0]
            subject = solution[var][1]

            if teacher is not None and subject is not None:
                new_solution[day][interval][classroom] = (teacher, subject)
            else:
                new_solution[day][interval][classroom] = None

        return new_solution

In [479]:
def run_pcsp(input_file):
    global best_solution
    global best_cost
    global iterations
    
    data = parse_input_file('inputs/' + input_file + '.yaml')
    #print(data)

    vars = [(day, interval, classroom) for day in data[DAYS] for interval in data[INTERVALS] for classroom in data[CLASSROOMS]]
    domains = {var: [(teacher, subject)
                    for teacher in data[TEACHERS]
                    for subject in data[TEACHERS][teacher][SUBJECTS] if subject in data[CLASSROOMS][var[2]][SUBJECTS]]
                    for var in vars}


    # Sort the domains by the number of students in the subject.
    # o sala in care se predau x materii adauga 1/x puncte la ambele

    for var in domains:
        #domains[var] = sorted(domains[var], key=lambda x: data[SUBJECTS][x[1]])
        #domains[var] = sorted(domains[var], key=lambda x: scores[x[1]] if x[1] is not None else math.inf)
        domains[var] = sorted(domains[var], key=lambda x: data[SUBJECTS][x[1]])
        domains[var].append((None, None))      

    #print(domains)
    soft_constraints = [([var], lambda teacher, var=var: teacher is None or var[0] in data[TEACHERS][teacher][PREFERRED_DAYS]) for var in vars]
    soft_constraints += [([var], lambda teacher, var=var: teacher is None or var[1] in data[TEACHERS][teacher][PREFERRED_INTERVALS]) for var in vars]

    best_solution = {}
    best_cost = len(soft_constraints)
    iterations = 0

    result = pcsp(vars, domains, soft_constraints, input_files[input_file], {}, input_files[input_file], data)
    print(f'[PCSP] done in {iterations} iterations. {input_file} : {best_cost} : {best_solution}')
    
    print(count_total_assigned_students(best_solution, data))
    print(check_hard_constraints3(best_solution, data))
    
    return transform_solution(best_solution, data)


def get_constraints(var, constraints):
    return [constraint for constraint in constraints if var in constraint[0]]


def check_constraint(solution, constraint):
    return constraint[1](*(solution[var][0] for var in constraint[0]))
    

def fixed_constraints(solution, constraints):
    return [constraint for constraint in constraints if all(var in solution for var in constraint[0])]

def is_covered2(solution, subject, data):
    student_count = 0

    # Sum the capacity of all assigned classrooms where the subject is taught. 
    for var in solution:
        if solution[var] != (None, None) and solution[var][1] == subject:
            student_count += data[CLASSROOMS][var[2]][CAPACITY]

    return student_count >= data[SUBJECTS][subject]

def check_all_covered(solution, data):
    return all(is_covered2(solution, subject, data) for subject in data[SUBJECTS])


def compute_new_domains(solution, domains, data):
    covered = {subject: is_covered2(solution, subject, data) for subject in data[SUBJECTS]}
    new_domains = {var: list(filter(lambda val: val[1] is None or not covered[val[1]], domains[var])) for var in domains}

    return new_domains

def check_hard_constraints3(solution, data):
    
    # Check if the same teacher is assigned to two classes at the same time.
    teacher_intervals = {}
    teacher_intervals_count = {teacher: 0 for teacher in data[TEACHERS]}

    for (day, interval, classroom), (teacher, subject) in solution.items():
        if teacher is None:
            continue
            
        teacher_intervals_count[teacher] += 1

        if teacher_intervals_count[teacher] > MAX_INTERVALS:
            return False

        if teacher not in teacher_intervals:
            teacher_intervals[teacher] = set()

        if (day, interval) in teacher_intervals[teacher]:
            return False
        else:
            teacher_intervals[teacher].add((day, interval))

    if not all(count <= MAX_INTERVALS for count in teacher_intervals_count.values()):
        return False
    
    return True
            

def count_total_assigned_students(solution, data):
    student_count = {subject: 0 for subject in data[SUBJECTS]}

    for var in solution:
        if solution[var] != (None, None):
            student_count[solution[var][1]] += data[CLASSROOMS][var[2]][CAPACITY]

    for subject, count in student_count.items():
        print(f'{subject}: {count}/{data[SUBJECTS][subject]}')


def count_total_assigned_students2(solution, data, subject):
    student_count = 0

    for var in solution:
        if solution[var] != (None, None) and solution[var][1] == subject:
            student_count += data[CLASSROOMS][var[2]][CAPACITY]

    return student_count
    

def pcsp(vars, domains, constraints, acceptable_cost, solution, cost, data, max_iters = 10000):
    global best_solution
    global best_cost
    global iterations
  
    domains = compute_new_domains(solution, domains, data)

    if not vars:
        best_solution = deepcopy(solution)
        best_cost = cost

        if best_cost <= acceptable_cost and check_all_covered(solution, data) and check_hard_constraints3(solution, data):
            return True
        
    elif not domains[vars[0]]:
        return False
    elif cost == best_cost:
        return False
    else:
        
        var = vars[0]
        val = domains[var].pop(0)
        iterations += 1

        
        
        new_solution = deepcopy(solution)
        new_solution[var] = val

        #if val[1] != None and is_covered2(solution, val[1], data):
        #    return pcsp(vars, deepcopy(domains), constraints, acceptable_cost, solution, cost, data)
        
        if not check_hard_constraints3(new_solution, data):
            return False


        eval_soft_constraints = fixed_constraints(new_solution, constraints)
        new_cost = 0
                
        for constraint in eval_soft_constraints:
            if not check_constraint(new_solution, constraint):
                new_cost += 1
                
        if new_cost < best_cost and new_cost <= acceptable_cost:
            if pcsp(vars[1:], deepcopy(domains), constraints, acceptable_cost, new_solution, new_cost, data) and check_all_covered(new_solution, data):
                return True
            
        return pcsp(vars, deepcopy(domains), constraints, acceptable_cost, solution, cost, data)



In [480]:
def test_pcsp():
    for input_file in input_files.keys():
        result = run_pcsp(input_file)
        transformed = transform_intervals(result)
        
        with open('outputs/' + input_file + '.txt', 'w') as f:
            f.write(utils.pretty_print_timetable(transformed, 'inputs/' + input_file + '.yaml'))


test_pcsp()

[PCSP] done in 36 iterations. dummy : 0 : {('Luni', '(8, 10)', 'EG324'): ('Roxana Gheorghe', 'MS'), ('Luni', '(8, 10)', 'EG390'): ('Elena Gheorghe', 'DS'), ('Luni', '(10, 12)', 'EG324'): ('Pavel Filipescu', 'IA'), ('Luni', '(10, 12)', 'EG390'): ('Andreea Dinu', 'DS'), ('Luni', '(12, 14)', 'EG324'): ('Pavel Filipescu', 'IA'), ('Luni', '(12, 14)', 'EG390'): ('Roxana Gheorghe', 'DS'), ('Marti', '(8, 10)', 'EG324'): ('Roxana Gheorghe', 'MS'), ('Marti', '(8, 10)', 'EG390'): ('Cristina Dumitrescu', 'DS'), ('Marti', '(10, 12)', 'EG324'): ('Andreea Dinu', 'IA'), ('Marti', '(10, 12)', 'EG390'): (None, None), ('Marti', '(12, 14)', 'EG324'): ('Cristina Dumitrescu', 'MS'), ('Marti', '(12, 14)', 'EG390'): (None, None), ('Miercuri', '(8, 10)', 'EG324'): ('Roxana Gheorghe', 'MS'), ('Miercuri', '(8, 10)', 'EG390'): (None, None), ('Miercuri', '(10, 12)', 'EG324'): (None, None), ('Miercuri', '(10, 12)', 'EG390'): (None, None), ('Miercuri', '(12, 14)', 'EG324'): (None, None), ('Miercuri', '(12, 14)', 'EG