## 前言
这是我的第一次kaggle比赛：FlightRank 2025: Aeroclub RecSys Cup，目的是做一个推荐系统，输出用户查询航班最可能的推荐结果。最终的public score为0.474，前200名基本在0.49-0.50之间，第一名在0.53左右，仍有一定差距。模型采用的是xgboost + lightgbm融合，最后rerank的方式。这次比赛让我认识到了特征工程的重要性，质量较高的特征能大幅提高准确率，而非一味地调参。
* structure.pdf中包含了各个特征的描述。

In [None]:
# !pip install --no-cache-dir lightgbm
# !pip install --no-cache-dir xgboost
# !pip install --no-cache-dir polars
# !pip install --upgrade pip 
# !pip install --no-cache-dir ipywidgets

import pandas as pd
import numpy as np
import lightgbm as lgb
from pathlib import Path # 路径管理，支持多个系统
from sklearn.preprocessing import LabelEncoder
import gc # 内存管理，释放内存
from tqdm.notebook import tqdm # 进度条
import xgboost as xgb
import polars as pl

DATA_DIR = Path("/kaggle/input/aeroclub-recsys-2025/")
TRAIN_PATH = DATA_DIR / "train.parquet"
TEST_PATH = DATA_DIR / "test.parquet"
SAMPLE_SUB_PATH = DATA_DIR / "sample_submission.parquet"
OUTPUT_PATH = Path("/kaggle/working/submission.csv")
np.random.seed(42)

在kaggle notebook运行的过程中遇到了严重的内存不足问题，导致只能使用TPU（不支持xgboost和lightgbm）而非GPU，因此降低内存具有一定的意义和重要性。

In [2]:
# # 内存优化(pandas版本)
# def reduce_mem_usage(df, verbose=True):
#     numerics = ["int16", "int32", "int64", "float16", "float32", "float64"]
#     start_mem = df.memory_usage().sum() / 1024**2 # Calculate initial memory usage in MB
#     for col in tqdm(df.columns, desc="Reducing Memory"):
#         col_type = df[col].dtypes
#         if col_type in numerics:
#             c_min = df[col].min()
#             c_max = df[col].max()
#             if np.isinf(c_min) or np.isinf(c_max) or np.isnan(c_min) or np.isnan(c_max):
#                 continue  
#             if str(col_type)[:3] == "int":
#                 # Check if integer column can be downcasted to a smaller integer type
#                 if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
#                     df[col] = df[col].astype(np.int8)
#                 elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
#                     df[col] = df[col].astype(np.int16)
#                 elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
#                     df[col] = df[col].astype(np.int32)
#                 elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
#                     df[col] = df[col].astype(np.int64)
#             else:
#                 # Check if float column can be downcasted to a smaller float type
#                 if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
#                     df[col] = df[col].astype(np.float16)
#                 elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
#                     df[col] = df[col].astype(np.float32)
#                 else:
#                     df[col] = df[col].astype(np.float64)
#     end_mem = df.memory_usage().sum() / 1024**2 # Calculate final memory usage in MB
#     if verbose: 
#         print(f"Mem. usage decreased to {end_mem:5.2f} Mb ({100 * (start_mem - end_mem) / start_mem:.1f}% reduction)")
#     return df

In [3]:
# polars版本
def reduce_mem_usage(df: pl.DataFrame, verbose: bool = True) -> pl.DataFrame:
    # Polars 的数值类型，没有float16
    numerics = [
        pl.Int8, pl.Int16, pl.Int32, pl.Int64,
        pl.Float32, pl.Float64
    ]

    start_mem = df.estimated_size("mb") 
        
    for col in tqdm(df.columns, desc="Reducing Memory"):
        col_type = df[col].dtype
        
        if col_type in numerics:
            if df[col].is_infinite().any() or df[col].is_nan().any():
                continue
                
            c_min = df[col].min()
            c_max = df[col].max()

            if c_min is None or c_max is None:
                continue
                
            if col_type.is_integer(): # 检查是否为整数类型
                # 检查整数列是否可以向下转换为更小的整数类型
                if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max:
                    # Polars DataFrame 是不可变的，必须重新赋值
                    df = df.with_columns(pl.col(col).cast(pl.Int8))
                elif c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max:
                    df = df.with_columns(pl.col(col).cast(pl.Int16))
                elif c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max:
                    df = df.with_columns(pl.col(col).cast(pl.Int32))
                elif c_min >= np.iinfo(np.int64).min and c_max <= np.iinfo(np.int64).max:
                    df = df.with_columns(pl.col(col).cast(pl.Int64))
            
            elif col_type.is_float(): # 检查是否为浮点数类型
                # 检查浮点数列是否可以向下转换为更小的浮点数类型
                if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df = df.with_columns(pl.col(col).cast(pl.Float32))
                else:
                    df = df.with_columns(pl.col(col).cast(pl.Float64))
                    
    # 计算最终内存使用 (MB)
    end_mem = df.estimated_size("mb")
    
    if verbose:
        reduction = 0.0
        if start_mem > 0:
            reduction = 100 * (start_mem - end_mem) / start_mem
        print(f"Mem. usage decreased to {end_mem:5.2f} Mb ({reduction:.1f}% reduction)")
        
    return df

## Feature Engineering
- #### create_base_features 
Focuses on extracting features from individual flight options.
- #### create_group_features
Generates features that capture the relative characteristics of each flight option within its search session (ranker_id). This is particularly important for ranking problems.

* 在这次比赛中，尽管对特征的重要性有了更深的认识，但对如何构建特征仍比较迷茫

In [4]:
# pandas版本
# def create_base_features(df):
    
#     # 把时间转换成正确的格式
#     for col in ["legs0_departureAt", "legs0_arrivalAt", "legs1_departureAt", "legs1_arrivalAt", "requestDate"]:
#         df[col] = pd.to_datetime(df[col],errors='coerce')

#     # 转成数字
#     numeric_cols = ["legs0_duration", "legs1_duration", "totalPrice", "taxes", "legs0_segments0_seatsAvailable"]
#     for col in numeric_cols:
#         df[col] = pd.to_numeric(df[col], errors="coerce")

#     # 提取往程的时间特征
#     df["dep_hour_0"] = df["legs0_departureAt"].dt.hour
#     df["dep_dayofweek_0"] = df["legs0_departureAt"].dt.dayofweek 
#     df["arr_hour_0"] = df["legs0_arrivalAt"].dt.hour

#     # 提取回程的时间特征，-1代表没有回程（单程）
#     df["dep_hour_1"] = df["legs1_departureAt"].dt.hour.fillna(-1).astype("int8")
#     df["arr_hour_1"] = df["legs1_arrivalAt"].dt.hour.fillna(-1).astype("int8")

#     df["request_hour"] = df["requestDate"].dt.hour

#     # 计算提前预定时间（小时）
#     df["time_to_departure_hrs"] = (df["legs0_departureAt"] - df["requestDate"]).dt.total_seconds() / 3600

#     # 根据legs1_departureAt是否为空判断是否往返
#     df["is_round_trip"] = df["legs1_departureAt"].notna().astype("int8")

#     # 总飞行时间
#     df["total_duration"] = df["legs0_duration"].fillna(0) + df["legs1_duration"].fillna(0)

#     # 计算航段数量，航段越多可能越不方便
#     df["num_segments_0"] = df.filter(like="legs0_segments").notna().sum(axis=1) # 去程的航段数
#     df["num_segments_1"] = df.filter(like="legs1_segments").notna().sum(axis=1) # 回程的航段数
#     df["total_segments"] = df["num_segments_0"] + df["num_segments_1"]

#     # 税的比例
#     df["tax_ratio"] = df["taxes"] / (df["totalPrice"] + 1e-6) 

#     # 价格和时间的比率
#     df["price_per_duration"] = df["totalPrice"] / (df["total_duration"] + 1e-6)
#     df["price_per_segment"] = df["totalPrice"] / (df["total_segments"] + 1e-6)

#     df["price_x_duration"] = df["totalPrice"] * df["total_duration"]
#     df["seats_x_price"] = df["legs0_segments0_seatsAvailable"].fillna(0) * df["totalPrice"]

#     # 把布尔值转为int
#     df["is_vip"] = df["isVip"].astype("int8")
#     df["books_by_self"] = df["bySelf"].astype("int8")

#     # 处理营销航空公司代码
#     mc_cols = [f'legs{l}_segments{s}_marketingCarrier_code' for l in (0, 1) for s in range(4)]
#     mc_exists = [col for col in mc_cols if col in df.columns]
#     if mc_exists:
#         df['l0_seg'] = df[mc_exists].notna().astype('uint8').sum(axis=1)
#     else:
#         df['l0_seg'] = 0  
    
#     #飞行常客计划的数目
#     df["n_ff_programs"] = (df["frequentFlyer"].fillna("").str.count("/") + 
#                            (df["frequentFlyer"].fillna("") != "").astype(int))

    
#     if "legs0_segments0_marketingCarrier_code" in df.columns:
#         df["is_major_carrier"] = df["legs0_segments0_marketingCarrier_code"].isin(["SU", "S7", "U6"]).astype(int)
#     else:
#         df["is_major_carrier"] = 0

#     df['has_access_tp'] = (df['pricingInfo_isAccessTP'] == 1).astype('int32')

#     #经济舱/商务舱/高级舱
#     col0 = df['legs0_segments0_cabinClass'].fillna(0)
#     col1 = df['legs1_segments0_cabinClass'].fillna(0)
#     df['avg_cabin_class'] = df[['legs0_segments0_cabinClass', 'legs1_segments0_cabinClass']].mean(axis=1)
#     df['cabin_class_diff'] =  col0 - col1

#     #fees
#     df['total_fees'] = (df['miniRules0_monetaryAmount'].fillna(0) +df['miniRules1_monetaryAmount'].fillna(0))
#     df['has_fees'] = (df['total_fees'] > 0).astype('int32')
    
#     return df

In [5]:
# polars版本
def create_base_features(df: pl.DataFrame) -> pl.DataFrame:

    time_cols = ["legs0_departureAt", "legs0_arrivalAt", "legs1_departureAt", "legs1_arrivalAt", "requestDate"]
    numeric_cols = ["legs0_duration", "legs1_duration", "totalPrice", "taxes", "legs0_segments0_seatsAvailable"] # 这步可以省略

    conversion_exprs = []
    for col_name in time_cols:
        if df[col_name].dtype == pl.String:
            conversion_exprs.append(pl.col(col_name).str.to_datetime(strict=False)) # 转为正确的时间格式

    for col_name in numeric_cols:
        if not df[col_name].dtype.is_float():
            conversion_exprs.append(pl.col(col_name).cast(pl.Float64, strict=False))
    
    if conversion_exprs:
        df = df.with_columns(conversion_exprs)

    seg0_cols = [c for c in df.columns if c.startswith("legs0_segments")]
    seg1_cols = [c for c in df.columns if c.startswith("legs1_segments")]
    
    # 动态获取营销航空公司代码列
    mc_cols = [f'legs{l}_segments{s}_marketingCarrier_code' for l in (0, 1) for s in range(4)]
    mc_exists = [col for col in mc_cols if col in df.columns]

    first_pass_exprs = [
        # 时间特征
        pl.col("legs0_departureAt").dt.hour().alias("dep_hour_0"),
        (pl.col("legs0_departureAt").dt.weekday() - 1).alias("dep_dayofweek_0"), # Polars: 0=Mon..6=Sun
        pl.col("legs0_arrivalAt").dt.hour().alias("arr_hour_0"),
        pl.col("legs1_departureAt").dt.hour().fill_null(-1).cast(pl.Int8).alias("dep_hour_1"),
        pl.col("legs1_arrivalAt").dt.hour().fill_null(-1).cast(pl.Int8).alias("arr_hour_1"),
        # pl.col("requestDate").dt.hour().alias("request_hour"),
        
        # 提前预定时间
        ((pl.col("legs0_departureAt") - pl.col("requestDate")).dt.total_seconds() / 3600).alias("time_to_departure_hrs"),
        
        # 是否往返
        pl.col("legs1_departureAt").is_not_null().cast(pl.Int8).alias("is_round_trip"),
        (pl.col("legs1_duration").is_null() | 
         (pl.col("legs1_duration") == 0) | 
         pl.col("legs1_segments0_departureFrom_airport_iata").is_null()).cast(pl.Int32).alias("is_one_way"),
        
        # 总飞行时间
        (pl.col("legs0_duration").fill_null(0) + pl.col("legs1_duration").fill_null(0)).alias("total_duration"),
         pl.when(pl.col("legs1_duration").fill_null(0) > 0)
            .then(pl.col("legs0_duration") / (pl.col("legs1_duration") + 1))
            .otherwise(1.0).alias("duration_ratio"),

        # 税率
        (pl.col("taxes") / (pl.col("totalPrice") + 1e-6)).alias("tax_ratio"),
        
        # 布尔/类别转整数
        pl.col("isVip").cast(pl.Int8).alias("is_vip"),
        pl.col("bySelf").cast(pl.Int8).alias("books_by_self"),
        (pl.col('pricingInfo_isAccessTP') == 1).cast(pl.Int32).alias('has_access_tp'),
        pl.col("corporateTariffCode").is_not_null().cast(pl.Int32).alias("has_corporate_tariff"),
            
        # 飞行常客计划数目
        (pl.col("frequentFlyer").fill_null("").str.count_matches(r"/") + pl.col("frequentFlyer").fill_null("").ne("").cast(pl.Int32)).alias("n_ff_programs"),
        
        # 舱位特征
        pl.mean_horizontal(['legs0_segments0_cabinClass', 'legs1_segments0_cabinClass']).alias('avg_cabin_class'),
        (pl.col('legs0_segments0_cabinClass').fill_null(0) - pl.col('legs1_segments0_cabinClass').fill_null(0)).alias('cabin_class_diff'),
        
        # 费用
        (pl.col('miniRules0_monetaryAmount').fill_null(0) + pl.col('miniRules1_monetaryAmount').fill_null(0)).alias('total_fees'),
        
        # 交互特征
        (pl.col("legs0_segments0_seatsAvailable").fill_null(0) * pl.col("totalPrice")).alias("seats_x_price"),


        ((pl.col("miniRules0_monetaryAmount") == 0)& (pl.col("miniRules0_statusInfos") == 1)).cast(pl.Int8).alias("free_cancel"),
        ((pl.col("miniRules1_monetaryAmount") == 0)& (pl.col("miniRules1_statusInfos") == 1)).cast(pl.Int8).alias("free_exchange"),

        pl.col("searchRoute").is_in(["MOWLED/LEDMOW", "LEDMOW/MOWLED", "MOWLED", "LEDMOW"]).cast(pl.Int32).alias("is_popular_route"),                
    ]
    
    if seg0_cols:
        first_pass_exprs.append(pl.sum_horizontal([pl.col(c).is_not_null() for c in seg0_cols]).alias("num_segments_0"))
    else:
        first_pass_exprs.append(pl.lit(0).alias("num_segments_0"))

    if seg1_cols:
        first_pass_exprs.append(pl.sum_horizontal([pl.col(c).is_not_null() for c in seg1_cols]).alias("num_segments_1"))
    else:
        first_pass_exprs.append(pl.lit(0).alias("num_segments_1"))

    if mc_exists:
        first_pass_exprs.append(pl.sum_horizontal([pl.col(c).is_not_null() for c in mc_exists]).cast(pl.UInt8).alias('l0_seg'))
    else:
        first_pass_exprs.append(pl.lit(0, dtype=pl.UInt8).alias('l0_seg'))

    if "legs0_segments0_marketingCarrier_code" in df.columns:
        first_pass_exprs.append(pl.col("legs0_segments0_marketingCarrier_code").is_in(["SU", "S7", "U6"]).cast(pl.Int32).alias("is_major_carrier"))
    else:
        first_pass_exprs.append(pl.lit(0, dtype=pl.Int32).alias("is_major_carrier"))
        
    df = df.with_columns(first_pass_exprs)
        
    df = df.with_columns([
        (pl.col("num_segments_0") + pl.col("num_segments_1")).alias("total_segments"),
        (pl.col("num_segments_0") == 1).cast(pl.Int32).alias("is_direct_leg0"),
        pl.when(pl.col("is_one_way") == 1).then(0)
            .otherwise((pl.col("num_segments_1") == 1).cast(pl.Int32)).alias("is_direct_leg1"),])

    df = df.with_columns([
    (pl.col("is_direct_leg0") & pl.col("is_direct_leg1")).cast(pl.Int32).alias("both_direct"),
    ((pl.col("isVip") == 1) | (pl.col("n_ff_programs") > 0)).cast(pl.Int32).alias("is_vip_freq"),
     pl.col("Id").count().over("ranker_id").alias("group_size")])
    df = df.with_columns(pl.col("group_size").log1p().alias("group_size_log"))

    # Time features
    time_exprs = []
    for col in ("legs0_departureAt", "legs0_arrivalAt", "legs1_departureAt", "legs1_arrivalAt"):
        if col in df.columns:
            dt = pl.col(col)
            h = dt.dt.hour().fill_null(12)
            time_exprs.extend([
                h.alias(f"{col}_hour"),
                dt.dt.weekday().fill_null(0).alias(f"{col}_weekday"),
                (((h >= 6) & (h <= 9)) | ((h >= 17) & (h <= 20))).cast(pl.Int32).alias(f"{col}_business_time")
            ])
    if time_exprs:
        df = df.with_columns(time_exprs)

    second_pass_exprs = [
        (pl.col("totalPrice") * pl.col("total_duration")).alias("price_x_duration"),
        (pl.col('total_fees') > 0).cast(pl.Int32).alias('has_fees'),
    ]
    df = df.with_columns(second_pass_exprs)

    third_pass_exprs = [
        (pl.col("totalPrice") / (pl.col("total_duration") + 1e-6)).alias("price_per_duration"),
        (pl.col("totalPrice") / (pl.col("total_segments") + 1e-6)).alias("price_per_segment"),
         pl.col("totalPrice").log1p().alias("log_price"),
    ]
    df = df.with_columns(third_pass_exprs)

    df_1 = df.clone()
    df = (df.join(
        df_1.group_by('legs0_segments0_marketingCarrier_code').agg(pl.mean('selected').alias('carrier0_pop')),
        on='legs0_segments0_marketingCarrier_code', 
        how='left')
    .join(
        df_1.group_by('legs1_segments0_marketingCarrier_code').agg(pl.mean('selected').alias('carrier1_pop')),
        on='legs1_segments0_marketingCarrier_code', 
        how='left')
    .with_columns([
        pl.col('carrier0_pop').fill_null(0.0),
        pl.col('carrier1_pop').fill_null(0.0),]))
    
    df = df.with_columns([ (pl.col('carrier0_pop') * pl.col('carrier1_pop')).alias('carrier_pop_product'),])
    return df

## create_group_features：在同一搜索会话中与其他可选航班的对比情况
1. #### Rank Features
- 根据某一特征在组内进行升序或降序排名
2. #### _norm_by_mean
3. #### _diff_from_mean 
4. #### _ratio_to_max

In [6]:
# pandas版本
# def create_group_features(df):

#     # 希望计算组内特征的列
#     group_agg_features = [
#         "totalPrice", "total_duration", "time_to_departure_hrs", 
#         "total_segments", "tax_ratio"
#     ]

#     for col in tqdm(group_agg_features, desc="Creating Group Features"):
    
#         # ranker_id是每组搜索的标识符
#         # ascending = True，价格低的rank1
#         df[f"{col}_rank_asc"] = df.groupby("ranker_id")[col].rank(method="dense", ascending=True)
#         # ascending = False 时间长的rank1
#         df[f"{col}_rank_desc"] = df.groupby("ranker_id")[col].rank(method="dense", ascending=False)
        
      
#         group_mean = df.groupby("ranker_id")[col].transform("mean")
#         # 与平均值比较
#         df[f"{col}_norm_by_mean"] = df[col] / (group_mean + 1e-6) 
#         df[f"{col}_diff_from_mean"] = df[col] - group_mean
#         group_max = df.groupby("ranker_id")[col].transform("max")
#         df[f"{col}_ratio_to_max"] = df[col] / (group_max + 1e-6) 

#         min_vals = df.groupby("ranker_id")[col].transform("min")
#         df[f"is_min_{col}"] = np.isclose(df[col], min_vals).astype(int)
#     return df

In [7]:
# polars版本
def create_group_features(df: pl.DataFrame) -> pl.DataFrame:
    
    # 希望计算组内特征的列
    group_agg_features = [
        "totalPrice", "total_duration", "time_to_departure_hrs", 
        "total_segments", "tax_ratio","total_fees","taxes"
    ]

    # 创建一个列表来收集所有新的特征表达式
    feature_expressions = []

    for col in tqdm(group_agg_features, desc="Creating Group Features"):
        feature_expressions.append(
            pl.col(col).rank(method="dense", descending=False).over("ranker_id").alias(f"{col}_rank_asc")
        )
        feature_expressions.append(
            pl.col(col).rank(method="dense", descending=True).over("ranker_id").alias(f"{col}_rank_desc")
        )
        # 计算组内平均值
        group_mean = pl.col(col).mean().over("ranker_id")
        
        # 与平均值比较
        feature_expressions.append(
            (pl.col(col) / (group_mean + 1e-6)).alias(f"{col}_norm_by_mean")
        )
        feature_expressions.append(
            (pl.col(col) - group_mean).alias(f"{col}_diff_from_mean")
        )
        
        # 计算组内最大值
        group_max = pl.col(col).max().over("ranker_id")
        
        # 与最大值比较
        feature_expressions.append(
            (pl.col(col) / (group_max + 1e-6)).alias(f"{col}_ratio_to_max")
        )

        # 检查是否为组内最小值
        feature_expressions.append(
            (pl.col(col) == pl.col(col).min().over("ranker_id")).cast(pl.Int32).alias(f"is_min_{col}")
        )
        
    df.with_columns(feature_expressions)
    direct_cheapest = (df.filter(pl.col("is_direct_leg0") == 1).group_by("ranker_id").agg(pl.col("totalPrice").min().alias("min_direct")))

    df = df.join(direct_cheapest, on="ranker_id", how="left").with_columns(((pl.col("is_direct_leg0") == 1) & 
    (pl.col("totalPrice") == pl.col("min_direct"))).cast(pl.Int32).fill_null(0).alias("is_direct_cheapest")).drop("min_direct")

    df = df.with_columns(
    [pl.col(c).fill_null(0) for c in df.select(pl.selectors.numeric()).columns] +
    [pl.col(c).fill_null("missing") for c in df.select(pl.selectors.string()).columns])
    return df

#### Data Loading and Preprocessing Pipeline
- 加载数据
- 应用特征工程
- 把分类特征转为数字
- 内存优化
- ···

* 相比pandas，polars更快、更高效，内存占用更低，且有惰性版本lazy dataframe

In [None]:
print("Loading data...")
train_df = pl.read_parquet(TRAIN_PATH).drop('__index_level_0__')
test_df = pl.read_parquet(TEST_PATH).drop('__index_level_0__').with_columns(pl.lit(0, dtype=pl.Int64).alias("selected"))

print("Feature engineering...")
train_df = create_base_features(train_df)
test_df = create_base_features(test_df)
gc.collect()
train_df = create_group_features(train_df)
test_df = create_group_features(test_df)
gc.collect()

# 把分类变量、字符串变量转换为数值类型
print("Encoding categorical features...")
categorical_cols = ["profileId", "companyID", "searchRoute",'sex',"legs0_segments0_cabinClass",'legs0_segments0_marketingCarrier_code',
              'legs0_segments1_marketingCarrier_code','legs1_segments0_marketingCarrier_code','legs1_segments1_marketingCarrier_code',
            "legs0_segments0_aircraft_code","legs0_segments1_aircraft_code","legs1_segments0_aircraft_code","legs1_segments1_aircraft_code",
            "legs0_segments1_departureFrom_airport_iata",'legs0_segments0_departureFrom_airport_iata',
            "legs1_segments0_departureFrom_airport_iata",'legs1_segments1_departureFrom_airport_iata',
            'legs0_segments0_arrivalTo_airport_iata','legs0_segments1_arrivalTo_airport_iata',
            'legs1_segments0_arrivalTo_airport_iata','legs1_segments1_arrivalTo_airport_iata','nationality','corporateTariffCode',
            'legs0_segments0_arrivalTo_airport_city_iata','legs0_segments1_arrivalTo_airport_city_iata','legs1_segments0_arrivalTo_airport_city_iata',
            'legs1_segments1_arrivalTo_airport_city_iata','legs0_segments0_operatingCarrier_code','legs0_segments1_operatingCarrier_code','legs1_segments0_operatingCarrier_code',
            'legs1_segments1_operatingCarrier_code','legs0_segments0_duration', 'legs0_segments0_flightNumber', 
            'legs0_segments1_duration', 'legs1_segments0_duration', 'legs1_segments1_duration'       
                   ]

# 注意要补全缺失值
pl.enable_string_cache() # 避免训练集和测试集编码不一致
cat_exprs = [pl.col(c).fill_null("missing").cast(pl.String).cast(pl.Categorical).to_physical() for c in categorical_cols]
train_df = train_df.with_columns(cat_exprs)
test_df = test_df.with_columns(cat_exprs)
pl.disable_string_cache()

train_df = reduce_mem_usage(train_df)
test_df = reduce_mem_usage(test_df)
gc.collect()

In [9]:
print("Preparing data for training...")
exclude_cols = [
    'Id', 'ranker_id', 'selected', 'profileId', 'requestDate','profileId',
    'legs0_departureAt', 'legs0_arrivalAt', 'legs1_departureAt', 'legs1_arrivalAt',"requestDate",
    "legs0_segments1_flightNumber", "legs1_segments0_flightNumber","isVip", "bySelf",
    'miniRules0_percentage', 'miniRules1_percentage',  # >90% missing
    'frequentFlyer',  # Already processed
    # Exclude constant columns
    'pricingInfo_passengerCount']

for leg in [0, 1]:
    for seg in [0, 1]:
        if seg == 0:
            suffixes = [
                "seatsAvailable",
            ]
        else:
            suffixes = [
                "cabinClass",
                "seatsAvailable",
                "baggageAllowance_quantity",
                "baggageAllowance_weightMeasurementType",
                "aircraft_code",
                "arrivalTo_airport_city_iata",
                "arrivalTo_airport_iata",
                "departureFrom_airport_iata",
                "flightNumber",
                "marketingCarrier_code",
                "operatingCarrier_code",
            ]
        for suffix in suffixes:
            exclude_cols.append(f"legs{leg}_segments{seg}_{suffix}")
    
for leg in [0, 1]:
    for seg in [2, 3]:
        for suffix in ['aircraft_code', 'arrivalTo_airport_city_iata', 'arrivalTo_airport_iata',
                      'baggageAllowance_quantity', 'baggageAllowance_weightMeasurementType',
                      'cabinClass', 'departureFrom_airport_iata', 'duration', 'flightNumber',
                      'marketingCarrier_code', 'operatingCarrier_code', 'seatsAvailable']:
            exclude_cols.append(f'legs{leg}_segments{seg}_{suffix}')
            
features = [col for col in train_df.columns if col not in exclude_cols]
print(f"Using {len(features)} features.")

# # 时间序列不能随机打乱，要用历史的数据训练，预测未来
train_cutoff_date = train_df.get_column("requestDate").to_pandas().quantile(0.85, interpolation="nearest") # 分割时间点
train = train_df.filter(pl.col("requestDate") <= train_cutoff_date)
val = train_df.filter(pl.col("requestDate") > train_cutoff_date)
del train_df,train_cutoff_date
gc.collect()
cols = features + ["selected", "ranker_id"]
train = train.select(cols).fill_null(np.nan)
val = val.select(cols).fill_null(np.nan)

X_train = train.select(features).fill_null(np.nan).to_numpy()
y_train = train.get_column("selected").fill_null(np.nan).to_numpy()

X_val = val.select(features).to_numpy()
y_val = val.get_column("selected").to_numpy()

train_groups = train.group_by("ranker_id", maintain_order=True).agg(pl.len()).get_column("len").to_numpy()
val_groups = val.group_by("ranker_id", maintain_order=True).agg(pl.len()).get_column("len").to_numpy()
train_query_ids = train.get_column("ranker_id").to_numpy()
val_query_ids = val.get_column("ranker_id").to_numpy()
del train,val
gc.collect()

# free_raw_data是否保存原始数据
lgb_train = lgb.Dataset(X_train, y_train, group=train_groups, free_raw_data=True)
lgb_val = lgb.Dataset(X_val, y_val, group=val_groups, reference=lgb_train, free_raw_data=True)

dtrain = xgb.DMatrix(X_train, label=y_train,enable_categorical=True)
dval = xgb.DMatrix(X_val, label=y_val,enable_categorical=True)

dtrain.set_group(train_groups)
dval.set_group(val_groups)
del X_train, y_train,train_groups,val_groups
gc.collect()

Preparing data for training...
Using 93 features.


0

#### 辅助函数hitrate@3
$$
\text{HitRate@3} = \frac{1}{|Q|} \sum_{i=1}^{|Q|} \mathbf{1}( \text{rank}_i \leq 3 )
$$
- $ |Q| $  查询总数（即唯一 `ranker_id` 的数量）
- $ \text{rank}_i $ 在第 $ i $ 个查询中，正确航班的预测排名（从 1 开始）
- $ \mathbf{1}(\text{rank}_i \leq 3) $ 指示函数：<br>若 $ \text{rank}_i \leq 3 $，值为 1（命中）；否则为 0（未命中）

In [10]:
def hitrate_at_3(y_true, y_pred, groups):
    df = pl.DataFrame({
        'group': groups,
        'pred': y_pred,
        'true': y_true
    })
    
    return (
        df.sort(["group", "pred"], descending=[False, True])
        .group_by("group", maintain_order=True)
        .head(3)
        .group_by("group")
        .agg(pl.col("true").max())
        .select(pl.col("true").mean())
        .item()
    )

print("Already prepared hitrate@3")

Already prepared hitrate@3


#### 尝试用XGBOOST模型
*  目标函数包含正则化项（叶子节点数和叶子权重）
*  XGBoost 在每一步优化中使用损失函数的二阶导数（Hessian），而传统GBDT只用一阶梯度

#### 先用Optuna贝叶斯优化自动寻找最优参数

In [None]:
# ! pip install optuna
import optuna

def objective_xgb(trial):
    param_space = {
        'objective': 'rank:ndcg',
        'eval_metric': 'ndcg@3',
        'seed': 42,
        'tree_method': 'hist',
        'verbosity': 0,
        "learning_rate": trial.suggest_float("learning_rate", 0.02, 0.04, log=True),
        'max_depth': trial.suggest_int("max_depth", 8, 12),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 8),
        'subsample': trial.suggest_float("subsample", 0.6, 0.9),
        'colsample_bytree': trial.suggest_float("colsample_bytree", 0.5, 0.8),
        "lambda": trial.suggest_float("lambda", 1.0, 6, log=True),
        "alpha": trial.suggest_float("alpha", 0.3, 0.8, log=True),
    }
    xgb_model = xgb.train(
        params=param_space,
        dtrain=dtrain,
        num_boost_round=250,
        evals=[(dval, 'eval')],
        early_stopping_rounds=50,
        verbose_eval=False
    )
    return xgb_model.best_score

print("Starting hyperparameter optimization...")
study_xgb = optuna.create_study(direction='maximize')
study_xgb.optimize(objective_xgb, n_trials=30, show_progress_bar=True) 

best_params_xgb = study_xgb.best_trial.params
print(f"Best trial's ndcg@3: {study_xgb.best_value}")
print("Best parameters found: ", best_params_xgb)

In [13]:
# 用最优参数训练模型
print("Training XGBoost Model")
xgb_params = {
    'objective': 'rank:ndcg', # XGBoost中的LambdaRank
    'eval_metric': 'ndcg@3',   # 评估指标，可以指定top-k
    "learning_rate": 0.036101364219956196,
    'max_depth': 12,
    "min_child_weight":2,
    'subsample':0.8411394006439512,
    'colsample_bytree': 0.5345076787463193,
    'seed': 42,
    "lambda": 3.134173153619119,
    "alpha": 0.46089915831602574,
    'tree_method': 'hist', 
    # 'device':'cuda',
    'verbosity': 1,
    "max_bin": 127,
    'n_jobs': -1
}

xgb_model = xgb.train(
    params=xgb_params,
    dtrain=dtrain,
    num_boost_round=1000,
    evals=[(dtrain, 'train'), (dval, 'eval')],
    early_stopping_rounds=100,
    verbose_eval=100
)

# 在验证集上获取XGB的预测分数
xgb_value = xgb_model.predict(dval)
xgb_hitrate = hitrate_at_3(y_val,xgb_value,val_query_ids)
print(f"xgb_hitrate@3:{xgb_hitrate}")
best_ntree_limit = xgb_model.best_iteration + 1

Training XGBoost Model
[0]	train-ndcg@3:0.34143	eval-ndcg@3:0.35434
[100]	train-ndcg@3:0.58339	eval-ndcg@3:0.44984
[200]	train-ndcg@3:0.66303	eval-ndcg@3:0.45990
[300]	train-ndcg@3:0.71783	eval-ndcg@3:0.46284
[400]	train-ndcg@3:0.75737	eval-ndcg@3:0.46560
[500]	train-ndcg@3:0.79660	eval-ndcg@3:0.46762
[600]	train-ndcg@3:0.82080	eval-ndcg@3:0.46972
[700]	train-ndcg@3:0.83828	eval-ndcg@3:0.46873
[735]	train-ndcg@3:0.84317	eval-ndcg@3:0.46904
xgb_hitrate@3:0.5631561498508122


#### 打印xgboost模型最重要的30个特征和最不重要的10个特征

In [14]:
feature_names = features
importance = xgb_model.get_score(importance_type='gain')
mapped_importance = {feature_names[int(k.replace('f', ''))]: v for k, v in importance.items()}
importance = sorted(mapped_importance.items(), key=lambda x: x[1], reverse=True)

print("top30 important")
for feat, score in importance[:30]:
    print(f"{feat}: {score}")
    
print("\n not important")
for feat, score in importance[-10:]:
    print(f"{feat}: {score}")

del dval,dtrain
gc.collect()

top30 important
num_segments_0: 148.2381134033203
l0_seg: 62.78269958496094
free_exchange: 49.147220611572266
free_cancel: 48.76648712158203
num_segments_1: 34.44593811035156
avg_cabin_class: 33.27167892456055
legs0_segments0_cabinClass: 30.824491500854492
has_access_tp: 29.87773895263672
legs0_segments1_duration: 28.991907119750977
pricingInfo_isAccessTP: 26.205120086669922
has_fees: 21.393226623535156
is_major_carrier: 18.606416702270508
total_segments: 18.223217010498047
is_popular_route: 11.50790023803711
legs0_segments0_marketingCarrier_code: 11.427835464477539
legs0_segments0_baggageAllowance_quantity: 10.711195945739746
is_vip: 8.32663631439209
searchRoute: 8.176483154296875
miniRules1_monetaryAmount: 7.966120719909668
legs0_segments0_baggageAllowance_weightMeasurementType: 7.792277812957764
legs0_segments0_arrivalTo_airport_iata: 7.568896770477295
carrier_pop_product: 7.359681129455566
miniRules1_statusInfos: 7.009939193725586
legs1_segments0_cabinClass: 6.829896926879883
carri

1834

#### 训练lightGBM模型
* https://lightgbm.readthedocs.io/en/latest/
* 基于直方图的决策树算法（Histogram-based），将连续特征值离散化为离散的“bin”，从而减少计算量
* 传统 GBDT 使用 Level-wise（按层）生长，而 LightGBM 使用 Leaf-wise：每次选择增益最大的叶子节点进行分裂

In [None]:
# 同样先用贝叶斯优化寻找最优参数
def objective_lgb(trial):
    param_space = {
        'objective': 'lambdarank',      # LambdaRank 目标
        'metric': 'ndcg',               # 评估指标
        'eval_at': [3],                   # 计算 ndcg@3
        'boosting_type': 'gbdt',
        'seed': 42,
        'verbosity': -1,                # 在优化时关闭详细日志
        "learning_rate": trial.suggest_float("learning_rate", 0.02, 0.05, log=True),
        'num_leaves': trial.suggest_int("num_leaves",120, 250),
        'max_depth': trial.suggest_int("max_depth", 10, 14),
        'min_child_samples':30,
        'subsample': trial.suggest_float("subsample", 0.6, 0.9, step=0.1),
        'colsample_bytree': trial.suggest_float("colsample_bytree", 0.4, 0.8, step=0.1),
        'reg_alpha': trial.suggest_float("reg_alpha", 0.4, 0.6, log=True), # L1 正则
        'reg_lambda': trial.suggest_float("reg_lambda", 2, 5, log=True), # L2 正则
    }

    # 训练模型
    lgb_model = lgb.train(
        params=param_space,
        train_set=lgb_train,
        num_boost_round=300,
        valid_sets=[lgb_val],
        valid_names=['eval'], # 给验证集命名为 'eval'
        # 使用回调函数进行 early stopping
        callbacks=[lgb.early_stopping(stopping_rounds=50, verbose=False)]
    )
    
    ndcg_score = lgb_model.best_score['eval']['ndcg@3']
    
    return ndcg_score

print("Starting hyperparameter optimization for LightGBM...")
study_lgb = optuna.create_study(direction='maximize')
study_lgb.optimize(objective_lgb, n_trials=15, show_progress_bar=True)
best_params_lgb = study_lgb.best_trial.params
print("\nOptimization finished.")
print(f"Best trial's ndcg@3: {study_lgb.best_value}")
print("Best parameters found: ", best_params_lgb)

In [None]:
print("Starting model training...")

# --- LightGBM Model Parameters ---
# These parameters are crucial for the model's performance and training behavior.
# - `objective`: `lambdarank` is specified for learning-to-rank tasks.
# - `metric@3`: `ndcg` (Normalized Discounted Cumulative Gain) is a common ranking metric,
#             closely related to HitRate@3.
# - `boosting_type`: `gbdt`(标准梯度提升树)、dart(带dropout)、goss(速度快)
# - `n_estimators`: Maximum number of boosting rounds.
# - `learning_rate`: Controls the step size shrinkage to prevent overfitting.
# - `num_leaves`: Maximum number of leaves in one tree. Higher values increase model complexity.
# - `max_depth`: Maximum depth of the tree. -1 means no limit.
# - `seed`: Random seed for reproducibility.
# - `n_jobs`: Number of parallel threads. -1 uses all available cores.
# - `verbose`: Controls the verbosity of training output. -1 means silent.
# - `colsample_bytree`:构建每棵树时，随机采样多少比例的特征
# - `subsample`: 每轮迭代时，随机选取多少比例的样本
# - `reg_alpha`, `reg_lambda`: L1 and L2 regularization terms to prevent overfitting.
# - `device_type`: Set to `gpu` to leverage GPU acceleration if available.
# - `gpu_platform_id`, `gpu_device_id`: Specific GPU device to use.
# - `max_bin`: Maximum number of bins for feature discretization. Higher values can improve accuracy but increase memory/time.

params = {
    "objective": "lambdarank",
    # "objective": "rank_xendcg",
    "metric": "ndcg",
    "eval_at": [3],
    "boosting_type": "gbdt",
    "learning_rate": 0.0416,
    "num_leaves":213,
    "num_boost_round":1000,
    "max_depth":13,
    "seed": 42,
    # "n_jobs": -1,
    "verbose": -1,
    "colsample_bytree": 0.5,
    "subsample": 0.6,
    "reg_alpha": 0.4342867,
    "reg_lambda": 2.9,
    "min_child_samples": 30
    # # GPU settings for P100 (adjust based on your GPU)
    # "device_type": "gpu",
    # "gpu_platform_id": 0,
    # "gpu_device_id": 0,
    # "max_bin": 127, # Can be higher on GPU for better performance
}

lgb_model = lgb.train(
    params,
    lgb_train,
    valid_sets=[lgb_train, lgb_val],
    callbacks=[lgb.early_stopping(50, verbose=True), lgb.log_evaluation(100)]
)

lgb_value = lgb_model.predict(X_val)
lgb_hitrate = hitrate_at_3(y_val,lgb_value,val_query_ids)
print(f"lgb:{lgb_hitrate}")

#### 打印lightGBM最重要的30个特征和最不重要的10个特征

In [None]:
importance = lgb_model.feature_importance(importance_type='gain')
importance_dict = dict(zip(feature_names, importance))
importance = sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)
print("Top 30 important features:")
for feat, score in importance[:30]:
    print(f"{feat}: {score}")
print("\nLeast 20 important features:")
for feat, score in importance[-20:]:
    print(f"{feat}: {score}")

#### 找最佳融合参数

In [None]:
del lgb_train, lgb_val,X_val
gc.collect()

def best_weights(preds1, preds2, true_labels, val_query_ids):
    best_hitrate = 0
    best_w = 0
    for w1 in np.arange(0, 1.01, 0.05):
        blended_preds = w1 * preds1 + (1 - w1) * preds2
        
        # 使用你的最终指标来评估
        current_hitrate = hitrate_at_3(true_labels, blended_preds, val_query_ids)
        
        print(f"Weight_LGB={w1:.2f}, Validation HitRate@3: {current_hitrate:.5f}")
        
        if current_hitrate > best_hitrate:
            best_hitrate = current_hitrate
            best_w = w1
            
    print(f"\nBest weight for LGB: {best_w:.2f}, Best HitRate@3 on validation: {best_hitrate:.5f}")
    return best_w

best_weight = best_weights(lgb_value, xgb_value, y_val, val_query_ids)
del y_val
gc.collect()

#### 接下来用训练好的xgboost模型、lightgbm模型和融合参数在test集上进行预测

In [None]:
print("Generating predictions on the test set...")
X_test = test_df[features]

lgb_scores = lgb_model.predict(X_test, num_iteration=lgb_model.best_iteration)
dtest = xgb.DMatrix(X_test)
del X_test
gc.collect()
xgb_scores = xgb_model.predict(dtest, iteration_range=(0, best_ntree_limit))
predictions = best_weight * lgb_scores + (1-best_weight) * xgb_scores

submission = (test_df.select(['Id', 'ranker_id']).with_columns(pl.Series("score", predictions)).with_columns(
    pl.col("score").rank(method='ordinal', descending=True).over("ranker_id").cast(pl.Int32).alias("selected")).
select(['Id', 'ranker_id', 'selected', 'score'])
)

print("Submission created.")

#### 模型可能会对同一路径给出不同的评分，因此需要重新排序(rule-based rerank)，同一路径以最高分为代表，降低其它较低的分数

In [None]:
def re_rank(test: pl.DataFrame, submission: pl.DataFrame, penalty_factor=0.1):
    COLS_TO_COMPARE = [
        "legs0_departureAt",
        "legs0_arrivalAt",
        "legs1_departureAt",
        "legs1_arrivalAt",
        "legs0_segments0_flightNumber",
        "legs1_segments0_flightNumber",
        "legs0_segments0_aircraft_code",
        "legs1_segments0_aircraft_code",
        "legs0_segments0_departureFrom_airport_iata",
        "legs1_segments0_departureFrom_airport_iata",
    ]

    test = test.with_columns(
        [pl.col(c).cast(str).fill_null("NULL") for c in COLS_TO_COMPARE]
    )

    df = submission.join(test, on=["Id", "ranker_id"], how="left")

    df = df.with_columns(
        (
            pl.col("legs0_departureAt")
            + "_"
            + pl.col("legs0_arrivalAt")
            + "_"
            + pl.col("legs1_departureAt")
            + "_"
            + pl.col("legs1_arrivalAt")
            + "_"
            + pl.col("legs0_segments0_flightNumber")
            + "_"
            + pl.col("legs1_segments0_flightNumber")
        ).alias("flight_hash")
    )

    df = df.with_columns(
        pl.max("score")
        .over(["ranker_id", "flight_hash"])
        .alias("max_score_same_flight")
    )

    df = df.with_columns(
        (
            pl.col("score")
            - penalty_factor * (pl.col("max_score_same_flight") - pl.col("score"))
        ).alias("reorder_score")
    )

    df = df.with_columns(
        pl.col("reorder_score")
        .rank(method="ordinal", descending=True)
        .over("ranker_id")
        .cast(pl.Int32)
        .alias("new_selected")
    )

    return df.select(["Id", "ranker_id", "new_selected", "score", "reorder_score"])
print("rule-based rerank prepared.")

In [None]:
print("Reranking...")
top = re_rank(test_df, submission)
submission = (
    submission.join(top, on=["Id", "ranker_id"], how="left")
    .with_columns(
        [
            pl.when(pl.col("new_selected").is_not_null())
            .then(pl.col("new_selected"))
            .otherwise(pl.col("selected"))
            .alias("selected")
        ]
    )
    .select(["Id", "ranker_id", "selected"])
)
submission.write_csv('submission.csv')

print("Submission head:")
print(submission.head())
print("\nSubmission tail:")
print(submission.tail())