# Imports needed


In [17]:
!pip install pygame



In [18]:
import math
from itertools import zip_longest
from operator import itemgetter
import random
import pygame

In [19]:
visualisation = True  # maybe turn of if do_algo > 1
iterations = 320000  # amount of iterations, while one iteration of the algorithm
do_algo = 1  # iterations of applying the algorithm
benchmark_id = 0  # choose number between 0 and 81, check tests.txt



### To run pygame


# Temperature Function
Using s-curve

In [20]:
# temperature function - s-curve
def temperature(i):
    amplitude = 1000
    center = 0
    width = 0.11 * iterations
    return float(amplitude * (1 / (1 + math.exp((i - center) / width))))

# Dataset


## Construct job shop problem


In [21]:
# Dataset url
jobshop1 = "https://raw.githubusercontent.com/DGuilherme/EnergyProductionScheduling/main/PhaseONE/Datasets/OR_JobShop/jobshop1.txt";
jobshop2 = "https://raw.githubusercontent.com/DGuilherme/EnergyProductionScheduling/main/PhaseONE/Datasets/OR_JobShop/jobshop2.txt";

In [22]:
def construct_tests_js1():
    file = []
    with open(jobshop1, 'r') as file1:
        for line in file1:
            if line.isspace():
                continue
            if (line[1].isdigit() or line[2].isdigit()) and line[3] != 'x':
                file.append(line.strip())

    tests = []
    while len(file) > 0:
        desc = file.pop(0).split()  # tuple (amount of jobs, amount of machines)
        jobs = []
        for x in range(int(desc[0])):
            job_file = file.pop(0).split()
            job = []
            y = 0
            while len(job_file) > 0:
                task = (x, y, int(job_file.pop(0)), int(job_file.pop(0)))
                y += 1
                job.append(task)
            jobs.append(job)
        tests.append(jobs)
    return tests

In [27]:
construct_tests_js1();

FileNotFoundError: ignored

# Algorithm Functions


In [23]:
def find_prev_index(jobs, job):
    job_n = job[0]
    job_seq = job[1] - 1
    if job_seq < 0:
        return None
    for i in range(0, len(jobs)):
        if jobs[i][0] == job_n and jobs[i][1] == job_seq:
            return i


# generating a random, valid neighbor based on previous solution
def shuffle_jobs(array):
    while True:
        first_job, second_job = random.choice(array), random.choice(array)
        # check if job_number is different
        if first_job[0] != second_job[0]:
            # indexes of two elements
            a, b = array.index(first_job), array.index(second_job)
            # indexes of previous jobs
            a_prev, b_prev = find_prev_index(array, first_job), find_prev_index(array, second_job)
            # check if the order of jobs are still valid
            if (a_prev is None or a_prev <= b) and (b_prev is None or b_prev <= a):
                array[b], array[a] = array[a], array[b]  # swap jobs
                return array


# evaluation function, total length of schedule
def find_max_duration(array):
    result = 0
    for sublist in array:
        temp = max(sublist, key=itemgetter(2))[2]
        if result < temp:
            result = temp
    return result


# creates schedule out of a ordered job list
# each job list is validated before
def schedule(jobs, machine_count):
    # first parameter is machine number
    results = [[] for x in range(machine_count)]
    time = [0] * (len(jobs) + 1)  # time of end of job
    time_m = [0] * machine_count  # time end of machine last job
    for job in jobs:
        job_n = job[0]  # job number
        machine_n = job[2]  # machine number
        duration = job[3]  # duration length
        # starting time is max(end of prev task in job, end of prev task on machine)
        start = max(time_m[machine_n], time[job_n])
        results[machine_n].append((job, start, start + duration))  # insert to the end
        # update the last time of job and machine
        time[job_n] = start + duration  # set new time of end of job
        time_m[machine_n] = start + duration    # set new end of last job of machine
    return results


# function that prints schedule as readable output
def print_result(res):
    i = 0
    for machine in res:
        string = "machine " + str(i) + ":"
        for job in machine:
            string = string + "(" + str(job[0][0]) + ", " + str(job[0][1]) + ", " + str(job[1]) + ")"
        print(string)
        i += 1

# Data Visualization

In [24]:
# used to visualize a single schedule

def show(schedule, am_jobs):

    # window initialization
    pygame.init()
    dis_x = 1800
    game_display = pygame.display.set_mode((dis_x, 900))    # change if problem with window size
    game_display.fill((0, 0, 0))

    scaling = (dis_x*0.99)/find_max_duration(schedule)  # scaling factor, so each schedule will fit
    height = int(600 / len(schedule))   # total height the same every time, single height depends on amount of machines
    start_x = 10
    y = 150

    # random color list, to assign each job a different color
    colors = [(random.randint(0, 240), random.randint(0, 240), random.randint(0, 240)) for i in range(50)]

    # calculate size and position of each task and create corresponding rectangle
    for machine in schedule:
        x = start_x + (machine[0][1] * scaling)
        for task in machine:
            if machine.index(task) + 1 < len(machine):
                next = machine[machine.index(task) + 1]
                width = next[1] - task[1]
            pygame.draw.rect(game_display, colors[task[0][0]], (x, y, int(task[0][3] * scaling), height))
            x += width * scaling
        y += height

    # window loop
    while True:
        for event in pygame.event.get():

            if event.type == pygame.QUIT:
                pygame.quit()
                quit()
        pygame.display.update()

# Run 


In [25]:
def run():
    tests = construct_tests_js1()       # get benchmarks in jobshop1.txt
    jobs_data = tests[benchmark_id]         # choose the benchmark you want to run

    jobs_data = [list(filter(None, i)) for i in zip_longest(*jobs_data)]

    first_solution = 0
    best_duration = int
    best_jobs = []

    for x in range(do_algo):

        # order and flatten list
        jobs = []
        for sublist in jobs_data:
            for item in sublist:
                jobs.append(item)

        machines_count = max(jobs, key=itemgetter(2))[2] + 1  # find how many machines are there
        results = schedule(jobs, machines_count)      # generate first solution
        min_duration = find_max_duration(results)     # calculate duration of first solution
        first_solution = min_duration

        best_duration = min_duration + 1
        current_best = results       # set first solution as best_solution

        for i in range(iterations):
            old = list(results)     # previous schedule
            alter = shuffle_jobs(jobs)      # new job list
            results = schedule(alter, machines_count)  # calculate schedule of new solution

            # calculate duration of old and new schedule
            d_old = find_max_duration(old)
            d_alter = find_max_duration(results)

            if d_old >= d_alter:
                jobs = alter
            elif random.uniform(0, 1) < math.exp(-(d_alter - d_old) / temperature(i)):
                jobs = alter

            if d_alter < min_duration:
                min_duration = d_alter
                current_best = list(results)

        if best_duration > min_duration:
            best_duration = min_duration
            best_jobs = current_best

    print("first solution: " + str(first_solution))
    print("minimum duration: " + str(best_duration))

    print_result(best_jobs)
    if visualisation:
        show(best_jobs, len(jobs_data))



In [26]:
run();

FileNotFoundError: ignored