# Излечение, преобразование, загрузка

## Импортируем необходимые модули

In [1102]:
import pandas as pd
from sqlalchemy import create_engine, text
from itertools import combinations

## Настраиваем подключение к БД

In [1103]:
user = "entries_user"
password = "entries_password"
host = "localhost"
port = "5432"
database = "entries_db"

engine = create_engine(f"postgresql://{user}:{password}@{host}:{port}/{database}")

## intervals_tgt

Читаем сырые данные и справочную таблицу в датафреймы, обрабатывать данные будем в сгруппированном по full_name и отсортированном по event_dt виде, ввиду чего при рассмотрении последовательных пар записей гарантируется упорядоченность.

In [1104]:
df_data = pd.read_sql("SELECT * FROM entries_src;", engine, index_col="id")
df_data["event_dt"] = pd.to_datetime(df_data["event_dt"])

df_emergency = pd.read_sql("SELECT * FROM emergency_ref", engine, index_col="id")
df_emergency["event_dt"] = pd.to_datetime(df_emergency["event_dt"])
df_emergency["end_dt"] = df_emergency["event_dt"] + pd.to_timedelta(df_emergency["duration"], unit="m") + pd.Timedelta(minutes=10)

### Очистка от шума

Очищаем данные - убираем дубликаты записей (один и тот же статус, время отличается меньше чем на минуту). После очистки данных возможны следующие пары последовательных статусов:
- **Вход - Выход** - целевой интервал
- **Выход - Вход** - интервал, в который сотрудник не находится на рабочем месте
- **Выход - Доступ запрещён** - аналогично предыдущему пункту, но при начале следующего интервала была попытка неуспешного входа
- **Доступ запрещён - Вход** - Неудачная и удачная попытки входа или вход после интервала, завершившегося статусом **Доступ запрещён**
- **Вход - Доступ запрещён** - Интервал, окончание которого - выход со статусом **Доступ запрещён**
- **Вход - Вход** - Запись о входе после покидания офиса не через турникет (Два случая: запланированно или нет)
- **Доступ запрещён - Доступ запрещён** - интервал, в который сотрудник не находится на рабочем месте, но предыдущий действительный интервал закончился выходом со статусом **Доступ запрещён**, а следующий начинается с попытки неуспешного входа

In [1105]:
to_drop = []

for name, group in df_data.groupby("full_name"):
    sorted_group = group.sort_values(by="event_dt")
    rows = list(sorted_group.itertuples(index=True))

    for row_i, row_j in zip(rows, rows[1:]):
        if (row_i.status == row_j.status) and (row_j.event_dt - row_i.event_dt <= pd.Timedelta(seconds=60)):
            to_drop.append(row_i.Index)

df_data.drop(to_drop, inplace=True)

### Пары **Вход - Вход** 

Рассмотрим пару **Вход - Вход**, в обоих случаях первая запись будет валидным началом интервала, для определения второй записи обратимся к справочной таблице **emergency_ref** - если в справочной таблице существует запись о запланированном мероприятии и для времени второй записи выполняется неравенство 
$$event\_dt^{ref} < event\_dt^{src} \leq event\_dt^{ref} + duration^{ref} + 10 (min),$$ 
где 10 минут - учёт очередей на КПП после проведения мероприятия, то мы добавляем запись о выходе, время которой совпадает с временем начала запланированного мероприятия, в противном случае вторую запись о входе удаляем.

In [1106]:
to_drop = []
to_push = []

for name, group in df_data.groupby("full_name"):
    sorted_group = group.sort_values(by="event_dt")
    rows = list(sorted_group.itertuples(index=True))

    for row_i, row_j in zip(rows, rows[1:]):        
        if (row_i.status == row_j.status == "Вход"):
            mask_i = (df_emergency["event_dt"] < row_j.event_dt) & (row_j.event_dt < df_emergency["end_dt"])
            mask = (df_emergency["event_dt"] < row_j.event_dt) & (row_j.event_dt < df_emergency["end_dt"])

            if mask.any():
                match = df_emergency[mask]

                to_push.append({
                    "full_name" : name,
                    "event_dt" : match["event_dt"].iloc[0],
                    "status" : "Выход"
                })

            else:
                to_drop.append(row_j.Index)

df_data.drop(to_drop, inplace=True)
df_data = pd.concat([df_data, pd.DataFrame(to_push)])

### Пары **Доступ запрещён - Доступ запрещён**

Рассмотрим пару **Доступ запрещён - Доступ запрещён**, после фильтрации на предыдущем шаге гарантируется, что если записи с индексами $i$-я и $i+1$ - **Доступ запрещён**, то заипси с индексами $i-1$ и $i+2$ - **Вход**, тогда первая запись из пары подразумевает выход, а вторая - ошибочный вход. Соответственно, у первой записи заменяем статус на **Выход**, а вторую удаляем как неуспешный вход.

In [1107]:
to_drop = []
to_update = []

for name, group in df_data.groupby("full_name"):
    sorted_group = group.sort_values(by="event_dt")
    rows = list(sorted_group.itertuples(index=True))

    for row_i, row_j in zip(rows, rows[1:]):
        if (row_i.status == row_j.status == "Доступ запрещён"):
            to_drop.append(row_j.Index)
            to_update.append(row_i.Index)

df_data.drop(to_drop, inplace=True)
df_data.loc[to_update, "status"] = "Выход"

### Статус **Доступ запрещён**

Рассмотрим пары, один из элементов которых - **Доступ запрещён**. В первую очередь рассмотрим пару **Выход - Доступ запрещён**, поскольку на первом элементе интервал заканчивается, второй элемент может быть только попыткой неудачного входа, отбросив записи **Доступ запрещён**, получится, что в паре **Доступ запрещён - Вход** появляется однозначность - конец предыдущего интервала и начало нового и значение этой пары становится равным значению пары **Вход - Доступ запрещён**, следовательно все оставшиеся записи **Доступ запрещён** означают выходы и мы можем их заменить.

In [1108]:
to_drop = []

for name, group in df_data.groupby("full_name"):
    sorted_group = group.sort_values(by="event_dt")
    rows = list(sorted_group.itertuples(index=True))

    for row_i, row_j in zip(rows, rows[1:]):
        if (row_i.status == "Выход") and (row_j.status == "Доступ запрещён"):
            to_drop.append(row_j.Index)

df_data.drop(to_drop, inplace=True)

df_data.loc[df_data["status"] == "Доступ запрещён", "status"] = "Выход"

### Формирование интервалов

При формировании рассматриваем уникальные пары, при верной обработке на предыдущих шагх гарантируется, что уникальные пары могут иметь вид только **Вход - Выход**. Для рассмотрения случая окончания отчётного периода до окончания рабочего дня обрабатываем непарные записи о входе и присваиваем им крайние дату и время отчётного периода.

In [1109]:
report_period_end = df_data["event_dt"].max().normalize() + pd.Timedelta(hours=23, minutes=59, seconds=59)

to_push = []

for name, group in df_data.groupby("full_name"):
    sorted_group = group.sort_values(by="event_dt")
    n = len(sorted_group)

    for i in range(0, n - 1, 2):
        row_i = sorted_group.iloc[i]
        row_j = sorted_group.iloc[i + 1]

        if row_i.status != "Вход" or row_j.status != "Выход":
            print(row_i.status, row_j.status, i, i + 1, row_j.event_dt)

        to_push.append({
            "full_name" : row_i.full_name,
            "enter_dt" : row_i.event_dt,
            "exit_dt" : row_j.event_dt
        })

    if n % 2 != 0:
        row = sorted_group.iloc[n - 1]
        to_push.append({
            "full_name" : row.full_name,
            "enter_dt" : row.event_dt,
            "exit_dt" : report_period_end
        })

df_intervals = pd.DataFrame(to_push)

### Выгрузка датафрейма в таблицу

In [1110]:
df_intervals = df_intervals.reset_index()
df_intervals.rename(columns={"index" : "id"}, inplace=True)

df_intervals.to_sql("intervals_tgt", engine, if_exists="append", index=False)

618

## workdays_tgt

Берём из таблицы интервалы

In [1111]:
df_intervals = pd.read_sql("SELECT * FROM intervals_tgt;", engine, index_col="id")
df_intervals["enter_dt"] = pd.to_datetime(df_intervals["enter_dt"])
df_intervals["exit_dt"] = pd.to_datetime(df_intervals["exit_dt"])

Рассмотрим интервалы для каждого сотрудника и для каждой отчётной даты $rep\_date$, сформируем интервал 
$$enter\_dt^{min}, \; min(exit\_dt^{ref \, min}, \; rep\_date + 1d + 4h),$$
где $enter\_dt^{min}$ - самый ранний вход, $exit\_dt^{ref \, min}$ - выход для самого позднего входа, в случае окончания сформированного интервала в 04:00:00, следующий стоит начинать с 04:00:01.

Пусть вход был осуществлён 2025-06-28 08:00:00, а выход 2025-06-29 04:20:00, тогда формально будет существовать рабочий день длиной в 20 минут, а если 2025-06-29 08:00:00 был вновь произведён вход, то будет существовать два рабочих дня для одной отчётной даты. Во избежание таких ситуаций будем считать окончанием рабочего дня 04:00, если "естественным" образом он не завершился до 08:00

In [None]:
def to_split(enter_dt: pd.Timestamp, exit_dt: pd.Timestamp) -> bool:
    if enter_dt.date() == exit_dt.date():
        time_4am = pd.Timestamp('04:00:00').time()
        return enter_dt.time() < time_4am < exit_dt.time()
    else:
        return True
    
def split(enter_dt: pd.Timestamp, exit_dt: pd.Timestamp) -> list:
    if enter_dt.date() == exit_dt.date():
        split_dt = pd.Timestamp.combine(enter_dt.date(), pd.Timestamp('04:00:00').time())
    else:
        split_dt = pd.Timestamp.combine(
            enter_dt.date() + pd.Timedelta(days=1),
            pd.Timestamp('04:00:00').time()
        )
    
    return [
        (enter_dt, split_dt),
        (split_dt + pd.Timedelta(seconds=1), exit_dt)
    ]


to_push = []
to_drop = []

for row in df_intervals.itertuples():
    if to_split(row.enter_dt, row.exit_dt):
        splited = split(row.enter_dt, row.exit_dt)

        for elem in splited:
            to_drop.append(row.Index)
            to_push.append({
                "full_name" : row.full_name,
                "enter_dt" : elem[0],
                "exit_dt" : elem[1]
            })

df_intervals.drop(to_drop, inplace=True)
df_intervals = pd.concat([df_intervals, pd.DataFrame(to_push)], ignore_index=True)

def get_group_key(dt):
    if dt.time() >= pd.Timestamp('04:01:00').time():
        return dt.date()
    else:
        return (dt - pd.Timedelta(days=1)).date()
    
df_intervals["group"] = df_intervals["enter_dt"].apply(get_group_key)
grouped = df_intervals.groupby(["full_name", "group"])

for (name, key), group in grouped:
    if len(group) != 2:
        print(len(group))


0   2006-04-03 10:19:14
1   2006-04-03 15:49:24
Name: enter_dt, dtype: datetime64[ns]
2   2006-04-04 08:18:06
3   2006-04-04 13:47:56
Name: enter_dt, dtype: datetime64[ns]
4   2006-04-05 08:24:52
5   2006-04-05 14:07:15
Name: enter_dt, dtype: datetime64[ns]
6   2006-04-06 08:13:27
7   2006-04-06 14:07:44
Name: enter_dt, dtype: datetime64[ns]
8   2006-04-07 08:09:31
9   2006-04-07 13:32:11
Name: enter_dt, dtype: datetime64[ns]
10   2006-04-10 09:24:56
11   2006-04-10 14:52:09
Name: enter_dt, dtype: datetime64[ns]
12   2006-04-11 08:20:56
13   2006-04-11 13:44:52
Name: enter_dt, dtype: datetime64[ns]
14   2006-04-12 10:20:12
15   2006-04-12 15:48:09
Name: enter_dt, dtype: datetime64[ns]
16   2006-04-13 09:10:59
17   2006-04-13 14:14:49
Name: enter_dt, dtype: datetime64[ns]
18   2006-04-14 10:17:33
19   2006-04-14 12:25:59
20   2006-04-14 16:11:40
Name: enter_dt, dtype: datetime64[ns]
21   2006-04-17 08:11:01
22   2006-04-17 13:20:30
Name: enter_dt, dtype: datetime64[ns]
23   2006-04-18 0

### Выгрузка датафрейма в таблицу

In [1113]:
df_workdays = df_workdays.reset_index()
df_workdays.rename(columns={"index" : "id"}, inplace=True)

df_workdays.to_sql("workdays_tgt", engine, if_exists="append", index=False)

DuplicateColumnError: A column with name 'id' is already present in table 'workdays_tgt'.

## aggregated_info_tgt

In [None]:
query_aggregated_info = """
INSERT INTO aggregated_info_tgt (
    full_name,
    month,
    workdays_count,
    on_time_count,
    late_0_15,
    late_15_30,
    late_30_60,
    late_60_plus,
    full_day_count,
    short_day_count,
    avg_worktime
)
SELECT
    full_name,
    TO_CHAR(report_date, 'YYYY-MM') AS month,
    COUNT(*) AS workdays_count,
    COUNT(*) FILTER (WHERE enter_dt::time <= TIME '09:00:00') AS on_time_count,
    COUNT(*) FILTER (
        WHERE enter_dt::time > TIME '09:00:00'
          AND EXTRACT(EPOCH FROM (enter_dt::time - TIME '09:00:00')) / 60 <= 15
    ) AS late_0_15,
    COUNT(*) FILTER (
        WHERE EXTRACT(EPOCH FROM (enter_dt::time - TIME '09:00:00')) / 60 > 15
          AND EXTRACT(EPOCH FROM (enter_dt::time - TIME '09:00:00')) / 60 <= 30
    ) AS late_15_30,
    COUNT(*) FILTER (
        WHERE EXTRACT(EPOCH FROM (enter_dt::time - TIME '09:00:00')) / 60 > 30
          AND EXTRACT(EPOCH FROM (enter_dt::time - TIME '09:00:00')) / 60 <= 60
    ) AS late_30_60,
    COUNT(*) FILTER (
        WHERE EXTRACT(EPOCH FROM (enter_dt::time - TIME '09:00:00')) / 60 > 60
    ) AS late_60_plus,
    COUNT(*) FILTER (
        WHERE EXTRACT(EPOCH FROM (exit_dt - enter_dt)) / 3600 >= 9
    ) AS full_day_count,
    COUNT(*) FILTER (
        WHERE EXTRACT(EPOCH FROM (exit_dt - enter_dt)) / 3600 < 9
    ) AS short_day_count,
    ROUND(AVG(EXTRACT(EPOCH FROM (exit_dt - enter_dt)) / 3600), 2) AS avg_worktime
FROM workdays_tgt
GROUP BY full_name, TO_CHAR(report_date, 'YYYY-MM')
ORDER BY full_name, TO_CHAR(report_date, 'YYYY-MM');
"""

with engine.connect() as conn:
    with conn.begin():
        conn.execute(text(query_aggregated_info))

## Эскпортируем полученные таблицы в .csv

In [None]:
tables = ['intervals_tgt', 'workdays_tgt', 'aggregated_info_tgt']

for table in tables:
    df_data = pd.read_sql(f"SELECT * FROM {table}", engine)
    csv_filename = f"data/target/{table}.csv"
    df_data.to_csv(csv_filename, index=False)
    print(f"Таблица {table} сохранена в файл {csv_filename}")

Таблица intervals_tgt сохранена в файл data/target/intervals_tgt.csv
Таблица workdays_tgt сохранена в файл data/target/workdays_tgt.csv
Таблица aggregated_info_tgt сохранена в файл data/target/aggregated_info_tgt.csv
