Import Libraries

In [None]:
import numpy as np
import time, json, math, random, copy, glob, os
from json import JSONEncoder
from datetime import datetime

import matplotlib
import matplotlib.pyplot as plt
from matplotlib import colors
from matplotlib.ticker import MultipleLocator

import pygad
import pygame

from utils import config_input
from utils import reward as Reward

pygame 2.6.1 (SDL 2.28.4, Python 3.11.5)
Hello from the pygame community. https://www.pygame.org/contribute.html


Case study Inputs

In [None]:
## Mock-up

outer_shape = [40,40]   # [w, h]
input_shape_n = [(1,12,12,7),
                 (1,14,14,1),
                 (1,13,20,1),
                 (1,3,20,1),
                 (1,7,14,1),
                 (0,7,14,1),
                 (1,5,14,1),
                 (1,5,10,2)]
MAX_STEPS = 30
num_generations = 100
num_parents_mating = 10
sol_per_pop = 32
num_genes = MAX_STEPS
crossover_probability = 0.85
mutation_probability = 0.2

# ## Quasi-real-world Baseline

# outer_shape = [30,12]
# input_shape_n = [(1,2,2,2),
#                  (1,4,4,3),
#                  (0,4,4,10),
#                  (1,8,4,4),
#                  (0,8,4,10)]
# MAX_STEPS = 100
# num_generations = 500
# num_parents_mating = 10
# sol_per_pop = 32
# num_genes = MAX_STEPS
# crossover_probability = 0.85
# mutation_probability = 0.2

GA Algorithm

In [None]:
## Initialization

gene_space = range(0, 8)
parent_selection_type = "sss"
keep_parents = 1
crossover_type = "single_point"
mutation_type = "random"

def init_eval_common():
    canvas = np.zeros([outer_shape[1], outer_shape[0]])
    w, h = outer_shape
    return canvas, w, h

def init_eval_rand():
    canvas, w, h = init_eval_common()
    init_inv = config_input.inv_configurator(input_shape_n)     # Inventory formatted from stock shape info
    inv_keys = np.random.choice(2, size=len(init_inv), p=[0.2,0.8])   # Random stock availability p=[0.1,0.9]   ## [0.2,0.8]
    inv_aval = list(np.where(inv_keys==1)[0])   # Keys for currently available stocks
    inv_id = np.random.choice(np.where(inv_keys==1)[0], 1)[0]   # Inital stock to test
    return canvas, w, h, init_inv, len(init_inv), inv_aval, inv_id, [], [], np.array([0,0]), [], np.zeros(MAX_STEPS), 0

def init_eval_fixed():
    canvas, w, h = init_eval_common()
    input_updated = [(1, x[1], x[2], x[3]) for x in input_shape_n]
    init_inv = config_input.inv_configurator(input_updated)
    inv_aval = list(init_inv.keys())
    inv_id = inv_aval[-1]
    return canvas, w, h, init_inv, len(init_inv), inv_aval, inv_id, [], [], np.array([0,0]), [], np.zeros(MAX_STEPS), 0


In [None]:
## Utilities

class NumpyArrayEncoder(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return int(obj)
        return JSONEncoder.default(self, obj)
    
def export_results(info_stack, key_rewards=None):
    encoded_info = json.dumps(info_stack, cls=NumpyArrayEncoder)
    file_name = f'result_{datetime.now().strftime("%Y%m%d-%H%M")}_ga'
    with open(f'./result/{file_name}.json', 'w', encoding="utf-8") as f:
        json.dump(encoded_info, f)
        # print(file_name)
    if key_rewards != None:
        print("Major Updates:", key_rewards)
    return file_name


In [None]:
## Visualization

class viz():
    def __init__(self, file_name=None):
        if file_name == None:
            list_of_files = glob.glob(r'..\result\*.json')
            self.file_name = max(list_of_files, key=os.path.getmtime)
        else:
            self.file_name=r'..\result\{}.json'.format(file_name)
            # print(file_name)

        with open(self.file_name, 'r', encoding='utf-8') as f:
            data = json.load(f)
        self.data = json.loads(data)
        # print(data.keys())
        self.canvas = np.array(self.data['final canvas'])
        self.w = self.canvas.shape[1]
        self.h = self.canvas.shape[0]
           
    def show_layout(self,**kwargs):
        cmap = colors.ListedColormap(['#000000', '#F0F3BD', '#76C893'])
        bounds = [-2, 0, 0.5, 300]
        norm = colors.BoundaryNorm(bounds, cmap.N)

        # Plot the heatmap
        plt.close('all')
        fig, ax = plt.subplots()
        im = ax.imshow(self.canvas, cmap=cmap, norm=norm, **kwargs)

        # Show all ticks and label them with the respective list entries.
        ax.xaxis.set_major_locator(MultipleLocator(5))
        ax.xaxis.set_minor_locator(MultipleLocator(1))
        ax.yaxis.set_major_locator(MultipleLocator(5))
        ax.yaxis.set_minor_locator(MultipleLocator(1))

        # Turn spines off and create grid.
        ax.spines[:].set_visible(False)
        ax.set_title(f"Layout")
        ax.set_xticks(np.arange(self.w+1)-.5, minor=True)
        ax.set_yticks(np.arange(self.h+1)-.5, minor=True)
        ax.grid(which="minor", color="k", linestyle='-', linewidth=1)
        ax.tick_params(which="minor", bottom=False, left=False)
        # ax.legend()
        fig.tight_layout()

    def show_pygame(self, save=False):
        self.c_chart = ['mediumorchid','royalblue','turquoise','mediumseagreen','yellowgreen','gold','lightcoral', 'gray']        
        self.scale_factor = min(800//self.w, 600//self.h)
        self.CELL = self.scale_factor
        self.design_size = np.array([self.w, self.h])*self.scale_factor
        self._screen_size = tuple(map(sum, zip(self.design_size, (-1,-1))))
        self.SCREEN_W = self._screen_size[0]
        self.SCREEN_H = self._screen_size[1]

        # pygame setup
        pygame.init()
        pygame.font.init()
        self.screen = pygame.display.set_mode(self._screen_size)
        self.clock = pygame.time.Clock()
        self.font1 = pygame.freetype.Font(r"C:\Windows\Fonts\arial.ttf", 18)
        self.running = True
        pygame.time.wait(50)

        time_limit = 1000 
        # Record the start time
        start_time = pygame.time.get_ticks()

        while self.running:
            # poll for events
            # pygame.QUIT event means the user clicked X to close your window
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    self.running = False
            elapsed_time = pygame.time.get_ticks() - start_time
            if elapsed_time >= time_limit:
                self.running = False

            ## Draw the base
            background = pygame.Surface((self.SCREEN_W, self.SCREEN_H)).convert()
            background.fill((0,0,0))  # Black

            self.canvas_layer = pygame.Surface(tuple(self.design_size)).convert_alpha()
            self.canvas_layer.fill((244,243,189,255))  # Yellow-ish, Not transparent (255)
                
            # Draw horizontal lines
            for y in range(self.design_size[1]+1):
                pygame.draw.line(self.canvas_layer, (255,255,255,255), (0,y*self.CELL),
                                (self.SCREEN_W, y*self.CELL))

            # Draw vertical lines
            for x in range(self.design_size[0]+1):
                pygame.draw.line(self.canvas_layer, (255,255,255,255), (x*self.CELL,0),
                                (x*self.CELL, self.SCREEN_H))

            ## Draw the stocks
            inv_keys = np.unique(self.canvas)[1:]
            line_color = (0,0,0,255)
            cell_color = (118,200,147)    # Green-ish, Not transparent (255)
            for inv_key in inv_keys:
                self._draw_stocks(self.canvas_layer, line_color, cell_color, self.canvas, inv_key)
                
            self.screen.blit(self.canvas_layer,(0,0))
            pygame.display.update()
            
            self.clock.tick(60)  # limits FPS to 60
        if save:
            pygame.image.save(self.screen, f"{self.file_name[:-5]}_pygame.png")
        pygame.quit()

    def _draw_inv(self, layer, line_color, cell_color, canvas, inv_key):
        return
    
    def _draw_cur_stock(self, layer, line_color, cell_color, canvas, inv_key):
        return

    def _draw_stocks(self, layer, line_color, cell_color, canvas, inv_key):
        # Find location
        arr = np.argwhere(canvas==inv_key)

        # Color the cells
        for cell in arr:
            self._color_cell(np.flip(cell), color=cell_color)

        # Find the boundary of each stock
        ver_l, ver_r, hor_up, hor_dn = self._find_boundary(arr)

        # Draw horizontal lines: up & down
        self._draw_h_lines(layer, line_color, hor_up)
        self._draw_h_lines(layer, line_color, np.array(hor_dn) + np.array([0,1]))

        # Draw vertical lines: left & right
        self._draw_v_lines(layer, line_color, ver_l)
        self._draw_v_lines(layer, line_color, np.array(ver_r) + np.array([1,0]))

        # Write inventory key
        self._write_text(layer, int(inv_key-100), hor_up[0])

    def _color_cell(self, cell, color=(118,200,147), transparency=20):    
        x = int(cell[0] * self.CELL + 0.5 + 1)
        y = int(cell[1] * self.CELL + 0.5 + 1)
        w = int(self.CELL + 0.5 - 1)
        h = int(self.CELL + 0.5 - 1)
        pygame.draw.rect(self.canvas_layer, color + (transparency,), (x, y, w, h))

    def _find_boundary(self, arr):
        arr_grouped_by_x = [ list(arr[arr[:,0]==i]) for i in np.unique(arr[:,0]) ]
        arr_grouped_by_y = [ list(arr[arr[:,1]==i]) for i in np.unique(arr[:,1]) ]
        
        ver_l = [ np.flip(arr_x[0]) for arr_x in arr_grouped_by_x ]
        ver_r = [ np.flip(arr_x[-1]) for arr_x in arr_grouped_by_x ]
        hor_up = [ np.flip(arr_x[0]) for arr_x in arr_grouped_by_y ]
        hor_dn = [ np.flip(arr_x[-1]) for arr_x in arr_grouped_by_y ]

        return ver_l, ver_r, hor_up, hor_dn

    def _draw_h_lines(self, canvas_layer, line_color, arr):
        for xy in arr:
            pygame.draw.line(canvas_layer, line_color, (xy[0]*self.CELL, xy[1]*self.CELL),
                            ((xy[0]+1)*self.CELL, xy[1]*self.CELL))
            
    def _draw_v_lines(self, canvas_layer, line_color, arr):
        for xy in arr:
            pygame.draw.line(canvas_layer, line_color, (xy[0]*self.CELL, xy[1]*self.CELL),
                            (xy[0]*self.CELL, (xy[1]+1)*self.CELL))
        
    def _write_text(self, canvas_layer, inv_key, text_xy):
        text = f'{inv_key}'
        textRect = self.font1.get_rect(text, size=24)
        textRect.center = ((text_xy[0]+0.5)*self.CELL, (text_xy[1]+0.5)*self.CELL)
        self.font1.render_to(canvas_layer, textRect, text, (0,0,0,255))
 

In [None]:
## GA fitting

inv_best, ref_pt_best, best_return, canvas_best = [], [], -100, False
def fitness_func(ga_instance, solution, solution_idx):
    global inv_best, ref_pt_best, best_return, canvas_best

    canvas, w, h, init_inv, inv_max, inv_aval, inv_id, inv_assigned, \
        seq_stocks, ref_yx, ref_pts, rewards, output_return = init_eval_fixed()   # init_eval_rand()
    
    # Calculate the Rewards: list(current step/action's reward)
    # -----> Return: a series of actions/gene according to a given solution9*6

    for step, action in enumerate(solution):
        action = int(action)
        
        # 0: Change the stock
        if action == 0:
            if len(inv_aval) < 1:
                step_reward = -10
            else:
                if np.round(np.random.rand(1),1) < 0.8:
                    inv_id = int(sorted([
                        (np.count_nonzero(init_inv[inv_key]==1), inv_key) for inv_key in inv_aval
                    ], key = lambda x: x[0])[-1][-1])
                else:
                    inv_id = int(np.random.choice(inv_aval, 1)[0])
                    
                step_reward = Reward.velocity(canvas, init_inv[inv_id], ref_yx)

        # 1-2: Geometry (Cut)
        elif action < 3:
            cur_stock = init_inv[inv_id]
            org_cur_stock_area = np.count_nonzero(cur_stock)

            if np.count_nonzero(cur_stock==1) == 0:
                step_reward = -10
            else:
                # Prepare the cut-off availability test
                cut_i = np.where(cur_stock==1)[action-1].max()
                if action == 1:
                    new_stock_shape = cur_stock[cut_i:,:]
                else:
                    new_stock_shape = cur_stock[:,cut_i:]

                # Evaluate
                if np.count_nonzero(new_stock_shape) != 0:  # Yes Cut-off
                    # Update cur_stock_shape
                    if action == 1:
                        cur_stock = cur_stock[:cut_i,:]
                    else:
                        cur_stock = cur_stock[:,:cut_i]

                    init_inv[inv_id] = cur_stock
                    init_inv[inv_max] = new_stock_shape
                    inv_aval.append(inv_max)
                    inv_max += 1

                    step_reward = -(np.count_nonzero(new_stock_shape)/org_cur_stock_area + 1)
                    # step_reward = -np.count_nonzero(new_stock_shape)

                else:  # No Cut-off
                    step_reward = -10    # -np.count_nonzero(cur_stock)*2
            
        # 3-6: Translation
        elif action < 7:
            cur_stock = init_inv[inv_id]
            stock_h, stock_w = cur_stock.shape
            ref_yx_temp = copy.deepcopy(ref_yx)
            step_reward = 0

            # Change the ref pt
            if ref_yx_temp[action//5] + (-1)**action < [h, w][action//5]:
                ref_yx_temp[action//5] += (-1)**action
            else:
                ref_yx_temp[action//5] = [h-1, w-1][action//5]

            # Check if the stock is within the range
            if True in (ref_yx_temp<0) or (ref_yx_temp[0]+stock_h > h) or (ref_yx_temp[1]+stock_w > w):
                step_reward -= 10
            else:
                stock_temp = cur_stock + canvas[ref_yx_temp[0]:ref_yx_temp[0]+stock_h,
                                                ref_yx_temp[1]:ref_yx_temp[1]+stock_w]
                if True in (stock_temp > 1):    # No overlap
                    step_reward -= 10
                else:
                    ref_yx = ref_yx_temp
                    step_reward += Reward.velocity_concav(canvas, cur_stock, ref_yx_temp)

        # 7: Place the stock
        else:
            cur_stock = init_inv[inv_id]
            stock_h, stock_w = cur_stock.shape
            step_reward = 0

            #1# Check if the stock is within the range
            if (ref_yx[0]+stock_h <= h) and (ref_yx[1]+stock_w <= w):
                canvas_temp = copy.deepcopy(canvas)
                cur_stock_shape_temp = copy.deepcopy(cur_stock) + canvas_temp[ref_yx[0]:ref_yx[0]+stock_h, 
                                                                              ref_yx[1]:ref_yx[1]+stock_w]
                #2# If not overlapped, locate the stock on the canvas
                if not True in (cur_stock_shape_temp > 1):
                    cur_stock_shape_temp[cur_stock==1] = inv_id+100
                    canvas_temp[ref_yx[0]:ref_yx[0]+stock_h, 
                                ref_yx[1]:ref_yx[1]+stock_w] = cur_stock_shape_temp
                    ## REWARD
                    reward_concav, vertex_par = 0, np.array([0, w-1])
                    for r in range(canvas_temp.shape[0]):
                        if np.any(canvas_temp[r,:] >= 100):
                            c = np.argwhere(canvas_temp[r,:] >= 100)
                            vertex_cur = np.array([c.min(), c.max()])
                            if len(np.unique(c)) == 1 or list(np.unique(c)) != list(range(c.min(), c.max()+1)):
                                reward_concav -= np.unique(c).size    # Partially empty in the middle
                            if c.min() != 0:
                                reward_concav -= 1  # Empty first column
                            reward_concav -= np.count_nonzero(vertex_par!=vertex_cur)*2     # Min corners
                            vertex_par = vertex_cur
                        elif not np.any(canvas_temp[r,:] >= 100) and reward_concav != 0:
                            reward_concav -= 1  # Row fully empty in the middle
                    reward_concav = reward_concav/(w*h)
                    
                    step_reward += 20*(10+np.count_nonzero(cur_stock)/(w*h) + 10*reward_concav)
                    
                    canvas = canvas_temp

                    # Update for the next step
                    ## ref pt
                    if ref_yx[0] + stock_h < h-1:
                        ref_yx[0] += stock_h
                    elif ref_yx[1] + stock_w < w:
                        ref_yx[1] += stock_w
                        ref_yx[0] = 0
                
                    ## inv_id
                    inv_aval.remove(inv_id)
                    inv_assigned.append(inv_id)
                    if np.round(np.random.rand(1),1) < 0.8:
                        inv_id = int(sorted([
                            (np.count_nonzero(init_inv[inv_key]==1), inv_key) for inv_key in inv_aval
                        ], key = lambda x: x[0])[-1][-1])
                    else:
                        inv_id = int(np.random.choice(inv_aval, 1)[0])
                        
                else:
                    step_reward -= 10
            else:
                step_reward -= 10

        if np.count_nonzero(canvas==0) < w*h*0.1 or len(inv_aval) == 0:
            step_reward += 500

        rewards[step] = step_reward # Rewards are normalized at last
        seq_stocks.append(inv_id)
        ref_pts.append(tuple(ref_yx))

        # print(f"Step {step} - Action {action}. stock_id = {inv_id}, ref_yx = {ref_yx}, reward = {step_reward}")

    output_return = round(np.sum(rewards)/MAX_STEPS, 4)

    if best_return < output_return:
        best_return = output_return
        inv_best = seq_stocks
        ref_pt_best = ref_pts
        canvas_best = canvas

    return output_return


Index of the best solution : gen_38-idx_0
Actions of the best solution : [1. 6. 3. 1. 2. 7. 2. 1. 1. 5. 2. 7. 2. 0. 7. 7. 2. 3. 2. 1. 4. 0. 7. 7.
 7. 7. 1. 7. 6. 1.]
Fitness value of the best solution = 47.3114
Time elapsed: 21.63
result_20250702-1459_ga


Run the GA

In [None]:
fitness_function = fitness_func

ga_instance = pygad.GA(num_generations=num_generations,
                       num_parents_mating=num_parents_mating,
                       fitness_func=fitness_function,
                       sol_per_pop=sol_per_pop,
                       num_genes=num_genes,
                       gene_space=gene_space,
                       parent_selection_type=parent_selection_type,
                       keep_parents=keep_parents,
                       crossover_type=crossover_type,
                       crossover_probability=crossover_probability,
                       mutation_type=mutation_type,
                       mutation_probability=mutation_probability)

start_time = time.time()
ga_instance.run()
end_time = time.time()
elapsed_time = round(end_time - start_time, 2)

# Export the result
best_solution, best_solution_fitness, best_match_idx = ga_instance.best_solution()
info_stack = {
    "Index of the best solution" : f"gen_{ga_instance.best_solution_generation}-idx_{best_match_idx}",
    "Actions of the best solution" : best_solution,
    "Fitness value of the best solution": best_solution_fitness,
    "Time elapsed": elapsed_time,
    "Sequence of stock id": inv_best,
    "Sequence of reference points": ref_pt_best,
    "final canvas": canvas_best
}
file_name = export_results(info_stack)
print(f"Actions of the best solution : {best_solution}")
print(f"Fitness value of the best solution = {best_solution_fitness}")  # best_return

# Visualization
state = viz()
state.show_pygame(save=True)
print(file_name)
