# AeroClub RecSys 2025 - XGBoost Ranking Baseline

This notebook implements an improved ranking approach using XGBoost and Polars for the AeroClub recommendation challenge.

In [None]:
# Выполняется ли локально
IS_LOCAL = True
IS_LOAD_MAP = False
num_boost_round = 1200

In [2]:
# %%capture

# if not IS_LOCAL:
#     !pip install -U xgboost
#     !pip install -U polars
#     !pip install -U lightgbm

In [3]:
import polars as pl
import numpy as np
from collections import defaultdict

import matplotlib.pyplot as plt
import time
import xgboost as xgb

import gc
import random
import pickle

from functools import reduce
import operator

from scipy.special import expit  # это σ(x), сигмоида

from datetime import datetime
from tqdm import tqdm

from copy import deepcopy

from scipy import sparse

import lightgbm as lgb

RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

pl.Config.set_fmt_str_lengths(200)
pl.Config.set_tbl_rows(50)

polars.config.Config

## Helpers

In [4]:
def hitrate_at_3(y_true, y_pred, groups):
    df = pl.DataFrame({
        'group': groups,
        'pred': y_pred,
        'true': y_true
    })
    
    return (
        df.filter(pl.col("group").count().over("group") > 10)
        .sort(["group", "pred"], descending=[False, True])
        .group_by("group", maintain_order=True)
        .head(3)
        .group_by("group")
        .agg(pl.col("true").max())
        .select(pl.col("true").mean())
        .item()
    )

## Feature Engineering

### Обработка категориальных переменных

In [5]:
# Категориальные признаки
cat_features_final = [
    'nationality',
    'companyID',
    'corporateTariffCode',
    # 'bySelf',
    # 'sex',
    'legs0_segments0_aircraft_code',
    'legs0_segments0_arrivalTo_airport_city_iata',
    'legs0_segments0_arrivalTo_airport_iata',
    'legs0_segments0_departureFrom_airport_iata',
    'legs0_segments0_marketingCarrier_code',
    'legs0_segments0_operatingCarrier_code',
    'legs0_segments1_aircraft_code',
    'legs0_segments1_arrivalTo_airport_city_iata',
    'legs0_segments1_arrivalTo_airport_iata',
    'legs0_segments1_departureFrom_airport_iata',
    'legs0_segments1_marketingCarrier_code',
    'legs0_segments1_operatingCarrier_code',
    'legs0_segments2_aircraft_code',
    'legs0_segments2_arrivalTo_airport_city_iata',
    'legs0_segments2_arrivalTo_airport_iata',
    'legs0_segments2_departureFrom_airport_iata',
    'legs0_segments2_marketingCarrier_code',
    'legs0_segments2_operatingCarrier_code',
    'legs1_segments0_aircraft_code',
    'legs1_segments0_arrivalTo_airport_city_iata',
    'legs1_segments0_arrivalTo_airport_iata',
    'legs1_segments0_departureFrom_airport_iata',
    'legs1_segments0_marketingCarrier_code',
    'legs1_segments0_operatingCarrier_code',
    'legs1_segments1_aircraft_code',
    'legs1_segments1_arrivalTo_airport_city_iata',
    'legs1_segments1_arrivalTo_airport_iata',
    'legs1_segments1_departureFrom_airport_iata',
    'legs1_segments1_marketingCarrier_code',
    'legs1_segments1_operatingCarrier_code',
    'legs1_segments2_aircraft_code',
    'legs1_segments2_arrivalTo_airport_city_iata',
    'legs1_segments2_arrivalTo_airport_iata',
    'legs1_segments2_departureFrom_airport_iata',
    'legs1_segments2_marketingCarrier_code',
    'legs1_segments2_operatingCarrier_code',
    'legs0_segments0_flightNumber',
    'legs0_segments1_flightNumber',
    'legs0_segments2_flightNumber',
    'legs1_segments0_flightNumber',
    'legs1_segments1_flightNumber',
    'legs1_segments2_flightNumber',
]

In [6]:
def encode_categories(df: pl.DataFrame, cat_cols: list[str]) -> tuple[pl.DataFrame, dict[str, dict[str, int]]]:
    '''
    Создаёт маппинг для категориальных колонок и кодирует их в Int16,
    заменяя неизвестные на -1. Значение "missing" всегда кодируется как -1.
    '''
    cat_map = {}
    for col in cat_cols:
        unique_vals = df[col].drop_nulls().unique().to_list()
        
        # Определяем тип колонки
        col_dtype = df.schema[col]

        # Создаем маппинг в зависимости от типа данных
        if col_dtype == pl.Utf8:
            # Для строковых колонок: "missing" → -1
            unique_vals = [v for v in unique_vals if v != "missing"]
            mapping = {"missing": -1}
        else:
            # Для числовых колонок: -1 → -1
            unique_vals = [v for v in unique_vals if v != -1]
            mapping = {-1: -1}

        mapping.update({v: i for i, v in enumerate(unique_vals)})

        cat_map[col] = mapping
        max_index = -1  # используется как default для неизвестных

        df = df.with_columns([
            pl.col(col)
            .replace_strict(mapping, default=max_index)
            .cast(pl.Int16)
            .alias(col)
        ])

    return df, cat_map


def apply_category_map(df: pl.DataFrame, cat_map: dict[str, dict[str, int]]) -> pl.DataFrame:
    '''
    Применяет ранее созданный маппинг к другому DataFrame (например, test), подставляя -1 для unseen значений.
    '''
    for col, mapping in cat_map.items():
        #max_index = max(mapping.values()) + 1
        max_index = -1
        df = df.with_columns([
            pl.col(col)
            .replace_strict(mapping, default=max_index)
            .cast(pl.Int16)
            .alias(col)
        ])

    return df

    
def get_cat_map():
    global cat_features_final
    '''
    Возвращает мапинг категориальных фич по всему датасету трейн плюс тест
    '''
    train = pl.read_parquet('/kaggle/input/aeroclub-recsys-2025/train.parquet').drop('__index_level_0__')
    test = pl.read_parquet('/kaggle/input/aeroclub-recsys-2025/test.parquet').drop('__index_level_0__').with_columns(pl.lit(0, dtype=pl.Int64).alias("selected"))
    data_raw = pl.concat((train, test))
    del train, test
    
    data_raw, cat_map = encode_categories(data_raw, cat_features_final)

    return cat_map

### Создание исторических признаков по клиентам

In [7]:
# More efficient duration to minutes converter
def dur_to_min(col):
    # Extract days and time parts in one pass
    days = col.str.extract(r"^(\d+)\.", 1).cast(pl.Int64).fill_null(0) * 1440
    time_str = pl.when(col.str.contains(r"^\d+\.")).then(col.str.replace(r"^\d+\.", "")).otherwise(col)
    hours = time_str.str.extract(r"^(\d+):", 1).cast(pl.Int64).fill_null(0) * 60
    minutes = time_str.str.extract(r":(\d+):", 1).cast(pl.Int64).fill_null(0)
    return (days + hours + minutes).fill_null(0)


# def make_history_avg(df, source_col, target_col, group_col):
def make_history_avg(df, source_cols, group_col, suffix):
    '''
    Добавляет историческую информацию по выборам билетов для пользователя или компании
    для числовых колонок
    '''
    # global df_means
    
    # Создаем датафрейм с выбранными записями (selected=1)
    selected_df = df.filter(pl.col("selected") == 1).select(["ranker_id", "requestDate", group_col] + source_cols)
    
    # Создаем словари для быстрого доступа
    ranker_to_profile = dict(zip(
        selected_df["ranker_id"].to_list(),
        selected_df[group_col].to_list()
    ))

    # Словарь ranker_id -> requestDate
    ranker_to_timestamp = dict(zip(
        selected_df["ranker_id"].to_list(),
        selected_df["requestDate"].to_list()
    ))
    
    # Создаем датафрейм с историческими данными (исключая текущие группы)
    history_df = selected_df.select(["ranker_id", "requestDate", group_col] + source_cols)
    
    all_stats_dict = {}  # ranker_id -> {col_mean: val, col_std: val}
    
    # Получаем уникальные значения ranker_id
    unique_ranker_ids = df["ranker_id"].unique().to_list()
        
    # Проходим по каждой группе
    for current_ranker_id in tqdm(unique_ranker_ids, desc="Обработка ranker_id", mininterval=10.0):
        # Получаем profile_id из словаря
        current_profile_id = ranker_to_profile.get(current_ranker_id)

        # timestamp текущей группы
        current_timestamp = ranker_to_timestamp.get(current_ranker_id)
        
        # Фильтруем исторические данные для текущего профиля
        profile_history = history_df.filter(
            (pl.col(group_col) == current_profile_id) &
            (pl.col("ranker_id") != current_ranker_id)
            & (pl.col("requestDate") < current_timestamp)
        )

        
        # Вычисляем статистики для всех колонок сразу
        agg_result = profile_history.select([
            *[pl.col(col).mean().alias(f"{col}{suffix}_mean") for col in source_cols],
            *[pl.col(col).std().alias(f"{col}{suffix}_std") for col in source_cols],
            *[pl.col(col).count().cast(pl.Int16).alias(f"{col}{suffix}_count") for col in source_cols],  # Добавляем счётчики
            *[pl.col(col).median().alias(f"{col}{suffix}_median") for col in source_cols], 
            *[pl.col(col).quantile(0.25).alias(f"{col}{suffix}_q25") for col in source_cols],
            *[pl.col(col).quantile(0.75).alias(f"{col}{suffix}_q75") for col in source_cols]
        ])

        if agg_result.height > 0:
            row = agg_result.row(0)
            n_cols = len(source_cols)
            all_stats_dict[current_ranker_id] = {
                **{
                    f"{col}{suffix}_mean": row[i] if row[i] is not None else -1
                    for i, col in enumerate(source_cols)
                },
                **{
                    f"{col}{suffix}_std": row[i+n_cols] if row[i+n_cols] is not None else -1
                    for i, col in enumerate(source_cols)
                },
                **{
                    f"{col}{suffix}_count": row[i+2*n_cols] if row[i+2*n_cols] is not None else -1  # Добавляем счётчики
                    for i, col in enumerate(source_cols)
                },
                **{
                    f"{col}{suffix}_median": row[i+3*n_cols] if row[i+3*n_cols] is not None else -1  # Добавляем счётчики
                    for i, col in enumerate(source_cols)
                },
                **{
                    f"{col}{suffix}_q25": row[i+4*n_cols] if row[i+4*n_cols] is not None else -1  # Добавляем счётчики
                    for i, col in enumerate(source_cols)
                },
                **{
                    f"{col}{suffix}_q75": row[i+5*n_cols] if row[i+5*n_cols] is not None else -1  # Добавляем счётчики
                    for i, col in enumerate(source_cols)
                },
            }
        else:
            all_stats_dict[current_ranker_id] = {
                **{f"{col}{suffix}_mean": -1 for col in source_cols},
                **{f"{col}{suffix}_std": -1 for col in source_cols},
                **{f"{col}{suffix}_count": -1 for col in source_cols}  # По умолчанию 0 записей
                **{f"{col}{suffix}_median": -1 for col in source_cols}  # По умолчанию 0 записей
                **{f"{col}{suffix}_q25": -1 for col in source_cols}  # По умолчанию 0 записей
                **{f"{col}{suffix}_q75": -1 for col in source_cols}  # По умолчанию 0 записей
            }
    
    # Создаем датафрейм для обновления
    update_data = []
    for ranker_id, stats in all_stats_dict.items():
        row = {"ranker_id": ranker_id, **stats}
        update_data.append(row)
    
    update_df = pl.DataFrame(update_data)
    
    # Обновляем основной датафрейм
    df = df.join(update_df, on="ranker_id", how="left")
    
    # Заполняем пропуски и приводим типы
    for col in source_cols:
        df = df.with_columns([
            pl.col(f"{col}{suffix}_mean").fill_null(-1),
            pl.col(f"{col}{suffix}_std").fill_null(-1),
            pl.col(f"{col}{suffix}_count").cast(pl.Int16).fill_null(-1),
            pl.col(f"{col}{suffix}_median").fill_null(-1),
            pl.col(f"{col}{suffix}_q25").fill_null(-1),
            pl.col(f"{col}{suffix}_q75").fill_null(-1),
        ])

    # Создаем список агрегаций
    agg_exprs = []

    for col in source_cols:
        # Среднее значение
        agg_exprs.append(pl.col(col).mean().alias(f"{col}{suffix}_mean"))
    for col in source_cols:
        # Стандартное отклонение
        agg_exprs.append(pl.col(col).std().alias(f"{col}{suffix}_std"))
    for col in source_cols:
        # Количество записей (не null)
        agg_exprs.append(pl.col(col).count().alias(f"{col}{suffix}_count"))
    for col in source_cols:
        # Количество записей (не null)
        agg_exprs.append(pl.col(col).median().alias(f"{col}{suffix}_median"))
    for col in source_cols:
        # Количество записей (не null)
        agg_exprs.append(pl.col(col).quantile(0.25).alias(f"{col}{suffix}_q25"))
    for col in source_cols:
        # Количество записей (не null)
        agg_exprs.append(pl.col(col).quantile(0.75).alias(f"{col}{suffix}_q75"))

    # Применяем агрегации
    df_stats = (
        df.filter(pl.col("selected") == 1)
        .group_by(group_col)
        .agg(agg_exprs)
    )
    # df_stats = to_32(df_stats)

    return df, df_stats

### Генерация новых признаков

In [None]:
def make_fetures(df, selected_df, split_groups=False, is_train=False):
    global df_stats_pr, df_stats_co, profile_counts
    
    # Time features - batch process
    time_exprs = []
    for col in ("legs0_departureAt", "legs0_arrivalAt", "legs1_departureAt", "legs1_arrivalAt"):
        if col in df.columns:
            dt = pl.col(col).str.to_datetime(strict=False)
            h = dt.dt.hour().fill_null(12)
            time_exprs.extend([
                h.alias(f"{col}_hour"),
                dt.dt.weekday().fill_null(0).alias(f"{col}_weekday"),
                (((h >= 6) & (h <= 9)) | ((h >= 17) & (h <= 20))).cast(pl.Int32).alias(f"{col}_business_time")
            ])
    if time_exprs:
        df = df.with_columns(time_exprs)

    # Process duration columns
    dur_cols = ["legs0_duration", "legs1_duration"] + [f"legs{l}_segments{s}_duration" for l in (0, 1) for s in (0, 1, 2)]
    dur_exprs = [dur_to_min(pl.col(c)).alias(c) for c in dur_cols if c in df.columns]

    # Apply duration transformations first
    if dur_exprs:
        df = df.with_columns(dur_exprs)

    # Precompute marketing carrier columns check
    mc_cols = [f'legs{l}_segments{s}_marketingCarrier_code' for l in (0, 1) for s in range(4)]
    mc_exists = [col for col in mc_cols if col in df.columns]

    # Combine all initial transformations
    df = df.with_columns([
            # Price features
            (pl.col("totalPrice") / (pl.col("taxes") + 1)).alias("price_per_tax"),
            (pl.col("taxes") / (pl.col("totalPrice") + 1)).alias("tax_rate"),
            pl.col("totalPrice").log1p().alias("log_price"),
            
            # Duration features
            (pl.col("legs0_duration").fill_null(0) + pl.col("legs1_duration").fill_null(0)).alias("total_duration"),
            pl.when(pl.col("legs1_duration").fill_null(0) > 0)
                .then(pl.col("legs0_duration") / (pl.col("legs1_duration") + 1))
                .otherwise(1.0).alias("duration_ratio"),
            
            # Trip type
            (pl.col("legs1_duration").is_null() | 
            (pl.col("legs1_duration") == 0) | 
            pl.col("legs1_segments0_departureFrom_airport_iata").is_null()).cast(pl.Int32).alias("is_one_way"),
            
            # Total segments count
            (pl.sum_horizontal(pl.col(col).is_not_null().cast(pl.UInt8) for col in mc_exists) 
            if mc_exists else pl.lit(0)).alias("l0_seg"),
            
            # FF features
            (pl.col("frequentFlyer").fill_null("").str.count_matches("/") + 
            (pl.col("frequentFlyer").fill_null("") != "").cast(pl.Int32)).alias("n_ff_programs"),
            
            # Binary features
            pl.col("corporateTariffCode").is_not_null().cast(pl.Int32).alias("has_corporate_tariff"),
            (pl.col("pricingInfo_isAccessTP") == 1).cast(pl.Int32).alias("has_access_tp"),
            
            # Baggage & fees
            (pl.col("legs0_segments0_baggageAllowance_quantity").fill_null(0) + 
            pl.col("legs1_segments0_baggageAllowance_quantity").fill_null(0)).alias("baggage_total"),

            (
                (pl.col("miniRules0_monetaryAmount") == 0)
                & (pl.col("miniRules0_statusInfos") == 1)
            )
            .cast(pl.Int8)
            .alias("free_cancel"),
            (
                (pl.col("miniRules1_monetaryAmount") == 0)
                & (pl.col("miniRules1_statusInfos") == 1)
            )
            .cast(pl.Int8)
            .alias("free_exchange"),
            
            # Routes & carriers
            pl.col("searchRoute").is_in(["MOWLED/LEDMOW", "LEDMOW/MOWLED", "MOWLED", "LEDMOW", "MOWAER/AERMOW"])
                .cast(pl.Int32).alias("is_popular_route"),
            
            # Cabin
            pl.mean_horizontal(["legs0_segments0_cabinClass", "legs1_segments0_cabinClass"]).alias("avg_cabin_class"),
            (pl.col("legs0_segments0_cabinClass").fill_null(0) - 
            pl.col("legs1_segments0_cabinClass").fill_null(0)).alias("cabin_class_diff"),
    ])

    # Segment counts - more efficient
    seg_exprs = []
    for leg in (0, 1):
        seg_cols = [f"legs{leg}_segments{s}_duration" for s in range(4) if f"legs{leg}_segments{s}_duration" in df.columns]
        if seg_cols:
            seg_exprs.append(
                pl.sum_horizontal(pl.col(c).is_not_null() for c in seg_cols)
                    .cast(pl.Int32).alias(f"n_segments_leg{leg}")
            )
        else:
            seg_exprs.append(pl.lit(0).cast(pl.Int32).alias(f"n_segments_leg{leg}"))

    # Add segment-based features
    # First create segment counts
    df = df.with_columns(seg_exprs)

    # Then use them for derived features
    df = df.with_columns([
        (pl.col("n_segments_leg0") + pl.col("n_segments_leg1")).alias("total_segments"),
        (pl.col("n_segments_leg0") == 1).cast(pl.Int32).alias("is_direct_leg0"),
        pl.when(pl.col("is_one_way") == 1).then(0)
            .otherwise((pl.col("n_segments_leg1") == 1).cast(pl.Int32)).alias("is_direct_leg1"),
    ])

    # More derived features
    df = df.with_columns([
        (pl.col("is_direct_leg0") & pl.col("is_direct_leg1")).cast(pl.Int32).alias("both_direct"),
        ((pl.col("isVip") == 1) | (pl.col("n_ff_programs") > 0)).cast(pl.Int32).alias("is_vip_freq"),
        (pl.col("baggage_total") > 0).cast(pl.Int32).alias("has_baggage"),
        pl.col("Id").count().over("ranker_id").alias("group_size"),
    ])

    # Add major carrier flag if column exists
    if "legs0_segments0_marketingCarrier_code" in df.columns:
        df = df.with_columns(
            pl.col("legs0_segments0_marketingCarrier_code").is_in(["SU", "S7", "U6"])
                .cast(pl.Int32).alias("is_major_carrier")
        )
    else:
        df = df.with_columns(pl.lit(0).alias("is_major_carrier"))

    df = df.with_columns(pl.col("group_size").log1p().alias("group_size_log"))

    # Batch rank computations - more efficient with single pass
    # First apply the columns that will be used for ranking
    df = df.with_columns([
        pl.col("group_size").log1p().alias("group_size_log"),
    ])

    # Price and duration basic ranks
    rank_exprs = []
    for col, alias in [("totalPrice", "price"), ("total_duration", "duration")]:
        rank_exprs.append(pl.col(col).rank().over("ranker_id").alias(f"{alias}_rank"))

    # Price-specific features
    price_exprs = [
        (pl.col("totalPrice").rank("average").over("ranker_id") / 
        pl.col("totalPrice").count().over("ranker_id")).alias("price_pct_rank"),
        (pl.col("totalPrice") == pl.col("totalPrice").min().over("ranker_id")).cast(pl.Int32).alias("is_cheapest"),
        ((pl.col("totalPrice") - pl.col("totalPrice").median().over("ranker_id")) / 
        (pl.col("totalPrice").std().over("ranker_id") + 1)).alias("price_from_median"),
        ((pl.col("total_duration") - pl.col("total_duration").median().over("ranker_id")) / 
        (pl.col("total_duration").std().over("ranker_id") + 1)).alias("duration_from_median"),
        (pl.col("l0_seg") == pl.col("l0_seg").min().over("ranker_id")).cast(pl.Int32).alias("is_min_segments"),
    ]

    # Apply initial ranks
    df = df.with_columns(rank_exprs + price_exprs)

    # Cheapest direct - more efficient
    direct_cheapest = (
        df.filter(pl.col("is_direct_leg0") == 1)
        .group_by("ranker_id")
        .agg(pl.col("totalPrice").min().alias("min_direct"))
    )

    df = df.join(direct_cheapest, on="ranker_id", how="left").with_columns(
        ((pl.col("is_direct_leg0") == 1) & 
        (pl.col("totalPrice") == pl.col("min_direct"))).cast(pl.Int32).fill_null(0).alias("is_direct_cheapest")
    ).drop("min_direct")

    # Popularity features - efficient join
    df = (
        df.join(
            df.group_by('legs0_segments0_marketingCarrier_code').agg(pl.mean('selected').alias('carrier0_pop')),
            on='legs0_segments0_marketingCarrier_code', 
            how='left'
        )
        .join(
            df.group_by('legs1_segments0_marketingCarrier_code').agg(pl.mean('selected').alias('carrier1_pop')),
            on='legs1_segments0_marketingCarrier_code', 
            how='left'
        )
        .with_columns([
            pl.col('carrier0_pop').fill_null(0.0),
            pl.col('carrier1_pop').fill_null(0.0),
        ])
    )

    # Final features including popularity
    df = df.with_columns([
        (pl.col('carrier0_pop') * pl.col('carrier1_pop')).alias('carrier_pop_product'),
    ])

    source_cols=['legs0_departureAt_hour','legs1_departureAt_hour','legs0_arrivalAt_hour','legs1_arrivalAt_hour',
                 'price_rank', 'price_from_median', 'duration_rank', 'avg_cabin_class',
                 'baggage_total', 'l0_seg',
                 'legs0_segments0_seatsAvailable', 'legs0_segments0_baggageAllowance_quantity', 'legs0_segments0_cabinClass',
                 'miniRules1_statusInfos', 'miniRules0_statusInfos',
                 'duration_from_median',
                 ]

    if is_train:
        print("avg на начало:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

        df, df_stats_pr = make_history_avg(df, source_cols=source_cols, group_col="profileId", suffix='_pr')
        df, df_stats_co = make_history_avg(df, source_cols=source_cols, group_col="companyID", suffix='_co')
        print("avg на завершение:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        selected_df = df.filter(pl.col("selected") == 1)

        # Считаем количество уникальных ranker_id для каждого profileId
        # и сохраняем в profile_counts для использования в валидационном или тестовом датасете
        profile_counts = (
            df
            .group_by("profileId")
            .agg(pl.col("ranker_id").n_unique().alias("ranker_count"))
        )

        # Делаем счетчики с нуля не используя
        df = df.with_columns(
            pl.col("ranker_id")
            .n_unique()
            .over("profileId")
            .cast(pl.Int16)
            .alias("ranker_count")
        )

        # # Присоединяем обратно к исходному df
        # df = df.join(profile_counts, on="profileId")

    else:
        # Присоединяем статистики фичей по отдельным клиентам
        df = df.join(
            df_stats_pr,
            on="profileId",
            how="left"
        )

        # Присоединяем статистики фичей по компаниям клиентов
        df = df.join(
            df_stats_co,
            on="companyID",
            how="left"
        )

        # Присоединяем profile_counts к new_df
        df = df.join(profile_counts, on="profileId", how="left")

        # Заполняем пропуски 1
        df = df.with_columns(
            pl.col("ranker_count").fill_null(1)
        )

    print ('Обрабатываем пропуски')
    
    for col in df.select(pl.selectors.numeric()).columns:
        df = df.with_columns(pl.col(col).fill_null(-1).alias(col))

    for col in df.select(pl.selectors.string()).columns:
        df = df.with_columns(pl.col(col).fill_null("missing").alias(col))

    return df, selected_df

## Feature Selection

In [9]:
def select_fetures(data):
    # Возвращает data еще со всеми столбцами
    # В feature_cols указаны те столбцы что нужно потом оставить для обучения

    # Columns to exclude (uninformative or problematic)
    exclude_cols = [
        'Id', 'ranker_id', 'selected', 'profileId',     'requestDate',
        'legs0_departureAt', 'legs0_arrivalAt', 'legs1_departureAt', 'legs1_arrivalAt',
        'miniRules0_percentage', 'miniRules1_percentage',  # >90% missing
        'frequentFlyer',  # Already processed
        'group_size',
        'searchRoute',
        # Exclude constant columns
        'pricingInfo_passengerCount', 
    ]

    # Exclude segment 3 columns (>98% missing)
    for leg in [0, 1]:
        for seg in [3]:
            for suffix in ['aircraft_code', 'arrivalTo_airport_city_iata', 'arrivalTo_airport_iata',
                        'baggageAllowance_quantity', 'baggageAllowance_weightMeasurementType',
                        'cabinClass', 'departureFrom_airport_iata', 'duration', 'flightNumber',
                        'marketingCarrier_code', 'operatingCarrier_code', 'seatsAvailable']:
                exclude_cols.append(f'legs{leg}_segments{seg}_{suffix}')

    # # Исключаем слишком уникальные фичи
    # for leg in [0, 1]:
    #     for seg in [0, 1, 2]:
    #         for suffix in ['flightNumber',
    #                        #'arrivalTo_airport_city_iata', 'arrivalTo_airport_iata', 'departureFrom_airport_iata'
    #                        ]:
    #             exclude_cols.append(f'legs{leg}_segments{seg}_{suffix}')

    feature_cols = [col for col in data.columns if col not in exclude_cols]
    
    print(f"Using {len(feature_cols)} features ({len(cat_features_final)} categorical)")
    groups = data.select('ranker_id')

    # return data, groups, feature_cols
    return data, groups, feature_cols#, cat_features_final


## Подготовка к тренировке моделей

### Prepair Data

In [10]:
# Load data
train = pl.read_parquet('/kaggle/input/aeroclub-recsys-2025/train.parquet').drop('__index_level_0__')

n2 = train.height
n1 = 14501077 if IS_LOCAL else train.height

if IS_LOCAL:
    validate=train[n1:n2]
    train = train[:n1]

In [11]:
requestDate_tr = train.select('requestDate')
profileId_tr =  train.select('profileId')
# Создаем новые признаки
train, train_selected_df = make_fetures(train,
                                        selected_df = None,
                                        is_train=True)

# Выбираем необходимые для дальнейшего обучения признаки
train, groups_tr, feature_cols = select_fetures(train)

print('feature_cols:', feature_cols)

y_tr = train.select('selected')
train = train.select(feature_cols)
# Кодируем train
train, cat_mapping = encode_categories(train, cat_features_final)

avg на начало: 2025-08-15 16:42:43


Обработка ranker_id: 100%|██████████| 105539/105539 [07:31<00:00, 233.65it/s]
Обработка ranker_id: 100%|██████████| 105539/105539 [07:56<00:00, 221.31it/s]


avg на завершение: 2025-08-15 16:58:21
Обрабатываем пропуски
Using 330 features (45 categorical)
feature_cols: ['bySelf', 'companyID', 'corporateTariffCode', 'nationality', 'isAccess3D', 'isVip', 'legs0_duration', 'legs0_segments0_aircraft_code', 'legs0_segments0_arrivalTo_airport_city_iata', 'legs0_segments0_arrivalTo_airport_iata', 'legs0_segments0_baggageAllowance_quantity', 'legs0_segments0_baggageAllowance_weightMeasurementType', 'legs0_segments0_cabinClass', 'legs0_segments0_departureFrom_airport_iata', 'legs0_segments0_duration', 'legs0_segments0_flightNumber', 'legs0_segments0_marketingCarrier_code', 'legs0_segments0_operatingCarrier_code', 'legs0_segments0_seatsAvailable', 'legs0_segments1_aircraft_code', 'legs0_segments1_arrivalTo_airport_city_iata', 'legs0_segments1_arrivalTo_airport_iata', 'legs0_segments1_baggageAllowance_quantity', 'legs0_segments1_baggageAllowance_weightMeasurementType', 'legs0_segments1_cabinClass', 'legs0_segments1_departureFrom_airport_iata', 'legs0_s

In [12]:
def get_dmatrix(df, split_groups=False):
    '''
    Генерирует DMatrix для XGBoost
    '''
    df, _ = make_fetures(df, selected_df = train_selected_df, split_groups=split_groups)
    df, groups_df, feature_cols = select_fetures(df)

    y_df = df.select('selected')
    df = df.select(feature_cols)
    # Применяем маппинг к df
    df = apply_category_map(df, cat_mapping)
    group_sizes_df = groups_df.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_numpy()
    feature_names=list(df.columns)
    
    d_df = xgb.DMatrix(df, label=y_df, missing=-1, group=group_sizes_df, feature_names=feature_names)

    return d_df, y_df, groups_df, df

In [13]:
if IS_LOCAL:
    validate = validate.filter(
        pl.len().over("ranker_id") > 10
    )
    dval, y_va, groups_va, validate = get_dmatrix(validate, split_groups=True)

### Optuna

In [14]:
IS_OPTUNA_XGB = False

if IS_OPTUNA_XGB:
    import optuna
    
    train_source = train.select(feature_cols).with_columns(
        groups_tr["ranker_id"],
        y_tr["selected"],
        requestDate_tr["requestDate"]
    )


    MAX_PER_GROUP = 50

    def objective(trial):
        params = {
            #'objective': 'rank:ndcg',
            'objective': 'rank:pairwise',
            'eval_metric': 'ndcg@3',
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.05),
            'max_depth': trial.suggest_int('max_depth', 3, 50),
            'min_child_weight': trial.suggest_int('min_child_weight', 1, 60),
            'subsample': trial.suggest_float('subsample', 0.9, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.1, 0.6),
            'lambda': trial.suggest_float('lambda', 0.0, 15.0),
            'alpha': trial.suggest_float('alpha', 0.0, 5.0),
            'gamma': trial.suggest_float('gamma', 0, 7),
            'n_jobs': -1,
            'verbosity': 0,
            'seed': 42
        }

        train = train_source.clone()

        rng = np.random.default_rng(RANDOM_STATE)

        # Получаем уникальные ranker_id
        unique_ids = train.select("ranker_id").unique()
        shifts = rng.integers(-2, 11, size=unique_ids.height)  # от -2 до 30 включительно

        # Создаем таблицу с ranker_id и соответствующим сдвигом
        shift_table = unique_ids.with_columns([
            pl.Series("group_shift", shifts)
        ])

        # Добавляем в train индекс строки и объединяем с shift_table по ranker_id
        rand_series = pl.Series("rand", rng.random(len(train)))
        train = (
            train
            .with_row_index("row_idx")
            .with_columns(rand_series)
            .join(shift_table, on="ranker_id", how="left")  # добавляем group_shift
            .with_columns([
                pl.len().over("ranker_id").alias("grp_size"),
                pl.col("rand").rank(method="dense").over("ranker_id").alias("rand_rank"),
                (MAX_PER_GROUP + pl.col("group_shift")).alias("adjusted_max")
            ])
            .filter(
                (pl.col("grp_size") <= MAX_PER_GROUP) |             # маленькие группы — целиком
                (pl.col("selected") == 1) |                         # выбранные строки всегда
                (pl.col("rand_rank") <= pl.col("adjusted_max"))     # большие группы — отбираем с учетом смещения
            )
            .sort("row_idx")
            .drop(["grp_size", "rand", "rand_rank", "row_idx", "adjusted_max", "group_shift", "requestDate"])
        )
        
        y_tr = train.select('selected')
        groups_tr = train.select('ranker_id')
        train = train.drop(["selected", "ranker_id"])

        group_sizes_tr = groups_tr.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_numpy()
        dtrain = xgb.DMatrix(train, label=y_tr, group=group_sizes_tr, feature_names=list(train.columns))

        model = xgb.train(
            params,
            dtrain,
            num_boost_round=800,
            #num_boost_round=100,
            evals=[(dval, 'val')],
            #early_stopping_rounds=200,
            verbose_eval=False
        )
        
        xgb_va_preds = model.predict(dval)
        xgb_hr3 = hitrate_at_3(y_va, xgb_va_preds, groups_va)
        return -xgb_hr3

    # Создание и запуск Optuna
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=300)

    print("Best trial:")
    print(study.best_trial.params)

In [15]:
IS_OPTUNA_LGB = False

if IS_OPTUNA_LGB:
    import optuna
    
    train_source = train.select(feature_cols).with_columns(
        groups_tr["ranker_id"],
        y_tr["selected"],
        requestDate_tr["requestDate"]
    )

    MAX_PER_GROUP = 50

    def objective(trial):
        params = {
            'objective': 'lambdarank',
            'metric': 'ndcg',
            'ndcg_eval_at': [3],
            
            # Дерево
            'num_leaves': trial.suggest_int('num_leaves', 64, 2048),  # ~ depth 6-10
            'min_child_samples': trial.suggest_int('min_child_samples', 2, 50),
            
            # Сэмплирование
            'subsample': trial.suggest_float('subsample', 0.7, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.2, 0.8),
            
            # Регуляризация
            'lambda_l2': trial.suggest_float('lambda_l2', 1.0, 50.0, log=True),
            'lambda_l1': trial.suggest_float('lambda_l1', 0.0, 1.0),
            
            # Обучение
            'learning_rate': trial.suggest_float('learning_rate', 0.002, 0.05),
            'bagging_freq': 1,
            'force_row_wise': trial.suggest_categorical('force_row_wise', [True, False]),
            'verbosity': -1,
            'seed': RANDOM_STATE,
            'n_jobs': -1,
        }

        train = train_source.clone()

        rng = np.random.default_rng(RANDOM_STATE)

        # Получаем уникальные ranker_id
        unique_ids = train.select("ranker_id").unique()
        shifts = rng.integers(-2, 11, size=unique_ids.height)  # от -2 до 30 включительно

        # Создаем таблицу с ranker_id и соответствующим сдвигом
        shift_table = unique_ids.with_columns([
            pl.Series("group_shift", shifts)
        ])

        # Добавляем в train индекс строки и объединяем с shift_table по ranker_id
        rand_series = pl.Series("rand", rng.random(len(train)))
        train = (
            train
            .with_row_index("row_idx")
            .with_columns(rand_series)
            .join(shift_table, on="ranker_id", how="left")  # добавляем group_shift
            .with_columns([
                pl.len().over("ranker_id").alias("grp_size"),
                pl.col("rand").rank(method="dense").over("ranker_id").alias("rand_rank"),
                (MAX_PER_GROUP + pl.col("group_shift")).alias("adjusted_max")
            ])
            .filter(
                (pl.col("grp_size") <= MAX_PER_GROUP) |             # маленькие группы — целиком
                (pl.col("selected") == 1) |                         # выбранные строки всегда
                (pl.col("rand_rank") <= pl.col("adjusted_max"))     # большие группы — отбираем с учетом смещения
            )
            .sort("row_idx")
            .drop(["grp_size", "rand", "rand_rank", "row_idx", "adjusted_max", "group_shift", "requestDate"])
        )
        
        y_tr = train.select('selected').to_numpy().flatten()
        groups_tr = train.select('ranker_id')
        train = train.drop(["selected", "ranker_id"])
        # -----------------------------------

        # Подготовка данных для LightGBM
        train_data = lgb.Dataset(
            train,
            label=y_tr,
            group=groups_tr.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_list(),
            feature_name=list(train.columns),
            # categorical_feature=cat_features_final,
        )

        if IS_LOCAL:
            val_data = lgb.Dataset(
                validate,
                label=y_va.to_numpy().flatten(),
                group=groups_va.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_list(),
                reference=train_data,
                feature_name=list(validate.columns),
                #categorical_feature=cat_features_final,
            )
        else:
            val_data = None
        
        # Обучаем модель
        model = lgb.train(
            params,
            train_data,
            num_boost_round=800,
            # valid_sets=val_data,
            # callbacks=[lgb.log_evaluation(50)] if IS_LOCAL else None
            # callbacks=[biased_bagging_callback(neg_fraction=0.8)]
        )

        lgb_va_preds_1 = model.predict(validate)
        # # Evaluate XGBoost
        lgb_hr3 = hitrate_at_3(y_va, lgb_va_preds_1, groups_va)
        return -lgb_hr3

    # Создание и запуск Optuna
    study = optuna.create_study(direction='minimize')
    study.optimize(objective, n_trials=300)

    print("Best trial:")
    print(study.best_trial.params)

## Тренировка моделей

In [16]:
# Исходные данные до удаления столбцов и строк для тренировки
train_source = train.select(feature_cols).with_columns(
    groups_tr["ranker_id"],
    y_tr["selected"],
    requestDate_tr["requestDate"],
    profileId_tr["profileId"]
)

if IS_LOCAL:
    # Исходные данные до удаления столбцов и строк для валидации
    val_source = validate.with_columns(
        groups_va["ranker_id"],
        y_va["selected"],
    )

In [17]:
def drop_random_groups(df: pl.DataFrame, frac: float, group_col: str = "ranker_id") -> pl.DataFrame:
    """
    Удаляет случайную долю групп из датафрейма.
    
    Parameters
    ----------
    df : pl.DataFrame
        Исходный DataFrame.
    frac : float
        Доля групп, которую нужно удалить (0 < frac <= 1).
    group_col : str
        Название колонки, по которой определяются группы.
    
    Returns
    -------
    pl.DataFrame
        DataFrame без удалённых групп.
    """
    if not (0 < frac <= 1):
        raise ValueError("frac должно быть в диапазоне (0, 1].")
    
    # Уникальные группы
    unique_groups = df.select(group_col).unique()[group_col].to_list()
    
    # Кол-во групп для удаления (минимум 1)
    n_remove = max(1, int(len(unique_groups) * frac))
    
    # Случайный выбор групп
    groups_to_remove = random.sample(unique_groups, n_remove)
    
    # Фильтрация
    return df.filter(~pl.col(group_col).is_in(groups_to_remove))

In [18]:
def make_short_groups(max_per_group=40):
    '''Делает короткие группы из тех что превышают заданный размер'''

    global train_source

    train = train_source.clone()

    rng = np.random.default_rng(RANDOM_STATE)

    # Получаем уникальные ranker_id
    unique_ids = train.select("ranker_id").unique()
    shifts = rng.integers(-2, max_per_group//5, size=unique_ids.height)  # от -2 до 30 включительно

    # Создаем таблицу с ranker_id и соответствующим сдвигом
    shift_table = unique_ids.with_columns([
        pl.Series("group_shift", shifts)
    ])

    # Добавляем в train индекс строки и объединяем с shift_table по ranker_id
    rand_series = pl.Series("rand", rng.random(len(train)))
    print(train.height)
    train = (
        train
        .with_row_index("row_idx")
        .with_columns(rand_series)
        .join(shift_table, on="ranker_id", how="left")  # добавляем group_shift
        .with_columns([
            pl.len().over("ranker_id").alias("grp_size"),
            pl.col("rand").rank(method="dense").over("ranker_id").alias("rand_rank"),
            (max_per_group + pl.col("group_shift")).alias("adjusted_max")
        ])
        .filter(
            (pl.col("grp_size") <= max_per_group) |             # маленькие группы — целиком
            (pl.col("selected") == 1) |                         # выбранные строки всегда
            (pl.col("rand_rank") <= pl.col("adjusted_max"))     # большие группы — отбираем с учетом смещения
        )
        .sort("row_idx")
        .drop(["grp_size", "rand", "rand_rank", "row_idx", "adjusted_max", "group_shift"])
    )
    print(train.height)

    return train

In [19]:
all_preds = []  # Список для хранения предсказаний каждой модели
feature_names=list(train.columns)

In [20]:
def get_fin_df(df, model_idx):
    '''
    В зависимости от индекса model_ids модели удаляем колонки в датафрейме df
    '''
    if model_idx % 2 == 0:
        # Эту модель учим без истории пользователя
        # Формируем список колонок, которые надо удалить
        suffixes = ("pr_std", "pr_mean", "pr_count", "pr_median")

        cols_to_drop = [col for col in df.columns if col.endswith(suffixes)]

        # Удаляем их
        df = df.drop(cols_to_drop)
    
    return df

def mask_predictions(df: pl.DataFrame, preds: np.ndarray, model_idx: int) -> np.ndarray:
    """
    Маскирует предсказания в зависимости от model_idx.
    
    Параметры:
        df (pl.DataFrame): Датафрейм с колонкой 'ranker_count'.
        preds (np.ndarray): Массив предсказаний для всех строк.
        model_idx (int): Индекс модели (итерации).
    
    Возвращает:
        np.ndarray: Массив предсказаний с примененной маской.
    """
    preds = preds.copy()  # чтобы не портить исходный массив

    ranker_count = df["ranker_count"].to_numpy()

    if model_idx % 2 == 0:
        # четный индекс → обнуляем, где ranker_count > 1
        preds[ranker_count > 1] = 0
    else:
        # нечетный индекс → обнуляем, где ranker_count == 1
        preds[ranker_count == 1] = 0

    return preds

In [21]:
def ensure_1pct_minus1(df: pl.DataFrame) -> pl.DataFrame:
    '''Добавляет 1% пропусков случайно. Пропуски это -1.'''
    
    df_new = df.clone()

    n_rows = df.height
    target_count = int(np.floor(n_rows * 0.005))  # 1% от строк

    for col in df.columns:
        series = df_new[col]
        
        # Пропускаем булевые колонки
        if series.dtype == pl.Boolean:
            continue

        current_minus1_count = (series == -1).sum()

        if current_minus1_count >= target_count:
            # Уже есть ≥1% -1, пропускаем
            continue

        # Сколько ещё нужно добавить -1
        need_to_replace = target_count - current_minus1_count
        if need_to_replace <= 0:
            continue

        # Индексы, которые не равны -1
        available_indices = np.where(series.to_numpy() != -1)[0]

        # Случайные индексы для замены
        replace_indices = np.random.choice(
            available_indices, size=need_to_replace, replace=False
        )

        # Создаём копию колонки с заменой
        col_values = series.to_numpy().copy()
        col_values[replace_indices] = -1

        # Обновляем колонку
        df_new = df_new.with_columns(pl.Series(name=col, values=col_values))

    return df_new

### LightGBM

In [22]:
# Список для хранения обученных моделей
trained_models_lgb = []

IS_TRAIN_LGB = False
if IS_TRAIN_LGB:
    base_params = {
        'objective': 'lambdarank',
        'metric': 'ndcg',
        'ndcg_eval_at': [3],
        'num_leaves': 1537,
        'min_child_samples': 31,
        'subsample': 0.9862229989258597,
        'colsample_bytree': 0.25649343883100706,
        'lambda_l2': 3.5174195365138967,
        'lambda_l1': 0.32540659371520836,
        'learning_rate': 0.018049973250769295,
        'force_row_wise': True,
        'verbosity': -1,
        'seed': RANDOM_STATE,
        'n_jobs': -1,
        #'n_jobs': 48,
    }

    # Список различных комбинаций параметров
    params_list = [
        # {**deepcopy(base_params), 'num_boost_round': num_boost_round+50},  # Базовая модель
        # {**deepcopy(base_params), 'num_boost_round': num_boost_round, 'seed': RANDOM_STATE+1},
        # {**deepcopy(base_params), 'num_boost_round': num_boost_round+100, 'seed': RANDOM_STATE+2},
        # {**deepcopy(base_params), 'num_boost_round': num_boost_round-50, 'seed': RANDOM_STATE+3},
    ]

    # Обучение каждой модели
    for idx, params in enumerate(params_list):
        print(f"\nTraining model with params: {params}")

        train = make_short_groups(max_per_group=30+idx)

        y_tr = train.select('selected').to_numpy().flatten()
        groups_tr = train.select('ranker_id')
        train = train.drop(["selected", "ranker_id", "requestDate", "profileId"])
        train_columns = train.columns
        print('train.columns:', train.columns)

        print('Преобразуем в numpy')
        train = train.to_numpy().astype(np.float32)

        print('Преобразуем в sparse')
        # Преобразуем в sparse матрицу
        train = sparse.csr_matrix(train)

        # Подготовка данных для LightGBM
        train_data = lgb.Dataset(
            train,
            label=y_tr,
            group=groups_tr.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_list(),
            feature_name=list(train_columns),
        )

        if IS_LOCAL:

            val_data = lgb.Dataset(
                validate,
                label=y_va.to_numpy().flatten(),
                group=groups_va.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_list(),
                reference=train_data,
                feature_name=list(validate.columns),
            )
            valid_sets = [train_data, val_data]
            valid_names = ['train', 'valid']
        else:
            val_data = None
            valid_sets = None
            valid_names = None
        # Извлекаем num_boost_round, так как он не является параметром XGBoost, а передается отдельно
        num_boost_round = params.pop('num_boost_round')
        
        print('Обучаем модель')
        # Обучаем модель
        model = lgb.train(
            params,
            train_data,
            num_boost_round=num_boost_round,
            valid_names=valid_names,
            valid_sets=valid_sets,
            callbacks=[lgb.log_evaluation(200)] if IS_LOCAL else None
        )
        # Добавляем обученную модель в список
        trained_models_lgb.append(model)

        if IS_LOCAL:
            lgb_va_preds_1 = model.predict(validate)
            all_preds.append(lgb_va_preds_1)
            # # Evaluate XGBoost
            lgb_hr3 = hitrate_at_3(y_va, lgb_va_preds_1, groups_va)
            print(f"HitRate@3: {lgb_hr3 :.8f}")

            ensemble_preds = np.sum(all_preds, axis=0)

            xgb_hr3 = hitrate_at_3(y_va, ensemble_preds, groups_va)
            print(f"HitRate@3 All: {xgb_hr3:.8f}")
        
        # Возвращаем num_boost_round обратно в params для возможного дальнейшего использования
        params['num_boost_round'] = num_boost_round

    print(f"\nTotal models trained: {len(trained_models_lgb)}")

### XGBoost

In [None]:
base_params_1 = {
    'objective': 'rank:pairwise',
    'eval_metric': 'ndcg@3',
    'learning_rate': 0.03430629350470104,
    'max_depth': 20,
    'min_child_weight': 24,
    'subsample': 0.9680833434687813,
    'colsample_bytree': 0.1859969541434444,
    'lambda': 13.73979976725866,
    'alpha': 4.315374828140574,
    'gamma': 0.145064313893779,
    'n_jobs': -1,
}

base_params_2 = {
    'objective': 'rank:pairwise',
    'eval_metric': 'ndcg@3',
    'learning_rate': 0.01874682748459287,
    'max_depth': 17,
    'min_child_weight': 14,
    'subsample': 0.9910104516963302,
    'colsample_bytree': 0.24822082790651878,
    'lambda': 14.476928639040858,
    'alpha': 0.06344642592268934,
    'gamma': 0.2556097003945161,
    'n_jobs': -1,
}

# Список различных комбинаций параметров
params_list = [
    {**deepcopy(base_params_2), 'num_boost_round': num_boost_round},  # Базовая модель
    {**deepcopy(base_params_1), 'num_boost_round': num_boost_round+50, 'objective': 'rank:ndcg', 'seed': RANDOM_STATE+1},
    {**deepcopy(base_params_2), 'num_boost_round': num_boost_round+100, 'seed': RANDOM_STATE+2},  # Базовая модель
    {**deepcopy(base_params_1), 'num_boost_round': num_boost_round-50, 'seed': RANDOM_STATE+3},
    {**deepcopy(base_params_2), 'num_boost_round': num_boost_round+500, 'seed': RANDOM_STATE+4},

    
    {**deepcopy(base_params_2), 'num_boost_round': num_boost_round, 'seed': RANDOM_STATE+5},  # Базовая модель
    {**deepcopy(base_params_1), 'num_boost_round': num_boost_round+50, 'objective': 'rank:ndcg', 'seed': RANDOM_STATE+6},
    {**deepcopy(base_params_2), 'num_boost_round': num_boost_round+100, 'seed': RANDOM_STATE+7},  # Базовая модель
    {**deepcopy(base_params_1), 'num_boost_round': num_boost_round-50, 'seed': RANDOM_STATE+8},
    {**deepcopy(base_params_2), 'num_boost_round': num_boost_round+500, 'seed': RANDOM_STATE+9},
]

# Список для хранения обученных моделей
trained_models_xgb = []

MAX_PER_GROUP = 50

# Обучение каждой модели
for idx, params in enumerate(params_list):
    print(f"\nTraining model with params: {params}")

    train = make_short_groups(max_per_group=MAX_PER_GROUP)
    
    y_tr = train.select('selected')
    groups_tr = train.select('ranker_id')
    train = train.drop(["selected", "ranker_id", "requestDate", "profileId"])

    print('Add -1 to train')
    # Добавляем -1 до 1% в данных (-1 это null)
    train = ensure_1pct_minus1(train)

    group_sizes_tr = groups_tr.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_numpy()
    dtrain = xgb.DMatrix(train, label=y_tr, missing=-1, group=group_sizes_tr, feature_names=list(train.columns))

    evals = [(dtrain, 'train'), (dval, 'val')] if IS_LOCAL else None
    
    # Извлекаем num_boost_round, так как он не является параметром XGBoost, а передается отдельно
    num_boost_round = params.pop('num_boost_round')
    
    print('Start train model')
    # Обучаем модель
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=num_boost_round,
        # num_boost_round=1500,
        evals=evals if evals else [],
        verbose_eval=200,
    )
    # Добавляем обученную модель в список
    trained_models_xgb.append(model)

    if IS_LOCAL:
        xgb_va_preds_1 = model.predict(dval)
        all_preds.append(xgb_va_preds_1)
        # # Evaluate XGBoost
        xgb_hr3 = hitrate_at_3(y_va, xgb_va_preds_1, groups_va)
        print(f"HitRate@3: {xgb_hr3:.8f}")
        
        ensemble_preds = np.sum(all_preds, axis=0)
        xgb_hr3 = hitrate_at_3(y_va, ensemble_preds, groups_va)
        print(f"HitRate@3 All: {xgb_hr3:.8f}")
    
    # Возвращаем num_boost_round обратно в params для возможного дальнейшего использования
    params['num_boost_round'] = num_boost_round

print(f"\nTotal models trained: {len(trained_models_xgb)}")


Training model with params: {'objective': 'rank:pairwise', 'eval_metric': 'ndcg@3', 'learning_rate': 0.01874682748459287, 'max_depth': 17, 'min_child_weight': 14, 'subsample': 0.9910104516963302, 'colsample_bytree': 0.24822082790651878, 'lambda': 14.476928639040858, 'alpha': 0.06344642592268934, 'gamma': 0.2556097003945161, 'n_jobs': -1, 'num_boost_round': 1200}
18145372
3931973
Add -1 to train
Start train model

Training model with params: {'objective': 'rank:ndcg', 'eval_metric': 'ndcg@3', 'learning_rate': 0.03430629350470104, 'max_depth': 20, 'min_child_weight': 24, 'subsample': 0.9680833434687813, 'colsample_bytree': 0.1859969541434444, 'lambda': 13.73979976725866, 'alpha': 4.315374828140574, 'gamma': 0.145064313893779, 'n_jobs': -1, 'num_boost_round': 1250, 'seed': 43}
18145372
3932922
Add -1 to train
Start train model

Training model with params: {'objective': 'rank:pairwise', 'eval_metric': 'ndcg@3', 'learning_rate': 0.01874682748459287, 'max_depth': 17, 'min_child_weight': 14,

In [24]:
if IS_LOCAL: 
   # Суммируем все предсказания
    ensemble_preds = np.sum(all_preds[0:5], axis=0)

    xgb_hr3 = hitrate_at_3(y_va, ensemble_preds, groups_va)
    print(f"HitRate@3 All: {xgb_hr3:.8f}")

## Check Metric

In [25]:
def apply_hour_penalty(df: pl.DataFrame, hour_col: str, hour_round: int = 1, penalty_factor: float = 0.2) -> pl.DataFrame:
    rounded_col = f"{hour_col}_rounded"

    # 1. Округляем часы и заменяем 24 → 0
    df = df.with_columns(
        pl.when(
            ((pl.col(hour_col) / hour_round).round() * hour_round).cast(pl.Int8) == 24
        )
        .then(0)
        .otherwise((pl.col(hour_col) / hour_round).round() * hour_round)
        .cast(pl.Int8)
        .alias(rounded_col)
    )

    # 2. Находим максимум pred_score в группе ranker_id + округлённый час
    max_col = f"max_score_same_{rounded_col}"
    df = df.with_columns(
        pl.max("pred_score")
        .over(["ranker_id", rounded_col])
        .alias(max_col)
    )

    # 3. Обновляем pred_score с учётом штрафа
    df = df.with_columns(
        (
            pl.col("pred_score")
            - penalty_factor * (pl.col(max_col) - pl.col("pred_score"))
        ).alias("pred_score")
    )

    return df

def apply_col_penalty(df: pl.DataFrame, target_col: str, penalty_factor: float = 0.2) -> pl.DataFrame:

    # 2. Находим максимум pred_score в группе ranker_id + округлённый час
    max_col = f"max_score_same_{target_col}"
    df = df.with_columns(
        pl.max("pred_score")
        .over(["ranker_id", target_col])
        .alias(max_col)
    )

    # 3. Обновляем pred_score с учётом штрафа
    df = df.with_columns(
        (
            pl.col("pred_score")
            - penalty_factor * (pl.col(max_col) - pl.col("pred_score"))
        ).alias("pred_score")
    )

    return df

In [26]:
if False:
    # Загружаем список моделей из файла
    with open("save/trained_models_xgb.pkl", "rb") as f:
        trained_models_xgb = pickle.load(f)
    trained_models_lgb = []

In [27]:
if IS_LOCAL:

    all_preds = []  # Список для хранения предсказаний каждой модели

    # for model in trained_models_lgb:
    # # for model in [trained_models[i] for i in [0, 1, 3]]:
    #     preds = model.predict(validate)
    #     all_preds.append(preds)
    
    for model in trained_models_xgb:
    # for model in [trained_models[i] for i in [0, 1, 3]]:
        preds = model.predict(dval)
        all_preds.append(preds)

    # Суммируем все предсказания
    ensemble_preds = np.sum(all_preds, axis=0)

    xgb_hr3 = hitrate_at_3(y_va, ensemble_preds, groups_va)
    print(f"HitRate@3 All: {xgb_hr3:.8f}")

### Скор после пост обработки

In [28]:
if IS_LOCAL:
    penalty_factor = 0.1
    hour_round = 1

    val_score = val_source.with_columns(pl.Series('pred_score', ensemble_preds))

    val_score = apply_hour_penalty(val_score, "legs0_departureAt_hour", hour_round=hour_round, penalty_factor=penalty_factor)
    val_score = apply_hour_penalty(val_score, "legs0_arrivalAt_hour", hour_round=hour_round, penalty_factor=penalty_factor)
    # val_score = apply_hour_penalty(val_score, "legs1_departureAt_hour", hour_round=hour_round, penalty_factor=penalty_factor)
    # val_score = apply_hour_penalty(val_score, "legs1_arrivalAt_hour", hour_round=hour_round, penalty_factor=penalty_factor)
    # val_score = apply_col_penalty(val_score, "legs0_segments0_marketingCarrier_code", penalty_factor=0.05)
    # val_score = apply_col_penalty(val_score, "legs1_segments0_marketingCarrier_code", penalty_factor=0.05)
    # val_score = apply_col_penalty(val_score, "miniRules0_statusInfos", penalty_factor=0.05)
    

    reorder_score = val_score["pred_score"].to_numpy()
    xgb_hr3 = hitrate_at_3(y_va, reorder_score, groups_va)
    print(f"HitRate@3 All: {xgb_hr3:.8f}")

### Важность признаков

In [29]:
if IS_LOCAL:
    if trained_models_xgb:
        model = trained_models_xgb[0]
        xgb_importance = model.get_score(importance_type='gain')
        xgb_importance_df = pl.DataFrame(
            [{'feature': k, 'importance': v} for k, v in xgb_importance.items()]
        ).sort('importance', descending=bool(1))
        print(xgb_importance_df.head(1000).to_pandas().to_string())
        del xgb_importance
        del xgb_importance_df
        gc.collect()

In [30]:
if IS_LOCAL:
    if trained_models_lgb:
        model = trained_models_lgb[0]
        booster = model.booster_ if hasattr(model, "booster_") else model

        # Получаем словарь: {feature_name: importance}
        lgb_importance = dict(zip(
            booster.feature_name(),
            booster.feature_importance(importance_type='gain')
        ))

        # Переводим в Polars DataFrame и сортируем
        lgb_importance_df = (
            pl.DataFrame(
                [{"feature": k, "importance": v} for k, v in lgb_importance.items()]
            )
            .sort("importance", descending=True)
        )

        # Выводим топ-1000 как в XGB
        print(lgb_importance_df.head(1000).to_pandas().to_string())

### Error analysis and visualization

In [31]:
if IS_LOCAL:
    # Color palette
    red = (0.86, 0.08, 0.24)
    blue = (0.12, 0.56, 1.0)

    # Prepare data for analysis
    va_df = pl.DataFrame({
        'ranker_id': groups_va.to_numpy().flatten(),
        'pred_score': ensemble_preds,
        'selected': y_va.to_numpy().flatten()
    })

    # Add group size and filter
    va_df = va_df.join(
        va_df.group_by('ranker_id').agg(pl.len().alias('group_size')), 
        on='ranker_id'
    ).filter(pl.col('group_size') > 10)

    # Calculate group size quantiles
    size_quantiles = va_df.select('ranker_id', 'group_size').unique().select(
        pl.col('group_size').quantile(0.25).alias('q25'),
        pl.col('group_size').quantile(0.50).alias('q50'),
        pl.col('group_size').quantile(0.75).alias('q75')
    ).to_dicts()[0]

    # Function to calculate hitrate curve efficiently
    def calculate_hitrate_curve(df, k_values):
        # Sort once and calculate all k values
        sorted_df = df.sort(["ranker_id", "pred_score"], descending=[False, True])
        return [
            sorted_df.group_by("ranker_id", maintain_order=True)
            .head(k)
            .group_by("ranker_id")
            .agg(pl.col("selected").max().alias("hit"))
            .select(pl.col("hit").mean())
            .item()
            for k in k_values
        ]

    # Calculate curves
    k_values = list(range(1, 21))
    curves = {
        'All groups (>10)': calculate_hitrate_curve(va_df, k_values),
        f'Small (11-{int(size_quantiles["q25"])})': calculate_hitrate_curve(
            va_df.filter(pl.col('group_size') <= size_quantiles['q25']), k_values
        ),
        f'Medium ({int(size_quantiles["q25"]+1)}-{int(size_quantiles["q75"])})': calculate_hitrate_curve(
            va_df.filter((pl.col('group_size') > size_quantiles['q25']) & 
                        (pl.col('group_size') <= size_quantiles['q75'])), k_values
        ),
        f'Large (>{int(size_quantiles["q75"])})': calculate_hitrate_curve(
            va_df.filter(pl.col('group_size') > size_quantiles['q75']), k_values
        )
    }

    # Calculate hitrate@3 by group size using log-scale bins
    # Create log-scale bins
    min_size = va_df['group_size'].min()
    max_size = va_df['group_size'].max()
    bins = np.logspace(np.log10(min_size), np.log10(max_size), 51)  # 51 edges = 50 bins

    # Calculate hitrate@3 for each ranker_id
    ranker_hr3 = (
        va_df.sort(["ranker_id", "pred_score"], descending=[False, True])
        .group_by("ranker_id", maintain_order=True)
        .agg([
            pl.col("selected").head(3).max().alias("hit_top3"),
            pl.col("group_size").first()
        ])
    )

    # Assign bins and calculate hitrate per bin
    bin_centers = (bins[:-1] + bins[1:]) / 2  # Geometric mean would be more accurate for log scale
    bin_indices = np.digitize(ranker_hr3['group_size'].to_numpy(), bins) - 1

    size_analysis = pl.DataFrame({
        'bin_idx': bin_indices,
        'bin_center': bin_centers[np.clip(bin_indices, 0, len(bin_centers)-1)],
        'hit_top3': ranker_hr3['hit_top3']
    }).group_by(['bin_idx', 'bin_center']).agg([
        pl.col('hit_top3').mean().alias('hitrate3'),
        pl.len().alias('n_groups')
    ]).filter(pl.col('n_groups') >= 3).sort('bin_center')  # At least 3 groups per bin

    # Create combined figure
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4), dpi=400)

    # Left plot: HitRate@k curves
    # Create color gradient from blue to red for size groups
    colors = ['black']  # All groups is black
    for i in range(3):  # 3 size groups
        t = i / 2  # 0, 0.5, 1
        color = tuple(blue[j] * (1 - t) + red[j] * t for j in range(3))
        colors.append(color)

    for (label, hitrates), color in zip(curves.items(), colors):
        ax1.plot(k_values, hitrates, marker='o', label=label, color=color, markersize=3)
    ax1.set_xlabel('k (top-k predictions)')
    ax1.set_ylabel('HitRate@k')
    ax1.set_title('HitRate@k by Group Size')
    ax1.legend(fontsize=8)
    ax1.grid(True, alpha=0.3)
    ax1.set_xlim(0, 21)
    ax1.set_ylim(-0.025, 1.025)

    # Right plot: HitRate@3 vs Group Size (log scale)
    ax2.scatter(size_analysis['bin_center'], size_analysis['hitrate3'], s=30, alpha=0.6, color=blue)
    ax2.set_xlabel('Group Size')
    ax2.set_ylabel('HitRate@3')
    ax2.set_title('HitRate@3 vs Group Size')
    ax2.set_xscale('log')
    ax2.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

In [32]:
if IS_LOCAL:
    # Summary
    print(f"HitRate@1: {curves['All groups (>10)'][0]:.3f}")
    print(f"HitRate@3: {curves['All groups (>10)'][2]:.3f}")
    print(f"HitRate@5: {curves['All groups (>10)'][4]:.3f}")
    print(f"HitRate@10: {curves['All groups (>10)'][9]:.3f}")

In [None]:
IS_DUMP = False

if IS_DUMP:
    # # Сохранение всего списка models
    # with open("save/trained_models_lgb.pkl", "wb") as f:
    #     pickle.dump(trained_models_lgb, f)

    # Сохранение всего списка models
    with open("save/trained_models_xgb.pkl", "wb") as f:
        pickle.dump(trained_models_xgb, f)

    # Сохранение мапинга категореальных переменных
    with open("save/cat_mapping.pkl", "wb") as f:
        pickle.dump(cat_mapping, f)

## Submission

### Предсказания

In [37]:
if not IS_LOCAL:
    # Load data
    del train
    del train_source
    gc.collect()

    test = pl.read_parquet('/kaggle/input/aeroclub-recsys-2025/test.parquet').drop('__index_level_0__').with_columns(pl.lit(0, dtype=pl.Int64).alias("selected"))

    test, _ = make_fetures(test, selected_df=train_selected_df)
    test, groups_te, feature_cols = select_fetures(test)

    y_te = test.select('selected')
    test = test.select(feature_cols)
    # Применяем маппинг к test
    test = apply_category_map(test, cat_mapping)

    all_preds = []  # Список для хранения предсказаний каждой модели
    group_sizes_te = groups_te.group_by('ranker_id').agg(pl.len()).sort('ranker_id')['len'].to_numpy()
    dtest  = xgb.DMatrix(test, label=y_te, missing=-1, group=group_sizes_te, feature_names=test.columns)

    for model in trained_models_lgb:
        preds = model.predict(test)
        all_preds.append(preds)

    for model in trained_models_xgb:
        preds = model.predict(dtest)
        all_preds.append(preds)

    # Суммируем все предсказания
    ensemble_preds = np.sum(all_preds, axis=0)

Обрабатываем пропуски
Using 330 features (45 categorical)


### Постобработка предсказания на тесте

In [38]:
if not IS_LOCAL:
    IS_RERANK = False

    if IS_RERANK:
        test_score = test.with_columns(
            groups_te["ranker_id"],
            y_te["selected"],
        )
        penalty_factor = 0.1
        hour_round = 1

        test_score = test_score.with_columns(pl.Series('pred_score', ensemble_preds))

        test_score = apply_hour_penalty(test_score, "legs0_departureAt_hour", hour_round=hour_round, penalty_factor=penalty_factor)
        test_score = apply_hour_penalty(test_score, "legs0_arrivalAt_hour", hour_round=hour_round, penalty_factor=penalty_factor)

        ensemble_preds = test_score["pred_score"].to_numpy()

In [39]:
if not IS_LOCAL:
    test = pl.read_parquet('/kaggle/input/aeroclub-recsys-2025/test.parquet').drop('__index_level_0__').with_columns(pl.lit(0, dtype=pl.Int64).alias("selected"))

    # Без постобработки
    submission_xgb = (
        test.select(['Id', 'ranker_id'])
        #.with_columns(pl.Series('pred_score', xgb_model_1.predict(dtest)+xgb_model_2.predict(dtest)))
        .with_columns(pl.Series('pred_score', ensemble_preds))
        .with_columns(
            pl.col('pred_score')
            .rank(method='ordinal', descending=True)
            .over('ranker_id')
            .cast(pl.Int32)
            .alias('selected')
        )
        .select(['Id', 'ranker_id', 'selected'])
    )

    submission_xgb.write_csv('submission.csv')