In [None]:
from ortools.sat.python import cp_model

import pandas as pd
data_path='shifts_workers.xlsx'

def prepare_data(data_path:str):
    time_converter=lambda t:t.hour * 60 + t.minute
    minutes_per_day=60*24
    with pd.ExcelFile(data_path) as data:
        periods=data.parse('demand')
        workers_info=data.parse('workers') 
        shift_types=data.parse('shift_types')
      
    periods['shift_start']=periods['time'].apply(time_converter)
    periods=periods.sort_values(by=['date','shift_start'],ascending=[True,True]).reset_index(drop=True)
    periods['shift_end']=periods['shift_start'].shift(-1)
    
    shift_types['time_start']=shift_types['Time_start'].apply(time_converter)
    shift_types['time_end']=shift_types['Time_end'].apply(time_converter)
    return periods[:-1],shift_types,workers_info,minutes_per_day
periods,shift_types,workers_info,day_value=prepare_data(data_path)

# Инициализация модели
model = cp_model.CpModel()
shifts = {}# Словарь для хранения ссылок на переменные
for period_id in periods.index:
    for shift_type in shift_types.index:
        for worker_id in workers_info.index:
            shifts[(period_id,shift_type,worker_id)]=model.NewBoolVar(f'worker_{worker_id}_shift_type_{shift_type}_period_id_{period_id}')
            
def worker_available(shift_type,period_id)->bool:
    return True

for period_id in periods.index:
    available_workers=[]
    for worker_id  in workers_info.index:  
        for shift_type_id in shift_types.index:
            if worker_available(shift_type,period_id):
                available_workers.append(shifts[(period_id,shift_type_id,worker_id)])
    model.Add(sum(available_workers)>= periods.loc[period_id].needed)
                       
#один сотрудник на график
for worker_id  in workers_info.index:
    shifts_used=[model.NewBoolVar(f'shift{shift_type_id}_used_worker_{worker_id}') for shift_type_id in shift_types.index ]
    for i,shift_type_id in enumerate(shift_types.index) :
        _shifts=[shifts[(period_id,shift_type_id,worker_id)] for period_id in periods.index]
        model.Add(sum(_shifts)==0).OnlyEnforceIf(shifts_used[i].Not())
        model.Add(sum(_shifts)>0).OnlyEnforceIf(shifts_used[i])

 
    model.Add(sum(shifts_used) <= 1)        

In [None]:
model.Minimize(sum(
            shifts[(period_id,shift_type,worker_id)]*workers_info.loc[worker_id].price
            for period_id in periods.index
            for worker_id in workers_info.index
            for shift_type in shift_types.index
        )
    )
# Инициализация solver
solver = cp_model.CpSolver()
#solver.parameters.log_search_progress = True
# Решение задачи - та самая одна строчка
status = solver.Solve(model)


def solver_result(solver)->list:
    res=[]
    for key, value in shifts.items():
        solution_value = solver.Value(value)  # Достаем значение переменной
        if solution_value > 0:  # Собираем только те переменные, которые имеют значение 1
            res.append(key)
    return res
def report(solution_list:list):
    print(len(solution_list))
    df=pd.DataFrame(solution_list,columns=('period_id','shift_type','worker_id'))
    df=periods[['date','time']].merge(df,left_index=True, right_on='period_id')
    df=df.merge(shift_types[['Name','Schedules','Price_k']],right_index=True, left_on='shift_type')

    df=df.merge(workers_info,right_index=True, left_on='worker_id').sort_values(by='period_id').reset_index(drop=True)
    return df
r=report(solver_result(solver))

In [None]:
periods

# первая версия

In [24]:
from ortools.sat.python import cp_model

import pandas as pd
data_path='shifts_workers.xlsx'

def split_intervals(periods,extra_period_intervals):
    r=[]
    for i,row in periods.iterrows():
        #row=row.to_dict()
        shift_start,shift_end=row['shift_start'],row['shift_end']
        extra_points=set()
        for extra_interval in extra_period_intervals:

            if extra_interval >shift_start and extra_interval < shift_end:
                extra_points.add(extra_interval)
        if len(extra_points)>0:
            extra_points_list=sorted([shift_start,shift_end,*list(extra_points)])
            for i in range(len(extra_points_list)-1):
                row['shift_start']=extra_points_list[i]
                row['shift_end']=extra_points_list[i+1]
                r.append(row.copy())
        else:
            r.append(row.copy())
    return pd.DataFrame(r).sort_values(by=['date','shift_start'],ascending=[True,True]).reset_index(drop=True)
    
def prepare_data(data_path:str):
    time_converter=lambda t:t.hour * 60 + t.minute
    minutes_per_day=60*24
    with pd.ExcelFile(data_path) as data:
        periods=data.parse('demand')
        workers_info=data.parse('workers') 
        shift_types=data.parse('shift_types')
      
    periods['shift_start']=periods['time'].apply(time_converter)
    periods=periods.sort_values(by=['date','shift_start'],ascending=[True,True]).reset_index(drop=True)
    periods['shift_end']=periods['shift_start'].apply(lambda t:24*60 if t==0 else t).shift(-1)
    periods['weekend']=periods['date'].apply(lambda t:False if t.weekday()<5 else True)
    
    shift_types['time_start']=shift_types['Time_start'].apply(time_converter)
    shift_types['time_end']=shift_types['Time_end'].apply(time_converter)
    
    extra_period_intervals=set(shift_types['time_start'].values.tolist()+shift_types['time_end'].values.tolist())
   
    return split_intervals(periods,extra_period_intervals)[:-1],shift_types,workers_info,minutes_per_day
    
periods,shift_types,workers_info,day_value=prepare_data(data_path)

# Инициализация модели
model = cp_model.CpModel()
shifts = {}# Словарь для хранения ссылок на смены


for worker_id in workers_info.index:
    shift_types_per_worker=[]
    for shift_type in shift_types.index:
        shifts[(shift_type,worker_id)]=model.NewBoolVar(f'worker_{worker_id}shift_type_{shift_type}')
        shift_types_per_worker.append(shifts[(shift_type,worker_id)])
    model.Add(sum(shift_types_per_worker) <= 1)   #ограничение 1 сотрудник на смену

def worker_available(shift_type_info:dict,period_info:dict)->bool:
    time_start,time_end=shift_type_info['time_start'],shift_type_info['time_end']
    
    if period_info['weekend'] is True and shift_type_info['weekend_available'] is not True:
        return False
        
    
    
    if time_start>time_end:
        time_intervals=[(time_start,60*24),(0,time_end)]
    else:
        time_intervals=[(time_start,time_end)]
    for time_interval in time_intervals:
        if time_interval[0]<=period_info['shift_start'] and time_interval[1]>=period_info['shift_end']:
            return True
    return False
    
for period_id in periods.index:
    period_info=periods.loc[period_id]
    available_workers=[]
    for worker_id  in workers_info.index: 
        for shift_type in shift_types.index:
            shift_type_info=shift_types.loc[shift_type]
            if worker_available(shift_type_info,period_info) :
                available_workers.append(shifts[(shift_type,worker_id)])
    if len(available_workers)==0:
        print('cant fill demand')
    model.Add(sum(available_workers)>=period_info.needed)
"""     
model.Minimize(sum(
            shifts[(shift_type,worker_id)]*workers_info.loc[worker_id].price
            for period_id in periods.index
            for worker_id in workers_info.index
            for shift_type in shift_types.index
        )
    )   
"""  
# Инициализация solver
solver = cp_model.CpSolver()
#solver.parameters.log_search_progress = True
# Решение задачи - та самая одна строчка
status = solver.Solve(model)


def solver_result(solver)->list:
    res=[]
    for key, value in shifts.items():
        solution_value = solver.Value(value)  # Достаем значение переменной
        if solution_value > 0:  # Собираем только те переменные, которые имеют значение 1
            res.append(key)
    return res
"""
def report(solution_list:list):
    print(len(solution_list))
    df=pd.DataFrame(solution_list,columns=('period_id','shift_type','worker_id'))
    df=periods[['date','time']].merge(df,left_index=True, right_on='period_id')
    df=df.merge(shift_types[['Name','Schedules','Price_k']],right_index=True, left_on='shift_type')

    df=df.merge(workers_info,right_index=True, left_on='worker_id').sort_values(by='period_id').reset_index(drop=True)
    return df
    
r=report(solver_result(solver))
"""
solver_result(solver)

[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (1, 4),
 (1, 5),
 (1, 6),
 (1, 7),
 (1, 8),
 (0, 9),
 (0, 10),
 (0, 11),
 (0, 12),
 (0, 13),
 (0, 14)]

In [27]:
def report(solution_list:list):
    print(len(solution_list))
    df=pd.DataFrame(solution_list,columns=('shift_type','worker_id'))
    return df
    df=periods[['date','time']].merge(df,left_index=True, right_on='period_id')
    df=df.merge(shift_types[['Name','Schedules','Price_k']],right_index=True, left_on='shift_type')

    df=df.merge(workers_info,right_index=True, left_on='worker_id').sort_values(by='period_id').reset_index(drop=True)
    return df
    
report(solver_result(solver))

15


Unnamed: 0,shift_type,worker_id
0,0,0
1,0,1
2,0,2
3,0,3
4,1,4
5,1,5
6,1,6
7,1,7
8,1,8
9,0,9


In [5]:
[(0, 0),
 (0, 1),
 (0, 2),
 (0, 3),
 (1, 4),
 (1, 5),
 (1, 6),
 (1, 7),
 (1, 8),
 (0, 9),
 (0, 10),
 (0, 11),
 (0, 12),
 (0, 13),
 (0, 14)]

Unnamed: 0,date,time,needed,shift_start,shift_end,weekend
0,2024-01-01,00:00:00,2,0,480.0,False
1,2024-01-01,00:00:00,2,480,540.0,False
2,2024-01-01,00:00:00,2,540,660.0,False
3,2024-01-01,00:00:00,2,660,720.0,False
4,2024-01-01,12:00:00,3,720,1080.0,False
5,2024-01-01,12:00:00,3,1080,1200.0,False
6,2024-01-01,12:00:00,3,1200,1440.0,False
7,2024-01-02,00:00:00,2,0,480.0,False
8,2024-01-02,00:00:00,2,480,540.0,False
9,2024-01-02,00:00:00,2,540,660.0,False


In [17]:
1.05**20

2.653297705144422

In [None]:
periods

In [None]:
set([1,2])+set([11,2])

In [None]:

# Проверяем статус
if status == cp_model.FEASIBLE:
  print('Найдено решение')
# Извлекаем результат (решение)
result = {}
for key, val in shifts.items():
  sol = solver.Value(val)  # Достаем значение переменной
  if sol > 0:  # Собираем только те переменные, которые имеют значение 1
    result[key] = 1
result

In [None]:
1/0

In [None]:
model.Validate()

In [None]:
status==cp_model.OPTIMAL

In [None]:
# Инициализируем множества/списки
K = ["Klinika1", "Klinika2", "Klinika3"]  # Список клиник в сети
E = ["Expert1", "Expert2", "Expert3", "Expert4", "Expert5"]  # Список экспертов
T = ["Smena" + str(t) for t in range(1, 8)]  # Список смен

trudovoy_kodex = 5  # Константа, ограничение кол-ва рабочих смен у эксперта
min_experts_per_clinic = 2  # Минимальное кол-во экспертов, привязанных к клинике

In [None]:
# Импорт "редактора" для записи ЗУО
from ortools.sat.python import cp_model
import pandas as pd

# Инициализация модели
model = cp_model.CpModel()
     

In [None]:
# Инициализация переменных комбинации
X = {}  # Словарь для хранения ссылок на переменные

for k in K:
  for e in E:
    for t in T:
      var_name = f"x_{k}_{e}_{t}"  # Название переменной
      X[k, e, t] = model.NewBoolVar(name=var_name)

print(f"Кол-во переменных комбинаций: {len(X)}")
print(f"Пример переменной в формате (ключ, переменная): {list(X.items())[0]}")

# Инициализация переменных-индикаторов
Y = {}  # Словарь для хранения ссылок на переменные-индикаторы

for k in K:
  for e in E:
    var_name = f"y_{k}_{e}"  # Название переменной-индикатора
    Y[k, e] = model.NewBoolVar(name=var_name)

In [None]:
# Ограничение: один эксперт в каждой клинике каждый день
for k in K:  # для каждой клиники
  for t in T:   # для каждого дня недели

    # Список экспертов для работы в клинике "k" в смену "t"
    lst_vars = [X[k, e, t] for e in E]  

    # Добавление ограничений в модель: ровно один эксперт в клинике в смену
    model.AddExactlyOne(lst_vars) 
    # model.Add(sum(lst_vars) == 1)  # Альтернативный способ добавления ограничения

In [None]:
# Ограничение: Каждый эксперт может работать только в одной клинике в смену.
for e in E:  # для каждого эксперта
  for t in T:   # для каждой смены

    # Список клиник для работы эксперта "e" в смену "t"
    lst_vars = [X[k, e, t] for k in K]  

    # Добавление ограничений в модель: у эксперта не более одной клиники в смену
    model.AddAtMostOne(lst_vars) 
    # model.Add(sum(lst_vars) <= 1)  # Альтернативный способ добавления ограничения

In [None]:
# Ограничение (вспомогательное): индикатор работы эксперта "e" в клинике "k"
for k in K:
  for e in E:

    # Список смен для работы в клинике "k" и эксперта "e"
    lst_vars = [X[k, e, t] for t in T]  

    # Добавление ограничений в модель: индикатор работы эксперта "e" в клинике "k"
    model.AddMaxEquality(Y[k, e], lst_vars)
    # model.Add(sum(lst_vars) >= Y[k, e])  # Нижняя граница
    # model.Add(sum(lst_vars) <= expert_shift_lim * Y[k, e])  # Верхняя граница

In [None]:
# Инициализация solver
solver = cp_model.CpSolver()

# Решение задачи - та самая одна строчка
status = solver.Solve(model)

# Проверяем статус
if status == cp_model.FEASIBLE:
  print('Найдено решение')

In [None]:
# Извлекаем результат (решение)
result = {}
for key, val in X.items():
  sol = solver.Value(val)  # Достаем значение переменной
  if sol > 0:  # Собираем только те переменные, которые имеют значение 1
    result[key] = 1

# Извлекаем значения переменных индикаторов
resulty = {}
for key, val in Y.items():
  sol = solver.Value(val)
  if sol > 0:
    resulty[key] = 1

print(resulty)
result

In [None]:
import pandas as pd

# Переводим словарь результата в табличный вид
df_tmp = pd.DataFrame(list(result.keys()), columns=["Клиники", "Эксперты", "shift"])
df_tmp = df_tmp.sort_values("Эксперты")

# Извлекаем номер смены в виде целого числа
df_tmp["shift_int"] = df_tmp["shift"].apply(lambda x: int(x[-1]))