Instrukcje
Zaimplementuj algorytm genetyczny z uwzględnieniem poniższych wymagań:   

	kodowanie binarne bądź dyskretne/ciągłe genotypu   
	parametryzacja długości genotypu, wielkości populacji   
	krzyżowanie k-punktowe z prawdopodobieństwem pk, losowanie locusów bez zwracania   
	mutacja m-punktowa z prawdopodobieństwem pm, losowanie locusów bez zwracania   
	selekcja:   
rankingowa: n najlepszych osobników zastępuje w populacji n najgorszych   
koło ruletki z uwzględnieniem funkcji transformującej wartości funkcji przystosowania   
		warunek stopu: liczba pokoleń, opcjonalnie docelowa wartość funkcji przystosowania, % najlepszych osobników w populacji   
Użyj algorytmu do optymalizacji ANFIS.  

In [None]:
import pandas as pd
import numpy as np 
import matplotlib
import matplotlib.pyplot as plt
import math
import random
from dataclasses import dataclass
from typing import List, Tuple
from tqdm import tqdm

@dataclass(frozen=True) # Frozen=True makes instances of this class immutable
class Operation:
task_id: int #  
op_id: int # Every task has 11 operations 
resource: int
duration: int


class JobShopData:
"""A dedicated class to hold and provide access to data"""
def __init__(self, path: str):
df = pd.read_excel('GA_task.xlsx', header=[0, 1])

# A list of all unique operations (genes)
self.operations: List[Operation] = []

# A structured list of tasks for easy lookup
self.tasks: List[List[Operation]] = []

# Loop trough task
for task_id, task_df in df.groupby(level=0, axis=1): # lvl 0 - ID, level 1 - R/T, axis 0 - rows, axis 1 - columns
    # for each task I create a list of its operations
    current_task_operation = []

    r_col = task_df.xs("R", level=1, axis=1).iloc[:,0] # xs stands for cross section, level 0 is the task ID, level 1 is either 'R' or 'T', axis=1 columns
    t_col = task_df.xs("T", level=1, axis=1).iloc[:,0]

    # Loop trough the operations for this task
    op_id_counter = 0
    for resource, time in zip(r_col, t_col):
        # Create operation object
        op = Operation(
            task_id=task_id,
            op_id=op_id_counter,
            resource=resource,
            duration=time
        )

        # Append new object to the global gene pool
        self.operations.append(op)

        # Append it to the list for the current task
        current_task_operation.append(op)

        op_id_counter += 1

    # After processing all operations for this task and its list to self.tasks
    self.tasks.append(current_task_operation)    

self.num_tasks = len(self.tasks)
all_resources = {op.resource for op in self.operations}
self.num_resources = len(all_resources)
self.num_operational_total = len(self.operations)

print(f"Data loaded... \n\nNumber of tasks: {self.num_tasks}\nTootal number of operations: {self.num_operational_total}\nNumber of resources: {self.num_resources}\n\n")


In [None]:
dataload = JobShopData(path='GA_task.xlsx',)

In [None]:
class Scheduler:
    """
    Calculates the fitness of a single chromosome.
    """
    def __init__(self, data: JobShopData):
        """
        Initializes the Scheduler with the problem data. 

        Args:
            data (JobShopData): The loaded job shop problem data.
        """
        self.data = data
    
    def calculate_fitness(self, chromosome: List[Operation]) -> float:
        """
        Calculate the makespan for a given chromosome and returns a fitness score.

        Args:
            chromosome (List[Operation]): A permutation of all operations, representing a schedule's priority list

        Returns:
            float: The fitness score, calculated as 1.0 makespan.
        """
        # The time when resource will be available
        resource_end_times = np.zeros(self.data.num_resources + 1) # Resources are indexed from 1 
        
        # Times when task are over 
        task_end_times = np.zeros(self.data.num_tasks + 1)


        for op in chromosome:
            # Time the task is ready, time of the end of previous operation
            task_ready_time = task_end_times[op.task_id] # In first iteration value is zero

            # Time when resource is available
            resource_ready_time = resource_end_times[op.resource] # In first iteration value is zero

            # An operation can only start when task and resource is ready
            start_time = max(task_ready_time, resource_ready_time)
            end_time = start_time + op.duration

            # Update
            task_end_times[op.task_id] = end_time
            resource_end_times[op.resource] = end_time
        
        # The longest task took:
        total_makespan = np.max(task_end_times) 

        return 1.0 / total_makespan   

In [None]:
scheduler = Scheduler(dataload)

basic_chromosome = dataload.operations

fitness = scheduler.calculate_fitness(basic_chromosome)
makespan = 1 / fitness

print("Scheduler test \n")
print(f"Fitness of basic is: {fitness} and makespan: {makespan}")

In [None]:
class GeneticAlgorithm:
    """
    A genetic algorithm implementation.
    """
    def __init__(self,
        fitness_func: callable[[np.ndarray], float],
        genotype_length: int,
        population_size: int = 100,
        gene_type: str = "continious",
        bounds: tuple[float, float] = (0.0, 1.0),
        selection_method: str = "rank"):
        """
        Initializes instances of class GeneticAlgorithm
        
        Args: 
            fitness_func (callable[[np.ndarray], float]): function accepting a genotype and return float score.
            genotype_length (int): Number of genes, parameters
            population_size (int): Number of individuals (chromosomes) are in the population
            gene_type (str): Available 'continious', 'discrete', 'binary'
            bounds (min, max): Constraints for genes
            selection_method (str): The method used to select parent from the population
        """
        self.fitness_func = self.fitness_func
        self.genotype_length = genotype_length
        self.population_size = population_size
        self.gene_type = gene_type
        self.bounds = bounds

        # State
        self.population: np.ndarray = None
        self.fitness_scores: np.ndarray = None
        self.best_individual = None
        self.best_fitness = -np.inf
        
        if selection_method == "roulette":
            self.selection_func = self._selection_roulette
        elif selection_method =="rank": 
            self.selection_func = self._selection_rank
        else:
            raise ValueError("Unknown selection method")
        
        self._create_initial_population()

    
    def _create_initial_population(self):
        low, high = self.bounds

        # Generate the initial population based on gene type

        if self.gene_type == 'continious':
            self.population = np.random.uniform(low, high, size=(self.population_size, self.genotype_length))
        elif self.gene_type == 'binary':
            self.population = np.random.binomial(n=1, p=0.5, size=(self.population_size, self.genotype_length))
        elif self.gene_type == 'discrete':
            self.population = np.random.randint(low, high + 1, size=(self.population, self.genotype_length))
        else:
            raise ValueError(f"Uknown gene type {self.gene_type}")+
        
        # Initial evaluation
        self.fitness_scores = np.array([self.fitness_func(ind) for ind in self.population])
        self._update_best()


    def _update_best(self):
        """Updates the global best solution found so far"""
        max_idx = np.argmax(self.fitness_scores)
        if self.fitness_scores[max_idx] > self.best_fitness:
            self.best_fitness = self.fitness_scores[max_idx]
            self.best_individual = self.population[max_idx].copy()

    
    def _crossover_k_point(self, parent1: np.ndarray, parent2: np.ndarray, k: int, prob_pk: float) -> Tuple[np.ndarray, np.ndarray]:
        """
        Performs Order Crossover

        Args:
            k (int): Number of cut points
            prob_pk (float): Probability of crossover happens
        """
        if random.random() >= prob_pk:
            return parent1.copy(), parent2.copy()
        
        # Draw indices without returning
        cut_points = sorted(random.sample(range(1, self.genotype_length), k))

        child1 = parent1.copy()
        child2 = parent2.copy()

        # Swap segments between cut points
        swap = False
        start = 0

        # Add end of array as a virtual cut point
        extended_cuts = cut_points + [self.genotype_length]

        for cut in extended_cuts:
            if swap:
                child1[start:cut], child2[start:cut] = parent2[start:cut].copy(), parent1[start:cut].copy()
            swap = not swap
            start = cut
        
        return child1, child2

    def _mutation_m_points(self, individual: np.ndarray, m: int, prob_pm: float):
        """
        Performs Mutation in m points on a chromosome.

        Args:
            m (int): Number of genes to mutate
            prob_pm (float): Probabilit that mutations happens
        """

        if random.random() >= prob_pm:
            return
        
        # Select m indices to mutate
        mutation_indices = random.sample(range(self.genotype_length), m)
        low, high = self.bounds

        for idx in mutation_indices:
            if self.gene_type == 'continious':
                individual[idx] = random.uniform(low, high)
            elif self.gene_type == 'binary':
                individual[idx] = 1 - individual[idx]
            elif self.gene_type == 'discrete':
                individual[idx] = random.randint(low, high)

    
    def _selection_roulette(self, transformation_func: callable[[float], float] = None) -> np.ndarray:
        """
        Selects one parent using Roulette Wheel Selection
        
        Notes:
        This function can use optional fitness transformation 
        """
        scores = self.fitness_scores

        # Trasnform 
        if transformation_func:
            adjusted_scores = np.array([transformation_func(s) for s in scores])
        else:
            adjusted_scores = scores.copy()
        
        # Handle cases with negative fitness by shifting
        if np.min(adjusted_scores) < 0:
            adjusted_scores = adjusted_scores - np.min(adjusted_scores) + 1e-6

        total = np.sum(adjusted_scores)

        if total == 0:
            probs = np.ones(len(scores)) / len(scores)
        else:
            probs = adjusted_scores / total

        # Selected parent
        chosen_idx = np.random.choice(len(self.population), p=probs)

        return self.population[chosen_idx]
    

    def _selection_rank(self, offspring: list[np.ndarray], n_replace: int):
        """Selects N best offspring using Rank Selection and replace N worst individuals"""

        # Calculate fitness or every offsrping 
        off_scores = np.array([self.fitness_func(ind) for ind in offspring])

        # Sort fitness of offsrping
        off_sorted_idx = np.argsort(off_scores)[::-1]

        # Sort population, from the smallest to biggest
        pop_sorted_idx = np.argsort(self.fitness_scores)

        # Replacement
        for i in range(n_replace):
            
            # If the offsrping is smaller than number of replacement
            if i >= len(offspring): break

            worst_pop_idx = pop_sorted_idx[i]
            best_off_idx = off_sorted_idx[i]

            # Elitism
            if off_scores[best_off_idx] > self.fitness_scores[worst_pop_idx]:
                self.population[worst_pop_idx] = offspring[best_off_idx]
                
                # Remember to change also fitnes score
                self.fitness_scores[worst_pop_idx] = off_scores[best_off_idx]


    def run(self, 
        num_generations: int,
        k_crossover: int = 1,
        pk_prob: float = 0.8,
        m_mutation: int = 1,
        pm_prob: float = 0.1,
        target_fitness: float = None,
        dominance_threshold: float = 0.9):
        """
        Run Genetic Algorithm

        Args:
            num_generations (int): Number of generations
            k_crossover (int): Number of crossover
            pk_prob (float): Probability that crossover happens.
            m_mutation (int): Number of mutations
            pm_prob (float): Probability that mutations happens
            target_fitness (float): Algorithm will stop if it will achieve the target fitness
            dominance_threshold (float): Algorithm will stopp if the given % of population will be close to best fitness
        """

        for gen in tqdm(range(num_generations)):
            
            # First stop condition - goal function achieved
            if target_fitness is not None and self.best_fitness >= target_fitness:
                print(f"Goal fitness {target_fitness} reached at {gen} ")
                break

            # Second stop condition- Population dominance
            # Count how many individuals are very close to the best fitness
            matches = np.sum(np.isclose(self.fitness_scores, self.best_fitness, atol=1e-5))
            if matches / self.population_size >= dominance_threshold:
                print(f"Population dominance reached threshold {dominance_threshold} at gen {gen}")
                break
            
            # Create a poll of offspring
            offsrping_pool = []

            # Full batch
            pool_target_size = self.population_size
            
            while len(offsrping_pool) < pool_target_size:
                # roulette selecion, squaring for transformation function
                parent1 = self._selection_roulette(lambda x: x**2 if x > 0 else 0)
                parent2 = self._selection_roulette(lambda x: x**2 if x > 0 else 0)

                # Crossover
                crossover1, crossover2 = self._crossover_k_point(parent1, parent2, k=k_crossover, prob_pk=pk_prob)

                # Mutation
                self._mutation_m_points(crossover1, m=m_mutation, prob_pm=pm_prob)
                self._mutation_m_points(crossover2, m=m_mutation, prob_pm=pm_prob)

                offsrping_pool.extend([crossover1, crossover2])

            # Elitizm, replace 50% of worst parents with best children
            n_replace = int(self.population_size * 0.5)
            self._selection_rank(offsrping_pool, n_replace)

            # Update stats
            self._update_best()


        return self.best_individual, self.best_fitness

In [None]:
gen_alg = GeneticAlgorithm(dataload, scheduler)
best_chromosome, best_fitness = gen_alg.run(num_generations=5000)
makespan = 1 / best_fitness
print(f"Best fitness score is equal:{best_fitness} and makespan: {makespan}")

In [None]:
best_chromosome