In [1]:
# Импорт необходимых библиотек

import pandas as pd
import numpy as np
import gc
import sys
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import GridSearchCV  # Для закомментированного GridSearchCV
from sklearn.preprocessing import (
    StandardScaler,
    OneHotEncoder,
    OrdinalEncoder,
    KBinsDiscretizer,
)
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import pickle
from sklearn.preprocessing import StandardScaler
from lightgbm import LGBMClassifier
from sklearn.metrics import roc_auc_score
from warnings import simplefilter

simplefilter(action="ignore", category=FutureWarning)
simplefilter(action="ignore", category=UserWarning)

In [2]:
# 1. Определение вспомогательных функций


def process_notset_none(df: pd.DataFrame):
    """
    Обнаруживает строки 'not set' и 'none' и заменяет соответствующие им значения на np.nan во всех столбцах DataFrame.
    Модифицирует DataFrame напрямую.
    """
    if not isinstance(df, pd.DataFrame):
        print("Ошибка: Входной аргумент должен быть pandas DataFrame.")
        return

    strings_to_find_and_replace = ["not set", "none"]

    print("Начало обработки 'not set' и 'none'", file=sys.stderr)
    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )

    for col in df.columns:
        col_series_str = df[col].astype(str)
        mask_not_set = col_series_str.str.contains(
            strings_to_find_and_replace[0], case=False, regex=False
        )
        mask_none = col_series_str.str.contains(
            strings_to_find_and_replace[1], case=False, regex=False
        )
        combined_mask = mask_not_set | mask_none
        total_found = combined_mask.sum()

        if total_found > 0:
            # print(f"\nСтолбец '{col}': обнаружено {total_found} вхождений", file=sys.stderr)
            n_nan_before = df[col].isnull().sum()
            # print(f"  -> Количество NaN до замены: {n_nan_before}", file=sys.stderr)
            df.loc[combined_mask, col] = np.nan
            # final_count_nan = df[col].isnull().sum()
            # print(f"  -> После замены, количество NaN в столбце: {final_count_nan}", file=sys.stderr)

    print("\nОбработка 'not set' и 'none' завершена.", file=sys.stderr)
    print(
        "-------------------------------------------------------------", file=sys.stderr
    )

In [3]:
def encode_column_with_nulls(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """
    Кодирует значения в указанном столбце числовыми метками (начиная с 1),
    заменяя пропуски (NaN) на 0. Добавляет новый столбец {column_name}_encoded.
    Возвращает новый DataFrame.
    """
    df = df.copy()  # Работаем с копией, как в оригинале
    if column_name not in df.columns:
        print(
            f"Ошибка: Столбец '{column_name}' не найден в DataFrame.", file=sys.stderr
        )
        return df

    # .astype(str) для избежания проблем с типами в factorize
    encoded, uniques = pd.factorize(df[column_name].astype(str))
    # Заменяем -1 (метка для NaN в factorize) на 0, остальные метки увеличиваем на 1
    df[f"{column_name}_encoded"] = np.where(encoded == -1, 0, encoded + 1)
    df[f"{column_name}_encoded"] = df[f"{column_name}_encoded"].astype("int64")

    # print(f"Кодирование столбца '{column_name}':", file=sys.stderr)
    # print(f"Уникальных значений (вкл. NaN): {len(uniques) + (1 if -1 in encoded else 0)}", file=sys.stderr)
    # print(f"Максимальный код: {df[f'{column_name}_encoded'].max()}", file=sys.stderr)

    return df

In [4]:
def get_mode(x):
    """Вспомогательная функция для получения моды, обрабатывающая пустые Series"""
    mode_val = x.mode()
    if not mode_val.empty:
        return mode_val[0]
    return np.nan

In [5]:
def fill_missing_by_groups(df: pd.DataFrame) -> pd.DataFrame:
    """
    Заполняет пропущенные значения в DataFrame по группам и общей модой.
    Работает in-place для экономии памяти.
    """
    print(
        "Начало оптимизированного заполнения пропусков по группам и модой (in-place).",
        file=sys.stderr,
    )
    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )

    # Связка 1: utm_medium + device_category -> utm_source
    if "utm_source" in df.columns and df["utm_source"].isna().any():
        print(
            "Заполнение пропусков в 'utm_source' по группам ('utm_medium', 'device_category').",
            file=sys.stderr,
        )
        if "utm_medium" in df.columns and "device_category" in df.columns:
            group_modes_source = df.groupby(["utm_medium", "device_category"])[
                "utm_source"
            ].transform(get_mode)
            df["utm_source"].fillna(group_modes_source, inplace=True)
        print(
            f"Пропусков в 'utm_source' после заполнения по группе: {df['utm_source'].isna().sum()}",
            file=sys.stderr,
        )

    # Связка 2: utm_medium + device_os -> utm_campaign
    if "utm_campaign" in df.columns and df["utm_campaign"].isna().any():
        print(
            "Заполнение пропусков в 'utm_campaign' по группам ('utm_medium', 'device_os').",
            file=sys.stderr,
        )
        if "utm_medium" in df.columns and "device_os" in df.columns:
            group_modes_campaign = df.groupby(["utm_medium", "device_os"])[
                "utm_campaign"
            ].transform(get_mode)
            df["utm_campaign"].fillna(group_modes_campaign, inplace=True)
        print(
            f"Пропусков в 'utm_campaign' после заполнения по группе: {df['utm_campaign'].isna().sum()}",
            file=sys.stderr,
        )

    # Заполнение оставшихся пропусков в более широких группах
    group_columns_broad = ["utm_medium", "device_category", "device_os"]
    # Проверяем наличие всех столбцов для группировки
    if all(col in df.columns for col in group_columns_broad):
        print(
            f"Заполнение оставшихся пропусков по более широким группам {group_columns_broad} (для столбцов с пропусками).",
            file=sys.stderr,
        )
        cols_with_nan_after_specific = df.columns[df.isna().any()].tolist()

        for col in cols_with_nan_after_specific:
            # Пропускаем, если уже нет пропусков после специфичного заполнения
            if col in ["utm_source", "utm_campaign"] and not df[col].isna().any():
                continue

            # print(f"  Обработка столбца '{col}'...", file=sys.stderr)
            # Вычисляем моду для текущего столбца в широких группах
            # Игнорируем пропуски в group_columns_broad при группировке по умолчанию
            group_modes_broad_col = df.groupby(group_columns_broad)[col].transform(
                get_mode
            )

            # Заполняем пропуски
            df[col].fillna(group_modes_broad_col, inplace=True)
            # print(f"    Пропусков в '{col}' после заполнения по широкой группе: {df[col].isna().sum()}", file=sys.stderr)
    else:
        print(
            f"Не все столбцы для широкой группировки ({group_columns_broad}) найдены. Пропуск шага.",
            file=sys.stderr,
        )

    # Финальное заполнение модой для любых оставшихся пропусков
    print(
        "Финальное заполнение любых оставшихся пропусков общей модой по столбцам.",
        file=sys.stderr,
    )
    cols_with_nan_final = df.columns[df.isna().any()].tolist()
    for col in cols_with_nan_final:
        mode_val = df[col].mode()
        if not mode_val.empty:
            # print(f"  Финальное заполнение '{col}' общей модой ({mode_val[0]}).", file=sys.stderr)
            df[col].fillna(mode_val[0], inplace=True)
        else:
            print(
                f"  Нет не-NaN значений для вычисления общей моды в '{col}', пропуски остаются.",
                file=sys.stderr,
            )

    print(
        "\nОптимизированное заполнение пропусков завершено (in-place).", file=sys.stderr
    )
    print(
        "-------------------------------------------------------------", file=sys.stderr
    )

    return df  # Возвращаем для удобства

In [6]:
def fill_hits_and_events(df: pd.DataFrame) -> pd.DataFrame:
    """
    Заполняет пропуски в столбцах hit_referer и event_label
    оптимизированным способом, используя группировку и константы.
    Работает НАПРЯМУЮ с входным DataFrame (in-place).
    """
    print(
        "Начало оптимизированного заполнения пропусков в hit_referer и event_label (in-place).",
        file=sys.stderr,
    )
    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )

    # Заполнение hit_referer первым не-NaN значением в группе по hit_page_path
    if (
        "hit_referer" in df.columns
        and "hit_page_path" in df.columns
        and df["hit_referer"].isna().any()
    ):
        print(
            "Заполнение пропусков в 'hit_referer' первым значением в группе по 'hit_page_path'.",
            file=sys.stderr,
        )
        referer_by_page = df.groupby("hit_page_path")["hit_referer"].transform("first")
        df["hit_referer"].fillna(referer_by_page, inplace=True)
        print(
            f"Пропусков в 'hit_referer' после заполнения по группе: {df['hit_referer'].isna().sum()}",
            file=sys.stderr,
        )
    elif "hit_referer" in df.columns:
        print(
            "'hit_referer' не требует заполнения по группе или 'hit_page_path' отсутствует.",
            file=sys.stderr,
        )

    # Заполнение event_label первым не-NaN значением в группе по event_action
    if (
        "event_label" in df.columns
        and "event_action" in df.columns
        and df["event_label"].isna().any()
    ):
        print(
            "Заполнение пропусков в 'event_label' первым значением в группе по 'event_action'.",
            file=sys.stderr,
        )
        label_by_action = df.groupby("event_action")["event_label"].transform("first")
        df["event_label"].fillna(label_by_action, inplace=True)
        print(
            f"Пропусков в 'event_label' после заполнения по группе: {df['event_label'].isna().sum()}",
            file=sys.stderr,
        )
    elif "event_label" in df.columns:
        print(
            "'event_label' не требует заполнения по группе или 'event_action' отсутствует.",
            file=sys.stderr,
        )

    # --- Финальное заполнение оставшихся пропусков ---
    if "hit_referer" in df.columns and df["hit_referer"].isna().any():
        print(
            "Финальное заполнение оставшихся пропусков в 'hit_referer' значением 'direct'.",
            file=sys.stderr,
        )
        df["hit_referer"].fillna("direct", inplace=True)
        print(
            f"Пропусков в 'hit_referer' после финального заполнения: {df['hit_referer'].isna().sum()}",
            file=sys.stderr,
        )

    if "event_label" in df.columns and df["event_label"].isna().any():
        print(
            "Финальное заполнение оставшихся пропусков в 'event_label' значением 'none'.",
            file=sys.stderr,
        )
        df["event_label"].fillna("none", inplace=True)
        print(
            f"Пропусков в 'event_label' после финального заполнения: {df['event_label'].isna().sum()}",
            file=sys.stderr,
        )

    print(
        "\nОптимизированное заполнение пропусков в hit_referer и event_label завершено (in-place).",
        file=sys.stderr,
    )
    print(
        "-------------------------------------------------------------", file=sys.stderr
    )

    return df  # Возвращаем для удобства

In [7]:
def aggregate_session_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Агрегирует данные хитов до уровня сессий, извлекая ключевые метрики.
    """
    print("Начало агрегации данных хитов по сессиям.", file=sys.stderr)
    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )

    agg_rules = {
        "hit_number": ["min", "max", "count"],
        # Проверяем существование колонок перед добавлением правила
        **(
            {"hit_page_path": lambda x: x.iloc[0] if not x.empty else np.nan}
            if "hit_page_path" in df.columns
            else {}
        ),
        **(
            {
                "event_category_grouped": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "event_category_grouped" in df.columns
            else {}
        ),
        **(
            {
                "event_action_grouped": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "event_action_grouped" in df.columns
            else {}
        ),
        **({"hit_time_2": "sum"} if "hit_time_2" in df.columns else {}),
        **(
            {
                "hit_referer_encoded": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "hit_referer_encoded" in df.columns
            else {}
        ),
        **(
            {
                "event_label_encoded": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "event_label_encoded" in df.columns
            else {}
        ),
    }

    # Добавляем все UTM-метки, если они есть в данных и не были удалены ранее
    utm_columns_in_hits = [
        col for col in df.columns if col.startswith("utm_") and col not in agg_rules
    ]
    for col in utm_columns_in_hits:
        agg_rules[col] = lambda x: (
            x.iloc[0] if not x.empty else np.nan
        )  # берем первое значение

    # Убедимся, что ключи в agg_rules соответствуют столбцам в df
    # (кроме session_id, по которому идет группировка)
    # Этот check уже встроен при использовании **{} выражений выше

    # Проверяем, есть ли 'session_id' для группировки
    if "session_id" not in df.columns:
        print(
            "Ошибка: Столбец 'session_id' отсутствует в DataFrame для агрегации.",
            file=sys.stderr,
        )
        return pd.DataFrame()  # Возвращаем пустой DF при ошибке

    try:
        aggregated = df.groupby("session_id").agg(agg_rules)
        print("Агрегация выполнена.", file=sys.stderr)
    except Exception as e:
        print(f"Ошибка при выполнении агрегации: {e}", file=sys.stderr)
        return pd.DataFrame()

    # Строим явный словарь переименования из MultiIndex/SingleIndex имен в финальные имена

    rename_map = {}
    for original_col_tuple in aggregated.columns.values:
        # Определяем имя столбца после агрегации (MultiIndex tuple или SingleIndex string)
        if isinstance(original_col_tuple, tuple):
            # Для MultiIndex: ('original_name', 'agg_func') или ('original_name', '<lambda>')
            original_name = original_col_tuple[0]
            agg_name = original_col_tuple[1]
            col_key = original_col_tuple  # Ключ в aggregated.columns
        else:
            # Для SingleIndex (редко после agg, но может быть после reset_index)
            original_name = original_col_tuple
            agg_name = None
            col_key = original_col_tuple  # Ключ в aggregated.columns

        # Определяем финальное имя
        final_name = str(original_col_tuple)  # По умолчанию

        if isinstance(original_col_tuple, tuple):
            if original_name == "hit_number":
                if agg_name == "min":
                    final_name = "first_hit_number"
                elif agg_name == "max":
                    final_name = "last_hit_number"
                elif agg_name == "count":
                    final_name = "total_hits"
            elif original_name == "hit_time_2" and agg_name == "sum":
                final_name = "total_time"
            elif agg_name == "<lambda>":
                if original_name == "hit_page_path":
                    final_name = "entry_page"
                elif original_name == "event_category_grouped":
                    final_name = "main_category_grouped"
                elif original_name == "event_action_grouped":
                    final_name = "main_action_grouped"
                elif original_name == "hit_referer_encoded":
                    final_name = "main_referer"
                elif original_name == "event_label_encoded":
                    final_name = "main_label"
                elif original_name in utm_columns_in_hits:
                    final_name = original_name  # У UTM-меток сохраняем исходное имя
                else:
                    final_name = f"{original_name}_lambda_agg"  # Fallback
            else:
                # Для других стандартных агрегаций (mean, max, etc.), если они были бы добавлены
                final_name = f"{original_name}_{agg_name}"

        # Добавляем в словарь переименований
        rename_map[col_key] = final_name

    # Применяем переименование
    # Преобразуем MultiIndex в Index, применяя переименования
    try:
        aggregated.columns = [
            rename_map.get(col, str(col)) for col in aggregated.columns
        ]
    except Exception as e:
        print(f"Ошибка при применении переименования столбцов: {e}", file=sys.stderr)
        aggregated.columns = [
            "_".join(col).strip("_") if isinstance(col, tuple) else str(col)
            for col in aggregated.columns.values
        ]

    # Сбрасываем индекс, чтобы session_id стал обычным столбцом
    # session_id до этого момента был индексом
    aggregated = aggregated.reset_index()
    # После reset_index, столбец с session_id называется 'session_id'

    print("Переименование и финальная обработка столбцов завершена.", file=sys.stderr)
    print(
        "-------------------------------------------------------------", file=sys.stderr
    )

    return aggregated

In [8]:
# Она используется и для EDA, и для анализа, может быть удалена в продакшн-скрипте.
def analyze_dataframe_columns(df: pd.DataFrame):
    """
    Выполняет анализ каждого столбца pandas DataFrame, выводя информацию
    об уникальных значениях, первых/последних/частых значениях,
    количестве и проценте пропусков, а также типе данных.
    Используется для EDA.
    """
    if not isinstance(df, pd.DataFrame):
        print("Ошибка: Входной аргумент должен быть pandas DataFrame.")
        return

    if df.empty:
        print("DataFrame пуст.")
        return

    for col in df.columns:
        print(f"\n--- Столбец: {col} ---")
        print("----------------------------------------")

        nunique = df[col].nunique()
        print(f"Количество уникальных значений: {nunique}")

        print("Первые 5 значений:")
        if not df[col].empty:
            try:
                # Добавляем .astype(str) на всякий случай для сложных объектов
                print(df[col].head().astype(str).tolist())
            except Exception as e:
                print(
                    f"Не удалось вывести первые 5 значений (ошибка: {e}). Показываем первые 5 строк серии:\n{df[col].head()}"
                )
        else:
            print("Столбец пуст.")

        print("\nТоп-5 наиболее частых значений:")
        if not df[col].empty and df[col].count() > 0:
            print(df[col].value_counts().head())
        else:
            print("Столбец пуст или содержит только пропуски.")

        print("\nТоп-5 наиболее редких значений:")
        if not df[col].empty and df[col].count() > 0:
            value_counts = df[col].value_counts()
            if len(value_counts) >= 5:
                print(value_counts.tail())
            else:
                # Если уникальных значений меньше 5, показываем все
                print(value_counts)
        else:
            print("Столбец пуст или содержит только пропуски.")

        n_missing = df[col].isnull().sum()
        print(f"\nКоличество пропусков: {n_missing}")

        total_count = len(df[col])
        if total_count > 0:
            percent_missing = (n_missing / total_count) * 100
            print(f"Процент пропусков: {percent_missing:.2f}%")
        else:
            print("Процент пропусков: 0.00% (столбец пуст)")

        data_type = df[col].dtype
        print(f"Тип данных: {data_type}")
        print("----------------------------------------\n")

In [9]:
def merge_and_save(
    sessions_df: pd.DataFrame,
    hits_df: pd.DataFrame,
    output_pkl: str = "merged_data.pkl",
) -> pd.DataFrame:
    """
    Объединяет данные сессий и хитов, сохраняет результат в PKL и возвращает датафрейм
    """

    # Вложенная функция для оптимизации типов данных
    def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
        print("  Оптимизация типов данных...", file=sys.stderr)
        initial_memory = df.memory_usage(deep=True).sum() / (1024 * 1024)

        for col in df.columns:
            # Оптимизация объектов (строк)
            if df[col].dtype == "object":
                # Если уникальных значений меньше порога, преобразуем в категорию
                # Порог 0.5 может быть агрессивным, 0.05-0.1 обычно безопаснее
                if df[col].nunique() / len(df) < 0.1:  # Изменил порог для безопасности
                    try:
                        df[col] = df[col].astype("category")
                    except Exception as e:
                        print(
                            f"  Не удалось преобразовать столбец '{col}' в категорию: {e}",
                            file=sys.stderr,
                        )

            # Оптимизация чисел
            elif pd.api.types.is_integer_dtype(df[col]):
                # Попытка downcast для целых
                try:
                    df[col] = pd.to_numeric(df[col], downcast="integer")
                except Exception as e:
                    print(
                        f"  Не удалось downcast столбец '{col}' (integer): {e}",
                        file=sys.stderr,
                    )

            elif pd.api.types.is_float_dtype(df[col]):
                # Попытка downcast для float
                try:
                    df[col] = pd.to_numeric(df[col], downcast="float")
                except Exception as e:
                    print(
                        f"  Не удалось downcast столбец '{col}' (float): {e}",
                        file=sys.stderr,
                    )
        final_memory = df.memory_usage(deep=True).sum() / (1024 * 1024)
        print(
            f"  Память после оптимизации: {final_memory:.2f} MB (было {initial_memory:.2f} MB)",
            file=sys.stderr,
        )
        return df

    print("Начало объединения данных.", file=sys.stderr)
    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )

    # 1. Оптимизация типов данных перед объединением

    print("Оптимизация типов данных перед объединением...", file=sys.stderr)
    sessions_df = optimize_dtypes(sessions_df)
    hits_df = optimize_dtypes(hits_df)

    # 2. Объединение данных
    print("Объединение данных по session_id...", file=sys.stderr)
    # Проверим, существует ли 'session_id' в обоих DF (необходимо будет для валидации)
    if "session_id" not in sessions_df.columns:
        print(
            "Ошибка: Столбец 'session_id' отсутствует в sessions_df.", file=sys.stderr
        )
        # Возвращаем один из DF или пустой DF, в зависимости от желаемого поведения
        return pd.DataFrame()
    if "session_id" not in hits_df.columns:
        print("Ошибка: Столбец 'session_id' отсутствует в hits_df.", file=sys.stderr)
        # Если sessions_df есть, можно вернуть его, или пустой DF
        return sessions_df.copy()  # Вернем sessions_df, т.к. это left merge

    try:
        merged_df = pd.merge(
            sessions_df,
            hits_df,
            on="session_id",
            how="left",  # Сохраняем все сессии, даже если нет соответствующих хитов
            # validate='one_to_many' # Может быть медленно на больших данных, опционально
        )
        print("Объединение завершено.", file=sys.stderr)
    except Exception as e:
        print(f"Ошибка при объединении данных: {e}", file=sys.stderr)
        # Возвращаем пустой DataFrame или обрабатываем ошибку
        return pd.DataFrame()

    # 3. Оптимизация объединенных данных
    print("Оптимизация объединенных данных...", file=sys.stderr)
    merged_df = optimize_dtypes(merged_df)

    # 4. Сохранение в PKL
    print(f"Сохранение данных в {output_pkl}...", file=sys.stderr)
    try:
        with open(output_pkl, "wb") as f:
            pickle.dump(merged_df, f, protocol=pickle.HIGHEST_PROTOCOL)
        # 5. Проверка размера файла
        pkl_size = Path(output_pkl).stat().st_size / (1024 * 1024)
        print(f"Файл сохранен. Размер: {pkl_size:.2f} MB", file=sys.stderr)
    except Exception as e:
        print(f"Ошибка при сохранении файла {output_pkl}: {e}", file=sys.stderr)

    print("Объединение и сохранение завершены.", file=sys.stderr)
    print(
        "-------------------------------------------------------------", file=sys.stderr
    )

    return merged_df

In [10]:
def reduce_cardinality(
    df: pd.DataFrame, categorical_features: list, max_categories: int = 50
) -> pd.DataFrame:
    """Уменьшает количество категорий для высококардинальных признаков"""
    df = df.copy()
    for col in categorical_features:
        if (
            col in df.columns and df[col].dtype == "object"
        ):  # Применяем только к object типам, т.к. категории обрабатываются OrdinalEncoder
            if df[col].nunique() > max_categories:
                # print(f"Уменьшение кардинальности для '{col}' ({df[col].nunique()} > {max_categories})", file=sys.stderr)
                top_categories = (
                    df[col].value_counts().nlargest(max_categories - 1).index
                )
                # Убедимся, что 'OTHER' не входит в топ категории, если уже существует
                if "OTHER" in top_categories:
                    top_categories = (
                        df[col].value_counts().nlargest(max_categories).index
                    )  # Берем на одну больше, если OTHER в топе
                    if (
                        "OTHER" in top_categories
                    ):  # Если OTHER все еще в топе после взятия больше, удаляем его
                        top_categories = top_categories.drop("OTHER")
                        top_categories = top_categories[
                            : max_categories - 1
                        ]  # Обрезаем до нужного размера
                df[col] = np.where(df[col].isin(top_categories), df[col], "OTHER")
                # После замены на 'OTHER' можем преобразовать обратно в category для экономии памяти
                try:
                    df[col] = df[col].astype("category")
                except Exception as e:
                    print(
                        f"Не удалось преобразовать столбец '{col}' в категорию после reduce_cardinality: {e}",
                        file=sys.stderr,
                    )
            elif df[col].nunique() > 0:
                # Если не высококардинальный object, конвертируем в category
                try:
                    df[col] = df[col].astype("category")
                except Exception as e:
                    print(
                        f"Не удалось преобразовать столбец '{col}' в категорию: {e}",
                        file=sys.stderr,
                    )

    return df

In [11]:
def filter_rare_classes(y: pd.Series, min_samples: int = 2) -> pd.Index:
    """
    Возвращает индекс классов целевой переменной,
    количество образцов в которых >= min_samples.
    """
    if not isinstance(y, pd.Series) or y.empty:
        print(
            "Предупреждение: y для filter_rare_classes пусто или не Series.",
            file=sys.stderr,
        )
        return pd.Index([])  # Возвращаем пустой индекс для пустого ввода

    value_counts = y.value_counts()
    # Исключаем редкие классы, включая те, которые могут быть NaN (если target не обрабатывался)
    # Хотя обычно target_column не должен содержать NaN
    valid_classes = value_counts[value_counts >= min_samples].index
    # Убедимся, что NaN не попадает в valid_classes, если он есть в value_counts
    if pd.isna(valid_classes).any():
        valid_classes = valid_classes.dropna()

    if len(valid_classes) < len(value_counts):
        rare_count = len(value_counts) - len(valid_classes)
        print(
            f"Отфильтровано {rare_count} редких классов (менее {min_samples} образцов).",
            file=sys.stderr,
        )
        # print(f"Оставшиеся классы: {valid_classes.tolist()}", file=sys.stderr)
    else:
        print("Редкие классы не найдены (все классы >= min_samples).", file=sys.stderr)

    return valid_classes

In [12]:
def train_and_save_lgbm_model(
    data: pd.DataFrame,
    target_column: str,
    output_file: str = "model.pkl",
    min_class_samples: int = 5,
):
    """Обучает LGBM модель с оптимизацией памяти и сохраняет в файл"""
    print(
        "\n==============================================================================",
        file=sys.stderr,
    )
    print("Начало обучения LGBM модели.", file=sys.stderr)
    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )

    # === ОТЛАДОЧНЫЙ ПРИНТ: Проверка наличия целевого столбца в начале функции ===
    print(
        f"Debug: Проверка наличия целевого столбца '{target_column}' в data.columns в начале train_and_save_lgbm_model...",
        file=sys.stderr,
    )
    print(
        f"Debug: '{target_column}' in data.columns: {target_column in data.columns}",
        file=sys.stderr,
    )
    if target_column not in data.columns:
        print(
            f"Ошибка: Целевой столбец '{target_column}' не найден в DataFrame.",
            file=sys.stderr,
        )
        return None
    # ===========================================================================

    # Применяем уменьшение кардинальности ДО разделения на train/test
    # Это важно, чтобы 'OTHER' категория была согласована между выборками
    # Функция reduce_cardinality обрабатывает копию, что безопасно.
    data_processed = reduce_cardinality(data, categorical_features)

    # Разделяем на признаки и целевую переменную
    # Используем data_processed после reduce_cardinality
    X = data_processed.drop(columns=[target_column, "session_id"], errors="ignore")
    y = data_processed[target_column]

    # Проверяем целевую переменную перед фильтрацией
    if y.isnull().any():
        print(
            f"Предупреждение: Целевая переменная '{target_column}' содержит пропуски ({y.isnull().sum()}). Они будут исключены при фильтрации.",
            file=sys.stderr,
        )

    # Фильтруем редкие классы и пропуски в целевой переменной
    valid_classes = filter_rare_classes(y, min_samples=min_class_samples)
    mask = y.isin(valid_classes)  # Маска для оставшихся валидных классов
    X_filtered = X[mask].copy()
    y_filtered = y[mask].copy()

    print(
        f"Обучение будет проходить на {len(y_filtered)} образцах после фильтрации.",
        file=sys.stderr,
    )

    # Освобождаем память
    del (
        X,
        y,
        mask,
        data_processed,
    )  # Удаляем data_processed, т.к. используем X_filtered, y_filtered
    gc.collect()

    if X_filtered.empty or y_filtered.empty:
        print(
            "Ошибка: Не осталось данных для обучения после фильтрации редких классов или пропусков.",
            file=sys.stderr,
        )
        return None

    # Разделяем на train/test
    # Убедимся, что стратификация возможна (т.е. в каждом классе >= 2 образцов в train/test)
    # filter_rare_classes с min_samples=5 уже помогает.
    try:
        X_train, X_test, y_train, y_test = train_test_split(
            X_filtered, y_filtered, test_size=0.2, random_state=42, stratify=y_filtered
        )
        print(
            f"Данные разделены на train ({len(X_train)}) и test ({len(X_test)}).",
            file=sys.stderr,
        )
    except ValueError as e:
        print(
            f"Ошибка при разделении данных на train/test со стратификацией: {e}",
            file=sys.stderr,
        )
        print(
            "Возможно, в каком-то классе осталось менее 2 образцов после фильтрации.",
            file=sys.stderr,
        )
        return None

    # Создаем пайплайны предобработки
    numeric_transformer = Pipeline(
        [("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    categorical_transformer = Pipeline(
        [
            ("imputer", SimpleImputer(strategy="constant", fill_value="unknown")),
            (
                "encoder",
                OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1),
            ),
        ]
    )

    # Определяем препроцессор. Важно убедиться, что списки features корректны
    # и столбцы существуют в X_train
    numeric_ft = [f for f in numeric_features if f in X_train.columns]
    categorical_ft = [f for f in categorical_features if f in X_train.columns]
    other_ft = [
        f for f in X_train.columns if f not in numeric_ft and f not in categorical_ft
    ]  # Столбцы, которые не в списках

    print(
        f"Числовые признаки для пайплайна ({len(numeric_ft)}): {numeric_ft}",
        file=sys.stderr,
    )
    print(
        f"Категориальные признаки для пайплайна ({len(categorical_ft)}): {categorical_ft}",
        file=sys.stderr,
    )
    if other_ft:
        print(
            f"Признаки не в списках numeric/categorical ({len(other_ft)}). Они будут удалены ColumnTransformer'ом.",
            file=sys.stderr,
        )

    preprocessor = ColumnTransformer(
        [
            ("numeric", numeric_transformer, numeric_ft),
            ("categorical", categorical_transformer, categorical_ft),
        ],
        remainder="drop",
    )  # 'drop' означает, что столбцы не из списков features будут удалены

    # Инициализация LGBM
    lgbm_params = {
        "random_state": 42,
        "n_estimators": 150,  # лучший параметр после серии тестов
        "learning_rate": 0.2,  # лучший параметр после серии тестов
        "max_depth": 9,  # лучший параметр после серии тестов
        "num_leaves": 31,  # лучший параметр после серии тестов
        "subsample": 0.8,  # лучший параметр после серии тестов
        "colsample_bytree": 0.8,  # лучший параметр после серии тестов
        "reg_alpha": 0.1,  # лучший параметр после серии тестов
        "reg_lambda": 0.1,  # лучший параметр после серии тестов
        "n_jobs": -1,
        "verbose": -1,
        "boosting_type": "gbdt",  # Добавим явно, если не указан в lgbm_params
    }

    # Определяем objective и metric
    n_classes = len(y_train.unique())
    if n_classes > 2:
        lgbm_params["objective"] = "multiclass"
        lgbm_params["metric"] = "multi_logloss"
        lgbm_params["num_class"] = (
            n_classes  # Указываем количество классов для multiclass
        )
        print(
            f"Задача: Многоклассовая классификация ({n_classes} классов).",
            file=sys.stderr,
        )
    elif n_classes == 2:
        lgbm_params["objective"] = "binary"
        lgbm_params["metric"] = "auc"
        print("Задача: Бинарная классификация.", file=sys.stderr)
    else:
        print(
            f"Ошибка: Недостаточно классов ({n_classes}) для обучения классификатора.",
            file=sys.stderr,
        )
        return None

    # Дла отладки. Включаем балансировку весов классов для несбалансированных данных, если не указано иное
    # Восстановим здесь, т.к. ROC-AUC на несбалансированных данных без этого может быть обманчив
    # lgbm_params['class_weight'] = 'balanced' # Добавим, если нужно балансировать

    lgbm = LGBMClassifier(**lgbm_params)

    # Создаем финальный пайплайн
    model_pipeline = Pipeline([("preprocessor", preprocessor), ("classifier", lgbm)])

    # Определяем индексы категориальных признаков ПОСЛЕ препроцессинга для LGBM
    # LGBM может использовать информацию о категориальных признаках напрямую
    # Однако, OrdinalEncoder уже преобразует их в числа.

    print("Обучение модели...", file=sys.stderr)
    try:
        model_pipeline.fit(X_train, y_train)
        print("Обучение завершено.", file=sys.stderr)
    except Exception as e:
        print(f"Ошибка при обучении модели: {e}", file=sys.stderr)
        return None

    # Оценка качества
    print("Оценка модели...", file=sys.stderr)
    try:
        if n_classes > 2:
            # Для многоклассовой AUC predict_proba необходим
            y_train_pred_proba = model_pipeline.predict_proba(X_train)
            y_test_pred_proba = model_pipeline.predict_proba(X_test)
            train_score = roc_auc_score(y_train, y_train_pred_proba, multi_class="ovr")
            test_score = roc_auc_score(y_test, y_test_pred_proba, multi_class="ovr")
        else:  # n_classes == 2
            # Для бинарной AUC predict_proba[:, 1] необходим
            y_train_pred_proba = model_pipeline.predict_proba(X_train)[:, 1]
            y_test_pred_proba = model_pipeline.predict_proba(X_test)[:, 1]
            train_score = roc_auc_score(y_train, y_train_pred_proba)
            test_score = roc_auc_score(y_test, y_test_pred_proba)

        print(
            f"Train ROC-AUC: {train_score:.4f}, Test ROC-AUC: {test_score:.4f}",
            file=sys.stderr,
        )
        print(f"Обучено на {n_classes} классах.", file=sys.stderr)

    except Exception as e:
        print(f"Ошибка при оценке модели: {e}", file=sys.stderr)
        # Попробуем хотя бы accuracy, если AUC не получается
        try:
            train_acc = model_pipeline.score(X_train, y_train)
            test_acc = model_pipeline.score(X_test, y_test)
            print(
                f"Train Accuracy: {train_acc:.4f}, Test Accuracy: {test_acc:.4f}",
                file=sys.stderr,
            )
        except Exception as e_acc:
            print(f"Не удалось рассчитать даже Accuracy: {e_acc}", file=sys.stderr)

    # Сохранение модели в файл
    print(f"Сохранение модели в файл: {output_file}...", file=sys.stderr)
    try:
        with open(output_file, "wb") as f:
            pickle.dump(model_pipeline, f)
        print(f"Модель успешно сохранена.", file=sys.stderr)
    except Exception as e:
        print(
            f"Ошибка при сохранении модели в файл {output_file}: {e}", file=sys.stderr
        )

    print(
        "--------------------------------------------------------------",
        file=sys.stderr,
    )
    print("Обучение LGBM модели завершено.", file=sys.stderr)
    print(
        "==============================================================================",
        file=sys.stderr,
    )

    return model_pipeline

In [13]:
# Определение признаков для модели

numeric_features = [
    "visit_number",
    "day_of_week_num",
    "month",
    "day",
    "utm_source_encoded",
    "utm_campaign_encoded",
    "first_hit_number",
    "last_hit_number",
    "total_hits",
    "total_time",
    "main_referer",  # Это hit_referer_encoded_<lambda> переименованный в aggregate_session_data
    "main_label",  # Это event_label_encoded_<lambda> переименованный в aggregate_session_data
    "geo_country",  # Это бинарный 0/1 признак
]


categorical_features = [
    "utm_medium",  # Остается после drop в Блоке 2, используется для fillna и в фичах
    "device_category",  # Остается после drop в Блоке 2
    "device_brand",  # Остается после drop в Блоке 2
    "geo_city_grouped",  # Создано в Блоке 1, остается
    "entry_page",  # Создано в aggregate_session_data (ранее hit_page_path)
    "main_category_grouped",  # Создано в Блоке 2, используется в aggregate_session_data, остается
    # 'main_action_grouped' # Это целевая переменная
]

# Убедимся, что целевая переменная не попала в признаки
if "main_action_grouped" in numeric_features:
    numeric_features.remove("main_action_grouped")
if "main_action_grouped" in categorical_features:
    categorical_features.remove("main_action_grouped")

In [14]:
# Основной исполняемый код (Пайплайн)

if __name__ == "__main__":

    print("Запуск пайплайна обработки и обучения модели.", file=sys.stderr)
    print(
        "==============================================================================",
        file=sys.stderr,
    )

    # Настройка путей к данным
    file_path_hits = "ga_hits.pkl"
    file_path_sessions = "ga_sessions.pkl"
    output_merged_pkl_1 = "merged_data.pkl"
    output_merged_pkl_2 = "merged_data_2.pkl"  # Файл после финал.удаления столбцов
    output_model_pkl = "lgbm_model.pkl"
    target_column_name = "main_action_grouped"  # Целевая переменная

    # --------------------------------------------------------------------------
    # Этап 1: Загрузка данных
    # --------------------------------------------------------------------------
    print("\n--- Этап 1: Загрузка данных ---", file=sys.stderr)
    hits = None
    sessions = None

    try:
        hits = pd.read_pickle(file_path_hits)
        print(f"Файл {file_path_hits} успешно загружен.", file=sys.stderr)
        # print(hits.head()) # EDA/Отладка
    except FileNotFoundError:
        print(f"Ошибка: Файл {file_path_hits} не найден.", file=sys.stderr)
        sys.exit(1)  # Выход с ошибкой, если файл не найден
    except Exception as e:
        print(f"Не удалось загрузить {file_path_hits}: {e}", file=sys.stderr)
        sys.exit(1)

    try:
        sessions = pd.read_pickle(file_path_sessions)
        print(f"\nФайл {file_path_sessions} успешно загружен.", file=sys.stderr)
        # print(sessions.head()) # EDA/Отладка
    except FileNotFoundError:
        print(f"Ошибка: Файл {file_path_sessions} не найден.", file=sys.stderr)
        sys.exit(1)  # Выход с ошибкой
    except Exception as e:
        print(f"Не удалось загрузить {file_path_sessions}: {e}", file=sys.stderr)
        sys.exit(1)

    # Копируем датафреймы для работы
    ga_hits = hits.copy()
    ga_sessions = sessions.copy()
    del hits, sessions  # Освобождаем память от исходных
    gc.collect()

    # Применяем начальную предобработку 'not set'/'none' (Из Блока 1)
    process_notset_none(ga_sessions)
    process_notset_none(ga_hits)

    # --------------------------------------------------------------------------
    # Этап 2: Предобработка данных сессий (ga_sessions_filled)
    # --------------------------------------------------------------------------

    print("\n--- Этап 2: Предобработка данных сессий ---", file=sys.stderr)

    ga_sessions_filled = ga_sessions.copy()
    del ga_sessions
    gc.collect()

    # visit_date → разобьём на день, месяц и год + выделим день недели (Из Блока 1)
    print("Создание признаков даты из visit_date...", file=sys.stderr)
    if "visit_date" in ga_sessions_filled.columns:
        try:
            ga_sessions_filled["visit_date"] = pd.to_datetime(
                ga_sessions_filled["visit_date"]
            )
            ga_sessions_filled["year"] = ga_sessions_filled[
                "visit_date"
            ].dt.year.astype("int16")
            ga_sessions_filled["month"] = ga_sessions_filled[
                "visit_date"
            ].dt.month.astype("int16")
            ga_sessions_filled["day"] = ga_sessions_filled["visit_date"].dt.day.astype(
                "int16"
            )
            ga_sessions_filled["day_of_week_num"] = ga_sessions_filled[
                "visit_date"
            ].dt.dayofweek.astype("int16")
            print("Признаки даты созданы.", file=sys.stderr)
        except Exception as e:
            print(f"Ошибка при создании признаков даты: {e}", file=sys.stderr)
    else:
        print("'visit_date' столбец отсутствует.", file=sys.stderr)

    # utm_medium device_category device_os → заполним пропуски
    print("Заполнение пропусков в ga_sessions_filled...", file=sys.stderr)
    ga_sessions_filled = fill_missing_by_groups(ga_sessions_filled)
    print(
        f"Пропусков в ga_sessions_filled после fill_missing_by_groups: {ga_sessions_filled.isna().sum().sum()}",
        file=sys.stderr,
    )

    # utm_source → заменим хэш коды на числа
    if "utm_source" in ga_sessions_filled.columns:
        print("Кодирование utm_source...", file=sys.stderr)
        ga_sessions_filled = encode_column_with_nulls(ga_sessions_filled, "utm_source")
    else:
        print("'utm_source' столбец отсутствует.", file=sys.stderr)

    # utm_campaign → заменим хэш коды на числа
    if "utm_campaign" in ga_sessions_filled.columns:
        print("Кодирование utm_campaign...", file=sys.stderr)
        ga_sessions_filled = encode_column_with_nulls(
            ga_sessions_filled, "utm_campaign"
        )
    else:
        print("'utm_campaign' столбец отсутствует.", file=sys.stderr)

    # device_model → заменим хэш коды на числа
    if "device_model" in ga_sessions_filled.columns:
        print("Кодирование device_model...", file=sys.stderr)
        ga_sessions_filled = encode_column_with_nulls(
            ga_sessions_filled, "device_model"
        )
    else:
        print("'device_model' столбец отсутствует.", file=sys.stderr)

    # geo_country → заменим Россию на 1, а остальные значения на 0
    if "geo_country" in ga_sessions_filled.columns:
        print("Преобразование geo_country...", file=sys.stderr)
        ga_sessions_filled["geo_country"] = ga_sessions_filled["geo_country"].apply(
            lambda x: 1 if x == "Russia" else 0
        )
    else:
        print("'geo_country' столбец отсутствует.", file=sys.stderr)

    # geo_city → преобразуем города с малым количеством вхождений в категорию иных
    if "geo_city" in ga_sessions_filled.columns:
        print("Группировка geo_city...", file=sys.stderr)
        threshold = len(ga_sessions_filled) * 0.01
        value_counts = ga_sessions_filled["geo_city"].value_counts()
        replace_dict = {
            category: "OTHER"
            for category in value_counts[value_counts < threshold].index
        }
        ga_sessions_filled["geo_city_grouped"] = ga_sessions_filled["geo_city"].replace(
            replace_dict
        )
        print(
            f"Создан 'geo_city_grouped'. Города < {threshold:.0f} объединены.",
            file=sys.stderr,
        )

        # import plotly.express as px # Импорт для EDA
        # # EDA/Визуализация - опционально для деплоя
        # try:
        #      city_counts = ga_sessions_filled['geo_city_grouped'].value_counts().reset_index()
        #      city_counts.columns = ['city', 'count']
        #      fig = px.bar(city_counts, x='city', y='count', title=f'Распределение городов (города < {threshold:,.0f} событий объединены в OTHER)')
        #      # fig.show() # Показать график - требует интерактивной среды
        #      print("График распределения городов готов (EDA).", file=sys.stderr)
        # except Exception as e:
        #      print(f"Не удалось построить график geo_city: {e}", file=sys.stderr)
    else:
        print("'geo_city' столбец отсутствует.", file=sys.stderr)

    # print(ga_sessions_filled.head()) # Отладка
    # print(ga_sessions_filled.isna().sum()) # Отладка

    # --------------------------------------------------------------------------
    # Этап 3: Предобработка и агрегация данных хитов (ga_hits_aggregated)
    # --------------------------------------------------------------------------

    print("\n--- Этап 3: Предобработка и агрегация данных хитов ---", file=sys.stderr)

    ga_hits_filled = ga_hits.copy()
    del ga_hits
    gc.collect()

    # Заполнение пропусков на основе группировки в hits (Из Блока 2)
    print("Заполнение пропусков в ga_hits_filled...", file=sys.stderr)
    ga_hits_filled = fill_hits_and_events(ga_hits_filled)
    print(
        f"Пропусков в ga_hits_filled после fill_hits_and_events: {ga_hits_filled.isna().sum().sum()}",
        file=sys.stderr,
    )

    # hit_time → учитывая количество пропусков (перезодов без действий) преобразуем в есть/нет действий
    if "hit_time" in ga_hits_filled.columns:
        print("Создание бинарного признака hit_time_2...", file=sys.stderr)
        ga_hits_filled["hit_time_2"] = ga_hits_filled["hit_time"].notna().astype(int)
        print("'hit_time_2' создан.", file=sys.stderr)
    else:
        print(
            "'hit_time' столбец отсутствует, не удалось создать 'hit_time_2'.",
            file=sys.stderr,
        )

    # hit_referer → преобразуем хэш коды в цифры
    if "hit_referer" in ga_hits_filled.columns:
        print("Кодирование hit_referer...", file=sys.stderr)
        ga_hits_filled = encode_column_with_nulls(ga_hits_filled, "hit_referer")
    else:
        print("'hit_referer' столбец отсутствует.", file=sys.stderr)

    # event_category → заменим малозначимые параметры на иные
    if "event_category" in ga_hits_filled.columns:
        print("Группировка event_category...", file=sys.stderr)
        threshold = len(ga_hits_filled) * 0.01
        value_counts = ga_hits_filled["event_category"].value_counts()
        replace_dict = {
            category: "OTHER"
            for category in value_counts[value_counts < threshold].index
        }
        ga_hits_filled["event_category_grouped"] = ga_hits_filled[
            "event_category"
        ].replace(replace_dict)
        print(
            f"Создан 'event_category_grouped'. Категории < {threshold:.0f} объединены.",
            file=sys.stderr,
        )

        # import plotly.express as px # Импорт для EDA
        # # EDA/Визуализация - опционально для деплоя
        # try:
        #      event_counts = ga_hits_filled['event_category_grouped'].value_counts().reset_index()
        #      event_counts.columns = ['category', 'count']
        #      fig = px.bar(event_counts, x='category', y='count', title=f'Распределение категорий событий (категории < {threshold:,.0f} событий объединены в OTHER)')
        #      # fig.show() # Показать график
        #      print("График распределения категорий событий готов (EDA).", file=sys.stderr)
        # except Exception as e:
        #      print(f"Не удалось построить график event_category: {e}", file=sys.stderr)
    else:
        print("'event_category' столбец отсутствует.", file=sys.stderr)

    # event_action → заменим малозначимые параметры на иные
    if "event_action" in ga_hits_filled.columns:
        print("Группировка event_action...", file=sys.stderr)
        threshold = len(ga_hits_filled) * 0.01
        value_counts = ga_hits_filled["event_action"].value_counts()
        replace_dict = {
            category: "OTHER"
            for category in value_counts[value_counts < threshold].index
        }
        ga_hits_filled["event_action_grouped"] = ga_hits_filled["event_action"].replace(
            replace_dict
        )
        print(
            f"Создан 'event_action_grouped'. Действия < {threshold:.0f} объединены.",
            file=sys.stderr,
        )
        # import plotly.express as px # Импорт для EDA
        # # EDA/Визуализация - опционально для деплоя
        # try:
        #      action_counts = ga_hits_filled['event_action_grouped'].value_counts().reset_index()
        #      action_counts.columns = ['action', 'count']
        #      fig = px.bar(action_counts, x='action', y='count', title=f'Распределение действий (действия < {threshold:,.0f} событий объединены в OTHER)')
        #      # fig.show() # Показать график
        #      print("График распределения действий готов (EDA).", file=sys.stderr)
        # except Exception as e:
        #      print(f"Не удалось построить график event_action: {e}", file=sys.stderr)
    else:
        print("'event_action' столбец отсутствует.", file=sys.stderr)

    # event_label → преобразуем хэш коды в цифры
    if "event_label" in ga_hits_filled.columns:
        print("Кодирование event_label...", file=sys.stderr)
        ga_hits_filled = encode_column_with_nulls(ga_hits_filled, "event_label")
    else:
        print("'event_label' столбец отсутствует.", file=sys.stderr)

    # Удаляем неактуальные для анализа столбцы из ga_hits_filled перед агрегацией
    # Порядок важен: сначала создаем новые признаки, потом удаляем старые.
    # Новые признаки используются в агрегации.
    columns_to_drop_from_hits_filled = [
        "event_value",
        "hit_date",
        "hit_type",
        "hit_time",
        "hit_referer",
        "event_label",
    ]
    existing_cols_to_drop_hits = [
        col for col in columns_to_drop_from_hits_filled if col in ga_hits_filled.columns
    ]
    if existing_cols_to_drop_hits:
        print(
            f"Удаление столбцов из ga_hits_filled: {existing_cols_to_drop_hits}",
            file=sys.stderr,
        )
        ga_hits_filled = ga_hits_filled.drop(columns=existing_cols_to_drop_hits, axis=1)
    else:
        print("Нет столбцов для удаления из ga_hits_filled по списку.", file=sys.stderr)

    # Удаляем неактуальные для анализа столбцы из ga_sessions_filled (Из Блока 2)
    columns_to_drop_from_sessions_filled = [
        "visit_date",
        "visit_time",
        "utm_source",
        "utm_campaign",
        "utm_adcontent",
        "utm_keyword",
        "device_screen_resolution",
        "device_os",
        "geo_city",
        "year",
    ]
    existing_cols_to_drop_sessions = [
        col
        for col in columns_to_drop_from_sessions_filled
        if col in ga_sessions_filled.columns
    ]
    if existing_cols_to_drop_sessions:
        print(
            f"Удаление столбцов из ga_sessions_filled: {existing_cols_to_drop_sessions}",
            file=sys.stderr,
        )
        ga_sessions_filled = ga_sessions_filled.drop(
            columns=existing_cols_to_drop_sessions, axis=1
        )
    else:
        print(
            "Нет столбцов для удаления из ga_sessions_filled по списку.",
            file=sys.stderr,
        )

    # Агрегация ga_hits_filled до уровня сессий
    ga_hits_aggregated = aggregate_session_data(ga_hits_filled)
    del ga_hits_filled
    gc.collect()
    # print(ga_hits_aggregated.head()) # Отладка

    # Удаляем неактуальные после агрегации столбцы из ga_hits_aggregated
    columns_to_drop_from_hits_aggregated = [
        "main_category",
        "main_action",
    ]  # Это агрегированные версии ОРИГИНАЛОВ, которые были удалены ранее.
    # В aggregate_session_data агрегация для них используется,
    # но затем они удаляются. Оставим как есть по коду.
    existing_cols_to_drop_hits_agg = [
        col
        for col in columns_to_drop_from_hits_aggregated
        if col in ga_hits_aggregated.columns
    ]
    if existing_cols_to_drop_hits_agg:
        print(
            f"Удаление столбцов из ga_hits_aggregated: {existing_cols_to_drop_hits_agg}",
            file=sys.stderr,
        )
        ga_hits_aggregated = ga_hits_aggregated.drop(
            columns=existing_cols_to_drop_hits_agg, axis=1
        )
    else:
        print(
            "Нет столбцов для удаления из ga_hits_aggregated по списку.",
            file=sys.stderr,
        )

    # EDA/Анализ агрегированных данных - опционально для деплоя
    # analyze_dataframe_columns(ga_hits_aggregated)

    # --------------------------------------------------------------------------
    # Этап 4: Объединение датафреймов (Из Блока 3)
    # --------------------------------------------------------------------------

    print("\n--- Этап 4: Объединение датафреймов ---", file=sys.stderr)

    # Объединим оба датафрейма в единый датасет
    data = merge_and_save(
        ga_sessions_filled, ga_hits_aggregated, output_pkl=output_merged_pkl_1
    )
    del ga_sessions_filled, ga_hits_aggregated
    gc.collect()

    if data.empty:
        print(
            "Ошибка: Объединенный датафрейм пуст. Дальнейшая обработка невозможна.",
            file=sys.stderr,
        )
        sys.exit(1)

    # EDA/Анализ объединенного датасета - опционально для деплоя
    # print("\nОбъединенные данные (до финального дропа):", file=sys.stderr)
    # print(data.info(memory_usage='deep'), file=sys.stderr)
    # print(data.head(), file=sys.stderr)

    # Удаляем неактуальные столбцы из объединенного датасета
    columns_to_drop_from_data = [
        "device_model",
        "device_browser",
        "device_model_encoded",
    ]
    existing_cols_to_drop_data = [
        col for col in columns_to_drop_from_data if col in data.columns
    ]
    if existing_cols_to_drop_data:
        print(
            f"Удаление столбцов из объединенного датасета: {existing_cols_to_drop_data}",
            file=sys.stderr,
        )
        data = data.drop(columns=existing_cols_to_drop_data, axis=1)
    else:
        print(
            "Нет столбцов для удаления из объединенного датасета по списку.",
            file=sys.stderr,
        )

    # Сохраняем итоговый датасет в pkl файл
    # Этот файл уже содержит финальный набор признаков после частичного дропа
    # Перед финальным удалением строк.
    print(
        f"Сохранение финального датасета (перед dropna) в {output_merged_pkl_2}...",
        file=sys.stderr,
    )
    try:
        with open(output_merged_pkl_2, "wb") as f:
            pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
        print(f"Финальный датасет сохранен в {output_merged_pkl_2}.", file=sys.stderr)
    except Exception as e:
        print(
            f"Ошибка при сохранении файла {output_merged_pkl_2}: {e}", file=sys.stderr
        )

    # EDA/Матрица корреляции - опционально для деплоя
    # try:
    #      print("Построение матрицы корреляции (EDA)...", file=sys.stderr)
    #      # Выбираем только числовые столбцы, которые точно есть в data
    #      # Используем список numeric_features как ориентир, но проверяем по data.columns
    #      # Добавим session_id, т.к. он числовой, но обычно не участвует в корреляции признаков
    #      cols_for_corr = [col for col in data.select_dtypes(include=np.number).columns if col != 'session_id']
    #      # Убедимся, что целевая переменная тоже не в корреляции признаков, если она числовая
    #      if target_column_name in cols_for_corr:
    #          cols_for_corr.remove(target_column_name)

    #      if len(cols_for_corr) > 1:
    #           correlation_matrix = data[cols_for_corr].corr(method='pearson')
    #           mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
    #           f, ax = plt.subplots(figsize=(11, 9))
    #           cmap = sns.diverging_palette(230, 20, as_cmap=True)
    #           sns.heatmap(correlation_matrix, mask=mask, cmap=cmap, vmax=.3, center=0,
    #                        square=True, linewidths=.5, cbar_kws={"shrink": .5})
    #           # plt.show() # Показать график
    #           print("Матрица корреляции построена (EDA).", file=sys.stderr)
    #      else:
    #          print("Недостаточно числовых столбцов для построения матрицы корреляции.", file=sys.stderr)

    # except Exception as e:
    #      print(f"Не удалось построить матрицу корреляции: {e}", file=sys.stderr)

    # === ОТЛАДОЧНЫЙ ПРИНТ: Проверка наличия целевого столбца перед dropna ===
    print(
        f"Debug: Проверка наличия целевого столбца '{target_column_name}' в data.columns перед dropna...",
        file=sys.stderr,
    )
    print(
        f"Debug: '{target_column_name}' in data.columns (before dropna): {target_column_name in data.columns}",
        file=sys.stderr,
    )
    if target_column_name in data.columns:
        print(
            f"Debug: data['{target_column_name}'].isnull().sum() (before dropna): {data[target_column_name].isnull().sum()}",
            file=sys.stderr,
        )
        print(
            f"Debug: data['{target_column_name}'].value_counts().head() (before dropna):\n{data[target_column_name].value_counts().head()}",
            file=sys.stderr,
        )
    # =======================================================================

    # Удаляем действия без пользователей (строки с NaN в 'first_hit_number')
    # Это финальный шаг очистки данных перед обучением
    if "first_hit_number" in data.columns:
        initial_rows = len(data)
        data = data.dropna(subset=["first_hit_number"])
        if len(data) < initial_rows:
            print(
                f"Удалено {initial_rows - len(data)} строк с NaN в 'first_hit_number'.",
                file=sys.stderr,
            )
        else:
            print(
                "'first_hit_number' не содержит NaN или столбец отсутствует (строки не удалены).",
                file=sys.stderr,
            )
    else:
        print(
            "'first_hit_number' столбец отсутствует, строки не удалены по этому критерию.",
            file=sys.stderr,
        )

    if data.empty:
        print("Ошибка: Датафрейм пуст после финальной очистки.", file=sys.stderr)
        sys.exit(1)

    # === ОТЛАДОЧНЫЙ ПРИНТ: Проверка наличия целевого столбца после dropna ===
    print(
        f"Debug: Проверка наличия целевого столбца '{target_column_name}' в data.columns после dropna...",
        file=sys.stderr,
    )
    print(
        f"Debug: '{target_column_name}' in data.columns (after dropna): {target_column_name in data.columns}",
        file=sys.stderr,
    )
    if target_column_name in data.columns:
        print(
            f"Debug: data['{target_column_name}'].isnull().sum() (after dropna): {data[target_column_name].isnull().sum()}",
            file=sys.stderr,
        )
        print(
            f"Debug: data['{target_column_name}'].value_counts().head() (after dropna):\n{data[target_column_name].value_counts().head()}",
            file=sys.stderr,
        )
    # ======================================================================

    # print("\nФинальные данные перед обучением:", file=sys.stderr)
    # print(data.info(memory_usage='deep'), file=sys.stderr)

    # --------------------------------------------------------------------------
    # Этап 5: Обучение модели LGBM (Из Блока 5)
    # --------------------------------------------------------------------------

    print("\n--- Этап 5: Обучение модели ---", file=sys.stderr)

    # Обучение модели LGBM и сохранение
    # Используем функцию train_and_save_lgbm_model из Блока 5
    trained_pipeline = train_and_save_lgbm_model(
        data,
        target_column=target_column_name,
        output_file=output_model_pkl,
        min_class_samples=5,  # Параметр фильтрации редких классов
    )

    if trained_pipeline is None:
        print("Обучение модели не удалось.", file=sys.stderr)
        sys.exit(1)

    # EDA/Пример предсказаний после обучения - опционально для деплоя
    # print("\nПример предсказаний на всем датасете:", file=sys.sys.stderr)
    # try:
    #      # Предсказания (только для известных классов, т.к. редкие были отфильтрованы при обучении)
    #      # predict возвращает предсказанный класс
    #      predictions = trained_pipeline.predict(data)
    #      print("Первые 10 предсказанных классов:", predictions[:10], file=sys.stderr)

    #      # predict_proba возвращает вероятности для каждого класса
    #      # probabilities = trained_pipeline.predict_proba(data)
    #      # print("Первые 10 предсказанных вероятностей:", probabilities[:10], file=sys.stderr)

    # except Exception as e:
    #      print(f"Ошибка при выполнении примера предсказаний: {e}", file=sys.stderr)

    # Пример загрузки модели из файла (опционально для деплоя/инференса)
    # print(f"\nПример загрузки модели из {output_model_pkl}...", file=sys.stderr)
    # loaded_model = None
    # try:
    #     with open(output_model_pkl, 'rb') as f:
    #          loaded_model = pickle.load(f)
    #     print("Модель успешно загружена.", file=sys.stderr)

    #     # Пример использования загруженной модели для предсказания на новых данных
    #     # (здесь просто используем те же данные 'data' для демонстрации)
    #     # if loaded_model is not None:
    #     #      print("\nПредсказания с загруженной моделью (первые 5):", file=sys.stderr)
    #     #      loaded_predictions = loaded_model.predict(data.head())
    #     #      print(loaded_predictions, file=sys.stderr)

    # except FileNotFoundError:
    #     print(f"Ошибка: Файл модели {output_model_pkl} не найден для загрузки.", file=sys.stderr)
    # except Exception as e:
    #      print(f"Ошибка при загрузке модели из {output_model_pkl}: {e}", file=sys.stderr)

    print(
        "\n==============================================================================",
        file=sys.stderr,
    )
    print("Пайплайн обработки и обучения модели завершен.", file=sys.stderr)
    print(
        "==============================================================================",
        file=sys.stderr,
    )


# Примечание по деплою:
# Для развертывания (деплоя) этого скрипта вам потребуется:
# 1. Установить все необходимые библиотеки (pandas, numpy, sklearn, lightgbm).
# 2. Убедиться, что файлы данных ('ga_hits.pkl', 'ga_sessions.pkl') доступны по указанным путям.
# 3. Убедиться, что скрипт имеет права на запись файлов ('merged_data.pkl', 'merged_data_2.pkl', 'lgbm_model.pkl').
# 4. Для инференса на новых данных, вам потребуется загрузить сохраненную модель ('lgbm_model.pkl')
#    и применить к новым данным ту же последовательность предобработки, что была использована перед обучением.
#    Часть кода для загрузки модели и примера предсказаний закомментирована в конце скрипта.
# 5. Возможно, потребуется параметризовать входные/выходные пути и параметры модели
#    (например, через аргументы командной строки или файл конфигурации) для удобства развертывания.

Запуск пайплайна обработки и обучения модели.

--- Этап 1: Загрузка данных ---
Файл ga_hits.pkl успешно загружен.

Файл ga_sessions.pkl успешно загружен.
Начало обработки 'not set' и 'none'
--------------------------------------------------------------

Обработка 'not set' и 'none' завершена.
-------------------------------------------------------------
Начало обработки 'not set' и 'none'
--------------------------------------------------------------

Обработка 'not set' и 'none' завершена.
-------------------------------------------------------------

--- Этап 2: Предобработка данных сессий ---
Создание признаков даты из visit_date...
Признаки даты созданы.
Заполнение пропусков в ga_sessions_filled...
Начало оптимизированного заполнения пропусков по группам и модой (in-place).
--------------------------------------------------------------
Заполнение пропусков в 'utm_source' по группам ('utm_medium', 'device_category').
Пропусков в 'utm_source' после заполнения по группе: 97
Заполнение

In [15]:
Здесь пауза. строка в формате кода, так что при автоматическом воспроизведении выдаст ошибку

SyntaxError: invalid syntax (170577031.py, line 1)

In [None]:
# Скрипт для выполнения предсказания на валидационных данных
# с замером времени.

# ==============================================================================
# 0. Импорт необходимых библиотек
# ==============================================================================

import pandas as pd
import numpy as np
import pickle
import time  # Для замера времени
import sys
import gc
from pathlib import Path

# Импорты sklearn (необходимы для функций предобработки и загрузки модели)
from sklearn.model_selection import (
    train_test_split,
)  # Хотя train_test_split не используется в предсказании, он часть preprocess_data
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    roc_auc_score,
)  # Не используется для предсказания, но может быть в загруженной модели

# Импорт LGBMClassifier (необходим для загрузки модели)
from lightgbm import LGBMClassifier

from warnings import simplefilter

simplefilter(action="ignore", category=FutureWarning)
simplefilter(action="ignore", category=UserWarning)


# ==============================================================================
# 1. Определение вспомогательных функций (Скопированы из полного пайплайна)
#    Примечание: Эти функции нужны для работы preprocess_data.
# ==============================================================================


def process_notset_none(df: pd.DataFrame):
    """
    Обнаруживает строки 'not set' и 'none' и заменяет на np.nan. In-place.
    """
    if not isinstance(df, pd.DataFrame):
        return  # Добавлена проверка
    strings_to_find_and_replace = ["not set", "none"]
    # print("Начало обработки 'not set' и 'none'", file=sys.stderr)
    for col in df.columns:
        col_series_str = df[col].astype(str)
        combined_mask = col_series_str.str.contains(
            strings_to_find_and_replace[0], case=False, regex=False
        ) | col_series_str.str.contains(
            strings_to_find_and_replace[1], case=False, regex=False
        )
        if combined_mask.sum() > 0:
            df.loc[combined_mask, col] = np.nan
    # print("Обработка 'not set' и 'none' завершена.", file=sys.stderr)


def encode_column_with_nulls(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
    """Кодирует столбец числовыми метками (NaN->0, остальное 1+). Возвращает новый DF."""
    df = df.copy()
    if column_name not in df.columns:
        return df
    encoded, uniques = pd.factorize(df[column_name].astype(str))
    df[f"{column_name}_encoded"] = np.where(encoded == -1, 0, encoded + 1)
    df[f"{column_name}_encoded"] = df[f"{column_name}_encoded"].astype("int64")
    return df


def get_mode(x):
    """Вспомогательная функция для моды."""
    mode_val = x.mode()
    if not mode_val.empty:
        return mode_val[0]
    return np.nan


def fill_missing_by_groups(df: pd.DataFrame) -> pd.DataFrame:
    """Заполняет пропуски по группам и модой. In-place."""
    # print("Начало заполнения пропусков по группам...", file=sys.stderr)
    if (
        "utm_source" in df.columns
        and "utm_medium" in df.columns
        and "device_category" in df.columns
        and df["utm_source"].isna().any()
    ):
        group_modes_source = df.groupby(["utm_medium", "device_category"])[
            "utm_source"
        ].transform(get_mode)
        df["utm_source"].fillna(group_modes_source, inplace=True)
    if (
        "utm_campaign" in df.columns
        and "utm_medium" in df.columns
        and "device_os" in df.columns
        and df["utm_campaign"].isna().any()
    ):
        group_modes_campaign = df.groupby(["utm_medium", "device_os"])[
            "utm_campaign"
        ].transform(get_mode)
        df["utm_campaign"].fillna(group_modes_campaign, inplace=True)

    group_columns_broad = ["utm_medium", "device_category", "device_os"]
    if all(col in df.columns for col in group_columns_broad):
        cols_with_nan_after_specific = df.columns[df.isna().any()].tolist()
        for col in cols_with_nan_after_specific:
            if col in ["utm_source", "utm_campaign"] and not df[col].isna().any():
                continue
            group_modes_broad_col = df.groupby(group_columns_broad)[col].transform(
                get_mode
            )
            df[col].fillna(group_modes_broad_col, inplace=True)

    cols_with_nan_final = df.columns[df.isna().any()].tolist()
    for col in cols_with_nan_final:
        mode_val = df[col].mode()
        if not mode_val.empty:
            df[col].fillna(mode_val[0], inplace=True)
    # print("Заполнение пропусков завершено.", file=sys.stderr)
    return df


def fill_hits_and_events(df: pd.DataFrame) -> pd.DataFrame:
    """Заполняет пропуски в hit_referer и event_label. In-place."""
    # print("Начало заполнения пропусков в hits...", file=sys.stderr)
    if (
        "hit_referer" in df.columns
        and "hit_page_path" in df.columns
        and df["hit_referer"].isna().any()
    ):
        referer_by_page = df.groupby("hit_page_path")["hit_referer"].transform("first")
        df["hit_referer"].fillna(referer_by_page, inplace=True)
    if (
        "event_label" in df.columns
        and "event_action" in df.columns
        and df["event_label"].isna().any()
    ):
        label_by_action = df.groupby("event_action")["event_label"].transform("first")
        df["event_label"].fillna(label_by_action, inplace=True)
    if "hit_referer" in df.columns and df["hit_referer"].isna().any():
        df["hit_referer"].fillna("direct", inplace=True)
    if "event_label" in df.columns and df["event_label"].isna().any():
        df["event_label"].fillna("none", inplace=True)
    # print("Заполнение пропусков в hits завершено.", file=sys.stderr)
    return df


def aggregate_session_data(df: pd.DataFrame) -> pd.DataFrame:
    """Агрегирует данные хитов до уровня сессий."""
    # print("Начало агрегации данных хитов...", file=sys.stderr)
    if "session_id" not in df.columns:
        print("Ошибка: 'session_id' отсутствует для агрегации.", file=sys.stderr)
        return pd.DataFrame()

    agg_rules = {
        "hit_number": ["min", "max", "count"],
        **(
            {"hit_page_path": lambda x: x.iloc[0] if not x.empty else np.nan}
            if "hit_page_path" in df.columns
            else {}
        ),
        **(
            {
                "event_category_grouped": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "event_category_grouped" in df.columns
            else {}
        ),
        **(
            {
                "event_action_grouped": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "event_action_grouped" in df.columns
            else {}
        ),
        **({"hit_time_2": "sum"} if "hit_time_2" in df.columns else {}),
        **(
            {
                "hit_referer_encoded": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "hit_referer_encoded" in df.columns
            else {}
        ),
        **(
            {
                "event_label_encoded": lambda x: (
                    x.value_counts().index[0] if not x.empty else np.nan
                )
            }
            if "event_label_encoded" in df.columns
            else {}
        ),
    }
    utm_columns_in_hits = [
        col for col in df.columns if col.startswith("utm_") and col not in agg_rules
    ]
    for col in utm_columns_in_hits:
        agg_rules[col] = lambda x: x.iloc[0] if not x.empty else np.nan

    agg_rules = {
        key: value for key, value in agg_rules.items() if key in df.columns
    }  # Финальная проверка наличия колонок

    try:
        aggregated = df.groupby("session_id").agg(agg_rules)
    except Exception as e:
        print(f"Ошибка при агрегации: {e}", file=sys.stderr)
        return pd.DataFrame()

    # === Логика переименования ===
    rename_map = {}
    for original_col_tuple in aggregated.columns.values:
        if isinstance(original_col_tuple, tuple):
            original_name = original_col_tuple[0]
            agg_name = original_col_tuple[1]
            col_key = original_col_tuple
            final_name = str(original_col_tuple)  # По умолчанию

            if original_name == "hit_number":
                if agg_name == "min":
                    final_name = "first_hit_number"
                elif agg_name == "max":
                    final_name = "last_hit_number"
                elif agg_name == "count":
                    final_name = "total_hits"
            elif original_name == "hit_time_2" and agg_name == "sum":
                final_name = "total_time"
            elif agg_name == "<lambda>":
                if original_name == "hit_page_path":
                    final_name = "entry_page"
                elif original_name == "event_category_grouped":
                    final_name = "main_category_grouped"
                elif original_name == "event_action_grouped":
                    final_name = "main_action_grouped"
                elif original_name == "hit_referer_encoded":
                    final_name = "main_referer"
                elif original_name == "event_label_encoded":
                    final_name = "main_label"
                elif original_name in utm_columns_in_hits:
                    final_name = original_name
                else:
                    final_name = f"{original_name}_lambda_agg"
            else:
                final_name = f"{original_name}_{agg_name}"
            rename_map[col_key] = final_name
        else:
            rename_map[original_col_tuple] = str(original_col_tuple)  # Для SingleIndex

    try:
        aggregated.columns = [
            rename_map.get(col, str(col)) for col in aggregated.columns
        ]
    except Exception as e:
        print(f"Ошибка при применении переименования столбцов: {e}", file=sys.stderr)
        aggregated.columns = [
            "_".join(col).strip("_") if isinstance(col, tuple) else str(col)
            for col in aggregated.columns.values
        ]

    aggregated = aggregated.reset_index()

    # print("Агрегация завершена.", file=sys.stderr)
    return aggregated


def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Оптимизирует типы данных DataFrame. In-place."""
    # print("  Оптимизация типов данных...", file=sys.stderr)
    # initial_memory = df.memory_usage(deep=True).sum() / (1024 * 1024)
    for col in df.columns:
        if df[col].dtype == "object":
            if df[col].nunique() / len(df) < 0.1:
                try:
                    df[col] = df[col].astype("category")
                except Exception as e:
                    print(
                        f"  Не удалось преобразовать '{col}' в категорию: {e}",
                        file=sys.stderr,
                    )
        elif pd.api.types.is_integer_dtype(df[col]):
            try:
                df[col] = pd.to_numeric(df[col], downcast="integer")
            except Exception as e:
                print(f"  Не удалось downcast '{col}' (integer): {e}", file=sys.stderr)
        elif pd.api.types.is_float_dtype(df[col]):
            try:
                df[col] = pd.to_numeric(df[col], downcast="float")
            except Exception as e:
                print(f"  Не удалось downcast '{col}' (float): {e}", file=sys.stderr)
    # final_memory = df.memory_usage(deep=True).sum() / (1024 * 1024)
    # print(f"  Память после оптимизации: {final_memory:.2f} MB", file=sys.stderr)
    return df


def merge_and_save(
    sessions_df: pd.DataFrame, hits_df: pd.DataFrame, output_pkl: str = None
) -> pd.DataFrame:
    """Объединяет сессии и хиты, опционально сохраняет. Возвращает DF."""
    # print("Начало объединения данных...", file=sys.stderr)
    sessions_df = optimize_dtypes(sessions_df)
    hits_df = optimize_dtypes(hits_df)

    if "session_id" not in sessions_df.columns or "session_id" not in hits_df.columns:
        print(
            "Ошибка: Столбец 'session_id' отсутствует в одном из датафреймов для объединения.",
            file=sys.stderr,
        )
        return pd.DataFrame()

    try:
        merged_df = pd.merge(sessions_df, hits_df, on="session_id", how="left")
        # print("Объединение завершено.", file=sys.stderr)
    except Exception as e:
        print(f"Ошибка при объединении данных: {e}", file=sys.stderr)
        return pd.DataFrame()

    merged_df = optimize_dtypes(merged_df)

    if output_pkl:
        # print(f"Сохранение данных в {output_pkl}...", file=sys.stderr)
        try:
            with open(output_pkl, "wb") as f:
                pickle.dump(merged_df, f, protocol=pickle.HIGHEST_PROTOCOL)
            # pkl_size = Path(output_pkl).stat().st_size / (1024 * 1024)
            # print(f"Файл сохранен. Размер: {pkl_size:.2f} MB", file=sys.stderr)
        except Exception as e:
            print(f"Ошибка при сохранении файла {output_pkl}: {e}", file=sys.stderr)

    # print("Объединение и сохранение завершены.", file=sys.stderr)
    return merged_df


def reduce_cardinality(
    df: pd.DataFrame, categorical_features: list, max_categories: int = 50
) -> pd.DataFrame:
    """Уменьшает кардинальность категориальных признаков."""
    df = df.copy()
    for col in categorical_features:
        if col in df.columns and df[col].dtype == "object":
            if df[col].nunique() > max_categories:
                top_categories = (
                    df[col].value_counts().nlargest(max_categories - 1).index
                )
                if "OTHER" in top_categories:
                    top_categories = (
                        df[col].value_counts().nlargest(max_categories).index
                    )
                    if "OTHER" in top_categories:
                        top_categories = top_categories.drop("OTHER")[
                            : max_categories - 1
                        ]
                df[col] = np.where(df[col].isin(top_categories), df[col], "OTHER")
                try:
                    df[col] = df[col].astype("category")
                except Exception as e:
                    print(
                        f"Не удалось преобразовать '{col}' в категорию после reduce_cardinality: {e}",
                        file=sys.stderr,
                    )
            elif df[col].nunique() > 0:
                try:
                    df[col] = df[col].astype("category")
                except Exception as e:
                    print(
                        f"Не удалось преобразовать '{col}' в категорию: {e}",
                        file=sys.stderr,
                    )
    return df


def filter_rare_classes(y: pd.Series, min_samples: int = 2) -> pd.Index:
    """Возвращает индекс классов с достаточным количеством образцов."""
    if not isinstance(y, pd.Series) or y.empty:
        return pd.Index([])
    value_counts = y.value_counts()
    valid_classes = value_counts[value_counts >= min_samples].index
    if pd.isna(valid_classes).any():
        valid_classes = valid_classes.dropna()
    return valid_classes


# ==============================================================================
# 2. Определение признаков для модели (Скопированы из полного пайплайна)
#    Примечание: Эти списки должны точно соответствовать тем, что использовались при обучении.
# ==============================================================================

numeric_features = [
    "visit_number",
    "day_of_week_num",
    "month",
    "day",
    "utm_source_encoded",
    "utm_campaign_encoded",
    "first_hit_number",
    "last_hit_number",
    "total_hits",
    "total_time",
    "main_referer",
    "main_label",
    "geo_country",
]

categorical_features = [
    "utm_medium",
    "device_category",
    "device_brand",
    "geo_city_grouped",
    "entry_page",
    "main_category_grouped",
]

# Имя целевого столбца (не признак)
target_column_name_for_prediction = "main_action_grouped"


# ==============================================================================
# 3. Функция предобработки данных (Скопирована из полного пайплайна)
#    Инкапсулирует весь пайплайн предобработки данных.
# ==============================================================================


def preprocess_data(
    ga_sessions_df_raw: pd.DataFrame, ga_hits_df_raw: pd.DataFrame
) -> pd.DataFrame:
    """
    Выполняет полную предобработку данных сессий и хитов, объединяет их
    и возвращает датафрейм, готовый для обучения или предсказания.
    Не включает начальную загрузку файлов и process_notset_none,
    а также финальное обучение/предсказание.
    """
    # print("\n--- Запуск функции preprocess_data ---", file=sys.stderr)

    # Создаем копии для работы внутри функции
    ga_sessions_filled = ga_sessions_df_raw.copy()
    ga_hits_filled = ga_hits_df_raw.copy()

    # --------------------------------------------------------------------------
    # Предобработка данных сессий (ga_sessions_filled)
    # --------------------------------------------------------------------------
    # print("-> Предобработка данных сессий...", file=sys.stderr)
    if "visit_date" in ga_sessions_filled.columns:
        try:
            ga_sessions_filled["visit_date"] = pd.to_datetime(
                ga_sessions_filled["visit_date"], errors="coerce"
            )  # errors='coerce' заменит некорректные даты на NaT
            ga_sessions_filled.dropna(
                subset=["visit_date"], inplace=True
            )  # Удалим строки с некорректными датами
            if not ga_sessions_filled.empty:
                ga_sessions_filled["year"] = ga_sessions_filled[
                    "visit_date"
                ].dt.year.astype("int16")
                ga_sessions_filled["month"] = ga_sessions_filled[
                    "visit_date"
                ].dt.month.astype("int16")
                ga_sessions_filled["day"] = ga_sessions_filled[
                    "visit_date"
                ].dt.day.astype("int16")
                ga_sessions_filled["day_of_week_num"] = ga_sessions_filled[
                    "visit_date"
                ].dt.dayofweek.astype("int16")
        except Exception as e:
            print(f"Ошибка при создании признаков даты: {e}", file=sys.stderr)

    ga_sessions_filled = fill_missing_by_groups(ga_sessions_filled)

    if "utm_source" in ga_sessions_filled.columns:
        ga_sessions_filled = encode_column_with_nulls(ga_sessions_filled, "utm_source")
    if "utm_campaign" in ga_sessions_filled.columns:
        ga_sessions_filled = encode_column_with_nulls(
            ga_sessions_filled, "utm_campaign"
        )
    if "device_model" in ga_sessions_filled.columns:
        ga_sessions_filled = encode_column_with_nulls(
            ga_sessions_filled, "device_model"
        )

    if "geo_country" in ga_sessions_filled.columns:
        ga_sessions_filled["geo_country"] = (
            ga_sessions_filled["geo_country"]
            .astype(str)
            .apply(lambda x: 1 if x == "Russia" else 0)
        )

    if "geo_city" in ga_sessions_filled.columns and not ga_sessions_filled.empty:
        threshold = len(ga_sessions_filled) * 0.01
        value_counts = ga_sessions_filled["geo_city"].value_counts()
        replace_dict = {
            category: "OTHER"
            for category in value_counts[value_counts < threshold].index
        }
        ga_sessions_filled["geo_city_grouped"] = ga_sessions_filled["geo_city"].replace(
            replace_dict
        )
    elif "geo_city" in ga_sessions_filled.columns:  # Handle empty df but column exists
        ga_sessions_filled["geo_city_grouped"] = "OTHER"

    # print("-> Предобработка данных сессий завершена.", file=sys.stderr)

    # --------------------------------------------------------------------------
    # Предобработка и агрегация данных хитов (ga_hits_aggregated)
    # --------------------------------------------------------------------------
    # print("\n-> Предобработка и агрегация данных хитов...", file=sys.stderr)
    ga_hits_filled = fill_hits_and_events(ga_hits_filled)

    if "hit_time" in ga_hits_filled.columns:
        ga_hits_filled["hit_time_2"] = ga_hits_filled["hit_time"].notna().astype(int)

    if "hit_referer" in ga_hits_filled.columns:
        ga_hits_filled = encode_column_with_nulls(ga_hits_filled, "hit_referer")

    if "event_category" in ga_hits_filled.columns and not ga_hits_filled.empty:
        threshold = len(ga_hits_filled) * 0.01
        value_counts = ga_hits_filled["event_category"].value_counts()
        replace_dict = {
            category: "OTHER"
            for category in value_counts[value_counts < threshold].index
        }
        ga_hits_filled["event_category_grouped"] = ga_hits_filled[
            "event_category"
        ].replace(replace_dict)
    elif (
        "event_category" in ga_hits_filled.columns
    ):  # Handle empty df but column exists
        ga_hits_filled["event_category_grouped"] = "OTHER"

    if "event_action" in ga_hits_filled.columns and not ga_hits_filled.empty:
        threshold = len(ga_hits_filled) * 0.01
        value_counts = ga_hits_filled["event_action"].value_counts()
        replace_dict = {
            category: "OTHER"
            for category in value_counts[value_counts < threshold].index
        }
        ga_hits_filled["event_action_grouped"] = ga_hits_filled["event_action"].replace(
            replace_dict
        )
    elif "event_action" in ga_hits_filled.columns:  # Handle empty df but column exists
        ga_hits_filled["event_action_grouped"] = "OTHER"

    if "event_label" in ga_hits_filled.columns:
        ga_hits_filled = encode_column_with_nulls(ga_hits_filled, "event_label")

    columns_to_drop_from_hits_filled = [
        "event_value",
        "hit_date",
        "hit_type",
        "hit_time",
        "hit_referer",
        "event_label",
    ]
    existing_cols_to_drop_hits = [
        col for col in columns_to_drop_from_hits_filled if col in ga_hits_filled.columns
    ]
    if existing_cols_to_drop_hits:
        ga_hits_filled = ga_hits_filled.drop(columns=existing_cols_to_drop_hits, axis=1)

    columns_to_drop_from_sessions_filled = [
        "visit_date",
        "visit_time",
        "utm_source",
        "utm_campaign",
        "utm_adcontent",
        "utm_keyword",
        "device_screen_resolution",
        "device_os",
        "geo_city",
        "year",
    ]
    existing_cols_to_drop_sessions = [
        col
        for col in columns_to_drop_from_sessions_filled
        if col in ga_sessions_filled.columns
    ]
    if existing_cols_to_drop_sessions:
        ga_sessions_filled = ga_sessions_filled.drop(
            columns=existing_cols_to_drop_sessions, axis=1
        )

    ga_hits_aggregated = aggregate_session_data(ga_hits_filled)
    del ga_hits_filled
    gc.collect()
    if ga_hits_aggregated.empty:
        print(
            "Предупреждение: ga_hits_aggregated пуст после агрегации.", file=sys.stderr
        )

    columns_to_drop_from_hits_aggregated = ["main_category", "main_action"]
    existing_cols_to_drop_hits_agg = [
        col
        for col in columns_to_drop_from_hits_aggregated
        if col in ga_hits_aggregated.columns
    ]
    if existing_cols_to_drop_hits_agg:
        ga_hits_aggregated = ga_hits_aggregated.drop(
            columns=existing_cols_to_drop_hits_agg, axis=1
        )

    # print("-> Предобработка и агрегация данных хитов завершена.", file=sys.stderr)

    # --------------------------------------------------------------------------
    # Объединение датафреймов
    # --------------------------------------------------------------------------
    # print("\n-> Объединение датафреймов...", file=sys.stderr)
    # Не сохраняем промежуточные файлы merged_data.pkl / merged_data_2.pkl в функции предобработки
    # Это опционально и должно быть вне этой функции, если нужно для отладки или хранения
    data = merge_and_save(
        ga_sessions_filled, ga_hits_aggregated, output_pkl=None
    )  # Убрал сохранение
    del ga_sessions_filled, ga_hits_aggregated
    gc.collect()

    if data.empty:
        print("Ошибка: Объединенный датафрейм пуст после слияния.", file=sys.stderr)
        return pd.DataFrame()

    columns_to_drop_from_data = [
        "device_model",
        "device_browser",
        "device_model_encoded",
    ]
    existing_cols_to_drop_data = [
        col for col in columns_to_drop_from_data if col in data.columns
    ]
    if existing_cols_to_drop_data:
        data = data.drop(columns=existing_cols_to_drop_data, axis=1)

    # Удаляем действия без пользователей (строки с NaN в 'first_hit_number')
    if "first_hit_number" in data.columns:
        data.dropna(subset=["first_hit_number"], inplace=True)
    # else:
    # print("'first_hit_number' столбец отсутствует.", file=sys.stderr)

    if data.empty:
        print(
            "Ошибка: Датафрейм пуст после финальной очистки (dropna).", file=sys.stderr
        )
        return pd.DataFrame()

    # print("\n--- Функция preprocess_data завершена ---", file=sys.stderr)
    return data


# ==============================================================================
# 4. Основной исполняемый код: Замер времени предсказания
# ==============================================================================

if __name__ == "__main__":

    print("Запуск скрипта для замера времени предсказания.", file=sys.stderr)
    print(
        "==============================================================================",
        file=sys.stderr,
    )

    # Настройка путей к валидационным данным и обученной модели
    file_path_hits_valid = "ga_hits_valid.pkl"  # Файл с валидационными хитами
    file_path_sessions_valid = "ga_sessions_valid.pkl"  # Файл с валидационными сессиями
    model_filepath = "lgbm_model.pkl"  # Файл с обученной моделью

    # Убедимся, что файлы существуют
    if not Path(file_path_hits_valid).exists():
        print(
            f"Ошибка: Валидационный файл хитов '{file_path_hits_valid}' не найден. Замер невозможен.",
            file=sys.stderr,
        )
        sys.exit(1)
    if not Path(file_path_sessions_valid).exists():
        print(
            f"Ошибка: Валидационный файл сессий '{file_path_sessions_valid}' не найден. Замер невозможен.",
            file=sys.stderr,
        )
        sys.exit(1)
    if not Path(model_filepath).exists():
        print(
            f"Ошибка: Файл обученной модели '{model_filepath}' не найден. Замер невозможен.",
            file=sys.stderr,
        )
        sys.exit(1)

    # --- Начало замера времени ---
    start_time = time.time()
    print("\n--- Начало замера времени ---", file=sys.stderr)

    # 1) Прочитать датафреймы
    print("Загрузка валидационных данных...", file=sys.stderr)
    sessions_valid_raw = None
    hits_valid_raw = None
    try:
        sessions_valid_raw = pd.read_pickle(file_path_sessions_valid)
        print(
            f"Файл '{file_path_sessions_valid}' успешно загружен. Размер: {len(sessions_valid_raw)} строк.",
            file=sys.stderr,
        )
    except Exception as e:
        print(f"Ошибка при загрузке '{file_path_sessions_valid}': {e}", file=sys.stderr)
        sys.exit(1)

    try:
        hits_valid_raw = pd.read_pickle(file_path_hits_valid)
        print(
            f"Файл '{file_path_hits_valid}' успешно загружен. Размер: {len(hits_valid_raw)} строк.",
            file=sys.stderr,
        )
    except Exception as e:
        print(f"Ошибка при загрузке '{file_path_hits_valid}': {e}", file=sys.stderr)
        sys.exit(1)

    if sessions_valid_raw.empty or hits_valid_raw.empty:
        print(
            "Ошибка: Загруженные валидационные датафреймы пусты. Замер невозможен.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Применяем начальную предобработку 'not set'/'none'
    print(
        "Применение начальной предобработки 'not set'/'none' к валидационным данным...",
        file=sys.stderr,
    )
    process_notset_none(sessions_valid_raw)
    process_notset_none(hits_valid_raw)

    # 2) Выполнить весь пиплайн предобработки
    print(
        "\nВыполнение полного пайплайна предобработки на валидационных данных...",
        file=sys.stderr,
    )
    data_valid_preprocessed = preprocess_data(
        sessions_valid_raw.copy(), hits_valid_raw.copy()
    )

    # Освобождаем память от исходных валидационных данных
    del sessions_valid_raw, hits_valid_raw
    gc.collect()

    if data_valid_preprocessed.empty:
        print(
            "Ошибка: Данные после предобработки пусты. Предсказание невозможно.",
            file=sys.stderr,
        )
        sys.exit(1)

    # 3) Загрузить обученную модель
    print(f"\nЗагрузка обученной модели из '{model_filepath}'...", file=sys.stderr)
    model_pipeline = None
    try:
        with open(model_filepath, "rb") as f:
            model_pipeline = pickle.load(f)
        print("Модель успешно загружена.", file=sys.stderr)
    except Exception as e:
        print(f"Ошибка при загрузке модели: {e}", file=sys.stderr)
        sys.exit(1)

    # Подготовка данных для предсказания (удаление целевого столбца, если есть, и session_id)
    # preprocess_data не удаляет целевой столбец и session_id
    print("Подготовка данных для предсказания...", file=sys.stderr)
    X_valid = data_valid_preprocessed.drop(
        columns=[target_column_name_for_prediction, "session_id"], errors="ignore"
    )

    if X_valid.empty:
        print(
            "Ошибка: Нет признаков для предсказания после подготовки данных. Замер невозможен.",
            file=sys.stderr,
        )
        sys.exit(1)

    # 4) Выдача предикта
    print("\nВыполнение предсказания...", file=sys.stderr)
    predictions = None
    try:
        # Используйте .predict() для получения предсказанных классов
        predictions = model_pipeline.predict(X_valid)
        print("Предсказание выполнено.", file=sys.stderr)

        # Если нужны вероятности, используйте .predict_proba()
        # probabilities = model_pipeline.predict_proba(X_valid)
        # print("Предсказанные вероятности получены.")

    except Exception as e:
        print(f"Ошибка при выполнении предсказания: {e}", file=sys.stderr)
        sys.exit(1)

    # --- Конец замера времени ---
    end_time = time.time()
    elapsed_time = end_time - start_time

    print("\n--- Конец замера времени ---", file=sys.stderr)
    print(
        f"Общее время выполнения пайплайна (загрузка валидации -> предобработка -> загрузка модели -> предсказание): {elapsed_time:.4f} секунд",
        file=sys.stderr,
    )

    # Вывод первых нескольких предсказаний для проверки
    print("\nПервые 10 предсказанных значений:", file=sys.stderr)
    if predictions is not None:
        print(predictions[:10], file=sys.stderr)
    else:
        print("Предсказания не были получены.", file=sys.stderr)

    print(
        "\n==============================================================================",
        file=sys.stderr,
    )
    print("Скрипт замера времени предсказания завершен.", file=sys.stderr)
    print(
        "==============================================================================",
        file=sys.stderr,
    )