# Вводные

## Импорт библиотек

In [177]:
# Library
import numpy as np
import pandas as pd
import seaborn as sns
import scipy.stats as stats
import math
from statsmodels.stats.power import NormalIndPower, TTestIndPower 
from collections import namedtuple
import scipy.stats as sps

ExperimentComparisonResults = namedtuple('ExperimentComparisonResults', 
                                        ['pvalue', 'effect', 'ci_length', 'left_bound', 'right_bound'])

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.neighbors import NearestNeighbors

from sqlalchemy import create_engine
import pyodbc
import psycopg2
import psycopg2.extras
import os
import gc
import io
import time
import sys
from tqdm.notebook import tqdm as tqdm_notebook
import re

# Для выгрузки отчёта
import xlwings as xw
import shutil
import win32com.client as win32
import datetime as dt

# Графические настройки 
import matplotlib.pyplot as plt

import logging
from sqlalchemy.exc import OperationalError, ProgrammingError
import functools
from IPython.display import clear_output

## Подключение и параметры

In [178]:
# Consts
os.chdir(r"e:\users\meshchaninov_av\Documents\Расчёты эффектов_готовые\Выгрузки Июнь 2025")

from Connector_package import GreenPlumConnector, teradata as teradata_query

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

with open(r'e:\users\meshchaninov_av\Documents\Python!подключение_к_greenplum.txt', 'r', encoding='utf-8') as f:
    psw = f.read().strip()

GP_PARAMS = {
    "dbname": "dwh",
    "user": "meshchaninov_av",
    "password": psw,
    "host": "10.239.6.220",
    "port": "5432"
}

accaunt = GP_PARAMS["user"]
engine = create_engine(f'postgresql://{accaunt}:{psw}@10.239.6.220:5432/dwh')

gp_connector = GreenPlumConnector(GP_PARAMS, engine=engine)
logging.info("Подключение к GreenPlum установлено.")

odbc_td = 'DSN=teradata'
odbc_gp = 'DSN=GreenPlum'

# teradata connector
connect_td = pyodbc.connect(odbc_td)
connect_td.autocommit = True
cursor_td = connect_td.cursor()

2025-09-15 14:55:26,383 - INFO - Успешно подключились к GreenPlum.
2025-09-15 14:55:26,384 - INFO - Подключение к GreenPlum установлено.


In [179]:
# Декоратор для логирования
def log_function_call(func):
    """
    Декоратор для автоматического логирования вызова и завершения функций.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        func.name = func.__name__
        logging.info(f"Начало выполнения функции {func.__name__} с args={args}, kwargs={kwargs}")
        try:
            result = func(*args, **kwargs)
            logging.info(f"Функция {func.__name__} завершена успешно.")
            return result
        except Exception as e:
            logging.error(f"Ошибка в функции {func.__name__}: {e}", exc_info=True)
            raise e
    return wrapper

# Функции

## whs

In [180]:
@log_function_call
def create_whs(gp_connector, mask):
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_whs;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_whs (
    orgunit_id INTEGER
    )
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_whs
    SELECT distinct orgunit_id
    FROM dm.whs
    WHERE working = '1'
    ;""")

## cus_ruls

In [181]:
@log_function_call
def create_cus_ruls(gp_connector, mask, art_grp_lvl_2_name, type_art_group_level, promo_start_date_act, promo_end_date_act, actn_name, type, version, code_act, code_prev, promo_start_date_prev, promo_end_date_prev):
    _ALLOWED = {'art_grp_lvl_1_name','art_grp_lvl_2_name','art_grp_lvl_3_name'}
    _SYNONYM = {'lvl1':'art_grp_lvl_1_name','lvl2':'art_grp_lvl_2_name','lvl3':'art_grp_lvl_3_name'}

    def resolve_group_column(type_art_group_level: str) -> str:
        """Из helper → одна из 3 колонок; неизвестное → lvl2 по умолчанию."""
        raw = (type_art_group_level or '').strip()
        col = _SYNONYM.get(raw.lower(), raw)  # разрешаем lvl1/lvl2/lvl3 и прямые имена
        if col not in _ALLOWED:
            logging.warning(f"[group-level] Unknown '{type_art_group_level}', fallback to art_grp_lvl_2_name")
            return 'art_grp_lvl_2_name'
        return col

    def values_to_in_clause(values) -> str:
        """'Пиво; Твердые сыры' → 'Пиво','Твердые сыры' (экранирует кавычки)."""
        if isinstance(values, (list, tuple, set)):
            parts = [str(x) for x in values]
        else:
            cleaned = str(values).strip().strip('"').strip("'")
            parts = re.split(r'[;,]', cleaned)
        parts = [p.strip() for p in parts if p]
        return ','.join("'" + p.replace("'", "''") + "'" for p in parts)

    # 1) ОБЩАЯ ЧАСТЬ (для всех версий): выбираем колонку и значения, строим список статей
    col     = resolve_group_column(type_art_group_level)
    vals_in = values_to_in_clause(art_grp_lvl_2_name)

    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_articles;")
    gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_articles AS
        SELECT DISTINCT article_id
        FROM dm.art_ext
        WHERE {col} IN ({vals_in});
    """)

    cnt = gp_connector.gp(f"SELECT COUNT(*) FROM ba.tmp_{mask}_{type}_articles;").iloc[0,0]
    if cnt == 0 and col != 'art_grp_lvl_2_name':
        logging.warning(f"[group-level] 0 articles by '{col}'. Fallback to art_grp_lvl_2_name.")
        gp_connector.execute_query(f"TRUNCATE TABLE ba.tmp_{mask}_{type}_articles;")
        gp_connector.execute_query(f"""
            INSERT INTO ba.tmp_{mask}_{type}_articles
            SELECT DISTINCT article_id
            FROM dm.art_ext
            WHERE art_grp_lvl_2_name IN ({vals_in});
        """)

    if version == 1:
        # ──────────────────────────────────────────────────────────
        # Покупатели товаров в promo_start_date_act
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_buyers_act;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_buyers_act AS
        SELECT DISTINCT ch.contact_id
        FROM   dm.cheque_item  ci
        JOIN   ba.tmp_{mask}_{type}_articles a ON a.article_id = ci.article_id
        JOIN   dm.cheque ch ON ch.cheque_pk = ci.cheque_pk
        WHERE  ch.datetime BETWEEN '{promo_start_date_act}'::timestamp
        AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # Акцепты «Категории» в promo_start_date_act
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_act;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_act AS
        SELECT DISTINCT oc.contact_id
        FROM   dm.offer_contact oc
        JOIN   dm.offer o ON o.offer_pk = oc.offer_pk
        WHERE  o.code = '{code_act}'
        AND  oc.created_on BETWEEN '{promo_start_date_act}'::timestamp
        AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # ЦА = покупали + акцептовали
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.vt_{mask}_cus_ruls;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.vt_{mask}_cus_ruls AS
        SELECT contact_id
        FROM ba.tmp_{mask}_{type}_buyers_act
        INTERSECT
        SELECT contact_id
        FROM  ba.tmp_{mask}_{type}_offer_accept_act
        ;""")

    elif version == 2:
        # ──────────────────────────────────────────────────────────
        # Покупатели товаров в promo_start_date_prev + promo_start_date_act  (нужно ≥ 1 чек в обоих мес.)
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_buyers_act_prev;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_buyers_act_prev AS
        SELECT contact_id
        FROM   dm.cheque_item  ci
        JOIN   ba.tmp_{mask}_{type}_articles a ON a.article_id = ci.article_id
        JOIN   dm.cheque ch ON ch.cheque_pk = ci.cheque_pk
        WHERE  ch.datetime BETWEEN '{promo_start_date_prev}'::timestamp      -- начало апреля
        AND '{promo_end_date_act}'::timestamp -- конец мая
        GROUP BY contact_id
        HAVING COUNT(DISTINCT date_trunc('month', ch.datetime)) = 2   -- апр + май
        ;""")

        # ──────────────────────────────────────────────────────────
        # Акцепты «Категории» в promo_start_date_act 
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_act;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_act AS
        SELECT DISTINCT oc.contact_id
        FROM   dm.offer_contact oc
        JOIN   dm.offer o ON o.offer_pk = oc.offer_pk
        WHERE  o.code = '{code_act}'
        AND  oc.created_on BETWEEN '{promo_start_date_act}'::timestamp
        AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # Акцепты «Категории» в promo_start_date_prev 
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_prev;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_prev AS
        SELECT DISTINCT oc.contact_id
        FROM dm.offer_contact oc
        JOIN dm.offer o ON o.offer_pk = oc.offer_pk
        WHERE o.code = '{code_prev}'
        AND oc.created_on BETWEEN '{promo_start_date_prev}'::timestamp
        AND '{promo_end_date_prev}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # ЦА = пересечение трех предыдущих таблиц
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.vt_{mask}_cus_ruls;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.vt_{mask}_cus_ruls AS
        SELECT contact_id FROM ba.tmp_{mask}_{type}_buyers_act_prev
        INTERSECT
        SELECT contact_id FROM ba.tmp_{mask}_{type}_offer_accept_act
        INTERSECT
        SELECT contact_id FROM ba.tmp_{mask}_{type}_offer_accept_prev;
        """)
        
    else:
        # ──────────────────────────────────────────────────────────
        # Покупатели товаров в promo_start_date_act
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_buyers_act;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_buyers_act AS
        SELECT DISTINCT ch.contact_id
        FROM   dm.cheque_item  ci
        JOIN   ba.tmp_{mask}_{type}_articles a ON a.article_id = ci.article_id
        JOIN   dm.cheque ch ON ch.cheque_pk = ci.cheque_pk
        WHERE  ch.datetime BETWEEN '{promo_start_date_act}'::timestamp
        AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # Акцепты «Категории» в promo_start_date_act 
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_act;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_act AS
        SELECT DISTINCT oc.contact_id
        FROM   dm.offer_contact oc
        JOIN   dm.offer o ON o.offer_pk = oc.offer_pk
        WHERE  o.code = '{code_act}'
        AND  oc.created_on BETWEEN '{promo_start_date_act}'::timestamp
        AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # Акцепты «Категории» в promo_start_date_prev (для исключения из ЦА)
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_prev;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_prev AS
        SELECT DISTINCT oc.contact_id
        FROM dm.offer_contact oc
        JOIN dm.offer o ON o.offer_pk = oc.offer_pk
        WHERE o.code = '{code_prev}'
        AND oc.created_on BETWEEN '{promo_start_date_prev}'::timestamp
        AND '{promo_end_date_prev}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
        ;""")

        # ──────────────────────────────────────────────────────────
        # ЦА = покупали + акцептовали в promo_start_date_act, но не акцептовали в promo_start_date_prev
        # ──────────────────────────────────────────────────────────
        gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.vt_{mask}_cus_ruls;")
        gp_connector.execute_query(f"""
        CREATE TABLE ba.vt_{mask}_cus_ruls AS
        SELECT contact_id
        FROM (
        SELECT contact_id
        FROM ba.tmp_{mask}_{type}_buyers_act
        INTERSECT
        SELECT contact_id
        FROM ba.tmp_{mask}_{type}_offer_accept_act
        ) t
        WHERE contact_id NOT IN (
        SELECT contact_id FROM ba.tmp_{mask}_{type}_offer_accept_prev)
        ;""")


    gp_connector.execute_query(f"""
    DELETE FROM BA.T_ZIG_SPR_IDN_ACTN
    WHERE ACTN_NAME = '{actn_name}'
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO BA.T_ZIG_SPR_IDN_ACTN
    SELECT DISTINCT
        contact_id
        ,'{actn_name}'
        ,date('{promo_start_date_act}')
        ,date('{promo_end_date_act}')
    FROM 
        ba.vt_{mask}_cus_ruls
    ;""")


## create_kg

In [182]:
@log_function_call
def create_kg(gp_connector, mask, actn_name, art_grp_lvl_2_name, promo_start_date_act, promo_end_date_act, type, code_act, code_prev, promo_start_date_prev, promo_end_date_prev):
    # ──────────────────────────────────────────────────────────
    # Покупатели товаров в promo_start_date_act
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_buyers_act;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_buyers_act AS
    SELECT DISTINCT ch.contact_id
    FROM dm.cheque_item  ci
    JOIN ba.tmp_{mask}_{type}_articles a  ON a.article_id = ci.article_id
    JOIN dm.cheque ch ON ch.cheque_pk = ci.cheque_pk
    WHERE ch.datetime BETWEEN '{promo_start_date_act}'::timestamp
    AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
    ;""")

    # ──────────────────────────────────────────────────────────
    # Акцепты «Категории» в promo_start_date_act (для исключения из КГ)
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_act;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_act AS
    SELECT DISTINCT oc.contact_id
    FROM dm.offer_contact oc
    JOIN dm.offer o ON o.offer_pk = oc.offer_pk
    WHERE o.code = '{code_act}'
    AND oc.created_on BETWEEN '{promo_start_date_act}'::timestamp
    AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
    ;""")

    # ──────────────────────────────────────────────────────────
    # Акцепты «Категории» в promo_start_date_prev (для исключения из КГ)
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_offer_accept_prev;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_offer_accept_prev AS
    SELECT DISTINCT oc.contact_id
    FROM dm.offer_contact oc
    JOIN dm.offer o ON o.offer_pk = oc.offer_pk
    WHERE o.code = '{code_prev}'
    AND oc.created_on BETWEEN '{promo_start_date_prev}'::timestamp
    AND '{promo_end_date_prev}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
    ;""")

    # ──────────────────────────────────────────────────────────
    # КГ = покупали, но не акцептовали promo_start_date_prev‑promo_end_date_act
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.vt_{mask}_cus_ctrl;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_cus_ctrl AS
    SELECT contact_id
    FROM ba.tmp_{mask}_{type}_buyers_act
    WHERE contact_id NOT IN (SELECT contact_id
                             FROM ba.tmp_{mask}_{type}_offer_accept_act
                             UNION 
                             SELECT contact_id
                             FROM ba.tmp_{mask}_{type}_offer_accept_prev)
    ;""")

    # Объединяем ЦА и КГ
    gp_connector.execute_query(f"drop table if exists ba.{mask}_ca_and_kg;")
    gp_connector.execute_query(f""" --sql
    create Table ba.{mask}_ca_and_kg (
    contact_id INTEGER
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    distributed by (contact_id)
    ;""")

    gp_connector.execute_query(f""" --sql
    insert into ba.{mask}_ca_and_kg 
        select contact_id
        from ba.vt_{mask}_cus_ruls
        
        union
        
        select contact_id
        from ba.vt_{mask}_cus_ctrl
    ;""")

    cnt_cus = gp_connector.gp(f"""SELECT count(1) FROM BA.T_ZIG_SPR_IDN_ACTN WHERE ACTN_NAME = '{actn_name}';""")

    cnt_cus_value = cnt_cus.iloc[0, 0]
    if cnt_cus_value <= 500_000:
        n_KG = 10
    elif 500_000 < cnt_cus_value <= 1_000_000:
        n_KG = 5
    else:
        n_KG = 2
    return n_KG


## create_spr_actn

In [183]:
@log_function_call
def create_spr_actn(gp_connector, mask, actn_type, actn_name, lengthprev, lengthpost):
    gp_connector.execute_query(f"""
    INSERT INTO BA.T_ZIG_SPR_ACTN
    SELECT
        ACTN_NAME
        ,max_ACTN_ID + 1 AS ACTN_ID
        ,DATE_START
        ,DATE_END
        ,DATE_END - DATE_START + 1 AS ACTN_LEGTH
        ,'{actn_type}'
    FROM (
        SELECT DISTINCT
            ACTN_NAME
            ,DATE_START
            ,DATE_END
        FROM
            BA.T_ZIG_SPR_IDN_ACTN TSIA
        WHERE
            ACTN_NAME = '{actn_name}'
        ) D
    LEFT JOIN (
        SELECT max(ACTN_ID) AS max_ACTN_ID FROM BA.T_ZIG_SPR_ACTN
        ) s ON 1 = 1
    ;
    """)

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_spr_actn;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_spr_actn (
    ACTN_NAME VARCHAR(50),
    ACTN_ID SMALLINT,
    ACTN_GRP VARCHAR(50),
    DATE_START DATE,
    DATE_END DATE,
    ACTN_LENGTH INTEGER,
    DATE_ST_PRE DATE,
    DATE_END_LAST DATE
    )
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_spr_actn
    SELECT
        ACTN_NAME
        ,ACTN_ID
        ,ACTN_GRP
        ,DATE_START
        ,DATE_END
        ,ACTN_LENGTH
        ,DATE_START - {lengthprev} AS DATE_ST_PRE
        ,DATE_END + {lengthpost} AS DATE_END_LAST
    FROM
        BA.T_ZIG_SPR_ACTN
    ;""")

## days

In [184]:
@log_function_call
def create_days(gp_connector, mask, actn_id, DateStPre, DateEndLast):
    # 1. Уже есть запись
    row = gp_connector.gp(f"""
        SELECT cutoff_id
        FROM   ba.t_actn_fix
        WHERE  actn_id = {actn_id};
    """)

    # 2. Если нет — берём актуальный cutoff и вставляем -----------------
    if row.empty:
        cutoff = actn_id - 1
        cutoff = gp_connector.gp("""
            SELECT MAX(actn_id) AS id
            FROM   ba.t_zig_spr_actn
        """).id[0]

        gp_connector.execute_query(f"""
            INSERT INTO ba.t_actn_fix (actn_id, cutoff_id)
            VALUES ({actn_id}, {cutoff});
        """)
    else:
        cutoff = row.cutoff_id[0] 

    #КалендарныеНедели
    gp_connector.execute_query(f""" Drop Table if exists ba.vt_{mask}_promo_week;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_promo_week (
    day_id date,
    week_id integer,
    month_id integer
    )
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_promo_week
    SELECT 
        day_id
        ,week_id_2
        ,month_id
    FROM dict.days d
    WHERE day_id BETWEEN '2019-11-01' AND CURRENT_DATE	
    ;""")

    #Справочник акция - день
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_days;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_days (
    ACTN_NAME VARCHAR(50),
    ACTN_ID SMALLINT,
    DATE_START DATE,
    DATE_END DATE,
    ACTN_LENGTH INTEGER,
    DAY_ID DATE,
    WEEK_ID INTEGER,
    MONTH_ID INTEGER,
    ACTN_PERIOD SMALLINT
    )
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_days
    SELECT
        ACTN_NAME
        ,ACTN_ID
        ,DATE_START
        ,DATE_END
        ,ACTN_LENGTH
        ,D.DAY_ID
        ,D.WEEK_ID
        ,D.MONTH_ID
        ,CASE WHEN D.DAY_ID < DATE_START THEN 1
            WHEN D.DAY_ID BETWEEN DATE_START AND DATE_END THEN 2
            WHEN D.DAY_ID > DATE_END THEN 3
        END AS ACTN_PERIOD
    FROM
        BA.T_ZIG_SPR_ACTN TSA
    JOIN
        ba.vt_{mask}_promo_week D
        ON D.DAY_ID BETWEEN '{DateStPre}' AND '{DateEndLast}'
    ;""")
    return cutoff

## days_cross

In [185]:
@log_function_call
def create_days_cross(gp_connector, mask, cutoff, actn_id):
	#Пересечение акций
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_actn_duble;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_actn_duble (
	ACTN_NAME VARCHAR(50),
	ACTN_ID SMALLINT,
	ACTN_GRP VARCHAR(50),
	DATE_START DATE,
	DATE_END DATE,
	ACTN_NAME_DUBLE VARCHAR(50),
	ACTN_ID_DUBLE SMALLINT,
	ACTN_GRP_DUBLE VARCHAR(50),
	DATE_START_DUBLE DATE,
	DATE_END_DUBLE DATE
	)
	;""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_actn_duble
	SELECT
		ACTN_NAME
		,ACTN_ID
		,ACTN_GRP
		,DATE_START
		,DATE_END
		,ACTN_NAME_DUBLE
		,ACTN_ID_DUBLE
		,ACTN_GRP_DUBLE
		,DATE_START_DUBLE
		,DATE_END_DUBLE
	FROM (
		SELECT DISTINCT
			SA.ACTN_NAME
			,SA.ACTN_ID
			,SA.ACTN_GRP
			,SA.DATE_START
			,SA.DATE_END
			,B.ACTN_NAME AS ACTN_NAME_DUBLE
			,B.ACTN_ID AS ACTN_ID_DUBLE
			,B.ACTN_GRP AS ACTN_GRP_DUBLE
			,B.DATE_START AS DATE_START_DUBLE
			,B.DATE_END AS DATE_END_DUBLE
			,CASE WHEN SA.ACTN_ID <> B.ACTN_ID THEN 1 ELSE 0 END IS_DUBLE
		FROM (
			SELECT DISTINCT
				SA.ACTN_NAME
				,SA.ACTN_ID
				,SA.ACTN_GRP
				,SA.DATE_START
				,SA.DATE_END
				,D.DAY_ID
			FROM 
				ba.vt_{mask}_days D
			JOIN
				ba.vt_{mask}_spr_actn SA
				ON D.DAY_ID BETWEEN SA.DATE_START AND SA.DATE_END
			) SA
		JOIN
			ba.vt_{mask}_spr_actn B
			ON SA.DAY_ID BETWEEN B.DATE_START AND B.DATE_END
			AND B.actn_id <= {cutoff} -- <<< добавил
		) D
	WHERE
		IS_DUBLE = 1
		and ACTN_ID > ACTN_ID_DUBLE
		and (
			(
			ACTN_GRP = 'Кроссформатная акция'
			and
			ACTN_GRP_DUBLE in ('Кроссформатная акция','Купонная акция','Механики MAU')
			)
			or
			(
				ACTN_GRP in ('Акция Лояльности', 'Купонная акция','Механики MAU') 
				and
				ACTN_GRP_DUBLE in ('Акция Лояльности','Кроссформатная акция','Купонная акция','Механики MAU')
			)
			
			or 
			(
				ACTN_GRP in ('Выпечка','Оценка проекта','Пилот','Розыгрыш','Розыгрыши','ТВ реклама') 
				and
				ACTN_GRP_DUBLE in ('Выпечка','Оценка проекта','Пилот','Розыгрыш','Розыгрыши','ТВ реклама')
			)
			)
	;""")

	df_duble_actn = gp_connector.gp(f"""SELECT * FROM ba.vt_{mask}_actn_duble WHERE ACTN_ID = {actn_id};""")

	#Список акций, которые нужно исключить из exclude_actn_str
	actions_to_exclude = []
	#Фильтруем df_duble_actn, удаляя указанные акции
	df_filtered = df_duble_actn[~df_duble_actn["actn_name_duble"].isin(actions_to_exclude)]

	#Формируем строку для SQL
	exclude_actn_str = gp_connector.to_sql_list(df_filtered["actn_name_duble"], quotes=True)

	exclude_actn = exclude_actn_str

	#Справочник дней с пересечениями акций
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_days_cross;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_days_cross (
	CONTACT_ID INTEGER,
	ACTN_NAME VARCHAR(50),
	ACTN_ID SMALLINT,
	ACTN_GRP VARCHAR(50),
	DATE_START DATE,
	DATE_END DATE,
	DAY_ID DATE,
	WEEK_ID INTEGER,
	IS_CROSS SMALLINT,
	CNT_DAY_WO_CROSS INTEGER
	)
	;""")

	gp_connector.execute_query(f""" --sql
	insert into ba.vt_{mask}_days_cross
	WITH data AS (
		SELECT DISTINCT
			c.CONTACT_ID
			,c.ACTN_NAME
			,c.ACTN_ID
			,c.ACTN_GRP
			,c.DATE_START
			,c.DATE_END
			,c.DAY_ID
			,c.WEEK_ID
			,CASE WHEN d.CONTACT_ID IS NOT NULL THEN 1 ELSE 0 END AS IS_CROSS
		FROM (
			SELECT
				si.CONTACT_ID
				,sa.ACTN_NAME
				,sa.ACTN_ID
				,sa.ACTN_GRP
				,sa.DATE_START
				,sa.DATE_END
				,d.DAY_ID
				,d.WEEK_ID
			FROM 
				ba.vt_{mask}_promo_week d
			JOIN
				ba.vt_{mask}_spr_actn sa 
				ON d.DAY_ID BETWEEN SA.DATE_START AND SA.DATE_END
				AND sa.ACTN_ID = {actn_id}
			JOIN
				BA.T_ZIG_SPR_IDN_ACTN si 
				on si.ACTN_NAME = sa.ACTN_NAME
			) c
		LEFT JOIN (               --Определяю клиентов и свободные даты из других акций
			SELECT distinct
				CONTACT_ID
				,DAY_ID
			FROM
				BA.T_ZIG_SPR_IDN_ACTN c
			JOIN
				ba.vt_{mask}_actn_duble a
				on a.ACTN_ID = {actn_id}
				and a.actn_id_duble <= {cutoff} -- <<< добавил
				and a.ACTN_NAME_DUBLE = c.ACTN_NAME
				and a.ACTN_NAME_DUBLE not in ({exclude_actn})
			join
				ba.vt_{mask}_promo_week d
				on d.DAY_ID between a.date_start_duble and a.date_end_duble
			) d ON d.DAY_ID = c.DAY_ID
				AND d.CONTACT_ID = c.CONTACT_ID
		WHERE
			c.ACTN_ID = {actn_id}
	)

	SELECT DISTINCT
		CONTACT_ID
		,ACTN_NAME
		,ACTN_ID
		,ACTN_GRP
		,DATE_START
		,DATE_END
		,DAY_ID
		,WEEK_ID
		,IS_CROSS
		,MAX(CNT_DAY) OVER (PARTITION BY CONTACT_ID)  AS CNT_DAY_WO_CROSS
	FROM (
		SELECT DISTINCT
			CONTACT_ID
			,ACTN_NAME
			,ACTN_ID
			,ACTN_GRP
			,DATE_START
			,DATE_END
			,DAY_ID
			,WEEK_ID
			,IS_CROSS
			,CASE WHEN IS_CROSS = 0 THEN COUNT(DAY_ID) OVER (PARTITION BY IS_CROSS, CONTACT_ID) ELSE 0 END as CNT_DAY
		FROM
			data
	) d
	order by DAY_ID
	;""")

	gp_connector.execute_query(f"""--sql 
	delete from ba.vt_{mask}_days_cross
	where CNT_DAY_WO_CROSS = 0
	;""")

## frod

In [186]:
@log_function_call
def frod(gp_connector, mask):
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_frod;""")
    gp_connector.execute_query(f""" --sql
    CREATE TABLE ba.vt_{mask}_frod (
    CONTACT_ID NUMERIC
    )  WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    ;""")

    gp_connector.execute_query(f""" --sql 
    insert into ba.vt_{mask}_frod
    select contact_id
    from dm.contact_ea 
    where  lower(name) like '%client%soft%' 
        or lower(name) like  '%client%hard%'    
    group by 1
    ;""")

## create_trn_0

In [187]:
@log_function_call
def create_trn_0(gp_connector, mask, actnId, lengthPrev):
    month = gp_connector.gp(f"""
            select
                    month_id
                    ,min(day_id) as min_dt
                    ,max(day_id) as max_dt
            from
                    ba.vt_{mask}_days
                where
                    actn_id = {actnId} 
                    and actn_period = 1
                group by 1 
                order by 1
            ;""")

    month

    #Период "до". Транзакции
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_0;""")
    gp_connector.execute_query(f""" --sql
    CREATE TABLE ba.vt_{mask}_trn_0 (
    CONTACT_ID NUMERIC,
    ORGUNIT_ID INTEGER,
    SQUARE_TRADE NUMERIC,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    DAY_DATE DATE,
    CHEQUE_PK BYTEA,
    SUMM_DISCOUNTED NUMERIC,
    ACTN_PERIOD SMALLINT,
    REGISTRATION_DATE DATE,
    CNT_DAY_WO_CROSS INTEGER,
    IS_TRN_FLTR SMALLINT,
    CARD_NUMBER VARCHAR(50)
    )  WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    DISTRIBUTED BY (contact_id)
    ;""")

    for i in tqdm_notebook(range(len(month))):
        dt_start = str(month.min_dt[i])
        dt_end = str(month.max_dt[i])

        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
        gp_connector.execute_query(f""" --sql
        CREATE TABLE ba.vt_{mask}_trn_temp (
        contact_id integer,
        orgunit_id integer,
        datetime date,
        cheque_pk bytea,
        summ_discounted numeric,
        card_number varchar,
        number varchar,
        dt_load date
        ) 
            WITH (
            appendonly=true,
            blocksize=32768,
            compresstype=zstd,
            compresslevel=4,
            orientation=column)
        DISTRIBUTED BY (card_number, datetime, number)
        ;""")

        gp_connector.execute_query(f""" --sql
        insert into ba.vt_{mask}_trn_temp 
        SELECT 
            t.contact_id
            ,t.orgunit_id
            ,t.datetime
            ,t.cheque_pk
            ,t.summ_discounted
            ,t.card_number
            ,t.number
            ,t.dt_load
        FROM 
            dm.cheque t
        WHERE
            operation_type_id = 1
            AND t.datetime between ('{dt_start}'::timestamp) AND ('{dt_end}'::timestamp + interval '1' day - interval '1' second)
        ;""")

        gp_connector.execute_query(f""" --sql
        INSERT INTO ba.vt_{mask}_trn_0
        SELECT 
            contact_id
            ,orgunit_id
            ,square_trade
            ,frmt_id
            ,region_id
            ,datetime
            ,cheque_pk
            ,summ_discounted
            ,ACTN_PERIOD                 -- Период "ДО"
            ,registration_date
            ,CNT_DAY_WO_CROSS
            ,IS_TRN_FLTR
            ,card_number
        from (
            SELECT 
                t.contact_id
                ,t.orgunit_id
                ,w.square_trade
                ,w.frmt_id
                ,w.region_id
                ,t.datetime
                ,t.cheque_pk
                ,t.summ_discounted
                ,1 as ACTN_PERIOD                 -- Период "ДО"
                ,c.registration_date
                ,{lengthPrev} as CNT_DAY_WO_CROSS
                ,0 as IS_TRN_FLTR
                ,t.card_number
                ,row_number() over(partition by t.card_number, t.datetime, t.number order by t.datetime nulls last, t.dt_load nulls first) rn
            FROM 
                ba.vt_{mask}_trn_temp t
            JOIN
                ba.vt_{mask}_whs ww on ww.orgunit_id = t.orgunit_id
            JOIN
                dm.whs w ON w.orgunit_id = t.orgunit_id
            JOIN 
                dm.contact c ON c.contact_id = t.contact_id
            ) d
        WHERE
            rn = 1
        ;""")

## create_trn_0_v2

In [188]:
@log_function_call
def create_trn_0_v2(gp_connector, mask, actnId, lengthActn):
    month = gp_connector.gp(f"""
            select
                    month_id
                    ,min(day_id) as min_dt
                    ,max(day_id) as max_dt
            from
                    ba.vt_{mask}_days
                where
                    actn_id = {actnId} 
                    and actn_period = 2
                group by 1 
                order by 1
            ;""")

    month
    
    for i in tqdm_notebook(range(len(month))):
        dt_start = month.min_dt[i]
        dt_end = month.max_dt[i]

        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
        gp_connector.execute_query(f""" --sql
        CREATE TABLE ba.vt_{mask}_trn_temp  ( 
        contact_id integer,
        orgunit_id integer,
        datetime date,
        cheque_pk bytea,
        summ_discounted numeric,
        card_number varchar (50),
        number varchar (50),
        dt_load date
        )
        WITH (
            appendonly=true,
            blocksize=32768,
            compresstype=zstd,
            compresslevel=4,
            orientation=column)
        DISTRIBUTED BY (card_number, datetime, number)
        ;""")

        gp_connector.execute_query(f""" --sql
        insert into ba.vt_{mask}_trn_temp
        SELECT 
            t.contact_id
            ,t.orgunit_id
            ,t.datetime
            ,t.cheque_pk
            ,t.summ_discounted
            ,t.card_number
            ,t.number
            ,t.dt_load
        FROM 
            dm.cheque t
        WHERE
            operation_type_id = 1
            AND t.datetime between ('{dt_start}'::timestamp) AND ('{dt_end}'::timestamp + interval '1' day - interval '1' second)
        ;""")

        gp_connector.execute_query(f""" --sql
        INSERT INTO ba.vt_{mask}_trn_0
        SELECT 
            contact_id
            ,orgunit_id
            ,square_trade
            ,frmt_id
            ,region_id
            ,datetime
            ,cheque_pk
            ,summ_discounted
            ,ACTN_PERIOD                 -- Период "Акционный"
            ,registration_date
            ,CNT_DAY_WO_CROSS
            ,IS_TRN_FLTR
            ,card_number
        from (
            SELECT 
                t.contact_id
                ,t.orgunit_id
                ,w.square_trade
                ,w.frmt_id
                ,w.region_id
                ,t.datetime
                ,t.cheque_pk
                ,t.summ_discounted
                ,2 as ACTN_PERIOD                 -- Акционный период
                ,c.registration_date
                ,coalesce(cd.CNT_DAY_WO_CROSS, {lengthActn}) as CNT_DAY_WO_CROSS
                ,coalesce(cd.IS_CROSS, 0) as IS_TRN_FLTR
                ,t.card_number
                ,row_number() over(partition by t.card_number, t.datetime, t.number order by t.datetime nulls last, t.dt_load nulls first) rn
            FROM 
                ba.vt_{mask}_trn_temp t
            JOIN
                ba.vt_{mask}_whs ww on ww.orgunit_id = t.orgunit_id
            JOIN
                dm.whs w ON w.orgunit_id = t.orgunit_id
            JOIN 
                dm.contact c ON c.contact_id = t.contact_id
            left JOIN 
                ba.vt_{mask}_days_cross cd
                on cd.contact_id = t.contact_id
                and cd.day_id = date(t.datetime)
            ) d
        where
            rn = 1
        ;""")

## create_agg

In [189]:
@log_function_call
def create_agg(gp_connector, mask, DateStart):
    gp_connector.execute_query(f"ANALYZE ba.{mask}_ca_and_kg;")
    gp_connector.execute_query(f"ANALYZE ba.vt_{mask}_trn_0;")
    gp_connector.execute_query(f"ANALYZE dict.days;")
    gp_connector.execute_query(f"ANALYZE ba.vt_{mask}_frod;")
    """
    Формирует таблицу ba.vt_{mask}_trn_1 с агрегированными показателями
    по периодам (1 — период ДО, 2 — период АКЦИИ).
    Если is_kg_needed = TRUE для данной акции, 
    добавляем JOIN на ba.t_{mask}_control_group, 
    чтобы оставить только клиентов из контрольной группы.
    """

    # Смотрим, нужна ли контрольная группа
    df_check_kg = gp_connector.gp(f"""
        SELECT is_kg_needed
        FROM BA.helper_category
        WHERE mask = '{mask}'
    """)
    if df_check_kg.empty:
        logging.warning(f"[create_agg] Не найдена запись в helper_category для mask={mask}.")
        return

    is_kg_needed = df_check_kg['is_kg_needed'].iloc[0]  # True/False

    # Строим дополнительный JOIN, если КГ нужна
    if is_kg_needed:
        kg_join_str = f"JOIN ba.{mask}_ca_and_kg cg ON cg.contact_id = t.contact_id"
        logging.info(f"[create_agg][mask={mask}] КГ включена => используем JOIN на {mask}_ca_and_kg.")
    else:
        kg_join_str = ""  # Пустая строка, если КГ не требуется

    logging.info(f"[create_agg][mask={mask}] Начинаем создание таблицы ba.vt_{mask}_trn_1...")

    # Агрегация транзакций
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_1;""")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_trn_1 (
    CONTACT_ID INTEGER,
    ACTN_PERIOD SMALLINT,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    REGISTRATION_DATE DATE,
    CNT_DAY INTEGER,
    CNT_TRN INTEGER,
    OPSUM NUMERIC,
    AVG_TXN NUMERIC,
    AVG_SPEND NUMERIC,
    CNT_TRN_FLTR INTEGER,
    AVG_TXN_FLTR NUMERIC,
    LONG_VISIT INTEGER,
    SQUARE_TRADE NUMERIC,
    CNT_WEEK INTEGER
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    DISTRIBUTED BY (contact_id)
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_trn_1
    WITH trn AS ( 
        SELECT
            CONTACT_ID
            ,ORGUNIT_ID
            ,SQUARE_TRADE
            ,ACTN_PERIOD
            ,FRMT_ID
            ,REGION_ID
            ,REGISTRATION_DATE
            ,DAY_DATE
            ,CHEQUE_PK
            ,SUMM_DISCOUNTED
            ,CNT_DAY_WO_CROSS
            ,IS_TRN_FLTR
        FROM
            ba.vt_{mask}_trn_0
        WHERE
            SUMM_DISCOUNTED > 0
            AND ACTN_PERIOD = 1    --Период "ДО"
        ), favorite_whs AS (
        select
            contact_id,
            orgunit_id,
            frmt_id,
            region_id,
            square_trade,
            cnt_trn,
            row_number() over (partition by contact_id, frmt_id, region_id order by cnt_trn desc) as rang_whs
        from (
            select
                contact_id,
                orgunit_id,
                frmt_id,
                region_id,
                max(square_trade) as square_trade,
                count(distinct cheque_pk) as cnt_trn
            from trn
            group by 1,2,3,4) d
        )

    SELECT
        t.CONTACT_ID
        ,ACTN_PERIOD
        ,t.FRMT_ID
        ,t.REGION_ID
        ,REGISTRATION_DATE
        ,COUNT(DISTINCT DAY_DATE)                       AS CNT_DAY
        ,COUNT(DISTINCT CHEQUE_PK)                      AS CNT_TRN
        ,SUM(SUMM_DISCOUNTED)                           AS OPSUM
        ,AVG(SUMM_DISCOUNTED)                           AS AVG_TXN
        ,SUM(case when IS_TRN_FLTR = 0 then SUMM_DISCOUNTED end) / MAX(CNT_DAY_WO_CROSS)   AS AVG_SPEND
        ,COUNT(DISTINCT case when IS_TRN_FLTR = 0 then CHEQUE_PK end)                      AS CNT_TRN_FLTR
        ,AVG(case when IS_TRN_FLTR = 0 then SUMM_DISCOUNTED end)                           AS AVG_TXN_FLTR
        ,MAX(date('{DateStart}')) - MAX(DAY_DATE)       AS LONG_VISIT
        ,max(w.square_trade)                            as square_trade
        ,COUNT(DISTINCT d.week_id_2)                    AS CNT_WEEK
    FROM trn t
    left join favorite_whs w 
        on w.contact_id = t.contact_id
        and w.orgunit_id = t.orgunit_id
        and w.rang_whs = 1
    {kg_join_str}
    join dict.days d on d.day_id = t.day_date
    left join ba.vt_{mask}_frod f on f.contact_id = t.contact_id
    where f.contact_id is null
    GROUP BY 1,2,3,4,5
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_trn_1
    WITH trn AS ( 
        SELECT
            CONTACT_ID
            ,ORGUNIT_ID
            ,SQUARE_TRADE
            ,ACTN_PERIOD
            ,FRMT_ID
            ,REGION_ID
            ,REGISTRATION_DATE
            ,DAY_DATE
            ,CHEQUE_PK
            ,SUMM_DISCOUNTED
            ,CNT_DAY_WO_CROSS
            ,IS_TRN_FLTR
        FROM
            ba.vt_{mask}_trn_0
        WHERE
            SUMM_DISCOUNTED > 0
            AND ACTN_PERIOD = 2    --Акц.период
        ), favorite_whs AS (
        select
            contact_id,
            orgunit_id,
            frmt_id,
            region_id,
            square_trade,
            cnt_trn,
            row_number() over (partition by contact_id, frmt_id, region_id order by cnt_trn desc) as rang_whs
        from (
            select
                contact_id,
                orgunit_id,
                frmt_id,
                region_id,
                max(square_trade) as square_trade,
                count(distinct cheque_pk) as cnt_trn
            from trn
            group by 1,2,3,4) d
        )

    SELECT
        t.CONTACT_ID
        ,ACTN_PERIOD
        ,t.FRMT_ID
        ,t.REGION_ID
        ,REGISTRATION_DATE
        ,COUNT(DISTINCT DAY_DATE)                       AS CNT_DAY
        ,COUNT(DISTINCT CHEQUE_PK)                      AS CNT_TRN
        ,SUM(SUMM_DISCOUNTED)                           AS OPSUM
        ,AVG(SUMM_DISCOUNTED)                           AS AVG_TXN
        ,SUM(case when IS_TRN_FLTR = 0 then SUMM_DISCOUNTED end) / MAX(CNT_DAY_WO_CROSS)   AS AVG_SPEND
        ,COUNT(DISTINCT case when IS_TRN_FLTR = 0 then CHEQUE_PK end)                      AS CNT_TRN_FLTR
        ,AVG(case when IS_TRN_FLTR = 0 then SUMM_DISCOUNTED end)                           AS AVG_TXN_FLTR
        ,0                                              AS LONG_VISIT
        ,max(w.square_trade)                            as square_trade
        ,COUNT(DISTINCT d.week_id_2)                    AS CNT_WEEK
    FROM
        trn t
    left join
        favorite_whs w 
        on w.contact_id = t.contact_id
        and w.orgunit_id = t.orgunit_id
        and w.rang_whs = 1
    {kg_join_str}
    join dict.days d on d.day_id = t.day_date
    left join ba.vt_{mask}_frod f on f.contact_id = t.contact_id
    where f.contact_id is null
    GROUP BY 1,2,3,4,5
    ;
    """)

    # Проверка результатов

    df_check = gp_connector.gp(f"""
        SELECT ACTN_PERIOD, COUNT(DISTINCT CONTACT_ID) AS cnt_cont
        FROM ba.vt_{mask}_trn_1
        GROUP BY ACTN_PERIOD
        ORDER BY ACTN_PERIOD
    """)
    logging.info(f"[create_agg][mask={mask}] Кол-во уникальных клиентов по периодам:\n{df_check}")

    df_null_square = gp_connector.gp(f"""
        SELECT *
        FROM ba.vt_{mask}_trn_1
        WHERE square_trade IS NULL
        LIMIT 5
    """)
    if not df_null_square.empty:
        logging.warning(f"[create_agg][mask={mask}] Есть записи, где square_trade IS NULL. Пример:\n{df_null_square}")

    logging.info(f"[create_agg][mask={mask}] Агрегация транзакций (trn_1) завершена.")

## process_clear_and_aggregate

In [190]:
@log_function_call
def process_clear_and_aggregate(gp_connector, mask):
	# от покупки
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_ca_clear;""")
	gp_connector.execute_query(f"""
	CREATE TABLE ba.vt_{mask}_ca_clear (
	contact_id integer,
	opsum numeric,
	cnt_trn numeric,
	RANK_OPSUM_MIN numeric,
	RANK_OPSUM_MAX numeric,
	RANK_TRN_MIN numeric,
	RANK_TRN_MAX numeric
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column)
	;""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_ca_clear
	WITH cus_actn AS (
		select
			contact_id
		from
			ba.vt_{mask}_days_cross
		group by 1
		), cus_data as (
		select
			t.contact_id
			,sum(opsum) as opsum
			,sum(cnt_trn) as cnt_trn
		FROM 
			ba.vt_{mask}_trn_1 t
		JOIN
			cus_actn c on c.contact_id = t.contact_id
		GROUP BY 1
		), ca_rank as (
		SELECT 
			PERCENTILE_DISC(0.01) WITHIN GROUP (ORDER BY opsum) as RANK_OPSUM_MIN
			,PERCENTILE_DISC(0.99) WITHIN GROUP (ORDER BY opsum) as RANK_OPSUM_MAX
			,PERCENTILE_DISC(0.01) WITHIN GROUP (ORDER BY cnt_trn) as RANK_TRN_MIN
			,PERCENTILE_DISC(0.99) WITHIN GROUP (ORDER BY cnt_trn) as RANK_TRN_MAX
		FROM
			cus_data
		)
	select
		t.contact_id
		,t.opsum
		,t.cnt_trn
		,RANK_OPSUM_MIN
		,RANK_OPSUM_MAX
		,RANK_TRN_MIN
		,RANK_TRN_MAX
	from
		cus_data t
	join
		ca_rank c on 1=1
	;""")
	
	# CA/KG
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_clear;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_cus_clear (
	CONTACT_ID INTEGER
	);
	""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_cus_clear
	WITH cus_kg as (
		select
			t.contact_id
			,sum(opsum) as opsum
			,sum(cnt_trn) as cnt_trn
		FROM 
			ba.vt_{mask}_trn_1 t
		left join
			(select contact_id from ba.vt_{mask}_days_cross group by 1) c on c.contact_id = t.contact_id
		where
			c.contact_id is null
		GROUP BY 1
		), rank as (
		select distinct
			RANK_OPSUM_MIN
			,RANK_OPSUM_MAX
			,RANK_TRN_MIN
			,RANK_TRN_MAX
		from
			ba.vt_{mask}_ca_clear
		)
	select
		contact_id
	from
		ba.vt_{mask}_ca_clear
	where
		opsum BETWEEN RANK_OPSUM_MIN AND RANK_OPSUM_MAX
		and cnt_trn BETWEEN RANK_TRN_MIN AND RANK_TRN_MAX
	UNION 
	select
		t.contact_id
	from
		cus_kg t
	join
		rank r on 1=1
	where
		opsum BETWEEN RANK_OPSUM_MIN AND RANK_OPSUM_MAX
		and cnt_trn BETWEEN RANK_TRN_MIN AND RANK_TRN_MAX
	;""")
	# Агрегация транзакций
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn;""")
	gp_connector.execute_query(f"""
	CREATE TABLE ba.vt_{mask}_trn (
	CONTACT_ID INTEGER,
	ACTN_PERIOD SMALLINT,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	REGISTRATION_DATE DATE,
	CNT_DAY INTEGER,
	CNT_TRN INTEGER,
	OPSUM NUMERIC,
	AVG_TXN NUMERIC,
	AVG_SPEND NUMERIC,
	CNT_TRN_FLTR INTEGER,
	AVG_TXN_FLTR NUMERIC,
	LONG_VISIT INTEGER,
	SQUARE_TRADE NUMERIC,
	CNT_WEEK INTEGER
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column)
	;""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_trn
	SELECT
		t.CONTACT_ID
		,ACTN_PERIOD
		,FRMT_ID
		,REGION_ID
		,REGISTRATION_DATE
		,CNT_DAY
		,CNT_TRN
		,OPSUM
		,AVG_TXN
		,AVG_SPEND
		,CNT_TRN_FLTR
		,AVG_TXN_FLTR
		,LONG_VISIT
		,SQUARE_TRADE
		,CNT_WEEK
	FROM
		ba.vt_{mask}_trn_1 t
	join
		ba.vt_{mask}_cus_clear c on c.contact_id = t.contact_id
	;
	""")

## reg_new_returned

In [191]:
@log_function_call
def reg_new_returned(gp_connector, mask, promo_start_date):
   #ОПЕРЕДЕЛЯЮ ТИП КЛИЕНТА: РЕГУЛЯРНЫЙ/НОВЫЙ/ВЕРНУВШИЙСЯ
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_type;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_type (
    CONTACT_ID INTEGER,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CUS_TYPE VARCHAR(10)
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column);
    """)

    #regular
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_type
    SELECT
        CONTACT_ID
        ,FRMT_ID
        ,REGION_ID
        ,'REGULAR'
    FROM
        ba.vt_{mask}_trn
    GROUP BY 
        CONTACT_ID
        ,FRMT_ID
        ,REGION_ID
    HAVING COUNT(DISTINCT ACTN_PERIOD) = 2
    ;
    """)
    #returned
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_type
    SELECT DISTINCT
        CONTACT_ID
        ,FRMT_ID
        ,REGION_ID
        ,'RETURNED'
    FROM (
        SELECT DISTINCT
            CONTACT_ID
            ,FRMT_ID
            ,REGION_ID
        FROM
            ba.vt_{mask}_trn
        WHERE
            ACTN_PERIOD = 2
            AND REGISTRATION_DATE < '{promo_start_date}'
        EXCEPT 
            SELECT DISTINCT
                CONTACT_ID
                ,FRMT_ID
                ,REGION_ID
            FROM
                ba.vt_{mask}_trn
            WHERE
                ACTN_PERIOD = 1
        ) D
    ;
    """)
    #new
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_type
    SELECT DISTINCT
        CONTACT_ID
        ,FRMT_ID
        ,REGION_ID
        ,'NEW'
    FROM (
        SELECT DISTINCT
            CONTACT_ID
            ,FRMT_ID
            ,REGION_ID
        FROM
            ba.vt_{mask}_trn
        WHERE
            ACTN_PERIOD = 2
        EXCEPT 
            SELECT DISTINCT
                CONTACT_ID
                ,FRMT_ID
                ,REGION_ID
            FROM
                ba.vt_{mask}_cus_type
        ) D
    ;
    """)

## clear_reg

In [192]:
@log_function_call
def clear_reg(gp_connector, mask, actnId, actn_name, lengthPrev, lengthActn): 
    #Очистка Регулярных
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_reg;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_reg (
    CONTACT_ID INTEGER,
    IS_CA SMALLINT,
    ACTN_ID SMALLINT,
    ACTN_PERIOD SMALLINT,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CUS_TYPE varchar(10),
    CNT_DAY INTEGER,
    CNT_TRN INTEGER,
    OPSUM NUMERIC,
    AVG_TXN NUMERIC,
    AVG_SPEND NUMERIC,
    CNT_TRN_FLTR INTEGER,
    AVG_TXN_FLTR NUMERIC,
    LONG_VISIT INTEGER,
    SQUARE_TRADE NUMERIC,
    CNT_WEEK INTEGER
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column);
    """)

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_reg
    SELECT
        CONTACT_ID
        ,IS_CA
        ,ACTN_ID
        ,ACTN_PERIOD
        ,FRMT_ID
        ,REGION_ID
        ,CUS_TYPE
        ,CNT_DAY
        ,CNT_TRN
        ,OPSUM
        ,AVG_TXN
        ,AVG_SPEND
        ,CNT_TRN_FLTR
        ,AVG_TXN_FLTR
        ,LONG_VISIT
        ,SQUARE_TRADE
        ,CNT_WEEK
    FROM (
        SELECT
            T.CONTACT_ID
            ,case when ca.CONTACT_ID is not null then 1 else 0 end IS_CA
            ,{actnId} AS ACTN_ID
            ,ACTN_PERIOD
            ,T.FRMT_ID
            ,T.REGION_ID
            ,CUS_TYPE
            ,CNT_DAY
            ,CNT_TRN
            ,OPSUM
            ,AVG_TXN
            ,AVG_SPEND
            ,CNT_TRN_FLTR
            ,AVG_TXN_FLTR
            ,LONG_VISIT
            ,SQUARE_TRADE
            ,CNT_WEEK
            ,max(case when ACTN_PERIOD = 1 then CNT_TRN else 0 end) over (partition by t.CONTACT_ID, t.FRMT_ID, t.REGION_ID) as CNT_TRN_PREV
            ,max(case when ACTN_PERIOD = 2 then CNT_TRN else 0 end) over (partition by t.CONTACT_ID, t.FRMT_ID, t.REGION_ID) as CNT_TRN_ACTN
            ,max(case when ACTN_PERIOD = 1 then CNT_DAY else 0 end) over (partition by t.CONTACT_ID, t.FRMT_ID, t.REGION_ID) as CNT_DAY_PREV
            ,max(case when ACTN_PERIOD = 2 then CNT_DAY else 0 end) over (partition by t.CONTACT_ID, t.FRMT_ID, t.REGION_ID) as CNT_DAY_ACTN
        FROM 
            ba.vt_{mask}_trn T
        JOIN
            ba.vt_{mask}_cus_type C
            ON C.CONTACT_ID = T.CONTACT_ID 
            AND C.FRMT_ID = T.FRMT_ID
            AND C.REGION_ID = T.REGION_ID
        left JOIN
            (SELECT distinct CONTACT_ID FROM BA.T_ZIG_SPR_IDN_ACTN WHERE ACTN_NAME = '{actn_name}') CA 
            ON CA.CONTACT_ID = T.CONTACT_ID
        ) D
    WHERE 1=1
        and CNT_DAY_PREV >= 2
        and CNT_DAY_ACTN >= 2
        and CNT_TRN_PREV BETWEEN 2 AND {lengthPrev}
        and CNT_TRN_ACTN BETWEEN 2 AND {lengthActn}
    ;
    """)

## dna

In [193]:
@log_function_call
def dna(gp_connector, mask, lengthActn):
#ДНК клиента по признакам Траты/ср.чек/частота покупок
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_ca_frmt;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_ca_frmt (
	CONTACT_ID INTEGER,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	CUS_TYPE varchar(10),
	OPSUM_LVL INTEGER,
	AVG_TXN_LVL INTEGER,
	CNT_TRN_LVL INTEGER
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column);
	""")
	#подбираю КГ со схожим поведением
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_kg_frmt;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_kg_frmt (
	CONTACT_ID INTEGER,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	CUS_TYPE varchar(10),
	OPSUM_LVL INTEGER,
	AVG_TXN_LVL INTEGER,
	CNT_TRN_LVL INTEGER
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column);
	""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_ca_frmt
	SELECT
		c.CONTACT_ID
		,FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,COALESCE((FLOOR(OPSUM/{lengthActn}/100) * 100), 0) AS OPSUM_LVL	--100
		,COALESCE((FLOOR(AVG_TXN/50) * 50), 0) AS AVG_TXN_LVL
		,COALESCE((FLOOR(CNT_TRN/1)*1), 0) AS CNT_TRN_LVL
	FROM
		ba.vt_{mask}_cus_reg c
	JOIN
		(SELECT distinct CONTACT_ID FROM ba.vt_{mask}_days_cross) CA 
		ON CA.CONTACT_ID = c.CONTACT_ID
	WHERE 
		CUS_TYPE = 'REGULAR'
		and IS_CA = 1
		and ACTN_PERIOD = 1 -- prev
	;
	""")
	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_kg_frmt
	SELECT
		CONTACT_ID
		,FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,COALESCE((FLOOR(OPSUM/{lengthActn}/100) * 100), 0) AS OPSUM_LVL	--100
		,COALESCE((FLOOR(AVG_TXN/50) * 50), 0) AS AVG_TXN_LVL
		,COALESCE((FLOOR(CNT_TRN/1)*1), 0) AS CNT_TRN_LVL
	FROM
		ba.vt_{mask}_cus_reg
	WHERE 
		CUS_TYPE = 'REGULAR'
		and IS_CA = 0
		and ACTN_PERIOD = 1 -- prev
	;
	""")

## kg_for_ca

In [194]:
@log_function_call
def kg_for_ca(gp_connector, mask, n_KG):
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_ca_frmt_grp;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_ca_frmt_grp (
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	CUS_TYPE VARCHAR(10),
	OPSUM_LVL INTEGER,
	AVG_TXN_LVL INTEGER,
	CNT_TRN_LVL INTEGER,
	CNT_CUS INTEGER
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column);
	""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_ca_frmt_grp
	SELECT
		FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,OPSUM_LVL
		,AVG_TXN_LVL
		,CNT_TRN_LVL
		,COUNT(DISTINCT CONTACT_ID) AS CNT_CUS
	FROM
		ba.vt_{mask}_ca_frmt
	GROUP BY
		FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,OPSUM_LVL
		,AVG_TXN_LVL
		,CNT_TRN_LVL
	;
	""")
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_kg_lfl_frmt;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_kg_lfl_frmt (
	CONTACT_ID INTEGER,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	CUS_TYPE varchar(10),
	OPSUM_LVL INTEGER,
	AVG_TXN_LVL INTEGER,
	CNT_TRN_LVL INTEGER,
	CNT_CUS INTEGER,
	CNT_KG INTEGER
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column)
	;""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_kg_lfl_frmt
	SELECT
		CONTACT_ID
		,FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,OPSUM_LVL
		,AVG_TXN_LVL
		,CNT_TRN_LVL
		,CNT_CUS
		,CNT_KG
	FROM (
		SELECT
			CONTACT_ID
			,FRMT_ID
			,REGION_ID
			,CUS_TYPE
			,OPSUM_LVL
			,AVG_TXN_LVL
			,CNT_TRN_LVL
			,CNT_CUS
			,ROW_NUM
			,count(CONTACT_ID) over (partition by FRMT_ID, REGION_ID, CUS_TYPE, OPSUM_LVL, AVG_TXN_LVL, CNT_TRN_LVL) as CNT_KG
		FROM (
			SELECT
				KF.CONTACT_ID
				,KF.FRMT_ID
				,KF.REGION_ID
				,kf.CUS_TYPE
				,KF.OPSUM_LVL
				,KF.AVG_TXN_LVL
				,KF.CNT_TRN_LVL
				,CFG.CNT_CUS
				,ROW_NUMBER() OVER (PARTITION BY KF.REGION_ID, KF.FRMT_ID, KF.CUS_TYPE, KF.OPSUM_LVL, KF.AVG_TXN_LVL, KF.CNT_TRN_LVL order by KF.CNT_TRN_LVL desc) AS ROW_NUM
			FROM
				ba.vt_{mask}_kg_frmt KF
			JOIN
				ba.vt_{mask}_ca_frmt_grp CFG
				on CFG.FRMT_ID = KF.FRMT_ID
				AND CFG.REGION_ID = KF.REGION_ID
				AND cfg.CUS_TYPE = kf.CUS_TYPE
				AND CFG.OPSUM_LVL = KF.OPSUM_LVL
				AND CFG.AVG_TXN_LVL = KF.AVG_TXN_LVL
				AND CFG.CNT_TRN_LVL = KF.CNT_TRN_LVL
			) d
		WHERE
			(case when FRMT_ID = 1 then ROW_NUM else 0 end) <= CNT_CUS * {n_KG}
		) d
	;
	""")
	#Фильтрую ЦА для которых не подобралась КГ
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_ca_lfl_frmt;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_ca_lfl_frmt (
	CONTACT_ID INTEGER,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	CUS_TYPE varchar(10),
	OPSUM_LVL INTEGER,
	AVG_TXN_LVL INTEGER,
	CNT_TRN_LVL INTEGER
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column);
	""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_ca_lfl_frmt
	SELECT
		C.CONTACT_ID
		,C.FRMT_ID
		,C.REGION_ID
		,c.CUS_TYPE
		,C.OPSUM_LVL
		,C.AVG_TXN_LVL
		,C.CNT_TRN_LVL
	FROM 
		ba.vt_{mask}_ca_frmt C
	JOIN (
		SELECT DISTINCT
			FRMT_ID
			,REGION_ID
			,CUS_TYPE
			,OPSUM_LVL
			,AVG_TXN_LVL
			,CNT_TRN_LVL
		FROM 
			ba.vt_{mask}_kg_lfl_frmt
		) D
		on D.FRMT_ID = C.FRMT_ID
		AND D.REGION_ID = C.REGION_ID
		AND d.CUS_TYPE = c.CUS_TYPE
		AND d.OPSUM_LVL = c.OPSUM_LVL
		AND D.AVG_TXN_LVL = C.AVG_TXN_LVL
		AND D.CNT_TRN_LVL = C.CNT_TRN_LVL
	;
	""")
	#Собираю в таблицу ЦА/КГ
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_ca_cg;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_ca_cg (
	CONTACT_ID INTEGER,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	CUS_TYPE VARCHAR(10),
	IS_CA SMALLINT
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column);
	""")

	gp_connector.execute_query(f"""
	INSERT INTO ba.vt_{mask}_ca_cg
	SELECT DISTINCT
		CONTACT_ID
		,FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,1 AS IS_CA
	FROM 
		ba.vt_{mask}_ca_lfl_frmt
	UNION 
	SELECT DISTINCT
		CONTACT_ID
		,FRMT_ID
		,REGION_ID
		,CUS_TYPE
		,0 AS IS_CA
	FROM 
		ba.vt_{mask}_kg_lfl_frmt
	;
	""")

## cnt_actn

In [195]:
@log_function_call
def cnt_actn(gp_connector, mask, DateStPre, DateStart, DateEnd, actnId):    
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cnt_actn;""")
    gp_connector.execute_query(f""" --sql
    CREATE TABLE ba.vt_{mask}_cnt_actn (
    CONTACT_ID NUMERIC,
    IS_CA smallint,
    ACTN_PERIOD smallint,
    CNT_ACTN NUMERIC
    )  WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    ;""")


    gp_connector.execute_query(f""" --sql
    insert into ba.vt_{mask}_cnt_actn 
    SELECT 
        contact_id,
        is_ca,
        actn_period,
        count(distinct actn_grp) as cnt_actn
    FROM (
    select b.contact_id,
        b.actn_name,
        b.date_start,
        b.date_end,
        d.actn_grp,
        c.is_ca,
        case when (b.date_start between '{DateStPre}' and '{DateStart}'
                or b.date_end between '{DateStPre}' and '{DateStart}') then 1
            when (b.date_start between '{DateStart}' and '{DateEnd}'
                or b.date_end between '{DateStart}' and '{DateEnd}') then 2
            else 0
        end as actn_period
    from BA.T_ZIG_SPR_IDN_ACTN b 
    join (select distinct contact_id, is_ca from ba.vt_{mask}_ca_cg) c on c.contact_id = b.contact_id
    join BA.T_ZIG_SPR_ACTN d on d.actn_name = b.actn_name
    where (b.date_start between '{DateStPre}' and '{DateEnd}'
        or b.date_end between '{DateStPre}' and '{DateEnd}')
        and d.actn_id < {actnId}
        --and d.actn_id < 1589
        ) d
    group by 1,2,3
    ;""")

## dynamic_gr20

In [196]:
@log_function_call
def dynamic_gr20(gp_connector, mask, actnId):
    month = gp_connector.gp(f"""
           select
                month_id
                ,min(day_id) as min_dt
                ,max(day_id) as max_dt
           from
                ba.vt_{mask}_days
            where
                actn_id = {actnId} 
                and actn_period = 1
            group by 1 
            order by 1
        ;""")

    month

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_gr_0;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_gr_0 (
    CONTACT_ID INTEGER,
    GR20_ID INTEGER,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CNT_TRN INTEGER,
    OPSUM NUMERIC,
    QNTY NUMERIC
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column);
    """)

    for i in tqdm_notebook(range(len(month))):
        dt_start = str(month.min_dt[i])
        dt_end = str(month.max_dt[i])

        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
        gp_connector.execute_query(f""" --sql
        CREATE TABLE ba.vt_{mask}_trn_temp  (
        contact_id integer,
        orgunit_id integer,
        frmt_id integer,
        region_id integer,
        cheque_pk bytea,
        article_id integer,
        summ_discounted numeric,
        quantity numeric
        )
        WITH (
            appendonly=true,
            blocksize=32768,
            compresstype=zstd,
            compresslevel=4,
            orientation=column)
        ;""")

        gp_connector.execute_query(f""" --sql
        insert into ba.vt_{mask}_trn_temp
        with trn as (
            select 
                t.contact_id,
                t.orgunit_id,
                t.frmt_id,
                t.region_id,
                t.cheque_pk,
                t.summ_discounted
            from ba.vt_{mask}_trn_0 t
            join ba.vt_{mask}_ca_cg cc 
                on cc.contact_id = t.contact_id
                and cc.frmt_id = t.frmt_id
                and cc.region_id = t.region_id
            where day_date between ('{dt_start}'::timestamp) AND ('{dt_end}'::timestamp + interval '1' day - interval '1' second)
        )
        SELECT 
            t.contact_id,
            t.orgunit_id,
            t.frmt_id,
            t.region_id,
            t.cheque_pk,
            ci.article_id,
            ci.summ_discounted,
            ci.quantity
        FROM trn t
        JOIN dm.cheque_item ci on ci.cheque_pk = t.cheque_pk
        WHERE ci.datetime between ('{dt_start}'::timestamp) AND ('{dt_end}'::timestamp + interval '1' day - interval '1' second)
        ;""")

        gp_connector.execute_query(f""" --sql
        insert into ba.vt_{mask}_cus_gr_0
        SELECT
            t.contact_id,
            ae.ART_GRP_LVL_0_ID as gr20_id,
            t.frmt_id,
            t.region_id,
            count(DISTINCT t.cheque_pk) AS cnt_trn,
            sum(t.summ_discounted) AS opsum,
            sum(t.quantity) AS qnty
        from ba.vt_{mask}_trn_temp t
        JOIN dm.art_ext ae ON ae.article_id = t.article_id
        GROUP BY 1,2,3,4
        ;""")

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_gr;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_gr (
    CONTACT_ID INTEGER,
    GR20_ID INTEGER,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CNT_TRN INTEGER,
    OPSUM NUMERIC,
    QNTY NUMERIC
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column);
    """)

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_gr
    SELECT
        CONTACT_ID
        ,GR20_ID
        ,FRMT_ID
        ,REGION_ID
        ,sum(cnt_trn) AS cnt_trn
        ,sum(opsum) AS opsum
        ,sum(qnty) AS qnty
    from
        ba.vt_{mask}_cus_gr_0
    GROUP BY 
        CONTACT_ID
        ,GR20_ID
        ,FRMT_ID
        ,REGION_ID
    ;""")

## gr20_transp

In [197]:
@log_function_call
def gr20_transp(gp_connector, mask):
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_gr_transp;""")
	gp_connector.execute_query(f""" --sql
	Create Table ba.vt_{mask}_cus_gr_transp (
	CONTACT_ID INTEGER,
	FRMT_ID INTEGER,
	REGION_ID INTEGER,
	"cnt_trn_Бакалея" NUMERIC,
	"cnt_trn_Безалкогольные напитки" NUMERIC,
	"cnt_trn_Бытовая химия" NUMERIC,
	"cnt_trn_Вино" NUMERIC,
	"cnt_trn_Детское питание" NUMERIC,
	"cnt_trn_Замороженная продукция" NUMERIC,
	"cnt_trn_Кондитерские изделия" NUMERIC,
	"cnt_trn_Консервированные продукты" NUMERIC,
	"cnt_trn_Кофе, какао" NUMERIC,
	"cnt_trn_Крепкий алкоголь" NUMERIC,
	"cnt_trn_Кулинария" NUMERIC,
	"cnt_trn_Молочная продукция" NUMERIC,
	"cnt_trn_Мучные кондитерские изделия" NUMERIC,
	"cnt_trn_Мясная гастрономия" NUMERIC,
	"cnt_trn_Мясо" NUMERIC,
	"cnt_trn_Парфюмерия и декоративная косметика" NUMERIC,
	"cnt_trn_Продукция для животных" NUMERIC,
	"cnt_trn_Промышленные товары" NUMERIC,
	"cnt_trn_Птица" NUMERIC,
	"cnt_trn_Рыба" NUMERIC,
	"cnt_trn_Рыбная гастрономия" NUMERIC,
	"cnt_trn_Свежие овощи" NUMERIC,
	"cnt_trn_Свежие фрукты" NUMERIC,
	"cnt_trn_Слабоалкогольные напитки" NUMERIC,
	"cnt_trn_Снэки" NUMERIC,
	"cnt_trn_Специальное питание" NUMERIC,
	"cnt_trn_Сыры" NUMERIC,
	"cnt_trn_Табачные изделия" NUMERIC,
	"cnt_trn_Уход и гигиена" NUMERIC,
	"cnt_trn_Хлеб и хлебобулочные изделия" NUMERIC,
	"cnt_trn_Чай" NUMERIC,
	"cnt_trn_Яичные товары" NUMERIC
	)
	WITH (
		appendonly=true,
		blocksize=32768,
		compresstype=zstd,
		compresslevel=4,
		orientation=column)
	;""")

	gp_connector.execute_query(f""" --sql
	insert into ba.vt_{mask}_cus_gr_transp
	SELECT
		CONTACT_ID
		,FRMT_ID
		,REGION_ID
		,max(CASE WHEN GR20_ID = 16722	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Бакалея"
		,max(CASE WHEN GR20_ID = 2135	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Безалкогольные напитки"
		,max(CASE WHEN GR20_ID = 1262	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Бытовая химия"
		,max(CASE WHEN GR20_ID = 1173	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Вино"
		,max(CASE WHEN GR20_ID = 551	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Детское питание"
		,max(CASE WHEN GR20_ID = 16906	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Замороженная продукция"
		,max(CASE WHEN GR20_ID = 102	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Кондитерские изделия"
		,max(CASE WHEN GR20_ID = 16562	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Консервированные продукты"
		,max(CASE WHEN GR20_ID = 78		THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Кофе, какао"
		,max(CASE WHEN GR20_ID = 17034	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Крепкий алкоголь"
		,max(CASE WHEN GR20_ID = 684	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Кулинария"
		,max(CASE WHEN GR20_ID = 16352	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Молочная продукция"
		,max(CASE WHEN GR20_ID = 17079	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Мучные кондитерские изделия"
		,max(CASE WHEN GR20_ID = 16315	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Мясная гастрономия"
		,max(CASE WHEN GR20_ID = 16332	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Мясо"
		,max(CASE WHEN GR20_ID = 1181	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Парфюмерия и декоративная косметика"
		,max(CASE WHEN GR20_ID = 1155	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Продукция для животных"
		,max(CASE WHEN GR20_ID = 1869	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Промышленные товары"
		,max(CASE WHEN GR20_ID = 16322	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Птица"
		,max(CASE WHEN GR20_ID = 16342	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Рыба"
		,max(CASE WHEN GR20_ID = 16752	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Рыбная гастрономия"
		,max(CASE WHEN GR20_ID = 16864	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Свежие овощи"
		,max(CASE WHEN GR20_ID = 17241	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Свежие фрукты"
		,max(CASE WHEN GR20_ID = 323	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Слабоалкогольные напитки"
		,max(CASE WHEN GR20_ID = 1128	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Снэки"
		,max(CASE WHEN GR20_ID = 1327	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Специальное питание"
		,max(CASE WHEN GR20_ID = 16343	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Сыры"
		,max(CASE WHEN GR20_ID = 367	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Табачные изделия"
		,max(CASE WHEN GR20_ID = 252	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Уход и гигиена"
		,max(CASE WHEN GR20_ID = 679	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Хлеб и хлебобулочные изделия"
		,max(CASE WHEN GR20_ID = 415	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Чай"
		,max(CASE WHEN GR20_ID = 222	THEN CNT_TRN_GR ELSE 0 END) AS "cnt_trn_Яичные товары"                  
	FROM ( 
		SELECT
			cg.CONTACT_ID
			,cg.GR20_ID
			,cg.FRMT_ID
			,cg.REGION_ID
			--,cg.OPSUM / co.CNT_TRN as AVG_SPEND_GR
			,cg.CNT_TRN as CNT_TRN_GR
		FROM
			ba.vt_{mask}_cus_gr cg
		JOIN
			ba.vt_{mask}_cus_reg co 
			on co.CONTACT_ID = cg.CONTACT_ID
			and co.FRMT_ID = cg.FRMT_ID
			and co.REGION_ID = cg.REGION_ID
			and co.ACTN_PERIOD = 1
		) d
	GROUP BY
		CONTACT_ID
		,FRMT_ID
		,REGION_ID
	;
	""")

## age_and_active_virt

In [198]:
@log_function_call
def age_and_active_virt(gp_connector, mask):
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_gender;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_gender (
    CONTACT_ID INTEGER,
    GENDER_CODE smallint,
    AGE_CALC INTEGER
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_gender
    SELECT
        c.CONTACT_ID
        ,case when gendercalc = 'F' then 1
            when gendercalc = 'M' then 2
            else 0
        end as GENDER_CODE
        ,(current_date - date(birth_date))/364 AS AGE_CALC
    FROM 
        dm.contact c
    JOIN
        (select distinct CONTACT_ID from ba.vt_{mask}_ca_cg) cg on cg.CONTACT_ID = c.CONTACT_ID
    ;""")
    ### Активный пользователь вирт.карты
    #Выделяю клиентов с виртуальной картой
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_virt;""")
    gp_connector.execute_query(f""" --sql
    Create TABLE ba.vt_{mask}_cus_virt (
    CONTACT_ID INTEGER
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    ;""")

    # CA
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_virt
    WITH trn AS (
    SELECT distinct
        contact_id
        ,card_number
    FROM
        ba.vt_{mask}_trn_0
    WHERE
        actn_period = 1
    )

    SELECT DISTINCT
        t.CONTACT_ID
    FROM
        trn t
    JOIN
        (select distinct CONTACT_ID from ba.vt_{mask}_ca_cg) cg on cg.CONTACT_ID = T.CONTACT_ID
    JOIN
        dm.card cc ON cc.card_number = t.card_number
    WHERE
        cc.card_type_id in (1, 90) -- VIRTUAL_01, VIRTUAL_02
    ;""")

## itog

In [199]:
@log_function_call
def itog(gp_connector, mask, lengthActn):
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_profile;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_profile (
    CONTACT_ID INTEGER,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    IS_CA SMALLINT,
    IS_VIRT SMALLINT,
    GENDER SMALLINT,
    AGE INTEGER,
    LONG_VISIT INTEGER,
    SQUARE_TRADE NUMERIC,
    SPEND_PREV float,
    SPEND_ACTN float,
    AVG_TXN_PREV float,
    AVG_TXN_ACTN float,
    CNT_TRN_PREV float,
    CNT_TRN_ACTN float,
    CNT_DAY_PREV float,
    OPSUM_ACTN float,
    CNT_TRN_FLTR_ACTN float,
    AVG_TXN_FLTR_ACTN float,
    CNT_WEEK_PREV float,
    CNT_WEEK_ACTN float,
    CNT_ACTN_PREV INTEGER,
    CNT_ACTN_ACTN INTEGER,
    "cnt_trn_Бакалея" NUMERIC,
    "cnt_trn_Безалкогольные напитки" NUMERIC,
    "cnt_trn_Бытовая химия" NUMERIC,
    "cnt_trn_Вино" NUMERIC,
    "cnt_trn_Детское питание" NUMERIC,
    "cnt_trn_Замороженная продукция" NUMERIC,
    "cnt_trn_Кондитерские изделия" NUMERIC,
    "cnt_trn_Консервированные продукты" NUMERIC,
    "cnt_trn_Кофе, какао" NUMERIC,
    "cnt_trn_Крепкий алкоголь" NUMERIC,
    "cnt_trn_Кулинария" NUMERIC,
    "cnt_trn_Молочная продукция" NUMERIC,
    "cnt_trn_Мучные кондитерские изделия" NUMERIC,
    "cnt_trn_Мясная гастрономия" NUMERIC,
    "cnt_trn_Мясо" NUMERIC,
    "cnt_trn_Парфюмерия и декоративная косметика" NUMERIC,
    "cnt_trn_Продукция для животных" NUMERIC,
    "cnt_trn_Промышленные товары" NUMERIC,
    "cnt_trn_Птица" NUMERIC,
    "cnt_trn_Рыба" NUMERIC,
    "cnt_trn_Рыбная гастрономия" NUMERIC,
    "cnt_trn_Свежие овощи" NUMERIC,
    "cnt_trn_Свежие фрукты" NUMERIC,
    "cnt_trn_Слабоалкогольные напитки" NUMERIC,
    "cnt_trn_Снэки" NUMERIC,
    "cnt_trn_Специальное питание" NUMERIC,
    "cnt_trn_Сыры" NUMERIC,
    "cnt_trn_Табачные изделия" NUMERIC,
    "cnt_trn_Уход и гигиена" NUMERIC,
    "cnt_trn_Хлеб и хлебобулочные изделия" NUMERIC,
    "cnt_trn_Чай" NUMERIC,
    "cnt_trn_Яичные товары" NUMERIC
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column);
    """)

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_profile
    WITH opsum_agg AS ( 
        select
            t.CONTACT_ID
            ,t.FRMT_ID
            ,t.REGION_ID
            ,c.IS_CA
            ,t.ACTN_PERIOD
            ,coalesce(t.AVG_SPEND, 0) as AVG_SPEND
            ,t.AVG_TXN
            ,t.CNT_TRN
            ,t.LONG_VISIT
            ,t.SQUARE_TRADE
            ,case when cv.CONTACT_ID is not null then 1 else 0 end as IS_VIRT
            ,t.CNT_DAY
            ,t.OPSUM
            ,coalesce(t.CNT_TRN_FLTR, 0) as CNT_TRN_FLTR
            ,coalesce(t.AVG_TXN_FLTR, 0) as AVG_TXN_FLTR
            ,coalesce(t.CNT_WEEK, 0)  as CNT_WEEK
            ,coalesce(ca.CNT_ACTN, 0) as CNT_ACTN
        from
            ba.vt_{mask}_cus_reg t
        JOIN
            ba.vt_{mask}_ca_cg c 
            on c.CONTACT_ID = t.CONTACT_ID
            and c.FRMT_ID = t.FRMT_ID
            and c.REGION_ID = t.REGION_ID
            and c.IS_CA = t.IS_CA
        left JOIN
            ba.vt_{mask}_cus_virt cv on cv.CONTACT_ID = t.CONTACT_ID
        left join
            ba.vt_{mask}_cnt_actn ca 
            on ca.contact_id = t.contact_id
            and ca.is_ca = t.is_ca
            and ca.actn_period = t.actn_period
    ), opsum_agg_transp AS (
        select
            CONTACT_ID
            ,FRMT_ID
            ,REGION_ID
            ,IS_CA
            ,IS_VIRT
            ,sum(case when ACTN_PERIOD = 1 then AVG_SPEND else 0 end) as SPEND_PREV
            ,sum(case when ACTN_PERIOD = 2 then AVG_SPEND else 0 end) as SPEND_ACTN
            ,max(case when ACTN_PERIOD = 1 then AVG_TXN end) as AVG_TXN_PREV
            ,max(case when ACTN_PERIOD = 2 then AVG_TXN end) as AVG_TXN_ACTN
            ,sum(case when ACTN_PERIOD = 1 then CNT_TRN end) as CNT_TRN_PREV
            ,sum(case when ACTN_PERIOD = 2 then CNT_TRN end) as CNT_TRN_ACTN
            ,max(case when ACTN_PERIOD = 1 then LONG_VISIT end) as LONG_VISIT
            ,max(case when ACTN_PERIOD = 1 then SQUARE_TRADE end) as SQUARE_TRADE
            ,max(case when ACTN_PERIOD = 1 then CNT_DAY end) as CNT_DAY_PREV
            ,max(case when ACTN_PERIOD = 2 then OPSUM / {lengthActn} end) as OPSUM_ACTN
            ,max(case when ACTN_PERIOD = 2 then CNT_TRN_FLTR else 0 end) as CNT_TRN_FLTR_ACTN
            ,max(case when ACTN_PERIOD = 2 then AVG_TXN_FLTR else 0 end) as AVG_TXN_FLTR_ACTN
            ,max(case when ACTN_PERIOD = 1 then CNT_WEEK else 0 end) as CNT_WEEK_PREV
            ,max(case when ACTN_PERIOD = 2 then CNT_WEEK else 0 end) as CNT_WEEK_ACTN
            ,max(case when ACTN_PERIOD = 1 then CNT_ACTN else 0 end) as CNT_ACTN_PREV
            ,max(case when ACTN_PERIOD = 2 then CNT_ACTN else 0 end) as CNT_ACTN_ACTN
        from
            opsum_agg
        group by
            CONTACT_ID
            ,FRMT_ID
            ,REGION_ID
            ,IS_CA
            ,IS_VIRT
    )

    SELECT distinct
        t.CONTACT_ID
        ,t.FRMT_ID
        ,t.REGION_ID
        ,t.IS_CA
        ,t.IS_VIRT
        ,cg.GENDER_CODE
        ,cg.AGE_CALC
        ,t.LONG_VISIT
        ,t.SQUARE_TRADE
        ,t.SPEND_PREV
        ,t.SPEND_ACTN
        ,t.AVG_TXN_PREV
        ,t.AVG_TXN_ACTN
        ,t.CNT_TRN_PREV
        ,t.CNT_TRN_ACTN
        ,t.CNT_DAY_PREV
        ,t.OPSUM_ACTN
        ,t.CNT_TRN_FLTR_ACTN
        ,t.AVG_TXN_FLTR_ACTN
        ,t.CNT_WEEK_PREV
        ,t.CNT_WEEK_ACTN
        ,t.CNT_ACTN_PREV
        ,t.CNT_ACTN_ACTN
        ,"cnt_trn_Бакалея"
        ,"cnt_trn_Безалкогольные напитки"
        ,"cnt_trn_Бытовая химия"
        ,"cnt_trn_Вино"
        ,"cnt_trn_Детское питание"
        ,"cnt_trn_Замороженная продукция"
        ,"cnt_trn_Кондитерские изделия"
        ,"cnt_trn_Консервированные продукты"
        ,"cnt_trn_Кофе, какао"
        ,"cnt_trn_Крепкий алкоголь"
        ,"cnt_trn_Кулинария"
        ,"cnt_trn_Молочная продукция"
        ,"cnt_trn_Мучные кондитерские изделия"
        ,"cnt_trn_Мясная гастрономия"
        ,"cnt_trn_Мясо"
        ,"cnt_trn_Парфюмерия и декоративная косметика"
        ,"cnt_trn_Продукция для животных"
        ,"cnt_trn_Промышленные товары"
        ,"cnt_trn_Птица"
        ,"cnt_trn_Рыба"
        ,"cnt_trn_Рыбная гастрономия"
        ,"cnt_trn_Свежие овощи"
        ,"cnt_trn_Свежие фрукты"
        ,"cnt_trn_Слабоалкогольные напитки"
        ,"cnt_trn_Снэки"
        ,"cnt_trn_Специальное питание"
        ,"cnt_trn_Сыры"
        ,"cnt_trn_Табачные изделия"
        ,"cnt_trn_Уход и гигиена"
        ,"cnt_trn_Хлеб и хлебобулочные изделия"
        ,"cnt_trn_Чай"
        ,"cnt_trn_Яичные товары"
    FROM
        opsum_agg_transp t
    JOIN
        ba.vt_{mask}_cus_gender cg on cg.CONTACT_ID = t.CONTACT_ID
    left JOIN
        ba.vt_{mask}_cus_gr_transp cgt 
        on cgt.CONTACT_ID = t.CONTACT_ID
        and cgt.FRMT_ID = t.FRMT_ID
        and cgt.REGION_ID = t.REGION_ID
    ;
    """)

## psm

In [200]:
@log_function_call
def psm(gp_connector, mask, actn_name, actnId):
    import numpy as np
    #ЗАПОЛНИТЬ!
    mde_ratio = 10.0 / 100.0 #10% минимальный детектируемый эффект прироста трат
    # Оптимизированная функция
    def perform_matching(indexes: np.ndarray,
                        is_ca_array: np.ndarray,
                        frmt_id_array: np.ndarray,
                        region_id_array: np.ndarray) -> np.ndarray:
        """
        Batch-функция для мэтчинга KNN.
        Перебирает все индексы, где is_ca == 1, для каждого такого индекса ищет
        первого подходящего кандидата (is_ca == 0) из его ближайших соседей, который:
        1) Не совпадает с самим индексом.
        2) Имеет тот же frmt_id.
        3) Имеет тот же region_id.
        4) Ещё не использован (уникальный).
        Параметры:
        ----------
        indexes : np.ndarray
            Результат вызова knn.kneighbors(...)[1], массив индексов ближайших соседей.
            Размерность: (n_samples, n_neighbors).
        is_ca_array : np.ndarray
            Массив признака is_ca (0/1) для каждой строки.
        frmt_id_array : np.ndarray
            Массив, в котором хранится frmt_id для каждой строки.
        region_id_array : np.ndarray
            Массив, в котором хранится region_id для каждой строки.

        Возвращает:
        ----------
        np.ndarray
            Массив matched_element длиной n_samples, где:
            - matched_element[i] = индекс подобранного «не ЦА» для строки i,
            - matched_element[i] = np.nan, если подходящий кандидат не найден.
        """

        n = len(is_ca_array)
        # Сюда запишем итоговый индекс «пары» для каждой строки (или np.nan, если нет)
        matched_element = np.full(n, np.nan, dtype=np.float32)

        # Булевый массив для учёта уже использованных кандидатов:
        used_kg_mask = np.zeros(n, dtype=bool)

        # Определяем индексы, где is_ca == 1 (лечебная группа)
        treated_indices = np.where(is_ca_array == 1)[0]

        for current_index in treated_indices:
            # Ближайшие соседи для текущего индекса
            candidates = indexes[current_index, :]

            # Маска: не сам себя
            mask_not_self = (candidates != current_index)
            # Маска: кандидат из контрольной группы (is_ca == 0)
            mask_ca = (is_ca_array[candidates] == 0)
            # Маска: тот же формат
            mask_frmt = (frmt_id_array[candidates] == frmt_id_array[current_index])
            # Маска: тот же регион
            mask_region = (region_id_array[candidates] == region_id_array[current_index])
            # Маска: ещё не использован (unique pair)
            mask_unused = ~used_kg_mask[candidates]

            valid_mask = mask_not_self & mask_ca & mask_frmt & mask_region & mask_unused
            valid_indices = np.where(valid_mask)[0]

            if len(valid_indices) > 0:
                # Берём первого подходящего кандидата
                chosen_idx = candidates[valid_indices[0]]
                matched_element[current_index] = chosen_idx
                # Отмечаем, что chosen_idx теперь нельзя использовать повторно
                used_kg_mask[chosen_idx] = True

        return matched_element

    def perfom_matching_v1(row, indexes, df):
        current_index = int(row['level_0']) # Obtain value from index-named column, not the actual DF index.
        for idx in indexes[current_index,:]:
            if (current_index != idx) and (row.is_ca == 1) and (df.loc[idx].is_ca == 0) and\
            (row.frmt_id == df.loc[idx].frmt_id) and (row.region_id == df.loc[idx].region_id):
                return int(idx)
            
    def perfom_matching_v2(row, indexes, df, kg_lst):
        current_index = int(row['level_0']) # Obtain value from index-named column, not the actual DF index.
        for idx in indexes[current_index,:]:
            if (current_index != idx) and (row.is_ca == 1) and (df.loc[idx].is_ca == 0) and (idx not in kg_lst) and\
            (row.frmt_id == df.loc[idx].frmt_id) and (row.region_id == df.loc[idx].region_id):
                kg_lst.append(idx)
                return int(idx)

    def min_sample_size_avg(std, mean_diff, power = 0.8, sig_level = 0.05, alternative = "two-sided", control_ratio = 0.5):
        if alternative == "one-sided":
            alternative = "larger"
        power_analysis  = TTestIndPower()
        d = mean_diff / std 
        ratio = (1 - control_ratio) / control_ratio
        sample_size = power_analysis.solve_power(
            effect_size=d,
            nobs1=None,
            alpha=sig_level,
            power=power,
            ratio=ratio,
            alternative=alternative)
        sample_power = power_analysis.power(
            effect_size=d,
            nobs1=sample_size,
            alpha=sig_level,
            ratio=ratio)
        return int(sample_size), float(sample_power) # nobs1 = размер контрольной группы

    def absolute_ttest(control, test):
        mean_control = np.mean(control)
        mean_test = np.mean(test)
        var_mean_control  = np.var(control) / len(control)
        var_mean_test  = np.var(test) / len(test)
        
        difference_mean = mean_test - mean_control
        difference_mean_var = var_mean_control + var_mean_test
        difference_distribution = sps.norm(loc=difference_mean, scale=np.sqrt(difference_mean_var))

        left_bound, right_bound = difference_distribution.ppf([0.025, 0.975])
        ci_length = (right_bound - left_bound)
        pvalue = 2 * min(difference_distribution.cdf(0), difference_distribution.sf(0))
        effect = difference_mean
        return ExperimentComparisonResults(round(pvalue, 4), round(effect, 4), round(ci_length, 4), round(left_bound, 4), round(right_bound, 4))


    def cuped_ttest(control, test, control_before, test_before):
        theta = (np.cov(control, control_before)[0, 1] + np.cov(test, test_before)[0, 1]) /\
                    (np.var(control_before) + np.var(test_before))
        control_cup = control - theta * control_before
        test_cup = test - theta * test_before
        return absolute_ttest(control_cup, test_cup)
    
    query = f"""select distinct FRMT_ID, REGION_ID from ba.vt_{mask}_cus_profile order by FRMT_ID desc;"""
    frmt_region = gp_connector.gp(query)

    #Использование новой функции с заранее выделенными массивами
    frmt = frmt_region.frmt_id.unique().tolist()
    region = frmt_region.region_id.unique().tolist()
    # frmt = [1]
    # region = [227]

    #Сразу создадим списки, куда будем складывать результаты
    cus_lfl_list = []
    stat_test_list = []

    print('Доля мэтчинга ЛФЛ ЦА/КГ:')

    batch_size = 100000

    for f in range(len(frmt)):
        for r in range(len(region)):
            gc.collect()

            data_chunks = []
            offset = 0
                
            print(f"\nЗагрузка данных frmt_id={frmt[f]}, region_id={region[r]} батчами...")

            # Загрузка батчами
            while True:
                query = f"""
                    SELECT * FROM ba.vt_{mask}_cus_profile 
                    WHERE frmt_id = '{frmt[f]}' AND region_id = '{region[r]}'
                    LIMIT {batch_size} OFFSET {offset}
                """
                batch_df = gp_connector.gp(query)

                if batch_df.empty:
                    break

                batch_df = gp_connector.reduce_mem_usage(batch_df, verbose=False)
                data_chunks.append(batch_df)
                offset += batch_size

                print(f"Загружено всего строк: {offset}, в текущем батче: {len(batch_df)}")

            if not data_chunks:
                print(f"Нет данных для frmt_id={frmt[f]}, region_id={region[r]}. Пропускаем.")
                continue

            # Объединяем все батчи в один DataFrame
            df = pd.concat(data_chunks, ignore_index=True)
            data_chunks = []  # Освобождаем память
            gc.collect()     

            # Заполняю пропуски средним
            # cols = df.iloc[:, 6:].columns
            # df[cols] = df[cols].fillna(df[cols].mean())

            df = df.dropna().reset_index(drop=True)
            df_len = df.groupby('is_ca')['contact_id'].count()
        
            #Модель
            if len(df_len) > 1 and df_len.values[0] > 1 and df_len.values[1] > 1:
                df['treatment'] = df['is_ca'] == 1
                df['is_female'] = df['gender'] == 1

                TREATMENT = 'treatment'
                OUTCOME = 'opsum_actn'

                cols = ['is_female', 'age', 'is_virt', 'long_visit', 'square_trade',
                'spend_prev', 'avg_txn_prev', 'cnt_trn_prev',
                'cnt_week_prev', 'cnt_week_actn', 'cnt_day_prev', 'cnt_actn_prev', 'cnt_actn_actn',
                'cnt_trn_Бакалея', 'cnt_trn_Безалкогольные напитки', 'cnt_trn_Бытовая химия',
                'cnt_trn_Вино', 'cnt_trn_Детское питание',
                'cnt_trn_Замороженная продукция', 'cnt_trn_Кондитерские изделия',
                'cnt_trn_Консервированные продукты', 'cnt_trn_Кофе, какао',
                'cnt_trn_Крепкий алкоголь', 'cnt_trn_Кулинария',
                'cnt_trn_Молочная продукция', 'cnt_trn_Мучные кондитерские изделия',
                'cnt_trn_Мясная гастрономия', 'cnt_trn_Мясо',
                'cnt_trn_Парфюмерия и декоративная кос',
                'cnt_trn_Продукция для животных', 'cnt_trn_Промышленные товары',
                'cnt_trn_Птица', 'cnt_trn_Рыба', 'cnt_trn_Рыбная гастрономия',
                'cnt_trn_Свежие овощи', 'cnt_trn_Свежие фрукты',
                'cnt_trn_Слабоалкогольные напитки', 'cnt_trn_Снэки',
                'cnt_trn_Специальное питание', 'cnt_trn_Сыры',
                'cnt_trn_Табачные изделия', 'cnt_trn_Уход и гигиена',
                'cnt_trn_Хлеб и хлебобулочные изделия', 'cnt_trn_Чай',
                'cnt_trn_Яичные товары']

                # Estimate propensity scores 
                # Build a descriptive model
                t = df[TREATMENT]
                X = df[cols]
                pipe = Pipeline([
                    # ('minmax', MinMaxScaler()),
                    ('scaler', StandardScaler()),
                    ('logistic_classifier', LogisticRegression(random_state=789, class_weight='balanced'))
                ])
                pipe.fit(X, t)

                threshold = 0.5
                df['proba'] = pipe.predict_proba(X)[:,1]
                ind = df[df.proba == 1].index
                df.loc[ind, 'proba'] = 0.9999
                df['logit'] = df['proba'].apply(lambda p: np.log(p/(1-p)))
                df['pred'] = np.where(df['proba']>=threshold, 1, 0)

                # Масштабирование признаков
                X_scale = pd.DataFrame(pipe['scaler'].transform(X), columns=pipe['scaler'].get_feature_names_out())
                X_scale['logit'] = df['logit'].copy()

                #Мэтчинг KNN
                caliper = np.std(df.logit) * 0.25
                knn = NearestNeighbors(n_neighbors=5, p = 2, radius=caliper)
                knn.fit(X_scale[['logit', 'spend_prev', 'avg_txn_prev', 'cnt_trn_prev']].to_numpy())

                # Common support distances and indexes
                distances , indexes = knn.kneighbors(X_scale[['logit', 'spend_prev', 'avg_txn_prev', 'cnt_trn_prev']].to_numpy(), n_neighbors=5)

                treated_x = df[df[TREATMENT]][['logit']].values
                non_treated_x = df[~df[TREATMENT]][['logit']].values

                # Выносим массивы
                is_ca_array = df['is_ca'].values
                frmt_id_array = df['frmt_id'].values
                region_id_array = df['region_id'].values

                # Применяем batch-функцию мэтчинга
                matched_element_array = perform_matching(
                    indexes, is_ca_array, frmt_id_array, region_id_array
                )
                df['matched_element'] = matched_element_array

                treated_with_match = ~pd.isna(df['matched_element'])
                treated_matched_data = df.loc[treated_with_match, [
                    'contact_id','frmt_id','region_id','is_ca',
                    'spend_prev','spend_actn', 'opsum_actn','logit','matched_element'
                ]].copy()

                attributes = ['contact_id', 'frmt_id', 'region_id', 'is_ca', 'spend_prev', 'spend_actn', 'opsum_actn', 'logit']
                untreated_matched_data = df.loc[treated_matched_data.matched_element.values, attributes]

                if len(treated_matched_data) > 1:
                    all_mached_data = pd.concat([treated_matched_data, untreated_matched_data])
                    # Сохраняем в список (потом склеим за пределами цикла)
                    cus_lfl_list.append(
                    all_mached_data[['contact_id', 'frmt_id', 'region_id', 'is_ca', 'logit']]
                    )

                    #Расчет репрезентативности выборки
                    n1 = len(all_mached_data.query('is_ca == 0')['contact_id'].unique())
                    n2 = len(all_mached_data.query('is_ca == 1')['contact_id'].unique())
                    std = all_mached_data['opsum_actn'].std()
                    mean_diff = all_mached_data.query('is_ca == 1')['opsum_actn'].mean() -\
                                all_mached_data.query('is_ca == 0')['opsum_actn'].mean()
                    
                    #mean_diff = df.query('is_ca == 1')['spend_prev'].mean() * mde_ratio
                    
                    min_sample_size, power = min_sample_size_avg(std = std, mean_diff = mean_diff)
                    if n2 >= min_sample_size:
                        is_ok_sample = 1
                    else:
                        is_ok_sample = 0

                    # Сходимость на периоде "ДО"
                    treated_outcome_prev = treated_matched_data.spend_prev
                    untreated_outcome_prev = untreated_matched_data.spend_prev
                    _ , p_val_prev = stats.ttest_ind(treated_outcome_prev, untreated_outcome_prev)

                    # Сходимость на периоде "АКЦ"
                    treated_outcome_actn = treated_matched_data.opsum_actn
                    untreated_outcome_actn = untreated_matched_data.opsum_actn
                    ttest_actn = cuped_ttest(untreated_outcome_actn, treated_outcome_actn, untreated_outcome_prev, treated_outcome_prev)
                    #_ , p_val_actn = stats.ttest_ind(treated_outcome_actn, untreated_outcome_actn, equal_var=False)
                    
                    stat_temp = pd.DataFrame({'frmt_id':[frmt[f]],
                                            'region_id':[region[r]],
                                            'n_ca_total':[n2],
                                            'min_sample_size':[min_sample_size],
                                            'is_ok_sample':[is_ok_sample],
                                            'power':[power],
                                            'stat_prev':[p_val_prev],
                                            'stat_actn':[ttest_actn[0]],
                                            'effect':[ttest_actn[1]],
                                            'ci_length':[ttest_actn[2]],
                                            'left_bound':[ttest_actn[3]],
                                            'right_bound':[ttest_actn[4]]})
                    stat_test_list.append(stat_temp)

                    # Доля мэтчинга ЛФЛ ЦА/КГ
                    match_ratio = treated_matched_data.groupby(['frmt_id', 'region_id'])['contact_id'].count() /\
                                df[df.is_ca == 1].groupby(['frmt_id', 'region_id'])['contact_id'].count() * 100
                    print(f'{match_ratio.index[0][0]} {match_ratio.index[0][1]} {match_ratio.values}')
            else:
                stat_temp = pd.DataFrame({'frmt_id':[frmt[f]],
                                        'region_id':[region[r]],
                                        'n_ca_total':[n2],
                                        'min_sample_size':[0],
                                        'is_ok_sample':[0],
                                        'power':[0],
                                        'stat_prev':[0.0],
                                        'stat_actn':[1.0],
                                        'effect':[0.0],
                                        'ci_length':[0.0],
                                        'left_bound':[0.0],
                                        'right_bound':[0.0]})
                stat_test_list.append(stat_temp)
            
            df_last = df

            if not (f == len(frmt) - 1 and r == len(region) - 1):
            # Не держим df в памяти, сохраняем только последний для визуализации
                del df
                gc.collect()

    gc.collect()

    #Итоговые объединения
    cus_lfl = pd.concat(cus_lfl_list, ignore_index=True) if cus_lfl_list else pd.DataFrame(
        columns=['contact_id', 'frmt_id', 'region_id', 'is_ca', 'logit']
    )
    stat_test = pd.concat(stat_test_list, ignore_index=True) if stat_test_list else pd.DataFrame(
        columns=['frmt_id', 'region_id', 'n_ca_total', 'min_sample_size', 'is_ok_sample',
                'power', 'stat_prev', 'stat_actn', 'effect', 'ci_length', 'left_bound', 'right_bound']
    )

    print('Процесс завершен.')
    stat_test.reset_index(drop=True, inplace=True)
    stat_test['stat_shod'] = 0
    stat_test['stat_test'] = 0

    for i in range(len(stat_test)):
        if stat_test.loc[i, 'stat_prev'] > 0.05:
            stat_test.loc[i, 'stat_shod'] = 1
            if stat_test.loc[i, 'stat_actn'] < 0.05:
                if stat_test.loc[i, 'effect'] > 0:
                    stat_test.loc[i, 'stat_test'] = 1

    file_name = f"{actn_name}_stat_test.xlsx"
    stat_test.to_excel(file_name, index=False)

    # print('Оцениваемый признак: Траты покупателя')
    # print('Тотал результатов: ', len(stat_test))
    # print()
    # print('Тип клиента: REGULAR')
    # print('Кол-во: ', len(stat_test))
    # print('Кол-во сходимых результатов: ', sum(stat_test.stat_shod))
    # print('Доля сходимых результатов: {:.1f}%'.format(sum(stat_test.stat_shod)/len(stat_test)*100))
    # print()
    # print('Из них: Кол-во значимых эффектов: ', sum(stat_test.stat_test))
    # print('        Доля значимых эффектов: {:.1f}%'.format(sum(stat_test.stat_test)/len(stat_test)*100))
    # print()
    stat_test
    cus_lfl.groupby('is_ca')['contact_id'].nunique()

    #Сохраняю Стат.тест
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_stat_temp;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_stat_temp (
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    STAT_TEST SMALLINT
    );
    """)

    res = stat_test[['frmt_id', 'region_id', 'stat_test']]
    gp_connector.insert_data(df=res, tablename=f'ba.vt_{mask}_stat_temp')
    gp_connector.execute_query(f"""
    DELETE FROM BA.T_ZIG_STAT_TEST_PSM
    WHERE ACTN_NAME = '{actn_name}';
    """)

    gp_connector.execute_query(f"""
    INSERT INTO BA.T_ZIG_STAT_TEST_PSM
    SELECT
        '{actn_name}'
        ,FRMT_ID
        ,REGION_ID
        ,STAT_TEST
    FROM 
        ba.vt_{mask}_stat_temp
    ;
    """)
    gp_connector.gp(f"""SELECT count(1) FROM BA.T_ZIG_STAT_TEST_PSM where ACTN_NAME = '{actn_name}';""").iloc[:,0][0]
    ### Сохраняю ЦА/КГ (ЛФЛ)
    cus_lfl.contact_id.nunique()
    #Сохраняю результат
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_temp;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus_temp (
    CONTACT_ID INTEGER,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    IS_CA SMALLINT,
    LOGIT NUMERIC
    );
    """)

    cus_lfl = cus_lfl.reset_index(drop=True)
    gp_connector.insert_data(df=cus_lfl, tablename=f'ba.vt_{mask}_cus_temp')
    #Собираю в таблицу ЦА/КГ
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_cus (
    CONTACT_ID INTEGER,
    ACTN_ID SMALLINT,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CUS_TYPE VARCHAR(10),
    IS_CA SMALLINT,
    IS_CUS_LFL SMALLINT,
    LOGIT NUMERIC
    );
    """)

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus
    SELECT DISTINCT
        CONTACT_ID
        ,{actnId}
        ,FRMT_ID
        ,REGION_ID
        ,'REGULAR' as CUS_TYPE
        ,IS_CA
        ,1 AS IS_CUS_LFL
        ,LOGIT
    FROM 
        ba.vt_{mask}_cus_temp
    ;
    """)
    #NEW, RETURNED
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus
    SELECT DISTINCT
        CONTACT_ID
        ,{actnId}
        ,FRMT_ID
        ,REGION_ID
        ,CUS_TYPE
        ,1 AS IS_CA
        ,1 AS IS_CUS_LFL
    --    ,null as LOGIT
    FROM (
        SELECT DISTINCT
            T.CONTACT_ID
            ,T.FRMT_ID
            ,T.REGION_ID
            ,C.CUS_TYPE
        FROM
            ba.vt_{mask}_trn T
        JOIN
            (SELECT distinct CONTACT_ID FROM ba.vt_{mask}_days_cross) CA 
            ON CA.CONTACT_ID = T.CONTACT_ID
        JOIN
            ba.vt_{mask}_cus_type C
            ON C.CONTACT_ID = T.CONTACT_ID 
            AND C.FRMT_ID = T.FRMT_ID
            AND C.REGION_ID = T.REGION_ID
        WHERE
            ACTN_PERIOD = 2
            and CUS_TYPE IN ('NEW', 'RETURNED')
            and CNT_TRN >= 2
        ) d
    ;
    """)
    #оставшиеся ЦА - не ЛФЛ
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus
    SELECT distinct
        CONTACT_ID
        ,{actnId}
        ,FRMT_ID
        ,REGION_ID
        ,CUS_TYPE
        ,1 AS IS_CA
        ,0 AS IS_CUS_LFL
    --    ,NULL as LOGIT
    FROM (
        SELECT distinct
            CONTACT_ID
            ,FRMT_ID
            ,REGION_ID
            ,CUS_TYPE
        FROM (
            SELECT DISTINCT
                T.CONTACT_ID
                ,T.FRMT_ID
                ,T.REGION_ID
                ,C.CUS_TYPE
            FROM
                ba.vt_{mask}_trn T
            JOIN
                (SELECT distinct CONTACT_ID FROM ba.vt_{mask}_days_cross) CA 
                ON CA.CONTACT_ID = T.CONTACT_ID
            JOIN
                ba.vt_{mask}_cus_type C
                ON C.CONTACT_ID = T.CONTACT_ID 
                AND C.FRMT_ID = T.FRMT_ID
                AND C.REGION_ID = T.REGION_ID
            WHERE
                ACTN_PERIOD = 2) d
        EXCEPT
        SELECT
            CONTACT_ID
            ,FRMT_ID
            ,REGION_ID
            ,CUS_TYPE
        FROM 
            ba.vt_{mask}_cus
        WHERE 
            IS_CA = 1
        ) d
    ;
    """)
    #сохраняю ЦА/КГ
    gp_connector.execute_query(f"""
    DELETE FROM BA.T_ZIG_SPR_CUS_LFL
    WHERE ACTN_ID = {actnId}
    ;
    """)

    gp_connector.execute_query(f"""
    INSERT INTO BA.T_ZIG_SPR_CUS_LFL
    SELECT
        CONTACT_ID
        ,ACTN_ID
        ,FRMT_ID
        ,REGION_ID
        ,CUS_TYPE
        ,IS_CA
        ,IS_CUS_LFL
        ,LOGIT
    FROM 
        ba.vt_{mask}_cus
    ;
    """)

## margin

In [201]:
@log_function_call
def margin(gp_connector, mask, actn_name, promo_start_date, promo_end_date,
            in_code_ruls,
            actnId, DateStart, DateEnd, DateStPre, DateEndLast,
            lengthprev, lengthpost, lengthActn):
    
    month = gp_connector.gp(f""" --sql
    SELECT
        MONTH_ID
        ,min(DAY_ID) as min_dt
        ,max(DAY_ID) as max_dt
    FROM ba.vt_{mask}_days 
    WHERE ACTN_NAME = '{actn_name}'
        and ACTN_PERIOD IN (1,2,3)
    group by MONTH_ID
    order by MONTH_ID
    ;""")

    gp_connector.execute_query(f"""DROP TABLE IF EXISTS ba.vt_{mask}_trn_prd;""")
    gp_connector.execute_query(f""" --sql
    CREATE TABLE ba.vt_{mask}_trn_prd (
    cheque_pk bytea,
    contact_id integer,
    is_ca smallint,
    day_id date,
    orgunit_id integer,
    article_id numeric,
    summ_discounted numeric,
    qnty numeric
    )   
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    DISTRIBUTED BY (cheque_pk)
    ;""")

    for i in tqdm_notebook(range(len(month))):
        dt_start = str(month.min_dt[i])
        dt_end = str(month.max_dt[i])
        
        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
        gp_connector.execute_query(f""" --sql
        CREATE TABLE ba.vt_{mask}_trn_temp (
        contact_id integer,
        orgunit_id integer,
        cheque_pk bytea,
        datetime date,
        article_id integer,
        summ_discounted numeric,
        quantity numeric,
        rn smallint
        )
        WITH (
            appendonly=true,
            blocksize=32768,
            compresstype=zstd,
            compresslevel=4,
            orientation=column)
        DISTRIBUTED BY (cheque_pk)
        ;""")

        gp_connector.execute_query(f""" --sql
        insert into ba.vt_{mask}_trn_temp
        SELECT 
            t.contact_id
            ,t.orgunit_id
            ,t.cheque_pk
            ,t.day_id as datetime
            ,t.article_id
            ,SUM(t.summ_discounted) AS summ_discounted
            ,SUM(t.quantity) AS quantity
            ,1 as rn
        FROM (
            SELECT 
                c.contact_id
                ,c.orgunit_id
                ,c.cheque_pk
                ,date_trunc('day', c.datetime)::date AS day_id
                ,ci.article_id
                ,ci.summ_discounted
                ,ci.quantity
            FROM 
                dm.cheque c
            JOIN
                dm.cheque_item ci on ci.cheque_pk = c.cheque_pk
            WHERE
                operation_type_id = 1
                AND c.contact_id > 0
                AND c.datetime between ('{dt_start}'::timestamp) AND ('{dt_end}'::timestamp + interval '1' day - interval '1' second)
                AND ci.datetime between ('{dt_start}'::timestamp) AND ('{dt_end}'::timestamp + interval '1' day - interval '1' second)
            ) t
        GROUP BY
            t.contact_id,
            t.orgunit_id,
            t.cheque_pk,
            t.day_id,
            t.article_id;
        ;""")

        gp_connector.execute_query(f""" --sql                          
        insert into ba.vt_{mask}_trn_prd
        SELECT
            t.cheque_pk
            ,t.contact_id
            ,c.is_ca
            ,t.datetime
            ,t.orgunit_id
            ,t.article_id
            ,t.summ_discounted
            ,t.quantity
        from
            ba.vt_{mask}_trn_temp t
        JOIN 
            (SELECT DISTINCT contact_id, is_ca FROM BA.T_ZIG_SPR_CUS_LFL WHERE ACTN_ID = {actnId}) c    -- Клиенты для оценки
            ON c.contact_id = t.contact_id
        JOIN 
            ba.vt_{mask}_whs w ON w.orgunit_id = t.orgunit_id
        WHERE
            t.rn = 1
        ;""")

    gp_connector.execute_query(f"""DROP TABLE IF EXISTS ba.vt_{mask}_trn_clear;""")
    gp_connector.execute_query(f""" --sql
    CREATE TABLE ba.vt_{mask}_trn_clear (
    cheque_pk bytea,
    contact_id integer,
    is_ca smallint,
    day_id date,
    orgunit_id integer,
    article_id integer,
    summ_discounted numeric,
    qnty numeric,
    is_trn_fltr smallint
    )
    WITH (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    DISTRIBUTED BY (cheque_pk)
    ;""")

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
    gp_connector.execute_query(f""" --sql
    CREATE TABLE ba.vt_{mask}_trn_temp AS (
    SELECT
        t.cheque_pk
        ,t.contact_id
        ,t.day_id
    FROM
        ba.vt_{mask}_trn_prd t
    WHERE
        t.day_id between '{DateStart}' AND '{DateEnd}'
    group by 
        t.cheque_pk
        ,t.contact_id
        ,t.day_id
    );""")

    gp_connector.execute_query(f""" --sql
    insert into ba.vt_{mask}_trn_clear
    WITH trn_ca AS (
        SELECT
            t.cheque_pk
            ,t.contact_id
            ,t.day_id
            ,cd.IS_CROSS         -- чеки с пересечением в других акциях
        FROM
            ba.vt_{mask}_trn_temp t
        JOIN 
            (SELECT DISTINCT contact_id FROM ba.vt_{mask}_days_cross) c    -- Клиенты для оценки
            ON c.contact_id = t.contact_id
        JOIN
            ba.vt_{mask}_days_cross cd
            on cd.CONTACT_ID = t.CONTACT_ID
            and cd.DAY_ID = t.day_id
        )

    SELECT
        t.cheque_pk
        ,t.contact_id
        ,t.is_ca
        ,t.day_id
        ,t.orgunit_id
        ,t.article_id
        ,t.summ_discounted
        ,t.qnty
        ,coalesce(tc.IS_CROSS, 0) AS IS_TRN_FLTR         -- чеки с пересечением в других акциях
    FROM
        ba.vt_{mask}_trn_prd t
    left JOIN
        trn_ca tc on tc.cheque_pk = t.cheque_pk
    ;
    """)

    gp_connector.execute_query(f"""
    DELETE from ba.vt_{mask}_trn_clear
    where IS_TRN_FLTR = 1
    ;""")

    # Расчет параметров коммерческой маржи
    print(f'Создаем пустую таблицу для параметров расчета маржи.')
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_aum;""")
    gp_connector.execute_query(f""" --sql
    create table ba.vt_{mask}_aum (
        day_id date,
        article_id integer,
        orgunit_id integer,
        cp_sum numeric,
        comm_comp_out_bill numeric,
        bonus numeric,
        ri_la_intco_margin_sum numeric,
        sp_intco_margin_sum numeric,
        logistics_sum numeric,
        vat_factor numeric
    )
    with (
        appendonly=true,
        blocksize=32768,
        compresstype=zstd,
        compresslevel=4,
        orientation=column)
    ;""")

    # собираем налоги на товары
    print(f'Создаем таблицу с налогами на товары.')
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_vat_vatrate;""")
    gp_connector.execute_query(f""" --sql
        create table ba.vt_{mask}_vat_vatrate as (
        select distinct
            article_id,
            tax_prcnt as vat_vatrate
        from dm.art_ext
    );""")

    for i in tqdm_notebook(range(len(month))):
        dt_start = str(month.min_dt[i])
        dt_end = str(month.max_dt[i])

        print(f'Собираем строки из таблицы с чеками за период c {dt_start} по {dt_end}.')
        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
        gp_connector.execute_query(f""" --sql
        create table ba.vt_{mask}_trn_temp as (
        select
            day_id,
            orgunit_id,
            article_id
        from ba.vt_{mask}_trn_clear
        where day_id between '{dt_start}' and '{dt_end}'
        group by 
            day_id,
            orgunit_id,
            article_id
        );""")

        
        # собираем строки из AUM за исследуемый период
        print(f'Cобираем строки из AUM за период c {dt_start} по {dt_end}.')
        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cbi_aum;""")
        gp_connector.execute_query(f""" --sql
        create table ba.vt_{mask}_cbi_aum as (
        select 
            day_id,
            article_id, 
            orgunit_id,
            cp_sum_wo_nds,
            comm_comp_out_bill_wo_nds,
            opsum_bonus_wo_nds,
            ri_la_intco_margin_sum,
            sp_intco_margin_sum,
            logistics_sum,
            qnty
        from dm.cbi_aum_whs_art
        where day_id between '{dt_start}' and '{dt_end}'
        );""")

        # Предобработка строк из AUM за исследуемый период
        print(f'Предобработка строк из AUM за период c {dt_start} по {dt_end}.')
        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_aum_pre_calc;""")
        gp_connector.execute_query(f""" --sql
        create table ba.vt_{mask}_aum_pre_calc as (
        select 
            aum.day_id,
            aum.article_id,
            aum.orgunit_id,
            coalesce(sum(aum.cp_sum_wo_nds::float * (1 + (a.vat_vatrate::float / 100))), 0) as cp_sum,
            coalesce(sum(aum.comm_comp_out_bill_wo_nds::float * (1 + (a.vat_vatrate::float / 100))), 0) as comm_comp_out_bill,
            coalesce(sum(aum.opsum_bonus_wo_nds::float * (1 + (a.vat_vatrate::float / 100))), 0) as bonus,
            coalesce(sum(aum.ri_la_intco_margin_sum::float), 0) as ri_la_intco_margin_sum,
            coalesce(sum(aum.sp_intco_margin_sum::float), 0) as sp_intco_margin_sum,
            coalesce(sum(aum.logistics_sum::float), 0) as logistics_sum,
            coalesce(sum(aum.qnty), 0) as qnty,
            max( 1 / (1 + (a.vat_vatrate::float / 100))) as vat_factor
        from 
            ba.vt_{mask}_cbi_aum as aum
            join ba.vt_{mask}_trn_temp as t
                on t.day_id = aum.day_id
                and  t.orgunit_id = aum.orgunit_id
                and  t.article_id = aum.article_id
            join ba.vt_{mask}_vat_vatrate as a
                on a.article_id = aum.article_id
        group by 
            aum.day_id,
            aum.article_id,
            aum.orgunit_id 
        having sum(aum.qnty) > 0.01 
        );""")

        print(f'Заполняем таблицу параметров для расчета маржи за период c {dt_start} по {dt_end}.')
        gp_connector.execute_query(f""" --sql
        insert into ba.vt_{mask}_aum 
        select
            day_id, 													-- дата
            article_id, 												-- продукт
            orgunit_id, 												-- магазин
            cp_sum / qnty as cp_sum, 									-- себестоимость (средняя стоимость товара со склада)
            comm_comp_out_bill / qnty as comm_comp_out_bill,			-- компенсация вне накладной
            bonus / qnty as bonus,										-- бонусы от поставщиков
            ri_la_intco_margin_sum / qnty as ri_la_intco_margin_sum, 	-- внутригрупповые трансферты
            sp_intco_margin_sum / qnty as sp_intco_margin_sum, 			-- внутригрупповые трансферты
            logistics_sum / qnty as logistics_sum, 						-- затраты на логистику
            vat_factor
        from ba.vt_{mask}_aum_pre_calc
        ;""")
        print('***********************************')

    #бонусы списанные / начисленные
    #вариант 1 без вычитания бонусов по механике

    gp_connector.execute_query(f"""DROP TABLE IF EXISTS ba.vt_{mask}_bonuses;""")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_bonuses (
    CHEQUE_PK BYTEA,
    SUMM_DISCOUNTED NUMERIC,
    BONUS_ACCRUAL NUMERIC,
    BONUS_REDEMPTION NUMERIC
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column);
    """)

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_bonuses
    WITH trn_clear AS ( 
            select
                cheque_pk
                ,sum(summ_discounted) as summ_discounted
            from
                ba.vt_{mask}_trn_prd
            where
                summ_discounted <> 0
            GROUP by 1
        ), bonus AS (
            select
                cheque_pk
                ,sum(case when bonus_type = 'addition' then value end)/100.0 as BONUS_ACCRUAL --- бонусы начисленные
                ,sum(case when bonus_type = 'write_off' then value end)/100.0 as BONUS_REDEMPTION ---- бонусы списанные
            from
                dm.bonus_all
            where
                date(created_on) between '{DateStPre}' and '{DateEndLast}'
            group by 1
        )

    select
        t.cheque_pk
        ,t.summ_discounted
        ,coalesce(b.BONUS_ACCRUAL, 0)/summ_discounted as BONUS_ACCRUAL --- бонусы начисленные в расчете на 1 руб. 
        ,coalesce(b.BONUS_REDEMPTION, 0)/summ_discounted as BONUS_REDEMPTION ---- бонусы списанные в расчете на 1 руб.
    from
        trn_clear t
    join
        bonus b on b.cheque_pk = t.cheque_pk
    ;
    """)

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_ruls;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_trn_ruls (
    CHEQUE_PK BYTEA,
    CONTACT_ID INTEGER,
    ORGUNIT_ID INTEGER,
    OPSUM NUMERIC,
    DISC NUMERIC,
    ACCRUEDPOINTS NUMERIC,
    DISCOUNTPOINTS NUMERIC,
    REDEEMEDPOINTS NUMERIC
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column);
    """)

    import re

    def _codes_to_in_clause(raw):
        # принимает: "FC_jul25_S_5", или "A;B,C", или "'A','B'"
        parts = re.split(r'[;,]', str(raw)) if raw is not None else []
        parts = [p.strip().strip("'").strip('"') for p in parts if p and p.strip()]
        if not parts:
            return None
        return ",".join("'" + p.replace("'", "''") + "'" for p in parts)

    codes_sql = _codes_to_in_clause(in_code_ruls)
    rule_code_filter = f"rule_code IN ({codes_sql})" if codes_sql else "1=0"

    logging.info("in_code_ruls: %s, rule_code_filter: %s", in_code_ruls, rule_code_filter)
    
    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_trn_ruls
    SELECT
        cheque_pk
        ,contact_id
        ,orgunit_id
        ,sum(transaction_value)       AS opsum 
        ,sum(discount_value)          AS disc
        ,sum(addition_point)/100.0   AS accruedpoints
        ,sum(write_off_point)/100.0  AS redeemedpoints
    FROM (
        SELECT
            rule_code 
            ,orgunit_id
            ,contact_id
            ,cheque_pk
            ,transaction_value
            ,discount_value
            ,addition_point
            ,write_off_point
        FROM 
            dm.transaction_rule t
        WHERE 
            {rule_code_filter}
            and created_on between '{promo_start_date}'::timestamp and '{promo_end_date}'::timestamp + interval '1' day - interval '1' second
        ) d
    GROUP by 
        cheque_pk
        ,contact_id
        ,orgunit_id
    ;
    """)

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_ruls_clear;""")
    gp_connector.execute_query(f""" --sql
    Create Table ba.vt_{mask}_trn_ruls_clear (
    CONTACT_ID INTEGER,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    OPSUM NUMERIC,
    DISC NUMERIC,
    ACCRUEDPOINTS NUMERIC,
    DISCOUNTPOINTS NUMERIC,
    REDEEMEDPOINTS NUMERIC
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column);
    """)

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_trn_ruls_clear
    SELECT
        contact_id
        ,frmt_id
        ,region_id
        ,sum(opsum)         AS opsum 
        ,sum(disc)          AS disc
        ,sum(accruedpoints) AS accruedpoints
        ,sum(redeemedpoints) AS redeemedpoints
    FROM 
        ba.vt_{mask}_trn_ruls r
    JOIN
        (select distinct cheque_pk from ba.vt_{mask}_trn_clear) t on t.cheque_pk = r.cheque_pk
    JOIN
        dm.WHS w on w.orgunit_id = r.orgunit_id
    GROUP by
        contact_id
        ,frmt_id
        ,region_id
    ;""")

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_dataset;""")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_dataset (
    CHEQUE_PK BYTEA,
    CONTACT_ID INTEGER,
    IS_CA SMALLINT,
    ACTN_PERIOD SMALLINT,
    DAY_ID DATE,
    ORGUNIT_ID INTEGER,
    ARTICLE_ID INTEGER,
    IS_ACTN_ART SMALLINT,
    OPSUM_KONTROLLING DECIMAL(20, 2),
    OPSUM_WO_NDS DECIMAL(20, 2),
    GROSS_MARGIN_WO_NDS DECIMAL(20, 2),
    GROSS_MARGIN_WO_NDS_WO_LOGIST DECIMAL(20, 2)
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column)
    ;""")

    for i in tqdm_notebook(range(len(month))):
        dt_start = str(month.min_dt[i])
        dt_end = str(month.max_dt[i])

        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_aum_temp;""")
        gp_connector.execute_query(f""" --sql
        CREATE TABLE ba.vt_{mask}_aum_temp AS (
        SELECT 
            day_id
            ,article_id 
            ,orgunit_id 
            ,cp_sum
            ,comm_comp_out_bill
            ,bonus
            ,ri_la_intco_margin_sum
            ,sp_intco_margin_sum
            ,logistics_sum
            ,vat_factor                      
        FROM 
            ba.vt_{mask}_aum aum
        WHERE
            day_id between '{dt_start}' and '{dt_end}'
        );""")

        gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_trn_temp;""")
        gp_connector.execute_query(f""" --sql
        CREATE TABLE ba.vt_{mask}_trn_temp AS (
        SELECT
            cheque_pk
            ,contact_id
            ,is_ca
            ,day_id
            ,orgunit_id
            ,article_id
            ,summ_discounted
            ,qnty
            ,is_trn_fltr
        FROM
            ba.vt_{mask}_trn_clear
        WHERE
            day_id between '{dt_start}' and '{dt_end}'
        );""")
        
        gp_connector.execute_query(f""" --sql
        INSERT INTO ba.vt_{mask}_dataset    
        SELECT
            cheque_pk
            ,contact_id
            ,is_ca
            ,actn_period
            ,day_id
            ,orgunit_id
            ,article_id
            ,IS_ACTN_ART
            ,sku_sale_amt - bonus_redemption AS OPSUM_KONTROLLING ---так контроллинг считает РТО
            ,(sku_sale_amt - bonus_redemption) * VAT_FACTOR AS OPSUM_WO_NDS
            ,(sku_sale_amt - bonus_redemption - bonus_accrual - cp_sum + comm_comp_out_bill + bonus) *
                VAT_FACTOR + ri_la_intco_margin_sum + sp_intco_margin_sum - logistics_sum AS GROSS_MARGIN_WO_NDS ---- гросс-маржа
            ,(sku_sale_amt - bonus_redemption - bonus_accrual - cp_sum + comm_comp_out_bill + bonus) *
                VAT_FACTOR + ri_la_intco_margin_sum + sp_intco_margin_sum AS GROSS_MARGIN_WO_NDS_WO_LOGIST ---- гросс-маржа без логистики
        FROM (
            SELECT
                t.cheque_pk
                ,t.contact_id
                ,t.is_ca
                ,CASE WHEN t.day_id < '{DateStart}' THEN 1
                    WHEN t.day_id between '{DateStart}' and '{DateEnd}' THEN 2
                    ELSE 3 END
                AS ACTN_PERIOD
                ,t.day_id
                ,t.orgunit_id
                ,t.article_id
                ,CASE WHEN aa.article_id IS NOT NULL THEN 1 ELSE 0 END AS IS_ACTN_ART
                ,t.summ_discounted                                      AS sku_sale_amt
                ,COALESCE(bonus_accrual * t.summ_discounted, 0)         AS bonus_accrual
                ,COALESCE(bonus_redemption * t.summ_discounted, 0)      AS bonus_redemption
                ,COALESCE(cp_sum * qnty, 0)                             AS cp_sum
                ,COALESCE(comm_comp_out_bill * qnty, 0)                 AS comm_comp_out_bill
                ,COALESCE(bonus * qnty, 0)                              AS bonus
                ,COALESCE(ri_la_intco_margin_sum * qnty, 0)             AS ri_la_intco_margin_sum
                ,COALESCE(sp_intco_margin_sum * qnty, 0)                AS sp_intco_margin_sum
                ,COALESCE(logistics_sum * qnty, 0)                      AS logistics_sum
                ,COALESCE(vat_factor, 1)                                AS VAT_FACTOR
            FROM
                ba.vt_{mask}_trn_temp t
            JOIN
                ba.vt_{mask}_aum_temp aum
                ON aum.day_id = t.day_id
                AND aum.article_id = t.article_id
                AND aum.orgunit_id = t.orgunit_id
            LEFT JOIN 
                ba.vt_{mask}_bonuses b ON b.cheque_pk = t.cheque_pk
            LEFT JOIN
                BA.T_ZIG_SPR_ART_ACTN aa
                on aa.article_id = t.article_id
                and aa.actn_name = '{actn_name}'
            ) d
        ;
        """)

    #Т.к. Покупателей в выборке слишком много, то делю на несколько групп
    n = 3 #Кол-во групп. ~по 1 млн. в группе

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_dataset_cus;""")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_dataset_cus (
    GRP smallint,
    contact_id integer
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column)
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_dataset_cus
    SELECT distinct
        mod(contact_id, {n}) AS GRP
        ,contact_id
    FROM 
        ba.vt_{mask}_dataset
    ;""")

    grp = gp_connector.gp(f"""select GRP from ba.vt_{mask}_dataset_cus group by GRP order by GRP;""")
    len(grp)
    
    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_dataset_agg;""")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_dataset_agg (
    CONTACT_ID INTEGER,
    IS_CA SMALLINT,
    ACTN_PERIOD SMALLINT,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CNT_TRN INTEGER,
    CNT_TRN_ART_ACTN INTEGER,
    OPSUM_WO_NDS NUMERIC,
    GROSS_MARGIN_WO_NDS_WO_LOGIST NUMERIC,
    OPSUM_WO_NDS_ART_ACTN NUMERIC,
    GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN NUMERIC
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column)
    ;""")

    for n in tqdm_notebook(range(len(grp))):
        gp_connector.execute_query(f""" --sql
        INSERT INTO ba.vt_{mask}_dataset_agg
        WITH dataset AS (
                SELECT
                d.cheque_pk
                ,d.contact_id
                ,d.is_ca
                ,d.actn_period
                ,d.orgunit_id
                ,d.is_actn_art
                ,SUM(OPSUM_WO_NDS) AS OPSUM_WO_NDS
                ,SUM(GROSS_MARGIN_WO_NDS_WO_LOGIST) AS GROSS_MARGIN_WO_NDS_WO_LOGIST
                ,SUM(case when d.IS_ACTN_ART = 1 then OPSUM_WO_NDS else 0 end) AS OPSUM_WO_NDS_ART_ACTN
                ,SUM(case when d.IS_ACTN_ART = 1 then GROSS_MARGIN_WO_NDS_WO_LOGIST else 0 end) AS GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN
            FROM
                ba.vt_{mask}_dataset d
            JOIN
                ba.vt_{mask}_dataset_cus cl on cl.contact_id = d.contact_id
            WHERE
                cl.GRP = {n}
            GROUP BY
                d.cheque_pk
                ,d.contact_id
                ,d.is_ca
                ,d.actn_period
                ,d.orgunit_id
                ,d.is_actn_art
        ), dataset_agg AS (
            SELECT
                d.cheque_pk
                ,d.contact_id
                ,d.is_ca
                ,d.is_actn_art
                ,d.actn_period
                ,w.frmt_id
                ,w.region_id
                ,OPSUM_WO_NDS
                ,GROSS_MARGIN_WO_NDS_WO_LOGIST
                ,OPSUM_WO_NDS_ART_ACTN
                ,GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN
                ,case when d.IS_ACTN_ART = 1 then d.cheque_pk end as TRANSACTIONID_ART_ACTN
            FROM
                dataset d
            JOIN
                dm.WHS w on w.orgunit_id = d.orgunit_id
        )

        SELECT
            d.contact_id
            ,d.is_ca
            ,d.actn_period
            ,d.frmt_id
            ,d.region_id
            ,COUNT(distinct d.cheque_pk)                    AS CNT_TRN
            ,COUNT(distinct d.TRANSACTIONID_ART_ACTN)       AS CNT_TRN_ART_ACTN
            ,SUM(OPSUM_WO_NDS)                              AS OPSUM_WO_NDS
            ,SUM(GROSS_MARGIN_WO_NDS_WO_LOGIST)             AS GROSS_MARGIN_WO_NDS_WO_LOGIST
            ,SUM(OPSUM_WO_NDS_ART_ACTN)                     AS OPSUM_WO_NDS_ART_ACTN
            ,SUM(GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN)    AS GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN
        FROM
            dataset_agg d
        GROUP by 
            d.contact_id
            ,d.is_ca
            ,d.actn_period
            ,d.frmt_id
            ,d.region_id
        ;""")

    gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_cus_margin;""")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_cus_margin (
    CONTACT_ID INTEGER,
    IS_CA SMALLINT,
    ACTN_PERIOD SMALLINT,
    FRMT_ID INTEGER,
    REGION_ID INTEGER,
    CUS_TYPE VARCHAR(10),
    IS_CUS_LFL SMALLINT,
    STAT_TEST SMALLINT,
    CNT_TRN INTEGER,
    CNT_TRN_ART_ACTN INTEGER,
    OPSUM_WO_NDS NUMERIC,
    OPSUM_WO_NDS_ART_ACTN NUMERIC,
    GROSS_MARGIN_WO_NDS_WO_LOGIST NUMERIC,
    GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN NUMERIC,
    DISC NUMERIC
    )
    WITH (
    appendonly=true,
    blocksize=32768,
    compresstype=zstd,
    compresslevel=4,
    orientation=column)
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO ba.vt_{mask}_cus_margin
    SELECT
        d.contact_id
        ,d.is_ca
        ,d.actn_period
        ,d.frmt_id
        ,d.region_id
        ,cl.cus_type
        ,cl.is_cus_lfl
        ,stat.stat_test
        ,CNT_TRN
        ,CNT_TRN_ART_ACTN
        ,OPSUM_WO_NDS
        ,OPSUM_WO_NDS_ART_ACTN
        ,GROSS_MARGIN_WO_NDS_WO_LOGIST
        ,GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN
        ,coalesce(r.ACCRUEDPOINTS, 0) as DISC                         -- Кэшбек на ЛК
    FROM
        ba.vt_{mask}_dataset_agg d
    JOIN
        BA.T_ZIG_SPR_CUS_LFL cl
        on cl.ACTN_ID = {actnId}
        and cl.contact_id = d.contact_id
        and cl.frmt_id = d.frmt_id
        and cl.region_id = d.region_id
        and cl.is_ca = d.is_ca
    JOIN
        BA.T_ZIG_STAT_TEST_PSM stat
        on stat.ACTN_NAME = '{actn_name}'
        and stat.frmt_id = d.frmt_id
        and stat.region_id = d.region_id
    left JOIN
        ba.vt_{mask}_trn_ruls_clear r  
        on r.contact_id = d.contact_id
        and r.frmt_id = d.frmt_id
        and r.region_id = d.region_id
        and d.ACTN_PERIOD = 2
    ;
    """)

    gp_connector.execute_query(f"""
    DELETE FROM BA.T_ZIG_ACTN_MARGIN
    WHERE ACTN_NAME = '{actn_name}'
    ;""")

    gp_connector.execute_query(f"""
    INSERT INTO BA.T_ZIG_ACTN_MARGIN
    select
        '{actn_name}' as ACTN_NAME
        ,cm.contact_id
        ,IS_CA
        ,IS_CUS_LFL
        ,ACTN_PERIOD
        ,case when ACTN_PERIOD = 1 then {lengthprev}
        when ACTN_PERIOD = 3 then {lengthpost}
        when ACTN_PERIOD = 2 and IS_CA = 0 then {lengthActn}
        when CNT_DAY_WO_CROSS is not NULL then CNT_DAY_WO_CROSS
        else 0
        end as CNT_DAY_WO_CROSS
        ,FRMT_ID
        ,REGION_ID
        ,CUS_TYPE
        ,STAT_TEST
        ,CNT_TRN
        ,CNT_TRN_ART_ACTN
        ,OPSUM_WO_NDS
        ,OPSUM_WO_NDS_ART_ACTN
        ,GROSS_MARGIN_WO_NDS_WO_LOGIST
        ,GROSS_MARGIN_WO_NDS_WO_LOGIST_ART_ACTN
        ,DISC
        ,CNT_CUS_TOTAL
    from
        ba.vt_{mask}_cus_margin cm
    left join
        (select distinct contact_id, CNT_DAY_WO_CROSS from ba.vt_{mask}_days_cross) cd
        on cd.contact_id = cm.contact_id
        and cm.ACTN_PERIOD = 2
    JOIN
        (SELECT COUNT(DISTINCT contact_id) AS CNT_CUS_TOTAL FROM BA.T_ZIG_SPR_IDN_ACTN WHERE ACTN_NAME = '{actn_name}') A ON 1=1
    ;""")

## oborot_rto

In [202]:
@log_function_call
def oborot_rto(gp_connector, mask, promo_start_date, promo_end_date, actn_name):	
	query = f"""
	SELECT
		code
	FROM
		dm.WHS w
	JOIN
		ba.vt_{mask}_whs ww 
		on ww.orgunit_id = w.orgunit_id
	;
	"""

	whs_code = gp_connector.gp(query)
	len(whs_code)
	whs_code_lst = gp_connector.to_sql_list(whs_code['code'], quotes=True)
	RTO = teradata_query(f"""
	SELECT
		FRMT_ID
		,REGION_ID
		,cast(SUM(SALE_WO_NDS) as float) AS SALE_WO_NDS
	FROM
		PRD_VD_DM.V_SALE_WHS_DAY S
	JOIN
		PRD_VD_DM.V_WHS W ON W.WHS_ID = S.WHS_ID
	WHERE
		DAY_ID BETWEEN '{promo_start_date}' and '{promo_end_date}'
		AND W.CODE in ({whs_code_lst})
	GROUP by
		FRMT_ID
		,REGION_ID
	;""", odbc_td)
	RTO['ACTN_NAME'] = actn_name
	RTO = RTO[['ACTN_NAME', 'FRMT_ID', 'REGION_ID', 'SALE_WO_NDS']]
	RTO['FRMT_ID'] = RTO.FRMT_ID.astype('str')
	RTO['REGION_ID'] = RTO.REGION_ID.astype('str')
	gp_connector.execute_query(f"""drop table if exists ba.vt_{mask}_opsum_temp;""")
	gp_connector.execute_query(f""" --sql
	CREATE TABLE ba.vt_{mask}_opsum_temp (
	actn_name varchar(50),
	frmt_id integer,
	region_id integer,
	opsum numeric
	)
	;""")

	gp_connector.insert_data(df=RTO, tablename=f'ba.vt_{mask}_opsum_temp')

	gp_connector.execute_query(f"""
	DELETE FROM BA.T_ZIG_OPSUM_FRMT_REGION
	WHERE ACTN_NAME = '{actn_name}';
	""")

	gp_connector.execute_query(f""" --sql
	insert into BA.T_ZIG_OPSUM_FRMT_REGION
	select * from ba.vt_{mask}_opsum_temp
	;""")
	gp_connector.gp(f"""
	SELECT
		actn_name
		,frmt_id
		,region_id
		,opsum_frmt_region
	FROM 
		BA.T_ZIG_OPSUM_FRMT_REGION
	WHERE
		ACTN_NAME = '{actn_name}'
	;""").head(5)

## delete_tables

In [203]:
@log_function_call
def delete_tables(gp_connector, mask):
    # Список шаблонов таблиц, которые нужно исключить
    exclude_templates = [
        f"vt_{mask}_whs",
        f"vt_{mask}_days",
        f"vt_{mask}_days_cross",
        f"vt_{mask}_spr_actn",
        f"vt_{mask}_control_group"
    ]
    exclude_tables = set(exclude_templates)  # Используем set для быстрого поиска

    # Получаем список таблиц из базы данных
    tables = gp_connector.gp(f"""--sql
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE table_schema = 'ba'
            AND table_name LIKE '%%{mask}%%'
    ;""")

    print('Кол-во таблиц до фильтрации:', len(tables))

    # Фильтруем таблицы, исключая указанные в exclude_tables
    filtered_tables = [table for table in tables['table_name'] if table not in exclude_tables]

    print('Кол-во таблиц после фильтрации:', len(filtered_tables))

    # Удаляем только отфильтрованные таблицы
    for table in filtered_tables:
        gp_connector.execute_query(f"""--sql
            DROP TABLE ba.{table}
        ;""")
        print(f"Таблица ba.{table} удалена")

## metrics_act

In [204]:
@log_function_call
def metrics_act(gp_connector, mask, type, actn_id, promo_start_date_act, promo_end_date_act, version):
    # ──────────────────────────────────────────────────────────
    # Подготовка участников по расчету
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_{version}_{actn_id}_contact;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_{version}_{actn_id}_contact AS
    SELECT is_ca, contact_id 
    FROM BA.T_ZIG_SPR_CUS_LFL 
    WHERE actn_id = {actn_id} and is_cus_lfl = 1 and cus_type = 'REGULAR' 
    ;""")

    # ──────────────────────────────────────────────────────────
    # Чеки с покупками товаров в promo_start_date_act 
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_{version}_cheques_act;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_{version}_cheques_act AS
    SELECT 
        ch.cheque_pk,
        ch.contact_id,
        SUM(ci.summ_discounted) AS total_summ,
        SUM(ci.quantity) AS total_qty
    FROM dm.cheque_item ci
    JOIN ba.tmp_{mask}_{type}_articles a ON ci.article_id = a.article_id
    JOIN dm.cheque ch ON ch.cheque_pk = ci.cheque_pk
    WHERE ch.datetime BETWEEN '{promo_start_date_act}'::timestamp
    AND '{promo_end_date_act}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
    AND operation_type_id = 1
    GROUP BY 1, 2
    ;""")

    # ──────────────────────────────────────────────────────────
    # Чеки с покупками товаров в promo_start_date_act по клиентам
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_{version}_cheques_act_contacts;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_{version}_cheques_act_contacts AS
    SELECT
        c.contact_id,
        c.is_ca,
        ci.cheque_pk,
        ci.total_summ,
        ci.total_qty
    FROM ba.tmp_{mask}_{type}_{version}_cheques_act ci
    JOIN ba.tmp_{mask}_{type}_{version}_{actn_id}_contact c ON ci.contact_id = c.contact_id
    ;""")

    # ──────────────────────────────────────────────────────────
    # Расчет итоговых показателей в promo_start_date_act по ЦА/КГ
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.vt_{mask}_{type}_{version}_total_metrics_act;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_{type}_{version}_total_metrics_act AS
    SELECT 
        is_ca,
        COUNT(DISTINCT cheque_pk)::NUMERIC / NULLIF(COUNT(DISTINCT contact_id), 0) AS visits_per_client_1,
        SUM(total_summ) / NULLIF(COUNT(DISTINCT contact_id), 0) AS sales_per_client_rub_2,
        SUM(total_qty) / NULLIF(COUNT(DISTINCT contact_id), 0) AS qty_per_client_3, 
        (SUM(total_summ) / NULLIF(COUNT(DISTINCT contact_id), 0)) / (SUM(total_qty) / NULLIF(COUNT(DISTINCT contact_id), 0)) AS cost_per_piece_4,
        SUM(total_summ) / NULLIF(COUNT(DISTINCT cheque_pk), 0) AS avg_receipt_rub_5,
        SUM(total_qty) / NULLIF(COUNT(DISTINCT cheque_pk), 0) AS avg_receipt_qty_6,
        COUNT(DISTINCT contact_id) AS unique_clients_7
    FROM ba.tmp_{mask}_{type}_{version}_cheques_act_contacts
    GROUP BY is_ca
    ;""")

## metrics_prev

In [205]:
@log_function_call
def metrics_prev(gp_connector, mask, type, actn_id, promo_start_date_prev, promo_end_date_prev, version):
    # ──────────────────────────────────────────────────────────
    # Подготовка участников по расчету
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_{version}_{actn_id}_contact;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_{version}_{actn_id}_contact AS
    SELECT is_ca, contact_id 
    FROM BA.T_ZIG_SPR_CUS_LFL 
    WHERE actn_id = {actn_id} and is_cus_lfl = 1 and cus_type = 'REGULAR' 
    ;""")

    # ──────────────────────────────────────────────────────────
    # Чеки с покупками товаров в promo_start_date_prev 
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_{version}_cheques_prev;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_{version}_cheques_prev AS
    SELECT 
        ch.cheque_pk,
        ch.contact_id,
        SUM(ci.summ_discounted) AS total_summ,
        SUM(ci.quantity) AS total_qty
    FROM dm.cheque_item ci
    JOIN ba.tmp_{mask}_{type}_articles a ON ci.article_id = a.article_id
    JOIN dm.cheque ch ON ch.cheque_pk = ci.cheque_pk
    WHERE ch.datetime BETWEEN '{promo_start_date_prev}'::timestamp
    AND '{promo_end_date_prev}'::timestamp + INTERVAL '1 day' - INTERVAL '1 second'
    AND operation_type_id = 1
    GROUP BY 1, 2
    ;""")

    # ──────────────────────────────────────────────────────────
    # Чеки с покупками товаров в promo_start_date_prev по клиентам
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.tmp_{mask}_{type}_{version}_cheques_prev_contacts;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.tmp_{mask}_{type}_{version}_cheques_prev_contacts AS
    SELECT
        c.contact_id,
        c.is_ca,
        ci.cheque_pk,
        ci.total_summ,
        ci.total_qty
    FROM ba.tmp_{mask}_{type}_{version}_cheques_prev ci
    JOIN ba.tmp_{mask}_{type}_{version}_{actn_id}_contact c ON ci.contact_id = c.contact_id
    ;""")

    # ──────────────────────────────────────────────────────────
    # Расчет итоговых показателей в promo_start_date_prev по ЦА/КГ
    # ──────────────────────────────────────────────────────────
    gp_connector.execute_query(f"DROP TABLE IF EXISTS ba.vt_{mask}_{type}_{version}_total_metrics_prev;")
    gp_connector.execute_query(f"""
    CREATE TABLE ba.vt_{mask}_{type}_{version}_total_metrics_prev AS
    SELECT 
        is_ca,
        COUNT(DISTINCT cheque_pk)::NUMERIC / NULLIF(COUNT(DISTINCT contact_id), 0) AS visits_per_client_1,
        SUM(total_summ) / NULLIF(COUNT(DISTINCT contact_id), 0) AS sales_per_client_rub_2,
        SUM(total_qty) / NULLIF(COUNT(DISTINCT contact_id), 0) AS qty_per_client_3, 
        (SUM(total_summ) / NULLIF(COUNT(DISTINCT contact_id), 0)) / (SUM(total_qty) / NULLIF(COUNT(DISTINCT contact_id), 0)) AS cost_per_piece_4,
        SUM(total_summ) / NULLIF(COUNT(DISTINCT cheque_pk), 0) AS avg_receipt_rub_5,
        SUM(total_qty) / NULLIF(COUNT(DISTINCT cheque_pk), 0) AS avg_receipt_qty_6,
        COUNT(DISTINCT contact_id) AS unique_clients_7
    FROM ba.tmp_{mask}_{type}_{version}_cheques_prev_contacts
    GROUP BY is_ca
    ;""")

## ca_metrics

In [206]:
@log_function_call
def ca_metrics(gp_connector, mask, type, version):
    # Загружаем "До" только для ЦА
    df_before_ca = gp_connector.gp(f"""
        select * from ba.vt_{mask}_{type}_{version}_total_metrics_prev
        where is_ca = 1;
    """)
    df_before_ca["period"] = "До"

    # Загружаем "Акционный" только для ЦА
    df_act_ca = gp_connector.gp(f"""
        select * from ba.vt_{mask}_{type}_{version}_total_metrics_act
        where is_ca = 1;
    """)
    df_act_ca["period"] = "Акционный"

    # Поворачиваем
    df_ca = pd.concat([df_before_ca, df_act_ca])
    df_ca = df_ca.set_index("period").transpose()
    df_ca = df_ca[["Акционный", "До"]]   

    return df_ca

## kg_metrics

In [207]:
@log_function_call
def kg_metrics(gp_connector, mask, type, version):
    # Загружаем "До" только для КГ
    df_before_kg = gp_connector.gp(f"""
        select * from ba.vt_{mask}_{type}_{version}_total_metrics_prev
        where is_ca = 0;
    """)
    df_before_kg["period"] = "До"

    # Загружаем "Акционный" только для КГ
    df_act_kg = gp_connector.gp(f"""
        select * from ba.vt_{mask}_{type}_{version}_total_metrics_act
        where is_ca = 0;
    """)
    df_act_kg["period"] = "Акционный"

    # Поворачиваем
    df_kg = pd.concat([df_before_kg, df_act_kg])
    df_kg = df_kg.set_index("period").transpose()
    df_kg = df_kg[["Акционный", "До"]]   # порядок колонок

    return df_kg

## bonuses

In [208]:
@log_function_call
def bonuses(gp_connector, mask, in_code_ruls, promo_start_date_act, promo_end_date_act):
    bon = gp_connector.gp(f"""
    SELECT
        sum(addition_point)/100.0   AS add_points
    FROM 
        dm.transaction_rule t
    WHERE 
        rule_code in ('{in_code_ruls}')
        and t.created_on between '{promo_start_date_act}':: timestamp and '{promo_end_date_act}'::timestamp + interval '1' day - interval '1' second
    """)
    return bon 

## unique_clients_ca

In [209]:
@log_function_call
def unique_clients_ca(gp_connector, mask, actn_id):
    unique_ca = gp_connector.gp(f"""
    select count(distinct contact_id)
    from BA.T_ZIG_SPR_CUS_LFL 
    where actn_id = {actn_id} and is_cus_lfl = 1 and cus_type = 'REGULAR' and is_ca = 1
    """)
    return unique_ca

## unique_clients_kg

In [210]:
@log_function_call
def unique_clients_kg(gp_connector, mask, actn_id):
    unique_kg = gp_connector.gp(f"""
    select count(distinct contact_id)
    from BA.T_ZIG_SPR_CUS_LFL 
    where actn_id = {actn_id} and is_cus_lfl = 1 and cus_type = 'REGULAR' and is_ca = 0
    """)
    return unique_kg

## loading_report

In [211]:
@log_function_call
def loading_report(gp_connector, promo_start_date_act, promo_end_date_act, actn_name):
    #file = 'Шаблон_Расчет эффекта.xlsx'

    dt_start = pd.to_datetime(promo_start_date_act)
    dt_end   = pd.to_datetime(promo_end_date_act)
    actn_length = int((dt_end - dt_start + pd.Timedelta(days=1)).days)

    safe_actn_name = actn_name.replace("'", "''")
    ### Формат-Регион-Тип клиента (доп.РТО)
    df_frmt_region_type = gp_connector.gp(f"""	--sql
    WITH frmt_region_cus_type_metrics AS (
    SELECT
    frmt_id,
    region_id,
    cus_type,
    is_ca,
    is_cus_lfl,
    contact_id,
        /* Средние траты на 1 клиента в неделю до / вовремя акции */
        SUM(CASE WHEN actn_period = 1 THEN opsum_wo_nds /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS spend_prev,
        SUM(CASE WHEN actn_period = 2 THEN opsum_wo_nds /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS spend_actn,

        /* Средние траты без акц-товаров */
        SUM(CASE WHEN actn_period = 1 THEN (opsum_wo_nds - COALESCE(opsum_wo_nds_art_actn, 0)) /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS spend_wo_art_actn_prev,
        SUM(CASE WHEN actn_period = 2 THEN (opsum_wo_nds - COALESCE(opsum_wo_nds_art_actn, 0)) /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS spend_wo_art_actn_actn,

        /* Частота */
        SUM(CASE WHEN actn_period = 1 THEN cnt_trn /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) /
        COUNT(DISTINCT CASE WHEN actn_period = 1 THEN contact_id END) AS frq_prev,

        SUM(CASE WHEN actn_period = 2 THEN cnt_trn /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) /
        COUNT(DISTINCT CASE WHEN actn_period = 2 THEN contact_id END) AS frq_actn,

        /* Сумма / кол-во транзакций (для среднего чека) */
        SUM(CASE WHEN actn_period = 1 THEN opsum_wo_nds /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 ELSE 0 END) AS opsum_wo_nds_week_prev,
        SUM(CASE WHEN actn_period = 2 THEN opsum_wo_nds /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 ELSE 0 END) AS opsum_wo_nds_week_actn,
        SUM(CASE WHEN actn_period = 1 THEN cnt_trn /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 ELSE 0 END) AS cnt_trn_week_prev,
        SUM(CASE WHEN actn_period = 2 THEN cnt_trn /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 ELSE 0 END) AS cnt_trn_week_actn,

        /* Маржа (на 1 клиента) */
        SUM(CASE WHEN actn_period = 1 THEN (gross_margin_wo_nds_wo_logist * 0.966) /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS margin_prev,
        SUM(CASE WHEN actn_period = 2 THEN (gross_margin_wo_nds_wo_logist * 0.966) /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS margin_actn,

        /* Маржа без акц-товаров */
        SUM(CASE WHEN actn_period = 1 THEN ((gross_margin_wo_nds_wo_logist -
                    COALESCE(gross_margin_wo_nds_wo_logist_art_actn, 0)) * 0.966) /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS margin_wo_art_actn_prev,
        SUM(CASE WHEN actn_period = 2 THEN ((gross_margin_wo_nds_wo_logist -
                    COALESCE(gross_margin_wo_nds_wo_logist_art_actn, 0)) * 0.966) /
                    NULLIF(cnt_day_wo_cross::float, 0) * 7 END) AS margin_wo_art_actn_actn,

        -- Скидки
        SUM(CASE WHEN actn_period = 1 THEN disc END) AS disc_prev,
        SUM(CASE WHEN actn_period = 2 THEN disc END) AS disc_actn,

        -- Макс. число клиентов (ЦА)
        MAX(cnt_cus_total) AS cnt_cus_total
    FROM ba.t_zig_actn_margin
    WHERE
        actn_name = '{actn_name}'
        AND stat_test = 1
    GROUP BY 1,2,3,4,5,6
    )
    SELECT
    w.frmt,
    w.region,
    a.cus_type,
    a.is_ca,
    -- ЦА: cnt_cus_total
    MAX(CASE WHEN a.is_ca=1 THEN a.cnt_cus_total END) AS cnt_cus_total,

    COUNT(DISTINCT a.contact_id) AS cnt_cus_actn,
    COUNT(DISTINCT CASE WHEN a.is_cus_lfl=1 THEN a.contact_id END) AS cnt_cus_lfl,

    -- LFL (общие)
    AVG(CASE WHEN a.is_cus_lfl=1 THEN spend_prev END) AS lfl_spend_prev,
    AVG(CASE WHEN a.is_cus_lfl=1 THEN spend_actn END) AS lfl_spend_actn,

    -- REGULAR LFL
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN spend_prev END)  AS reg_spend_prev,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN spend_actn END)  AS reg_spend_actn,

    -- NEW / RETURNED LFL (акционные траты)
    AVG(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN spend_actn END)     AS new_spend_actn,
    AVG(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN spend_actn END)     AS return_spend_actn,

    -- Без акц.товаров
    AVG(CASE WHEN a.is_cus_lfl=1 THEN spend_wo_art_actn_actn END)                  AS lfl_spend_wo_art_actn_actn,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN spend_wo_art_actn_prev END)  AS reg_spend_wo_art_actn_prev,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN spend_wo_art_actn_actn END)  AS reg_spend_wo_art_actn_actn,

    -- NEW/RETURNED: без акц.товаров (всё во время акции)
    AVG(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN spend_wo_art_actn_actn END) AS new_spend_wo_art_actn_actn,
    AVG(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN spend_wo_art_actn_actn END) AS return_spend_wo_art_actn_actn,

    -- Частота покупок (LFL)
    AVG(CASE WHEN a.is_cus_lfl=1 THEN frq_actn END) AS lfl_frq_actn,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN frq_prev END) AS reg_frq_prev,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN frq_actn END) AS reg_frq_actn,

    -- NEW / RETURNED частота
    AVG(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN frq_actn END) AS new_frq_actn,
    AVG(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN frq_actn END) AS return_frq_actn,

    -- Средний чек (LFL)
    SUM(CASE WHEN a.is_cus_lfl=1 THEN opsum_wo_nds_week_actn END)
    / NULLIF(SUM(CASE WHEN a.is_cus_lfl=1 THEN cnt_trn_week_actn END),0) AS lfl_avg_cheque_actn,
    SUM(CASE WHEN a.cus_type='REGULAR' AND a.is_cus_lfl=1 THEN opsum_wo_nds_week_prev END)
    / NULLIF(SUM(CASE WHEN a.cus_type='REGULAR' AND a.is_cus_lfl=1 THEN cnt_trn_week_prev END),0) AS reg_avg_cheque_prev,
    SUM(CASE WHEN a.cus_type='REGULAR' AND a.is_cus_lfl=1 THEN opsum_wo_nds_week_actn END)
    / NULLIF(SUM(CASE WHEN a.cus_type='REGULAR' AND a.is_cus_lfl=1 THEN cnt_trn_week_actn END),0) AS reg_avg_cheque_actn,

    -- NEW / RETURNED средний чек (акционный)
    SUM(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN opsum_wo_nds_week_actn END)
    / NULLIF(SUM(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN cnt_trn_week_actn END),0) AS new_avg_cheque_actn,
    SUM(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN opsum_wo_nds_week_actn END)
    / NULLIF(SUM(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN cnt_trn_week_actn END),0) AS return_avg_cheque_actn,

    -- Маржа (LFL)
    AVG(CASE WHEN a.is_cus_lfl=1 THEN margin_actn END)              AS lfl_margin_actn,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN margin_prev END)  AS reg_margin_prev,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN margin_actn END)  AS reg_margin_actn,

    -- NEW / RETURNED маржа (акционная)
    AVG(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN margin_actn END)  AS new_margin_actn,
    AVG(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN margin_actn END)  AS return_margin_actn,

    -- Маржа без акц.товаров
    AVG(CASE WHEN a.is_cus_lfl=1 THEN margin_wo_art_actn_actn END)                        AS lfl_margin_wo_art_actn_actn,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN margin_wo_art_actn_prev END)  AS reg_margin_wo_art_actn_prev,
    AVG(CASE WHEN a.cus_type='REGULAR'  AND a.is_cus_lfl=1 THEN margin_wo_art_actn_actn END)  AS reg_margin_wo_art_actn_actn,

    AVG(CASE WHEN a.cus_type='NEW'      AND a.is_cus_lfl=1 THEN margin_wo_art_actn_actn END) AS new_margin_wo_art_actn_actn,
    AVG(CASE WHEN a.cus_type='RETURNED' AND a.is_cus_lfl=1 THEN margin_wo_art_actn_actn END) AS return_margin_wo_art_actn_actn,

    -- Сумма скидок
    SUM(a.disc_prev) AS disc_prev,
    SUM(a.disc_actn) AS disc_actn,

    -- (Если нужно) суммарные поля для расчёта маржинальности
    SUM(spend_prev)  AS opsum_prev,
    SUM(spend_actn)  AS opsum_actn,
    SUM(margin_prev) AS opsum_margin_prev,
    SUM(margin_actn) AS opsum_margin_actn,

    -- Оборот (Формат+Регион), только для ЦА
    MAX(CASE WHEN a.is_ca=1 THEN f.opsum_frmt_region END) AS opsum_frmt_region
    FROM frmt_region_cus_type_metrics a
    JOIN (
    SELECT DISTINCT frmt_id, frmt, region_id, region
    FROM dm.whs
    ) w
    ON w.frmt_id = a.frmt_id
    AND w.region_id = a.region_id
    JOIN ba.t_zig_opsum_frmt_region f
    ON f.actn_name = '{actn_name}'
    AND f.frmt_id = a.frmt_id
    AND f.region_id = a.region_id
    GROUP BY
    w.frmt,
    w.region,
    a.cus_type,
    a.is_ca
    ORDER BY
    w.frmt,
    w.region,
    a.cus_type,
    a.is_ca
    ;""")

    df_frmt_region_type.head(4)
    # Объединение ЦА и КГ по frmt и region
    def get_incr(df, metric_actn, metric_prev, incr_name):
        ca = df.query('is_ca == 1 and cus_type == "REGULAR"')[["frmt", "region", metric_actn, metric_prev]]
        kg = df.query('is_ca == 0 and cus_type == "REGULAR"')[["frmt", "region", metric_actn, metric_prev]]

        ca = ca.rename(columns={metric_actn: metric_actn + '_ca', metric_prev: metric_prev + '_ca'})
        kg = kg.rename(columns={metric_actn: metric_actn + '_kg', metric_prev: metric_prev + '_kg'})

        merged = ca.merge(kg, on=['frmt', 'region'], how='inner')

        merged[incr_name] = (
            (merged[metric_actn + '_ca'] / merged[metric_prev + '_ca']) /
            (merged[metric_actn + '_kg'] / merged[metric_prev + '_kg']) - 1
        )

        return merged[['frmt', 'region', incr_name]]

    # Считаем все приросты
    incr_metrics = [
        ('reg_spend_actn', 'reg_spend_prev', 'incr_spend'),
        ('reg_frq_actn', 'reg_frq_prev', 'incr_frq'),
        ('reg_avg_cheque_actn', 'reg_avg_cheque_prev', 'incr_avg_cheque'),
        ('reg_spend_wo_art_actn_actn', 'reg_spend_wo_art_actn_prev', 'incr_wo_art_spend'),
        ('reg_margin_actn', 'reg_margin_prev', 'incr_margin'),
        ('reg_margin_wo_art_actn_actn', 'reg_margin_wo_art_actn_prev', 'incr_wo_art_margin')
    ]

    ca_reg = 'is_ca == 1 and cus_type == "REGULAR"'
    kg_reg = 'is_ca == 0 and cus_type == "REGULAR"'

    df_incr = df_frmt_region_type[['frmt', 'region']].drop_duplicates().reset_index(drop=True)

    for metric_actn, metric_prev, incr_name in incr_metrics:
        df_temp = get_incr(df_frmt_region_type, metric_actn, metric_prev, incr_name)
        df_incr = df_incr.merge(df_temp, on=['frmt', 'region'], how='left')

    # Добавляем приросты обратно

    df_frmt_region_type_new = df_frmt_region_type.merge(df_incr, on=['frmt', 'region'], how='left')

    # Зануляем приросты для КГ
    cols_incr = [x[2] for x in incr_metrics]
    df_frmt_region_type_new.loc[df_frmt_region_type_new['is_ca'] == 0, cols_incr] = None

    # Далее считаем доп.приросты

    for incr_col, actn_col, prev_col, new_col in [
        ('incr_spend', 'reg_spend_actn', 'lfl_spend_actn', 'add_incr_spend'),
        ('incr_frq', 'reg_frq_actn', 'lfl_frq_actn', 'add_incr_frq'),
        ('incr_avg_cheque', 'reg_avg_cheque_actn', 'lfl_avg_cheque_actn', 'add_incr_avg_cheque'),
        ('incr_wo_art_spend', 'reg_spend_wo_art_actn_actn', 'lfl_spend_wo_art_actn_actn', 'add_incr_wo_art_spend'),
        ('incr_margin', 'reg_margin_actn', 'lfl_margin_actn', 'add_incr_margin'),
        ('incr_wo_art_margin', 'reg_margin_wo_art_actn_actn', 'lfl_margin_wo_art_actn_actn', 'add_incr_wo_art_margin')]:

        df_frmt_region_type_new[new_col] = (
            df_frmt_region_type_new[actn_col].combine_first(df_frmt_region_type_new[prev_col])
            - df_frmt_region_type_new[actn_col].combine_first(df_frmt_region_type_new[prev_col]) / (df_frmt_region_type_new[incr_col] + 1)
        )

        # Зануляем для КГ
        df_frmt_region_type_new.loc[df_frmt_region_type_new['is_ca'] == 0, new_col] = None

    # Расчёт ДОП. РТО и Маржи
    for col, add_col in [
        ('add_incr_spend', 'add_rto'),
        ('add_incr_wo_art_spend', 'add_rto_wo_art_actn'),
        ('add_incr_margin', 'add_margin'),
        ('add_incr_wo_art_margin', 'add_margin_wo_art')]:

        df_frmt_region_type_new[add_col] = (
            df_frmt_region_type_new['cnt_cus_actn'] * df_frmt_region_type_new[col] / 7 * actn_length
        )

    # ДОП.РТО и Маржа по акционным товарам

    df_frmt_region_type_new['add_rto_art_actn'] = df_frmt_region_type_new['add_rto'] - df_frmt_region_type_new['add_rto_wo_art_actn']
    df_frmt_region_type_new['add_margin_art_actn'] = df_frmt_region_type_new['add_margin'] - df_frmt_region_type_new['add_margin_wo_art']
    # Абсолютный прирост числа транзакций за всё время акции
    df_frmt_region_type_new['add_cnt_txn'] = (
        df_frmt_region_type_new['cnt_cus_actn']      # клиентов, участвовавших в акции
        * df_frmt_region_type_new['add_incr_frq']    # доп. транзакций на клиента в неделю
        / 7                                          # перевод к 1-му дню
        * actn_length                                # × длительность акции (дн.)
    )

    df_frmt_region_type_new.loc[
        df_frmt_region_type_new['is_ca'] == 0,
        'add_cnt_txn'
    ] = None

    df_frmt_region_type_new['impact_rto'] = (
    df_frmt_region_type_new['add_rto']
    / df_frmt_region_type_new['opsum_frmt_region']
    )

    global_ca_mask = (df_frmt_region_type_new['is_ca']==1) & (df_frmt_region_type_new['cus_type']=='REGULAR')
    df_global_ca = df_frmt_region_type_new[global_ca_mask]

    if len(df_global_ca) > 0:
        row_ca_reg = df_global_ca.iloc[0]
        tmp_incr_avg = row_ca_reg['incr_avg_cheque'] # прирост ср.чека (ЦА vs КГ)
        tmp_avg_cheque_a = row_ca_reg['reg_avg_cheque_actn']
        tmp_avg_cheque_p = row_ca_reg['reg_avg_cheque_prev']

        if tmp_incr_avg is not None and tmp_incr_avg != 0:
            # res_4_val = (avg_cheque_actn - avg_cheque_actn/(1+incr)) / avg_cheque_prev
            res_4_val = (
                tmp_avg_cheque_a
                - tmp_avg_cheque_a/(tmp_incr_avg + 1)
            ) / tmp_avg_cheque_p
        else:
            res_4_val = 0.0

        tmp_incr_frq = row_ca_reg['incr_frq']  # прирост частоты (ЦА vs КГ)
        tmp_frq_actn = row_ca_reg['reg_frq_actn']
        tmp_frq_prev = row_ca_reg['reg_frq_prev']

        if tmp_incr_frq is not None and tmp_incr_frq != 0:
            # res_5_val = (frq_actn - frq_actn/(1+incr_frq)) / frq_prev
            res_5_val = (
                tmp_frq_actn
                - tmp_frq_actn/(tmp_incr_frq + 1)
            ) / tmp_frq_prev
        else:
            res_5_val = 0.0
    else:
        # Если нет ни одной строки REGULAR & is_ca=1 вообще,
        # ставим оба 0 -> влияние на ср.чек/трафик будет 0
        res_4_val = 0.0
        res_5_val = 0.0


    avg_spent = (
        (df_frmt_region_type_new['reg_spend_prev'] + df_frmt_region_type_new['reg_spend_actn'])
        / 2
        / 7
    )


    # Влияние на РТО (ср.чек)
    df_frmt_region_type_new['impact_avg_txn'] = (
        df_frmt_region_type_new['add_incr_avg_cheque']
        * df_frmt_region_type_new['cnt_cus_actn']
        / 7
        * actn_length
    ) / df_frmt_region_type_new['opsum_frmt_region']

    # Влияние на РТО (трафик)
    df_frmt_region_type_new['impact_cnt_txn'] = (
        df_frmt_region_type_new['add_incr_frq']
        * df_frmt_region_type_new['cnt_cus_actn']
        / 7
        * actn_length
    ) / df_frmt_region_type_new['opsum_frmt_region']


    df_frmt_region_type_new['margin_ratio_prev'] = (
        df_frmt_region_type_new['opsum_margin_prev']
        / df_frmt_region_type_new['opsum_prev']
    )

    df_frmt_region_type_new['margin_ratio_actn'] = (
        df_frmt_region_type_new['opsum_margin_actn']
        / df_frmt_region_type_new['opsum_actn']
    )



    def unify_columns(df, reg_col, new_col, ret_col):
        """
        Склеить 3 колонки (REGULAR, NEW, RETURNED) в одну.
        Возвращаем название новой колонки.
        """
        univ_col = reg_col.replace('reg_', '') + '_univ'

        df[univ_col] = (
            df[reg_col]
            .combine_first(df[new_col])
            .combine_first(df[ret_col])
        )
        return univ_col

    col_groups = [
        ('reg_spend_actn', 'new_spend_actn', 'return_spend_actn', 'reg_spend_actn'),
        ('reg_frq_actn', 'new_frq_actn', 'return_frq_actn', 'reg_frq_actn'),
        ('reg_avg_cheque_actn','new_avg_cheque_actn','return_avg_cheque_actn','reg_avg_cheque_actn'),
        ('reg_margin_actn','new_margin_actn','return_margin_actn','reg_margin_actn'),
        ('reg_spend_wo_art_actn_actn','new_spend_wo_art_actn_actn','return_spend_wo_art_actn_actn','reg_spend_wo_art_actn_actn'),
        ('reg_margin_wo_art_actn_actn','new_margin_wo_art_actn_actn','return_margin_wo_art_actn_actn','reg_margin_wo_art_actn_actn')
    ]

    for (reg_col, new_col, ret_col, final_col) in col_groups:
        # Проверим, есть ли эти колонки в df (чтобы не упасть)
        if all(c in df_frmt_region_type_new.columns for c in [reg_col, new_col, ret_col]):
            # 1. Склеиваем
            univ_col = unify_columns(df_frmt_region_type_new, reg_col, new_col, ret_col)

            # 2. Переименовываем univ_col -> final_col (например, spend_actn_univ -> reg_spend_actn)
            # но перед этим удалим final_col, если вдруг был, чтобы не конфликтовать
            if final_col in df_frmt_region_type_new.columns:
                df_frmt_region_type_new.drop(columns=[final_col], inplace=True)

            df_frmt_region_type_new.rename(columns={univ_col: final_col}, inplace=True)

            # 3. Удаляем исходные (new_col, ret_col), чтобы не мешались
            df_frmt_region_type_new.drop(columns=[new_col, ret_col], inplace=True, errors='ignore')
        else:
            print(f"Пропускаем объединение для {reg_col}, {new_col}, {ret_col} — не все колонки найдены в df.")

    cols_for_export = [
        'frmt','region','cus_type','is_ca',
        'cnt_cus_actn','cnt_cus_lfl',
        'reg_spend_prev','reg_spend_actn','incr_spend',
        'reg_frq_prev','reg_frq_actn','incr_frq',
        'reg_avg_cheque_prev','reg_avg_cheque_actn','incr_avg_cheque',
        'add_rto', 'add_cnt_txn', 'add_rto_wo_art_actn','add_rto_art_actn',
        'add_margin','add_margin_wo_art','add_margin_art_actn',
        'impact_rto','impact_avg_txn','impact_cnt_txn',
        'opsum_frmt_region',
        'margin_ratio_prev','margin_ratio_actn'
    ]

    df_frmt_region_cus_exp = df_frmt_region_type_new[cols_for_export].copy()

    ind_kg = df_frmt_region_cus_exp.query('is_ca == 0').index
    df_frmt_region_cus_exp.loc[ind_kg, [
        'add_cnt_txn',
        'incr_spend','incr_frq','incr_avg_cheque',
        'add_rto','add_rto_wo_art_actn','add_rto_art_actn',
        'add_margin','add_margin_wo_art','add_margin_art_actn',
        'impact_rto','impact_avg_txn','impact_cnt_txn',
        'opsum_frmt_region'
    ]] = None


    df_frmt_region_cus_exp['is_ca'].replace({0:'КГ',1:'ЦА'}, inplace=True)


    df_frmt_region_cus_exp = df_frmt_region_cus_exp.set_index(
        ['frmt','region','cus_type','is_ca']
    ).transpose()
    ### Сводная Тотал
    df_total = gp_connector.gp(f"""	--sql
    with frmt_region_cus_type_metrics as (
        select
            frmt_id,
            region_id,
            cus_type,
            is_ca,
            is_cus_lfl,
            contact_id,
            sum(case when actn_period = 1 then opsum_wo_nds / cnt_day_wo_cross::float * 7 end)                          as spend_prev,
            sum(case when actn_period = 2 then opsum_wo_nds / cnt_day_wo_cross::float * 7 end)                          as spend_actn,
            sum(case when actn_period = 1 then (opsum_wo_nds - coalesce(opsum_wo_nds_art_actn, 0)) / cnt_day_wo_cross::float * 7 end) as spend_wo_art_actn_prev,
            sum(case when actn_period = 2 then (opsum_wo_nds - coalesce(opsum_wo_nds_art_actn, 0)) / cnt_day_wo_cross::float * 7 end) as spend_wo_art_actn_actn,
            sum(case when actn_period = 1 then cnt_trn / cnt_day_wo_cross::float * 7 end) / 
            count(distinct(case when actn_period = 1 then contact_id end)) 				                                as frq_prev,
            sum(case when actn_period = 2 then cnt_trn / cnt_day_wo_cross::float * 7 end) / 
            count(distinct(case when actn_period = 2 then contact_id end)) 				                                as frq_actn,
            sum(case when actn_period = 1 then opsum_wo_nds / cnt_day_wo_cross::float * 7 else 0 end)                   as opsum_wo_nds_week_prev,
            sum(case when actn_period = 2 then opsum_wo_nds / cnt_day_wo_cross::float * 7 else 0 end)                   as opsum_wo_nds_week_actn,
            sum(case when actn_period = 1 then cnt_trn / cnt_day_wo_cross::float * 7 else 0 end)                        as cnt_trn_week_prev,
            sum(case when actn_period = 2 then cnt_trn / cnt_day_wo_cross::float * 7 else 0 end)                        as cnt_trn_week_actn,
            sum(case when actn_period = 1 then (gross_margin_wo_nds_wo_logist*0.966) / cnt_day_wo_cross::float * 7 end) as margin_prev,
            sum(case when actn_period = 2 then (gross_margin_wo_nds_wo_logist*0.966) / cnt_day_wo_cross::float * 7 end) as margin_actn,
            sum(case when actn_period = 1 then ((gross_margin_wo_nds_wo_logist - 
                            coalesce(gross_margin_wo_nds_wo_logist_art_actn, 0)) *0.966) / cnt_day_wo_cross::float * 7 end) as margin_wo_art_actn_prev,
            sum(case when actn_period = 2 then ((gross_margin_wo_nds_wo_logist - 
                            coalesce(gross_margin_wo_nds_wo_logist_art_actn, 0)) *0.966) / cnt_day_wo_cross::float * 7 end) as margin_wo_art_actn_actn,
            sum(case when actn_period = 1 then disc end)                                                                as disc_prev,
            sum(case when actn_period = 2 then disc end)                                                                as disc_actn,
            max(cnt_cus_total)                                                                                          as cnt_cus_total
        from ba.t_zig_actn_margin
        where
            actn_name = '{actn_name}'
            and stat_test = 1
        group by 1,2,3,4,5,6
        ), opsum_total as (
        select sum(opsum_frmt_region) as opsum_frmt_region
        from ba.t_zig_opsum_frmt_region
        where actn_name = '{actn_name}'
        )
    select
        a.is_ca,
        max(case when a.is_ca = 1 then cnt_cus_total end)                                                   as cnt_cus_total,
        count(distinct a.contact_id)                                                                        as cnt_cus_actn,
        count(distinct case when a.is_cus_lfl = 1 then a.contact_id end)                                    as cnt_cus_lfl,
        avg(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then spend_prev end)                      as reg_spend_prev,
        avg(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then spend_actn end)                      as reg_spend_actn,
        avg(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then frq_prev end)                        as reg_frq_prev,
        avg(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then frq_actn end)                        as reg_frq_actn,
        sum(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then opsum_wo_nds_week_prev end) /
        sum(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then cnt_trn_week_prev end)               as reg_avg_cheque_prev,
        sum(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then opsum_wo_nds_week_actn end) /
        sum(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then cnt_trn_week_actn end)               as reg_avg_cheque_actn,
        sum(case when a.is_cus_lfl = 1 then opsum_wo_nds_week_actn end) /
        avg(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then margin_prev end)                     as reg_margin_prev,
        avg(case when a.cus_type = 'REGULAR' and a.is_cus_lfl = 1 then margin_actn end)                     as reg_margin_actn,
        sum(disc_prev)                                                                                      as disc_prev,
        sum(disc_actn)                                                                                      as disc_actn,
        sum(spend_prev)                                                                                     as opsum_prev,
        sum(spend_actn)                                                                                     as opsum_actn,
        sum(margin_prev)                                                                                    as opsum_margin_prev,
        sum(margin_actn)                                                                                    as opsum_margin_actn,
        max(case when a.is_ca = 1 then f.opsum_frmt_region end)                                             as opsum_frmt_region
    from frmt_region_cus_type_metrics a
    join (select distinct frmt_id, frmt, region_id, region from dm.whs) w on w.frmt_id = a.frmt_id and w.region_id = a.region_id
    join opsum_total f on 1=1
    group by 1
    order by 1
    ;""")

    df_total.head(4)
    oborot_ap = gp_connector.gp(f"""
    with stat as (
    select distinct frmt_id, region_id
    from ba.t_zig_actn_margin
    where 1=1
    and actn_name = '{actn_name}'
    and stat_test = 1
    )

    select sum(opsum_frmt_region) as opsum_frmt_region
    from ba.t_zig_opsum_frmt_region r
    join stat s on s.frmt_id = r.frmt_id and s.region_id = r.region_id
    where actn_name = '{actn_name}'
    """)['opsum_frmt_region'][0]

    df_total['opsum_frmt_region'] = oborot_ap

    # Зануляем оборот АП для КГ
    ind = df_total.query('is_ca == 0').index
    df_total.loc[ind, ['opsum_frmt_region']] = None
    # Чистый прирост Трат
    df_11 = df_total.query('is_ca == 1')['reg_spend_actn'] / df_total.query('is_ca == 1')['reg_spend_prev']
    df_12 = df_total.query('is_ca == 0')['reg_spend_actn'] / df_total.query('is_ca == 0')['reg_spend_prev']
    res_1 = df_11.reset_index(drop=True) / df_12.reset_index(drop=True) - 1
    res_1.name = 'incr_spend'

    # Чистый прирост Частоты
    df_21 = df_total.query('is_ca == 1')['reg_frq_actn'] / df_total.query('is_ca == 1')['reg_frq_prev']
    df_22 = df_total.query('is_ca == 0')['reg_frq_actn'] / df_total.query('is_ca == 0')['reg_frq_prev']
    res_2 = df_21.reset_index(drop=True) / df_22.reset_index(drop=True) - 1
    res_2.name = 'incr_frq'

    # Чистый прирост Ср.чек
    df_31 = df_total.query('is_ca == 1')['reg_avg_cheque_actn'] / df_total.query('is_ca == 1')['reg_avg_cheque_prev']
    df_32 = df_total.query('is_ca == 0')['reg_avg_cheque_actn'] / df_total.query('is_ca == 0')['reg_avg_cheque_prev']
    res_3 = df_31.reset_index(drop=True) / df_32.reset_index(drop=True) - 1
    res_3.name = 'incr_avg_cheque'
    df_incr = pd.concat([res_1, res_2], axis=1)
    df_incr = pd.concat([df_incr, res_3], axis=1)
    df_incr
    # Добавляю приросты в Таблицу
    df_total = df_total.merge(df_incr, how='cross')
    # Добавляю Доп.РТО и Доп.маржу в Таблицу
    add_rto = df_frmt_region_type_new.groupby('is_ca')[['add_rto', 'add_rto_wo_art_actn', 'add_rto_art_actn', 'add_margin', 'add_margin_wo_art', 'add_margin_art_actn', 'add_cnt_txn']].sum()
    df_total = pd.concat([df_total, add_rto], axis=1)

    avg_spent = (df_total.reg_spend_actn + df_total.reg_spend_prev) / 2 / 7

    # Рассчитываю влияние на РТО
    df_total['impact_rto'] = df_total['add_rto'] / df_total['opsum_frmt_region']

    # Рассчитываю влияние на Ср.чек
    #res_4 = (df_total.query('is_ca == 1')['reg_spend_actn'] - df_total.query('is_ca == 1')['reg_spend_actn'] / (res_3.values + 1)) / df_total.query('is_ca == 1')['reg_spend_prev']
    res_4 = (df_total.query('is_ca == 1')['reg_avg_cheque_actn'] - df_total.query('is_ca == 1')['reg_avg_cheque_actn'] / (res_3.values + 1)) / df_total.query('is_ca == 1')['reg_avg_cheque_prev']
    df_total['impact_avg_txn'] = (avg_spent * df_total.cnt_cus_actn * actn_length * res_4.values) / df_total.opsum_frmt_region

    # Рассчитываю влияние на Трафик
    res_5 = (df_total.query('is_ca == 1')['reg_frq_actn'] - df_total.query('is_ca == 1')['reg_frq_actn'] / (res_2.values + 1)) / df_total.query('is_ca == 1')['reg_frq_prev']
    df_total['impact_cnt_txn'] = (avg_spent * df_total.cnt_cus_actn * actn_length * res_5.values) / df_total.opsum_frmt_region

    # Маржинальность
    df_total['margin_ratio_prev'] = df_total.opsum_margin_prev / df_total.opsum_prev
    df_total['margin_ratio_actn'] = df_total.opsum_margin_actn / df_total.opsum_actn
    df_total_exp = df_total[[
        'is_ca', 'cnt_cus_total', 'cnt_cus_actn', 'cnt_cus_lfl',
        'reg_spend_prev', 'reg_spend_actn', 'incr_spend', 
        'reg_frq_prev', 'reg_frq_actn', 'incr_frq',
        'reg_avg_cheque_prev', 'reg_avg_cheque_actn', 'incr_avg_cheque',
        'add_rto', 'add_cnt_txn', 'add_rto_wo_art_actn', 'add_rto_art_actn',
        'add_margin', 'add_margin_wo_art', 'add_margin_art_actn',
        'impact_rto', 'impact_avg_txn', 'impact_cnt_txn',
        'opsum_frmt_region', 'margin_ratio_prev', 'margin_ratio_actn'
    ]]
    # Зануляю приросты для КГ
    # Зануляю приросты для КГ
    ind = df_total_exp.query('is_ca == 0').index
    df_total_exp.loc[ind, ['add_cnt_txn', 'incr_spend', 'incr_frq', 'incr_avg_cheque', 'add_rto', 'add_rto_wo_art_actn', 'add_rto_art_actn', 'add_margin',
                'add_margin_wo_art', 'add_margin_art_actn', 'impact_rto', 'impact_avg_txn', 'impact_cnt_txn', 'opsum_frmt_region']] = None

    df_total_exp['is_ca'].replace({0:'КГ', 1:'ЦА'}, inplace=True)
    df_total_exp = df_total_exp.set_index('is_ca').transpose()
    df_total_exp
    
    # 1) Формируем компактный блок для v1: Доп.РТО, Доп.Маржа, Маржинальность (во время)
    need_rows = ['add_rto', 'add_margin', 'margin_ratio_actn']
    exist_rows = [r for r in need_rows if r in df_total_exp.index]

    compact = df_total_exp.loc[exist_rows].copy()

    # Добиваем до 6 строк под шаблон B6:C11
    while compact.shape[0] < 6:
        compact.loc[f'_pad_{compact.shape[0]+1}'] = [None, None]

    # Возвращаем ТОЛЬКО компактный блок 6×2 (ЦА/КГ в двух столбцах)
    result = {
        "compact_block": compact.values,
        "add_rto": {},
        "add_margin": {},
        "margin_ratio_actn": {},
    }

    def _row_to_dict(row_name):
        if row_name in df_total_exp.index:
            row = df_total_exp.loc[row_name]
            return {
                col: (None if pd.isna(val) else float(val))
                for col, val in row.items()
            }
        return {}

    result["add_rto"] = _row_to_dict("add_rto")
    result["add_margin"] = _row_to_dict("add_margin")
    result["margin_ratio_actn"] = _row_to_dict("margin_ratio_actn")

    return result


## pick_metrics

In [212]:
def _pick_metrics_block(df, rows_wanted=None, n_rows=6):
    """
    df: DataFrame с колонками ['Акционный','До'] и индексом-метриками.
    """
    if rows_wanted is None:
        rows_wanted = [
            'reg_spend_actn','reg_spend_prev',
            'reg_frq_actn','reg_frq_prev',
            'reg_avg_cheque_actn','reg_avg_cheque_prev',
            'reg_margin_actn','reg_margin_prev',
            'cnt_cus_actn','cnt_cus_lfl'
        ]
    rows = [r for r in rows_wanted if r in df.index]

    if len(rows) < n_rows:
        # добавим первые встречные строки с числовыми значениями
        for idx, s in df.iterrows():
            if idx in rows:
                continue
            try:
                is_num = pd.api.types.is_numeric_dtype(s.dtype) or pd.api.types.is_float_dtype(s.dtype)
            except Exception:
                is_num = False
            if is_num:
                rows.append(idx)
                if len(rows) >= n_rows:
                    break

    block = df.loc[rows[:n_rows], ['Акционный','До']].copy()
    while block.shape[0] < n_rows:
        block.loc[f'_pad_{block.shape[0]+1}'] = [None, None]
    return block.values

## prepare_v2_v3

In [213]:
def prepare_v2_v3_payload(gp_connector, mask, type, in_code_ruls,
                          promo_start_date_act, promo_end_date_act, actn_id,
                          version=None):
    # Метрики (пробуем с передачей версий, иначе — старый вызов)
    try:
        if version is not None:
            df_ca = ca_metrics(gp_connector, mask, type, version)
            df_kg = kg_metrics(gp_connector, mask, type, version)
        else:
            raise TypeError
    except TypeError:
        df_ca = ca_metrics(gp_connector, mask, type)
        df_kg = kg_metrics(gp_connector, mask, type)

    ca_block = _pick_metrics_block(df_ca, n_rows=6)   # 6×2
    kg_block = _pick_metrics_block(df_kg, n_rows=6)   # 6×2

    # Бонусы — нормализуем список кодов под IN (...)
    bon_df = gp_connector.gp(f"""
        SELECT sum(addition_point)/100.0 AS add_points
        FROM dm.transaction_rule t
        WHERE rule_code IN ('{in_code_ruls}')
          AND t.created_on BETWEEN '{promo_start_date_act}'::timestamp
                               AND '{promo_end_date_act}'::timestamp + interval '1 day' - interval '1 second'
    """)
    bon = float(bon_df.iloc[0, 0]) if not bon_df.empty else 0.0

    # Уники (фильтр по is_ca уже есть в самих функциях)
    u_ca_df = unique_clients_ca(gp_connector, mask, actn_id)
    u_kg_df = unique_clients_kg(gp_connector, mask, actn_id)
    uniq_ca = int(u_ca_df.iloc[0, 0]) if not u_ca_df.empty else 0
    uniq_kg = int(u_kg_df.iloc[0, 0]) if not u_kg_df.empty else 0

    return ca_block, kg_block, bon, uniq_ca, uniq_kg

## edit_file

In [214]:
def edit_file_batch(file_name, sheet_name, writes):
    """
    writes: список кортежей (cell_top_left, value, write_index, write_header)
    value — 2D-массив/df или скаляр.
    """
    import xlwings as xw
    import time, gc

    app = None
    wb = None
    try:
        app = xw.App(visible=False, add_book=False)
        app.display_alerts = False
        app.screen_updating = False

        wb = app.books.open(file_name, update_links=False, read_only=False)
        sh = wb.sheets[sheet_name]  # бросит KeyError, если листа нет

        for cell, value, idx, hdr in writes:
            sh.range(cell).options(index=bool(idx), header=bool(hdr)).value = value

        wb.save()
    finally:
        try:
            if wb is not None:
                wb.close()
        except Exception as e:
            logging.warning(f"wb.close() error: {e}")
        try:
            if app is not None:
                app.quit()
        except Exception as e:
            logging.warning(f"app.quit() error: {e}")
        # подчистка COM-прокси
        try: del sh
        except Exception: pass
        try: del wb
        except Exception: pass
        try: del app
        except Exception: pass
        gc.collect()
        time.sleep(0.2)

# Функции обработки акций

In [215]:
def process_action(gp_connector, type, version, file_name):

    """
    Обрабатывает одну акцию по уникальной маске.
    """
    # Получаем параметры акции из таблицы helper_action по маске
    df_action = gp_connector.gp(f"""
        SELECT mask,
        actn_name,
        actn_type,
        is_processed,
        is_kg_needed,
        lengthprev,
        lengthpost,
        promo_start_date_act,
        promo_end_date_act,
        promo_start_date_prev,
        promo_end_date_prev,
        type_art_group_level, 
        art_grp_lvl_name,
        code_act,
        code_prev,
        version,
        type, 
        in_code_ruls
    FROM BA.helper_category
    WHERE type = '{type}' and version = {version}
    """)
    if df_action.empty:
        logging.warning(f"[process_action][type = {type}] Акция не найдена в helper_category.")
        return
    mask                  = df_action['mask'].iloc[0]
    actn_name             = df_action['actn_name'].iloc[0]
    actn_type             = df_action['actn_type'].iloc[0]
    lengthprev            = df_action['lengthprev'].iloc[0]
    lengthpost            = df_action['lengthpost'].iloc[0]
    promo_start_date_act  = str(df_action['promo_start_date_act'].iloc[0])
    promo_end_date_act    = str(df_action['promo_end_date_act'].iloc[0])
    promo_start_date_prev = str(df_action['promo_start_date_prev'].iloc[0])
    promo_end_date_prev   = str(df_action['promo_end_date_prev'].iloc[0])
    type_art_group_level  = df_action['type_art_group_level'].iloc[0]
    art_grp_lvl_2_name    = df_action['art_grp_lvl_name'].iloc[0]
    code_act              = df_action['code_act'].iloc[0]
    code_prev             = df_action['code_prev'].iloc[0]
    version               = df_action['version'].iloc[0]
    type                  = df_action['type'].iloc[0]
    in_code_ruls          = df_action['in_code_ruls'].iloc[0]

    logging.info(f"[process_action][type = {type}] Начинаем обработку акции '{actn_name}'.")

    # # Получаем числовой идентификатор акции из таблицы vt_{mask}_spr_actn
    # df_spr = gp_connector.gp(f"""
    #     SELECT actn_id,
    #         actn_grp,
    #         actn_length,
    #         date_start,
    #         date_end,
    #         date_st_pre,
    #         date_end_last
    #     FROM ba.vt_{mask}_spr_actn
    #     WHERE actn_name = '{actn_name}'
    # """)
    # if df_spr.empty:
    #     logging.warning(f"[process_action][mask = {mask}] Запись в vt_{mask}_spr_actn не найдена.")
    #     return

    # actn_id    = df_spr['actn_id'].iloc[0]
    # lengthActn = df_spr['actn_length'].iloc[0]
    # DateStart  = str(df_spr['date_start'].iloc[0])
    # DateEnd    = str(df_spr['date_end'].iloc[0])
    # DateStPre  = str(df_spr['date_st_pre'].iloc[0])
    # DateEndLast= str(df_spr['date_end_last'].iloc[0])

    # logging.info(f"[process_action][mask = {mask}] n_KG = {n_KG}. Формируем контрольные группы с n_KG = {n_KG}.")

    # Выполнение всех шагов последовательно
    try:
        create_whs(gp_connector, mask)
        logging.info("Шаг 1 (whs) выполнен успешно.")

        create_cus_ruls(gp_connector, mask, art_grp_lvl_2_name, type_art_group_level, promo_start_date_act, promo_end_date_act, actn_name, type, version, code_act, code_prev, promo_start_date_prev, promo_end_date_prev)
        logging.info("Шаг 2 (cus_ruls) выполнен успешно.")

        n_KG = create_kg(gp_connector, mask, actn_name, art_grp_lvl_2_name, promo_start_date_act, promo_end_date_act, type, code_act, code_prev, promo_start_date_prev, promo_end_date_prev)
        logging.info("Шаг 3 (kg) выполнен успешно.")

        create_spr_actn(gp_connector, mask, actn_type, actn_name, lengthprev, lengthpost)
        logging.info("Шаг 4 (spr_actn) выполнен успешно.")

        actn_id = gp_connector.gp(f"""SELECT Actn_id FROM ba.vt_{mask}_spr_actn WHERE actn_name = '{actn_name}';""").actn_id[0]
        lengthActn = gp_connector.gp(f"""SELECT Actn_Length FROM ba.vt_{mask}_spr_actn WHERE actn_name = '{actn_name}';""").actn_length[0]
        DateStart = str(gp_connector.gp(f"""SELECT DATE_START FROM ba.vt_{mask}_spr_actn WHERE actn_name = '{actn_name}';""").date_start[0])
        DateEnd = str(gp_connector.gp(f"""SELECT DATE_END FROM ba.vt_{mask}_spr_actn WHERE actn_name = '{actn_name}';""").date_end[0])
        DateStPre = str(gp_connector.gp(f"""SELECT Date_St_Pre FROM ba.vt_{mask}_spr_actn WHERE actn_name = '{actn_name}';""").date_st_pre[0])
        DateEndLast = str(gp_connector.gp(f"""SELECT Date_End_Last FROM ba.vt_{mask}_spr_actn WHERE actn_name = '{actn_name}';""").date_end_last[0])

        cutoff = create_days(gp_connector, mask, actn_id, DateStPre, DateEndLast)
        logging.info("Шаг 5 (days) выполнен успешно.")

        create_days_cross(gp_connector, mask, cutoff, actn_id)
        logging.info("Шаг 6 (days_cross) выполнен успешно.")

        frod(gp_connector, mask)
        logging.info("Шаг - (frod) выполнен успешно.")

        create_trn_0(gp_connector, mask, actn_id, lengthprev)
        logging.info("Шаг 7 (create_trn_0) выполнен успешно.")

        create_trn_0_v2(gp_connector, mask, actn_id, lengthActn)
        logging.info("Шаг 8 (create_trn_0_v2) выполнен успешно.")

        create_agg(gp_connector, mask, DateStart)
        logging.info("Шаг 9 (create_agg) выполнен успешно.")

        process_clear_and_aggregate(gp_connector, mask)
        logging.info("Шаг 10 (process_clear_and_aggregate) выполнен успешно.")

        reg_new_returned(gp_connector, mask, promo_start_date_act)
        logging.info("Шаг 11 (reg_new_returned) выполнен успешно.")

        clear_reg(gp_connector, mask, actn_id, actn_name, lengthprev, lengthActn)
        logging.info("Шаг 12 (clear_reg) выполнен успешно.")

        dna(gp_connector, mask, lengthActn)
        logging.info("Шаг 13 (dna) выполнен успешно.")

        kg_for_ca(gp_connector, mask, n_KG)
        logging.info("Шаг 14 (kg_for_ca) выполнен успешно.")

        cnt_actn(gp_connector, mask, DateStPre, DateStart, DateEnd, actn_id)
        logging.info("Шаг 14 (cnt_actn) выполнен успешно.")

        dynamic_gr20(gp_connector, mask, actn_id)
        logging.info("Шаг 16 (dynamic_gr20) выполнен успешно.")

        gr20_transp(gp_connector, mask)
        logging.info("Шаг 17 (gr20_transp) выполнен успешно.")

        age_and_active_virt(gp_connector, mask)
        logging.info("Шаг 18 (age_and_active_virt) выполнен успешно.")

        itog(gp_connector, mask, lengthActn)
        logging.info("Шаг 19 (itog) выполнен успешно.")

        psm(gp_connector, mask, actn_name, actn_id)
        logging.info("Шаг 20 (psm) выполнен успешно.")

        if version == 1:
            margin(gp_connector, mask, actn_name, promo_start_date_act, promo_end_date_act, in_code_ruls, actn_id, DateStart, DateEnd, DateStPre, DateEndLast, lengthprev, lengthpost, lengthActn)
            logging.info("Шаг 21 (margin) выполнен успешно.")

            oborot_rto(gp_connector, mask, promo_start_date_act, promo_end_date_act, actn_name)
            logging.info("Шаг 22 (oborot_rto) выполнен успешно.")

            metrics_act(gp_connector, mask, type, actn_id, promo_start_date_act, promo_end_date_act, version)
            logging.info("Шаг 24 (metrics_act) выполнен успешно.")

            metrics_prev(gp_connector, mask, type, actn_id, promo_start_date_prev, promo_end_date_prev, version)
            logging.info("Шаг 25 (metrics_prev) выполнен успешно.")

            df_ca = ca_metrics(gp_connector, mask, type, version)
            logging.info("Шаг 25 (ca_metrics) выполнен успешно.")

            df_kg = kg_metrics(gp_connector, mask, type, version)
            logging.info("Шаг 25 (kg_metrics) выполнен успешно.")

            bonus_df = bonuses(gp_connector, mask, in_code_ruls, promo_start_date_act, promo_end_date_act)
            logging.info("Шаг 25 (bonuses) выполнен успешно.")

            uniq_ca_df = unique_clients_ca(gp_connector, mask, actn_id)
            logging.info("Шаг 25 (unique_clients_ca) выполнен успешно.")

            uniq_kg_df = unique_clients_kg(gp_connector, mask, actn_id)
            logging.info("Шаг 25 (unique_clients_kg) выполнен успешно.")

            report_payload = loading_report(gp_connector, promo_start_date_act, promo_end_date_act, actn_name)
            logging.info("Шаг 24 (loading_report) выполнен успешно.")

            ca_block = _pick_metrics_block(df_ca, n_rows=6)
            kg_block = _pick_metrics_block(df_kg, n_rows=6)

            bonus_value = 0.0
            if not bonus_df.empty:
                raw_bonus = bonus_df.iloc[0, 0]
                bonus_value = 0.0 if pd.isna(raw_bonus) else float(raw_bonus)

            uniq_ca = 0
            if not uniq_ca_df.empty:
                raw_uniq_ca = uniq_ca_df.iloc[0, 0]
                uniq_ca = 0 if pd.isna(raw_uniq_ca) else int(raw_uniq_ca)

            uniq_kg = 0
            if not uniq_kg_df.empty:
                raw_uniq_kg = uniq_kg_df.iloc[0, 0]
                uniq_kg = 0 if pd.isna(raw_uniq_kg) else int(raw_uniq_kg)

            margin_map = report_payload.get("margin_ratio_actn", {}) if isinstance(report_payload, dict) else {}
            add_rto_map = report_payload.get("add_rto", {}) if isinstance(report_payload, dict) else {}
            add_margin_map = report_payload.get("add_margin", {}) if isinstance(report_payload, dict) else {}

            def _to_float_or_none(value):
                if value is None or pd.isna(value):
                    return None
                return float(value)

            margin_ca = _to_float_or_none(margin_map.get('ЦА'))
            margin_kg = _to_float_or_none(margin_map.get('КГ'))
            add_rto_ca = _to_float_or_none(add_rto_map.get('ЦА'))
            add_margin_ca = _to_float_or_none(add_margin_map.get('ЦА'))

            edit_file_batch(
                file_name,
                sheet_name="ЦА и КГ регулярные на клиента",
                writes=[
                    ("B6", ca_block, False, False),
                    ("E6", kg_block, False, False),
                    ("B12", uniq_ca, False, False),
                    ("E12", uniq_kg, False, False),
                    ("B13", margin_ca, False, False),
                    ("E13", margin_kg, False, False),
                    ("B14", bonus_value, False, False),
                    ("B15", add_rto_ca, False, False),
                    ("B16", add_margin_ca, False, False),
                ]
            )


            delete_tables(gp_connector, mask)
            logging.info("Шаг 23 (delete_tables) выполнен успешно.")
  
        else:
            metrics_act(gp_connector, mask, type, actn_id, promo_start_date_act, promo_end_date_act, version)

            metrics_prev(gp_connector, mask, type, actn_id, promo_start_date_prev, promo_end_date_prev, version)

            ca_block, kg_block, _bonus, uniq_ca, uniq_kg = prepare_v2_v3_payload(
                gp_connector, mask, type, in_code_ruls, promo_start_date_act, promo_end_date_act, actn_id, version=version
            )
            left  = 'B24' if version == 2 else 'B37'
            right = 'E24' if version == 2 else 'E37'
            uniq_ca_cell = 'B30' if version == 2 else 'B43'
            uniq_kg_cell = 'E30' if version == 2 else 'E43'
            edit_file_batch(
                file_name,
                sheet_name="ЦА и КГ регулярные на клиента",
                writes=[
                    (left,  ca_block, False, False),  # 6×2 ЦА
                    (right, kg_block, False, False),  # 6×2 КГ
                    (uniq_ca_cell, uniq_ca, False, False),
                    (uniq_kg_cell, uniq_kg, False, False),
                ]
            )

            delete_tables(gp_connector, mask)
            logging.info("Шаг 21 (delete_tables) выполнен успешно.")

    except Exception as e:
        logging.error(f"[process_action][mask = {mask}] Ошибка при обработке: {e}")
        raise e

    logging.info(f"[process_action][mask = {mask}] Обработка акции '{actn_name}' завершена успешно.")

In [None]:
def run_all_actions(gp_connector):
    logging.info("[run_all_actions] Начинаем обработку всех акций без повторных попыток.")

    # Берём сразу type+version (+month для имени файла)
    df_actions = gp_connector.gp("""
        SELECT type,
               version,
               EXTRACT(month from promo_start_date_act)::int AS month
        FROM BA.helper_category
        WHERE is_processed = FALSE
        ORDER BY type, version;
    """)

    if df_actions.empty:
        logging.info("[run_all_actions] Нет новых акций для обработки (is_processed = FALSE).")
        return

    original_file = 'Анализ Пива_шаблон.xlsx'

    # Один файл на (type, month); внутри — только те версии, что реально ждут обработки
    for (current_type, current_month), grp in df_actions.groupby(['type', 'month']):
        file_name = f"Анализ_{current_type}_{int(current_month)}.xlsx"
        shutil.copy2(original_file, file_name)

        for version in sorted(grp['version'].unique()):
            # Доп. защита от повторов/гонок: перепроверка флага прямо перед запуском
            flag = gp_connector.gp(f"""
                SELECT is_processed
                FROM BA.helper_category
                WHERE type = '{current_type}' AND version = {int(version)}
                LIMIT 1
            """)
            if not flag.empty and bool(flag['is_processed'].iloc[0]):
                logging.info(f"[run_all_actions][{current_type} v{version}] Уже обработана, пропускаю.")
                continue

            process_action(gp_connector, current_type, int(version), file_name)

            gp_connector.execute_query(f"""
                UPDATE BA.helper_category
                SET is_processed = TRUE
                WHERE type = '{current_type}' AND version = {int(version)};
            """)
            logging.info(f"[run_all_actions][type={current_type}, version={version}] Отмечена как обработанная.")

    logging.info("[run_all_actions] Обработка всех акций завершена.")


In [217]:
# def main():
#     """
#     Главная функция запуска автоматической обработки акций.
#     При запуске устанавливается подключение к базе данных, запускается цикл обработки,
#     а по завершении соединение закрывается.
#     """
#     logging.info("=== Запуск автоматической обработки акций ===")
#     try:
#         logging.info("Подключаемся к базе данных GreenPlum.")
#         logging.info("Запускаем обработку всех акций через run_all_actions.")
#         run_all_actions(gp_connector)
#     except Exception as e:
#         logging.error(f"Ошибка в main(): {e}")
#         raise e
#     finally:
#         logging.info("Закрываем соединение с базой данных.")
#         try:
#             gp_connector.close()
#         except Exception as close_error:
#             logging.error(f"Ошибка при закрытии соединения: {close_error}")
#     logging.info("=== Автоматическая обработка акций завершена ===")


In [218]:
def safe_close_resources(gp_connector, *, kill_excel=False):
    # Закрываем БД-коннектор без падения
    try:
        if hasattr(gp_connector, "close"):
            gp_connector.close()
    except Exception as e:
        logging.warning(f"Ошибка при закрытии gp_connector: {e}")
    try:
        if hasattr(gp_connector, "engine") and gp_connector.engine:
            gp_connector.engine.dispose()
    except Exception:
        pass

    # Мягкая попытка закрыть Excel-приложения без обращения к xw.apps
    try:
        import xlwings as xw
        # создаём скрытое приложение и сразу закрываем — это безопасно,
        # и часто корректно снимает COM-хвосты
        app = xw.App(visible=False, add_book=False)
        try:
            app.quit()
        except Exception:
            pass
        del app
    except Exception:
        pass

    # Жёсткая чистка (опционально)
    if kill_excel:
        try:
            import subprocess, os
            subprocess.run(
                ["taskkill", "/F", "/IM", "EXCEL.EXE", "/T"],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False
            )
        except Exception:
            pass

In [219]:
def main():
    logging.info("=== Запуск автоматической обработки акций ===")
    try:
        logging.info("Подключаемся к базе данных GreenPlum.")
        logging.info("Запускаем обработку всех акций через run_all_actions.")
        run_all_actions(gp_connector)
    except Exception as e:
        logging.error(f"Ошибка в main(): {e}")
        raise
    finally:
        logging.info("Закрываем соединения/ресурсы.")
        safe_close_resources(gp_connector, kill_excel=False)   # при необходимости поставишь True
    logging.info("=== Автоматическая обработка акций завершена ===")

In [220]:
# Запуск основного процесса
main()

2025-09-15 14:55:29,017 - INFO - === Запуск автоматической обработки акций ===
2025-09-15 14:55:29,019 - INFO - Подключаемся к базе данных GreenPlum.
2025-09-15 14:55:29,020 - INFO - Запускаем обработку всех акций через run_all_actions.
2025-09-15 14:55:29,021 - INFO - [run_all_actions] Начинаем обработку всех акций без повторных попыток.
2025-09-15 14:55:32,458 - INFO - [process_action][type = kolbasa] Начинаем обработку акции 'ЛК_Колбасы1_июль25_тест'.
2025-09-15 14:55:32,459 - INFO - Начало выполнения функции create_whs с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1'), kwargs={}
2025-09-15 14:55:33,356 - INFO - Функция create_whs завершена успешно.
2025-09-15 14:55:33,360 - INFO - Шаг 1 (whs) выполнен успешно.
2025-09-15 14:55:33,362 - INFO - Начало выполнения функции create_cus_ruls с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1', 'Вареные колбасы; Ветчина; Копченые к

  0%|          | 0/2 [00:00<?, ?it/s]

2025-09-15 15:25:19,286 - INFO - Функция create_trn_0 завершена успешно.
2025-09-15 15:25:19,289 - INFO - Шаг 7 (create_trn_0) выполнен успешно.
2025-09-15 15:25:19,290 - INFO - Начало выполнения функции create_trn_0_v2 с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1', 2793, 31), kwargs={}
2025-09-15 15:25:19,295 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
            select
                    month_id
                    ,min(day_id) as min_dt
                    ,max(day_id) as max_dt
            from
                    ba.vt_pva_kolbasa_1_days
                where
                    actn_id = 2793 
                    and actn_period = 2
                group by 1 
                order by 1
            ;]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 15:25:19,296 - INFO - Повторная попытка через 30 секунд...


  0%|          | 0/1 [00:00<?, ?it/s]

2025-09-15 15:25:50,663 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 15:25:50,665 - INFO - Повторная попытка через 30 секунд...
2025-09-15 15:26:21,408 - INFO - Успешно подключились к GreenPlum.
2025-09-15 15:28:10,437 - INFO - Функция create_trn_0_v2 завершена успешно.
2025-09-15 15:28:10,438 - INFO - Шаг 8 (create_trn_0_v2) выполнен успешно.
2025-09-15 15:28:10,438 - INFO - Начало выполнения функции create_agg с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1', '2025-07-01'), kwargs={}
2025-09-15 15:28:16,666 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
        SELECT is_kg_needed
        FROM BA.helper_category
        WHERE mask = 'pva_kolbasa_1'
    ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 15:28:16,667 - INFO - Повторная попытка через 30 секунд...
2025-09-15 15:28:47,952 - INFO - [

  0%|          | 0/2 [00:00<?, ?it/s]

2025-09-15 15:38:59,338 - INFO - Функция dynamic_gr20 завершена успешно.
2025-09-15 15:38:59,341 - INFO - Шаг 16 (dynamic_gr20) выполнен успешно.
2025-09-15 15:38:59,342 - INFO - Начало выполнения функции gr20_transp с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1'), kwargs={}
2025-09-15 15:39:06,110 - INFO - Функция gr20_transp завершена успешно.
2025-09-15 15:39:06,111 - INFO - Шаг 17 (gr20_transp) выполнен успешно.
2025-09-15 15:39:06,111 - INFO - Начало выполнения функции age_and_active_virt с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1'), kwargs={}
2025-09-15 15:39:14,513 - INFO - Функция age_and_active_virt завершена успешно.
2025-09-15 15:39:14,518 - INFO - Шаг 18 (age_and_active_virt) выполнен успешно.
2025-09-15 15:39:14,520 - INFO - Начало выполнения функции itog с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolba

Доля мэтчинга ЛФЛ ЦА/КГ:

Загрузка данных frmt_id=104, region_id=180 батчами...
Загружено всего строк: 100000, в текущем батче: 17693
104 180 [94.62783172]

Загрузка данных frmt_id=104, region_id=247 батчами...
Загружено всего строк: 100000, в текущем батче: 14312
104 247 [95.09202454]

Загрузка данных frmt_id=104, region_id=275 батчами...
Загружено всего строк: 100000, в текущем батче: 15151
104 275 [93.79310345]

Загрузка данных frmt_id=104, region_id=227 батчами...
Загружено всего строк: 100000, в текущем батче: 19795
104 227 [95.1372549]

Загрузка данных frmt_id=104, region_id=197 батчами...
Загружено всего строк: 100000, в текущем батче: 9965
104 197 [92.46861925]

Загрузка данных frmt_id=104, region_id=280 батчами...
Загружено всего строк: 100000, в текущем батче: 28367
104 280 [96.28022319]

Загрузка данных frmt_id=104, region_id=284 батчами...
Загружено всего строк: 100000, в текущем батче: 19825
104 284 [95.31615925]

Загрузка данных frmt_id=104, region_id=173 батчами...
Загру

2025-09-15 15:48:29,376 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
                    SELECT * FROM ba.vt_pva_kolbasa_1_cus_profile 
                    WHERE frmt_id = '3' AND region_id = '227'
                    LIMIT 100000 OFFSET 0
                ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 15:48:29,377 - INFO - Повторная попытка через 30 секунд...


3 275 [93.26728429]

Загрузка данных frmt_id=3, region_id=227 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 100000
Загружено всего строк: 300000, в текущем батче: 100000
Загружено всего строк: 400000, в текущем батче: 100000
Загружено всего строк: 500000, в текущем батче: 100000
Загружено всего строк: 600000, в текущем батче: 38441
3 227 [92.13347921]

Загрузка данных frmt_id=3, region_id=197 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 100000
Загружено всего строк: 300000, в текущем батче: 45570
3 197 [92.77518824]

Загрузка данных frmt_id=3, region_id=280 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 100000
Загружено всего строк: 300000, в текущем батче: 100000
Загружено всего строк: 400000, в текущем батче: 100000
Загружено всего строк: 500000, в текущем батче: 100000
Загружено все

2025-09-15 15:59:42,794 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
                    SELECT * FROM ba.vt_pva_kolbasa_1_cus_profile 
                    WHERE frmt_id = '3' AND region_id = '173'
                    LIMIT 100000 OFFSET 0
                ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 15:59:42,795 - INFO - Повторная попытка через 30 секунд...


3 284 [93.76830208]

Загрузка данных frmt_id=3, region_id=173 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 100000
Загружено всего строк: 300000, в текущем батче: 38099
3 173 [92.40050663]

Загрузка данных frmt_id=2, region_id=180 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 100000
Загружено всего строк: 300000, в текущем батче: 100000
Загружено всего строк: 400000, в текущем батче: 100000
Загружено всего строк: 500000, в текущем батче: 95454
2 180 [89.08802494]

Загрузка данных frmt_id=2, region_id=247 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 13685
2 247 [92.06821873]

Загрузка данных frmt_id=2, region_id=275 батчами...
Загружено всего строк: 100000, в текущем батче: 100000
Загружено всего строк: 200000, в текущем батче: 100000
Загружено всего строк: 300000, в текущем батче: 100

2025-09-15 16:27:50,000 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 16:27:50,001 - INFO - Повторная попытка через 30 секунд...
2025-09-15 16:28:20,742 - INFO - Успешно подключились к GreenPlum.
2025-09-15 16:28:21,490 - INFO - Успешно вставлено 32 строк в таблицу ba.vt_pva_kolbasa_1_stat_temp за 0.358 сек.
2025-09-15 16:28:22,073 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: SELECT count(1) FROM BA.T_ZIG_STAT_TEST_PSM where ACTN_NAME = 'ЛК_Колбасы1_июль25_тест';]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 16:28:22,076 - INFO - Повторная попытка через 30 секунд...
2025-09-15 16:28:53,236 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 16:28:53,237 - INFO - Повторная попытка через 30 секунд...
2025-09-15 16:29:23,986 - INFO - Успешно подключились к GreenPlum.
2025-09-15 16:29:43,469 - INFO - Успешно вставлено

  0%|          | 0/4 [00:00<?, ?it/s]

Создаем пустую таблицу для параметров расчета маржи.
Создаем таблицу с налогами на товары.


  0%|          | 0/4 [00:00<?, ?it/s]

Собираем строки из таблицы с чеками за период c 2025-05-06 по 2025-05-31.
Cобираем строки из AUM за период c 2025-05-06 по 2025-05-31.
Предобработка строк из AUM за период c 2025-05-06 по 2025-05-31.
Заполняем таблицу параметров для расчета маржи за период c 2025-05-06 по 2025-05-31.
***********************************
Собираем строки из таблицы с чеками за период c 2025-06-01 по 2025-06-30.
Cобираем строки из AUM за период c 2025-06-01 по 2025-06-30.
Предобработка строк из AUM за период c 2025-06-01 по 2025-06-30.
Заполняем таблицу параметров для расчета маржи за период c 2025-06-01 по 2025-06-30.
***********************************
Собираем строки из таблицы с чеками за период c 2025-07-01 по 2025-07-31.
Cобираем строки из AUM за период c 2025-07-01 по 2025-07-31.
Предобработка строк из AUM за период c 2025-07-01 по 2025-07-31.
Заполняем таблицу параметров для расчета маржи за период c 2025-07-01 по 2025-07-31.
***********************************
Собираем строки из таблицы с чеками з

2025-09-15 17:02:00,310 - INFO - in_code_ruls: FC_jul25_S_5, rule_code_filter: rule_code IN ('FC_jul25_S_5')


  0%|          | 0/4 [00:00<?, ?it/s]

2025-09-15 17:09:26,221 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: select GRP from ba.vt_pva_kolbasa_1_dataset_cus group by GRP order by GRP;]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 17:09:26,224 - INFO - Повторная попытка через 30 секунд...
2025-09-15 17:09:57,694 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 17:09:57,696 - INFO - Повторная попытка через 30 секунд...
2025-09-15 17:10:28,166 - INFO - Успешно подключились к GreenPlum.


  0%|          | 0/3 [00:00<?, ?it/s]

2025-09-15 17:12:37,522 - INFO - Функция margin завершена успешно.
2025-09-15 17:12:37,525 - INFO - Шаг 21 (margin) выполнен успешно.
2025-09-15 17:12:37,527 - INFO - Начало выполнения функции oborot_rto с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_1', '2025-07-01', '2025-07-31', 'ЛК_Колбасы1_июль25_тест'), kwargs={}
2025-09-15 17:12:37,530 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
	SELECT
		code
	FROM
		dm.WHS w
	JOIN
		ba.vt_pva_kolbasa_1_whs ww 
		on ww.orgunit_id = w.orgunit_id
	;
	]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 17:12:37,531 - INFO - Повторная попытка через 30 секунд...
  df = pd.read_sql_query(script, connect_td)
2025-09-15 17:13:50,147 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 17:13:50,149 - INFO - Повторная попытка через 30 секунд...
2025-09-15 17:14:20,681 - I

Кол-во таблиц до фильтрации: 60
Кол-во таблиц после фильтрации: 56


2025-09-15 17:40:31,613 - INFO - Успешно подключились к GreenPlum.


Таблица ba.tmp_pva_kolbasa_1_kolbasa_articles удалена
Таблица ba.vt_pva_kolbasa_1_cus_ruls удалена
Таблица ba.tmp_pva_kolbasa_1_kolbasa_buyers_act удалена
Таблица ba.tmp_pva_kolbasa_1_kolbasa_offer_accept_act удалена
Таблица ba.tmp_pva_kolbasa_1_kolbasa_offer_accept_prev удалена
Таблица ba.vt_pva_kolbasa_1_cus_ctrl удалена
Таблица ba.pva_kolbasa_1_ca_and_kg удалена
Таблица ba.vt_pva_kolbasa_1_promo_week удалена
Таблица ba.vt_pva_kolbasa_1_actn_duble удалена
Таблица ba.vt_pva_kolbasa_1_frod удалена
Таблица ba.vt_pva_kolbasa_1_trn_0 удалена
Таблица ba.vt_pva_kolbasa_1_trn_1 удалена
Таблица ba.vt_pva_kolbasa_1_ca_clear удалена
Таблица ba.vt_pva_kolbasa_1_cus_clear удалена
Таблица ba.vt_pva_kolbasa_1_trn удалена
Таблица ba.vt_pva_kolbasa_1_cus_type удалена
Таблица ba.vt_pva_kolbasa_1_cus_reg удалена
Таблица ba.vt_pva_kolbasa_1_ca_frmt удалена
Таблица ba.vt_pva_kolbasa_1_kg_frmt удалена
Таблица ba.vt_pva_kolbasa_1_ca_frmt_grp удалена
Таблица ba.vt_pva_kolbasa_1_kg_lfl_frmt удалена
Таблица b

2025-09-15 17:40:52,744 - INFO - Функция delete_tables завершена успешно.
2025-09-15 17:40:52,746 - INFO - Шаг 23 (delete_tables) выполнен успешно.
2025-09-15 17:40:52,747 - INFO - [process_action][mask = pva_kolbasa_1] Обработка акции 'ЛК_Колбасы1_июль25_тест' завершена успешно.


Таблица ba.vt_pva_kolbasa_1_kolbasa_1_total_metrics_prev удалена


2025-09-15 17:40:53,048 - INFO - [run_all_actions][type=kolbasa, version=1] Отмечена как обработанная.
2025-09-15 17:40:53,050 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
                SELECT is_processed
                FROM BA.helper_category
                WHERE type = 'kolbasa' AND version = 2
                LIMIT 1
            ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 17:40:53,052 - INFO - Повторная попытка через 30 секунд...
2025-09-15 17:41:24,103 - INFO - [process_action][type = kolbasa] Начинаем обработку акции 'ЛК_Колбасы2_июль25_тест'.
2025-09-15 17:41:24,104 - INFO - Начало выполнения функции create_whs с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_2'), kwargs={}
2025-09-15 17:41:24,105 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 17:41:24,106 - INFO - Повторная попытк

  0%|          | 0/2 [00:00<?, ?it/s]

2025-09-15 18:11:41,572 - INFO - Функция create_trn_0 завершена успешно.
2025-09-15 18:11:41,574 - INFO - Шаг 7 (create_trn_0) выполнен успешно.
2025-09-15 18:11:41,576 - INFO - Начало выполнения функции create_trn_0_v2 с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_2', 2820, 31), kwargs={}
2025-09-15 18:11:41,579 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
            select
                    month_id
                    ,min(day_id) as min_dt
                    ,max(day_id) as max_dt
            from
                    ba.vt_pva_kolbasa_2_days
                where
                    actn_id = 2820 
                    and actn_period = 2
                group by 1 
                order by 1
            ;]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 18:11:41,580 - INFO - Повторная попытка через 30 секунд...


  0%|          | 0/1 [00:00<?, ?it/s]

2025-09-15 18:12:12,408 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 18:12:12,409 - INFO - Повторная попытка через 30 секунд...
2025-09-15 18:12:42,839 - INFO - Успешно подключились к GreenPlum.
2025-09-15 18:13:42,792 - INFO - Функция create_trn_0_v2 завершена успешно.
2025-09-15 18:13:42,794 - INFO - Шаг 8 (create_trn_0_v2) выполнен успешно.
2025-09-15 18:13:42,796 - INFO - Начало выполнения функции create_agg с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_2', '2025-07-01'), kwargs={}
2025-09-15 18:13:46,464 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
        SELECT is_kg_needed
        FROM BA.helper_category
        WHERE mask = 'pva_kolbasa_2'
    ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 18:13:46,467 - INFO - Повторная попытка через 30 секунд...
2025-09-15 18:14:17,208 - INFO - [

  0%|          | 0/2 [00:00<?, ?it/s]

2025-09-15 18:24:19,190 - INFO - Функция dynamic_gr20 завершена успешно.
2025-09-15 18:24:19,193 - INFO - Шаг 16 (dynamic_gr20) выполнен успешно.
2025-09-15 18:24:19,194 - INFO - Начало выполнения функции gr20_transp с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_2'), kwargs={}
2025-09-15 18:24:26,748 - INFO - Функция gr20_transp завершена успешно.
2025-09-15 18:24:26,752 - INFO - Шаг 17 (gr20_transp) выполнен успешно.
2025-09-15 18:24:26,754 - INFO - Начало выполнения функции age_and_active_virt с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_2'), kwargs={}
2025-09-15 18:24:34,341 - INFO - Функция age_and_active_virt завершена успешно.
2025-09-15 18:24:34,342 - INFO - Шаг 18 (age_and_active_virt) выполнен успешно.
2025-09-15 18:24:34,344 - INFO - Начало выполнения функции itog с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolba

Доля мэтчинга ЛФЛ ЦА/КГ:

Загрузка данных frmt_id=104, region_id=180 батчами...
Загружено всего строк: 100000, в текущем батче: 16407
104 180 [96.24853458]

Загрузка данных frmt_id=104, region_id=247 батчами...
Загружено всего строк: 100000, в текущем батче: 12050
104 247 [96.4360587]

Загрузка данных frmt_id=104, region_id=275 батчами...
Загружено всего строк: 100000, в текущем батче: 13817
104 275 [96.75516224]

Загрузка данных frmt_id=104, region_id=227 батчами...
Загружено всего строк: 100000, в текущем батче: 17511
104 227 [97.56097561]

Загрузка данных frmt_id=104, region_id=197 батчами...
Загружено всего строк: 100000, в текущем батче: 8817
104 197 [93.47408829]

Загрузка данных frmt_id=104, region_id=280 батчами...
Загружено всего строк: 100000, в текущем батче: 25644
104 280 [98.11557789]

Загрузка данных frmt_id=104, region_id=284 батчами...
Загружено всего строк: 100000, в текущем батче: 17577
104 284 [97.8021978]

Загрузка данных frmt_id=104, region_id=173 батчами...
Загруж

2025-09-15 19:04:58,014 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 19:04:58,015 - INFO - Повторная попытка через 30 секунд...


1 173 [81.0288216]
Процесс завершен.


2025-09-15 19:05:28,438 - INFO - Успешно подключились к GreenPlum.
2025-09-15 19:05:29,718 - INFO - Успешно вставлено 32 строк в таблицу ba.vt_pva_kolbasa_2_stat_temp за 0.542 сек.
2025-09-15 19:05:30,892 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: SELECT count(1) FROM BA.T_ZIG_STAT_TEST_PSM where ACTN_NAME = 'ЛК_Колбасы2_июль25_тест';]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 19:05:30,893 - INFO - Повторная попытка через 30 секунд...
2025-09-15 19:06:01,884 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 19:06:01,885 - INFO - Повторная попытка через 30 секунд...
2025-09-15 19:06:32,305 - INFO - Успешно подключились к GreenPlum.
2025-09-15 19:06:45,567 - INFO - Успешно вставлено 956782 строк в таблицу ba.vt_pva_kolbasa_2_cus_temp за 11.531 сек.
2025-09-15 19:06:56,794 - INFO - Функция psm завершена успешно.
2025-09-15 19:06:56,797 - INFO - Шаг 2

Кол-во таблиц до фильтрации: 46
Кол-во таблиц после фильтрации: 42


2025-09-15 19:25:44,354 - INFO - Успешно подключились к GreenPlum.


Таблица ba.tmp_pva_kolbasa_2_kolbasa_articles удалена
Таблица ba.tmp_pva_kolbasa_2_kolbasa_buyers_act_prev удалена
Таблица ba.vt_pva_kolbasa_2_cus_ruls удалена
Таблица ba.tmp_pva_kolbasa_2_kolbasa_buyers_act удалена
Таблица ba.tmp_pva_kolbasa_2_kolbasa_offer_accept_act удалена
Таблица ba.tmp_pva_kolbasa_2_kolbasa_offer_accept_prev удалена
Таблица ba.vt_pva_kolbasa_2_cus_ctrl удалена
Таблица ba.pva_kolbasa_2_ca_and_kg удалена
Таблица ba.vt_pva_kolbasa_2_promo_week удалена
Таблица ba.vt_pva_kolbasa_2_actn_duble удалена
Таблица ba.vt_pva_kolbasa_2_frod удалена
Таблица ba.vt_pva_kolbasa_2_trn_0 удалена
Таблица ba.vt_pva_kolbasa_2_trn_1 удалена
Таблица ba.vt_pva_kolbasa_2_ca_clear удалена
Таблица ba.vt_pva_kolbasa_2_cus_clear удалена
Таблица ba.vt_pva_kolbasa_2_trn удалена
Таблица ba.vt_pva_kolbasa_2_cus_type удалена
Таблица ba.vt_pva_kolbasa_2_cus_reg удалена
Таблица ba.vt_pva_kolbasa_2_ca_frmt удалена
Таблица ba.vt_pva_kolbasa_2_kg_frmt удалена
Таблица ba.vt_pva_kolbasa_2_ca_frmt_grp удал

2025-09-15 19:25:52,120 - INFO - Функция delete_tables завершена успешно.
2025-09-15 19:25:52,123 - INFO - Шаг 21 (delete_tables) выполнен успешно.
2025-09-15 19:25:52,125 - INFO - [process_action][mask = pva_kolbasa_2] Обработка акции 'ЛК_Колбасы2_июль25_тест' завершена успешно.


Таблица ba.vt_pva_kolbasa_2_kolbasa_2_total_metrics_prev удалена


2025-09-15 19:25:52,359 - INFO - [run_all_actions][type=kolbasa, version=2] Отмечена как обработанная.
2025-09-15 19:25:52,362 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
                SELECT is_processed
                FROM BA.helper_category
                WHERE type = 'kolbasa' AND version = 3
                LIMIT 1
            ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 19:25:52,365 - INFO - Повторная попытка через 30 секунд...
2025-09-15 19:26:23,319 - INFO - [process_action][type = kolbasa] Начинаем обработку акции 'ЛК_Колбасы3_июль25_тест'.
2025-09-15 19:26:23,322 - INFO - Начало выполнения функции create_whs с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_3'), kwargs={}
2025-09-15 19:26:23,324 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 19:26:23,326 - INFO - Повторная попытк

  0%|          | 0/2 [00:00<?, ?it/s]

2025-09-15 19:48:31,181 - INFO - Функция create_trn_0 завершена успешно.
2025-09-15 19:48:31,183 - INFO - Шаг 7 (create_trn_0) выполнен успешно.
2025-09-15 19:48:31,184 - INFO - Начало выполнения функции create_trn_0_v2 с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_3', 2834, 31), kwargs={}
2025-09-15 19:48:31,187 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
            select
                    month_id
                    ,min(day_id) as min_dt
                    ,max(day_id) as max_dt
            from
                    ba.vt_pva_kolbasa_3_days
                where
                    actn_id = 2834 
                    and actn_period = 2
                group by 1 
                order by 1
            ;]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 19:48:31,188 - INFO - Повторная попытка через 30 секунд...


  0%|          | 0/1 [00:00<?, ?it/s]

2025-09-15 19:49:02,212 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 19:49:02,213 - INFO - Повторная попытка через 30 секунд...
2025-09-15 19:49:32,655 - INFO - Успешно подключились к GreenPlum.
2025-09-15 19:50:36,866 - INFO - Функция create_trn_0_v2 завершена успешно.
2025-09-15 19:50:36,868 - INFO - Шаг 8 (create_trn_0_v2) выполнен успешно.
2025-09-15 19:50:36,869 - INFO - Начало выполнения функции create_agg с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_3', '2025-07-01'), kwargs={}
2025-09-15 19:50:40,047 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: 
        SELECT is_kg_needed
        FROM BA.helper_category
        WHERE mask = 'pva_kolbasa_3'
    ]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 19:50:40,048 - INFO - Повторная попытка через 30 секунд...
2025-09-15 19:51:11,067 - INFO - [

  0%|          | 0/2 [00:00<?, ?it/s]

2025-09-15 19:59:47,845 - INFO - Функция dynamic_gr20 завершена успешно.
2025-09-15 19:59:47,846 - INFO - Шаг 16 (dynamic_gr20) выполнен успешно.
2025-09-15 19:59:47,847 - INFO - Начало выполнения функции gr20_transp с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_3'), kwargs={}
2025-09-15 19:59:53,756 - INFO - Функция gr20_transp завершена успешно.
2025-09-15 19:59:53,757 - INFO - Шаг 17 (gr20_transp) выполнен успешно.
2025-09-15 19:59:53,758 - INFO - Начало выполнения функции age_and_active_virt с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolbasa_3'), kwargs={}
2025-09-15 20:00:00,678 - INFO - Функция age_and_active_virt завершена успешно.
2025-09-15 20:00:00,681 - INFO - Шаг 18 (age_and_active_virt) выполнен успешно.
2025-09-15 20:00:00,683 - INFO - Начало выполнения функции itog с args=(<Connector_package.db_connector.GreenPlumConnector object at 0x00000230FDDB34D0>, 'pva_kolba

Доля мэтчинга ЛФЛ ЦА/КГ:

Загрузка данных frmt_id=104, region_id=180 батчами...
Загружено всего строк: 100000, в текущем батче: 15698
104 180 [99.53198128]

Загрузка данных frmt_id=104, region_id=247 батчами...
Загружено всего строк: 100000, в текущем батче: 12223
104 247 [98.69281046]

Загрузка данных frmt_id=104, region_id=275 батчами...
Загружено всего строк: 100000, в текущем батче: 13341
104 275 [99.12434326]

Загрузка данных frmt_id=104, region_id=227 батчами...
Загружено всего строк: 100000, в текущем батче: 17160
104 227 [99.2920354]

Загрузка данных frmt_id=104, region_id=197 батчами...
Загружено всего строк: 100000, в текущем батче: 8476
104 197 [98.4496124]

Загрузка данных frmt_id=104, region_id=280 батчами...
Загружено всего строк: 100000, в текущем батче: 24601
104 280 [99.34036939]

Загрузка данных frmt_id=104, region_id=284 батчами...
Загружено всего строк: 100000, в текущем батче: 17393
104 284 [99.14236707]

Загрузка данных frmt_id=104, region_id=173 батчами...
Загруж

2025-09-15 20:38:00,694 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 20:38:00,694 - INFO - Повторная попытка через 30 секунд...


1 173 [80.11663853]
Процесс завершен.


2025-09-15 20:38:31,135 - INFO - Успешно подключились к GreenPlum.
2025-09-15 20:38:31,638 - INFO - Успешно вставлено 32 строк в таблицу ba.vt_pva_kolbasa_3_stat_temp за 0.242 сек.
2025-09-15 20:38:32,009 - ERROR - Операционная ошибка SELECT (попытка 1): (psycopg2.OperationalError) SSL SYSCALL error: EOF detected

[SQL: SELECT count(1) FROM BA.T_ZIG_STAT_TEST_PSM where ACTN_NAME = 'ЛК_Колбасы3_июль25_тест';]
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2025-09-15 20:38:32,010 - INFO - Повторная попытка через 30 секунд...
2025-09-15 20:39:02,725 - ERROR - Операционная ошибка (попытка 1): SSL SYSCALL error: EOF detected

2025-09-15 20:39:02,726 - INFO - Повторная попытка через 30 секунд...
2025-09-15 20:39:33,380 - INFO - Успешно подключились к GreenPlum.
2025-09-15 20:39:40,561 - INFO - Успешно вставлено 1099924 строк в таблицу ba.vt_pva_kolbasa_3_cus_temp за 5.266 сек.
2025-09-15 20:39:47,394 - INFO - Функция psm завершена успешно.
2025-09-15 20:39:47,398 - INFO - Шаг 2

Кол-во таблиц до фильтрации: 45
Кол-во таблиц после фильтрации: 41


2025-09-15 20:58:22,405 - INFO - Успешно подключились к GreenPlum.


Таблица ba.tmp_pva_kolbasa_3_kolbasa_articles удалена
Таблица ba.vt_pva_kolbasa_3_cus_ruls удалена
Таблица ba.tmp_pva_kolbasa_3_kolbasa_buyers_act удалена
Таблица ba.tmp_pva_kolbasa_3_kolbasa_offer_accept_act удалена
Таблица ba.tmp_pva_kolbasa_3_kolbasa_offer_accept_prev удалена
Таблица ba.vt_pva_kolbasa_3_cus_ctrl удалена
Таблица ba.pva_kolbasa_3_ca_and_kg удалена
Таблица ba.vt_pva_kolbasa_3_promo_week удалена
Таблица ba.vt_pva_kolbasa_3_actn_duble удалена
Таблица ba.vt_pva_kolbasa_3_frod удалена
Таблица ba.vt_pva_kolbasa_3_trn_0 удалена
Таблица ba.vt_pva_kolbasa_3_trn_1 удалена
Таблица ba.vt_pva_kolbasa_3_ca_clear удалена
Таблица ba.vt_pva_kolbasa_3_cus_clear удалена
Таблица ba.vt_pva_kolbasa_3_trn удалена
Таблица ba.vt_pva_kolbasa_3_cus_type удалена
Таблица ba.vt_pva_kolbasa_3_cus_reg удалена
Таблица ba.vt_pva_kolbasa_3_ca_frmt удалена
Таблица ba.vt_pva_kolbasa_3_kg_frmt удалена
Таблица ba.vt_pva_kolbasa_3_ca_frmt_grp удалена
Таблица ba.vt_pva_kolbasa_3_kg_lfl_frmt удалена
Таблица b

2025-09-15 20:58:28,900 - INFO - Функция delete_tables завершена успешно.
2025-09-15 20:58:28,901 - INFO - Шаг 21 (delete_tables) выполнен успешно.
2025-09-15 20:58:28,902 - INFO - [process_action][mask = pva_kolbasa_3] Обработка акции 'ЛК_Колбасы3_июль25_тест' завершена успешно.
2025-09-15 20:58:29,064 - INFO - [run_all_actions][type=kolbasa, version=3] Отмечена как обработанная.
2025-09-15 20:58:29,066 - INFO - [run_all_actions] Обработка всех акций завершена.
2025-09-15 20:58:29,070 - INFO - Закрываем соединения/ресурсы.
2025-09-15 20:58:29,073 - INFO - Соединение с базой данных закрыто.


Таблица ba.vt_pva_kolbasa_3_kolbasa_3_total_metrics_prev удалена


2025-09-15 20:58:29,981 - INFO - === Автоматическая обработка акций завершена ===
