# Sistem penjadwalan praktikum lab otomatis menggunakan algoritma meta-heuristik hibrida berbasiskan API. Menggunakan algoritma genetika dan lainnya (dalam pengembangan).

## Setup

### Django setup and other dependencies

In [1]:
import os
import sys
import django
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "LabTimetablingAPI.settings")
django.setup()
from django.conf import settings
#set debug to true
settings.DEBUG = True

#logging
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### Pararellization

In [2]:
#Pararellization

import os
import multiprocessing as mp
import concurrent.futures
from django.db import close_old_connections

class ParallelExecutor:
    def __init__(self, max_workers=None):
        self.max_workers = min(mp.cpu_count(), max_workers or mp.cpu_count())
        self.executor = None
        self.enter_args = None
        self.enter_kwargs = None

    def _execute_parallel(self, fn, data, cpu_bound=False):
        executor_cls = concurrent.futures.ThreadPoolExecutor if cpu_bound else concurrent.futures.ProcessPoolExecutor
        with executor_cls(max_workers=self.max_workers) as executor:
            # close_old_connections() #close old connections, this is to make sure that the database connection is closed after each request
            # return list(executor.map(fn, data))
            futures = [executor.submit(fn, d) for d in data]
            concurrent.futures.wait(futures)
            return [f.result() for f in futures]

    def execute_cpu_bound(self, fn, data):
        return self._execute_parallel(fn, data, cpu_bound=True)
    
    def execute_io_bound(self, fn, data):
        return self._execute_parallel(fn, data, cpu_bound=False)
    
    def __enter__(self, *args, **kwargs):
        self.executor = self
        self.enter_args = args
        self.enter_kwargs = kwargs
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.executor = None
        self.enter_args = None
        self.enter_kwargs = None


In [3]:
mp.cpu_count()

12

### Data related setup

#### Ekstraksi Data
Ekstrasi data dari model Django ke dalam objek Python.

In [4]:
from scheduling_data.models import Laboratory, Module, Participant, Group, Assistant, Chapter
from functools import lru_cache
#Data Handling, import data from model to an object
# All random methods are for testing purposes only

class LaboratoryData:

    @classmethod
    @lru_cache(maxsize=None) #cache the result of the function, so it doesn't have to be computed again, only for most used functions
    def get_laboratories(cls):
        return Laboratory.objects.all()
    
    @classmethod
    def get_laboratory(cls, id):
        return Laboratory.objects.get(id=id)
    
    @classmethod
    def get_random_laboratory(cls):
        return Laboratory.objects.order_by('?').first()
    
    @classmethod
    def get_assistants(cls, id):
        laboratory = Laboratory.objects.get(id=id)
        if laboratory:
            return laboratory.assistants.all()
        return []
    
    @classmethod
    def get_modules(cls, id):
        laboratory = Laboratory.objects.get(id=id)
        if laboratory:
            return laboratory.modules.all()
        return []
    

class ModuleData:
    
    @classmethod
    @lru_cache(maxsize=None)
    def get_modules(cls):
        return Module.objects.all()
    
    @classmethod
    def get_module(cls, id):
        return Module.objects.get(id=id)
    
    @classmethod
    def get_random_module(cls):
        return Module.objects.order_by('?').first()
    
    @classmethod
    def get_laboratory(cls, id):
        module = Module.objects.get(id=id)
        if module:
            return module.laboratory
        return None
    
    @classmethod
    def get_groups(cls, id):
        module = Module.objects.get(id=id)
        if module:
            return module.groups.all()
        return []
    
    @classmethod
    def get_chapters(cls, id):
        module = Module.objects.get(id=id)
        if module:
            return module.chapters.all()
        return []
    
    @classmethod
    def get_participants(cls, id):
        module = Module.objects.get(id=id)
        if module:
            groups = module.groups.all()
            participants = []
            for group in groups:
                group_memberships = group.group_memberships.all()
                for group_membership in group_memberships:
                    participants.append(group_membership.participant)
            return participants
        return []
    
    @classmethod
    def get_assistants(cls, id):
        module = Module.objects.get(id=id)
        if module:
            assistants = []
            assistants_membership = module.assistant_memberships.all()
            for assistant_membership in assistants_membership:
                assistants.append(assistant_membership.assistant)
            return assistants
        return []
        
    
class ChapterData:
    
    @classmethod
    @lru_cache(maxsize=None)
    def get_chapters(cls):
        return Chapter.objects.all()
    
    @classmethod
    def get_chapter(cls, id):
        return Chapter.objects.get(id=id)
    
    @classmethod
    def get_module(cls, id):
        chapter = Chapter.objects.get(id=id)
        if chapter:
            return chapter.module
        return None
    
    @classmethod
    def get_random_chapter(cls, id):
        chapter = Chapter.objects.get(id=id)
        if chapter:
            return chapter.module
        return None
    
class ParticipantData:
        
    @classmethod
    @lru_cache(maxsize=None)
    def get_participants(cls):
        return Participant.objects.all()
    
    @classmethod
    def get_participant(cls, id):
        return Participant.objects.get(id=id)
    
    @classmethod
    def get_random_participant(cls):
        return Participant.objects.order_by('?').first()
    
    @classmethod
    def get_groups(cls, id):
        participant = Participant.objects.get(id=id)
        if participant:
            groups_membership = participant.group_memberships.all()
            groups = []
            for group_membership in groups_membership:
                groups.append(group_membership.group)
            return groups
        return []
    
    @classmethod
    def get_modules(cls, id):
        participant = Participant.objects.get(id=id)
        if participant:
            groups_membership = participant.group_memberships.all()
            modules = []
            for group_membership in groups_membership:
                modules.append(group_membership.group.module)
            return modules
        return []
    
    @classmethod
    def get_schedule(cls, id):
        participant = Participant.objects.get(id=id)
        if participant:
            return participant.regular_schedule
        return None
        
class GroupData:
    
    @classmethod
    @lru_cache(maxsize=None)
    def get_groups(cls):
        return Group.objects.all()
    
    @classmethod
    def get_group(cls, id):
        return Group.objects.get(id=id)
    
    @classmethod
    def get_random_group(cls):
        return Group.objects.order_by('?').first()
    
    @classmethod
    def get_module(cls, id):
        group = Group.objects.get(id=id)
        if group:
            return group.module
        return None
    
    @classmethod
    def get_participants(cls, id):
        group = Group.objects.get(id=id)
        if group:
            group_memberships = group.group_memberships.all()
            participants = []
            for group_membership in group_memberships:
                participants.append(group_membership.participant)
            return participants
        return []
    
    @classmethod
    def get_assistants(cls, id):
        group = Group.objects.get(id=id)
        if group:
            module = group.module
            assistants = []
            assistants_membership = module.assistant_memberships.all()
            for assistant_membership in assistants_membership:
                assistants.append(assistant_membership.assistant)
            return assistants
        return []
    
    @classmethod
    def get_participant_schedule(cls, id):
        participants = cls.get_participants(id)
        if participants:
            schedule = []
            for participant in participants:
                schedule.append(participant.regular_schedule)
            return schedule
        return None
    
    @classmethod
    def get_schedule(cls, id):
        participants_schedule = cls.get_participant_schedule(id)
        if participants_schedule:
            days = participants_schedule[0].keys()
            merged_schedule = {day:{} for day in days}
            for day in days:
                for timeslot in participants_schedule[0][day]:
                    is_available = all([participant_schedule[day][timeslot] for participant_schedule in participants_schedule])
                    merged_schedule[day][timeslot] = is_available
            return merged_schedule
        return None
    
class AssistantData:
    
    @classmethod
    @lru_cache(maxsize=None)
    def get_assistants(cls):
        return Assistant.objects.all()
    
    @classmethod
    def get_assistant(cls, id):
        return Assistant.objects.get(id=id)
    
    @classmethod
    def get_random_assistant(cls):
        # This is for testing purposes only
        return Assistant.objects.order_by('?').first()
    
    @classmethod
    def get_laboratory(cls, id):
        assistant = Assistant.objects.get(id=id)
        if assistant:
            return assistant.laboratory
        return None
    
    @classmethod
    def get_modules(cls, id):
        assistant = Assistant.objects.get(id=id)
        if assistant:
            modules = []
            assistant_membership = assistant.assistant_memberships.all()
            for assistant_membership in assistant_membership:
                modules.append(assistant_membership.module)
            return modules
        return []
    
    @classmethod
    def get_groups(cls, id):
        assistant = Assistant.objects.get(id=id)
        if assistant:
            modules = cls.get_modules(id)
            groups = []
            for module in modules:
                groups.append(module.groups.all())
            return groups
        return []
    
    @classmethod
    def get_schedule(cls, id):
        assistant = Assistant.objects.get(id=id)
        if assistant:
            return assistant.regular_schedule
        return None


In [47]:
#Data manager
class DataHandler:
    def __init__(self):
        self.laboratory_data = LaboratoryData()
        self.module_data = ModuleData()
        self.chapter_data = ChapterData()
        self.participant_data = ParticipantData()
        self.group_data = GroupData()
        self.assistant_data = AssistantData()
    

##### Test

In [5]:
ModuleData.get_module(1).groups.all()

<QuerySet [<Group: PL-1>, <Group: PL-2>, <Group: PL-3>, <Group: PL-4>, <Group: PL-5>, <Group: PL-6>, <Group: PL-7>, <Group: PL-8>, <Group: PL-9>, <Group: PL-10>, <Group: PL-11>, <Group: PL-12>, <Group: PL-13>, <Group: PL-14>, <Group: PL-15>, <Group: PL-16>, <Group: PL-17>, <Group: PL-18>, <Group: PL-19>, <Group: PL-20>, '...(remaining elements truncated)...']>

### Constants

In [6]:
class Constant:
    days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
    shifts = ["Shift1", "Shift2", "Shift3", "Shift4", "Shift5", "Shift6"]

from collections import namedtuple
TimeSlot = namedtuple("TimeSlot", ["date", "day", "shift"])

## Algoritma

### Algoritma Genetika

#### Gene

##### Gene Inialization
Initializing the gene structure. I add constraits_broken attribute to the gene structure to keep track of the number of constraints broken by the gene, or should I rename it to fitness_broken?  So that the chromosome can detect which gene violates the most constraints and then mutate it. I don't need constraints_broken attribute because invalid genes will never be added to the chromosome.

In [7]:
from collections import namedtuple
TimeSlot = namedtuple("TimeSlot", ["date", "day", "shift"])

class Gene:
    def __init__(self, laboratory, module, chapter, group, assistant, time_slot: TimeSlot):
        self.laboratory = laboratory
        self.module = module
        self.chapter = chapter
        self.group = group
        self.assistant = assistant
        self.time_slot = time_slot

        self.constraints_broken = set()

    def __str__(self):
        return f"Gene(laboratory={self.laboratory}, module={self.module}, chapter={self.chapter}, group={self.group}, assistant={self.assistant}, time_slot={self.time_slot})"
    
    def __repr__(self):
        return self.__str__()
    
    def __eq__(self, other: "Gene"):
        return self.laboratory == other.laboratory and self.module == other.module and self.chapter == other.chapter and self.group == other.group and self.assistant == other.assistant and self.time_slot == other.time_slot
    


In [8]:
# Test Gene
gene = Gene(LaboratoryData.get_random_laboratory(), ModuleData.get_random_module(), ChapterData.get_random_chapter(1), GroupData.get_random_group(), AssistantData.get_random_assistant(), TimeSlot("2021-05-05", "Wednesday", "Shift1"))
print(gene)

Gene(laboratory=Dasar Elektro, module=Pengukuran Listrik, chapter=Pengukuran Listrik, group=PL-30, assistant=Rafli, time_slot=TimeSlot(date='2021-05-05', day='Wednesday', shift='Shift1'))


##### Gene Level Constraints
Check if a gene is valid by not violating any constraints. Constraints are defined on gene level, it was to ensure that the gene is valid before it is assigned to a chromosome. 

My plan is to prevent the gene from being invalid in the first place, all the constraint must return true before the gene is assigned to a chromosome. This would reduce the number of invalid chromosomes and thus reducing the complexity of the algorithm albeit make the chromosome initialization process slower, but atleast it reduces the pressure for my brain to think about fixing the invalid chromosomes. My God, if only I was a genius, I would not have to do this. I rather playing genshin impact than doing this, hu tao is waiting for me.

In [9]:
# Constraint Class for Constraint Function
# 1. Ensure that each chapter is assigned to the correct module.
# 2. Ensure that each module is assigned to the correct lab.
# 3. Ensure that each group is assigned to the correct module.
# 4. Ensure that each assistant is assigned to the correct lab.

class BaseConstraint:
    def __init__(self, name):
        self.name = name
    
    def __str__(self):
        return f"Constraint(name={self.name})"
    
    def __repr__(self):
        return self.__str__()
    
    def __call__(self, gene: Gene):
        raise NotImplementedError("Constraint function not implemented")
    
# Ensure that each chapter is assigned to the correct module.
class ChapterModuleConstraint(BaseConstraint):
    def __init__(self):
        super().__init__("ChapterModuleConstraint")
        self.chapters = ChapterData

    def __call__(self, gene: Gene) -> bool:
        if gene.module == self.chapters.get_module(gene.chapter).id:
            return True
        return False
    
# Ensure that each group is assigned to the correct module.
class GroupModuleConstraint(BaseConstraint):
    def __init__(self):
        super().__init__("GroupModuleConstraint")
        self.groups = GroupData
    
    def __call__(self, gene: Gene) -> bool:
        if gene.module == self.groups.get_module(gene.group).id:
            return True
        return False

# Ensure that each module is assigned to the correct lab.
class ModuleLaboratoryConstraint(BaseConstraint):
    def __init__(self):
        super().__init__("ModuleLaboratoryConstraint")
        self.modules = ModuleData
    
    def __call__(self, gene: Gene) -> bool:
        if gene.laboratory == self.modules.get_laboratory(gene.module).id:
            return True
        return False
    
# Ensure that each assistant is assigned to the correct lab.
class AssistantLaboratoryConstraint(BaseConstraint):
    def __init__(self):
        super().__init__("AssistantLaboratoryConstraint")
        self.assistants = AssistantData
    
    def __call__(self, gene: Gene) -> bool:
        if gene.laboratory == self.assistants.get_laboratory(gene.assistant).id:
            return True
        return False
    
# Ensure that the group schedule is not violated by the gene time slot.
class ScheduleConstraint(BaseConstraint):
    def __init__(self):
        super().__init__("ScheduleConstraint")
        self.groups = GroupData
    
    def __call__(self, gene: Gene) -> bool:
        
        schedule = self.groups.get_schedule(gene.group)
        if schedule:
            return schedule[gene.time_slot.day][gene.time_slot.shift]
        return False
    
# Dynamic Constraint Class (custom constraint function)
class DynamicConstraint(BaseConstraint):
    def __init__(self, name, constraint_function):
        super().__init__(name)
        self.constraint_function = constraint_function
    
    def __call__(self, gene: Gene) -> bool:
        return self.constraint_function(gene)

In [10]:
# Group all constraints
from typing import List
class ConstraintManager:
    def __init__(self,constraints: List[BaseConstraint]):
        self.constraints = constraints
    
    def __str__(self):
        return f"constraints={self.constraints})"
    
    def __repr__(self):
        return self.__str__()
    
    def __call__(self, gene: Gene) -> bool:
        return all([constraint(gene) for constraint in self.constraints])

In [11]:
# Test Constraint
chapter_module_constraint = ChapterModuleConstraint()
group_module_constraint = GroupModuleConstraint()
module_laboratory_constraint = ModuleLaboratoryConstraint()
assistant_laboratory_constraint = AssistantLaboratoryConstraint()
schedule_constraint = ScheduleConstraint()

constraint_checker = ConstraintManager([chapter_module_constraint, group_module_constraint, module_laboratory_constraint, assistant_laboratory_constraint, schedule_constraint])

gene = Gene(LaboratoryData.get_random_laboratory().id, ModuleData.get_random_module().id, ChapterData.get_random_chapter(1).id, GroupData.get_random_group().id, AssistantData.get_random_assistant().id, TimeSlot("2021-05-05", "Friday", "Shift2"))

print(constraint_checker(gene))

True


In [12]:
GroupData.get_schedule(gene.group)

{'Friday': {'Shift1': False,
  'Shift2': True,
  'Shift3': False,
  'Shift4': True,
  'Shift5': False,
  'Shift6': False},
 'Monday': {'Shift1': False,
  'Shift2': False,
  'Shift3': False,
  'Shift4': True,
  'Shift5': False,
  'Shift6': False},
 'Tuesday': {'Shift1': False,
  'Shift2': False,
  'Shift3': False,
  'Shift4': False,
  'Shift5': True,
  'Shift6': False},
 'Saturday': {'Shift1': False,
  'Shift2': True,
  'Shift3': True,
  'Shift4': True,
  'Shift5': False,
  'Shift6': False},
 'Thursday': {'Shift1': True,
  'Shift2': False,
  'Shift3': True,
  'Shift4': False,
  'Shift5': False,
  'Shift6': True},
 'Wednesday': {'Shift1': True,
  'Shift2': False,
  'Shift3': False,
  'Shift4': False,
  'Shift5': False,
  'Shift6': False}}

### Chromosome

#### Chromosome Initialization

In [13]:
class Chromosome:
    def __init__(self, genes: List[Gene]):
        self.genes = genes
        self.constraints_broken = set()
    
    def __str__(self):
        return f"Chromosome(genes={self.genes})"
    
    def __repr__(self):
        return self.__str__()
    
    def __eq__(self, other: "Chromosome"):
        return self.genes == other.genes
    
    def __getitem__(self, index):
        return self.genes[index]
    
    def __len__(self):
        return len(self.genes)
    
    def __iter__(self):
        return iter(self.genes)
    
    def __contains__(self, gene: Gene):
        return gene in self.genes
    
    def add_gene(self, gene: Gene):
        self.genes.append(gene)
    
    def remove_gene(self, gene: Gene):
        self.genes.remove(gene)
    
    def get_gene(self, index):
        return self.genes[index]
    
    def get_genes(self):
        return self.genes

In [14]:
# Test Chromosome
chromosome = Chromosome([Gene(LaboratoryData.get_random_laboratory().id, ModuleData.get_random_module().id, ChapterData.get_random_chapter(1).id, GroupData.get_random_group().id, AssistantData.get_random_assistant().id, TimeSlot("2021-05-05", "Friday", "Shift2")) for i in range(10)])
for gene in chromosome:
    print(gene.time_slot)

TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')
TimeSlot(date='2021-05-05', day='Friday', shift='Shift2')


#### Fitness Function
Fitness function is a function that takes a chromosome as input and returns a fitness score as output. The fitness score is a measure of how good the chromosome is. The higher the score, the better the chromosome is, wait, is it? Or should I say the lower the score, the better the chromosome is? I don't know, I'm just keep moving forward without any idea of what I'm actually doing, just hoping that maybe I stumble upon something that works. I'm just a monkey with a keyboard, I don't know what I'm doing.

In [15]:
#Base Fitness Class
class BaseFitness:
    def __init__(self, name):
        self.name = name
    
    def __str__(self):
        return f"Fitness(name={self.name})"
    
    def __repr__(self):
        return self.__str__()
    
    def __call__(self, chromosome: Chromosome):
        raise NotImplementedError("Fitness function not implemented")

##### 1. Group Assignment Conflict Penalty
This function is used to calculate the number of group that assigned to the same lab at the same time. The lower the number, the less conflict there is. In other words, this objective is to Minimize conflicts in the schedule by ensuring that no two groups are assigned to the same lab and time slot simultaneously.

Bahasa Simplenya, fungsi ini menghitung jumlah konflik yang terjadi pada jadwal, khususnya jadwal grup yang berada di lab yang sama pada waktu yang sama. Jadi misal di jadwal hari selasa shift 1, ada 2 grup yang berada di lab yang sama, maka akan dihitung sebagai 1 konflik. Tapi tergantung batas maksimal konflik yang ditentukan, misal batas maksimal konflik adalah 2, maka jadwal tersebut masih valid. Tapi kalau batas maksimal konflik adalah 1, maka jadwal tersebut tidak valid dan dihitung sebagai 1 konflik.

In [16]:
#GroupAssignmentConflictPenalty
from collections import defaultdict

class GroupAssignmentConflictFitness(BaseFitness):
    """Count the number of groups assigned to a time slot in a lab, and penalize the chromosome if the number of groups exceeds the maximum threshold"""
    def __init__(self):
        super().__init__("GroupAssignmentConflictFitness")
        self.max_threshold = 2 # Maximum number of groups that can be assigned to a single time slot in lab
        self.conflict_penalty = 1 # Penalty for each group that exceeds the maximum threshold
        
        #conflicts[laboratory][module][time_slot] = [groups]
        self._conflicts = self.default_outer_dict()
        # Keeps track of the number of groups assigned to a time slot in a lab, initialized to 0

    def default_inner_dict(self):
        return defaultdict(list)
    
    def default_middle_dict(self):
        return defaultdict(self.default_inner_dict)
    
    def default_outer_dict(self):
        return defaultdict(self.default_middle_dict)

    def __call__(self, chromosome: Chromosome):
        self._conflicts.clear()
        for gene in chromosome:
            self._conflicts[gene.laboratory][gene.module][gene.time_slot].append(gene.group)
        
        total_penalty = 0
        for laboratory in self._conflicts:
            for module in self._conflicts[laboratory]:
                for time_slot in self._conflicts[laboratory][module]:
                    groups = self._conflicts[laboratory][module][time_slot] #Get all groups that are assigned to the time slot, more specifically, the number of groups in conflict[laboratory][module][time_slot]
                    if len(groups) > self.max_threshold:
                        total_penalty += (len(groups) - self.max_threshold) * self.conflict_penalty
        return total_penalty
    
    def get_conflicts(self):
        return self._conflicts
    
    def configure(self, max_threshold, conflict_penalty):
        """Configure the fitness function
        Args:
            max_threshold (int): Maximum number of groups that can be assigned to a single time slot in lab
            conflict_penalty (int): Penalty for each group that exceeds the maximum threshold"""
        self.max_threshold = max_threshold
        self.conflict_penalty = conflict_penalty

##### 2. Assistant Distribution Penalty
The purpose of this function is to maximize the utilization of assistants by distributing tasks evenly among them. Each assistant should be assigned to a balanced number of groups and shift to avoid overloading, and to ensure their brain is not fried.

In [17]:
from collections import namedtuple, defaultdict, Counter

#Maximize the utilization of assistants by distributing tasks evenly among them. Each assistant should be assigned to a balanced number of groups and shift to avoid overloading.
class AssistantDistributionFitness(BaseFitness):
    def __init__(self):
        super().__init__("AssistantDistributionFitness")
        self.max_group_threshold = 15 # Maximum number of groups that can be assigned to a single assistant
        self.max_shift_threshold = 15 # Maximum number of shifts that can be assigned to a single assistant
        self.group_penalty = 1 # Penalty for each group that exceeds the maximum threshold
        self.shift_penalty = 1 # Penalty for each shift that exceeds the maximum threshold

        #groups[assistant][module] = [groups]
        self._groups = self.default_outer_dict()
        #shifts[assistant][module] = [shifts]
        self._shifts = self.default_outer_dict()

    def default_inner_dict(self):
        return defaultdict(set)
    
    def default_outer_dict(self):
        return defaultdict(self.default_inner_dict)
    


    def __call__(self, chromosome: Chromosome):
        self._groups.clear()
        self._shifts.clear()
        for gene in chromosome:
            self._groups[gene.assistant][gene.module].add(gene.group)
            self._shifts[gene.assistant][gene.module].add(gene.time_slot)
        
        total_penalty = 0
        for assistant in self._groups:
            for module in self._groups[assistant]:
                groups = self._groups[assistant][module]
                shifts = self._shifts[assistant][module]
                if len(groups) > self.max_group_threshold:
                    total_penalty += (len(groups) - self.max_group_threshold) * self.group_penalty
                if len(shifts) > self.max_shift_threshold:
                    total_penalty += (len(shifts) - self.max_shift_threshold) * self.shift_penalty
        return total_penalty
    
    def get_groups(self):
        return self._groups
    
    def get_shifts(self):
        return self._shifts
    
    def configure(self, max_group_threshold, max_shift_threshold, group_penalty, shift_penalty):
        """Configure the fitness function
        Args:
            max_group_threshold (int): Maximum number of groups that can be assigned to a single assistant
            max_shift_threshold (int): Maximum number of shifts that can be assigned to a single assistant
            group_penalty (int): Penalty for each group that exceeds the maximum threshold
            shift_penalty (int): Penalty for each shift that exceeds the maximum threshold"""
        self.max_group_threshold = max_group_threshold
        self.max_shift_threshold = max_shift_threshold
        self.group_penalty = group_penalty
        self.shift_penalty = shift_penalty


##### 3. Apalagi ya? 
Nanti ditambahin lagi kalo udah kepikiran.


In [18]:
#Fitness Manager
class FitnessManager:
    def __init__(self, fitness_functions: List[BaseFitness]):
        self.fitness_functions = fitness_functions
    
    def __str__(self):
        return f"FitnessManager(fitness_functions={self.fitness_functions})"
    
    def __repr__(self):
        return self.__str__()
    
    def __call__(self, chromosome: Chromosome):
        return sum([fitness_function(chromosome) for fitness_function in self.fitness_functions])
    
    def configure(self, fitness_functions: List[BaseFitness]):
        self.fitness_functions = fitness_functions

    def grouped_fitness(self, chromosome: Chromosome):
        """Return a dictionary of fitness functions and their respective fitness value"""
        return {fitness_function.name: fitness_function(chromosome) for fitness_function in self.fitness_functions}

## Population

### Population Initialization

In [19]:
#Population Initialization Class

class Population:
    def __init__(self, chromosomes: List[Chromosome], fitness_manager: FitnessManager):
        self.chromosomes = chromosomes
        self.fitness_manager = fitness_manager
    
    def __str__(self):
        return f"Population(chromosomes={self.chromosomes})"
    
    def __repr__(self):
        return self.__str__()
    
    def __getitem__(self, index):
        return self.chromosomes[index]
    
    def __len__(self):
        return len(self.chromosomes)
    
    def __iter__(self):
        return iter(self.chromosomes)
    
    def __contains__(self, chromosome: Chromosome):
        return chromosome in self.chromosomes
    
    def calculate_fitness(self):
        for chromosome in self.chromosomes:
            chromosome.fitness = self.fitness_manager(chromosome)
    
    def add_chromosome(self, chromosome: Chromosome):
        self.chromosomes.append(chromosome)
    
    def remove_chromosome(self, chromosome: Chromosome):
        self.chromosomes.remove(chromosome)
    
    def get_chromosome(self, index):
        return self.chromosomes[index]
    
    def get_chromosomes(self):
        return self.chromosomes
    
    def get_best_chromosome(self):
        return min(self.chromosomes, key=lambda chromosome: chromosome.fitness)
    
    def get_worst_chromosome(self):
        return max(self.chromosomes, key=lambda chromosome: chromosome.fitness)
    
    def get_average_fitness(self):
        return sum([chromosome.fitness for chromosome in self.chromosomes]) / len(self.chromosomes)

### Factory Method
Class for generating population, chromosome, and gene. This class also generates some data like time slot, chromosome length, etc.This class requires a lot of data to be passed to it, so I think it's better to make it a class instead of a function.

In [61]:
#factory.py
from calendar import c
import re
from typing import List
from math import ceil, floor

import numpy as np
from datetime import datetime, timedelta

from multiprocessing import Pool

TimeSlot = namedtuple("TimeSlot", ["date", "day", "shift"])

class Factory:
    '''Factory class to generate chromosomes and population. It also contains the data from the database. We can call this class when we need some of'''
    def __init__(self):
        self.laboratories = LaboratoryData.get_laboratories()
        self.modules = ModuleData.get_modules()
        self.chapters = ChapterData.get_chapters()
        self.groups = GroupData.get_groups()
        self.participants = ParticipantData.get_participants()
        self.assistants = AssistantData.get_assistants()
        self.constant = Constant
        
        self.constraints = ConstraintManager([ChapterModuleConstraint(), GroupModuleConstraint(), ModuleLaboratoryConstraint(), AssistantLaboratoryConstraint(), ScheduleConstraint()])
        self.fitness_manager = FitnessManager([GroupAssignmentConflictFitness(), AssistantDistributionFitness()])

    def set_constraints(self, constraints: List[BaseConstraint]):
        # No idea if I need to check constraints here when generating the population
        # But I think it's better to set the constraints here, maybe we can change the constraints later
        self.constraints = ConstraintManager(constraints)

    def set_fitness_manager(self, fitness_manager: FitnessManager):
        self.fitness_manager = fitness_manager

    def set_data(self, laboratories, modules, chapters, groups, participants, assistants):
        self.laboratories = laboratories
        self.modules = modules
        self.chapters = chapters
        self.groups = groups
        self.participants = participants
        self.assistants = assistants
    
    def generate_time_slot(self, start_date, end_date):
        """Generate time slots based on the start date, end date, days and shifts"""
        #if start_date not start from Monday, then start from the next Monday
        if start_date.weekday() != 0:
            start_date = start_date + timedelta(days=7 - start_date.weekday())
        duration = (end_date - start_date).days + 1
        weeks_duration = floor(duration / 7) # Number of weeks between start date and end date, using floor to make sure that the time slot does not exceed the end date
        random_weeks = np.random.randint(0, weeks_duration)
        random_days = np.random.choice(self.constant.days)
        random_shifts = np.random.choice(self.constant.shifts)
        random_date = start_date + timedelta(days=random_weeks * 7 + self.constant.days.index(random_days))
        return TimeSlot(random_date, random_days, random_shifts)
    
    def generate_time_slot_weekly(self, start_date, end_date):
        """Generate time slots based on the start date, end date, days and shifts"""
        #if start_date not start from Monday, then start from the next Monday
        if start_date.weekday() != 0:
            start_date = start_date + timedelta(days=7 - start_date.weekday())
        random_days = np.random.choice(self.constant.days)
        random_shifts = np.random.choice(self.constant.shifts)
        random_date = start_date + timedelta(days= 7 + self.constant.days.index(random_days))
        return TimeSlot(random_date, random_days, random_shifts)
    
    def chromosome_size(self) -> int:
        """
        Calculate the chromosome size based on the number of modules chapters and groups.
        Each group must be assigned to all chapters in a module. Mostly used for testing purposes.
        The actual chromosome size is generated based on parential data.

        Returns:
            int: chromosome size
        """
        chromosome_size = 0
        for module in self.modules:
            chromosome_size += len(module.groups.all()) * len(module.chapters.all())
        return chromosome_size
    
    def generate_chromosome(self) -> Chromosome:
        """Generate a chromosome based on data, each group must be assigned to all chapters in a module of appropriate lab"""
        chromosome = Chromosome([])
        for module in self.modules:
            for group in module.groups.all():
                for chapter in module.chapters.all():
                    laboratory = module.laboratory
                    assistant = np.random.choice(laboratory.assistants.all())
                    time_slot = self.generate_time_slot(module.start_date, module.end_date)
                    gene = Gene(laboratory.id, module.id, chapter.id, group.id, assistant.id, time_slot)
                    chromosome.add_gene(gene)
        return chromosome
    
    def generate_chromosome_weekly(self) -> Chromosome:
        """Generate a chromosome based on data, each group must be assigned to all chapters in a module of appropriate lab"""
        chromosome = Chromosome([])
        for module in self.modules:
            for group in module.groups.all():
                start_date = module.start_date
                end_date = module.end_date
                duration = (end_date - start_date).days + 1
                weeks_duration = floor(duration / 7)
                chapters_count = len(module.chapters.all())
                weekly_chapters = ceil(chapters_count / weeks_duration)
                for i in range(weekly_chapters):
                    laboratory = module.laboratory
                    assistant = np.random.choice(laboratory.assistants.all())
                    time_slot = self.generate_time_slot_weekly(start_date, end_date)
                    gene = Gene(laboratory, module, module.chapters.all()[i], group, assistant, time_slot)
                    chromosome.add_gene(gene)
        return chromosome
    
    def generate_chromosome_parallel(self) -> Chromosome:
        return self.generate_chromosome()
    
    def generate_population(self, population_size: int) -> Population:
        """Generate a population based on the population size"""

        if settings.DEBUG:
            logger.info("Generating population")

        chromosomes = []
        # chromosomes = [self.generate_chromosome() for i in range(population_size)]
        for i in range(population_size):
            if settings.DEBUG:
                logger.info(f"Generating chromosome {i}")

            chromosomes.append(self.generate_chromosome())

        population = Population(chromosomes, self.fitness_manager)
        #population.calculate_fitness()
        return population
    
    def generate_population_weekly(self, population_size: int) -> Population:
        """Generate a population based on the population size"""

        if settings.DEBUG:
            logger.info("Generating population")

        chromosomes = []
        # chromosomes = [self.generate_chromosome() for i in range(population_size)]
        for i in range(population_size):
            if settings.DEBUG:
                logger.info(f"Generating chromosome {i}")

            chromosomes.append(self.generate_chromosome_weekly())

        population = Population(chromosomes, self.fitness_manager)
        #population.calculate_fitness()
        return population
    
    def generate_population_parallel(self, population_size: int) -> Population:
        with Pool() as pool:
            chromosomes = pool.map(self.generate_chromosome_parallel, range(population_size))
        population = Population(chromosomes, self.fitness_manager)
        return population


In [21]:
# #generate population test
# factory = Factory()
# settings.DEBUG = True
# population = factory.generate_population(100)


INFO:__main__:Generating population
INFO:__main__:Generating chromosome 0
INFO:__main__:Generating chromosome 1
INFO:__main__:Generating chromosome 2
INFO:__main__:Generating chromosome 3
INFO:__main__:Generating chromosome 4
INFO:__main__:Generating chromosome 5
INFO:__main__:Generating chromosome 6
INFO:__main__:Generating chromosome 7
INFO:__main__:Generating chromosome 8
INFO:__main__:Generating chromosome 9
INFO:__main__:Generating chromosome 10
INFO:__main__:Generating chromosome 11
INFO:__main__:Generating chromosome 12
INFO:__main__:Generating chromosome 13
INFO:__main__:Generating chromosome 14
INFO:__main__:Generating chromosome 15
INFO:__main__:Generating chromosome 16
INFO:__main__:Generating chromosome 17
INFO:__main__:Generating chromosome 18
INFO:__main__:Generating chromosome 19
INFO:__main__:Generating chromosome 20
INFO:__main__:Generating chromosome 21
INFO:__main__:Generating chromosome 22
INFO:__main__:Generating chromosome 23
INFO:__main__:Generating chromosome 24

In [22]:
# population.calculate_fitness()

In [23]:
# group_assignment_conflict_fitness = GroupAssignmentConflictFitness()
# assistant_distribution_fitness = AssistantDistributionFitness()

# assistant_distribution_fitness.configure(15, 15, 1, 1)
# group_assignment_conflict_fitness.configure(2, 1)

# fitness_manager = FitnessManager([group_assignment_conflict_fitness, assistant_distribution_fitness])

# fitness_manager(population[0])


297.83

### Factory pararel version test

In [62]:
#Factory class parallel version test for faster population generation

from multiprocessing import Pool, cpu_count, current_process, Manager, Process, Queue, Lock, Value
from functools import partial
from itertools import repeat
import concurrent.futures

class FactoryParallel(Factory):
    def __init__(self):
        super().__init__()
        self.pool = Pool()
        
    def __del__(self):
        self.pool.close()
        self.pool.join()
    
    def generate_chromosome_parallel(self, i) -> Chromosome:
        # i is just a dummy variable, it is not used, but it is needed for the map function
        return self.generate_chromosome()
    
    def generate_population_parallel(self, population_size: int) -> Population:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            chromosomes = list(executor.map(self.generate_chromosome_parallel, range(population_size)))
        population = Population(chromosomes, self.fitness_manager)
        return population


In [25]:
# #generate population test
# factory = Factory()
# factory_parallel = FactoryParallel()
# #time
# import time
# start = time.time()
# population = factory.generate_population(100)
# end = time.time()

# parallel_start = time.time()
# population_parallel = factory_parallel.generate_population_parallel(100)
# parallel_end = time.time()

# print(f"Time taken for serial population generation: {end - start}")
# print(f"Time taken for parallel population generation: {parallel_end - parallel_start}")


In [63]:
import timeit

factory_serial = Factory()
factory_parallel = FactoryParallel()

def test_factory(population_size):
    serial_time = timeit.timeit(lambda: factory_serial.generate_population(population_size), number=1)
    parallel_time = timeit.timeit(lambda: factory_parallel.generate_population_parallel(population_size), number=1)
    print(f"Population size: {population_size}")
    print(f"Serial time: {serial_time}")
    print(f"Parallel time: {parallel_time}")
    print(f"Speedup: {serial_time / parallel_time}")
    if serial_time / parallel_time > 1:
        print("Parallel is faster with time difference of: ", serial_time - parallel_time)
    else:
        print("Serial is faster with time difference of: ", parallel_time - serial_time)
    print()
    return serial_time, parallel_time


In [64]:
#test for population size 1, 2, 4, 8, 16, 32, 64, 128, 256, 512
data = []
for i in range(1, 10):
    data.append(test_factory(2 ** i))
    print('----------------------------------')


Population size: 2
Serial time: 0.896754200104624
Parallel time: 0.5847837999463081
Speedup: 1.5334798949405908
Parallel is faster with time difference of:  0.3119704001583159

----------------------------------
Population size: 4
Serial time: 1.7304778001271188
Parallel time: 1.8726954001467675
Speedup: 0.9240572706012399
Serial is faster with time difference of:  0.14221760001964867

----------------------------------
Population size: 8
Serial time: 3.528932499932125
Parallel time: 4.015831300057471
Speedup: 0.8787551658062983
Serial is faster with time difference of:  0.4868988001253456

----------------------------------
Population size: 16
Serial time: 6.000041699968278
Parallel time: 7.045914399903268
Speedup: 0.8515632406846515
Serial is faster with time difference of:  1.0458726999349892

----------------------------------
Population size: 32
Serial time: 15.648321999935433
Parallel time: 17.298241399927065
Speedup: 0.9046192406588459
Serial is faster with time difference of:  

#### Pararel and Serial version comparison

Setelah dicoba, pada awalnya versi pararel lebih lambat dari versi serial, khususnya pada ukuran populasi rendah. Sekiranya antara 1 hingga 100 populasi, versi serial lebih cepat dari pada yang pararel. Tapi ketika ukuran populasi meningkat, setidaknya diatas 100, kecepatan versi pararel mulai mengungguli versi serial. Perbedaan yang dihasilkan juga cukup signifikan, pada saat populasi mencapai 512, versi pararel 1.5 kali lebih cepat dari versi serial. Apakah itu cukup signifikan? Mungkin kita coba lagi dengan ukuran populasi yang lebih besar, misal 1024 atau 2048. Sementara itu, ini adalah hasil perbandingan dari ujicoba sebelumnya. Untuk keterangan, ukuran chromosome untuk tiap populasi adalah 288.

| Population Size | Serial | Pararel | Pararel Speedup |
| --------------- | ------ | ------- | --------------- |
| 2               | 0.765  | 0.576   | 1.328           |
| 4               | 1.792  | 1.954   | 0.917           |
| 8               | 3.573  | 4.294   | 0.832           |
| 16              | 6.771  | 8.302   | 0.815           |
| 32              | 13.199 | 17.507  | 0.753           |
| 64              | 28.468 | 34.396  | 0.827           |
| 128             | 53.476 | 46.242  | 1.156           |
| 256             | 91.285 | 83.171  | 1.097           |
| 512             | 466.15 | 294.25  | 1.584           |
| 1024            | 385.61 | 304.31  | 1.267           |
| 2048            | 758.55 | 587.89  | 1.290           |

Kayaknya nanti bisa dipertimbangin deh mau make jenis proses yang mana, tergantung jumlah populasinya. Hmm kira2 bisa ngga ini diterapin di fungsi fitness. Di versi kode sebelumnya sebenernya lebih lama di ngurusin perhitungan fitness-nya, kalo generate population mah cuman diawal doang.

In [None]:
# # #test for population size 1024, 2048, 4096
# for i in range(10, 13):
#     data.append(test_factory(2 ** i))
#     print('----------------------------------')

### Operator
Operator is a class that contains methods for performing genetic operations such as crossover and mutation. This class is used by the genetic algorithm class to perform genetic operations on the population.

In [49]:
#Mutation Class
'''The one that need to mutate is just the time slot and the assistant, the rest is fixed because we just need the schedule. The group is already assigned to the appropriate module and all its chapters, we don't to change it. Each chapter also already assigned to the correct module, and each module have their own lab, as well the group and the assistant. The schedule is affected by the time slot and the assistant that is assigned to the group. The number of group is fixed and cannot be random, that's why the only things that need to be mutated is the time slot and the assistant only.'''

import random
from copy import deepcopy

class BaseMutation:
    def __init__(self, name, probability_weight=1):
        self.name = name
        self.probability_weight = probability_weight # It is used to determine the probability of the mutation function being called if more than one mutation function is used.
    
    def __str__(self):
        return f"Mutation(name={self.name})"
    
    def __repr__(self):
        return self.__str__()
    
    def __call__(self, chromosome: Chromosome):
        raise NotImplementedError("Mutation function not implemented")
    
class SwapMutation(BaseMutation):
    def __init__(self):
        super().__init__("SwapMutation")
    
    def __call__(self, chromosome: Chromosome):
        # Randomly select a gene
        gene1 = random.choice(chromosome)
        # Randomly select another gene
        gene2 = random.choice(chromosome)
        # Swap the time slot and the assistant
        gene1.time_slot, gene2.time_slot = gene2.time_slot, gene1.time_slot
        gene1.assistant, gene2.assistant = gene2.assistant, gene1.assistant
        return chromosome
    
class ShiftMutation(BaseMutation):
    def __init__(self):
        super().__init__("ShiftMutation")
    
    def __call__(self, chromosome: Chromosome):
        # Randomly select a gene
        gene = random.choice(chromosome)
        # Shift the time slot
        gene.time_slot = self.shift_time_slot(gene.time_slot)
        return chromosome
    
    def shift_time_slot(self, time_slot: TimeSlot) -> TimeSlot:
        # Shift the time slot by 1 day
        if time_slot.day == "Saturday":
            return TimeSlot(time_slot.date + timedelta(days=2), "Monday", time_slot.shift)
        return TimeSlot(time_slot.date + timedelta(days=1), self.constant.days[self.constant.days.index(time_slot.day) + 1], time_slot.shift)
    
class RandomMutation(BaseMutation):
    def __init__(self):
        super().__init__("RandomMutation")
        self.constant = Constant
        self.module = ModuleData
        self.group = GroupData

    def __call__(self, chromosome: Chromosome):
        # Randomly select a gene
        gene = random.choice(chromosome)
        module = self.module.get_module(gene.module)
        group = self.group.get_group(gene.group)
        # Randomly select a time slot
        time_slot = self.generate_time_slot(module.start_date, module.end_date)
        # Randomly select an assistant
        assistant = random.choice(module.laboratory.assistants.all())
        # Change the gene
        gene.time_slot = time_slot
        gene.assistant = assistant
        return chromosome
    
    def generate_time_slot(self, start_date, end_date):
        """Generate time slots based on the start date, end date, days and shifts"""
        #if start_date not start from Monday, then start from the next Monday
        if start_date.weekday() != 0:
            start_date = start_date + timedelta(days=7 - start_date.weekday())
        duration = (end_date - start_date).days + 1
        weeks_duration = floor(duration / 7)
        random_weeks = np.random.randint(0, weeks_duration)
        random_days = np.random.choice(self.constant.days)
        random_shifts = np.random.choice(self.constant.shifts)
        random_date = start_date + timedelta(days=random_weeks * 7 + self.constant.days.index(random_days))
        return TimeSlot(random_date, random_days, random_shifts)
    
class DynamicMutation(BaseMutation):
    def __init__(self, name, mutation_function):
        super().__init__(name)
        self.mutation_function = mutation_function
    
    def __call__(self, chromosome: Chromosome):
        return self.mutation_function(chromosome)
    
class MutationManager:
    def __init__(self, mutation_functions: List[BaseMutation]):
        self.mutation_functions = mutation_functions
        self.mutation_probability = 0.1
    
    def __str__(self):
        return f"MutationManager(mutation_functions={self.mutation_functions})"
    
    def __repr__(self):
        return self.__str__()
    
    def __call__(self, chromosome: Chromosome):
        #random based on probability weight
        if random.random() < self.mutation_probability:
            if settings.DEBUG:
                logger.info("Mutating chromosome")
            mutation_function = random.choices(self.mutation_functions, weights=[mutation.probability_weight for mutation in self.mutation_functions])[0]
            return mutation_function(chromosome)
        return chromosome
    
    def configure(self, mutation_functions: List[BaseMutation]):
        self.mutation_functions = mutation_functions

In [50]:
factory = Factory()
population = factory.generate_population(5)

In [57]:
population.chromosomes[0].genes[0].module.laboratory.assistants.all()

<QuerySet [<Assistant: Fatimah>, <Assistant: Rahmat>, <Assistant: Rafli>, <Assistant: Amel>, <Assistant: Bowo>, <Assistant: Andi>]>

In [53]:
# Mutation Test
swap_mutation = SwapMutation()
shift_mutation = ShiftMutation()
random_mutation = RandomMutation()

mutation_manager = MutationManager([swap_mutation, shift_mutation, random_mutation])
mutation_manager.mutation_probability = 0.75

print(population[0])
print(mutation_manager(population[0]))


Chromosome(genes=[Gene(laboratory=Dasar Elektro, module=Pengukuran Listrik, chapter=U-1, group=PL-1, assistant=Rahmat, time_slot=TimeSlot(date=datetime.datetime(2022, 9, 27, 7, 30, 0, 530000, tzinfo=datetime.timezone.utc), day='Tuesday', shift='Shift2')), Gene(laboratory=Dasar Elektro, module=Pengukuran Listrik, chapter=U-2, group=PL-1, assistant=Bowo, time_slot=TimeSlot(date=datetime.datetime(2022, 10, 14, 7, 30, 0, 530000, tzinfo=datetime.timezone.utc), day='Friday', shift='Shift5')), Gene(laboratory=Dasar Elektro, module=Pengukuran Listrik, chapter=U-3, group=PL-1, assistant=Andi, time_slot=TimeSlot(date=datetime.datetime(2022, 9, 13, 7, 30, 0, 530000, tzinfo=datetime.timezone.utc), day='Tuesday', shift='Shift6')), Gene(laboratory=Dasar Elektro, module=Pengukuran Listrik, chapter=U-4, group=PL-1, assistant=Rafli, time_slot=TimeSlot(date=datetime.datetime(2022, 9, 13, 7, 30, 0, 530000, tzinfo=datetime.timezone.utc), day='Tuesday', shift='Shift6')), Gene(laboratory=Dasar Elektro, modu

TypeError: Field 'id' expected a number but got <Module: Pengukuran Listrik>.