In [13]:
from ortools.sat.python import cp_model

# Tasks
- Duration
- Hard start and end time?
- Concurrent?
- Precedence
- Sentiment (first, mid, end)
- Deadline slot

In [14]:
# CONSTANTS

WORKING_HOURS = 8
SUBDIVISIONS = 4
DAYS = 5
NUM_SLOTS = WORKING_HOURS * SUBDIVISIONS * DAYS
DEADLINE_PENALTY_CONSTANT = 1
SENTIMENT_PENALTY_CONSTANT = 5000



In [15]:
# PRIORITY FUNCTION
def deadline_penalty(end, deadline):
    return DEADLINE_PENALTY_CONSTANT * (NUM_SLOTS - (deadline - end))


In [16]:
class Task:
    def __init__(self, duration, start_time, concurrent, precedes, sentiment, deadline, name):
        self.duration = duration
        # OPTIONAL
        self.start_time = start_time
        self.concurrent = concurrent
        # INDEX IN ARRAY
        self.precedes = precedes
        self.sentiment = sentiment
        self.deadline = deadline
        self.name = name

In [17]:
class TaskScheduler:
    def __init__(self, tasks):
        self.tasks = tasks

In [18]:
def create_interval_variables(self):
    model = self.model
    self.schedule = []
    self.demands = []
    for t in self.tasks:
        start, end, interval = None, None, None
        if t.start_time == None:
            start = model.NewIntVar(0, NUM_SLOTS, f'Task {t.name} start')
            end = model.NewIntVar(1, NUM_SLOTS, f'Task {t.name} end')
        else:
            start = model.NewIntVar(t.start_time, t.start_time, f'Task {t.name} start')
            end = model.NewIntVar(t.start_time + t.duration, t.start_time + t.duration, f'Task {t.name} end')
            
        model.Add(end <= t.deadline)
        interval = model.NewIntervalVar(start, t.duration, end, f'Task {t.name} interval')
        interval.start = start
        interval.end = end
        self.demands.append(0 if t.concurrent else 1)
        self.schedule.append(interval)
TaskScheduler.create_interval_variables = create_interval_variables

In [19]:
def create_overlapping_constraints(self):
    model = self.model
    model.AddCumulative(self.schedule, self.demands, 1)
TaskScheduler.create_overlapping_constraints = create_overlapping_constraints

In [20]:
def create_precedence_constraints(self):
    for i, t in enumerate(self.tasks):
        for succ in t.precedes:
            self.model.Add(self.schedule[i].end <= self.schedule[succ].start)
TaskScheduler.create_precedence_constraints = create_precedence_constraints

In [21]:
def create_scheduling_penalties(self):
    self.penalty = self.model.NewIntVar(0, 2000000000, 'Scheulding Penalties')
    cumul = 0
    for i, s in enumerate(self.schedule):
        tmp = self.model.NewIntVar(-400, 400, '')
        squared = self.model.NewIntVar(0, 3000000, '')
        p = deadline_penalty(s.end, self.tasks[i].deadline)
        self.model.Add(tmp == p)
        self.model.AddMultiplicationEquality(squared, [tmp, tmp])
        cumul += squared
        ##### TODO ###### Add sentiment penalties
        
    self.model.Add(self.penalty == cumul)

TaskScheduler.create_scheduling_penalties = create_scheduling_penalties    

In [22]:
def solve_model(self):
    self.model = cp_model.CpModel()
    self.solver = cp_model.CpSolver()
    
    self.create_interval_variables()
    self.create_overlapping_constraints()
    self.create_precedence_constraints()
    self.create_scheduling_penalties()
    self.model.Minimize(self.penalty)
    
    self.solver.parameters.num_search_workers = 4
    m = self.solver.Solve(self.model)
    print(self.solver.StatusName())

    if m in [cp_model.FEASIBLE, cp_model.OPTIMAL]:
        return [(self.solver.Value(s.start), self.solver.Value(s.end)) for s in self.schedule]

TaskScheduler.solve_model = solve_model

# Examples (Considerations)

In [23]:
# duration, start_time, concurrent, precedes, sentiment, deadline, name

### 1. Provided timeslots are always respected

In [28]:
scheduler = TaskScheduler([Task(1, 0, False, [], None, 160, "Coffee"), Task(10, None, False, [], None, 60, "Task1"), Task(3, None, False, [1], None, 140, "Task2")])
scheduler.solve_model()

OPTIMAL


[(0, 1), (4, 14), (1, 4)]

Note that coffee is scheduled at its given start time, even though the deadline is very far away. To demonstrate this changes, let's see what happens if I don't give Coffee a start time.

In [30]:
scheduler = TaskScheduler([Task(1, None, False, [], None, 159, "Coffee"), Task(10, None, False, [], None, 60, "Task1"), Task(3, None, False, [1], None, 140, "Task2")])
scheduler.solve_model()

OPTIMAL


[(13, 14), (3, 13), (0, 3)]

### 2. Then, dependencies are respected

In [31]:
scheduler = TaskScheduler([Task(1, None, False, [], None, 4, "Task1"), Task(3, None, False, [0], None, 140, "Task2")])
scheduler.solve_model()

OPTIMAL


[(3, 4), (0, 3)]

Even though Task 1 has a much closer deadline than Task 3, precedence is always respected, and Task 2 must precede Task 1. We can see what happens if we get rid of precedence.

In [32]:
scheduler = TaskScheduler([Task(1, None, False, [], None, 4, "Task1"), Task(3, None, False, [], None, 140, "Task2")])
scheduler.solve_model()

OPTIMAL


[(0, 1), (1, 4)]

Note that the solver will simply fail if you provide an fixed time for a task that has dependencies, and there isn't enough time

In [33]:
scheduler = TaskScheduler([Task(5, 2, False, [], None, 100, "Task1"), Task(3, None, False, [0], None, 140, "Task2")])
scheduler.solve_model()

INFEASIBLE


Task 2 must precede Task 1, but Task 1 was given a fixed start time of 2, while Task 2 requires 3 units of time

### 3. Then, deadlines/priorities are respected

In [34]:
scheduler = TaskScheduler([Task(3, None, False, [], None, 4, "Task1"), Task(3, None, False, [], None, 140, "Task2")])
scheduler.solve_model()

OPTIMAL


[(0, 3), (3, 6)]

Clearly, Task 1 has a much closer deadline, so we schedule Task 1 first.