In [2]:
# @title Подключение к диску с данными
import os
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# @title Добавление новых фич (версия с разделением на 3 части улучшенный mcc target encoder и для 2 других столбцов)
import polars as pl
import gc
pl.Config.set_streaming_chunk_size(100_000)  # Ограничение чанков для streaming

# Пути
DATA_PATH = "/content/drive/MyDrive/ml-vtb-data-fusion-strazh/data/"
# TEMP_PATH = "/content/temp_te/"

PARTS = [1,2,3]

print("Вычисляем глобальное среднее target по всем частям (для сглаживания)")
global_sum = 0
global_cnt = 0
for i in PARTS:
    stats = pl.scan_parquet(f"{DATA_PATH}train_full_part_{i}.parquet") \
              .select(pl.sum("target"), pl.len()) \
              .collect()
    global_sum += stats[0, 0]
    global_cnt += stats[0, 1]
global_target_mean = global_sum / global_cnt
C = 15

for i in PARTS:
    # Загружаем lazyFrame
    df = pl.scan_parquet(f"{DATA_PATH}train_full_part_{i}.parquet")


    # 1. Базовые time-features
    df = df.with_columns([
        pl.col("event_dttm").dt.hour().alias("hour"),
        pl.col("event_dttm").dt.weekday().alias("dow"),
        pl.col("event_dttm").dt.date().alias("date"),
        (pl.col("operaton_amt").log1p()).alias("log_amt")
        ])


    # 2. Device risk score (с fill_null для null)
    df = df.with_columns([
        pl.col("compromised").cast(pl.Int32),
        pl.col("developer_tools").cast(pl.Int32),
        pl.col("web_rdp_connection").cast(pl.Int32),
        pl.col("phone_voip_call_state").cast(pl.Int32)
        ])

    df = df.with_columns(
        pl.sum_horizontal([
            "compromised",
            "developer_tools",
            "web_rdp_connection",
            "phone_voip_call_state"
            ]).alias("device_risk")
            )


    # 3. Rolling features по customer_id (отдельно agg + join)
    windows = ["1h", "24h", "7d"]

    for w in windows:
        # Сортируем (для rolling) так как перед каждой такой операцией нам нужна строгая сортировка
        df = df.sort(["customer_id", "event_dttm"])
        agg = df.rolling(
            index_column="event_dttm",    # временная колонка
            period=w,                      # размер окна
            group_by="customer_id",        # группировка
            closed="left",                 # не включаем текущую строку (только прошлое)
            ).agg([
                pl.col("event_id").count().alias(f"cnt_{w}"),
                pl.col("operaton_amt").sum().alias(f"sum_amt_{w}"),
                pl.col("operaton_amt").mean().alias(f"mean_amt_{w}"),
                pl.col("mcc_code").n_unique().alias(f"uniq_mcc_{w}"),
                ])

        # Join обратно к df
        df = df.join(agg, on=["customer_id", "event_dttm"], how="left")

    # Сохраняем intermediate после rolling (чтобы освободить память перед TE)
    print("Сохраняем intermediate после rolling...")
    df.collect(engine="streaming").write_parquet(f"/content/train_after_rolling_part_{i}.parquet", compression="zstd")
    del df
    gc.collect()

    # Перезагружаем lazy для TE
    df = pl.scan_parquet(f"/content/train_after_rolling_part_{i}.parquet")
    cat_cols = ["mcc_code", "event_type_nm", "channel_indicator_type"]
    for col in cat_cols:
        print(f"TE для {col} (expanding smoothing)...")

        # Сортируем для корректного кумулятивного суммирования
        df = df.sort(["customer_id", col, "event_dttm"])

        # Вычисляем кумулятивные суммы target и номер строки в группе
        df = df.with_columns([
            pl.col("target").cum_sum().over(["customer_id", col]).alias("cum_target_all"),
            pl.int_range(0, pl.len()).over(["customer_id", col]).alias("cum_cnt_all")  # 0,1,2,... в группе
        ])

        # Получаем сумму и количество ТОЛЬКО для предыдущих строк (исключая текущую)
        df = df.with_columns([
            (pl.col("cum_target_all") - pl.col("target")).alias("cum_target_prev"),
            pl.col("cum_cnt_all").alias("cum_cnt_prev")  # для первой строки = 0
        ])

        # Применяем сглаживание с глобальным средним
        df = df.with_columns(
            ((pl.col("cum_target_prev").fill_null(0) + C * global_target_mean) /
            (pl.col("cum_cnt_prev").fill_null(0) + C)).alias(f"te_{col}_30d")
        )

        # Удаляем временные колонки
        df = df.drop(["cum_target_all", "cum_cnt_all", "cum_target_prev", "cum_cnt_prev"])

        # Сохраняем промежуточный результат
        temp_file = f"/content/temp_te_{col}_part_{i}.parquet"
        df.collect(engine="streaming").write_parquet(temp_file, compression="zstd", row_group_size=1000000)
        print(f"Temp сохранён для {col}: {temp_file}")

        # Освобождаем память и перезагружаем для следующей колонки
        del df
        gc.collect()
        df = pl.scan_parquet(temp_file)


    # Финальный save
    print("Финальное сохранение...")
    df.sink_parquet(f"{DATA_PATH}train_features_part_{i}.parquet", compression="zstd", row_group_size=1000000)
    print("Все сохранено  !!!!!!!")

    del df
    gc.collect()

In [3]:
import polars as pl
import gc
import os
from google.colab import drive

# === Конфигурация ===
# Ограничение размера чанков для потоковой обработки
pl.Config.set_streaming_chunk_size(100_000)

# Проверяем и подключаем диск
if not os.path.exists('/content/drive'):
    print("Mounting Google Drive...")
    drive.mount('/content/drive')
else:
    print("Drive already mounted.")

# Пути к данным
DATA_PATH = "/content/drive/MyDrive/ml-vtb-data-fusion-strazh/data/"

# Проверка наличия данных
if os.path.exists(DATA_PATH):
    print(f"OK: Папка с данными найдена: {DATA_PATH}")
    try:
        files = os.listdir(DATA_PATH)
        print(f"Файлов в папке: {len(files)}")
        print("Пример файлов:", files[:5])
    except Exception as e:
        print(f"Ошибка при чтении папки: {e}")
else:
    print(f"ERROR: Папка не найдена: {DATA_PATH}")
    print("Проверьте путь. Содержимое /content/drive/MyDrive/:")
    try:
        print(os.listdir("/content/drive/MyDrive/")[:10])
    except Exception as e:
        print(f"Не удалось прочитать папку /content/drive/MyDrive/: {e}")

# Список частей для обработки
PARTS = [1, 2, 3]

# Параметр сглаживания для Target Encoding
C_SMOOTHING = 15

Drive already mounted.
OK: Папка с данными найдена: /content/drive/MyDrive/ml-vtb-data-fusion-strazh/data/
Файлов в папке: 18
Пример файлов: ['train_labels.parquet', 'sample_submit.csv', 'pretrain_part_1.parquet', 'train_part_3.parquet', 'pretrain_part_2.parquet']


In [9]:
def get_global_target_mean(parts, data_path):
    """
    Вычисляет глобальное среднее значение целевой переменной (target)
    по всем частям данных для сглаживания Target Encoding.
    """
    print("Вычисляем глобальное среднее target по всем частям...")
    global_sum = 0
    global_cnt = 0

    for i in parts:
        file_path = f"{data_path}train_full_part_{i}.parquet"
        stats = pl.scan_parquet(file_path) \
                  .select(pl.sum("target"), pl.len()) \
                  .collect()

        global_sum += stats[0, 0]
        global_cnt += stats[0, 1]

    global_mean = global_sum / global_cnt
    print(f"Global Target Mean: {global_mean:.6f}")
    return global_mean

In [10]:
def add_basic_features(df):
    """
    Добавляет базовые временные признаки и риск-факторы устройства.
    """
    # 1. Базовые time-features
    df = df.with_columns([
        pl.col("event_dttm").dt.hour().alias("hour"),
        pl.col("event_dttm").dt.weekday().alias("dow"),
        pl.col("event_dttm").dt.date().alias("date"),
        (pl.col("operaton_amt").log1p()).alias("log_amt")
    ])

    # 2. Device risk score
    df = df.with_columns([
        pl.col("compromised").cast(pl.Int32),
        pl.col("developer_tools").cast(pl.Int32),
        pl.col("web_rdp_connection").cast(pl.Int32),
        pl.col("phone_voip_call_state").cast(pl.Int32)
    ])

    df = df.with_columns(
        pl.sum_horizontal([
            "compromised",
            "developer_tools",
            "web_rdp_connection",
            "phone_voip_call_state"
        ]).alias("device_risk")
    )
    return df

def add_velocity_features(df):
    """
    Добавляет новые фичи: дельты времени/суммы и скорость операций.
    """
    # Сортировка важна для функций с shift и rolling
    df = df.sort(["customer_id", "event_dttm"])

    # 1. Delta features (разница с предыдущей транзакцией пользователя)
    df = df.with_columns([
        (pl.col("event_dttm") - pl.col("event_dttm").shift(1).over("customer_id")).dt.total_seconds().alias("time_delta_sec"),

        (pl.col("operaton_amt") - pl.col("operaton_amt").shift(1).over("customer_id")).alias("amt_delta"),
    ])

    # 2. Rolling velocity (сумма за час / время)
    # Используем .rolling() с указанием index_column для временного окна
    df = df.with_columns([
        # Сумма operaton_amt за предыдущий час
        pl.col("operaton_amt")
        .rolling(index_column="event_dttm", period="1h", closed="left")
        .agg(pl.col("operaton_amt").sum().alias(f"sum_amt_{w}"))
        .over("customer_id")
        .alias("roll_sum_amt"),

        # Среднее time_delta_sec за предыдущий час
        pl.col("time_delta_sec")
        .rolling(index_column="event_dttm", period="1h", closed="left")
        .mean()
        .over("customer_id")
        .alias("roll_mean_time")
    ])

    # 3. Скорость операций: сумма / средний интервал (с защитой от деления на 0)
    df = df.with_columns(
        (pl.col("roll_sum_amt") / (pl.col("roll_mean_time") + 1e-6))
        .alias("amt_velocity_1h")
    )


    return df

In [11]:
def add_rolling_aggregations(df):
    """
    Добавляет агрегации (count, sum, mean, unique) по окнам 1h, 24h, 7d.
    Использует rolling на DataFrame + join.
    """
    windows = ["1h", "24h", "7d"]

    for w in windows:
        # Сортировка обязательна перед rolling
        df = df.sort(["customer_id", "event_dttm"])

        agg = df.rolling(
            index_column="event_dttm",    # Временная колонка
            period=w,                     # Размер окна
            group_by="customer_id",       # Группировка
            closed="left",                # Не включаем текущую строку
        ).agg([
            pl.col("event_id").count().alias(f"cnt_{w}"),
            pl.col("operaton_amt").sum().alias(f"sum_amt_{w}"),
            pl.col("operaton_amt").mean().alias(f"mean_amt_{w}"),
            pl.col("mcc_code").n_unique().alias(f"uniq_mcc_{w}"),
        ])

        # Join обратно к основному датафрейму
        df = df.join(agg, on=["customer_id", "event_dttm"], how="left")

    return df



def apply_target_encoding(df, global_target_mean, smoothing_c, temp_path_prefix, part_id):
    """
    Применяет сглаженный Target Encoding с расширяющимся окном (expanding).
    Сохраняет промежуточные файлы для экономии памяти.
    """
    cat_cols = ["mcc_code", "event_type_nm", "channel_indicator_type"]

    for col in cat_cols:
        print(f"   TE для {col} (expanding smoothing)...")

        # Сортируем для корректного кумулятивного суммирования
        df = df.sort(["customer_id", col, "event_dttm"])

        # Вычисляем кумулятивные суммы target и счетчик
        df = df.with_columns([
            pl.col("target").cum_sum().over(["customer_id", col]).alias("cum_target_all"),
            pl.int_range(0, pl.len()).over(["customer_id", col]).alias("cum_cnt_all")
        ])

        # Берем значения только для предыдущих строк
        df = df.with_columns([
            (pl.col("cum_target_all") - pl.col("target")).alias("cum_target_prev"),
            pl.col("cum_cnt_all").alias("cum_cnt_prev")
        ])

        # Формула сглаживания
        df = df.with_columns(
            ((pl.col("cum_target_prev") + smoothing_c * global_target_mean) /
            (pl.col("cum_cnt_prev") + smoothing_c)).fill_nan(0).alias(f"te_{col}_30d")
        )

        # Удаляем временные колонки
        df = df.drop(["cum_target_all", "cum_cnt_all", "cum_target_prev", "cum_cnt_prev"])

        # Промежуточное сохранение для освобождения RAM
        temp_file = f"/content/temp_te_{col}_part_{part_id}.parquet"
        df.collect(engine="streaming").write_parquet(temp_file, compression="zstd", row_group_size=100_000)

        # Перезагрузка
        del df
        gc.collect()
        df = pl.scan_parquet(temp_file)

    return df

In [12]:
# === Основной пайплайн ===

# 1. Считаем глобальное среднее
global_target_mean = get_global_target_mean(PARTS, DATA_PATH)

for i in PARTS:
    print(f"\nОбработка части {i}...")

    # Загружаем данные
    df = pl.scan_parquet(f"{DATA_PATH}train_full_part_{i}.parquet")

    # Добавляем базовые фичи
    df = add_basic_features(df)

    # Добавляем новые фичи (Velocity и Delta)
    # df = add_velocity_features(df)

    # Добавляем Rolling агрегации (сохраняем промежуточно, т.к. это тяжелая операция)
    # Для rolling агрегаций нам нужен materialize (но здесь мы используем lazy df -> join -> lazy)
    # В Polars lazy operations накапливаются.

    # Чтобы применить rolling (который требует сортировки и join), лучше это делать поэтапно
    # Но функция add_rolling_aggregations возвращает LazyFrame (после join), так что все ок.
    df = add_rolling_aggregations(df)

    # Сохраняем промежуточный результат перед TE (Target Encoding тяжелый)
    intermediate_path = f"/content/train_after_rolling_part_{i}.parquet"
    print(f"   Сохранение промежуточного файла: {intermediate_path}")
    df.collect(engine="streaming").write_parquet(intermediate_path, compression="zstd")

    # Очистка памяти
    del df
    gc.collect()

    # Загружаем для Target Encoding
    df = pl.scan_parquet(intermediate_path)

    # Применяем TE
    df = apply_target_encoding(df, global_target_mean, C_SMOOTHING, "/content/", i)

    # Финальное сохранение
    final_path = f"{DATA_PATH}train_features_v2_part_{i}.parquet"
    print(f"   Финальное сохранение в: {final_path}")
    df.sink_parquet(final_path, compression="zstd", row_group_size=1_000_000)

    # Очистка
    del df
    gc.collect()
    print(f"Часть {i} готова!")

print("\nВсе части успешно обработаны!")

Вычисляем глобальное среднее target по всем частям...
Global Target Mean: -0.998378

Обработка части 1...
   Сохранение промежуточного файла: /content/train_after_rolling_part_1.parquet
   TE для mcc_code (expanding smoothing)...
   TE для event_type_nm (expanding smoothing)...
   TE для channel_indicator_type (expanding smoothing)...
   Финальное сохранение в: /content/drive/MyDrive/ml-vtb-data-fusion-strazh/data/train_features_v2_part_1.parquet
Часть 1 готова!

Обработка части 2...
   Сохранение промежуточного файла: /content/train_after_rolling_part_2.parquet
   TE для mcc_code (expanding smoothing)...
   TE для event_type_nm (expanding smoothing)...
   TE для channel_indicator_type (expanding smoothing)...
   Финальное сохранение в: /content/drive/MyDrive/ml-vtb-data-fusion-strazh/data/train_features_v2_part_2.parquet
Часть 2 готова!

Обработка части 3...
   Сохранение промежуточного файла: /content/train_after_rolling_part_3.parquet
   TE для mcc_code (expanding smoothing)...
   T