In [1]:
from random import choices, randint, randrange, random
from typing import List, Optional, Callable, Tuple

Genome = List[int]
Population = List[Genome]
PopulateFunc = Callable[[], Population]
FitnessFunc = Callable[[Genome], int]
SelectionFunc = Callable[[Population, FitnessFunc], Tuple[Genome, Genome]]
CrossoverFunc = Callable[[Genome, Genome], Tuple[Genome, Genome]]
MutationFunc = Callable[[Genome], Genome]
PrinterFunc = Callable[[Population, int, FitnessFunc], None]


def generate_genome(length: int) -> Genome:
    """Return a random genome: a list of `length` bits, each drawn from {0, 1}."""
    alleles = [0, 1]
    genome = choices(alleles, k=length)
    return genome


def generate_population(size: int, genome_length: int) -> Population:
    """Build a population of `size` genomes, each created by generate_genome(genome_length)."""
    population = []
    for _ in range(size):
        population.append(generate_genome(genome_length))
    return population


def single_point_crossover(a: Genome, b: Genome) -> Tuple[Genome, Genome]:
    """Recombine two equal-length genomes at one random cut point.

    Returns two children: the head of one parent joined to the tail of the other.
    Genomes shorter than two genes cannot be cut and are returned unchanged
    (the very same objects).

    Raises:
        ValueError: if the genomes differ in length.
    """
    size = len(a)
    if size != len(b):
        raise ValueError("Genomes a and b must be of same length")

    if size < 2:
        return a, b

    # The cut lies strictly inside the genome, so both children mix both parents.
    cut = randint(1, size - 1)
    head_a, tail_a = a[:cut], a[cut:]
    head_b, tail_b = b[:cut], b[cut:]
    return head_a + tail_b, head_b + tail_a


def mutation(genome: Genome, num: int = 1, probability: float = 0.5) -> Genome:
    """Flip randomly chosen bits of `genome` in place and return the same list.

    `num` positions are drawn (with repetition possible); each drawn position is
    flipped unless a fresh random() draw is greater than `probability`.
    """
    for _ in range(num):
        position = randrange(len(genome))
        if random() > probability:
            continue  # this draw leaves the gene untouched
        genome[position] = abs(genome[position] - 1)
    return genome


def population_fitness(population: Population, fitness_func: FitnessFunc) -> int:
    """Return the sum of fitness_func(genome) over every genome in the population."""
    total = 0
    for genome in population:
        total += fitness_func(genome)
    return total


def selection_pair(population: Population, fitness_func: FitnessFunc) -> Population:
    """Pick two parents (with replacement) with probability proportional to fitness.

    Args:
        population: candidate genomes to draw from.
        fitness_func: maps a genome to its (non-negative) fitness.

    Returns:
        A list of two genomes taken from `population`.

    When the fitness values do not add up to a positive number (for example every
    genome scores 0), fitness-proportional selection is undefined and
    random.choices would raise ValueError; in that case the two parents are drawn
    uniformly instead so the evolution can keep going.
    """
    weights = [fitness_func(genome) for genome in population]
    if sum(weights) <= 0:
        return choices(population=population, k=2)
    return choices(population=population, weights=weights, k=2)


def sort_population(population: Population, fitness_func: FitnessFunc) -> Population:
    """Return a new list with the genomes ordered from highest to lowest fitness."""
    ranked = list(population)
    ranked.sort(key=fitness_func, reverse=True)
    return ranked


def genome_to_string(genome: Genome) -> str:
    """Render a genome as one string by concatenating str() of every gene."""
    return "".join(str(gene) for gene in genome)


def print_stats(population: Population, generation_id: int, fitness_func: FitnessFunc):
    """Print a report for one generation and return its fittest genome.

    The report lists the generation number, every genome, the average fitness,
    and the best and worst genome together with their fitness values.
    """
    print("GENERATION %02d" % generation_id)
    print("=============")

    rendered = ", ".join(genome_to_string(gene) for gene in population)
    print("Population: [%s]" % rendered)

    average = population_fitness(population, fitness_func) / len(population)
    print("Avg. Fitness: %f" % average)

    ranked = sort_population(population, fitness_func)
    best = ranked[0]
    worst = ranked[-1]
    print("Best: %s (%f)" % (genome_to_string(best), fitness_func(best)))
    print("Worst: %s (%f)" % (genome_to_string(worst), fitness_func(worst)))
    print("")

    return best


def run_evolution(
        populate_func: PopulateFunc,
        fitness_func: FitnessFunc,
        fitness_limit: int,
        selection_func: SelectionFunc = selection_pair,
        crossover_func: CrossoverFunc = single_point_crossover,
        mutation_func: MutationFunc = mutation,
        generation_limit: int = 100,
        printer: Optional[PrinterFunc] = None) \
        -> Tuple[Population, int]:
    """Evolve a population until a genome reaches `fitness_limit` or generations run out.

    Args:
        populate_func: creates the initial population.
        fitness_func: scores a genome; higher is better.
        fitness_limit: evolution stops once the best genome scores at least this.
        selection_func: picks two parents from the sorted population.
        crossover_func: combines two parents into two offspring.
        mutation_func: mutates one offspring and returns it.
        generation_limit: maximum number of generations to run.
        printer: optional callback invoked with (population, generation, fitness_func)
            for every generation, after sorting.

    Returns:
        (population, generation): the last population and the index of the last
        generation that was started (0 when the loop never runs). The population
        is sorted best-first only when the fitness limit was reached; otherwise it
        is the freshly bred generation, whose first two entries are the previous
        generation's two best genomes.
    """
    population = populate_func()
    generation = 0  # keeps the return value defined when generation_limit <= 0

    for generation in range(generation_limit):
        population = sorted(population, key=fitness_func, reverse=True)

        if printer is not None:
            printer(population, generation, fitness_func)

        if fitness_func(population[0]) >= fitness_limit:
            break

        # Elitism: the two best genomes survive unchanged.
        next_generation = population[0:2]

        # Each iteration adds two offspring, so the population size is preserved
        # (for even sizes) once the two elites are counted.
        for _ in range(len(population) // 2 - 1):
            parents = selection_func(population, fitness_func)
            offspring_a, offspring_b = crossover_func(parents[0], parents[1])
            offspring_a = mutation_func(offspring_a)
            offspring_b = mutation_func(offspring_b)
            next_generation += [offspring_a, offspring_b]

        population = next_generation

    return population, generation

In [7]:
from typing import List
from random import choices
Genome = List[int]
Population = List[Genome]
# Genetic representation of a solution: one bit per item, where 1 means the item is included in the solution and 0 means it is not.
def generate_genome(length: int) -> Genome:
    """Create a random bit list of the requested length (each gene is 0 or 1)."""
    bits = choices(population=[0, 1], k=length)
    return bits

# function to generate a new population of candidate solutions
def generate_population(size: int,genome_length: int) -> Population:
    """Return `size` random genomes, each of length `genome_length`.

    Every genome is produced by a separate generate_genome(genome_length) call.
    """
    return [generate_genome(genome_length) for _ in range(size)]

# fitness function
def fitness(genome: Genome, things: [Thing], weight_limit: int) -> int:
    """Check that `genome` and `things` have the same length (unfinished fitness function).

    Only the length validation is implemented: when the lengths match the function
    falls off the end and returns None despite the `-> int` annotation, and
    `weight_limit` is never read.

    Raises:
        ValueError: if `genome` and `things` differ in length.
    """
    # NOTE(review): `Thing` is not defined or imported in the visible notebook, so
    # evaluating this signature raises NameError — confirm where it should come from.
    # TODO: the actual scoring logic is missing; presumably it should combine the
    # selected things and respect weight_limit — verify against the intended design.
    if len(genome) != len(things):
        raise ValueError("genome and things must of same length")
    

In [8]:
# Install pyeasyga into the running kernel's environment, pinned to the version recorded in this cell's output.
%pip install pyeasyga==0.3.1

Defaulting to user installation because normal site-packages is not writeable
Collecting pyeasyga
  Downloading pyeasyga-0.3.1.tar.gz (20 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: pyeasyga
  Building wheel for pyeasyga (setup.py): started
  Building wheel for pyeasyga (setup.py): finished with status 'done'
  Created wheel for pyeasyga: filename=pyeasyga-0.3.1-py2.py3-none-any.whl size=6804 sha256=69789ee45662ec44f44370594d5580259aed1f2a7fb32929cc8810802f5abf1e
  Stored in directory: c:\users\drstone\appdata\local\pip\cache\wheels\4d\b5\ac\484fc9ef665b0699a2d0e64d5075282b73d3ae73e5ac6c3c98
Successfully built pyeasyga
Installing collected packages: pyeasyga
Successfully installed pyeasyga-0.3.1


In [1]:
"""

This code solves the scheduling problem using a genetic algorithm. The GA implementation is taken from pyeasyga.
As input this code receives:
    1. T = number of jobs [integer]
    2. ni = number of operations of job i [list of T elements]
    3. m = number of machines [integer, variable `ma`]
    4. Mj' = feasible machines for operation j' (operation j of job i) [matrix of sum(ni) rows, each row listing the feasible machines of that operation]
    5. pj'k = processing time of operation j' on machine k [matrix of sum(ni) rows, each row with m entries; 1000 marks a machine that is not feasible]
"""
from time import time

# Inputs

#T = 2 # number of jobs
#ni =[2,2] # number of operations of the job i
#ma = 2 # number of machines
#Mij = [[1,2],[1],[2],[1,2]]
#pjk = [[3,4],[5,1000],[1000,6],[2,2]]

#T = 3 # number of jobs
#ni =[2,2,2] # number of operations of the job i
#ma = 2 # number of machines
#Mij = [[1,2],[1,2],[1],[1,2],[2],[1,2]]
#pjk = [[3,4],[5,4],[2,1000],[2,4],[1000,3],[1,2]]

#T = 4 # number of jobs
#ni =[1,3,2,2] # number of operations of the job i
#ma = 3 # number of machines
#Mij = [[1,2,3],[1,3],[3],[1,2],[1,3],[1,2],[1,2,3],[1,3]]
#pjk = [[3,4,3],[5,1000,5],[1000,1000,6],[2,4,3],[1,1000,3],[1,2,1000],[2,2,2],[1,1000,1000]]

#T = 3 # number of jobs
#ni =[2,3,4] # number of operations of the job i
#ma = 5 # number of machines
#Mij = [[1,2,3,4,5],[1,3,4],[3,2],[1,2,5],[1,3,4],[1,2],[1,2,3],[1,3,5],[1,5]]
#pjk = [[3,4,3,4,4],[5,1000,5,4,1000],[1000,4,6,1000,1000],[2,4,1000,1000,4],
#       [1,1000,3,4,1000],[1,2,1000,1000,1000],[2,2,2,1000,1000],[1,1000,1,1000,2],
#       [4,1000,1000,1000,3]]

# Active problem instance: 4 jobs with 2+3+4+2 = 11 operations in total, on 6 machines.
T = 4 # number of jobs
ni =[2,3,4,2] # number of operations of the job i
ma = 6 # number of machines
# Mij[op] lists the machines (numbered from 1) on which operation `op` may run. Operations
# are numbered consecutively across jobs: job 1's operations first, then job 2's, and so on.
Mij = [[1,2,3,4,5],[1,3,4,6],[1,3,2],[1,2,5],[1,2,3,4],[1,2,5],[1,2,3,6],[1,3,5],[1,5,6],
       [1,6],[2,3,4]]
# pjk[op][k-1] is the processing time of operation `op` on machine k. In this dataset the
# value 1000 appears exactly for the machines that are not listed in Mij[op].
pjk = [[3,4,3,4,4,1000],[5,1000,5,4,1000,4],[3,4,6,1000,1000,1000],[2,4,1000,1000,4,1000],
       [1,3,3,2,1000,1000],[1,3,1000,1000,2,1000],[2,2,2,1000,1000,2],[1,1000,1,1000,2,1000],
       [4,1000,1000,1000,3,3],[3,1000,1000,1000,1000,4],[1000,5,3,4,1000,1000]]

"""
The individual is a list with sum(ni)*2 integers. For each operation of each job it holds the variable S and the variable X:
S is the start time of the operation and X is the machine on which the operation will be processed. E.g.:
    
    individual = [S11,X11,S12,X12..........Sini,Xini]
But first of all, the dataset used throughout the algorithm must be assembled.
"""
from pyeasyga import pyeasyga # import the library to be used
import random

# Bundle the inputs into one list; the rest of the notebook reads them by position:
# data[0]=T, data[1]=ni, data[2]=ma, data[3]=Mij, data[4]=pjk.
data=[T,ni,ma,Mij,pjk]

def is_data_ok(data):
  """Check that the dataset's list lengths are consistent with each other.

  Args:
    data: [T, ni, ma, Mij, pjk] as assembled for the genetic algorithm.

  Returns:
    True when len(ni) == T and both Mij and pjk have sum(ni) rows; otherwise
    False, after printing which list has the wrong length. Only the first
    mismatch found is reported.
  """
  sum_ni=sum(data[1])
  if len(data[1])!=data[0]:
    print("Data invalid. Please check the length of ni list")
    return False
  if len(data[3])!=sum_ni:
    print("Data invalid. Please check the length of Mij list")
    return False
  if len(data[4])!=sum_ni:
    print("Data invalid. Please check the length of pjk list")
    return False
  return True

# Sanity-check the dataset once before it is used. A mismatch only prints a message;
# the result of the call is not inspected, so execution continues either way.
is_data_ok(data)


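"""
To create a random individual, a function called create_individual is defined. It randomly picks one of three strategies:
two of them build a schedule greedily (each operation gets a random feasible machine and starts when that machine becomes free),
and the third draws S at random between 0 and the largest processing time in pjk (ignoring the 1000 entries) and X at random among all machines.
"""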
def max_processing_time(data):
    """Return the largest entry of data[4] (pjk), skipping entries equal to 1000.

    The result is never below 0: 0 is returned when no other entry exceeds it.
    """
    candidates=[0]
    for row in data[4]:
        candidates.extend(value for value in row if value!=1000)
    return max(candidates)


def create_individual(data):
    """Create one random individual [S, X, S, X, ...] with one (S, X) pair per operation.

    Args:
        data: [T, ni, ma, Mij, pjk].

    Returns:
        A flat list of start times S and machines X (numbered from 1).

    One of three strategies is picked at random from `list_to`
    (0 with weight 3/10, 1 with 4/10, 2 with 3/10):

    * 1 - greedy, forward: operations are visited first to last; each gets a random
      feasible machine and starts when that machine becomes free.
    * 2 - greedy, reverse: same, but the operations are visited last to first.
    * 0 - fully random: X is any machine, S is drawn from 0 to the value returned
      by max_processing_time(data).

    NOTE(review): in the reverse variant the pairs are still appended front to
    back, so pair k holds the machine chosen for operation (last - k) while
    fitness() reads pair k as operation k — confirm this is intended.
    """
    individual=[]
    jobs=data[0]
    list_to=[2,1,2,0,1,2,0,1,1,0]
    reference=list_to[random.randint(0,len(list_to)-1)]

    if reference in (1,2):
        # Time at which each machine becomes free (index = machine number - 1).
        start_times=[0]*data[2]
        if reference==1:
            a,step=0,1
        else:
            a,step=len(data[3])-1,-1
        for i in range(0,jobs):
            for _ in range(0,data[1][i]):
                position_X=random.randint(0,len(data[3][a])-1)
                X=data[3][a][position_X]
                S=start_times[X-1]
                individual.append(S)
                individual.append(X)
                start_times[X-1]=start_times[X-1]+data[4][a][X-1]
                a+=step
    else:
        # The upper bound for S does not depend on the operation: compute it once.
        max_time=max_processing_time(data)
        for i in range(0,jobs):
            for _ in range(0,data[1][i]):
                X=random.randint(1,data[2])
                S=random.randint(0,max_time)
                individual.append(S)
                individual.append(X)
    return individual
  
def mutate(individual):
  """Mutate `individual` in place; nothing is returned.

  Two positions are drawn at random. Even positions hold start times, odd
  positions hold machines.
  * Same parity: the two values are swapped (start time with start time, or
    machine with machine).
  * Mixed parity: the start time at the even position is swapped with the start
    time at another randomly drawn even position, and the machine at the odd
    position is replaced by a random machine in 1..data[2] (module-level `data`).
  """
  first=random.randrange(len(individual))
  second=random.randrange(len(individual))

  if first%2==second%2:
    individual[first], individual[second] = individual[second], individual[first]
    return

  # Exactly one of the two positions is even (a start time); the other is a machine.
  if first%2==0:
    start_position, machine_position = first, second
  else:
    start_position, machine_position = second, first

  partner=random.randrange(0,len(individual),2)
  individual[start_position], individual[partner] = individual[partner], individual[start_position]
  individual[machine_position]=random.randint(1,data[2])
    
"""
The fitness function is divided into two parts: 1. Cmax (the largest completion time) is calculated from the individual, 2. the restrictions of the
problem are checked to count how many fouls the individual has. At the end the fitness value = cmax + fouls*constant
"""
def is_feasible_machine(operation,machine,data):
    """Return True when `machine` is listed in data[3][operation] (Mij), else False."""
    feasible_machines=data[3][operation]
    return any(machine==candidate for candidate in feasible_machines)

def operations_in_machine(machine,individual):
    """Return the indices of the operations whose machine gene equals `machine`.

    The individual is read as consecutive (S, X) pairs; pair number k describes
    operation k, and its X sits at position 2*k+1.
    """
    return [position//2
            for position in range(0,len(individual),2)
            if individual[position+1]==machine]
    
def fitness(individual,data):
    """Score an individual: makespan plus 1000 for every constraint foul (lower is better).

    Args:
        individual: flat list [S, X, S, X, ...]; pair k is operation k's start
            time S and machine X (numbered from 1).
        data: [T, ni, ma, Mij, pjk]. Everything is read from this argument, so
            the result does not depend on module-level variables.

    Returns:
        cmax + fouls*1000, where cmax is the largest completion time
        S + pjk[op][X-1] and fouls accumulates:
        * 4 for each pair of consecutive operations of a job where the first does
          not finish before the next one starts (or does not start strictly earlier);
        * 2 for each operation placed on a machine outside Mij[op];
        * 2 for each ordered pair of operations overlapping on the same machine
          (so every overlapping pair is counted from both sides);
        * 1 if no operation at all starts at time 0.
    """
    ops_per_job=data[1]
    pjk=data[4]

    # ------ part 1: Cmax ------
    cmax=0
    for op in range(0,len(pjk)):
        completion=individual[2*op]+pjk[op][individual[2*op+1]-1]
        if completion>cmax:
            cmax=completion

    # ------ part 2: restrictions ------
    fouls=0

    # Within a job, an operation must be finished before the next one starts.
    op=0
    for job_ops in ops_per_job:
        for _ in range(0,job_ops-1):
            start=individual[2*op]
            completion=start+pjk[op][individual[2*op+1]-1]
            next_start=individual[2*op+2]
            if completion>next_start or start>=next_start:
                fouls+=4
            op+=1
        op+=1  # skip the job's last operation: it has no successor

    # Every operation must run on one of its feasible machines.
    for position in range(0,len(individual),2):
        if not is_feasible_machine(position//2,individual[position+1],data):
            fouls+=2

    # On each machine only one operation may run at a time; additionally count
    # how many operations start at time zero.
    count_zeros=0
    for machine in range(1,data[2]+1):
        ops_here=operations_in_machine(machine,individual)
        for ref_op in ops_here:
            start_reference=individual[ref_op*2]
            if start_reference==0:
                count_zeros+=1
            end_reference=start_reference+pjk[ref_op][machine-1]
            for other_op in ops_here:
                if other_op==ref_op:
                    continue
                s=individual[other_op*2]
                c=s+pjk[other_op][machine-1]
                if s<=start_reference and c>=end_reference:
                    fouls+=2  # other covers the reference interval
                elif s>=start_reference and s<=end_reference and c<=end_reference:
                    fouls+=2  # other lies inside the reference interval
                elif s<=start_reference and c>start_reference and c<=end_reference:
                    fouls+=2  # other overlaps the start of the reference interval
                elif s>=start_reference and s<end_reference and c>=end_reference:
                    fouls+=2  # other overlaps the end of the reference interval
    if count_zeros==0:
        fouls+=1

    return cmax+(fouls*1000)
 
"""
Finally, the create_individual, mutate and fitness functions are attached to the GA, which is then run and its best individual is printed.
"""
# Module-level history: genetic_algorithm_scheduling appends the best individual of
# every GA run (including retries) to this list.
steps=[]
# Initial value passed as the `counter` (retry count) argument of genetic_algorithm_scheduling.
count_increment=0

def genetic_algorithm_scheduling(data,counter,pop_size=100,num_generations=500):
  """Run the pyeasyga GA on `data`, retrying with more generations until feasible.

  Args:
    data: [T, ni, ma, Mij, pjk], handed to pyeasyga as the seed data.
    counter: number of retries already performed; retries stop once it reaches 10.
    pop_size: GA population size.
    num_generations: number of generations for this run; each retry adds 100.

  Returns:
    The module-level `steps` list, holding the best (fitness, individual) pair
    of every run so far. It is returned on every path, including after retries
    and when no feasible individual was found.

  An individual counts as feasible when its fitness is below 1000, i.e. it
  carries no foul penalty.
  """
  start_time=time()
  ga=pyeasyga.GeneticAlgorithm(data,maximise_fitness=False,population_size=pop_size,generations=num_generations,mutation_probability=0.3) # initialization of the algorithm
  ga.create_individual=create_individual
  ga.mutate_function=mutate
  ga.fitness_function=fitness
  ga.run()
  best_individual=ga.best_individual()
  steps.append(best_individual)
  best_fitness=best_individual[0]

  if best_fitness<1000:
    end_time=time()
    print("Best feasible individual found! ",best_individual)
    print("The execution time was: ",(end_time-start_time)," seconds")
    print("These were the different best individuals:")
    for step in steps:
      print(step)
    return steps

  if counter<10:
    new_generations=num_generations+100
    print("Incrementing generations to ",new_generations,"......")
    # Propagate the retry's result instead of dropping it.
    return genetic_algorithm_scheduling(data,counter+1,pop_size,new_generations)

  print("Feasible individual wasn't found!")
  print("Best infeasible individual: ",best_individual)
  end_time=time()
  print("The execution time was: ",(end_time-start_time)," seconds")
  return steps
  

# Run the GA on the assembled dataset with a population of 200; the number of
# generations is left at the function's default (500).
genetic_algorithm_scheduling(data,count_increment,pop_size=200)

Best feasible individual found!  (11, [0, 3, 3, 3, 0, 2, 4, 2, 8, 2, 0, 1, 1, 1, 4, 5, 6, 1, 0, 6, 8, 3])
The execution time was:  11.020895004272461  seconds
These were the different best individuals:
(11, [0, 3, 3, 3, 0, 2, 4, 2, 8, 2, 0, 1, 1, 1, 4, 5, 6, 1, 0, 6, 8, 3])


[(11, [0, 3, 3, 3, 0, 2, 4, 2, 8, 2, 0, 1, 1, 1, 4, 5, 6, 1, 0, 6, 8, 3])]