In [None]:
import collections

from ortools.sat.colab import visualization
from ortools.sat.python import cp_model


def jssp(durations: list[list[int]], machines: list[list[int]]):
    # Creates the solver.
    model = cp_model.CpModel()

    machines_count = len(machines[0])
    jobs_count = len(machines)
    all_machines = range(0, machines_count)
    all_jobs = range(0, jobs_count)

    # Tổng thời gian hoàn thành của tất cả các task
    horizon = sum([sum(durations[i]) for i in all_jobs])

    # Tạo một kiểu dữ liệu mới có tên là task_type, dùng để lưu trữ thông tin của mỗi task
    task_type = collections.namedtuple("task_type", "start end interval")

    """
    Ràng buộc thời gian bắt đầu và kết thúc của mỗi task.
    Điều kiện : 
        - Thời gian bắt đầu và kết thúc của mỗi task phải thỏa mãn điều kiện : 0 <= start <= end <= horizon
        -  start + duration = end
    """
    all_tasks = {}
    for i in all_jobs:
        for j in all_machines:
            start_var = model.NewIntVar(0, horizon, "start_%i_%i" % (i, j))
            duration = durations[i][j]
            end_var = model.NewIntVar(0, horizon, "end_%i_%i" % (i, j))
            interval_var = model.NewIntervalVar(
                start_var, duration, end_var, "interval_%i_%i" % (i, j)
            )
            all_tasks[(i, j)] = task_type(
                start=start_var, end=end_var, interval=interval_var
            )

    """
    Ràng buộc mỗi máy chỉ được thực hiện một task tại một thời điểm.
    """
    machine_to_jobs = {}
    for i in all_machines:
        machines_jobs = []
        for j in all_jobs:
            for k in all_machines:
                if machines[j][k] == i:  # Tìm tất cả các task cần thực hiện trên máy i
                    machines_jobs.append(all_tasks[(j, k)].interval)
        machine_to_jobs[i] = machines_jobs
        # Mỗi máy chỉ được thực hiện một task tại một thời điểm (các khoảng thời gian không được chồng lấn lên nhau)
        model.AddNoOverlap(machines_jobs)

    """
    Ràng buộc tuần tự công nghệ: 
        Với mỗi task của job, thời gian bắt đầu của task tiếp theo phải >= thời gian kết thúc của task trước đó.
    """
    for i in all_jobs:
        for j in range(0, machines_count - 1):
            model.Add(all_tasks[(i, j + 1)].start >= all_tasks[(i, j)].end)

    """
    Mục tiêu cần tối ưu:
        Tìm thời gian hoàn thành của job cuối cùng (makespan) là nhỏ nhất.
    """
    obj_var = model.NewIntVar(0, horizon, "makespan")
    model.AddMaxEquality(
        obj_var, [all_tasks[(i, machines_count - 1)].end for i in all_jobs]
    )  # Thời gian lớn nhất để hoàn thành task cuối cùng của tất cả các job chính là thời gian hoàn thành tất cả các jobs (makespan)
    model.Minimize(obj_var)

    # Solve model.
    solver = cp_model.CpSolver()
    solver.parameters.log_search_progress = True
    # setting time limit
    solver.parameters.max_time_in_seconds = 60.0
    status = solver.Solve(model)

    # Output solution.
    if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
        print("Optimal" if status == cp_model.OPTIMAL else "Feasible")
        starts = [
            [solver.Value(all_tasks[(i, j)][0]) for j in all_machines] for i in all_jobs
        ]
        visualization.DisplayJobshop(starts, durations, machines, "FT")
        print(starts)
        print("Optimal makespan: %i" % solver.ObjectiveValue())
        # # Print sequences of jobs assigned to each machine.
        # starts = [
        #     [(starts[i][j], machines[i][j]) for j in all_machines] for i in all_jobs
        # ]
        # starts = [sorted(job, key=lambda x: x[1]) for job in starts]
        # job_sq = []
        # for start in zip(*starts):
        #     start_job = sorted(
        #         [(i, j[0]) for i, j in enumerate(start)], key=lambda x: x[1]
        #     )
        #     job_sq.append([i[0] for i in start_job])
        # print()
        # print("Optimal job sequence: ")
        # for i in job_sq:
        #     print(*i)
    else:
        print("No solution found.")


if __name__ == "__main__":
    import numpy as np

    np.random.seed(215)
    # generate random data for durations and machines
    n_job = 15
    n_machine = 3
    durations = np.random.randint(1, 100, size=(n_job, n_machine)).tolist()
    machines = [np.random.permutation(n_machine).tolist() for _ in range(n_job)]
    print("Durations: ")
    for duration in durations:
        print(*duration)
    print("Machines: ")
    for machine in machines:
        print(*machine)
    jssp(durations, machines)

In [1]:
from jobshop.data_gen import jobshop_data_gen
from jobshop.cp import CPModel

In [6]:
durations, machines = jobshop_data_gen(4, 3,5)
model = CPModel(durations, machines)
model.solve()


OPTIMAL : 257.0


In [7]:
model.summary()

Constraint programming model summary: 
CpSolverResponse summary:
status: OPTIMAL
objective: 257
best_bound: 257
integers: 13
booleans: 13
conflicts: 1
branches: 13
propagations: 3
integer_propagations: 44
restarts: 1
lp_iterations: 0
walltime: 0.0592525
usertime: 0.0592526
deterministic_time: 1.4072e-06
gap_integral: 8.21485e-06
solution_fingerprint: 0xbb61dfade86c773d

default_lp
