In [None]:
from utils import make_storage,make_task_dict
from tqdm import tqdm
from colors import colors
import numpy as np
import random
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import seaborn as sns
import os
import shutil
from PIL import Image,ImageSequence

In [None]:
storage = make_storage()
storage['fishing net']=20
storage['apple']=20
storage['wool']=20
storage['fish fillet']=20
storage['raspberry']=20
storage['cherry']=20
storage['wheat']=100
storage['carrot']=100
storage['soybean']=100
storage['corn']=100
storage['bacon']=20
orders = [
            *['carrot pie']*2,
            *['raspberry muffin']*2,
            *['cream cake']*2,
            *['pizza']*7,
            *['fish pie']*2,
            *['bacon and eggs']*3
        ]
orders = list(storage.keys())[:40]
orders = orders[:20]+orders[:20]+orders[:20]+ orders[20:40]
# orders = orders[:20] + orders[:20] 

In [None]:
machines={'Bakery':1, 'Cow':4, 'Feed Mill':2, 'Field':12,'Dairy':1,'Chicken':6,'Sugar Mill':1,
        'Pig':4,'Pie Oven':1,'Cake Oven':1,'BBQ Grill':1,'Goat':4}

In [None]:
task_dict=make_task_dict(orders,storage)
sc=set(task['source'] for task in task_dict.values())
# print machine not in sc
for m in sc:
    if m not in machines:
        print(m)
        machines[m]=1
print(len(task_dict))

In [None]:
from collections import Counter

counter = Counter([task['name'] for task in task_dict.values()])
data = [
    {
        'image':f'images/{k.replace(" ","-")}.png',
        'name':k,
        'count':v
    } for k,v in counter.items()
]
data = sorted(data,key=lambda x:x['count'],reverse=True)

In [None]:
# generate chromosome
def generate_chromosome(task_dict):
    """
    Generate a chromosome for the genetic algorithm.

    Parameters:
    - task_dict (dict): A dictionary containing task information.

    Returns:
    - chromosome (numpy.ndarrsay): A 2xlength matrix with integer values.
                                  The first row represents the task IDs,
                                  and the second row represents the machine IDs.
    """
    chromosome = np.zeros((2,len(task_dict))).astype(int)
    tasks = list(task_dict.values())
    random.shuffle(tasks)
    tasks.sort(key=lambda x:x['depth'],reverse=True)
    for i in range(len(tasks)):
        chromosome[0][i]=tasks[i]['id']
        chromosome[1][i]=random.randint(1,machines[tasks[i]['source']])
        # chromosome[1][i]=1
    return chromosome

In [None]:
def calc_start_end_time(chromosome):
  """
  Calculates the start and end time for each task in the chromosome.

  Args:
    chromosome (tuple): A tuple containing two lists - the first list represents the task IDs and the second list represents the machine IDs.

  Returns:
    dict: A dictionary containing the start and end time for each task.
  """
  machine_queue_dict = {}
  for machine_name in machines.keys():
    machine_queue_dict[machine_name] = {}
    for count in range(1, machines[machine_name] + 1):
      machine_queue_dict[machine_name][count] = []
  temp_task_dict = {}
  for i in range(len(chromosome[0])):
    task_id = chromosome[0][i]
    machine_id = chromosome[1][i]
    task = task_dict[task_id]
    machine_name = task['source']
    machine = machine_queue_dict[machine_name][machine_id]
    temp_task = task.copy()
    temp_task['start_time'] = 0
    temp_task['end_time'] = task['duration']
    temp_task['machine_id'] = machine_id
    if len(machine) > 0:
      last_task_id = machine[-1]
      last_task = temp_task_dict[last_task_id]
      temp_task['start_time'] = last_task['end_time']
      temp_task['end_time'] = temp_task['start_time'] + task['duration']
    max_end_time_depends = max([temp_task_dict[depend_task_id]['end_time'] for depend_task_id in task['dependencies']]) if len(task['dependencies']) > 0 else 0
    if max_end_time_depends > temp_task['start_time']:
      temp_task['start_time'] = max_end_time_depends
      temp_task['end_time'] = temp_task['start_time'] + task['duration']
    temp_task_dict[task_id] = temp_task
    machine.append(task_id)
  return temp_task_dict

def fitness(chromosome):
  """
  Calculate the fitness of a chromosome.

  Parameters:
  chromosome (list): The chromosome representing a solution.

  Returns:
  int: The fitness value of the chromosome.
  """
  task_dict = calc_start_end_time(chromosome)
  max_end_time = max(task['end_time'] for task in task_dict.values())
  score = 60*60*24*30*12/max_end_time
  # for task in task_dict.values():
  #   if task['id']==task['root_id']:
  #     score+=60*60*24/task['end_time']
  return score

In [None]:
def plot_chromosome(chromosome,ax, show_text=False):
    """
    Plots the timeline of tasks based on the given chromosome.

    Parameters:
    chromosome (list): The chromosome representing the tasks.
    ax (matplotlib.axes._subplots.AxesSubplot): The axes object to plot on.
    show_text (bool): Whether to show the task name on the plot.

    Returns:
    matplotlib.figure.Figure: The generated figure object.
    """
    test_tasks = calc_start_end_time(chromosome)
    df = pd.DataFrame(test_tasks.values())
    df['task'] = df['source'] + ' ' + df['machine_id'].astype(str)
    df = df.sort_values('task', ascending=False)
    for i in range(len(df)):
        row = df.iloc[i]
        color = colors[row['root_id'] % len(colors)]  # Get color from the colors list
        ax.barh(row['task'], row['duration'], left=row['start_time'], edgecolor='black', color=color)  # Add edgecolor to create a border
        if show_text:
            ax.text(row['start_time'] + row['duration'] / 2, row['task'], row['name'],        ha='center', va='center', rotation=90)
    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Task')
    ax.set_title('Timeline of Tasks')
    ax.grid(True)
    # ax.xaxis.set_major_locator(plt.MultipleLocator(60*30))
    # legend is root_id. example: task 1,task 2
    # legend_elements = []
    # for root_id in sorted(df['root_id'].unique()):
        # legend_elements.append(Line2D([0], [0], color=colors[root_id % len(colors)], lw=4, label=f'Task {root_id}'))  # Get color from the colors list
        # ax.legend(handles=legend_elements, title='Task',
        #             bbox_to_anchor=(1.05, 1),
        #             loc='upper left')


In [None]:
# Selection function
def select(population, fitnesses):
    total_fitness = sum(fitnesses)
    probabilities = [f / total_fitness for f in fitnesses]
    parent1, parent2 = random.choices(population, weights=probabilities, k=2)
    return parent1, parent2

In [None]:
def crossover(parent1, parent2):
    if random.random() > 0.8:
        parent1 = parent1.copy()
        parent2 = parent2.copy()
        return parent1, parent2

    # do ordered crossover
    row, size = parent1.shape   
    start = random.randint(0, size - 1)
    end = random.randint(start+1, size )
    offspring1 = np.zeros((row, size), dtype=int)
    offspring2 = np.zeros((row, size), dtype=int)
    offspring1[:, start:end] = parent2[:, start:end]
    offspring2[:, start:end] = parent1[:, start:end]

    ids1 = [i for i in parent1.T if i[0] not in offspring1[0]]
    ids2 = [i for i in parent2.T if i[0] not in offspring2[0]]
    for i in range(size):
        if offspring1[0][i]==0:
            offspring1[0][i]=ids1[0][0]
            offspring1[1][i]=ids1[0][1]
            ids1.pop(0)
        if offspring2[0][i]==0:
            offspring2[0][i]=ids2[0][0]
            offspring2[1][i]=ids2[0][1]
            ids2.pop(0)

    # uniform crossover
    for index_child_1 in range(size):
        if random.random() < 0.5:
            index_child_2 = np.where(offspring2[0] == offspring1[0][index_child_1])[0][0]
            offspring1[1][index_child_1], offspring2[1][index_child_2] = offspring2[1][index_child_2], offspring1[1][index_child_1]
    return offspring1, offspring2

In [None]:
def mutate(chromosome):
    if random.random() < 0.2:
        idx = random.randint(0,len(chromosome[0])-1)
        task = task_dict[chromosome[0][idx]]
        chromosome[1][idx]=random.randint(1,machines[task['source']])

In [None]:
# Example of using these functions in a genetic algorithm
def genetic_algorithm(pop_size,iteration):
    history=[]
    
    best_chromosome = None
    best_fitness = 0

    # Initialize a random population
    population = [generate_chromosome(task_dict) for _ in range(pop_size)]
    pbar = tqdm(total=iteration)
    # set desc at bottom
    for iter in range(iteration):
        new_population = []
        fitnesses = [fitness(chromosome) for chromosome in population]
    
        best_chromosome = population[np.argmax(fitnesses)]

        history.append({
            'generation': iter+1,
            'best_chromosome': best_chromosome,
            'fitnesses': fitnesses,
        })

        temp=calc_start_end_time(best_chromosome)
        # print(f"Generation {iter+1} best fitness: {max(fitnesses)}, makespan: {max(task['end_time'] for task in temp.values())}")
        pbar.set_description(f"Generation {iter+1} best fitness: {max(fitnesses)}, makespan: {max(task['end_time'] for task in temp.values())}")
        pbar.update(1)
        if max(fitnesses) > best_fitness:
            best_fitness = max(fitnesses)
            best_chromosome = population[np.argmax(fitnesses)]
        # new_population.append(best_chromosome)        
        for _ in range(pop_size // 2):
            parent1, parent2 = select(population, fitnesses)
            offspring1, offspring2 = crossover(parent1, parent2)
            mutate(offspring1)
            mutate(offspring2)
            new_population.append(offspring1)
            new_population.append(offspring2)
        population = new_population[:pop_size]
    pbar.close()
    return best_chromosome,history

In [None]:
best_chromosome,history = genetic_algorithm(pop_size=500,iteration=500)

In [None]:
min_fitness = min([min(h['fitnesses']) for h in history])
max_fitness = max([max(h['fitnesses']) for h in history])

In [None]:
# clear folder ga
if os.path.exists('ga'):
    shutil.rmtree('ga')
os.mkdir('ga')

In [None]:
from PIL import Image, ImageDraw, ImageFont
import math

def create_grid_image(data, cols, image_size=(100, 100), label_height=20):
    """
    Create an image with a grid of images and labels.

    :param data: List of dictionaries with keys 'image', 'name', and 'count'.
    :param cols: Number of columns in the grid.
    :param image_size: Size of each individual image in the grid.
    :param label_height: Height of the label area under each image.
    :return: A PIL Image object with the grid.
    """
    # Calculate rows based on the length of data and cols
    rows = math.ceil(len(data) / cols)

    # Calculate total size of the final image
    total_width = image_size[0] * cols
    total_height = (image_size[1] + label_height) * rows

    grid_image = Image.new('RGBA', (total_width, total_height), color=(255,255,255,0))
    draw = ImageDraw.Draw(grid_image)
    font = ImageFont.load_default(size=60)

    # Iterate over the data and place each image and label
    for i, item in enumerate(data):
        # Load image
        img = Image.open(item['image'])

        # Calculate position of this image/label
        row, col = divmod(i, cols)
        x = col * image_size[0]
        y = row * (image_size[1] + label_height)

        # Resize image if necessary
        img = img.resize(image_size)

        # Paste image into grid
        grid_image.paste(img, (x, y))

        # Prepare label text
        label_text = f"{item['count']}"

        # Draw label
        # draw center of the grid
        label_x = x + image_size[0] // 2
        label_y = y + image_size[1] // 2
        draw.text((label_x, label_y), label_text, fill='black', font=font, anchor='mm')
        # draw.text((x, label_y), label_text, fill='black', font=font,)
    return grid_image

In [None]:
image = create_grid_image(data, cols=6)
for iter in tqdm(range(len(history))):
    fig = plt.figure(figsize=(20,11))
    temp = calc_start_end_time(history[iter]['best_chromosome'])
    text_str = f'Genetic Algorithm to Solve the Scheduling Problem\nGeneration: {iter}\nFitness: {max(history[iter]["fitnesses"])}\nMakespan: {max(task["end_time"] for task in temp.values())} seconds'
    fig.suptitle(text_str, fontsize=14)

    ax1 = plt.subplot2grid((3, 6), (0, 0), rowspan=3, colspan=5)
    plot_chromosome(history[iter]['best_chromosome'], ax1)

    ax2 = plt.subplot2grid((3, 6), (0, 5))
    ax2.set_title('Task Distribution')
    ax2.imshow(image,aspect='auto')
    ax2.axis('off')

    ax3 = plt.subplot2grid((3, 6), (1, 5))
    ax3.set_title('Fitness Distribution')
    sns.histplot(history[iter]['fitnesses'], ax=ax3)
    ax3.set_xlim(min_fitness, max_fitness)
    ax3.set_xlabel('Fitness')
    ax3.set_ylabel('Count')

    ax4 = plt.subplot2grid((3, 6), (2, 5))
    ax4.set_title('Fitness Over Generations')
    best_fitnesses = [np.max(h['fitnesses']) for h in history[:iter]]
    avg_fitnesses = [np.mean(h['fitnesses']) for h in history[:iter]]
    ax4.plot(best_fitnesses, label='Best')
    ax4.plot(avg_fitnesses, label='Average')
    ax4.set_xlabel('Generation')
    ax4.set_ylabel('Fitness')
    ax4.legend()

    plt.subplots_adjust(wspace=0.5, hspace=0.5, top=0.85)
    plt.savefig(f'ga/{iter}.png')
    plt.close(fig)
plt.show()

In [None]:
images = []
for i in range(len(history)):
    images.append(Image.open(f'ga/{i}.png'))
images[0].save('ga.gif', save_all=True, append_images=images[1:], optimize=False, duration=100, loop=0)

with Image.open('ga.gif') as img:
    frames = [frame.copy() for frame in ImageSequence.Iterator(img)]

frames[-1].info['duration'] = 10000

frames[0].save('ga.gif',
               save_all=True,
               append_images=frames[1:],
               loop=0)
