In [36]:
import pandas as pd
import sys
import math
import random
import plotly.figure_factory as ff
from collections import defaultdict
from sympy import Union
from sympy import Interval as SympyInterval

In [2]:
# Активности
class Activity:
    def __init__(self, act_id, volume, type, direction=None, name=None):
        self.act_id = act_id
        self.volume = volume
        self.type = type
        self.direction = direction
        self.name = name
        self.label = f"{self.type}_{act_id}" 
        self.MACHINES = []  # Список машин с производительностями
        self.INTERVALS = [] # Список интервалов
        

    def add_machine(self, machine):
        self.MACHINES.append(machine)
        
    def add_interval(self, interval):
        self.INTERVALS.append(interval)
        
        
    def fill_intervals_by_number(self):
        self.interval_by_num = defaultdict(int, {number: interval for interval in self.INTERVALS})

    def __repr__(self):
        return (f"ACTIVITY(act_id={self.act_id}, name={self.name}, "
                f"volume={self.volume}, type={self.type}, "
                f"direction={self.direction},"
                f"MACHINES={str(self.MACHINES)}")
    
    def __eq__(self, other):
        return self.act_id == other.act_id and self.type == other.type

    def __hash__(self):
        return hash((self.act_id, self.type))
    
    
# Машины с производительностями
class Machine:
    def __init__(self, machine_id, power, type=None, name=None):
        self.machine_id = machine_id
        self.type = type
        self.power = power
        self.name = name
        self.label = f"Машина_{machine_id}" 
        self.ACTIVITIES = []  # Список активностей
        self.INTERVALS = set() # Список интервалов
        
    def add_activity(self, act):
        self.ACTIVITIES.append(act)
        
    def add_interval(self, interval):
        self.INTERVALS.add(interval)
        
    def __repr__(self):
        return (f"MACHINE(machine_id={self.machine_id}, name={self.name}, "
                f"power={self.power}, type={self.type})")
    
    def __eq__(self, other):
        return self.machine_id == other.machine_id

    def __hash__(self):
        return hash(self.machine_id)
    
# Интервалы
class Interval:
    def __init__(self, start, end, duration=None, number=None):
        self.start = start
        self.end = end
        self.duration = duration
        self.number = number
        self.ACTIVITIES = []  # Список активностей
        self.MACHINES = []  # Список машин с производительностями
        
    def add_activity(self, act):
        self.ACTIVITIES.append(act)
    
    def add_machine(self, machine):
        self.MACHINES.append(machine)
        
    def __eq__(self, other):
        return self.number == other.number

    def __hash__(self):
        return hash(self.number)

In [3]:
class ModelConfig:
    
    # Дата начала расчета (пока число)  
    start_date = 1
    
    # Кол-во активностей для генерации
    act_amt = 10
    
    # Кол-во машин для генерации
    machine_amt = 10


In [4]:
class ModelInput:
    def __init__(
            self,
            model_config: ModelConfig
    ):
        print('Подготовка входных данных модели')
        self.config: ModelConfig = model_config
            
        self.generate_data()
        
        self.calc_interval()
        self.generate_assignments()
        self.calc_params()
        
        
    # Генерация данных
    def generate_data(self):
        self.ACTIVITIES = []
        for i in range(self.config.act_amt):
            self.ACTIVITIES.append(Activity(i, 1000, 'Бурение'))

        self.MACHINES = []
        for i in range(self.config.machine_amt):
            self.MACHINES.append(Machine(i, 100 + 50*i))
        
        for i, act in enumerate(self.ACTIVITIES):
            for j, machine in enumerate(self.MACHINES):
#                 if i % 2 == 0 and machine.machine_id % 2 == 0:
#                     act.add_machine(machine)
#                 elif i % 2 != 0 and machine.machine_id % 2 != 0:
#                     act.add_machine(machine)
#                 else:
#                     pass
                if i!= 4 and i == j:
                    act.add_machine(machine)
                    machine.add_activity(act)
                if i == 4 and j == 5:
                    act.add_machine(machine)
                    machine.add_activity(act)
                if i == 3 and j == 8:
                    act.add_machine(machine)
                    machine.add_activity(act)
        
                
    # Расчет максимального времени выполнения активности
    def calc_interval(self):
        for act in self.ACTIVITIES:
            # Считаем, что производительность задана в одинаковых приведенных единицах
            act.min_power = min(machine.power for machine in act.MACHINES)
            act.max_duration = math.ceil(act.volume / act.min_power)

        self.max_date = self.config.start_date + sum(act.max_duration for act in self.ACTIVITIES)
        
        for act in self.ACTIVITIES:
#             act.INTERVALS = [Interval(num, num+1, 1, num) for num in range(self.config.start_date, self.max_date+1)]
            for num in range(self.config.start_date, self.max_date+1):
                interval = Interval(num, num+1, 1, num)
                act.add_interval(interval)
                for machine in act.MACHINES:
                    machine.add_interval(interval)
#             act.fill_intervals_by_number()

        
    
            
                
    # Назначения машин на активности в интервал времени
    def generate_assignments(self):
        self.ASSIGNMENTS = [(act, machine, interval)
                            for act in self.ACTIVITIES
                            for machine in act.MACHINES
                            for interval in act.INTERVALS]
        
    
    # Расчет необходимых показателей для оптимизации
    def calc_params(self):
        
        # Суммарный объем по всем активностям
        self.sum_volume = sum(act.volume for act in self.ACTIVITIES)
        
        # Объединение интервалов
#         self.ALL_INTERVALS = list(set())
        

        
        
        

                    
                    
                

In [38]:
a = Activity(1, 1, 1, 1)
a.__repr__

# B = Machine(1,1,1)
# B.__repr__

# class 
# # Генерация данных
# def generate_data(act_amt: int, machine_amt: int):
#     ACTIVITIES = []
#     for i in range(act_amt):
#         ACTIVITIES.append(Activity(i, 1, 100*(i+1), 1,1))
        
#     MACHINES = []
#     for i in range(act_amt):
#         ACTIVITIES.append(Activity(i, 1, 100*(i+1), 1,1))

<bound method Activity.__repr__ of ACTIVITY(act_id=1, name=None, volume=1, type=1, direction=1>

In [5]:
# Сбор ограничения
def constraints_from_dict(cons, model, prefix):
    if type(cons) is dict:
        if not cons:
            return
        def rule(model, *k):
            if len(k) == 1:
                k = k[0]
            ret = cons[k]
            if ret is True:
                return Constraint.Feasible
            return ret
        result = Constraint(cons.keys(), rule=rule)
        setattr(model, prefix, result)
    else:
        result = Constraint(expr=cons)
        setattr(model, prefix, result)

# Получение подмножества
def setof(indices, full_set):
    return set([it[indices] for it in full_set])

In [138]:
# from pyomo.environ import Var, Binary, Objective, quicksum, minimize, SolverFactory, ConcreteModel
# from pyomo.core import value
from pyomo.environ import *
import pyomo.environ as pyo

class Model:
    def __init__(self, input: ModelInput) -> None:
        self.model = ConcreteModel()
        self.input = input

        self.output = ModelOutput(
            self.model,
            self.input,
        )

        self.create_model()

    def create_model(self):
        print('Подготовка модели')

        ## Переменные
        # Доля и индикатор назначения машины на активность в интервал t
        self.model.assignment = Var(self.input.ASSIGNMENTS, domain=NonNegativeReals, bounds=(0, 1), initialize=0)
        self.model.assignment_bin = Var(self.input.ASSIGNMENTS, domain=Binary, initialize=0)
        
        
        # Отклонение машины на другую активность
        machine_diff_index = [(act, machine, interval) for machine in self.input.MACHINES
                                      for interval in machine.INTERVALS
                                      for act in machine.ACTIVITIES
                                      if len(machine.ACTIVITIES) >= 2]
        self.model.machine_diff = Var(machine_diff_index, domain=NonNegativeReals, bounds=(0, 1), initialize=0)

        
        # Накопленный невыполненный объем в интервал t
        self.model.running_undone_volume = dict()
        for act in self.input.ACTIVITIES:
            # Сохраняем пройденные интервалы для ускорения расчета
            PREV_INTERVALS = list()
            for interval in act.INTERVALS:
                PREV_INTERVALS.append(interval)
                
                self.model.running_undone_volume[act, interval] = 1 - \
                quicksum(
                self.model.assignment[act, machine, prev_interval] * machine.power * interval.duration
                    for machine in act.MACHINES
                    for prev_interval in PREV_INTERVALS 
                ) / act.volume
                

        ## Ограничения

        # Выполнение всего объема активностей
        cons_act_volume = {}
        for act in self.input.ACTIVITIES:
            cons_act_volume[act] = (
                quicksum(
                    self.model.assignment[act, machine, interval] * machine.power * interval.duration
                    for machine in act.MACHINES
                    for interval in act.INTERVALS 
                )
                ==
                act.volume
            )
            
        constraints_from_dict(cons_act_volume, self.model, 'cons_act_volume')
        
        # В каждый момент времени на активность назначается только одна машина
        cons_one_machine_for_act_t = {}
        for act in self.input.ACTIVITIES:
            for interval in act.INTERVALS:
                cons_one_machine_for_act_t[act, interval] = (
                quicksum(
                    self.model.assignment[act, machine, interval]
                    for machine in act.MACHINES
                )
                <= 1)
        
        constraints_from_dict(cons_one_machine_for_act_t, self.model, 'cons_one_machine_for_act_t')
        
        # В каждый момент времени машина назначается только на одну активность
        cons_one_act_for_machine_t = {}
        for machine in self.input.MACHINES:
#             for interval in sorted(list(machine.INTERVALS), key=lambda interval: interval.start): 
            for interval in machine.INTERVALS:
                cons_one_act_for_machine_t[machine, interval] = (
                quicksum(
                    self.model.assignment[act, machine, interval]
                    for act in machine.ACTIVITIES)
                <= 1)
                
        constraints_from_dict(cons_one_act_for_machine_t, self.model, 'cons_one_act_for_machine_t')
        
        # Отклонение машины на другую активность
        cons_abs_deviation_machine = {}
        for machine in self.input.MACHINES:
            if len(machine.ACTIVITIES) >= 2:
                prev_interval = Interval(-1, -1, -1, -1)
                for interval in sorted(list(machine.INTERVALS), key=lambda interval: interval.start):
                    if prev_interval != Interval(-1, -1, -1, -1):
                        for act in machine.ACTIVITIES:
                            cons_abs_deviation_machine[act,machine,interval,1] = (
                                self.model.machine_diff[act,machine,interval] >= 
                                self.model.assignment_bin[act, machine, interval] - 
                                self.model.assignment_bin[act, machine, prev_interval]
                            )
                            
                            cons_abs_deviation_machine[act,machine,interval,2] = (
                                self.model.machine_diff[act,machine,interval] >= 
                                self.model.assignment_bin[act, machine, prev_interval] - 
                                self.model.assignment_bin[act, machine, interval]
                            )
                    prev_interval = interval
                    
        constraints_from_dict(cons_abs_deviation_machine, self.model, 'cons_abs_deviation_machine')
                    
#         cons_abs_deviation_machine = {}
#         for machine in self.input.MACHINES:
#             if len(machine.ACTIVITIES) >= 2:
#                 for interval in sorted(list(machine.INTERVALS), key=lambda interval: interval.start):
#                     for act in machine.ACTIVITIES:
#                         cons_abs_deviation_machine[act,machine,interval,1] = (
#                             self.model.machine_diff[act,machine,interval] >= 
#                             self.model.assignment[act, machine, interval] - 
#                             0.5
#                         )

#                         cons_abs_deviation_machine[act,machine,interval,2] = (
#                             self.model.machine_diff[act,machine,interval] >= 
#                             0.5 - 
#                             self.model.assignment[act, machine, interval]
#                         )
#                 prev_interval = interval
                
#         constraints_from_dict(cons_abs_deviation_machine, self.model, 'cons_abs_deviation_machine')


        # Связь бинарных и вещественных переменных
        cons_assign_reals_bin_conn = {}
        for act,machine,interval in self.input.ASSIGNMENTS:
            if len(act.MACHINES) >= 2:
                cons_assign_reals_bin_conn[act,machine,interval,1] = (
                    self.model.assignment[act,machine,interval] 
                        <= 
                    self.model.assignment_bin[act, machine, interval])

                cons_assign_reals_bin_conn[act,machine,interval,2] = (
                    self.model.assignment[act,machine,interval] 
                        >= 
                    self.model.assignment_bin[act, machine, interval] * 5e-5)
                
        constraints_from_dict(cons_assign_reals_bin_conn, self.model, 'cons_assign_reals_bin_conn')
            
        


        ### Целевая функция 
        # Выполнение работ как можно раньше
        expr1 = quicksum(self.model.running_undone_volume[act, interval]
                        for act,interval in self.model.running_undone_volume.keys())
        
        # Минимизация переездов машин между активностями
        expr2 = quicksum(self.model.machine_diff[act,machine,interval]
                        for act,machine,interval in machine_diff_index)

        # Минимизация бинарных переменных
        expr3 = quicksum(self.model.assignment_bin[act,machine,interval]
                        for act,machine,interval in self.input.ASSIGNMENTS)

        # Минимизация времени выполнения работ
        self.model.obj = Objective(
            expr=(
                expr1 + expr2
                
#              expr=(quicksum(
#                     self.model.assignment[act, machine, interval] * interval.number
#                         for act in self.input.ACTIVITIES 
#                         for machine in act.MACHINES
#                         for interval in act.INTERVALS
#                 )
                
            )
            , sense=minimize
        )

        return

    def solve_model(self):
        print('Запуск оптимизации')

        # Solver = SolverFactory('cbc')

        # Solver.options['Threads' ] = 10
        # Solver.options['second' ] = 1800
        # Solver.options['allowableGap' ] = 0.02

        #solver = SolverFactory('appsi_highs')
        #solver.options['time_limit'] = 900
        #solver.options['mip_rel_gap'] = 0.02

        solver_name = 'appsi_highs'
        solver = SolverFactory(solver_name)
        # self.model.write('1.lp', io_options={'symbolic_solver_labels': True})
        if solver_name == 'appsi_highs':
            solver.options['time_limit'] = 3600
            solver.options['mip_rel_gap'] = 1e-5
            SolverResults = solver.solve(self.model, tee=True)
        else:
            solver.options['TimeLimit'] = 3600
            solver.options['Method'] = 3
            SolverResults = solver.solve(self.model, tee=True, warmstart=True)
            
            
        # Обработка статусов    
        if (SolverResults.solver.termination_condition == TerminationCondition.infeasible 
            or SolverResults.solver.termination_condition == TerminationCondition.infeasibleOrUnbounded 
            # or SolverResults.solver.termination_condition == TerminationCondition.aborted
            ):
            ResultStatus = 'Infeasible'

            # sys.exit("\n\n #opti-model-status NOT_FOUND \n\n")
        else:
            ResultStatus = 'OK'

        return       

   

In [135]:
class ModelOutput:
    def __init__(self, model: ConcreteModel, input: ModelInput):
        self.model = model
        self.input = input

        self.result_assignments_df = pd.DataFrame(
            columns=[
                'act',
                'machine',
                'Start',
                'Finish'
            ]
        )
        
        
        
        

    def transform_results(self):
        
        self.ROUNDING_CONST = 1e-4
        assignment_result = defaultdict(int, {(act,machine,interval): round(value(self.model.assignment[act,machine,interval]), 3)
                            for act,machine,interval in self.input.ASSIGNMENTS})
        
        # Соединение в крупные интервалы
        self.RESULT_GANTT = list()
        for act in self.input.ACTIVITIES:
            for machine in act.MACHINES:
                prev_interval = Interval(1,1,0,1)
                full_interval_flg = defaultdict(int)
                INTERVALS = list()
                for interval in act.INTERVALS:
                    if assignment_result[act,machine,interval] > 1 - self.ROUNDING_CONST:
                        full_interval_flg[act,machine,interval] = 1

                    if assignment_result[act,machine,interval] > self.ROUNDING_CONST:
                        if full_interval_flg[act,machine,prev_interval] == 1:
                            start = interval.start
                            end = interval.start + interval.duration*assignment_result[act,machine,interval]
                        else:
                            start = interval.end - interval.duration*assignment_result[act,machine,interval]
                            end = interval.end
                        INTERVALS.append(SympyInterval(start,end))
                    prev_interval = interval
                        
                        
#                 intervals = [SympyInterval(interval.start, interval.start+interval.duration*assignment_result[act,machine,interval]) 
#                              for interval in act.INTERVALS
#                              if assignment_result[act,machine,interval] > ROUNDING_CONST]
                INTERVALS.append(SympyInterval(0,0))
                u = Union(*INTERVALS)
                for subset in u.args:
                    if subset.measure > 0:
                        self.RESULT_GANTT.append((act,machine,Interval(float(subset.start), float(subset.end))))
#         self.RESULT_GANTT = list()
#         for act in self.input.ACTIVITIES:
#             for machine in act.MACHINES:
#                 intervals = [SympyInterval(interval.start, interval.end) for interval in act.INTERVALS
#                         if assignment_result[act,machine,interval] > 1e-4]
#                 u = Union(*intervals)
#                 if act.label == 'Бурение_1' and machine.label == 'Машина_1':
#                     print(u)
#                     sys.exit()
#                 start_interval, prev_interval = Interval(1,1,0,1), Interval(1,1,0,1)
#                 for interval in act.INTERVALS:
#                     if assignment_result[act,machine,interval] > 1e-4:
#                         if interval.number - prev_interval.number > 1:
#                             end = prev_interval.start + prev_interval.duration * assignment_result[act,machine,prev_interval]
#                             self.RESULT_GANTT.append((act,machine,Interval(start_interval.start, end)))
#                             start_interval = interval
#                         prev_interval = interval
                            
                
#                 end = prev_interval.start + prev_interval.duration * assignment_result[act,machine,prev_interval]
#                 self.RESULT_GANTT.append((act,machine,Interval(start_interval.start, end)))
            
                
        
        # Детальный аутпут
        assignment_result = [[act.label, machine.label, interval.start, interval.end, value(self.model.assignment[act,machine,interval])]
                                      for act in self.input.ACTIVITIES
                                      for machine in act.MACHINES
                                      for interval in act.INTERVALS
                                  if value(self.model.assignment[act,machine,interval]) > self.ROUNDING_CONST]
        self.result_detailed_df = pd.DataFrame(
            columns=[
                'act',
                'machine',
                'Start',
                'End',
                'Frac'
            ])
            
        self.result_detailed_df = pd.DataFrame(assignment_result, columns=self.result_detailed_df.columns)
        
                                            
                            
        

    def create_output(self):
        print('Подготовка выходных данных')
        
                            

        result_assignments = [
            [
                act.label,
                machine.label,
                interval.start,
                interval.end

            ]
            for act,machine,interval in self.RESULT_GANTT
        ]
        self.result_assignments_df = pd.DataFrame(result_assignments, columns=self.result_assignments_df.columns)
        
#         self.result_jobs_df = self.result_assignments_df.groupby(['act', 'machine']).agg(
#             Start=pd.NamedAgg(column="interval", aggfunc="min"),
#             Finish=pd.NamedAgg(column="interval", aggfunc="max")
#             ).reset_index()
#         self.result_jobs_df = self.result_assignments_df.copy()
        
    def plot_gantt(self):
        df = self.result_assignments_df.copy()
        df['Task'] = df['act'] + ' ' + df['machine'].astype(str)
        df['Complete'] = 100

        colors = dict()
        random.seed(42)
        for i, act in enumerate(self.input.ACTIVITIES):
            colors[act.label] = f"rgb({random.randint(0, 255)}, {random.randint(0, 255)}, {random.randint(0, 255)})"
            
        fig = ff.create_gantt(df, colors=colors, index_col='act', show_colorbar=True)
        fig.update_layout(xaxis_type='linear')
        fig.show()
        
    
    def print_input(self):
        print('Активности:')
        for act in self.input.ACTIVITIES:
            print(f"{act.label}: {act.volume}")
            print(f"{[f'{machine.label}: {machine.power}'for machine in act.MACHINES]}\n")
            
#         print('\nМашины:')
#         for machine in self.input.MACHINES:
#             print(f"{machine.label}: {machine.power}")


        

In [139]:
# model = Model()
# model.solve_model()
input = ModelInput(ModelConfig)
model = Model(input)
model.solve_model()
# output = ModelOutput(model, input)
# output.create_output()
model.output.transform_results()
model.output.create_output()
model.output.plot_gantt()
model.output.print_input()
# model.output.result_assignments_df.head()



Подготовка входных данных модели
Подготовка модели
Запуск оптимизации
Подготовка выходных данных


Активности:
Бурение_0: 1000
['Машина_0: 100']

Бурение_1: 1000
['Машина_1: 150']

Бурение_2: 1000
['Машина_2: 200']

Бурение_3: 1000
['Машина_3: 250', 'Машина_8: 500']

Бурение_4: 1000
['Машина_5: 350']

Бурение_5: 1000
['Машина_5: 350']

Бурение_6: 1000
['Машина_6: 400']

Бурение_7: 1000
['Машина_7: 450']

Бурение_8: 1000
['Машина_8: 500']

Бурение_9: 1000
['Машина_9: 550']



In [133]:
df = model.output.result_detailed_df.copy()
df[df['machine'].isin(['Машина_5'])].sort_values(by=['Start'])

Unnamed: 0,act,machine,Start,End,Frac
29,Бурение_5,Машина_5,1,2,1.0
30,Бурение_5,Машина_5,2,3,1.0
25,Бурение_4,Машина_5,3,4,0.142857
31,Бурение_5,Машина_5,3,4,0.857143
26,Бурение_4,Машина_5,4,5,1.0
27,Бурение_4,Машина_5,5,6,1.0
28,Бурение_4,Машина_5,6,7,0.714286


In [84]:
a = SympyInterval(0,0)
b = SympyInterval(1.5,2.5)
intervals = [*[a,b]]
u = Union(*intervals)


for subset in u.args:
    print(subset, subset.measure)
    if subset.measure>0:
        print(type(float(round(subset.start,4))))
# model.output.result_assignments_df



{0} 0
Interval(1.50000000000000, 2.50000000000000) 1.00000000000000
<class 'float'>


In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from pyomo.environ import value as get_value

def plot_gantt_chart(model, ops, machines, T_proc, d_due, mvar, b, op_project, tol=1e-6):
    # Формируем расписание по машинам и собираем данные для легенды.
    schedule = {r: [] for r in machines}
    for op in ops:
        assigned_machine = None
        for r in machines:
            var_val = mvar[(op, r)].varValue
            if var_val is None:
                var_val = 0
            if var_val > tol:
                assigned_machine = r
                break
        if assigned_machine is None:
            print(f"Операция {op} не назначена ни на одну машину!")
            continue
        start = b[op].varValue
        finish = start + T_proc[op]
        schedule[assigned_machine].append((op, start, finish, d_due[op]))
    
    for r in schedule:
        schedule[r].sort(key=lambda x: x[1])
    
    # Определяем уникальные проекты и создаём цветовую карту.
    unique_projects = sorted(set(op_project[op] for op in ops))
    cmap = plt.get_cmap('tab20', len(unique_projects))
    project_colors = {proj: cmap(i) for i, proj in enumerate(unique_projects)}
    
    # Рисуем диаграмму Ганта.
    fig, ax = plt.subplots(figsize=(12, 6))
    height = 0.8
    yticks = []
    yticklabels = []
    for idx, r in enumerate(machines):
        y = idx
        yticks.append(y)
        yticklabels.append(f"Машина {r}")
        for op, start, finish, due in schedule[r]:
            duration = finish - start
            color = project_colors[op_project[op]]
            ax.barh(y, duration, left=start, height=height, align='center', 
                    edgecolor='black', color=color)
            ax.axvline(due, color='red', linestyle='--', linewidth=1)
    ax.set_yticks(yticks)
    ax.set_yticklabels(yticklabels)
    ax.set_xlabel("Время")
    ax.set_title("Диаграмма Ганта операций по машинам")
    
    # Создаём легенду: для каждого проекта создаём патч с соответствующим цветом.
    patches = [mpatches.Patch(color=project_colors[proj], label=f"Проект {proj}") for proj in unique_projects]
    ax.legend(handles=patches, loc='upper right', title="Проекты")
    
    plt.tight_layout()
    plt.show()


In [195]:
random.seed(42)
print(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
print(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
# random.randint(0, 255)

57 12 140
125 114 71
