## Импорты, подключения и сбор датасетов

In [None]:
import numpy as np
import pandas as pd
from sqlalchemy import create_engine
import pathlib
import os

BASE_DIR = pathlib.Path(os.environ['ETL_ROOT'])
RAW_DIR = BASE_DIR / 'raw_data'
PROC_DIR = BASE_DIR / 'processed'

ad_path                        = RAW_DIR / 'tim_export_ad_user.csv'
sync_path                      = RAW_DIR / 'tim_export_project_sync.csv'

save_path = PROC_DIR / 'sync_transformed.csv'

engine_postgres = create_engine("postgresql+psycopg2://postgres:Q!w2e3r4@192.168.42.188:5430/postgres")
engine_pluginsdb = create_engine("postgresql+psycopg2://postgres:Q!w2e3r4@192.168.42.188:5430/pluginsdb")

## Создание датафреймов по источникам данных

In [15]:
df_ad = pd.read_csv(ad_path)
df_sync = pd.read_csv(sync_path)

## Слияние с AD

In [16]:
df_sync = df_sync.merge(
    df_ad[["display_name", "department", "project_section"]],
    how="left",
    left_on="user_display_name",
    right_on="display_name"
).drop(columns="display_name")

## Создание коротких названий проектов и удаление лишних столбцов в мониторинге

In [17]:
bim_users = {
    'Колпаков Семен Дмитриевич','Пятков Роман Анатольевич',
    'Андреев Александр Константинович','Кичигин Андрей Владимирович',
    'Панов Антон Владимирович','Васьков Денис Игоревич','Попов Антон Михайлович',
    'Кузовлева Ольга Сергеевна','Калачев Даниил Артемович',
    'Григорьев Роман Николаевич','Красильников Дмитрий Сергеевич',
    'Литуева Юлия Дмитриевна','Жук Виталий Томашевич','Овсянкин Роман Николаевич',
    'Романова Анна Вячеславовна','Коновалов Василий Сергеевич',
    'Урманчеев Роман Дамирович', 'Докладчик 708'
}

df_sync["is_bim"] = df_sync["user_display_name"].isin(bim_users)

In [18]:
def extract_short_name(name: str) -> str:
    parts = name.split('_')
    return '_'.join(parts[:2]) if len(parts) >= 2 else name

df_sync['short_project_name'] = df_sync['project_name'].astype(str).apply(extract_short_name)

df_sync = df_sync.drop(columns=[
    'program_name',
    'program_version',
])

In [19]:
mask_atom = df_sync["project_name"].str.contains(
    "АТОМ|ДОУ|08-12|ИКП|ATOM|АПУ", case=False, na=False
)

df_sync["object_name"] = np.select(
    [
        df_sync["project_name"].str.contains("СП.ЛЛУ|стандарт|узлы|узел|библиотека", case=False, na=False),
        mask_atom,
        df_sync["project_name"].str.contains("K01", case=False, na=False),
        df_sync["project_name"].str.contains("ИНПРО", case=False, na=False),
        df_sync["project_name"].str.contains("Ялта", case=False, na=False)
    ],
    [
        "Узлы и стандарты",
        "АТОМ",
        "Кортрос",
        "ИНПРО",
        "Ялта"
    ],
    default="Неизвестные проекты"
)

In [20]:
df_sync["is_detached"] = df_sync["project_name"].str.contains("отсоединено", case=False, na=False).astype(int)

In [21]:
def extract_file_storage_name(row):
    project = row.get("project_name")
    username = row.get("username")

    if pd.isna(project) or pd.isna(username):
        return project  # оставить как есть

    parts = str(project).split("_")
    if len(parts) < 2:
        return project  # оставить как есть

    last_part = parts[-1].strip().lower()
    user_name = str(username).strip().lower()

    if last_part == user_name:
        return "_".join(parts[:-1])
    else:
        return project  # оставить как есть

df_sync["file_storage_name"] = df_sync.apply(extract_file_storage_name, axis=1)

In [22]:
def get_project_solution(row):
    name = str(row["project_name"])
    obj  = row["object_name"]

    if obj == "Кортрос":
        section_map_kortros = {
            "_AR": "АР", "_AI": "АИ",
            "_KR": "КР", "_AGK": "АГК",
            "_VK": "ВК", "_EL": "ЭЛ",
            "_OV": "ОВ", "_AK": "АК",
            "_SS": "СС", "_P": "П",
            "_R": "Р", "_TS": "ТС",
            "_AP": "АП"
        }

        for pattern, section in section_map_kortros.items():
            if pattern in name:
                return section
        return "НД"

    else:
        section_map_rus = {
            "_АР": "АР", "_Форэскиз": "АР",
            "_АИ": "АИ", "_КЖ": "КЖ",
            "_ВК": "ВК", "_ЭЛ": "ЭЛ",
            "_ТС": "ТС", "_ТХ": "ТХ",
            "_ОВ": "ОВ", "_КР": "КР",
            "_КМ": "КМ", "_АП": "АП",
            "_ПТ": "ПТ", "_СС": "СС",
            "_ПБ": "ПБ", "_ЭГ": "ЭГ",
             "_АП": "АП"
        }

        for pattern, section in section_map_rus.items():
            if pattern in name:
                return section
        return "НД"

df_sync["project_solution_name"] = df_sync.apply(get_project_solution, axis=1)

In [23]:
def get_project_stage(row):
    name = str(row["project_name"])
    obj = row["object_name"]

    if obj == "Кортрос":
        # кортеж: (тип_проверки, паттерн): значение
        stage_map_kortros = {
            ("contains", "_P_"): "П",
            ("contains", "_R_"): "Р",
            ("contains", "_AGK_"): "ГК",
            ("endswith", "_P"): "П",
            ("endswith", "_R"): "Р",
            ("endswith", "_AGK"): "ГК"
        }

        for (mode, pattern), stage in stage_map_kortros.items():
            if (mode == "contains" and pattern in name) or \
               (mode == "endswith" and name.endswith(pattern)):
                return stage

        return "НД"

    else:
        stage_map = {
            ("contains", "_П_"): "П",
            ("contains", "_Р_"): "Р",
            ("contains", "_РД_"): "Р",
            ("contains", "_ЭП_"): "ЭП",
            ("contains", "_Форэскиз_"): "ЭП",
            ("contains", "_Эскиз_"): "ЭП",
            ("contains", "_ФЭ_"): "ЭП",
            ("endswith", "_П"): "П",
            ("endswith", "_Р"): "Р",
            ("endswith", "_РД"): "Р",
            ("endswith", "_ЭП"): "ЭП",
            ("endswith", "_Форэскиз"): "ЭП",
            ("endswith", "_Эскиз"): "ЭП",
            ("endswith", "_ФЭ"): "ЭП"
        }

        for (mode, pattern), stage in stage_map.items():
            if (mode == "contains" and pattern in name) or \
               (mode == "endswith" and name.endswith(pattern)):
                return stage

        return "НД"
    
df_sync["project_stage_name"] = df_sync.apply(get_project_stage, axis=1)


In [24]:
str_cols = df_sync.select_dtypes(include='object').columns
df_sync[str_cols] = df_sync[str_cols].fillna("Нет данных")

num_cols = df_sync.select_dtypes(include=['number', 'Int64']).columns
df_sync[num_cols] = df_sync[num_cols].fillna(0)

date_cols = df_sync.select_dtypes(include='datetime').columns
df_sync[date_cols] = df_sync[date_cols].fillna(pd.NaT)  # Или заменить:

In [25]:
df_sync_bim = df_sync[(df_sync['is_bim'] == True) & (df_sync['is_detached'] == 0)].copy()
df_sync_designers = df_sync[(df_sync['is_bim'] == False) & (df_sync['is_detached'] == 0)].copy()

## Пишем в БД

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Инкрементальная загрузка DataFrame‑ов в PostgreSQL.

**Почему выбран ключ‑дата, а не GUID?**
-----------------------------------------------------------------
* В ваших выгрузках строки появляются *один раз* и больше не меняются, а новые — отличаются полем‑даты (snapshot / update_date / loaded_at).
* Максимальная дата в таблице вычисляется за O(1) при наличии индекса и сразу говорит, какие строки новые.
* GUID, сгенерированный на лету (`uuid4()`), не детерминирован: при следующем запуске старые строки получат другие GUID‑ы и будут вставлены повторно.
* Дет‑GUID из хэша строки требует вычислений и всё‑равно дублирует функцию даты.

Итого: **ключ‑дата проще, быстрее и не меняет схему данных**.

----------------------------------------------------------------------------------------------------
Скрипт
====================================================================
Зависимости: pandas, numpy, SQLAlchemy ≥1.4, psycopg2‑binary.
"""
from __future__ import annotations

import os
from typing import Dict

import pandas as pd
from sqlalchemy import create_engine, inspect, text

# ─────────────────────────────────────────────
# Подключение
# ─────────────────────────────────────────────
SCHEMA = "datalake"
CHUNK_SIZE = 5_000

def _ensure_table_and_columns(df: pd.DataFrame, table: str, conn) -> None:
    """Создаёт таблицу или добавляет недостающие столбцы."""
    insp = inspect(conn)
    if not insp.has_table(table, schema=SCHEMA):
        df.head(0).to_sql(table, conn, schema=SCHEMA, if_exists="replace", index=False)
        return

    existing_cols = {c["name"] for c in insp.get_columns(table, schema=SCHEMA)}
    missing_cols = [c for c in df.columns if c not in existing_cols]
    for col in missing_cols:
        sql_type = str(df[col].dtype)
        # упрощённо: pandas приведёт к text/varchar; для строгой типизации можно
        # вызвать pandas.io.sql.get_sqltype как в прошлой версии.
        conn.execute(
            text(
                f'ALTER TABLE "{SCHEMA}"."{table}" '
                f'ADD COLUMN IF NOT EXISTS "{col}" {sql_type}'
            )
        )


def _incremental_append(
    df: pd.DataFrame,
    table: str,
    date_col: str,
    conn,
) -> int:
    """Вставляет только строки, у которых `date_col` > max(date_col) в таблице."""
    _ensure_table_and_columns(df, table, conn)

    # если дата‑колонки нет — ошибка пользователя
    if date_col not in df.columns:
        raise KeyError(f"В DataFrame отсутствует колонка с датой '{date_col}'")

    # получаем максимальную дату в целевой таблице
    max_date = conn.scalar(
        text(
            f'SELECT max("{date_col}") FROM "{SCHEMA}"."{table}"'
        )
    )

    df_new = df[df[date_col] > max_date] if max_date else df
    if df_new.empty:
        return 0

    df_new.to_sql(
        table,
        conn,
        schema=SCHEMA,
        if_exists="append",
        index=False,
        chunksize=CHUNK_SIZE,
        method="multi",
    )
    return len(df_new)

# ─────────────────────────────────────────────
# Example usage
# ─────────────────────────────────────────────
if __name__ == "__main__":
    tasks: Dict[str, Dict] = {
        "ext_project_sync_designers": {
            "df": df_sync_designers,
            "date_col": "date",
        },
        "ext_project_sync_bim": {
            "df": df_sync_bim,
            "date_col": "date",
        },
    }

    with engine_postgres.begin() as conn:
        db = conn.scalar(text("SELECT current_database()"))
        print("🔎 Подключен к базе:", db)

        for table, params in tasks.items():
            added = _incremental_append(params["df"], table, params["date_col"], conn)
            print(f"✅ {table}: добавлено {added} новых строк")
            
            
# from sqlalchemy import text
# import pandas as pd

# # ✅ Проверка подключения к базе
# with engine_postgres.begin() as conn:
#     db_name = pd.read_sql("SELECT current_database()", conn)
#     print("🔎 Подключен к базе:", db_name.iloc[0, 0])

# # ✅ Пересоздание структуры таблицы
# df_sync_designers.head(0).to_sql(
#     "ext_project_sync_designers",
#     engine_postgres,
#     schema="datalake",
#     if_exists="replace",  # Пересоздаёт таблицу с колонками из DataFrame
#     index=False
# )
# print("🛠 Структура таблицы datalake.test_lake пересоздана из DataFrame.")

# # ✅ Загрузка данных в новую таблицу
# df_sync_designers.to_sql(
#     "ext_project_sync_designers",
#     engine_postgres,
#     schema="datalake",
#     if_exists="append",
#     index=False
# )
# print(f"✅ Загружено строк: {len(df_sync_designers)}")

# # ✅ Пересоздание структуры таблицы
# df_sync_bim.head(0).to_sql(
#     "ext_project_sync_bim",
#     engine_postgres,
#     schema="datalake",
#     if_exists="replace",  # Пересоздаёт таблицу с колонками из DataFrame
#     index=False
# )
# print("🛠 Структура таблицы datalake.test_lake пересоздана из DataFrame.")

# # ✅ Загрузка данных в новую таблицу
# df_sync_bim.to_sql(
#     "ext_project_sync_bim",
#     engine_postgres,
#     schema="datalake",
#     if_exists="append",
#     index=False
# )
# print(f"✅ Загружено строк: {len(df_sync_bim)}")

🔎 Подключен к базе: postgres
🛠 Структура таблицы datalake.test_lake пересоздана из DataFrame.
✅ Загружено строк: 240966
🛠 Структура таблицы datalake.test_lake пересоздана из DataFrame.
✅ Загружено строк: 16425
