In [1]:
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
from memory_profiler import profile
import matplotlib.pyplot as plt
import seaborn as sns
import os
import warnings

In [2]:
warnings.filterwarnings("ignore")
@profile
def load_data_with_memory_optimization(file_path):
    """
    内存优化的数据加载函数
    使用PyArrow的Parquet读取器进行分块处理
    """
    # 首先只读取元数据获取列信息
    parquet_file = pq.ParquetFile(file_path)
    
    # 获取列的数据类型
    schema = parquet_file.schema.to_arrow_schema()
    column_types = {name: str(type_) for name, type_ in zip(schema.names, schema.types)}
    
    # 定义优化的数据类型
    dtype_mapping = {}
    for col, col_type in column_types.items():
        if 'int' in col_type:
            dtype_mapping[col] = 'int32'
        elif 'float' in col_type:
            dtype_mapping[col] = 'float32'
        elif 'bool' in col_type:
            dtype_mapping[col] = 'bool'
        else:
            # 对于对象类型，尝试转换为category如果基数低
            dtype_mapping[col] = 'category'
    
    # 分块读取数据
    chunk_size = 100000  # 根据内存调整
    chunks = []
    for batch in parquet_file.iter_batches(batch_size=chunk_size):
        df_chunk = batch.to_pandas()
        
        # 优化数据类型
        for col, dtype in dtype_mapping.items():
            if col in df_chunk.columns:
                try:
                    if dtype == 'category':
                        # 仅当唯一值较少时才转换为category
                        if len(df_chunk[col].unique()) / len(df_chunk) < 0.1:
                            df_chunk[col] = df_chunk[col].astype('category')
                    else:
                        df_chunk[col] = df_chunk[col].astype(dtype)
                except (ValueError, TypeError):
                    pass
        
        chunks.append(df_chunk)
    
    # 合并分块
    df = pd.concat(chunks, ignore_index=True)
    
    return df

# 使用示例
train_path = 'E:/GIT PROJECT/FR/kaggle/input/aeroclub-recsys-2025/train.parquet'
train_df = load_data_with_memory_optimization(train_path)

# 显示基本信息
# print(f"数据集维度: {train_df.shape}")
# print("\n前5行数据:")
# print(train_df.head())
# print("\n数据类型:")
# print(train_df.dtypes)
# print("\n内存使用:")
# print(train_df.memory_usage(deep=True).sum() / (1024**2), "MB")

ERROR: Could not find file C:\Users\25495\AppData\Local\Temp\ipykernel_139988\3395455119.py


In [3]:
@profile
def analyze_group_distribution(df, group_col='ranker_id'):
    """
    分析组大小分布，内存优化版本
    """
    # 使用groupby的size()方法，这是内存效率最高的方式
    group_sizes = df.groupby(group_col).size()
    
    # 转换为更节省内存的Series
    group_sizes = group_sizes.astype('int32')
    
    # 计算统计信息
    stats = {
        'total_groups': len(group_sizes),
        'mean_size': group_sizes.mean(),
        'median_size': group_sizes.median(),
        'min_size': group_sizes.min(),
        'max_size': group_sizes.max(),
        'std_size': group_sizes.std(),
        'percentiles': group_sizes.quantile([0.25, 0.5, 0.75, 0.9, 0.95, 0.99]).to_dict()
    }
    
    # 可视化组大小分布（对数尺度）
    import matplotlib.pyplot as plt
    plt.figure(figsize=(10, 6))
    plt.hist(group_sizes, bins=50, log=True)
    plt.title(f'Distribution of {group_col} group sizes (log scale)')
    plt.xlabel('Group size')
    plt.ylabel('Frequency (log)')
    plt.grid(True)
    plt.savefig('group_size_distribution.png')
    plt.close()
    
    return stats, group_sizes

group_stats, group_sizes = analyze_group_distribution(train_df)
print("\n组大小统计信息:")
for k, v in group_stats.items():
    print(f"{k}: {v}")

ERROR: Could not find file C:\Users\25495\AppData\Local\Temp\ipykernel_147784\3712765949.py

组大小统计信息:
total_groups: 105539
mean_size: 171.93049015056044
median_size: 50.0
min_size: 1
max_size: 8236
std_size: 445.9401183706064
percentiles: {0.25: 19.0, 0.5: 50.0, 0.75: 154.0, 0.9: 396.0, 0.95: 618.0, 0.99: 1876.6199999999953}


In [4]:
@profile
def analyze_user_segments(df):
    """
    分析用户细分市场模式，内存优化版本
    """
    # 选择关键用户特征进行分析
    user_cols = ['profileId', 'companyID', 'sex', 'nationality', 
                'frequentFlyer', 'isVip', 'bySelf', 'isAccess3D']
    
    # 确保这些列存在
    user_cols = [col for col in user_cols if col in df.columns]
    
    # 创建用户特征DataFrame，删除重复项
    user_features = df[user_cols].drop_duplicates(subset=['profileId'])
    
    # 分析公司分布
    company_dist = user_features['companyID'].value_counts(normalize=True)
    
    # 分析用户人口统计
    demo_stats = {}
    for col in ['sex', 'nationality', 'frequentFlyer', 'isVip', 'bySelf', 'isAccess3D']:
        if col in user_features.columns:
            demo_stats[col] = user_features[col].value_counts(normalize=True).to_dict()
    
    # 分析公司政策影响
    if 'corporateTariffCode' in df.columns:
        policy_impact = df.groupby(['companyID', 'corporateTariffCode']).size().unstack(fill_value=0)
    else:
        policy_impact = None
    
    return {
        'company_distribution': company_dist.to_dict(),
        'demographic_stats': demo_stats,
        'policy_impact': policy_impact
    }

# 使用示例
user_segment_stats = analyze_user_segments(train_df)
print("\n用户细分市场分析:")
print("公司分布:", user_segment_stats['company_distribution'])
print("\n人口统计:")
for k, v in user_segment_stats['demographic_stats'].items():
    print(f"{k}: {v}")

ERROR: Could not find file C:\Users\25495\AppData\Local\Temp\ipykernel_147784\2928265634.py

用户细分市场分析:
公司分布: {42620: 0.0725350829232732, 57323: 0.039153149869388254, 60628: 0.03110382115302837, 42622: 0.030253326043375252, 40253: 0.029797703663203938, 24728: 0.026729846303383757, 27507: 0.022538120405807667, 56901: 0.017192151145130917, 25312: 0.017040277018407143, 42702: 0.015187412672377134, 61061: 0.014822914768240082, 36948: 0.014276167912034506, 42174: 0.012878925946175809, 25515: 0.012727051819452039, 44729: 0.012362553915314987, 60482: 0.01166393293238564, 61295: 0.011633558107040885, 43509: 0.011056436425490554, 54163: 0.01096531194945629, 54579: 0.010691938521353502, 60187: 0.010509689569284976, 62795: 0.0099629427130794, 62212: 0.009659194459631856, 54154: 0.00953769515825284, 54218: 0.009385821031529068, 63394: 0.008990948302047263, 55341: 0.008899823826013, 58666: 0.008383451795152179, 61081: 0.008170828017738899, 61063: 0.008110078367049389, 52573: 0.007867079764291355, 25

In [5]:
@profile
def analyze_flight_features(df, sample_size=100000):
    """
    分析航班特征分布，使用采样减少内存使用
    """
    # 对大型数据集进行采样
    if len(df) > sample_size:
        df_sample = df.sample(n=sample_size, random_state=42)
    else:
        df_sample = df.copy()
    
    # 选择关键航班特征
    flight_cols = ['totalPrice', 'taxes', 'legs0_duration', 'legs1_duration',
                  'legs0_segments0_cabinClass', 'legs1_segments0_cabinClass',
                  'miniRules0_statusInfos', 'miniRules1_statusInfos',
                  'pricingInfo_isAccessTP']
    
    # 确保这些列存在
    flight_cols = [col for col in flight_cols if col in df_sample.columns]
    
    # 分析数值特征
    num_features = ['totalPrice', 'taxes', 'legs0_duration', 'legs1_duration']
    num_features = [col for col in num_features if col in flight_cols]
    num_stats = df_sample[num_features].describe().to_dict()
    
    # 分析分类特征
    cat_features = [col for col in flight_cols if col not in num_features]
    cat_stats = {}
    for col in cat_features:
        cat_stats[col] = df_sample[col].value_counts(normalize=True).to_dict()
    
    # 可视化关键特征
    import matplotlib.pyplot as plt
    for col in num_features:
        plt.figure(figsize=(10, 6))
        plt.hist(df_sample[col].dropna(), bins=50)
        plt.title(f'Distribution of {col}')
        plt.xlabel(col)
        plt.ylabel('Frequency')
        plt.grid(True)
        plt.savefig(f'{col}_distribution.png')
        plt.close()
    
    return {
        'numerical_stats': num_stats,
        'categorical_stats': cat_stats
    }

# 使用示例
flight_feature_stats = analyze_flight_features(train_df)
print("\n航班特征分析:")
print("数值特征统计:")
for k, v in flight_feature_stats['numerical_stats'].items():
    print(f"{k}: {v}")
print("\n分类特征统计:")
for k, v in flight_feature_stats['categorical_stats'].items():
    print(f"{k}: {v}")

ERROR: Could not find file C:\Users\25495\AppData\Local\Temp\ipykernel_147784\738968246.py

航班特征分析:
数值特征统计:
totalPrice: {'count': 100000.0, 'mean': 46358.41622, 'std': 73672.43311580128, 'min': 1774.0, '25%': 12884.0, '50%': 25074.0, '75%': 55232.0, 'max': 4067938.0}
taxes: {'count': 100000.0, 'mean': 4244.1895634, 'std': 11559.765067831833, 'min': 0.0, '25%': 1006.0, '50%': 1246.0, '75%': 1746.0, 'max': 691219.0}

分类特征统计:
legs0_segments0_cabinClass: {1.0: 0.8116, 2.0: 0.17608, 4.0: 0.01217, 3.0: 0.00015}
legs1_segments0_cabinClass: {1.0: 0.82932865011575, 2.0: 0.1595087943375642, 4.0: 0.011149250951278571, 3.0: 1.3304595407253666e-05}
miniRules0_statusInfos: {1.0: 0.9750451450079411, 0.0: 0.02495485499205883}
miniRules1_statusInfos: {1.0: 0.5827086947508537, 0.0: 0.41729130524914626}
pricingInfo_isAccessTP: {0.0: 0.5021080146717821, 1.0: 0.4978919853282179}


In [11]:
import pyarrow.parquet as pq
import pandas as pd

def analyze_null_columns(parquet_path, threshold=99):
    """
    分析Parquet文件中空值比例过高的特征
    
    参数:
        parquet_path: Parquet文件路径
        threshold: 空值比例阈值(百分比)，默认99%
    
    返回:
        DataFrame包含空值比例超过阈值的特征及其统计信息
    """
    # 1. 打开Parquet文件
    parquet_file = pq.ParquetFile(parquet_path)
    
    # 2. 获取列名和初始化计数器
    schema = parquet_file.schema_arrow  # 使用schema_arrow替代schema
    columns = schema.names
    null_counts = {col: 0 for col in columns}
    total_rows = parquet_file.metadata.num_rows
    
    print(f"开始分析文件: {parquet_path}")
    print(f"总行数: {total_rows:,}")
    print(f"总特征数: {len(columns)}")
    
    # 3. 分批次读取
    batch_size = 5000  # 根据内存调整
    processed_rows = 0
    
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        df_batch = batch.to_pandas()
        processed_rows += len(df_batch)
        
        # 更新进度信息
        progress = (processed_rows / total_rows) * 100
        print(f"\r处理进度: {progress:.1f}% ({processed_rows:,}/{total_rows:,}行)", end="", flush=True)
        
        for col in columns:
            if col in df_batch.columns:
                null_counts[col] += df_batch[col].isnull().sum()
    
    print("\n分析完成!")
    
    # 4. 计算空值比例
    null_percent = {col: (null_counts[col] / total_rows) * 100 for col in columns}
    
    # 5. 创建结果DataFrame
    result_df = pd.DataFrame({
        'feature': columns,
        'null_count': [null_counts[col] for col in columns],
        'null_percent': [null_percent[col] for col in columns],
        'dtype': [str(schema.field(col).type) for col in columns]  # 使用schema.field
    })
    
    # 6. 筛选并排序高比例空值特征
    high_null_df = result_df[result_df['null_percent'] > threshold]\
                     .sort_values('null_percent', ascending=False)
    
    return high_null_df

# 使用示例
try:
    high_null_features = analyze_null_columns(
        'E:/GIT PROJECT/FR/kaggle/input/aeroclub-recsys-2025/train.parquet',
        threshold=-1  # 可以调整此阈值
    )

    print(len(high_null_features))
    # 打印结果
    if not high_null_features.empty:
        print("\n空值比例超过阈值的特征:")
        print(high_null_features[['feature', 'null_percent', 'dtype']].to_string(index=False))
        
        # 保存结果到CSV
        high_null_features.to_csv('high_null_features.csv', index=False)
        print("\n结果已保存到 high_null_features.csv")
    else:
        print("\n没有发现空值比例超过阈值的特征")
        
except Exception as e:
    print(f"发生错误: {str(e)}")

开始分析文件: E:/GIT PROJECT/FR/kaggle/input/aeroclub-recsys-2025/train.parquet
总行数: 18,145,372
总特征数: 127
处理进度: 100.0% (18,145,372/18,145,372行)
分析完成!
127

空值比例超过阈值的特征:
                                               feature  null_percent         dtype
                 legs1_segments3_operatingCarrier_code     99.999967        string
                        legs1_segments3_seatsAvailable     99.999967        double
                          legs1_segments3_flightNumber     99.999967        string
                         legs1_segments3_aircraft_code     99.999967        string
             legs1_segments3_baggageAllowance_quantity     99.999967        double
                legs1_segments3_arrivalTo_airport_iata     99.999967        string
           legs1_segments3_arrivalTo_airport_city_iata     99.999967        string
                 legs1_segments3_marketingCarrier_code     99.999967        string
                              legs1_segments3_duration     99.999967        string
        

In [8]:
@profile
def validate_data_consistency(df, sample_size=500000):
    """
    验证数据一致性规则，使用采样减少内存使用
    """
    # 对大型数据集进行采样
    if len(df) > sample_size:
        df_sample = df.sample(n=sample_size, random_state=42)
    else:
        df_sample = df.copy()
    
    # 检查1: 每个ranker_id组只有一个selected=1的记录或全部为0
    selected_counts = df_sample.groupby('ranker_id')['selected'].sum()
    # 修正条件判断：每个组的selected总和要么等于1，要么等于0
    invalid_groups = selected_counts[(selected_counts != 1) & (selected_counts != 0)]
    selected_check = len(invalid_groups) == 0
    
    if 'totalPrice' in df_sample.columns and 'taxes' in df_sample.columns:
        price_check = (df_sample['totalPrice'] >= df_sample['taxes']).all()
    else:
        price_check = "Columns not available"
    
    # 检查3: 出发时间早于到达时间（改进部分）
    time_checks = {}
    time_format_errors = {}
    
    def safe_convert_to_datetime(series):
        """安全转换时间格式，记录错误"""
        try:
            return pd.to_datetime(series, errors='coerce')
        except Exception as e:
            print(f"时间转换错误: {e}")
            return pd.Series([pd.NaT] * len(series), False)
    
    # 处理leg0的时间比较
    if 'legs0_departureAt' in df_sample.columns and 'legs0_arrivalAt' in df_sample.columns:
        # 转换时间格式
        leg0_departure = safe_convert_to_datetime(df_sample['legs0_departureAt'])
        leg0_arrival = safe_convert_to_datetime(df_sample['legs0_arrivalAt'])
        
        # 检查转换是否成功
        if leg0_departure is not None and leg0_arrival is not None:
            # 计算有效时间记录
            valid_mask = (~leg0_departure.isna()) & (~leg0_arrival.isna())
            valid_count = valid_mask.sum()
            invalid_count = len(df_sample) - valid_count
            
            # 只比较有效时间记录
            if valid_count > 0:
                time_checks['leg0'] = (leg0_arrival[valid_mask] > leg0_departure[valid_mask]).all()
                time_format_errors['leg0_invalid_timestamps'] = invalid_count
            else:
                time_checks['leg0'] = "No valid timestamps"
        else:
            time_checks['leg0'] = "Time conversion failed"
    
    # 处理leg1的时间比较
    if 'legs1_departureAt' in df_sample.columns and 'legs1_arrivalAt' in df_sample.columns:
        # 转换时间格式
        leg1_departure = safe_convert_to_datetime(df_sample['legs1_departureAt'])
        leg1_arrival = safe_convert_to_datetime(df_sample['legs1_arrivalAt'])
        
        # 检查转换是否成功
        if leg1_departure is not None and leg1_arrival is not None:
            # 计算有效时间记录
            valid_mask = (~leg1_departure.isna()) & (~leg1_arrival.isna())
            valid_count = valid_mask.sum()
            invalid_count = len(df_sample) - valid_count
            
            # 只比较有效时间记录
            if valid_count > 0:
                time_checks['leg1'] = (leg1_arrival[valid_mask] > leg1_departure[valid_mask]).all()
                time_format_errors['leg1_invalid_timestamps'] = invalid_count
            else:
                time_checks['leg1'] = "No valid timestamps"
        else:
            time_checks['leg1'] = "Time conversion failed"
    
    # 检查4: 舱位等级是否有效
    if 'legs0_segments0_cabinClass' in df_sample.columns:
        cabin_classes = df_sample['legs0_segments0_cabinClass'].dropna().unique()
        valid_classes = {1.0, 2.0, 4.0}
        cabin_check = all(x in valid_classes for x in cabin_classes)
    else:
        cabin_check = "Column not available"
    
    return {
        'single_selected_per_group': selected_check,
        'price_greater_than_taxes': price_check,
        'departure_before_arrival': time_checks,
        'time_format_errors': time_format_errors,
        'valid_cabin_classes': cabin_check,
        'invalid_groups': len(invalid_groups)
    }

# 使用示例
consistency_checks = validate_data_consistency(train_df)
print("\n改进版数据一致性检查:")
for k, v in consistency_checks.items():
    print(f"{k}: {v}")

ERROR: Could not find file C:\Users\25495\AppData\Local\Temp\ipykernel_152400\1923724020.py

改进版数据一致性检查:
single_selected_per_group: True
price_greater_than_taxes: True
departure_before_arrival: {'leg0': np.False_, 'leg1': np.False_}
time_format_errors: {'leg0_invalid_timestamps': np.int64(0), 'leg1_invalid_timestamps': np.int64(120793)}
valid_cabin_classes: False
invalid_groups: 0


  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(df[col])
  df[col] = pd.to_datetime(d

ValueError: incompatible dimensions for axis 1