# Modelling scheduling problem as constraint programming 

In this AIBT you learnt how to model a combinatorial optimisation problem using either Constraint Programming or Mixed Integer Linear programming paradigm. https://www.xoolive.org/optim4ai/

In this notebook you will be able to apply your modelling abilities to model RCPSP problem.
Contrary to your first course, a more scheduling oriented CP solver will be used. It is the [Ortools CPSAT solver](https://developers.google.com/optimization/cp/cp_solver)

<div class="alert alert-warning">
    <b>Questions</b> appear in yellow.
</div>

## Ortools basics

In [None]:
from ortools.sat.python import cp_model

### Creating a CP Model : 

In [None]:
model = cp_model.CpModel()

### Creating your first integer variables

In [None]:
# Doc : 
model.NewIntVar??

In [None]:
x = model.NewIntVar(0, 10, "x")
y = model.NewIntVar(0, 20, "y")

### Creating your first constraint

In [None]:
model.Add(x+y<=10)

### Setting objective function

In [None]:
model.Maximize(x+y)

### Solving

In [None]:
def solve(model, enumerate_all: bool=False):
    solver = cp_model.CpSolver()
    solver.parameters.enumerate_all_solutions = enumerate_all
    solver.parameters.max_time_in_seconds = 10
    
    status = solver.Solve(model)
    status_human = solver.StatusName(status)
    print(status_human)
    return solver

In [None]:
solver = solve(model)
value_x = solver.Value(x)
value_y = solver.Value(y)
print("Value x:", value_x, "\nValue y:", value_y)

### Boolean variables
One specific integer variable is the boolean ones {0, 1}, for which the CP<b>SAT</b> solver is very efficient (SAT denomination is generally for boolean problems.

In [None]:
def testing_model():
    model = cp_model.CpModel()
    x = model.NewBoolVar("x")
    y = model.NewBoolVar("y")
    return model, x, y

### Boolean constraints

#### Implications
Constraint $x\rightarrow y$, or in english y is 1 if x is 1.

In [None]:
model, x, y = testing_model()
model.AddImplication(x, y)

### And, Or, Xor

In [None]:
model,x,y = testing_model()
model.AddBoolAnd(x, y)
solver = solve(model)
value_x = solver.Value(x)
value_y = solver.Value(y)
print("Value x:", value_x, "\nValue y:", value_y)

In [None]:
model,x,y = testing_model()
model.AddBoolOr(x, y)
model.Maximize(x+y)
solver = solve(model)
value_x = solver.Value(x)
value_y = solver.Value(y)
print("Value x:", value_x, "\nValue y:", value_y)

In [None]:
model,x,y = testing_model()
model.AddBoolXOr(x, y)
model.Maximize(x+y)
solver = solve(model)
value_x = solver.Value(x)
value_y = solver.Value(y)
print("Value x:", value_x, "\nValue y:", value_y)

### Advanced constraint

CPSAT also proposes more advanced sugar syntax for other boolean constraint. Look to the following constraint documentation, which are self-explanatory.

In [None]:
#model.AddAtLeastOne?
#model.AddAtMostOne?
model.AddExactlyOne?

In [None]:
import pandas as pd
def model_many_variable():
    model = cp_model.CpModel()
    df = pd.Series(range(10))
    x = model.NewBoolVarSeries("x", df.index)
    return model, x

In [None]:
model, x = model_many_variable()
#model.AddAtLeastOne(x)
#model.AddAtMostOne(x)
model.AddExactlyOne(x)
solver = solve(model)
model.Maximize(sum(x))
solver = solve(model)
print(solver.BooleanValues(x))

### Enforcement constraint
In complex modeling problems, we might need conditional constraint, i.e constraint that are activated (true), when a given condition is satisfied. if Y is a boolean variable : 

Y$\rightarrow$ ConstraintVerified
is expressed as ```model.Add(Constraint).OnlyEnforceIf(Y)``` using CPSat.



From [the official guide](https://developers.google.com/optimization/cp/channeling#if-then-else_expressions):

Let's say you want to implement the following: "If x is less than 5, set y to 0. Otherwise, set y to 10-x". You can do this creating an intermediate boolean variable b that is true if x is greater than or equal to 5, and false otherwise:

- b implies y == 10 - x
- not(b) implies y == 0

In [None]:
model = cp_model.CpModel()
x = model.NewIntVar(0, 10, "x")
y = model.NewIntVar(0, 10, "y")
b = model.NewBoolVar("b")
model.Add(x<=5).OnlyEnforceIf(b.Not())
model.Add(x>5).OnlyEnforceIf(b)
model.Add(y==10-x).OnlyEnforceIf(b)
model.Add(y==0).OnlyEnforceIf(b.Not())
class VarArraySolutionPrinter(cp_model.CpSolverSolutionCallback):
    """Print intermediate solutions."""

    def __init__(self, variables: list[cp_model.IntVar]):
        cp_model.CpSolverSolutionCallback.__init__(self)
        self.__variables = variables

    def on_solution_callback(self) -> None:
        for v in self.__variables:
            print(f"{v}={self.Value(v)}", end=" ")
        print()
solver = cp_model.CpSolver()
solver.parameters.enumerate_all_solutions = True
status = solver.Solve(model, VarArraySolutionPrinter([x,y,b]))

### Small exercise : 

<div class="alert alert-warning">
<b>Exercice #1: (medium)</b><br />
Encode the following problem : 
    
- X is an array of boolean variable of size 20
- Y is an array of boolean variable of size 19
- Z is an integer variable
With the following constraint : 
- for i $\in [0, 18]$ , $Y[i]$ is 1 when $X[i]==X[i+1]$
- for i $\in [0, 16]$, $Y[i]!=Y[i+2]$
- Z = sum(X[::3])+sum(Y)
Goal is to maximize Z
</div>

In [None]:
model.AddBoolAnd??

In [None]:
def exercise_model(N=20):
    model = cp_model.CpModel()
    df = pd.Series(range(N))
    X = ??
    Y = ??
    Z = ??
    return model, X, Y, Z

In [None]:
# %load correction/nb2_small_cpexercise.py

In [None]:
def solution_checker(X, Y, Z):
    N = len(X)
    for i in range(N-1):
        if Y[i]:
            assert(X[i]==X[i+1])
        if i < N-3:
            assert(Y[i]!=Y[i+2])
    assert Z == sum(X[::3])+sum(Y)
    
model, X, Y, Z = exercise_model(N=20)
solver = solve(model)
X_v, Y_v, Z_v = (solver.Values(X), solver.Values(Y), solver.Value(Z))
print(X_v, Y_v)
solution_checker(X_v, Y_v, Z_v)

### Interval variables for scheduling

CP-SAT solver from Ortools has a big focus on scheduling problems and it uses the concept of interval variables to efficiently model them.

In [None]:
model.NewIntervalVar??

Here's a simple example where we have 1 task to schedule with a duration of 10 and we want to schedule it as late as possible considering the allowed range $[0, 20]$

In [None]:
model = cp_model.CpModel()
start = model.NewIntVar(0, 20, "start")
end = model.NewIntVar(0, 20, "end")
duration = 10
task_var = model.NewIntervalVar(start, duration, end, name="task")
model.Maximize(start)

solver = cp_model.CpSolver()
solver.parameters.enumerate_all_solutions = True
status = solver.Solve(model)
status_human = solver.StatusName(status)
print(status_human)
print("Task scheduled between time : ", solver.Value(start), solver.Value(end))

### Global constraints on interval

#### No overlap constraint
The <b>NoOverlap</b> constraint takes as input a list of interval variables, and forbid any overlap between the intervals. It can be usefull for problem where a given list of task have to be done by the same machine or worker.

In [None]:
# Look at the documentation in ortools library
model.AddNoOverlap?

#### Cumulative resource constraint
The <b>Cumulative</b> constraint insures that at any time, a set of interval consuming a given quantity of resource don't overconsume a given capacity.

In [None]:
model.AddCumulative?

#### Optional interval variables
Interval variable can be set "optional" using a boolean flag variable. If an interval is <b>not</b> activated (flag=False), then the considered interval is ignored in the global constraint they are involved in, and also the implicit constraint (end=start+size) are not enforced. -> Behind the scene, the flag binary variable is introduced in some ```OnlyEnforceIf``` constraints.

Let's create a small problem with 2 task, one which should start between [0, 20] and finish [0, 20] having a duration of 10,
and the other one starting between [0, 10], finishing [0, 20] having a duration of 12. We want to schedule them without overlapping.

In [None]:
def small_model_scheduling():
    model = cp_model.CpModel()
    start_1 = model.NewIntVar(0, 20, "start_1")
    end_1 = model.NewIntVar(0, 20, "end_1")
    is_present_1 = model.NewBoolVar("is_present_1")
    duration_1 = 10
    task_var_1 = model.NewOptionalIntervalVar(start=start_1,
                                              size=duration_1, 
                                              end=end_1, 
                                              is_present=is_present_1,
                                              name="task")
    start_2 = model.NewIntVar(0, 10, "start_2")
    end_2 = model.NewIntVar(0, 20, "end_2")
    is_present_2 = model.NewBoolVar("is_present_2")
    duration_2 = 12
    task_var_2 = model.NewOptionalIntervalVar(start=start_2,
                                              size=duration_2, 
                                              end=end_2, 
                                              is_present=is_present_2,
                                              name="task")
    
    model.AddNoOverlap([task_var_1, task_var_2])
    return model, start_1, end_1, is_present_1, start_2, end_2, is_present_2

In [None]:
model, start_1, end_1, is_present_1, start_2, end_2, is_present_2 = small_model_scheduling()
solver = solve(model)
print(solver.Values([start_1, end_1, is_present_1]),
      solver.Values([start_2, end_2, is_present_2]))

Strange isn't it, none of the activities are scheduled !! it's because they are optional.
Let's try to force their presence : 

In [None]:
# Exercise : 
## Force the presence of task 1 and 2, what do you observe ?? 

In [None]:
model.AddBoolAnd([is_present_1, is_present_2])
solver = solve(model)
print(solver.Values([start_1, end_1, is_present_1]),
      solver.Values([start_2, end_2, is_present_2]))

Instead of forcing the presence, we can add an objective on the number of interval activated ?!!

In [None]:
model, start_1, end_1, is_present_1, start_2, end_2, is_present_2 = small_model_scheduling()
model.Maximize(sum([is_present_1, is_present_2]))
solver = solve(model)
print(solver.Values([start_1, end_1, is_present_1]),
      solver.Values([start_2, end_2, is_present_2]))

Better isn't it ??

### You can play with this concept a bit more if needed :

In [None]:
model, start_1, end_1, is_present_1, start_2, end_2, is_present_2 = small_model_scheduling()
model.AddAtLeastOne(is_present_1, is_present_2)
makespan = model.NewIntVar(0, 20, name="makespan")
model.Add(makespan>end_1).OnlyEnforceIf(is_present_1)
model.Add(makespan>end_2).OnlyEnforceIf(is_present_2)
model.Minimize(makespan+is_present_1+20*is_present_2)
solver = solve(model)
print(solver.Values([start_1, end_1, is_present_1]),
      solver.Values([start_2, end_2, is_present_2]))

## Job shop problem

<div class="alert alert-warning">
<b>Problem #1: (medium)</b><br />

Simple job shop problem
</div>

<div class="alert alert-info">
The original problem is stated as follows:
<blockquote>
Each job consists of a sequence of tasks, which must be performed in a given order, and each task must be processed on a specific machine. For example, the job could be the manufacture of a single consumer item, such as an automobile. The problem is to schedule the tasks on the machines so as to minimize the length of the schedule—the time it takes for all the jobs to be completed.
There are several constraints for the job shop problem:
No task for a job can be started until the previous task for that job is completed.
A machine can only work on one task at a time.
A task, once started, must run to completion.
<br/><br/>
</blockquote>

</div>

## Object oriented placeholder
Let's define some structure to store the input data of a job shop problem.
A jobshop problem is defined by a number of machines, a list of jobs themselves composed of several Subjob to execute in order. 

In [None]:
from typing import List
class Subjob:
    machine_id: int
    processing_time: int
    def __init__(self, machine_id, processing_time):
        self.machine_id = machine_id
        self.processing_time = processing_time
    def __str__(self):
        return f"machine and duration : {self.machine_id, self.processing_time}"
        
class JobShopProblem:
    n_jobs: int
    n_machines: int
    list_jobs: List[List[Subjob]]
    def __init__(self, list_jobs: List[List[Subjob]], n_jobs: int=None, n_machines: int=None):
        self.n_jobs = n_jobs
        self.n_machines = n_machines
        self.list_jobs = list_jobs
        if self.n_jobs is None:
            self.n_jobs = len(list_jobs)
        if self.n_machines is None:
            self.n_machines = len(set([y.machine_id for x in self.list_jobs
                                       for y in x]))
        # Store for each machine the list of subjob given as (index_job, index_subjob)
        self.job_per_machines = {i: [] for i in range(self.n_machines)}
        for k in range(self.n_jobs):
            for sub_k in range(len(list_jobs[k])):
                self.job_per_machines[list_jobs[k][sub_k].machine_id] += [(k, sub_k)]

### Example : 

In [None]:
job_0 = [Subjob(machine_id=0, processing_time=3), Subjob(1, 2), Subjob(2, 2)]
job_1 = [Subjob(0, 2), Subjob(2, 1), Subjob(1, 4)]
job_2 = [Subjob(1, 4), Subjob(2, 3)]
example_jobshop = JobShopProblem(list_jobs=[job_0, job_1, job_2])
print(example_jobshop.job_per_machines)

### Solution encoding : 
we will choose how to encode a schedule solution in a specific object too.

In [None]:
from typing import Tuple
class SolutionJobshop:
    def __init__(self, schedule: List[List[Tuple[int, int]]]):
        # For each job and subjob, start and end time given as tuple of int.
        self.schedule = schedule

### Checking of a solution
We can code a python function verifying if a solution is valid. That could help you debug your CP model afterwards.

In [None]:
def check_solution(solution: SolutionJobshop, problem: JobShopProblem):
    if len(solution.schedule)!=problem.n_jobs:
        print("solution schedule should be same size as the problem")
        return False
    for k in range(problem.n_jobs):
        if len(solution.schedule[k])!=len(problem.list_jobs[k]):
            print(f"solution schedule for task n°{k} should be coherent with problem")
            return False
        for sub_k in range(len(solution.schedule[k])):
            if solution.schedule[k][sub_k][1]-solution.schedule[k][sub_k][0]!=problem.list_jobs[k][sub_k].processing_time:
                print(f"Duration of task should be coherent with problem")
                return False
            if sub_k>=1:
                if not (solution.schedule[k][sub_k][0]>=solution.schedule[k][sub_k-1][1]):
                    print(f"Precedence constraint between consecutive subtask not respected")
                    return False
    for machine in problem.job_per_machines:
        sorted_job = sorted([solution.schedule[x[0]][x[1]]
                             for x in problem.job_per_machines[machine]])
        for l in range(1, len(sorted_job)):
            if not (sorted_job[l][0]>=sorted_job[l-1][1]):
                print("Some task are overlaping in one machine")
                return False
    print("Constraint satisfied")
    return True

### Plotting of a solution

In [None]:
import matplotlib.pyplot as plt
import matplotlib
from matplotlib.collections import PatchCollection
from matplotlib.patches import Polygon as pp
from shapely.geometry import Polygon
from matplotlib.font_manager import FontProperties
def plot_solution(solution: SolutionJobshop, problem: JobShopProblem):
    fig, ax = plt.subplots(1)
    patches = []
    for machine in problem.job_per_machines:
        for task in problem.job_per_machines[machine]:
            time_start, time_end = solution.schedule[task[0]][task[1]]
            polygon = Polygon(
                        [
                            (time_start, machine-0.2),
                            (time_end, machine-0.2),
                            (time_end, machine+0.2),
                            (time_start, machine+0.2),
                            (time_start, machine-0.2),
                        ]
                    )
            ax.annotate(str(task),
                xy=((3*time_start+time_end)/4, machine),
                font_properties=FontProperties(size=7, weight="bold"),
                verticalalignment="center",
                horizontalalignment="left",
                color="k",
                clip_on=True,
            )
            x, y = polygon.exterior.xy
            ax.plot(x, y, zorder=-1, color="b")
            patches.append(pp(xy=polygon.exterior.coords))
    p = PatchCollection(patches, cmap=matplotlib.colormaps.get_cmap("Blues"), alpha=0.4)
    ax.add_collection(p)
    ax.set_yticks(range(problem.n_machines))
    ax.set_yticklabels(
        tuple([f"machine {j}" for j in range(problem.n_machines)]), fontdict={"size": 5}
    )
    return fig, ax

### Manual solution computation
<div class="alert alert-warning">
<b>Manual solution</b><br />

Here you are asked to build yourself a solution, that pass the check solution and that you can plot.
</div>

In [None]:
handcrafted_solution = SolutionJobshop([[[?, ?], [?, ?], [?, ?]], 
                                        [[?, ?], [?, ?], [?, ?]], 
                                        [[?, ?], [?, ?]]])
check = check_solution(solution=handcrafted_solution, problem=example_jobshop)
plot_solution(handcrafted_solution, example_jobshop)
print("Solution respect the jobshop constraints : ", check)

In [None]:
#%load correction/nb2_jobshophandcrafted.py

### CP Modelling
Constraint programming powerness can help our scheduling tasks by providing good quality schedules fastly. In this section you will code the CP model of jobshop problem

<div class="alert alert-warning">
<b>CP solver implementation</b><br />

Job shop problem
</div>

In [None]:
class SolverJobShop:
    def __init__(self, jobshop_problem: JobShopProblem):
        self.jobshop_problem = jobshop_problem
        self.model = cp_model.CpModel()
        self.variables = {}
    
    def init_model(max_time: int):
        # Write variables, constraints
        pass
        
    def solve() -> SolutionJobshop:
        self.init_model()
        solver = cp_model.CpSolver()
        solver.parameters.max_time_in_seconds = 10
        status = solver.Solve(self.model)
        status_human = solver.StatusName(status)
        print(status_human)
        # Code the reconstruction of the
        return
        

In [None]:
# %load correction/nb2_cpmodel_cell.py

### Test the solver on benchmark


In [None]:
from jsplib_parser import create_jsplib_instance, JSPLIBInstance #, instance_names, instance_opti

model = create_jsplib_instance("abz5")
pb = model.jsplib_to_jobshop()
pb = JobShopProblem(list_jobs=[[Subjob(**x) for x in y] for y in pb])
print(pb.n_jobs)
print(pb.n_machines)
print(pb.list_jobs[0][0])
solver = SolverJobShop(jobshop_problem=pb)
solution = solver.solve(max_time=10000)
check_solution(solution, pb)
plot_solution(solution, pb)