In [None]:
import random as rnd
from prettytable import PrettyTable
import pandas as pd
import io

from zipfile import ZipFile
from datetime import date

In [None]:
# Day numbers 1..30 and one label per (day, session) pair, e.g. "01-MOR", "01-EVE".
DAYS = [*range(1, 31)]
SHIFTS = [f"{day:02d}-{s}" for day in DAYS for s in ("MOR", "EVE")]

In [None]:
# Roster rows are [EmployeeCode, JobTitleName]: E1 is the manager, E2 the leader,
# and E3..E10 are regular staff.
EMPLOYEES = [
    [f"E{number}", {1: "Manager", 2: "Leader"}.get(number, "Staff")]
    for number in range(1, 11)
]
EMPLOYEES

In [None]:
# Rebuild SHIFTS as [shiftName, minEmployees] rows, two sessions per calendar day.
SHIFTS = []
START_DATE = date(2023,9,1)
END_DATE = date(2023,9,30)
HALF_OF_EMPLOYEES = len(EMPLOYEES) // 2
for DATE in pd.date_range(START_DATE, END_DATE):
    day, day_name = DATE.day, DATE.day_name()
    for session in ("MOR", "EVE"):
        if day in (1, 15, 30):
            # Days 1, 15 and 30 use a lower base and get no weekend reduction.
            employees = HALF_OF_EMPLOYEES - (2 if session == "MOR" else 1)
        else:
            employees = HALF_OF_EMPLOYEES + (0 if session == "MOR" else 1)
            if day_name in ('Saturday', 'Sunday'):
                employees -= 2

        # Never require fewer than two people on a shift.
        SHIFTS.append([f"{day:02d}-{session}", max(employees, 2)])
SHIFTS

- Each shift must include at least one manager or leader.
- The number of employees in each shift must be greater than or equal to that shift's minimum threshold.
- Each employee must work exactly 25 shifts in the month.
-

In [None]:
# --- Genetic-algorithm settings ---
# Number of schedules created for the initial population and kept per generation.
POPULATION_SIZE = 10
# Leading schedules copied unchanged into the next generation and skipped by mutation.
NUMB_OF_ELITE_SCHEDULES = 1
# Number of schedules drawn at random for each selection tournament.
TOURNAMENT_SELECTION_SIZE = 3
# Per-shift probability that mutation replaces the assignment with a randomly generated one.
MUTATION_RATE = 0.1
# Exact number of shifts every employee must work; checked in Schedule.calculate_fitness.
NUMB_SHIFTS_PER_EMPLOYEE = 25
# def random_chunks(iters, n:int, max_drop:int, first_limit:int, last_limit:int):
#     S = list(iters)
#     chunks = []
#     rnd.shuffle(S)
#     drop = rnd.randint(0, max_drop)
#     S = S[drop:]
#     start = rnd.randint(first_limit,len(S)-last_limit-1)
#     end = rnd.randint(start+1,len(S)-last_limit)
#     chunks.append(S[:start])
#     midS = S[start:end]
#     for _ in range(n-2):
#         mid = rnd.randint(0,len(midS))
#         chunks.append(midS[:mid])
#         midS = midS[mid:]
#     chunks.append(S[end:])
#     return chunks

class Employees:
    """A single employee, identified by a code and carrying a job title."""

    def __init__(self, EmployeeCode:str, JobTitleName:str):
        self._EmployeeCode, self._JobTitleName = EmployeeCode, JobTitleName

    def get_EmployeeCode(self):
        """Return the employee's code."""
        return self._EmployeeCode

    def get_JobTitleName(self):
        """Return the employee's job title."""
        return self._JobTitleName

class Shifts:
    """A named shift together with the minimum head-count it requires."""

    def __init__(self, shiftName:str, minEmployees:int):
        self._shiftName, self._minEmployees = shiftName, minEmployees

    def get_shiftName(self):
        """Return the shift's name."""
        return self._shiftName

    def get_minEmployees(self):
        """Return the minimum number of employees required on this shift."""
        return self._minEmployees

class Data:
    """Wraps the module-level EMPLOYEES and SHIFTS rows as Employees / Shifts objects."""
    EMPLOYEES = EMPLOYEES
    SHIFTS = SHIFTS

    def __init__(self):
        # Each row is unpacked positionally into the matching constructor.
        self._employees = [Employees(*row) for row in self.EMPLOYEES]
        self._shifts = [Shifts(*row) for row in self.SHIFTS]

    def get_employees(self):
        """Return the list of Employees objects."""
        return self._employees

    def get_shifts(self):
        """Return the list of Shifts objects."""
        return self._shifts
        
class Schedule:
    """One candidate roster: a TimeKeeping entry per shift, plus its cached fitness.

    Fitness is ``1 / (conflicts + 1)``, so a schedule without conflicts scores 1.0.
    Three kinds of conflicts are counted:

    1. shifts that have no Manager/Leader assigned,
    2. shifts staffed by fewer distinct employees than their minimum,
    3. employees whose number of distinct shifts differs from
       ``NUMB_SHIFTS_PER_EMPLOYEE`` (employees with no shift at all included).
    """
    def __init__(self) -> None:
        self._data = data
        self._shiftDetails = []
        self._numbOfConflictsTotal = 0
        self._numbOfConflicts1 = 0
        self._numbOfConflicts2 = 0
        self._numbOfConflicts3 = 0
        self._fitness = -1
        self._shiftNumb = 0
        self._isFitnessChanged = True

    def get_shiftDetails(self):
        """Return the mutable list of shift assignments and mark the fitness as stale.

        Callers may modify the returned list, so the cached fitness is
        invalidated on every call.
        """
        self._isFitnessChanged = True
        return self._shiftDetails

    def get_numbOfConflictsTotal(self): return self._numbOfConflictsTotal
    def get_numbOfConflicts1(self): return self._numbOfConflicts1
    def get_numbOfConflicts2(self): return self._numbOfConflicts2
    def get_numbOfConflicts3(self): return self._numbOfConflicts3

    def get_fitness(self):
        """Return the fitness, recomputing it only when it has been marked stale."""
        if self._isFitnessChanged:
            self._fitness = self.calculate_fitness()
            self._isFitnessChanged = False
        return self._fitness

    def initialize(self):
        """Fill the schedule with a random assignment and return ``self``.

        For every shift, each employee is included independently with
        probability 1/2. Entries receive consecutive ids starting at 0.
        """
        shifts = self._data.get_shifts()
        employees = self._data.get_employees()
        for shift in shifts:
            newShift = TimeKeeping(self._shiftNumb, shift)
            # Advance the counter so every entry gets its own id.
            self._shiftNumb += 1
            selectedEmployees = [
                employee for employee in employees if rnd.randrange(0, 2) == 1
            ]
            newShift.set_employeeList(selectedEmployees)
            self._shiftDetails.append(newShift)
        return self

    def calculate_fitness(self):
        """Recount all conflicts from scratch and return ``1 / (total + 1)``.

        The three per-constraint counters and the total are overwritten, not
        accumulated, so repeated calls on an unchanged schedule give the same
        result.
        """
        totalShifts = len(self._data.get_shifts())
        # Read the list directly: get_shiftDetails() would flag the fitness stale again.
        shiftDetails = self._shiftDetails

        staffing = {}          # (shiftName, minEmployees) -> set of employee codes
        supervisedShifts = set()  # shift names with at least one Manager/Leader
        # Seed with every known employee so that "assigned to no shift" is visible.
        shiftsByEmployee = {
            employee.get_EmployeeCode(): set()
            for employee in self._data.get_employees()
        }

        for detail in shiftDetails:
            shift = detail.get_shift()
            shiftName = shift.get_shiftName()
            codes = staffing.setdefault((shiftName, shift.get_minEmployees()), set())
            for employee in detail.get_employeeList():
                code = employee.get_EmployeeCode()
                codes.add(code)
                shiftsByEmployee.setdefault(code, set()).add(shiftName)
                if employee.get_JobTitleName() in ('Manager', 'Leader'):
                    supervisedShifts.add(shiftName)

        # 1) shifts that are not observed by a manager / leader
        self._numbOfConflicts1 = totalShifts - len(supervisedShifts)

        # 2) shifts whose head-count is below the required minimum
        self._numbOfConflicts2 = sum(
            1 for (_, minEmployees), codes in staffing.items()
            if len(codes) < minEmployees
        )

        # 3) employees who do not work exactly NUMB_SHIFTS_PER_EMPLOYEE shifts
        self._numbOfConflicts3 = sum(
            1 for names in shiftsByEmployee.values()
            if len(names) != NUMB_SHIFTS_PER_EMPLOYEE
        )

        self._numbOfConflictsTotal = (
            self._numbOfConflicts1 + self._numbOfConflicts2 + self._numbOfConflicts3
        )
        return 1 / (1.0*self._numbOfConflictsTotal + 1)

    def __str__(self):
        """Return ``"<shiftName>:[<code>,<code>,...]"`` for every shift, comma-separated."""
        finalList = []
        for s in self._shiftDetails:
            shiftName = s.get_shift().get_shiftName()
            codes = ','.join(e.get_EmployeeCode() for e in s.get_employeeList())
            finalList.append(f'{shiftName}:[{codes}]')
        return ', '.join(finalList)

class Population:
    """A collection of ``size`` randomly initialised Schedule objects."""

    def __init__(self, size:int):
        self._size = size
        self._data = data
        # One freshly randomised schedule per slot; size 0 yields an empty population.
        self._schedules = [Schedule().initialize() for _ in range(size)]

    def get_schedules(self):
        """Return the underlying (mutable) list of schedules."""
        return self._schedules

class GeneticAlgorithm:
    """Produces the next generation of schedules through crossover and mutation.

    All loops are driven by the size of the population that is passed in, so
    the algorithm also works for populations whose size differs from
    ``POPULATION_SIZE``. The incoming population is expected to be sorted by
    fitness (best first): the first ``NUMB_OF_ELITE_SCHEDULES`` entries are
    carried over untouched.
    """
    def evolve(self, population: "Population"):
        """Return a new population: crossover first, then mutation of the non-elite part."""
        return self._mutate_population(self._crossover_population(population))

    def _crossover_population(self, pop: "Population"):
        """Build a population of the same size from the elite plus tournament offspring."""
        parents = pop.get_schedules()
        crossover_pop = Population(0)
        offspring = crossover_pop.get_schedules()
        # Slicing keeps this safe when there are fewer schedules than elite slots.
        offspring.extend(parents[:NUMB_OF_ELITE_SCHEDULES])
        while len(offspring) < len(parents):
            schedule1 = self._select_tournament_population(pop).get_schedules()[0]
            schedule2 = self._select_tournament_population(pop).get_schedules()[0]
            offspring.append(self._crossover_schedule(schedule1, schedule2))
        return crossover_pop

    def _mutate_population(self, population: "Population"):
        """Mutate every schedule after the elite ones in place and return the population."""
        for schedule in population.get_schedules()[NUMB_OF_ELITE_SCHEDULES:]:
            self._mutate_schedule(schedule)
        return population

    def _crossover_schedule(self, schedule1: "Schedule", schedule2: "Schedule"):
        """Return a child that takes each shift assignment from one parent at random.

        The child starts empty instead of being randomly initialised, because
        every slot is taken from a parent anyway.
        """
        crossoverSchedule = Schedule()
        childDetails = crossoverSchedule.get_shiftDetails()
        for detail1, detail2 in zip(schedule1.get_shiftDetails(), schedule2.get_shiftDetails()):
            childDetails.append(detail1 if rnd.random() > 0.5 else detail2)
        return crossoverSchedule

    def _mutate_schedule(self, mutateSchedule: "Schedule"):
        """Replace each shift assignment, with probability ``MUTATION_RATE``, by a random one."""
        donorDetails = Schedule().initialize().get_shiftDetails()
        targetDetails = mutateSchedule.get_shiftDetails()
        for i in range(len(targetDetails)):
            if MUTATION_RATE > rnd.random():
                targetDetails[i] = donorDetails[i]
        return mutateSchedule

    def _select_tournament_population(self, pop: "Population"):
        """Draw ``TOURNAMENT_SELECTION_SIZE`` schedules (with replacement), best first."""
        candidates = pop.get_schedules()
        tournament_pop = Population(0)
        entrants = tournament_pop.get_schedules()
        for _ in range(TOURNAMENT_SELECTION_SIZE):
            # Index by the real population length, not the POPULATION_SIZE constant.
            entrants.append(candidates[rnd.randrange(0, len(candidates))])
        entrants.sort(key=lambda x: x.get_fitness(), reverse=True)
        return tournament_pop




class TimeKeeping:
    """The employees assigned to one shift inside a schedule."""
    def __init__(self, id:int, shift:"Shifts"):
        self._id = id
        self._shift = shift
        self._employeeList = []

    def get_id(self): return self._id
    def get_shift(self): return self._shift
    def get_employeeList(self): return self._employeeList
    def set_employeeList(self, employeeList): self._employeeList = employeeList

    def __str__(self):
        """Return ``"<shiftName>:<list of employee codes>"``, e.g. ``"01-MOR:['E1', 'E3']"``."""
        # The employee list is flat, so each entry is a single employee object.
        employees = [e.get_EmployeeCode() for e in self.get_employeeList()]
        return self.get_shift().get_shiftName() + ':' + str(employees)

class DisplayMgr:
    """Prints the input data, generation summaries and schedules as PrettyTable tables."""

    def print_available_data(self):
        """Print the employee table followed by the shift table."""
        print('> All Available Data')
        self.print_employees()
        self.print_shifts()

    def print_employees(self):
        """Print one row per employee from the global ``data`` object."""
        availableEmpTable = PrettyTable(['employeeCode', 'jobTitleName'])
        for employee in data.get_employees():
            availableEmpTable.add_row([employee.get_EmployeeCode(), employee.get_JobTitleName()])
        print('\n> Employees')
        print(availableEmpTable)

    def print_shifts(self):
        """Print one row per shift from the global ``data`` object."""
        availableShiftTable = PrettyTable(['shiftName', 'minEmployees'])
        for shift in data.get_shifts():
            availableShiftTable.add_row([shift.get_shiftName(), shift.get_minEmployees()])
        print('\n> Shifts Info')
        print(availableShiftTable)

    def print_generation(self, population:Population):
        """Print fitness and conflict counts for the first schedule of the population."""
        table1 = PrettyTable(['schedule #', 'fitness', '# of conflicts', 'conflicts details'])
        for index, schedule in enumerate(population.get_schedules()[:1]):
            conflicts = [
                schedule.get_numbOfConflicts1(),
                schedule.get_numbOfConflicts2(),
                schedule.get_numbOfConflicts3(),
            ]
            fitness = round(schedule.get_fitness(), 3)
            table1.add_row([str(index), fitness, schedule.get_numbOfConflictsTotal(), conflicts])
        print(table1)

    def print_schedule_as_table(self, schedule:Schedule):
        """Print one row per (shift, employee) assignment of ``schedule``."""
        table = PrettyTable(['shiftName', 'EmployeeCode', 'JobTitleName'])
        for detail in schedule.get_shiftDetails():
            shiftName = detail.get_shift().get_shiftName()
            for emp in detail.get_employeeList():
                table.add_row([shiftName, emp.get_EmployeeCode(), emp.get_JobTitleName()])
        print(table)

# --- Run the genetic algorithm until a conflict-free schedule is found ---
data = Data()
displayMgr = DisplayMgr()
displayMgr.print_available_data()

# Generation 0: a random population, best schedule first.
generationNumber = 0
print(f'\n> Generation # {generationNumber}')
population = Population(POPULATION_SIZE)
population.get_schedules().sort(key=lambda x: x.get_fitness(), reverse=True)
displayMgr.print_generation(population)
displayMgr.print_schedule_as_table(population.get_schedules()[0])

geneticAlgorithm = GeneticAlgorithm()
# NOTE: there is no generation cap; the loop only ends when the best fitness is exactly 1.0.
while population.get_schedules()[0].get_fitness() != 1.0:
    generationNumber += 1
    population = geneticAlgorithm.evolve(population)
    population.get_schedules().sort(key=lambda x: x.get_fitness(), reverse=True)
    print(f"\n> Generation # {generationNumber}")
    displayMgr.print_generation(population)

# The loop has no `break`, so the winning schedule is always printed once it finishes.
displayMgr.print_schedule_as_table(population.get_schedules()[0])
print('\n\n')

In [None]:
# Load the semicolon-separated student timetable from the zip archive.
# The context manager closes the archive as soon as the member has been read.
with ZipFile('./data/student_timetable.zip', 'r') as timetable_zip:
    timetable_data = timetable_zip.read('student_timetable.csv')
timetable_bytes = io.BytesIO(timetable_data)
df = pd.read_csv(timetable_bytes, sep=';', on_bad_lines='warn')
# Drop rows without a usable room; .copy() makes the column assignment below
# operate on an independent frame rather than on a slice of the raw one.
df = df[df["room_address"] != 'Missing info'].copy()
# .str.strip() leaves missing values as NaN, whereas apply(str.strip) raises on them.
df['room_address'] = df['room_address'].str.strip()

In [None]:
# How many distinct semesters each student appears in, tallied across students
# (not all students join both semesters in 2019).
(
    df.groupby("student_id")["year_semester"]
    .nunique()
    .value_counts()
)

In [None]:
# Histogram of the number of distinct courses per student (a student can join many courses).
(
    df.groupby("student_id")["course_id"]
    .nunique()
    .hist()
)

In [None]:
# The same course_id can denote different courses, but within one semester a
# course_id should carry a single course name: count the distinct names per
# (semester, course_id) pair and tally those counts.
(
    df.groupby(["year_semester", "course_id"])["course"]
    .nunique()
    .value_counts()
)

In [None]:
# (semester, course, weekday, room) combinations that have more than one distinct start time.
(
    df.groupby(["year_semester", "course_id", "weekday", "room_address"])
    .agg(
        start_time_count=("start_time", "nunique"),
        end_time_count=("end_time", "nunique"),
    )
    .query("start_time_count > 1")
)

In [None]:
# Inspect the rows of one specific course / semester / weekday / room combination.
df[
    (df["course_id"] == 100455)
    & (df["year_semester"] == 20191)
    & (df["weekday"] == "Thursday")
    & (df["room_address"] == "SALA PAT AT 021")
]