# Trabalho de Organização e Planejamento

## Observações
*   As escalas de 01/01 e 31/12 não são tratadas e não devem ficar bloqueadas. O motivo é seguirmos o padrão de escalas por 2 dias consecutivos, 01/01 e 31/12 não seguem esse comportamento.
* Nâo testado com períodos maiores que 1 ano, provável necessidade de ajustes.




## Bibliotecas instaladas

In [1]:
!pip install deap

Collecting deap
  Downloading deap-1.3.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (160 kB)
[?25l[K     |██                              | 10 kB 18.7 MB/s eta 0:00:01[K     |████                            | 20 kB 9.0 MB/s eta 0:00:01[K     |██████                          | 30 kB 7.2 MB/s eta 0:00:01[K     |████████▏                       | 40 kB 7.1 MB/s eta 0:00:01[K     |██████████▏                     | 51 kB 4.3 MB/s eta 0:00:01[K     |████████████▏                   | 61 kB 5.1 MB/s eta 0:00:01[K     |██████████████▎                 | 71 kB 5.2 MB/s eta 0:00:01[K     |████████████████▎               | 81 kB 4.0 MB/s eta 0:00:01[K     |██████████████████▎             | 92 kB 4.5 MB/s eta 0:00:01[K     |████████████████████▍           | 102 kB 4.9 MB/s eta 0:00:01[K     |██████████████████████▍         | 112 kB 4.9 MB/s eta 0:00:01[K     |████████████████████████▍       | 122 kB 4.9 MB/s eta 0:00:01[

In [2]:
from datetime import date, datetime, timedelta, time
from typing import List, Tuple, Dict, Callable, Union
import csv

import deap
from deap import base
from deap import creator
from deap import tools
from deap import algorithms

import numpy as np
import random
import math

# Constantes

In [3]:
ARQUIVO_FERIADOS: str = 'feriados.csv'
PESO_HORA_DIA_SEMANA: float = 0.3
PESO_HORA_FINAL_SEMANA: float = 1.0
PESO_HORA_FERIADO: float = 1.5
PESO_HORA_FERIADO_ESPECIAL: float = 2.0

# Classes Gerais

In [4]:
class Feriado:
    '''
        Feriado especial um tecnico não pode ter mais que X vezes no ano, ano novo, carnaval e natal
    '''
    def __init__(self, data: date, ehEspecial:bool = False):
        self.data: date = data
        self.ehEspecial: bool = ehEspecial

    def __str__(self):
        return f"data={self.data}, dataEspecial={self.ehEspecial}"

    def __repr__(self):
        return self.__str__()

In [5]:
class DateTimeUtil:
    _feriados: Dict[date, Feriado] = {}

    @staticmethod
    def _carrega_feriados_se_necessario() -> None:
        if len (DateTimeUtil._feriados) <= 0:
            lista_feriados: List[Feriado] = []

            with open(ARQUIVO_FERIADOS) as csv_file:
                csv_reader = csv.reader(csv_file, delimiter=',')
                for row in csv_reader:
                    data: date = datetime.strptime(row[0], '%d/%m/%Y').date()
                    # row[1] nome do feriado descartado
                    ehEspecial: str = row[2]

                    f: Feriado = Feriado(data, ehEspecial == 'E')
                    lista_feriados.append(f)
                DateTimeUtil._feriados = {f.data: f for f in lista_feriados}


    @staticmethod
    def eh_dia_util(data: date) -> bool:
        DateTimeUtil._carrega_feriados_se_necessario()
        return not(DateTimeUtil.eh_feriado(data) or DateTimeUtil.eh_final_semana(data))

    @staticmethod
    def eh_feriado(data: date) -> bool:
        DateTimeUtil._carrega_feriados_se_necessario()
        return data in DateTimeUtil._feriados     
    
    @staticmethod
    def eh_feriado_especial(data: date) -> bool:
        DateTimeUtil._carrega_feriados_se_necessario()
        if data in DateTimeUtil._feriados:
            f: Feriado = DateTimeUtil._feriados[data]
            return f.ehEspecial

        return False

    @staticmethod
    def eh_final_semana(data: date) -> bool:
        return data.weekday() > 4

    @staticmethod
    def get_weekday_name(wd: int) -> str:
        if (wd == 0):
            return "Segunda"
        elif (wd == 1):
            return "Terça"
        elif (wd == 2):
            return "Quarta"
        elif (wd == 3):
            return "Quinta"
        elif (wd == 4):
            return "Sexta"
        elif (wd == 5):
            return "Sábado"
        elif (wd == 6):
            return "Domingo"

        raise ValueError(f"Dia da semana inválido {wd}")

    @staticmethod
    def calcula_diferenca_em_horas(dt_fim: datetime, dt_inicio: datetime) -> int:
        diff_time: timedelta = dt_fim - dt_inicio
        return math.ceil(diff_time.total_seconds() / 3600)

    @staticmethod
    def calcula_diferenca_em_dias(dt_inicio: datetime, dt_fim: datetime) -> int:
        diff_time: timedelta = dt_fim - dt_inicio
        return diff_time.days


# Sobreaviso

## Classes

In [6]:
class Bloqueio:
    def __init__(self, data_inicio: date, data_fim: date):
        self.data_inicio = data_inicio
        self.data_fim = data_fim
    
    def eh_data_bloqueada(self, data :date) -> bool:
        return data >= self.data_inicio and data <= self.data_fim

    def __str__(self):
        return f"inicio-bloqueio={self.data_inicio}, fim-bloqueio={self.data_fim}"

    def __repr__(self):
        return self.__str__()

In [7]:
class Funcionario:
    def __init__(self, id:int, nome:str):
        self.id: int = id
        self.nome: str = nome
        self.bloqueios: List[Bloqueio] = []

    def tem_bloqueio_para_data(self, data: date) -> bool:
        if len(self.bloqueios) == 0:
            return False
        for b in self.bloqueios:
            if b.eh_data_bloqueada(data):
                return True

        return False

    def __str__(self):
        return f"id={self.id}, nome={self.nome}"

    def __repr__(self):
        return self.__str__()

In [8]:
class Escala:
    def __init__(self, 
                 inicio: datetime, 
                 termino: datetime, 
                 funcionario: Funcionario, 
                 eh_data_bloqueada: bool, 
                 marcado_otimizacao: bool):
        self.inicio: datetime = inicio
        self.termino: datetime = termino
        self.funcionario: Funcionario = funcionario
        self.eh_data_bloqueada: bool = eh_data_bloqueada
        # será utilizado para mutações e cruzamento, descobrindo as datas que precisarão de tratamento
        self.marcado_otimizacao = marcado_otimizacao

    @staticmethod
    def get_csv_header() -> str:
        return ['Data', 'Dia Semana', 'Hora Inicio', 'Hora Fim', 'Nome', 'Id Funcionario', 'Bloqueio','Otimizacao']    
    
    '''
        trata a inexistencia de funcionário
    '''
    def _get_nome_funcionario_alocado(self) -> str:
        if self.eh_data_bloqueada:
            return f"Bloqueio **{self.funcionario.nome}**"
        else:
            return self.funcionario.nome

    def get_csv_row(self) -> List[str]:
        return [self.inicio.strftime('%d/%m/%Y') ,
               DateTimeUtil.get_weekday_name(self.inicio.weekday()) ,
               self.inicio.strftime('%H:%M:%S') ,
               self.termino.strftime('%H:%M:%S'),
                self._get_nome_funcionario_alocado(),
                self.funcionario.id,
                self.eh_data_bloqueada,
                self.marcado_otimizacao]

    def __str__(self):
        str_bloqueado: str = 'bloqueado' if self.eh_data_bloqueada else 'não bloqueado'
        str_otimizacao: str = 'otimizacao' if self.marcado_otimizacao else 'sem otimizacao'
        return f"{self._get_nome_funcionario_alocado():<35} | {DateTimeUtil.get_weekday_name(self.inicio.weekday()):<10} | {self.inicio} | {self.termino} | {str_bloqueado:<13} | {str_otimizacao:<14}"

    def __repr__(self):
        return self.__str__()

In [9]:
class SobreavisoUtil:
    
    @staticmethod
    def carrega_funcionarios(path: str) -> List[Funcionario]:
        with open(path) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            lista_funcionario: List[Funcionario] = []

            for row in csv_reader:
                lista_funcionario.append(Funcionario(id=int(row[0]), nome=row[1]))
               
        return lista_funcionario

    @staticmethod
    def carrega_bloqueios(path: str, equipe: List[Funcionario]) -> List[Funcionario]:
        dict_equipe: Dict[int,Funcionario] = {f.id: f for f in equipe}      
        with open(path) as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            for row in csv_reader:
                data_inicio: date = datetime.strptime(row[1], '%d/%m/%Y').date()
                data_fim: date = datetime.strptime(row[2], '%d/%m/%Y').date()
                id: int = int(row[0])

                dict_equipe[id].bloqueios.append(Bloqueio(data_inicio=data_inicio, data_fim=data_fim))

        return list(dict_equipe.values())

   


In [10]:
'''
   
'''
class Sobreaviso:

    '''
        qtd_dias_pares - quantos dias eu rodo os pares antes de alternar

    '''
    def __init__(self, 
                 escala_inclui_dias_uteis: bool, 
                 hora_inicio: int, 
                 hora_fim: int,
                 hora_troca_dias_nao_uteis: int,
                 qtd_dias_pares: int = 7):
        
        self.escala_inclui_dias_uteis: bool = escala_inclui_dias_uteis
        self.hora_inicio = hora_inicio
        self.hora_fim = hora_fim
        self.hora_troca_dias_nao_uteis = hora_troca_dias_nao_uteis
        self.qtd_dias_pares = qtd_dias_pares

        self.escala_equipe: List[Escala] = []

    '''
    '''
    def _get_funcionario_por_id(self, id: int) -> Funcionario:
        for f in self.equipe:
            if f.id == id:
                return f
        
        raise ValueError(f"Funcionario com id {id} não encontrado")
    '''
    '''
    def carrega_arquivos_controle(self, path_arquivo_funcionarios: str, path_arquivo_bloqueios: str):
        self.equipe: List[Funcionario] = SobreavisoUtil.carrega_funcionarios(path_arquivo_funcionarios)
        self.equipe = SobreavisoUtil.carrega_bloqueios(path_arquivo_bloqueios, self.equipe)
        

    '''
    '''
    def salva_para_arquivo(self, path) -> None:
        with open(path, 'w', encoding='UTF8') as f:
            csv_writer = csv.writer(f)
            
            csv_writer.writerow(Escala.get_csv_header())
            for e in self.escala_equipe:
                csv_writer.writerow(e.get_csv_row())

    def carrega_de_arquivo(self, path: str) -> None:
        self.escala_equipe = []
        with open(path, 'r', encoding='UTF8') as csv_file:
            csv_reader = csv.reader(csv_file, delimiter=',')
            next(csv_reader, None)  # skip the header

            for row in csv_reader:
                # ['Data', 'Dia Semana', 'Hora Inicio', 'Hora Fim', 'Nome', 'Id Func.', Bloqueio]
                data: date = datetime.strptime(row[0], '%d/%m/%Y').date()
                hora_inicio: time =  datetime.strptime(row[2], '%H:%M:%S').time()
                hora_fim: time =  datetime.strptime(row[3], '%H:%M:%S').time()
                id: int = int(row[5])
                eh_data_bloqueada = eval(row[6])
                marcado_otimizacao = eval(row[7])

                resp: Funcionario = self._get_funcionario_por_id(id)
                data_inicio: datetime = datetime(year=data.year, month=data.month, day=data.day, hour=hora_inicio.hour, minute=hora_inicio.minute, second=hora_inicio.second)
                data_fim: datetime = datetime(year=data.year, month=data.month, day=data.day, hour=hora_fim.hour, minute=hora_fim.minute, second=hora_fim.second)
                self.escala_equipe.append(Escala(inicio=data_inicio, termino=data_fim, funcionario=resp, eh_data_bloqueada=eh_data_bloqueada,marcado_otimizacao=marcado_otimizacao))
            

    '''
        Monta somente as entradas de agenda, de acordo com as configurações
    ''' 
    def _gera_datas_para_montar_escala(self, ano: int) -> List[date]:
        
        data_inicial: date = date(ano, 1, 1)
        data_final: date = date(ano, 12, 31)
        dias_periodo: int = (data_final - data_inicial).days + 1
        vetor_datas: List[date] = []

        for i in range(dias_periodo):
            data_atual:date = data_inicial + timedelta(days=i)
            dia_anterior: date = data_atual + timedelta(days=-1)

            if self.escala_inclui_dias_uteis:
                vetor_datas.append(data_atual)
            else:
                if not DateTimeUtil.eh_dia_util(data_atual):
                    vetor_datas.append(data_atual)
                else: 
                    #se o dia anterior foi um dia não util então incluir a entrada, já que madrugada de um dia não útil para um dia útil deve estar coberta
                    if not DateTimeUtil.eh_dia_util(dia_anterior):
                        vetor_datas.append(data_atual)

        return vetor_datas

    '''
    '''
    def _rotaciona_par_escala(self, par_atual: Tuple[int,int]) -> Tuple[int,int]:
        if par_atual == None:
            return (0,1)  
        else:
            par_1 = (par_atual[1] + 1) % len(self.equipe)
            par_2 = (par_atual[1] + 2) % len(self.equipe)

        return (par_1,par_2)

    
    '''
        Returns
            Se foi adicionado bloqueio nessa escala
    '''
    def _adiciona_responsavel_escala(self, inicio: datetime, termino: datetime, funcionario: Funcionario) -> None:
        tem_bloqueio: bool = False
        if (funcionario.tem_bloqueio_para_data(inicio.date())):
            tem_bloqueio = True

        self.escala_equipe.append(Escala(inicio=inicio, termino=termino, funcionario=funcionario, eh_data_bloqueada=tem_bloqueio, marcado_otimizacao=tem_bloqueio))

    '''
        Caso 1 - Inserido com bloqueio -> A entrada anterior, caso exista, também deve ser bloqueada, caso seja um dia consecutivo, já que temos escalas sem dia útil
        Caso 2 - A entrada anterior tem bloqueio e estamos em um dia consecutivo, então essa entrada também deve ter bloqueio
    '''
    def _trata_meia_escala(self):
        if (len(self.escala_equipe) > 1):
            ultima_escala: Escala = self.escala_equipe[-1]
            penultima_escala: Escala = self.escala_equipe[-2]
            diff_data: timedelta = ultima_escala.inicio.date() - penultima_escala.inicio.date()
            # somente aplicável ao dia seguinte
            if (diff_data.days == 1):
                if ultima_escala.eh_data_bloqueada and not penultima_escala.eh_data_bloqueada:                
                    penultima_escala.eh_data_bloqueada = True
                    penultima_escala.marcado_otimizacao = True
                if penultima_escala.eh_data_bloqueada and not ultima_escala.eh_data_bloqueada:
                    ultima_escala.eh_data_bloqueada = True
                    ultima_escala.marcado_otimizacao = True


    '''

    '''
    def monta_escala(self, ano: int):
        self.escala_equipe = []
        datas_escala: List[date] = self._gera_datas_para_montar_escala(ano)
        par_atual: Tuple(int,int) = self._rotaciona_par_escala(None)
        dias_processados_sem_rotacao: int = 0
        ind_pos_par: int = 0
        
        for data_atual in datas_escala:
            
            #-------------- Primeira Parte da Escala --------------------------
            responsavel_atual: Funcionario = self.equipe[par_atual[ind_pos_par]]
            primeira_parte_escala_inicio: datetime = datetime(year=data_atual.year, month=data_atual.month, day=data_atual.day, hour=0, minute=0, second=0)
            if DateTimeUtil.eh_dia_util(data_atual):
                hora_termino_primeira_parte_escala: int = self.hora_fim
            else:
                hora_termino_primeira_parte_escala: int = self.hora_troca_dias_nao_uteis
            primeira_parte_escala_fim: datetime = datetime(year=data_atual.year, month=data_atual.month, day=data_atual.day, hour=hora_termino_primeira_parte_escala, minute=0, second=0)

            tem_bloqueio: bool = self._adiciona_responsavel_escala(inicio=primeira_parte_escala_inicio, 
                                                                   termino=primeira_parte_escala_fim, 
                                                                   funcionario=responsavel_atual)
            self._trata_meia_escala()
            
            #------------ Segunda Parte da Escala ------------------------------
            if (dias_processados_sem_rotacao >= self.qtd_dias_pares):
                par_atual = self._rotaciona_par_escala(par_atual)
                dias_processados_sem_rotacao = 0
                ind_pos_par = 0
            else:
                ind_pos_par = (ind_pos_par + 1) % 2
                dias_processados_sem_rotacao+=1

            responsavel_atual = self.equipe[par_atual[ind_pos_par]]

            if DateTimeUtil.eh_dia_util(data_atual):
                if self.escala_inclui_dias_uteis:
                    segunda_parte_escala_inicio: datetime = datetime(year=data_atual.year, month=data_atual.month, day=data_atual.day, hour=self.hora_inicio, minute=0, second=0)
                    segunda_parte_escala_fim: datetime = datetime(year=data_atual.year, month=data_atual.month, day=data_atual.day, hour=23, minute=59, second=59)
                    self._adiciona_responsavel_escala(inicio=segunda_parte_escala_inicio, termino=segunda_parte_escala_fim, funcionario=responsavel_atual)
            else:
                segunda_parte_escala_inicio: datetime = datetime(year=data_atual.year, month=data_atual.month, day=data_atual.day, hour=self.hora_troca_dias_nao_uteis, minute=0, second=0)
                segunda_parte_escala_fim: datetime = datetime(year=data_atual.year, month=data_atual.month, day=data_atual.day, hour=23, minute=59, second=59)
                self._adiciona_responsavel_escala(inicio=segunda_parte_escala_inicio, termino=segunda_parte_escala_fim, funcionario=responsavel_atual)

            

            #if data_atual.month > 2:
            #    break  


## Execuções para carga e geração de escalas

In [None]:
a = Sobreaviso(escala_inclui_dias_uteis=True,
               hora_inicio=21,
               hora_fim=8,
               hora_troca_dias_nao_uteis=12,
               qtd_dias_pares=7)

In [None]:
a.carrega_arquivos_controle(path_arquivo_funcionarios='funcionarios-telecom.csv',
                            path_arquivo_bloqueios='bloqueios-telecom.csv')

In [None]:
a.monta_escala(2022)

In [None]:
#a.escala_equipe
a.salva_para_arquivo(path='escala-2022.csv')

In [None]:
agenda_carregada = Sobreaviso(escala_inclui_dias_uteis=True,
                          hora_inicio=21,
                          hora_fim=8,
                          hora_troca_dias_nao_uteis=12,
                          qtd_dias_pares=7)
agenda_carregada.carrega_arquivos_controle(path_feriados='feriados.csv', 
                                           path_funcionarios='funcionarios-telecom.csv',
                                           path_bloqueios='bloqueios-telecom.csv')
agenda_carregada.carrega_de_arquivo(path='escala-gerada.csv')

In [None]:
agenda_carregada.escala_equipe

# Tratamento de Bloqueios 
Otimização utilizando Algoritmos Genéticos

### Carregamento do sobreaviso bloqueado
inicialização das variáveis que serão utilizadas pelas função relacionadas aos algoritmos genéticos

In [11]:
# O algoritmo genético vai trabalhar com a lista de escalas, mas precisará dos metadados para validação

sobre_aviso: Sobreaviso = Sobreaviso(escala_inclui_dias_uteis=True,
                          hora_inicio=21,
                          hora_fim=8,
                          hora_troca_dias_nao_uteis=12,
                          qtd_dias_pares=7)
sobre_aviso.carrega_arquivos_controle(path_arquivo_funcionarios='funcionarios-telecom.csv',
                                      path_arquivo_bloqueios='bloqueios-telecom.csv')
sobre_aviso.carrega_de_arquivo(path='escala-2022.csv')

equipe = sobre_aviso.equipe
dict_equipe: Dict[int,Funcionario] = {f.id: f for f in equipe}
tamanho_equipe: int = len(equipe)

lista_escala_original = sobre_aviso.escala_equipe


In [12]:
# imprime a escala
sobre_aviso.escala_equipe

[Funcionario 11                      | Sábado     | 2022-01-01 00:00:00 | 2022-01-01 12:00:00 | não bloqueado | sem otimizacao,
 Bloqueio **Funcionario 14**         | Sábado     | 2022-01-01 12:00:00 | 2022-01-01 23:59:59 | bloqueado     | otimizacao    ,
 Bloqueio **Funcionario 14**         | Domingo    | 2022-01-02 00:00:00 | 2022-01-02 12:00:00 | bloqueado     | otimizacao    ,
 Bloqueio **Funcionario 11**         | Domingo    | 2022-01-02 12:00:00 | 2022-01-02 23:59:59 | bloqueado     | otimizacao    ,
 Bloqueio **Funcionario 11**         | Segunda    | 2022-01-03 00:00:00 | 2022-01-03 08:00:00 | bloqueado     | otimizacao    ,
 Bloqueio **Funcionario 14**         | Segunda    | 2022-01-03 21:00:00 | 2022-01-03 23:59:59 | bloqueado     | otimizacao    ,
 Bloqueio **Funcionario 14**         | Terça      | 2022-01-04 00:00:00 | 2022-01-04 08:00:00 | bloqueado     | otimizacao    ,
 Bloqueio **Funcionario 11**         | Terça      | 2022-01-04 21:00:00 | 2022-01-04 23:59:59 | bloquead

### Funções Genéricas GA
utilizadas pelas funções relacionadas aos algoritmos genéticos ou para uso comum 

In [13]:
def _calcula_esforco_funcionario(funcionario: Funcionario, lista_escala : List[Escala]) -> float:
    '''
        Calcula o esforço do funcionario na lista de escala informada, seguindo 
        a configuração de peso por tipo de dia trabalhado

        Args
            funcionario(Funcionario): funcionário que será avaliado
            lista_escala(List[Escala]): escala que será analisada

        Returns
            esforco(float): medida de esforço do funcionário
    '''

    esforco: float = 0

    for escala in lista_escala:
        if escala.eh_data_bloqueada:
            continue
            #raise ValueError(f'Individuo com bloqueio encontrado no dia {escala.inicio.date()}')
        if escala.funcionario.id == funcionario.id:
            peso: float = PESO_HORA_DIA_SEMANA
            data = escala.inicio.date()

            if DateTimeUtil.eh_feriado_especial(data):
                peso = PESO_HORA_FERIADO_ESPECIAL
            elif DateTimeUtil.eh_feriado(data):
                peso = PESO_HORA_FERIADO
            elif DateTimeUtil.eh_final_semana(data):
                peso = PESO_HORA_FINAL_SEMANA
                
            diff_time_hour = DateTimeUtil.calcula_diferenca_em_horas(escala.termino, escala.inicio)
            esforco += diff_time_hour * peso

    return esforco



def _get_indice_entradas_com_bloqueio(lista_escala: List[Escala]):
    '''
    Na geração de inidividuos alteramos as  escalas com bloqueios. 
    
    Args
        lista_escala(List[Escala]): lista de escalas que será analisada

    Returns
        ind_escalas_bloqueadas(List[int]): lista de indices cuja escala está bloqueada
    '''
    ind_escalas_bloqueadas: List[int] = []

    for idx, e in enumerate(lista_escala):
        if e.eh_data_bloqueada:
            ind_escalas_bloqueadas.append(idx)

    return ind_escalas_bloqueadas


def _get_indice_entradas_para_otimizacao(lista_escala: List[Escala]):
    '''
    No cruzamento(mate) e na mutação(mutate) não teremos mais na lista de escalas, 
    já que foram tratados na geração de indivídus. O atributo  marcado_otimizacao 
    indica as escalas que farão parte da mutação/cruzamento.

    Args:
        lista_escala(List[Escala]): lista de escalas que será analisada

    Returns
        ind_escalas_otimizacao(List[int]): lista de indices cuja escala está 
            disponível para otimização, ou seja, 
            ao carregar o sobreaviso ela estava bloqueada    

    '''

    ind_escalas_otimizacao: List[int] = []

    for idx, e in enumerate(lista_escala):
        if e.marcado_otimizacao:
            ind_escalas_otimizacao.append(idx)

    return ind_escalas_otimizacao


def _clona_escala_sobreaviso(escala_sobrevaviso: List[Escala]) -> List[Escala]:
    clone: List[Escala] = []
    '''
        A clonagem é para evitar que as operações realizadas na lista de escala 
        contaminem a lista original, dessa forma mutação e cruzamento devem gerar 
        escalas novas.

        Args
            escala_sobrevaviso(List[Escala]): lista de escalas que será clonada

        Returns
            clone(List[Escala]): lista de escalas clonadas, identica a original, mas com novas referencias.

    '''

    for e in escala_sobrevaviso:
        escala_clonada = Escala(inicio=e.inicio, 
                                termino=e.termino, 
                                funcionario=e.funcionario,
                                eh_data_bloqueada=e.eh_data_bloqueada,
                                marcado_otimizacao=e.marcado_otimizacao)
        clone.append(escala_clonada)
    
    return clone

    
def _get_funcionarios_livres_para_escala(data: date, funcionarios_ignorar: List[int]) -> List[Funcionario]:
    '''
    Buscar Funcionários livres para substituição de escala, já que as combinações são muitas para que randomicamente cheguemos as melhores escolhas

    Args:
        data(date): data onde se busca funcionários sem bloqueio 
        responsavel_escala_anterior(Funcionario): Funcionario que já trabalho na escala anterior e deve ser desconsiderado 
    Returns:
        Lista de Funcionários livres para substituição de escala
    '''

    lista_funcionarios_livres: List[Funcionario] = []
    # escala sempre de 2 dias
    data_seguinte = data + timedelta(days=1)

    for f in equipe:
        tem_bloqueio: bool = False
        if f.id in funcionarios_ignorar:
            continue
        for b in f.bloqueios:
            if data >= b.data_inicio and data <= b.data_fim:
                tem_bloqueio = True
                break
            if data_seguinte >= b.data_inicio and data_seguinte <= b.data_fim:
                tem_bloqueio = True
                break

        if not tem_bloqueio:
            lista_funcionarios_livres.append(f)

    return lista_funcionarios_livres

def _get_funcionarios_bloqueados_pela_escala(escala_sobreaviso: List[Escala], posicao: int) -> List[int]:
    '''
        Retorna os funcionarios que não podem substituir uma escala: o Próprio, o Responável Anterior e o Posterior
        Args:
            escala_sobreaviso(List[Escala]): escala tratada
            posicao(int): posicao de substituição
        Returns:
            List com os id's de funcionários que não devem ser considerados
    '''
    lista_funcionarios_com_restricao: List[int] = []
    
    # anterior
    lista_funcionarios_com_restricao.append(escala_sobreaviso[posicao - 1].funcionario.id)
    # atual
    lista_funcionarios_com_restricao.append(escala_sobreaviso[posicao].funcionario.id)
    # proximo
    if posicao + 2 < len(escala_sobreaviso):
        lista_funcionarios_com_restricao.append(escala_sobreaviso[posicao + 2].funcionario.id)

    return lista_funcionarios_com_restricao


def _troca_responsavel_se_possivel(origem: List[Escala], destino: List[Escala], pos_troca: int) -> bool:
    '''
        Dada duas escalas (cruzamento), verificar se a posição informada para o destino pode ser trocada.
        Nem sempre será possível no cruzamento realizar uma troca, já que as escalas anteriores  posteriores
        podem gerar uma cofiguração inválida.

        Args:
            origem(List[Escala]): lista de escalas que será utilizada para cruzamento
            destino(List[Escala]): lista de escalas que receberá a troca caso seja possível
            pos_troca(int): posição onde será realizada a tentativa de troca

        Returns:
            True se ocorreu a troca, false caso contrário

    '''
    lista_funcionarios_bloqueados_pela_escala: List[int] = _get_funcionarios_bloqueados_pela_escala(destino, pos_troca)
    resp_origem: Funcionario = origem[pos_troca].funcionario
    if not (resp_origem.id in lista_funcionarios_bloqueados_pela_escala):
        destino[pos_troca].funcionario = resp_origem
        destino[pos_troca + 1].funcionario = resp_origem  
        return True
    else:
        return False


def _troca_responsavel_escala(escala_sobreaviso: List[Escala], responsavel: Funcionario, pos_troca: int) -> bool:
    '''
        Troca a escala e a próxima, sempre são duas, na posição indicada.

        Args:
            escala_sobreaviso(List[Escala]): lista de escalas que será atualizada
            responsavel(Funcionario): novo responsável
            pos_troca(int): posição onde será realizada a troca

        Returns:
            True se ocorreu a troca, false caso contrário

    '''
    escala_sobreaviso[pos_troca].funcionario = responsavel
    escala_sobreaviso[pos_troca + 1].funcionario = responsavel  
      

def salva_individuo(path_arq: str, escala_sobreaviso: List[Escala]) -> None:
    '''
    '''
    with open(path_arq, 'w', encoding='UTF8') as f:
        csv_writer = csv.writer(f)
        
        csv_writer.writerow(Escala.get_csv_header())
        for e in escala_sobreaviso:
            csv_writer.writerow(e.get_csv_row())

def sao_individuos_iguais(individuo1: List[Escala], individuo2: List[Escala] ) -> bool:
    '''
    '''    
    for idx, e in enumerate(individuo1):
        if e.funcionario.id != individuo2[idx].funcionario.id:
            return False
    
    return True


### Função Objetivo 
Mede a justiça na distribuição da escala pelos funcionários.

In [14]:
def mede_justica_escala(individual: List[Escala], debug_enabled: bool = False) -> float:
    '''
        A justiça da escala está relacionada as horas trabalhadas e os respectivos 
        pesos, já que horas durante a semana, finais de semana, feriados e 
        feriados especiais possuem pesos diferentes.
        
        Args:
            individual(List[Escala]): individuo que será analisado
        
        Returns
            desvio_padrao(float): retorna o desvio padrao da lista de escalas.
    '''
    analise_escala_list: List[int] = [];

    for f in equipe:
        ind_esforco: float = _calcula_esforco_funcionario(f, individual)
        analise_escala_list.append(ind_esforco)
        if debug_enabled:
            print(f'{f.nome} - {ind_esforco}')

    esforco_np = np.array(analise_escala_list)
    return esforco_np.std()

### Função de Restrição

In [15]:
def funcao_restricao(individual: List[Escala]) -> bool:
    ''' 
        Valida um individuo de acordo com as seguintes regras:
            - Nào pode existir data bloqueada
            - Ninguém pode trabalhar mais do que 24 horas
            - Um funcionário somente poderá trabalhar por 2 dias consecutivos (2 escalas), mesmo que o somatório de horas seja inferior a 24 horas
        
        Args:
            individual(List[Escala]): individuo que será analisado

        Returns:
            True caso seja um individuo válido
    '''

    for e in individual:
        if e.eh_data_bloqueada:
            return False
    
    # ninguém pode trabalhar mais que 24 horas
    horas_acumuladas: int = 0
    dias_trabalhados: int = 0
    id_ultimo_funcionario: int = -1;
    dt_escala_anterior: datetime = datetime(year=2020,month=1, day=1)

    for e in individual:
        if e.funcionario.id != id_ultimo_funcionario or DateTimeUtil.calcula_diferenca_em_dias(dt_escala_anterior, e.inicio) > 1:
            horas_acumuladas = 0;
            id_ultimo_funcionario = e.funcionario.id
            dias_trabalhados = 0
        
        horas_acumuladas += DateTimeUtil.calcula_diferenca_em_horas(e.termino, e.inicio)
        dias_trabalhados += 1
        dt_escala_anterior = e.inicio

        #print(f'{e.inicio} - {e.funcionario.nome} {horas_acumuladas} horas e {dias_trabalhados} dias ({e.inicio} -> {e.termino})')
        if horas_acumuladas > 24:
            return False
        # Mais do que 2 dias consecutivos é uma situação inválida, mesmo que tenha trabalhado menos do que 24 hs
        if dias_trabalhados > 2:
            return False

    # validar se tem algum funcionario alocado em dia de bloqueio
    for e in individual:
        for b in dict_equipe[e.funcionario.id].bloqueios:
            dt_escala: date = e.inicio.date()
            if  dt_escala >= b.data_inicio and dt_escala <= b.data_fim:
                return False 


    return True


### Funções para GA
Funções que serão utilizadas na biblioteca DEAP (Algoritmos Genéticos)

#### Gerador Individuo

In [16]:
def gerador_individuo(icls, random_function: Callable) -> List[Escala]:
    ''' 
        Criador de individuos, será utilizado pela biblioteca deap

        Args:
            icls(any): parametro da biblioteca deap
            random_function(Callable): função que vai gerar valores randomicos

        Returns:
            escala_sobreaviso_gerada(List[Escala]): escala de sobreaviso válida, gerada de forma randomica(com controle)
    ''' 

    individuo_gerado_valido: bool = False

    # lista de indices com escalas vagas
    escalas_bloqueadas: List[Escala] = _get_indice_entradas_com_bloqueio(lista_escala_original)
    # um funcionario vai pegar sempre duas escalas, temos que partir do principio que a meia escala de 01/01 nunca está bloqueada
    num_bloqueios:int = len(escalas_bloqueadas)

    # devemos gerar uma nova escala, caso contrário a escala original será impactada
    escala_sobreaviso_gerada: List[Escala] = _clona_escala_sobreaviso(lista_escala_original)
    
    for _ in range(10000):
        cont_bloqueio: int = 0
        while cont_bloqueio < num_bloqueios:
            idx_escala: int = escalas_bloqueadas[cont_bloqueio]
            
            dt_escala: date = escala_sobreaviso_gerada[idx_escala].inicio.date()
            resp_escala_anterior = escala_sobreaviso_gerada[idx_escala-1].funcionario.id

            funcionarios_livres_na_data: List[Funcionario] = _get_funcionarios_livres_para_escala(dt_escala, funcionarios_ignorar=[resp_escala_anterior])
            # não vamos usar o random informado porque temos que configurar o limite de acordo com o vetor
            idx_rand:int = random.randint(0, len(funcionarios_livres_na_data) - 1)
            resp: Funcionario = funcionarios_livres_na_data[idx_rand]

            escala_sobreaviso_gerada[idx_escala].funcionario = resp
            escala_sobreaviso_gerada[idx_escala].eh_data_bloqueada = False
            escala_sobreaviso_gerada[idx_escala + 1].funcionario = resp
            escala_sobreaviso_gerada[idx_escala + 1].eh_data_bloqueada = False

            cont_bloqueio += 2 

        if funcao_restricao(escala_sobreaviso_gerada):
            individuo_gerado_valido = True
            break
    
    if individuo_gerado_valido:
        if not icls:
            return escala_sobreaviso_gerada    
        else:
            return icls(escala_sobreaviso_gerada)
        # para testes
        # return escala_sobreaviso_gerada
    else:
        raise ValueError('Não foi possível gerar um individuo')




#### Mutação

In [17]:
def mutate(individual: List[Escala], **kwargs) -> List[Escala]:
    '''
        Args:
            individual(List[Escala] ou creator.Individual): Testes locais passam uma lista, rodando na biblioteca deap receberá um wrapper(creator.Individual)

    '''
       
    indices_ja_trocados: List[int] = []

    # lista de indices com escalas para otimizacao
    escalas_otimizacao: List[Escala] = _get_indice_entradas_para_otimizacao(individual)

    numero_trocas: int = random.randint(1, len(escalas_otimizacao)/2)
    
    escala_sobreaviso_mutacao: List[Escala] = _clona_escala_sobreaviso(individual)
    
    for _ in range(numero_trocas):
        # sorteia o indice da troca sem sobreposição
        idx_troca: int = random.randint(0,len(escalas_otimizacao)-2)
        if idx_troca % 2 != 0: idx_troca+=1
        while idx_troca in indices_ja_trocados:
            idx_troca = random.randint(0,len(escalas_otimizacao)-2)
            if idx_troca % 2 != 0: idx_troca+=1
        indices_ja_trocados.append(idx_troca)

        idx_escala: int = escalas_otimizacao[idx_troca]
        dt_escala: date = escala_sobreaviso_mutacao[idx_escala].inicio.date()

        lista_funcionarios_bloqueados_pela_escala: List[int] = _get_funcionarios_bloqueados_pela_escala(escala_sobreaviso_mutacao, idx_escala)
        funcionarios_livres_na_data: List[Funcionario] = _get_funcionarios_livres_para_escala(dt_escala, 
                                                                                              funcionarios_ignorar=lista_funcionarios_bloqueados_pela_escala)
        # caso não tenha outra opção para a data
        if len(funcionarios_livres_na_data) == 0:
            continue

        idx_rand:int = random.randint(0, len(funcionarios_livres_na_data) - 1)
        resp: Funcionario = funcionarios_livres_na_data[idx_rand]

        individual[idx_escala].funcionario = resp
        individual[idx_escala + 1].funcionario = resp
        
    return individual,


#### Cruzamento

In [18]:
def crossover(ind1: List[Escala] ,ind2: List[Escala], **kwargs) -> Tuple[List[Escala], List[Escala]]:
    '''
        Implementado Single Point Crossover - https://www.geeksforgeeks.org/crossover-in-genetic-algorithm/#:~:text=Crossover%20is%20a%20genetic%20operator,depends%20on%20the%20Encoding%20Method.
        Em alguns casos pode gerar individuos iguais, já que as trocas podem se tornar simétricas, nesse cenário forço uma troca randomica para diferenciá-los

        Args:
            ind1, ind2 (List[Escala] ou creator.Individual): individuos para o cruzamento, teste local recebe lista, mas rodand pelo deap receberá um individuo
        Returns:
            Duas listas de escalas contendo o cruzamento entre os individuos
    '''

    # lista de indices com escalas para otimizacao
    escalas_para_cruzamento: List[Escala] = _get_indice_entradas_para_otimizacao(ind1)
    numero_escalas_para_otimizacao:int = len(escalas_para_cruzamento)
    numero_trocas: int = int(numero_escalas_para_otimizacao / 2) #numero_escalas deve sempre ser par
    
    # para forçar diferenciar os gêmeos, quando o cruzamento gera individuos com trocas simetricas
    # indice onde foi realizada a troca e o funcionario ANTERIOR
    lista_trocas_ind1: List[Tuple[int,Funcionario]] = []
    lista_trocas_ind2: List[Tuple[int,Funcionario]] = []
    
    #clona os individuos para não contaminar os parentes
    clone1:List[Escala] = _clona_escala_sobreaviso(ind1)
    clone2:List[Escala] = _clona_escala_sobreaviso(ind2)

    ponto_cruzamento: int = random.randint(1, numero_trocas)

    for idx_troca in range(numero_trocas):
        pos_escala: int = escalas_para_cruzamento[idx_troca * 2]

        if (idx_troca <= ponto_cruzamento):
            lista_trocas_ind2.append((pos_escala, ind2[pos_escala].funcionario))
            if not _troca_responsavel_se_possivel(origem=clone1, destino=ind2, pos_troca=pos_escala):
                del lista_trocas_ind2[-1]
        else:
           lista_trocas_ind1.append((pos_escala, ind1[pos_escala].funcionario))
           if not _troca_responsavel_se_possivel(origem=clone2, destino=ind1, pos_troca=pos_escala):
                del lista_trocas_ind1[-1]
    
    # Tenta diferenciar gemos 
    if sao_individuos_iguais(ind1, ind2):
        tem_troca: bool = False
        lista_trocas_realizadas: List[Tuple[int,Escala]] = None
        lista_para_troca: List[Escala] = None
        

        if len(lista_trocas_ind2) > 0:
            lista_trocas_realizadas = lista_trocas_ind2
            lista_para_troca = ind2
            tem_troca = True
        elif len(lista_trocas_ind1) > 0:  
            lista_trocas_realizadas = lista_trocas_ind1
            lista_para_troca = ind1
            tem_troca = True
          
        if tem_troca:
            num_trocas: int = len(lista_trocas_realizadas)
            if (num_trocas == 0):
                print('não foi possível diferenciar gemeos')
            else:        
                indice_diff_gemeos: int = 0
                if num_trocas > 1:
                    indice_diff_gemeos: int = random.randint(0, num_trocas - 1)
                indice_escala_retornada, resp_retornado = lista_trocas_realizadas[indice_diff_gemeos][0], lista_trocas_realizadas[indice_diff_gemeos][1]
                _troca_responsavel_escala(escala_sobreaviso=lista_para_troca, responsavel=resp_retornado, pos_troca=indice_escala_retornada)                

    return ind1, ind2


### Testes GA
Teste com as funções que serão utilizadas pela DEAP, para garantir o funcionamento

In [19]:
# Geração de um individuo
escala_gerada = gerador_individuo(None, random.randint)
print('Escala gerada é válida?', funcao_restricao(escala_gerada))
salva_individuo('gerada.csv',escala_gerada)
print('Primeiro Individuo')
mede_justica_escala(escala_gerada)

Escala gerada é válida? True
Primeiro Individuo


84.44266742371778

In [20]:
# Mutação
escala_para_mutacao = _clona_escala_sobreaviso(escala_gerada)
mutacao, = mutate(escala_para_mutacao)
print('Escala gerada por mutação é válida?', funcao_restricao(mutacao))
salva_individuo('mutacao.csv', mutacao)
print('Mutação')
mede_justica_escala(mutacao)

Escala gerada por mutação é válida? True
Mutação


91.40049122361606

In [21]:
#cruzamento

escala_para_cruzamento1 = _clona_escala_sobreaviso(escala_gerada)
escala_para_cruzamento2 = _clona_escala_sobreaviso(mutacao)

cruzamento1, cruzamento2 = crossover(escala_para_cruzamento1, escala_para_cruzamento2)
print('Primeira escala gerada por cruzamento é válida?', funcao_restricao(cruzamento1))
print('Segunda escala gerada por cruzamento é válida?', funcao_restricao(cruzamento2))
salva_individuo('cruzamento1.csv', cruzamento1)
salva_individuo('cruzamento2.csv', cruzamento2)
print('Cruzamento 1 \n',mede_justica_escala(cruzamento1), '\n\n')
print('Cruzamento 2 \n',mede_justica_escala(cruzamento2), '\n\n')

Primeira escala gerada por cruzamento é válida? True
Segunda escala gerada por cruzamento é válida? True
Cruzamento 1 
 94.8037586441654 


Cruzamento 2 
 93.70100515654507 




### Utilizando Biblioteca DEAP

In [22]:
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", list, fitness=creator.FitnessMin)

In [23]:
toolbox = base.Toolbox()
# Definir o gerador de numeros aleatórios de numeros inteiros entre o intervalo (0 e 50)
toolbox.register("random_int", random.randint, 0, 430)
# Inicialização do cromossomo (quantos genes o cromossomo deve possuir)
toolbox.register("individual", gerador_individuo, creator.Individual, toolbox.random_int)
# Registro do individuo na população
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
# Registro do nome da função objetivo
toolbox.register("evaluate", mede_justica_escala)
# Registro de qual o tipo de cruzamento deve ser utilizado (cruzamento de 2 pontos)
toolbox.register("mate", crossover, icls=creator.Individual)
# toolbox.register("mate", CrossoverFunction, icls=creator.Individual, attr_bool_function=toolbox.attr_bool)
# Registro de qual tipo de mutação deve ser utilizado (probabilidade de um individuo sofrer mutação)
toolbox.register("mutate", mutate, indpb=0.3)
# toolbox.register("mutate", MutationFunction, indpb=0.1)
# Registro de qual o tipo do método de seleção que será utilizado
toolbox.register("select", tools.selTournament, tournsize=10)

In [25]:
# definição da população inicial
population = toolbox.population(n=50)
# quantidade de gerações
NGEN=40
bestIndEachGeneration = []
bestInd = []
for gen in range(NGEN):
  print("Geracao: ", gen)
  offspring = algorithms.varAnd(population, toolbox, cxpb=0.5, mutpb=0.1)
  fits = list(toolbox.map(toolbox.evaluate, offspring))
  
  indexes = range(len(offspring))
  indexes = [x for _,x in sorted(zip(fits, indexes))]
  fits = [fits[x] for x in indexes]
  offspring = [offspring[x] for x in indexes]
  #print(offspring)
  #print(fits)
  for fit, ind in zip(fits, offspring):
    ind.fitness.wvalues = fit
  elites_ind = offspring[:max(1, round(0.15*len(offspring)))]
  population = toolbox.select(offspring, len(population)-len(elites_ind))
  population.extend(elites_ind)
  bestIndEachGeneration.append([offspring[fits.index(min(fits))], min(fits)])
  bestInd.append(min(bestIndEachGeneration, key=lambda t:t[1])[1])
  print(" Melhor Avaliação dessa geração: ", bestIndEachGeneration[-1][1], "\n", 
        "Melhor Avaliação Geral", min(bestIndEachGeneration, key=lambda t:t[1])[1], "\n",
        "Da geração: ", bestIndEachGeneration.index(min(bestIndEachGeneration, key=lambda t:t[1])), "\n" #,
        #"Cromossomo: ", min(bestIndEachGeneration, key=lambda t:t[1])[0], "\n"
       )


Geracao:  0
 Melhor Avaliação dessa geração:  66.15191236996898 
 Melhor Avaliação Geral 66.15191236996898 
 Da geração:  0 

Geracao:  1
 Melhor Avaliação dessa geração:  63.522671511188385 
 Melhor Avaliação Geral 63.522671511188385 
 Da geração:  1 

Geracao:  2
 Melhor Avaliação dessa geração:  63.522671511188385 
 Melhor Avaliação Geral 63.522671511188385 
 Da geração:  1 

Geracao:  3
 Melhor Avaliação dessa geração:  63.522671511188385 
 Melhor Avaliação Geral 63.522671511188385 
 Da geração:  1 

Geracao:  4
 Melhor Avaliação dessa geração:  63.522671511188385 
 Melhor Avaliação Geral 63.522671511188385 
 Da geração:  1 

Geracao:  5
 Melhor Avaliação dessa geração:  63.10699369373569 
 Melhor Avaliação Geral 63.10699369373569 
 Da geração:  5 

Geracao:  6
 Melhor Avaliação dessa geração:  63.10699369373569 
 Melhor Avaliação Geral 63.10699369373569 
 Da geração:  5 

Geracao:  7
 Melhor Avaliação dessa geração:  63.10699369373569 
 Melhor Avaliação Geral 63.10699369373569 
 D

In [26]:
best = min(bestIndEachGeneration, key=lambda t:t[1])[0]
mede_justica_escala(best, debug_enabled=True)

Funcionario 11 - 586.799999999999
Funcionario 14 - 580.4999999999995
Funcionario 09 - 530.0999999999993
Funcionario 01 - 435.29999999999956
Funcionario 07 - 552.5999999999993
Funcionario 12 - 612.2999999999989
Funcionario 18 - 572.099999999999


53.66244839128123

In [None]:
salva_individuo('best-escala-2022.csv', best)