# ST7 Planification quotidienne d’une équipe mobile
# Phase III

## Importation de modules

In [1]:
# module importation
import numpy as np
from math import ceil
import matplotlib.pyplot as plt

# utilities
from utils import *
from copy import deepcopy

# model classes for employees and nodes
from models_v2 import Employee, Node, Task, Home, Unavail

Déclaration des constantes et des indices :

In [2]:
W = U = T = V = 0
employees = homes = tasks = unavails = nodes = []

## Fonction de lecture de données

In [3]:
def load_data_from_path(path_to_instance: str):
    # load employee data
    Employee.load_excel(path_to_instance)

    # load node data
    Node.clear_previous_data()
    for cls in [Home, Task, Unavail]:
        cls.load_excel(path_to_instance)
    Node.initialize_distance()

    # constants
    global W, U, T, V
    (W, U, T, V) = (Task.count, Unavail.count, Employee.count, Node.count)

    # indices of employees, homes, lunches, tasks, unavailabilities
    global employees, homes, tasks, unavails, nodes
    employees = list(range(T))
    homes = list(range(T))
    tasks = list(range(T, T + W))
    unavails = list((range(T + W, V)))
    nodes = list(range(V))

Lecture de données de test

In [4]:
path_to_test = path_bordeaux_v2 = "./data/InstancesV2/InstanceBordeauxV2.xlsx"
load_data_from_path(path_to_test)

In [8]:
def open_intervals(i):
    d, f = Node.list[i].opening_time, Node.list[i].closing_time
    task_duration = Node.list[i].duration
    l = [d]
    for s, e in Node.list[i].closed_intervals:
        l.append(s)
        l.append(e)
    l.append(f)
    return [(l[j],l[j+1]) for j in range(0,len(l),2) if l[j+1] - l[j] >= task_duration]

## Implémentation de la classe Solution

In [5]:
class Solution:
    print_warning = True  # whether to print warning or not when validating

    def __init__(self):
        """instantiate new solution instance that adapts to the Employee and Node data"""
        self.x = np.zeros((V, V), dtype=np.intc)
        self.y = np.zeros((T, W), dtype=np.intc)
        self.b = np.zeros(W, dtype=np.intc)
        self.l = np.zeros((T, V), dtype=np.intc)
        self.p = np.zeros(T, dtype = np.intc)
        self.t = np.zeros(W, dtype = np.intc) # t[i] = indice du segment utilisé de disponibilité de la tache i
        #Si la tache i n'est pas effectuée, on prend t[i] = -1

    @classmethod
    def set_warning(cls, print_warning: bool):
        """set whether to print warning when validating data"""
        cls.print_warning = print_warning

    @classmethod
    def warn(cls, warning: str, print_color="yellow"):
        """
        Print a text in yellow if print_warning set to True
        :param warning: the warning message we want to print
        :param print_color: the color of the printed message, defaulted to yellow
        """
        if not cls.print_warning: return # does nothing if print_warning set to False
        correspondance = {
            "yellow": Colors.WARNING,
            "cyan": Colors.CYAN,
            "green": Colors.GREEN,
            "red": Colors.FAIL
        }
        color = correspondance[print_color] if print_color in correspondance.keys() else Colors.WARNING
        print(f"{color}Error: {warning}{Colors.NORMAL}")

    def copy(self):
        """Return a deep copy of the instance"""
        return deepcopy(self)

    def validate_binary_vars(self):
        """Verify that x, y and l only contain binary variables"""

        def is_binary(arr: np.array):
            return np.min(arr) >= 0 and np.max(arr) <= 1

        to_verify = [(self.x, "x"), (self.y, "y"), (self.l, "l")]
        for arr, name in to_verify:
            if is_binary(arr):
                continue
            Solution.warn(f"{name} should contain binary variables, values other than 0 and 1 detected.")
            return False
        return True

    def validate(self):
        """
        validate the modifications
        :return: whether the constraints are verified
        """

        # verify that x, y and l contain only 0 and 1
        if not self.validate_binary_vars(): return False

        # verify that a node is not linked to itself
        for i in nodes:
            if not self.x[i, i]: continue
            Solution.warn("A node can't be linked to itself.")
            Solution.warn(f"Condition violated by node {i}.")
            return False

        entrance_count = self.x.sum(axis=0)
        exit_count = self.x.sum(axis=1)
        # C1
        if not np.all(entrance_count == exit_count):
            Solution.warn("C1: The number of entrance of each node should be equal to the number of exit.")
            node_idx = np.argwhere(entrance_count != exit_count)[0]
            Solution.warn(
                f"Condition violated by node {node_idx} with {entrance_count[node_idx]} entrance(s) and {exit_count[node_idx]} exit(s).")
            return False

        # C2
        less_than_one_entrance = entrance_count <= 1
        if not np.all(less_than_one_entrance[tasks + homes]):
            Solution.warn("C2: Each task should be done by at most one employee.")
            for i in tasks:
                if not less_than_one_entrance[i]:
                    Solution.warn(f"Condition violated by node {i} done by {entrance_count[i]} employees.", "cyan")
            return False

        # C3.a
        exactly_one_entrance = entrance_count == 1
        if not np.all(exactly_one_entrance[unavails]):
            Solution.warn("C3.a: Unavailabilities should be visited.")
            unavail_indices = np.argwhere(~exactly_one_entrance)
            idx = unavail_indices[0]
            Solution.warn(f"Condition violated by node {idx}", "cyan")
            return False

        # C3.b
        for k in employees:
            if self.x[k, Employee.list[k].home()]:
                continue
            Solution.warn(f"C3.b: Each employee should visits their home.")
            Solution.warn(f"Condition violated by employee {k}.", "cyan")

            return False

        for idx in unavails:
            employee = Node.list[idx].employee
            employee_idx = Employee.index_of(employee)
            if self.x[employee_idx, idx] == 1:
                continue
            Solution.warn(f"C3.b: Each employee should visit his unavailabilities.")
            Solution.warn(f"Condition violated by employee {employee_idx}.", "cyan")
            return False

        # C4
        for i in nodes:
            for j in nodes:
                if i == j or not self.x[i, j]: continue
                if self.y[:, i] == self.y[:, j]: continue
                Solution.warn(f"C4: Two consecutive tasks should be done by the same employee")
                Solution.warn(f"Condition violated by task {i} and {j}", "cyan")
                return False

        # C5
        for i in tasks:
            task: Task = Node.list[i]
            opening, closing = task.opening_time, task.closing_time
            if self.b[i] >= opening and self.b[i] + task.duration <= closing: continue
            Solution.warn(f"C5: A task should be worked on between its opening time and closing time")
            Solution.warn(f"Condition violated by task {i}", "cyan")
            return False

        # C6.a
        for k in employees:
            employee: Employee = Employee.list[k]
            employee_start = employee.start_time
            if employee_start == self.b[k]: continue
            Solution.warn(f"C6.a: Employees' start times should be respected")
            Solution.warn(f"Condition violated by employee {k}", "cyan")
            return False

        # C6.b
        for i in unavails:
            if self.b[i] == Node.list[i].opening_time: continue
            Solution.warn(f"C6.b: The beginning and end of unavailabilities should be respected")
            Solution.warn(f"Condition violated by Node {i}", "cyan")
            return False

        # C7
        # TODO: verify C7
        for i in tasks:
            if sum(self.x[i, :]) == 0: continue
            intervals = open_intervals(i)
            if self.t[i] in range(len(intervals)):
                start, end = intervals[self.t[i]]
                if self.b[i] >= start and self.b[i] + Node.list[i].duration <= end: continue
                Solution.warn("C7 : Start and end of used disponibility segment not respected")
                Solution.warn(f"CO)
            Solution.warn("C7 : No disponibility segment used for a task that is visited!")
            Solution.warn(f"Condition violated by task {i}", "cyan")
            return False
                
            
        # C8
        for k in employees:
            if self.l[k, k] and self.b[k] > self.p[k]:
                Solution.warn("C8: An employee's lunch time does not respect his daily activity start time.")
                Solution.warn(f"Condition violated by employee {k}", "cyan")
                return False
            for i in tasks + unavails:
                if not self.x[k, i]: continue
                task: Task = Node.list[i]
                employee: Employee = Employee.list[k]
                if self.b[k] + task.duration + (Node.distance[i, k] / Employee.speed) <= employee.end_time:
                    if not self.l[k, k]: continue
                    if self.p[k] + 60 + (Node.distance[i, k] / Employee.Speed) <= self.b[i]: continue
                    Solution.warn("C8: An employee having his lunch at home should do his first task/unavail AFTER lunch time.")
                    Solution.warn(f"Condition violated by employee {k} before task/unavail {i}", "cyan")
                    return False
                Solution.warn("C8: An employee should respect his starting time to do his first task/unavail.")
                Solution.warn(f"Condition violated by employee {k} before task/unavail {i}", "cyan")
                return False

        # C9
        for k in employees:
            for i in tasks + unavails:
                if not self.x[i, k]: continue
                task: Task = Node.list[i]
                employee: Employee = Employee.list[k]
                if self.b[k] + task.duration + (Node.distance[i, k] / Employee.speed) <= employee.end_time:
                    if not L[k, i] or not X[i, k]: continue
                    if self.b[i] + task.duration <= self.p[k] and self.p[k] + 60 + (Node.distance[i, k] / Employee.speed) <= employee.end_time: continue
                    Solution.warn("C9: An employee should have enough time to have his lunch time at his last task/unavail.")
                    Solution.warn(f"Condition violated by emplye {k} after task/unavail {i}", "cyan")
                    return False
                Solution.warn("C9: An employee should have enough time to go back home.")
                Solution.warn(f"Condition violated by employee {k} after task/unavail {i}", "cyan")
                return False

        # C10.a
        for k in employees:
            if sum(self.l[k, :]) == 1: continue
            Solution.warn("C10.a: Each employee has one unique lunch")
            Solution.warn(f"Condition violated by employee {k}", "cyan")
            return False

        # C10.b
        for k in employees:
            for i in nodes:
                if not self.l[k, i]: continue
                if self.y[k, i]: continue
                Solution.warn(f"C10.b: Constraint violated")
                Solution.warn(f"Condition violated by employee {k} after lunch at node {i}")
                return False

        # C10.c
        for i in nodes:
            for k in employees:
                if not self.l[k, i]: continue
                task_duration = Node.list[i].duration
                if self.b[i] + task_duration + self.l[k, i] * 60 <= 14 * 60 and self.b[
                    i] + task_duration >= 12 * 60: continue
                Solution.warn("C10.c A task should be taken between 12AM and 2PM")
                Solution.warn(f"Condition violated by employee {k} after node {i}", "cyan")
                return False

        # C11
        for i in tasks + unavails:
            if sum(self.y[:, i]) <= sum(self.x[:, i]): continue
            Solution.warn("C11: This constraint is violated")
            Solution.warn(f"Condition violated by node {i}", "cyan")
            return False

        # C12
        for i in tasks + unavails:
            for j in tasks + unavails:
                if i == j or not self.x[i, j]: continue
                pause_en_i = sum(self.l[:, i])
                if self.b[i] + Node.list[i].duration + (Node.distance[i, j] / Employee.speed) <= self.b[j]:
                    if not pause_en_i: continue
                    if self.b[i] + Node.list[i].duration <= self.p[k] and self.p[k] + 60 + (Node.distance[i, j] / Employee.speed) <= self.b[j]: continue
                    Solution.warn("C12: An employee should have enough time to have his lunch time at his current task/unavail before going to the next one.")
                    Solution.warn(f"Condition violated between task/unavail {i} and task/unavail {j}", "cyan")
                    return False
                Solution.warn("C12: The traveling time between two nodes should be sufficient.")
                Solution.warn(f"Condition violated between task/unavail {i} and task/unavail {j}", "cyan")
                return False

        # C13
        for k in employees:
            for i in tasks:
                if not self.y[k, i]: continue
                employee: Employee = Employee.list[k]
                task: Task = Node.list[i]
                if employee.level >= task.level: continue
                Solution.warn("C13: An employee can only do tasks that they are able to do.")
                Solution.warn(f"Condition violated by employee {k} at node {i}", "cyan")
                Solution.warn(repr(employee), "green")
                Solution.warn(repr(task), "green")
                return False

        # Every condition is verified
        return True

IndentationError: expected an indented block (Temp/ipykernel_4056/4021777753.py, line 156)

# Demo
On peut instancier une nouvelle instance de Solution en appelant simplement le constructeur Solution().
L’instance s’adapte alors automatiquement aux données présentes dans les classes Employee et Node

La méthode validate permet de déterminer si notre solution vérifie les contraintes. Lorsque au moins une contrainte n’est pas vérifiée. Dans ce cas-là, un message d’erreur correspondant à la première violation de contrainte s’affiche.

In [6]:
solution_instance = Solution()
solution_instance.validate()

[93mError: C3.a: Unavailabilities should be visited.[0m
[96mError: Condition violated by node [0][0m


False

Pour modifier une solution, on modifie directement les tableaux numpy stockés en attribut des instances.

In [7]:
solution_instance.x[0, 0] = 1
solution_instance.validate()

[93mError: A node can't be linked to itself.[0m
[93mError: Condition violated by node 0.[0m


False

Pour copier une instance, on utilise la méthode copy(), qui fait un deep-copy de l’instance.

In [8]:
solution_instance_copy = solution_instance.copy()

Lorsque l’on modifie une copie, la solution originale n’est pas modifiée.

In [9]:
solution_instance_copy.x[0, 0] = 2
solution_instance.x[0, 0] = 0
solution_instance.x[1, 0] = solution_instance.x[0, 2] = 1

In [10]:
solution_instance.validate()

[93mError: C1: The number of entrance of each node should be equal to the number of exit.[0m
[93mError: Condition violated by node [1] with [0] entrance(s) and [1] exit(s).[0m


False

In [11]:
solution_instance_copy.validate()

[93mError: x should contain binary variables, values other than 0 and 1 detected.[0m


False

L'affichage des contraintes non vérifiées sont pour le debug, et on peut ne pas les afficher lorsque l’on exécute nos algorithmes méta-heuristiques.

In [12]:
Solution.set_warning(False) # this tells the solution class to not print warning messages
solution_instance.validate()

False

In [13]:
Solution.set_warning(True) # modify setting to print warning messages again
solution_instance.validate()

[93mError: C1: The number of entrance of each node should be equal to the number of exit.[0m
[93mError: Condition violated by node [1] with [0] entrance(s) and [1] exit(s).[0m


False