# Process Mining

### Разбор реализации метрик из open-source библиотеки SberPM

SberPM - это Python-библиотека для анализа бизнес-процессов (Process Mining), разработанная Сбером. Она является open-source проектом, однако эта версия урезана, полная версия платная. В данном формате пытаемся воспроизвести основные метрики, которые можно использовать для дальнейшего анализа и вызуализации.

## Библиотеки

In [1]:
import pandas as pd
from sberpm import DataHolder
from sberpm.metrics import ActivityMetric, TransitionMetric, IdMetric, TraceMetric, UserMetric
from clickhouse_driver import Client
import os
from dotenv import load_dotenv

In [2]:
# версия sberpm
# Проверить совместимость с другими библиотеками
# Возможно нужно создавать виртуальное окружение и подбивать совместимость
import sberpm
print(sberpm.__version__)

3.4.0


Пример создания окружения для работы через Anaconda Prompt:
```
conda create -n sberpm_env python=3.9 -y 
conda activate sberpm_env
```
python=3.9 -> предполагается, что наиболее стабильная версия.

Можно изменить версию sberpm:
```
pip install sberpm==2.5.0 --force-reinstall
```

В существующей реализации (далее по ячейкам) python=3.12 и sberpm==3.4.0 для расчёта метрик работает стабильно.


**ВАЖНО!** 

Для sberpm 3.4.0 многие библиотеки и классы были удалены, что то поменялось. Далее реализованы метрики для актуальной версии. 

## Загрузка DataHolder

In [3]:
# Можно подгрузить cvs

# Путь к cvs файлу с логами
#path = 'PM/Pack_логи.csv'

In [4]:
# Создаем dataframe в pandas для определения типа данных и дальнейшей работы
#df = pd.read_csv(path)

In [3]:
# Можно подгрузить через Clickhouse
# В любом случае, сначала пишем dataframe, если будут конфликты с типами данных, их можно будет поправить
# Подключаемся к БД для загрузки датасета
load_dotenv() # обращение к .env файлу, который находится в репозитории Git (или папке с тетрадкой) и содержит логин и пароль для доступа к БД
 
user_click = os.getenv("DB_USER") # логин
password_click = os.getenv("DB_PASSWORD") # пароль

print(f"Привет, {os.getenv("DB_USER")}!") # проверка

Привет, None!


In [None]:
# Clickhouse
# Чтение данных
client_read = Client(
    host='clickhouse_dwh_host', #Сервер
    user=user_click, # логин
    password=password_click # пароль
)

In [None]:
# Берем основной запрос с задачами на пользователя
q_user = '''
	select 
		pme.local_date_time,
		pme.task_id,
		pme.event_description,
		coalesce(
		  nullIf(pme.user_id, ''), -- если в текущей строке есть значение, то берем его
			  anyLast(nullIf(pme.user_id, '')) OVER (
			    PARTITION BY pme.task_id
			    ORDER BY pme.local_date_time
			    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
			  ), -- иначе, последнее ненулевое значение до текущей строки
			  first_value(nullIf(pme.user_id, '')) OVER (
			    PARTITION BY pme.task_id
			    ORDER BY pme.local_date_time
			    ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
			  ), -- иначе, первое ненулевое значение после текущей строки
			  '' -- если ничего не нашлось, ставим пустую строку
		) AS user_id
	from pme
	where pme.local_date >= '2025-04-21' -- Рекомендуемая дата, с которой лучше собирать данные 
		and pme.local_date <= '2025-07-21' -- До даты предыдущего исследования для сравнения метрик
		-- Убираем события, где есть (фактически, они дублируют целевые действия):					
		and pme.event_description not like '%- отменить' 
		and pme.event_description not like '%- отмена'
		and not JSONExtractString(pme.json_data, 'progress') = 'false'
		and not JSONExtractString(pme.json_data, 'selected') = 'false'
		and not JSONExtractString(pme.json_data, 'success') = 'false'
;
'''
req1, col1 = client_read.execute(q_user, with_column_types=True) 
df = pd.DataFrame(req1, columns=[x[0] for x in col1]) 

In [8]:
# Если в датах могут быть смешанные форматы или возможные ошибки, то нужно привести к единому
df['local_date_time'] = pd.to_datetime(df['local_date_time'], format='mixed', errors='coerce')

In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 72135 entries, 0 to 72134
Data columns (total 4 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   local_date_time    72135 non-null  datetime64[ns]
 1   task_id            72135 non-null  object        
 2   event_description  72135 non-null  object        
 3   user_id            72135 non-null  object        
dtypes: datetime64[ns](1), object(3)
memory usage: 2.2+ MB


In [10]:
# DataHolder
data_holder = DataHolder(
    data=df,
    col_case='task_id',
    col_stage='event_description',
    col_start_time='local_date_time',
    col_user='user_id'
)

[1mℹ️ INFO    [0m | [34msberpm.baza._sota_utils[0m:	Чтение данных...

[1mℹ️ INFO    [0m | [34msberpm.baza.holder[0m:	Обработка данных ивент лога...

[1mℹ️ INFO    [0m | [34msberpm.baza._data_processing[0m:	Определяем пропуски в текстовых данных...

[1mℹ️ INFO    [0m | [34msberpm.baza._data_processing[0m:	В текстовой колонке 'user_id' дополнительно найдено 1487 пропущенных значений

Рекомендуется указать общий формат времени для корректного парсинга данных, например, time_format='%d-%m-%Y %H:%M:%S'.
Подробнее о допустимых форматах на https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes

C учетом параметров DataHolder: dayfirst = True, yearfirst = False.

[1mℹ️ INFO    [0m | [34msberpm.baza._data_processing[0m:	Данные будут отсортированы по следующим колонкам: ['task_id', 'local_date_time', 'event_description']

[1mℹ️ INFO    [0m | [34msberpm.baza._data_processing[0m:	Рассчитаны длительности операций - см. колонку duration (synthetic

  return arr.astype(dtype, copy=True)


In [4]:
#data_holder.data.head()

In [12]:
print(f'Количество событий: {len(data_holder.data)}')
print(f'Количество кейсов: {data_holder.data['task_id'].nunique()}')
print(f'Количество пользователей: {data_holder.data['user_id'].nunique()}')

Количество событий: 72135
Количество кейсов: 1553
Количество пользователей: 30


In [13]:
# Прверяем даты на пральное воспроизведение формата
data_holder.data['local_date_time'].describe()

count                            72135
mean     2025-06-21 17:55:08.621244416
min         2025-04-21 14:04:16.737793
25%      2025-06-11 15:38:32.446499840
50%      2025-06-26 14:32:18.783000064
75%         2025-07-09 12:59:11.288000
max         2025-07-21 19:17:39.586814
Name: local_date_time, dtype: object

## Метрики

- __apply__ – расчет всех характеристик
- __calc_metrics(...)__ – расчет указанных метрик (соответствуют методам/названиям колонок в DataFrame из apply)
- __calculate_time_metrics__ – расчет временных характеристик
- __total_duration__ – расчет суммарного времени работы
- __min_duration__ – расчет минимального времени работы
- __max_duration__ – расчет максимального времени работы
- __mean_duration__ – расчет среднего времени работы
- __median_duration__ – расчет медианного времени работы
- __std_duration__ – расчет стандартного отклонения времени работы
- __var_duration__ – расчет дисперсии времени работы

### ActivityMetric

__Расчёт метрик по конкретным операциям__

- __count__ - сколько раз активность встречается в логе
- __unique_ids__ - уникальные id для каждой активности
- __unique_ids_num__ - количество уникальных id для каждой активности
- __aver_count_in_trace__ - среднее количество раз встречаемости активности в цепочке
- __loop_percent__ - процент зацикленности
- __throughput__ - частота - количество выполненных активностей за единицу времени
- __unique_users__ - уникальные пользователи, работавшие с данной активностью
- __unique_users_num__ - количество уникальных пользователей, работающих над данной активностью
- __success_rate(...)__ - доля id, имеющих данную активность, которая выполнилась успешно (закончились успешными активностями)
- __failure_rate(...)__ - доля id, имеющих данную активность, которая выполнилась неуспешно (закончились неуспешными активностями)

In [14]:
# Создание объекта ActivityMetric
activity_metric = ActivityMetric(data_holder, time_unit='d')
# time_unit='d' значит, что метрики будут рассчитаны по дням. Например:
# количество событий на пользователя в день
# активные дни
# средняя активность в день
# 'd' - день (day); 'h' - час (hour); 'w' - неделя (week); 'm' - месяц (month)

# Расчет всех метрик
df_activity_metric = activity_metric.apply()

In [5]:
#df_activity_metric.sort_values(by='count', ascending=False).head()

### IdMetric

__Расчёт метрик по экземплярам (в данном случае, по поручениям), а именно всех событий внутри экземпляра__

- __trace__ - цепочка (список активностей)
- __trace_length__ - длина цепочки (кол-во активностей в цепочке)
- __unique_activities__ - уникальные активности в цепочке
- __unique_activities_num__ - количество уникальных активностей в цепочке
- __loop_percent__ - процент зацикленности
- __unique_users__ - уникальные пользователи, работающие с этим ID
- __unique_users_num__ - кол-во уникальных пользователей, работавших с данным ID

In [16]:
# Создание объекта IdMetric
id_metric = IdMetric(data_holder, time_unit='d')

# Расчет всех метрик
df_id_metric = id_metric.apply()

In [6]:
#df_id_metric.head()

### TransitionMetric

__Расчёт метрик по переходам из операции в операцию__

- __count__ - сколько раз данный переход встречается в логе
- __unique_ids__ - уникальные id  для каждого перехода
- __unique_ids_num__ - количество уникальных id для каждого перехода
- __aver_count_in_trace__ - среднее количество раз встречаемости объекта в цепочке
- __loop_percent__ - процент зацикленности
- __throughput__ - частота - количество выполненных переходов за единицу времени
- __unique_users__ - уникальные пользователи, работающие над объектом
- __unique_users_num__ - кол-во уникальных пользователей, работающих над объектом
- __success_rate(...)__ - доля id, имеющих текущий переход, которые выполнились успешно (закончились успешными активностями)
- __failure_rate(...)__ - доля id, имеющих текущий переход, которые выполнились неуспешно (закончились неуспешными активностями)

In [18]:
# Создание объекта TransitionMetric
transition_metric = TransitionMetric(data_holder, time_unit='d')

# Расчет всех метрик
df_transition_metric = transition_metric.apply()

In [7]:
#df_transition_metric.head()

### TraceMetric

__Расчёт метрик с группировкой по уникальным цепочкам операций__

- __count__ - сколько раз данная цепочка встречается в логе
- __ids__ - уникальные id с данной цепочкой
- __trace_length__ - длина цепочки (кол-во активностей в цепочке)
- __unique_activities__ - уникальные активности в цепочке
- __unique_activities_num__ - количество уникальных активностей в цепочке активностей
- __unique_users__ - уникальные пользователи, работающие над цепочкой активностей
- __unique_users_num__ - количество уникальных пользователей, работающих над цепочкой активностей

In [20]:
# Создание объекта TraceMetric
trace_metric = TraceMetric(data_holder, time_unit='d')

# Расчет всех метрик
df_trace_metric = trace_metric.apply()

In [8]:
#df_trace_metric.head()

### UserMetric

__Расчёт метрик по пользователям__

- __count__ - сколько раз данный пользователь встречается в логе
- __unique_activities__ - уникальные активности, с которыми работал пользователь
- __unique_activities_num__ - количество уникальных активностей, с которыми работал пользователь
- __unique_ids__ - уникальные id с данным пользователем
- __unique_ids_num__ - количество уникальных id с данным пользователем
- __throughput__ - число раз выполнения объекта за единицу времени
- __workload__ - доля активности лога, выполненных данным пользователем

In [22]:
# Создание объекта UserMetric
user_metric = UserMetric(data_holder, time_unit='d')

# Расчет всех метрик
df_user_metric = user_metric.apply()

In [9]:
#df_user_metric.head()