# CMS Healthcare.gov PUF Data Integration

## 目标
按照变量角色需求，整理和合并以下CMS公开使用文件（PUF）：

### 数据来源和变量清单

| 来源文件 | 变量名 | 角色 | 清洗/处理要求 |
|---------|--------|------|---------------|
| **Rate PUF** | `IndividualRate` | 目标变量 (Y) | 必须获取，这是模型预测的最终价格 |
| **Rate PUF** | `Age` | 核心特征 (X) | 必须保留所有年龄段，用于构建分年龄段的费率和风险 |
| **Rate PUF** | `BusinessYear` | 时间键 | 2020-2026 年数据，用于时间序列错位合并 (T → T+2) |
| **Rate PUF** | `RatingAreaId` | 连接键 | 用于连接 Service_Area 表 |
| **Plan Attributes** | `PlanId` | 连接键 / 主键 | 确保连接所有 CMS 表 |
| **Plan Attributes** | `MetalLevel` | 产品特征 (X) | Bronze, Silver, Gold 等，用于模型中的分类特征 |
| **Service Area** | `StateCode` | 地理键 / 分组键 | 州的二字代码（如 NY, CA），用于连接所有数据和进行数据库分区 |

## 数据处理流程
1. 读取所有年份的 Rate PUF 文件
2. 读取所有年份的 Plan Attributes 文件
3. 读取所有年份的 Service Area 文件
4. 按 PlanId 合并 Rate 和 Plan Attributes
5. 按 RatingAreaId/ServiceAreaId 连接 Service Area 获取 StateCode
6. 导出整理后的数据集

In [None]:
# 导入必要的库
import pandas as pd
import numpy as np
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

pd.set_option('display.max_columns', 50)
pd.set_option('display.width', 200)

In [None]:
# 定义数据路径和年份范围
BASE_PATH = Path('2433_p3_data/BRFSS Survey Data')
YEARS = [2020, 2021, 2022, 2023, 2024, 2025, 2026]

# 文件名模式 (不同年份可能有不同命名)
FILE_PATTERNS = {
    'rate': ['Rate_PUF.csv', 'rate-puf.csv'],
    'plan': ['Plan_Attributes_PUF.csv', 'plan-attributes-puf.csv'],
    'service': ['Service_Area_PUF.csv', 'service-area-puf.csv']
}

print(f"数据根目录: {BASE_PATH}")
print(f"分析年份: {YEARS}")

## 1. 读取 Rate PUF 数据

提取关键变量：
- `IndividualRate` (目标变量 Y)
- `Age` (核心特征)
- `BusinessYear` (时间键)
- `RatingAreaId` (连接键)
- `PlanId` (连接键)
- `StateCode` (地理键)

In [None]:
def find_file(year_path, patterns):
    """在指定目录中查找匹配的文件"""
    for pattern in patterns:
        file_path = year_path / pattern
        if file_path.exists():
            return file_path
    return None

# 读取所有年份的 Rate PUF 数据
rate_data_list = []

for year in YEARS:
    year_path = BASE_PATH / str(year)
    rate_file = find_file(year_path, FILE_PATTERNS['rate'])
    
    if rate_file:
        print(f"正在读取 {year} 年 Rate PUF: {rate_file.name}")
        df = pd.read_csv(rate_file, low_memory=False)
        
        # 选择需要的列
        required_cols = ['BusinessYear', 'StateCode', 'PlanId', 'RatingAreaId', 'Age', 'IndividualRate']
        available_cols = [col for col in required_cols if col in df.columns]
        
        df_subset = df[available_cols].copy()
        rate_data_list.append(df_subset)
        
        print(f"  - 读取 {len(df_subset):,} 行, 列: {available_cols}")
    else:
        print(f"警告: {year} 年未找到 Rate PUF 文件")

# 合并所有年份的数据
rate_df = pd.concat(rate_data_list, ignore_index=True)
print(f"\n✓ Rate PUF 总计: {len(rate_df):,} 行")
print(f"  时间范围: {rate_df['BusinessYear'].min()} - {rate_df['BusinessYear'].max()}")
print(f"  州数量: {rate_df['StateCode'].nunique() if 'StateCode' in rate_df.columns else 'N/A'}")
print(f"  计划数量: {rate_df['PlanId'].nunique()}")

In [None]:
# 检查 Rate 数据质量
print("Rate PUF 数据概览:")
print(rate_df.head())
print("\n数据类型:")
print(rate_df.dtypes)
print("\n缺失值统计:")
print(rate_df.isnull().sum())
print("\nAge 分布:")
print(rate_df['Age'].value_counts().sort_index().head(20))

## 2. 读取 Plan Attributes 数据

提取关键变量：
- `PlanId` (主键/连接键)
- `MetalLevel` (产品特征)
- `StateCode` (地理键)
- `BusinessYear` (时间键)

In [None]:
# 读取所有年份的 Plan Attributes 数据
plan_data_list = []

for year in YEARS:
    year_path = BASE_PATH / str(year)
    plan_file = find_file(year_path, FILE_PATTERNS['plan'])
    
    if plan_file:
        print(f"正在读取 {year} 年 Plan Attributes: {plan_file.name}")
        df = pd.read_csv(plan_file, low_memory=False)
        
        # 选择需要的列
        required_cols = ['BusinessYear', 'StateCode', 'PlanId', 'MetalLevel', 'PlanType', 'IssuerId']
        available_cols = [col for col in required_cols if col in df.columns]
        
        df_subset = df[available_cols].copy()
        plan_data_list.append(df_subset)
        
        print(f"  - 读取 {len(df_subset):,} 行, 列: {available_cols}")
    else:
        print(f"警告: {year} 年未找到 Plan Attributes 文件")

# 合并所有年份的数据
plan_df = pd.concat(plan_data_list, ignore_index=True)
print(f"\n✓ Plan Attributes 总计: {len(plan_df):,} 行")
print(f"  时间范围: {plan_df['BusinessYear'].min()} - {plan_df['BusinessYear'].max()}")
print(f"  州数量: {plan_df['StateCode'].nunique() if 'StateCode' in plan_df.columns else 'N/A'}")
print(f"  计划数量: {plan_df['PlanId'].nunique()}")

In [None]:
# 检查 Plan Attributes 数据质量
print("Plan Attributes 数据概览:")
print(plan_df.head())
print("\nMetalLevel 分布:")
print(plan_df['MetalLevel'].value_counts())
print("\nPlanType 分布:")
if 'PlanType' in plan_df.columns:
    print(plan_df['PlanType'].value_counts())

## 3. 读取 Service Area 数据

提取关键变量：
- `ServiceAreaId` (连接键)
- `StateCode` (地理键)
- `BusinessYear` (时间键)

In [None]:
# 读取所有年份的 Service Area 数据
service_data_list = []

for year in YEARS:
    year_path = BASE_PATH / str(year)
    service_file = find_file(year_path, FILE_PATTERNS['service'])
    
    if service_file:
        print(f"正在读取 {year} 年 Service Area: {service_file.name}")
        df = pd.read_csv(service_file, low_memory=False)
        
        # 选择需要的列
        required_cols = ['BusinessYear', 'StateCode', 'ServiceAreaId', 'IssuerId', 'County', 'ZipCodes']
        available_cols = [col for col in required_cols if col in df.columns]
        
        df_subset = df[available_cols].copy()
        service_data_list.append(df_subset)
        
        print(f"  - 读取 {len(df_subset):,} 行, 列: {available_cols}")
    else:
        print(f"警告: {year} 年未找到 Service Area 文件")

# 合并所有年份的数据
service_df = pd.concat(service_data_list, ignore_index=True)
print(f"\n✓ Service Area 总计: {len(service_df):,} 行")
print(f"  时间范围: {service_df['BusinessYear'].min()} - {service_df['BusinessYear'].max()}")
print(f"  州数量: {service_df['StateCode'].nunique() if 'StateCode' in service_df.columns else 'N/A'}")

In [None]:
# 检查 Service Area 数据质量
print("Service Area 数据概览:")
print(service_df.head())
print("\n各州服务区数量:")
if 'StateCode' in service_df.columns:
    print(service_df.groupby('StateCode')['ServiceAreaId'].nunique().sort_values(ascending=False).head(10))

## 4. 数据合并

### 合并策略:
1. **Rate PUF ← Plan Attributes**: 通过 `PlanId` 合并，获取 `MetalLevel` 和其他计划属性
2. **结果 ← Service Area**: 通过 `ServiceAreaId` (从 Plan Attributes) 合并，确保 `StateCode` 正确连接

注意: Rate PUF 中可能已经包含 StateCode，我们将优先使用 Rate PUF 中的 StateCode，如果缺失则从其他表补充。

In [None]:
# 第一步: Rate PUF + Plan Attributes (通过 PlanId)
print("合并 Rate PUF 和 Plan Attributes...")
merged_df = rate_df.merge(
    plan_df,
    on=['PlanId', 'BusinessYear'],
    how='left',
    suffixes=('', '_plan')
)

print(f"合并后行数: {len(merged_df):,}")
print(f"成功匹配计划的比例: {(merged_df['MetalLevel'].notna().sum() / len(merged_df) * 100):.2f}%")

# 如果 Rate 中没有 StateCode，使用 Plan 中的
if 'StateCode' not in rate_df.columns and 'StateCode_plan' in merged_df.columns:
    merged_df['StateCode'] = merged_df['StateCode_plan']
    print("已从 Plan Attributes 补充 StateCode")
elif 'StateCode' in merged_df.columns and 'StateCode_plan' in merged_df.columns:
    # 使用 Rate 中的 StateCode，如果缺失则用 Plan 的
    merged_df['StateCode'] = merged_df['StateCode'].fillna(merged_df['StateCode_plan'])
    merged_df.drop('StateCode_plan', axis=1, inplace=True)
    print("已合并 StateCode (优先使用 Rate PUF 数据)")

In [None]:
# 检查合并后的数据
print("\n合并后数据概览:")
print(merged_df.head())
print("\n合并后数据形状:", merged_df.shape)
print("\n关键列的缺失情况:")
key_cols = ['BusinessYear', 'StateCode', 'PlanId', 'Age', 'IndividualRate', 'MetalLevel']
for col in key_cols:
    if col in merged_df.columns:
        missing_pct = (merged_df[col].isna().sum() / len(merged_df)) * 100
        print(f"  {col}: {missing_pct:.2f}% 缺失")

## 5. 数据清洗和质量检查

In [None]:
# 移除目标变量为空的行
print(f"移除前行数: {len(merged_df):,}")
merged_df = merged_df[merged_df['IndividualRate'].notna()].copy()
print(f"移除 IndividualRate 缺失后行数: {len(merged_df):,}")

# 移除关键特征为空的行
merged_df = merged_df[merged_df['Age'].notna()].copy()
print(f"移除 Age 缺失后行数: {len(merged_df):,}")

merged_df = merged_df[merged_df['MetalLevel'].notna()].copy()
print(f"移除 MetalLevel 缺失后行数: {len(merged_df):,}")

merged_df = merged_df[merged_df['StateCode'].notna()].copy()
print(f"移除 StateCode 缺失后行数: {len(merged_df):,}")

In [None]:
# 数据类型转换和清理
print("\n数据类型转换...")

# 确保 BusinessYear 是整数
merged_df['BusinessYear'] = merged_df['BusinessYear'].astype(int)

# 确保 StateCode 是字符串
merged_df['StateCode'] = merged_df['StateCode'].astype(str).str.strip().str.upper()

# 清理 Age (可能是 'Family Option' 或数值)
merged_df['Age_Original'] = merged_df['Age']  # 保留原始值
merged_df['IsAgeNumeric'] = pd.to_numeric(merged_df['Age'], errors='coerce').notna()

print(f"数值型 Age 的比例: {merged_df['IsAgeNumeric'].sum() / len(merged_df) * 100:.2f}%")
print(f"\nAge 类型分布:")
print(merged_df['Age'].value_counts().head(20))

In [None]:
# 最终数据集统计
print("="*80)
print("最终整理数据集统计")
print("="*80)
print(f"总行数: {len(merged_df):,}")
print(f"时间范围: {merged_df['BusinessYear'].min()} - {merged_df['BusinessYear'].max()}")
print(f"州数量: {merged_df['StateCode'].nunique()}")
print(f"计划数量: {merged_df['PlanId'].nunique()}")
print(f"\nIndividualRate (目标变量) 统计:")
print(merged_df['IndividualRate'].describe())
print(f"\n按年份分布:")
print(merged_df['BusinessYear'].value_counts().sort_index())
print(f"\n按州分布 (前10):")
print(merged_df['StateCode'].value_counts().head(10))
print(f"\n按 MetalLevel 分布:")
print(merged_df['MetalLevel'].value_counts())

## 6. 导出整理后的数据

In [None]:
# 选择最终导出的列
export_cols = [
    'BusinessYear',      # 时间键
    'StateCode',         # 地理键 / 分组键
    'PlanId',           # 主键 / 连接键
    'RatingAreaId',     # 连接键
    'Age',              # 核心特征
    'Age_Original',     # 原始 Age 值
    'IsAgeNumeric',     # Age 是否为数值型
    'IndividualRate',   # 目标变量 (Y)
    'MetalLevel',       # 产品特征
    'PlanType',         # 额外的计划类型特征
    'IssuerId'          # 发行商ID
]

# 只保留存在的列
export_cols = [col for col in export_cols if col in merged_df.columns]
export_df = merged_df[export_cols].copy()

# 导出路径
output_dir = Path('2433_p3_data/healthcare.gov/exports/integrated')
output_dir.mkdir(parents=True, exist_ok=True)

output_file = output_dir / 'cms_rate_integrated_2020_2026.csv'
export_df.to_csv(output_file, index=False)

print(f"✓ 数据已导出到: {output_file}")
print(f"  导出行数: {len(export_df):,}")
print(f"  导出列数: {len(export_cols)}")
print(f"  文件大小: {output_file.stat().st_size / 1024 / 1024:.2f} MB")
print(f"\n导出列清单: {export_cols}")

In [None]:
# 额外导出: 按年份分割的文件 (可选)
print("\n按年份导出单独文件...")
for year in sorted(export_df['BusinessYear'].unique()):
    year_df = export_df[export_df['BusinessYear'] == year]
    year_file = output_dir / f'cms_rate_integrated_{year}.csv'
    year_df.to_csv(year_file, index=False)
    print(f"  {year}: {len(year_df):,} 行 -> {year_file.name}")

print("\n✓ 所有数据导出完成!")

## 7. 数据质量报告

In [None]:
# 生成数据质量报告
print("="*80)
print("数据质量报告")
print("="*80)

print("\n1. 变量角色完成度检查:")
required_vars = {
    'IndividualRate': '目标变量 (Y)',
    'Age': '核心特征 (X)',
    'BusinessYear': '时间键',
    'RatingAreaId': '连接键',
    'PlanId': '主键/连接键',
    'MetalLevel': '产品特征 (X)',
    'StateCode': '地理键/分组键'
}

for var, role in required_vars.items():
    if var in export_df.columns:
        completeness = (export_df[var].notna().sum() / len(export_df)) * 100
        print(f"  ✓ {var} ({role}): {completeness:.2f}% 完整")
    else:
        print(f"  ✗ {var} ({role}): 缺失")

print("\n2. 数据范围检查:")
print(f"  年份范围: {export_df['BusinessYear'].min()} - {export_df['BusinessYear'].max()}")
print(f"  覆盖州数: {export_df['StateCode'].nunique()} 个州")
print(f"  计划数量: {export_df['PlanId'].nunique():,} 个不同计划")

print("\n3. 目标变量 (IndividualRate) 分布:")
print(f"  均值: ${export_df['IndividualRate'].mean():.2f}")
print(f"  中位数: ${export_df['IndividualRate'].median():.2f}")
print(f"  标准差: ${export_df['IndividualRate'].std():.2f}")
print(f"  最小值: ${export_df['IndividualRate'].min():.2f}")
print(f"  最大值: ${export_df['IndividualRate'].max():.2f}")

print("\n4. 年龄特征分布:")
if 'IsAgeNumeric' in export_df.columns:
    numeric_age_pct = export_df['IsAgeNumeric'].sum() / len(export_df) * 100
    print(f"  数值型年龄: {numeric_age_pct:.2f}%")
    print(f"  非数值型年龄 (如 Family Option): {100 - numeric_age_pct:.2f}%")

print("\n" + "="*80)
print("数据整理完成! 可以开始建模了。")
print("="*80)