<a href="https://colab.research.google.com/github/chentiann/flowshop/blob/main/flowshop.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/chentiann/flowshop.git

fatal: destination path 'flowshop' already exists and is not an empty directory.


In [2]:
import sys

sys.path.append('/content/flowshop')

In [3]:
import time
import json
import multiprocessing
from typing import Collection, Any
import http.client
from implementation import sampler


def _trim_preface_of_body(sample: str) -> str:
    """Trim the redundant descriptions/symbols/'def' declaration before the function body.
    Please see my comments in sampler.LLM (in sampler.py).
    Since the LLM used in this file is not a pure code completion LLM, this trim function is required.

    -Example sample (function & description generated by LLM):
    -------------------------------------
    This is the optimized function ...
    def priority_v2(...) -> ...:
        return ...
    This function aims to ...
    -------------------------------------
    -This function removes the description above the function's signature, and the function's signature.
    -The indent of the code is preserved.
    -Return of this function:
    -------------------------------------
        return ...
    This function aims to ...
    -------------------------------------
    """
    lines = sample.splitlines()
    func_body_lineno = 0
    find_def_declaration = False
    for lineno, line in enumerate(lines):
        # find the first 'def' statement in the given code
        if line[:3] == 'def':
            func_body_lineno = lineno
            find_def_declaration = True
            break
    if find_def_declaration:
        code = ''
        for line in lines[func_body_lineno + 1:]:
            code += line + '\n'
        return code
    return sample


class LLMAPI(sampler.LLM):
    """Language model that predicts continuation of provided source code.
    """

    def __init__(self, samples_per_prompt: int, trim=True):
        super().__init__(samples_per_prompt)
        # additional_prompt = ("Improve the scheduling heuristic to minimize makespan. "
        #                      "You can change how jobs are ordered or inserted. "
        #                      "Be creative. Think beyond NEH logic. "
        #                      "Pls only generate neh_heuristic(processing_times: np.ndarray) function"
        #                      "Use loops, conditionals, or clustering ideas. Only return valid Python code.")
        additional_prompt = """Improve the NEH heuristic for PFSP to minimize makespan. Focus on:
          1. **Scoring Strategy**: Modify how jobs are prioritized (e.g., dynamic alpha, machine load balancing).
          2. **Insertion Logic**: Optimize the position selection during insertion (e.g., early termination if no improvement).
          3. **Local Search**: Replace the swap-based search with more efficient methods (e.g., 3-opt, tabu search).
          4. **Constraints**:
            - Preserve job uniqueness (no duplicates).
            - Only use `compute_makespan` for evaluation.
            - Return a list of job indices (e.g., [0, 2, 1]).

         " Generate *only* the `neh_heuristic` function body (no duplicate code from skeleton."""
        self._additional_prompt = additional_prompt
        self._trim = trim

    def draw_samples(self, prompt: str) -> Collection[str]:
        """Returns multiple predicted continuations of `prompt`."""
        return [self._draw_sample(prompt) for _ in range(self._samples_per_prompt)]

    def _draw_sample(self, content: str) -> str:
        prompt = '\n'.join([content, self._additional_prompt])
        message = [{'role': 'user', 'content': prompt}]

        while True:
            try:
                conn = http.client.HTTPSConnection("api.bltcy.ai")
                payload = json.dumps({
                    "max_tokens": 1024,
                    "model": "gpt-3.5-turbo",
                    "messages": [
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ]
                })
                headers = {
                    'Authorization': 'sk-KM05TodIiwvQH6SQE333F816C98e4092B5Cb0dE1D0A1Bc76',
                    'User-Agent': 'Apifox/1.0.0 (https://apifox.com)',
                    'Content-Type': 'application/json'
                }
                conn.request("POST", "/v1/chat/completions", payload, headers)
                res = conn.getresponse()
                data = res.read().decode("utf-8")
                data = json.loads(data)
                response = data['choices'][0]['message']['content']
                if self._trim:
                    response = _trim_preface_of_body(response)
                return response
            except Exception:
                time.sleep(2)
                continue

In [4]:
from implementation import evaluator
from implementation import evaluator_accelerate


class Sandbox(evaluator.Sandbox):
    """Sandbox for executing generated code. Implemented by RZ.

    RZ: Sandbox returns the 'score' of the program and:
    1) avoids the generated code to be harmful (accessing the internet, take up too much RAM).
    2) stops the execution of the code in time (avoid endless loop).
    """

    def __init__(self, verbose=False, numba_accelerate=True):
        """
        Args:
            verbose         : Print evaluate information.
            numba_accelerate: Use numba to accelerate the evaluation. It should be noted that not all numpy functions
                              support numba acceleration, such as np.piecewise().
        """
        self._verbose = verbose
        self._numba_accelerate = False

    def run(
            self,
            program: str,
            function_to_run: str,  # RZ: refers to the name of the function to run (e.g., 'evaluate')
            function_to_evolve: str,  # RZ: accelerate the code by decorating @numba.jit() on function_to_evolve.
            inputs: Any,  # refers to the dataset
            test_input: str,  # refers to the current instance
            timeout_seconds: int,
            **kwargs  # RZ: add this
    ) -> tuple[Any, bool]:
        """Returns `function_to_run(test_input)` and whether execution succeeded.

        RZ: If the generated code (generated by LLM) is executed successfully,
        the output of this function is the score of a given program.
        RZ: PLEASE NOTE THAT this SandBox is only designed for flowshop problem.
        """
        dataset = {test_input: inputs[test_input]}
        try:
            result_queue = multiprocessing.Queue()
            process = multiprocessing.Process(
                target=self._compile_and_run_function,
                args=(program, function_to_run, function_to_evolve, dataset, self._numba_accelerate, result_queue)
            )
            process.start()
            process.join(timeout=timeout_seconds)
            if process.is_alive():
                # if the process is not finished in time, we consider the program illegal
                process.terminate()
                process.join()
                results = None, False
            else:
                if not result_queue.empty():
                    results = result_queue.get_nowait()
                else:
                    results = None, False

            return results
        except:
            return None, False

    def _compile_and_run_function(self, program, function_to_run, function_to_evolve, dataset, numba_accelerate,
                                  result_queue):
        try:
            # optimize the code (decorate function_to_run with @numba.jit())
            if numba_accelerate:
                program = evaluator_accelerate.add_numba_decorator(
                    program=program,
                    function_to_evolve=function_to_evolve
                )
            # compile the program, and maps the global func/var/class name to its address
            all_globals_namespace = {}
            # execute the program, map func/var/class to global namespace
            exec(program, all_globals_namespace)
            # get the pointer of 'function_to_run'
            function_to_run = all_globals_namespace[function_to_run]
            # return the execution results
            results = function_to_run(dataset)
            # the results must be int or float
            if not isinstance(results, (int, float)):
                result_queue.put((None, False))
                return
            result_queue.put((results, True))
        except Exception as e:
            import traceback
            error_msg = traceback.format_exc()
            print(f"[Sandbox Error] {error_msg}")  # 控制台
            # if raise any exception, we assume the execution failed
            result_queue.put((None, False))

In [5]:
specification = r'''
from typing import List
import numpy as np

def compute_makespan(schedule: list[int], processing_times: np.ndarray) -> int:
    """
    Compute the makespan (total completion time) for a given job schedule in a PFSP.
    - schedule: list of job indices in the order they are processed.
    - processing_times: 2D numpy array of shape (num_jobs, num_machines) with processing times for each job on each machine.
    Returns the makespan (int) for the given order.
    """
    num_jobs = len(schedule)
    num_machines = processing_times.shape[1]
    if num_jobs == 0 or num_machines == 0:
        return 0

    completion_times = np.zeros((num_jobs, num_machines), dtype=int)
    first_job = schedule[0]
    completion_times[0, 0] = processing_times[first_job, 0]
    for m in range(1, num_machines):
        completion_times[0, m] = completion_times[0, m-1] + processing_times[first_job, m]

    for i in range(1, num_jobs):
        job = schedule[i]
        completion_times[i, 0] = completion_times[i-1, 0] + processing_times[job, 0]
        for m in range(1, num_machines):
            completion_times[i, m] = max(completion_times[i, m-1], completion_times[i-1, m]) + processing_times[job, m]

    return int(completion_times[-1, -1])


def local_search(schedule: list[int], processing_times: np.ndarray) -> list[int]:
    """
    A simple pairwise swap to improve the schedule.
    """
    best_schedule = schedule.copy()
    best_makespan = compute_makespan(best_schedule, processing_times)

    for i in range(len(schedule)):
        for j in range(i + 1, len(schedule)):
            new_schedule = schedule[:]
            new_schedule[i], new_schedule[j] = new_schedule[j], new_schedule[i]
            new_makespan = compute_makespan(new_schedule, processing_times)
            if new_makespan < best_makespan:
                best_makespan = new_makespan
                best_schedule = new_schedule

    return best_schedule


@funsearch.run
def evaluate(instances: dict) -> float:
    """
    FunSearch evaluation function that computes the average makespan across multiple datasets.
    - instances: dict mapping instance names to 2D numpy arrays (processing time matrices).
    Returns the negative mean makespan (float) for optimization.
    """
    makespans = []
    for name in instances:
        processing_times = instances[name]
        if not isinstance(processing_times, np.ndarray):
            print(f"[ERROR] Instance {name} is not ndarray")
            continue
        if not np.issubdtype(processing_times.dtype, np.integer):
            processing_times = processing_times.astype(int)

        if processing_times.ndim != 2 or processing_times.shape[0] < 2 or processing_times.shape[1] < 2:
            print(f"[WARNING] Skipping instance {name} due to invalid processing_times shape: {processing_times.shape}")
            continue

        schedule = neh_heuristic(processing_times)

        if not isinstance(schedule, list) or len(schedule) < 2:
            print(f"[WARNING] Skipping instance {name} because invalid schedule: {schedule}")
            continue

        ms = compute_makespan(schedule, processing_times)

        if ms == 0:
            print(f"[WARNING] Skipping instance {name} due to makespan=0")
            continue

        print(f"[INFO] Instance: {name}, Schedule: {schedule}, Makespan: {ms}")
        makespans.append(ms)

    if not makespans:
        return -1e9
    mean_makespan = np.mean(makespans)
    print(f"[INFO] Average Makespan: {mean_makespan}")
    return -float(mean_makespan)

@funsearch.evolve
def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    num_jobs, num_machines = processing_times.shape
    alpha = 0.7

    # Step 1: Compute total processing time for each job
    total_times = np.sum(processing_times, axis=1)

    # Step 2: Sort jobs in descending order of total processing time
    job_order = sorted(range(num_jobs), key=lambda x: np.sum(processing_times[x]))

    # Step 3: Build schedule by best insertion
    schedule = []
    for job in job_order:
        best_makespan = None
        best_schedule = None
        for pos in range(len(schedule) + 1):
            new_schedule = schedule[:pos] + [job] + schedule[pos:]
            ms = compute_makespan(new_schedule, processing_times)
            if best_makespan is None or ms < best_makespan:
                best_makespan = ms
                best_schedule = new_schedule
        schedule = best_schedule
    # Step 4: Local search
    schedule = local_search(schedule, processing_times)

    return schedule



'''


In [6]:
import numpy as np

def read_data(file_path):
    """
    正确读取 Taillard 风格数据文件（支持多个实例）。
    返回：{dataset_name: np.ndarray}
    """
    with open('/content/flowshop/dataset/tai100_10.txt', 'r') as file:
        lines = file.readlines()

    datasets = {}
    i = 0
    dataset_idx = 1

    while i < len(lines):
        line = lines[i].strip()

        if "number of jobs" in line:
            i += 1
            # 下一行是参数（5个数字）
            params_line = lines[i].strip()
            params = list(map(int, params_line.split()))
            num_jobs, num_machines = params[0], params[1]
            i += 2  # 跳过 "processing times :"

            processing_times = []
            for _ in range(num_machines):
                while i < len(lines) and lines[i].strip() == "":
                    i += 1  # 跳过空行
                job_line = lines[i].strip()
                nums = list(map(int, job_line.split()))
                processing_times.append(nums)
                i += 1

            datasets[f"dataset_{dataset_idx}"] = np.array(processing_times)
            dataset_idx += 1
        else:
            i += 1

    return datasets


In [7]:
import os
from implementation import funsearch
from implementation import config

if __name__ == '__main__':
    data_file = "/content/flowshop/dataset/tai100_10.txt"

    # 加载 Taillard 格式的数据
    datasets = read_data(data_file)

    print(f"成功加载了 {len(datasets)} 个数据集。")

    # 选一个或多个实例参与训练与评估
    instances = {
        name: matrix for name, matrix in datasets.items()
        if name.startswith("dataset_")  # 你可以改成指定某个名字
    }

    # 配置参数
    class_config = config.ClassConfig(llm_class=LLMAPI, sandbox_class=Sandbox)
    run_config = config.Config(samples_per_prompt=4, evaluate_timeout_seconds=30)
    global_max_sample_num = 80  # 限制生成样本数量

    # 启动 Funsearch
    funsearch.main(
        specification=specification,
        inputs=instances,
        config=run_config,
        max_sample_nums=global_max_sample_num,
        class_config=class_config,
        verbose=True,
        log_dir='../logs/evaluator.log/',
    )


成功加载了 10 个数据集。
[INFO] Instance: dataset_1, Schedule: [6, 3, 9, 8, 2, 0, 1, 5, 7, 4], Makespan: 6460
[INFO] Average Makespan: 6460.0
[INFO] Instance: dataset_2, Schedule: [9, 1, 5, 7, 2, 0, 4, 3, 6, 8], Makespan: 6155
[INFO] Average Makespan: 6155.0
[INFO] Instance: dataset_3, Schedule: [8, 6, 3, 1, 7, 5, 2, 0, 9, 4], Makespan: 6344
[INFO] Average Makespan: 6344.0
[INFO] Instance: dataset_4, Schedule: [8, 7, 9, 1, 0, 3, 4, 2, 5, 6], Makespan: 6474
[INFO] Average Makespan: 6474.0
[INFO] Instance: dataset_5, Schedule: [1, 7, 6, 8, 0, 5, 9, 3, 2, 4], Makespan: 6134
[INFO] Average Makespan: 6134.0
[INFO] Instance: dataset_6, Schedule: [0, 5, 8, 7, 6, 9, 1, 2, 4, 3], Makespan: 6007
[INFO] Average Makespan: 6007.0
[INFO] Instance: dataset_7, Schedule: [4, 6, 2, 9, 1, 3, 5, 7, 8, 0], Makespan: 6146
[INFO] Average Makespan: 6146.0
[INFO] Instance: dataset_8, Schedule: [9, 3, 2, 4, 7, 0, 8, 6, 1, 5], Makespan: 6304
[INFO] Average Makespan: 6304.0
[INFO] Instance: dataset_9, Schedule: [2, 7, 6, 3

INFO:absl:Best score of island 0 increased to -6277.5
INFO:absl:Best score of island 1 increased to -6277.5
INFO:absl:Best score of island 2 increased to -6277.5
INFO:absl:Best score of island 3 increased to -6277.5
INFO:absl:Best score of island 4 increased to -6277.5
INFO:absl:Best score of island 5 increased to -6277.5
INFO:absl:Best score of island 6 increased to -6277.5
INFO:absl:Best score of island 7 increased to -6277.5
INFO:absl:Best score of island 8 increased to -6277.5
INFO:absl:Best score of island 9 increased to -6277.5


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    num_jobs, num_machines = processing_times.shape
    alpha = 0.7

    # Step 1: Compute total processing time for each job
    total_times = np.sum(processing_times, axis=1)
    
    # Step 2: Sort jobs in descending order of total processing time
    job_order = sorted(range(num_jobs), key=lambda x: np.sum(processing_times[x]))

    # Step 3: Build schedule by best insertion
    schedule = []
    for job in job_order:
        best_makespan = None
        best_schedule = None
        for pos in range(len(schedule) + 1):
            new_schedule = schedule[:pos] + [job] + schedule[pos:]
            ms = compute_makespan(new_schedule, processing_times)
            if best_makespan is None or ms < best_makespan:
                best_makespan = ms
    

INFO:absl:Best score of island 4 increased to -6253.0


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    def score_strategy(job_order, processing_times):
        total_times = np.sum(processing_times, axis=1)
        alpha = 0.7
        machine_loads = [0] * processing_times.shape[1]
        scores = []
        for job in job_order:
            score = 0
            for i, machine_load in enumerate(machine_loads):
                new_machine_load = machine_load + processing_times[job][i]
                score += alpha * total_times[job] + (1-alpha) * new_machine_load - machine_load
            scores.append(score)
        return [job for _, job in sorted(zip(scores, job_order), reverse=True)]


    def best_insert_position(schedule, job, processing_times):
        best_makespan = float('inf')
        best_pos = -1
        for pos in range(len(schedul

INFO:absl:Best score of island 1 increased to -6253.0


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    """
    Improved NEH heuristic for PFSP.
    - Modify scoring strategy, insertion logic, and local search to minimize makespan.
    """
    num_jobs, num_machines = processing_times.shape
    alpha = 0.7
    
    # Scoring Strategy: Dynamic alpha based on total processing times
    total_times = np.sum(processing_times, axis=1)
    job_order = sorted(range(num_jobs), key=lambda x: alpha * np.sum(processing_times[x]) + (1-alpha) * total_times[x], reverse=True)

    # Insertion Logic: Optimize position selection with early termination
    schedule = []
    for job in job_order:
        best_makespan = None
        best_pos = None
        for pos in range(len(schedule) + 1):
            new_schedule = schedule[:pos] + [job] + schedule[pos:]
         

INFO:absl:Best score of island 0 increased to -6262.5


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    """
    Improved NEH heuristic for PFSP.
    - Custom scoring strategy.
    - Optimized insertion logic.
    - Enhanced local search technique.
    """
    num_jobs, num_machines = processing_times.shape
    
    # Custom scoring strategy
    job_scores = np.sum(processing_times, axis=1)  # Total processing time for each job
    sorted_jobs = np.argsort(job_scores)[::-1]  # Sort in descending order of total processing time
    
    # Optimized insertion logic
    schedule = []
    for job in sorted_jobs:
        best_makespan = float('inf')
        best_pos = 0
        for pos in range(len(schedule) + 1):
            new_schedule = schedule[:pos] + [job] + schedule[pos:]
            ms = compute_makespan(new_schedule, processing_times)
           

INFO:absl:Best score of island 7 increased to -6267.7


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    """
    Improved NEH heuristic for PFSP.
    - Dynamic scoring strategy with machine load balancing.
    - Optimized insertion logic with early termination.
    - Efficient local search using tabu search.
    """

    def score_job(job: int, schedule: List[int]) -> float:
        """
        Compute the score of a job based on machine load balancing.
        """
        machine_loads = [0] * processing_times.shape[1]
        for pos in range(len(schedule) + 1):
            new_schedule = schedule[:pos] + [job] + schedule[pos:]
            for i, j in enumerate(new_schedule):
                machine_loads[i % len(machine_loads)] += processing_times[j][i // len(machine_loads)]
        alpha = 0.7
        return alpha * np.max(machine_loads) + (1 - a

INFO:absl:Best score of island 9 increased to -6275.8


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    num_jobs, num_machines = processing_times.shape
    alpha = 0.7

    # Scoring Strategy: Modified based on dynamic alpha and machine load balancing
    job_scores = [np.sum(processing_times[j]) + alpha * np.max(processing_times[j]) for j in range(num_jobs)]
    job_order = sorted(range(num_jobs), key=lambda x: job_scores[x], reverse=True)

    # Insertion Logic: Optimize position selection during insertion
    schedule = []
    for job in job_order:
        best_makespan = float('inf')
        best_pos = 0
        for pos in range(len(schedule) + 1):
            new_schedule = schedule[:pos] + [job] + schedule[pos:]
            ms = compute_makespan(new_schedule, processing_times)
            if ms < best_makespan:
                best_makespan = 

INFO:absl:Best score of island 6 increased to -6275.8


def neh_heuristic(processing_times: np.ndarray) -> list[int]:
    """
    Basic NEH heuristic for PFSP.
    - Sort jobs by descending total processing times.
    - Insert each job into the position that minimizes partial makespan.
    """
    """
    Advanced NEH heuristic for PFSP.
    - Implementing dynamic scoring strategy, optimized insertion logic, and efficient local search.
    """
    num_jobs, num_machines = processing_times.shape
    
    def score_job(job: int, schedule: List[int]) -> float:
        """
        Calculate the score for a job based on dynamic alpha and machine load balancing.
        """
        alpha = 0.7
        job_loads = [0] * num_machines
        for pos in range(len(schedule)):
            machine = pos % num_machines
            job_loads[machine] += processing_times[job][schedule[pos]]
        
        total_load = sum(job_loads)
        max_load = max(job_loads)
        
        return alpha * total_load + (1 - alpha) * max_load
    
    schedule = 