## 1.Загрузка бизнес-процесса и преобразование его в сеть Петри

In [None]:
import pandas as pd
import seaborn as sns
from pm4py import read_bpmn, convert_to_petri_net, write_xes, convert_to_dataframe, read_xes, \
    discover_petri_net_alpha_plus
from pm4py.algo.discovery.alpha import algorithm as alpha_miner
from pm4py.algo.discovery.heuristics import algorithm as heuristics_miner
from pm4py.algo.discovery.inductive import algorithm as inductive_miner
from pm4py.algo.evaluation.replay_fitness import algorithm as replay_fitness
from pm4py.algo.simulation.playout.petri_net import algorithm as simulator
from pm4py.objects.conversion.process_tree import converter as pt_converter
from pm4py.visualization.heuristics_net import visualizer as hn_visualizer
from pm4py.visualization.petri_net import visualizer as pn_visualizer
from pm4py.visualization.process_tree import visualizer as pt_visualizer

bpmn_graph = read_bpmn('lab_Vinogradov.bpmn')
net,im,fm = convert_to_petri_net(bpmn_graph)

gviz = pn_visualizer.apply(
    net,
    im,
    fm,
    parameters={"debug": True, "set_rankdir": "LR"} # set_rankdir for horizontal layout
)

# Render and view the graph
pn_visualizer.view(gviz)


## 2.Построение графа покрытия и достижимости маркировок


In [None]:

net, initial_marking, final_marking = convert_to_petri_net(bpmn_graph)

from pm4py.objects.petri_net.utils import reachability_graph

#конструируем граф достижимости встроенными средствами
ts=reachability_graph.construct_reachability_graph(net,initial_marking)

from pm4py.visualization.transition_system import visualizer as ts_visualizer

gviz=ts_visualizer.apply(ts,parameters={ts_visualizer.Variants.VIEW_BASED.value.Parameters.FORMAT:"png"})
ts_visualizer.view(gviz)



In [None]:
#построения графа покрытия маркировок и графа достижимости маркировок срдествами woflan

from pm4py.algo.analysis.woflan.graphs.minimal_coverability_graph import minimal_coverability_graph
from pm4py.algo.analysis.woflan.graphs.reachability_graph import reachability_graph

from IPython.display import Image, display

#граф покрытия
cg=minimal_coverability_graph.apply(net, initial_marking, final_marking)

#граф достижимости
rg=reachability_graph.apply(net, initial_marking, final_marking)


In [None]:

from networkx.drawing.nx_agraph import to_agraph


def vizualizeGraph_graphviz(network):
    """Использование Graphviz layout через NetworkX"""
    is_bounded = True

    # Проверка на неограниченность
    for node, attributes in network.nodes(data=True):
        marking = attributes.get('marking', [])
        for pos in marking:
            if pos == float('inf'):
                is_bounded = False

    # Создаем граф AGraph для использования Graphviz
    A = to_agraph(network)

    # Настраиваем атрибуты как в исходном коде
    A.graph_attr['rankdir'] = 'LR'
    A.node_attr['shape'] = 'ellipse'

    # Устанавливаем метки узлов
    for node in network.nodes():
        marking = network.nodes[node].get('marking', '')
        n = A.get_node(node)
        n.attr['label'] = str(marking)

    # Устанавливаем метки ребер
    for u, v, data in network.edges(data=True):
        e = A.get_edge(u, v)
        e.attr['label'] = data.get('transition', '')

    # Визуализация
    A.draw('graph.png', prog='dot', format='png')
    display(Image('graph.png'))

    return is_bounded

In [None]:
vizualizeGraph_graphviz(cg)

In [None]:
vizualizeGraph_graphviz(rg)

In [None]:
from pm4py.objects.petri_net.utils import petri_utils
from pm4py.visualization.petri_net import visualizer as pn_visualizer

from pm4py.objects.petri_net.obj import PetriNet

import copy


# делаем копию сети
net_copy = copy.deepcopy(net)



# добавляем X - переход и проверяем на ограниченность

def add_cycle_transition(pn: PetriNet, first_place: PetriNet.Place, last_place: PetriNet.Place):
    """
    Добавляет переход, который берёт токен из последнего места
    и кладёт его в первое место.
    """
    # создаём переход
    t_cycle = PetriNet.Transition("t_cycle", "t_cycle")

    # добавляем переход в сеть
    pn.transitions.add(t_cycle)

    # дуги: last_place → t_cycle → first_place
    petri_utils.add_arc_from_to(last_place, t_cycle, pn)
    petri_utils.add_arc_from_to(t_cycle, first_place, pn)
    return t_cycle


#вычисляем начальные и конечные позиции
begin_places = [place for place, tokens in im.items() if tokens > 0]

final_places = [place for place, tokens in fm.items() if tokens > 0]

# создаём переход
t_cycle = PetriNet.Transition("t_cycle", "t_cycle")

# добавляем переход в сеть
net_copy.transitions.add(t_cycle)


#получить реальную позицию (объект) по ключу из маркировки

def get_place_by_name(pn, name):
    for place in pn.places:
        if place.name == name:
            return place
    return None  # если места с таким именем нет




p_fin = next(iter(fm.keys()))   # объект Place
p_beg = next(iter(im.keys()))   # объект Place
p_Fin = get_place_by_name (net_copy, p_fin.name)
p_Beg = get_place_by_name (net_copy, p_beg.name)


#добавляем дуги
petri_utils.add_arc_from_to(p_Fin, t_cycle, net_copy)
petri_utils.add_arc_from_to( t_cycle,p_Beg, net_copy)


#покажем расширенную сетку
gviz = pn_visualizer.apply(
    net_copy,
    im,
    fm,
    parameters={"debug": True, "set_rankdir": "LR"} # set_rankdir for horizontal layout
)

# отобразим
pn_visualizer.view(gviz)


# строим граф покрытия маркировок расширенной сети  и там же проверяем на ограниченность (нет омег)

cg_ext=minimal_coverability_graph.apply(net_copy, initial_marking, final_marking)
is_bounded=vizualizeGraph_graphviz(cg_ext)
print("Is bounded?:",is_bounded)

## 3.Анализ бездефектности
**WOFLAN**

In [None]:
import networkx as nx

#TODO: надо добавить проверку на ограниченность. Вопрос - зачем?

rg_ext=reachability_graph.apply(net_copy, initial_marking, final_marking)
vizualizeGraph_graphviz(rg_ext)

#проверяем на живость расширенную сеть
#вычисляем сильносвязные компоненты. Для живости компонента должна быть одна и каждый переход сети должен ей принадлежать


scc = list(nx.strongly_connected_components(rg_ext))
if len(scc)==1:
    print("Is live")
else:
    print("Not live")

# Вопрос - ДОПОЛНИТЬ проверкой, что каждый переход сетки входит в компоненту SCC (Задание)
print(scc)

# Если компонента сильной связности расширенной сети Петри одна и ей принадлежат все переходы, то делаем вывод, что сеть живая
#
#



In [None]:
from pm4py.algo.analysis.woflan import algorithm as woflan

is_sound_result = woflan.apply(net, initial_marking, final_marking,
                                parameters={
                                    woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: True,
                                    woflan.Parameters.PRINT_DIAGNOSTICS: True,
                                    woflan.Parameters.RETURN_DIAGNOSTICS: True
                                })

print(f"Сеть Петри является корректной (бездефектной): {is_sound_result[0]}")

# Проверка ограниченности сети через граф достижимости
print(f"1. Проверка ограниченности:")
print(f"Количество состояний в графе достижимости: {len(ts.states)}")

if len(ts.states) < 1000:
    print(f"Сеть ограничена (конечное число состояний)")
    is_bounded = True
else:
    print(f"Сеть может быть неограниченной")
    is_bounded = False

# Построение расширенной сети для проверки живости
print(f"\n2. Проверка живости сети:")

# Преобразуем граф достижимости в NetworkX для анализа сильносвязных компонент
try:
    G = nx.DiGraph()

    for state in ts.states:
        G.add_node(str(state))

    for transition in ts.transitions:
        G.add_edge(str(transition.from_state), str(transition.to_state))

    # Находим сильносвязные компоненты (алгоритм Косарайю)
    scc = list(nx.strongly_connected_components(G))

    print(f"Найдено {len(scc)} сильносвязных компонент")

    all_transitions = set([t.name for t in net.transitions])

    # Для живости сети должна быть одна компонента, содержащая все переходы
    if len(scc) == 1:
        print(f"раф достижимости сильносвязный")
        is_live = True
    else:
        # Проверяем, есть ли компонента, которая достижима из всех других
        print(f"Граф достижимости не является сильносвязным")

        # Находим компоненты, которые являются стоками (терминальными SCC)
        condensation = nx.condensation(G)
        terminal_scc = [node for node in condensation.nodes() if condensation.out_degree(node) == 0]

        if len(terminal_scc) == 1:
            print(f"Найдена одна терминальная SCC")
            is_live = True
        else:
            print(f"Найдено {len(terminal_scc)} терминальных SCC")
            is_live = False

    print(f"\n3. Проверка по теореме о бездефектности:")
    print(f"Сеть ограничена: {is_bounded}")
    print(f"Сеть живая: {is_live}")

    if is_bounded and is_live:
        print(f"ВЫВОД: Сеть является бездефектной (по теореме)")
    else:
        print(f"ВЫВОД: Сеть не является бездефектной (по теореме)")

except Exception as e:
    print(f"Ошибка при анализе SCC: {e}")
    print("Продолжаем анализ через другие методы")

In [None]:
net, initial_marking, final_marking = convert_to_petri_net(bpmn_graph)

from pm4py.algo.analysis.woflan import algorithm as woflan

is_sound = woflan.apply(net, initial_marking, final_marking, parameters={woflan.Parameters.RETURN_ASAP_WHEN_NOT_SOUND: True,
                                                     woflan.Parameters.PRINT_DIAGNOSTICS: True,

                                                     woflan.Parameters.RETURN_DIAGNOSTICS: True})

print("\nIs Workflow net sound? ->", is_sound[0])

print("\n Полная статистика",is_sound[1])


## 4.Генерация журнала событий

In [None]:
simulated_log = simulator.apply(net, initial_marking, variant=simulator.Variants.BASIC_PLAYOUT,
                                    parameters={simulator.Variants.BASIC_PLAYOUT.value.Parameters.NO_TRACES: 50})

write_xes(simulated_log, 'log.xes')

gviz = pn_visualizer.apply(net, initial_marking, final_marking)
pn_visualizer.view(gviz)

dataframe = convert_to_dataframe(simulated_log)
dataframe.to_csv('exp.csv')


In [None]:
df = pd.read_csv("exp.csv", index_col=0)
df = df.rename(columns={"case:concept:name": "case_id", "time:timestamp": "datetime", "concept:name": "action"})
df['resource']=''
df.head(20)

In [None]:
clerk_case = ['Первичная проверка документов',
              'Рассмотрение заместителем руководителя']

applicant_case = ['Получение разрешения заявителем',
                  'Исправление замечаний по проекту',
                  'Доработка заявления']

supervisor_case = ['Рассмотрение руководителем']

architect_case = ['Проверка архитектором-экспертом',
                  'Ручная проверка архитектора-эксперта']

engineer_case = ['Проверка инженером-экспертом',
                 'Проверка инженером экспертом']

fire_case = ['Запрос заключения пожарного эксперта',
             'Ручная проверка пожарного надзора']

san_case = ['Медкомиссия',
            'Проверка санэпидем надзором',
            'Запрос заключения санитарно-эпидемиологической службы']

system_case = ['Регистрация заявления',
               'Уведомить о доработке',
               'Проверка данных по материалу',
               'Отправить запрос на дополнительную экспертизу',
               'Требуется ручная проверка',
               'Отправка документов со стороны архитектурного отдела',
               'Отправка документов со стороны инженерного отдела',
               'Формирование пакета документов',
               'Отправить уведомление об отказе',
               'Оформление и подписание разрешения',
               'Уведомить о готовности',
               'Внесение в реестр',
               'Архивирование документов',
               'Отправка на медосмотр',
               'Отправка запроса миграционной службе',
               'Уведомление заявителя о не прохождении надзора']

for row_inx in range(df.shape[0]):
    if df.iloc[row_inx, 0] in clerk_case:
        df.loc[row_inx, 'resource'] = 'Clerk'
    elif df.iloc[row_inx, 0] in applicant_case:
        df.loc[row_inx, 'resource'] = 'Applicant'
    elif df.iloc[row_inx, 0] in supervisor_case:
        df.loc[row_inx, 'resource'] = 'Supervisor'
    elif df.iloc[row_inx, 0] in architect_case:
        df.loc[row_inx, 'resource'] = 'Architect'
    elif df.iloc[row_inx, 0] in engineer_case:
        df.loc[row_inx, 'resource'] = 'Engineer'
    elif df.iloc[row_inx, 0] in fire_case:
        df.loc[row_inx, 'resource'] = 'Fire Service'
    elif df.iloc[row_inx, 0] in san_case:
        df.loc[row_inx, 'resource'] = 'Sanepidemic Service'
    elif df.iloc[row_inx, 0] in system_case:
        df.loc[row_inx, 'resource'] = 'System'

In [None]:
import xml.etree.ElementTree as ET

# Read the file
with open('lab_Vinogradov.bpmn', 'r', encoding='utf-8') as f:
    xml_content = f.read()

# Parse the XML
root = ET.fromstring(xml_content)

# Define namespaces
ns = {
    'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL',
    'zeebe': 'http://camunda.org/schema/zeebe/1.0'
}

# Find all tasks with name attribute
tasks = []
for elem in root.findall('.//*[@name]', ns):
    if elem.tag.startswith('{http://www.omg.org/spec/BPMN/20100524/MODEL}'):  # Only BPMN elements
        task_name = elem.get('name')
        if task_name:
            tasks.append(task_name.strip())

# Remove duplicates and sort
unique_tasks = sorted(set(tasks))

print(unique_tasks)

In [None]:
df = pd.read_csv("exp.csv")

# Переименование столбцов для удобства
df = df.rename(columns={
    "case:concept:name": "case_id",
    "time:timestamp": "datetime",
    "concept:name": "activity"
})

# Преобразование даты
df['datetime'] = pd.to_datetime(df['datetime'])

# Определение ресурсов для каждого действия на основе BPMN процесса
applicant_activities = ['Получение разрешения заявителем', 'Исправление замечаний по проекту']
clerk_activities = ['Первичная проверка документов', 'Рассмотрение заместителем руководителя']
architect_activities = ['Проверка архитектором-экспертом', 'Ручная проверка архитектора-эксперта']
engineer_activities = ['Проверка инженером-экспертом', 'Проверка инженером экспертом']
fire_service_activities = ['Запрос заключения пожарного эксперта', 'Ручная проверка пожарного надзора']
sanepidemic_activities = ['Запрос заключения санитарно-эпидемиологической службы',
                          'Проверка санэпидем надзором', 'Медкомиссия']
supervisor_activities = ['Рассмотрение руководителем']

# Назначение ресурсов
df['resource'] = 'Система/Процесс'  # Значение по умолчанию

for idx, row in df.iterrows():
    activity = str(row['activity'])

    if any(act in activity for act in applicant_activities):
        df.at[idx, 'resource'] = 'Заявитель'
    elif any(act in activity for act in clerk_activities):
        df.at[idx, 'resource'] = 'Служащий'
    elif any(act in activity for act in architect_activities):
        df.at[idx, 'resource'] = 'Архитектор'
    elif any(act in activity for act in engineer_activities):
        df.at[idx, 'resource'] = 'Инженер'
    elif any(act in activity for act in fire_service_activities):
        df.at[idx, 'resource'] = 'Пожарный надзор'
    elif any(act in activity for act in sanepidemic_activities):
        df.at[idx, 'resource'] = 'Санэпидемнадзор'
    elif any(act in activity for act in supervisor_activities):
        df.at[idx, 'resource'] = 'Руководитель'

print("Журнал событий обработан")
print(f"Размерность данных: {df.shape[0]} строк, {df.shape[1]} столбцов")
print(f"\nРаспределение событий по ресурсам:")
print(df['resource'].value_counts())

# Сохранение обработанного журнала
df.to_csv('log.csv', index=False)
print("\nОбработанный журнал сохранен в 'processed_log.csv'")

In [None]:
df.head(20)

In [None]:
events = pd.read_csv('log.csv', index_col=0)
events['datetime'] = pd.to_datetime(events['datetime'])
events.head()

In [None]:
print(f'Event log has {events.shape[0]} rows and {events.shape[1]} columns.')

In [None]:
## Get the case start times to get the time deltas for the 'age' of each activity with respect to start
case_starts_ends = events.pivot_table(index='case_id', aggfunc={'datetime': ['min', 'max']})
case_starts_ends = case_starts_ends.reset_index()
case_starts_ends.columns = ['case_id', 'caseend', 'casestart']
events = events.merge(case_starts_ends, on='case_id')
events['relativetime'] = (events['caseend']) - events['casestart']

## Get day of week
events['weekday'] = events['datetime'].apply(lambda x: x.weekday())
events['date'] = events['datetime'].apply(lambda x: x.date())
events['startdate'] = events['casestart'].apply(lambda x: x.date())
events['hour'] = events['datetime'].apply(lambda x: x.time().hour)
## Get relative times in more friendly terms
events['relativetime_s'] = events['relativetime'].dt.seconds + 86400*events['relativetime'].dt.days
events['relativedays'] = events['relativetime'].dt.days

events.head()

In [None]:
patientnums = [int(e) for e in events['case_id'].apply(lambda x: str(x).strip('case_id'))]
resourcenums = [i for (i, e) in enumerate(events['resource'])]

In [None]:
ax = sns.scatterplot(x=events['relativetime_s'], y=events['case_id'], hue=events['activity'])
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), ncol=1)

In [None]:
ax = sns.stripplot(x=events['weekday'], y=resourcenums, hue=events['activity'], jitter=0.2)
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5), ncol=1)

## 6.Реконструкция процесса алгоритмами ProcessMining

### Альфа майнер

In [None]:
# alpha miner
simulated_log = read_xes('log.xes')

net, initial_marking, final_marking = alpha_miner.apply(simulated_log)

# Visualise
parameters = {pn_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking,
                            parameters=parameters,
                            variant=pn_visualizer.Variants.FREQUENCY,
                            log=simulated_log)
pn_visualizer.view(gviz)

### Альфа+ майнер

In [None]:
# alpha miner
simulated_log = read_xes('log.xes')

net, initial_marking, final_marking = discover_petri_net_alpha_plus(simulated_log)

# Visualise
parameters = {pn_visualizer.Variants.FREQUENCY.value.Parameters.FORMAT: "png"}
gviz = pn_visualizer.apply(net, initial_marking, final_marking,
                            parameters=parameters,
                            variant=pn_visualizer.Variants.FREQUENCY,
                            log=simulated_log)
pn_visualizer.view(gviz)

### Эвристический майнер

In [None]:
# heuristics miner
simulated_log = read_xes('log.xes')

heu_net = heuristics_miner.apply_heu(simulated_log)

# Visualise
gviz = hn_visualizer.apply(heu_net)
hn_visualizer.view(gviz)

#Petri-net of heuristic miner output
# heuristics miner
net, im, fm = heuristics_miner.apply(simulated_log)

# viz
gviz = pn_visualizer.apply(net, im, fm)

# pn_visualizer.view(gviz)
pn_visualizer.view(gviz)

### Индуктивный майнер

In [None]:

from pm4py.objects.log.importer.xes import importer as xes_importer

simulated_log = xes_importer.apply('log.xes')

# Построение дерева процессов
tree = inductive_miner.apply(simulated_log)

# Визуализация дерева процессов
gviz = pt_visualizer.apply(tree)
pt_visualizer.view(gviz)

In [None]:
from pm4py.algo.conformance.tokenreplay import algorithm as conformance_diagnostics_token_based_replay

# Загрузка XES лог-файла
simulated_log = xes_importer.apply('log.xes')

# Построение дерева процессов с помощью индуктивного майнера
tree = inductive_miner.apply(simulated_log)

# Преобразуем дерево процессов в сеть Петри
net3, im3, fm3 = pt_converter.apply(tree)

# Получаем сеть Петри альфа-алгоритмом
net, im, fm = alpha_miner.apply(simulated_log)

# Получаем сеть Петри эвристическим алгоритмом
net2, im2, fm2 = heuristics_miner.apply(simulated_log)

# Проверяем качество реконструкции альфа-алгоритмом
replayed_traces1 = conformance_diagnostics_token_based_replay.apply(simulated_log, net, im, fm)

# Проверяем качество реконструкции эвристическим алгоритмом
replayed_traces2 = conformance_diagnostics_token_based_replay.apply(simulated_log, net2, im2, fm2)

# Проверяем качество реконструкции индуктивным алгоритмом (через преобразованную сеть Петри)
replayed_traces3 = conformance_diagnostics_token_based_replay.apply(simulated_log, net3, im3, fm3)

In [None]:
print(replayed_traces1[0]["trace_is_fit"])
print(replayed_traces2[0]["trace_is_fit"])
print(replayed_traces3[0]["trace_is_fit"])

In [None]:
log_fitness1 = replay_fitness.evaluate(replayed_traces1, variant=replay_fitness.Variants.TOKEN_BASED)
log_fitness2 = replay_fitness.evaluate(replayed_traces2, variant=replay_fitness.Variants.TOKEN_BASED)
log_fitness3 = replay_fitness.evaluate(replayed_traces3, variant=replay_fitness.Variants.TOKEN_BASED)

In [None]:
log_fitness1, log_fitness2, log_fitness3

### Выводы из результатов:
**Альфа-алгоритм (log_fitness1):**
   - Низкий perc_fit_traces (0%) означает, что построенная альфа-алгоритмом модель не соответствует журналу событий.
   - Средний уровень соответствия (average_trace_fitness = 0.7495025512910716) высок, что говорит о том, что трассы, которые модель смогла воспроизвести, соответствуют журналу с небольшими отклонениями.

**Эвристический алгоритм (log_fitness2):**
   - Низкий perc_fit_traces (0%) означает, что построенная эвристическим алгоритмом модель не соответствует журналу событий.
   - Средний уровень соответствия (average_trace_fitness = 0.9156024386392311) высок, что говорит о том, что трассы, которые модель смогла воспроизвести, соответствуют журналу с небольшими отклонениями.

**Индуктивный майнер (log_fitness3):**
   - Индуктивный майнер (log_fitness3) показал идеальное соответствие (perc_fit_traces = 100%, log_fitness = 1.0)

### Метрики:
- **perc_fit_traces:** Полезна для понимания, насколько модель соответствует журналу "на первый взгляд".
- **average_trace_fitness:** Помогает оценить общую "точность" модели, даже если не все трассы идеально подходят.
- **log_fitness:** Наиболее универсальная метрика, учитывающая как соответствие, так и "штрафы" за ошибки.