In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier
from sklearn.metrics import roc_auc_score


# Load the labelled claims and discard rows whose target value is missing.
df = pd.read_csv(
    "/content/drive/MyDrive/model-citizens/data/Training_TriGuard.csv"
).dropna(subset=['subrogation'])

# Hold out 30% of the rows, stratified on the target; fixed seed for reproducibility.
train, test = train_test_split(
    df,
    test_size=0.3,
    random_state=12,
    stratify=df['subrogation'],
)

# Separate features from the target in each partition (copies, so later edits
# do not write back into `train` / `test`).
X_train, y_train = train.drop(columns=["subrogation"]).copy(), train["subrogation"].copy()
X_test, y_test = test.drop(columns=["subrogation"]).copy(), test["subrogation"].copy()

# Separate testing file; it is only scored at the end of the script.
real_test = pd.read_csv("/content/drive/MyDrive/model-citizens/data/Testing_TriGuard.csv")

class Preprocessor:
    """Clean raw claim records and derive the model's feature columns.

    ``fit`` learns two kinds of state from the frame it is given: the 25th and
    75th percentile of ``annual_income`` and one label encoder per object-dtype
    column.  ``transform`` re-applies that state unchanged, so statistics are
    never re-estimated on the data being transformed.
    """

    # Code given by ``transform`` to a category that was not present when the
    # encoders were fitted (fitted categories are numbered from 0 upwards).
    UNSEEN_CATEGORY_CODE = -1

    def __init__(self):
        self.id_column = ['claim_number']
        # Driver ages outside this closed range are treated as data errors.
        self.min_driver_age = 14
        self.max_driver_age = 120

        # State below is populated by fit().
        self.categorical_cols_ = None
        self.label_encoders = {}
        self.income_q25 = None
        self.income_q75 = None

    def _coerce_and_clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *df* with parsed dates and impossible years blanked.

        Adds ``claim_year`` and replaces with NaN any ``year_of_born`` that is
        before 1900 or after the claim year, and any ``vehicle_made_year`` that
        is after the claim year.  The caller's frame is not modified.
        """
        df = df.copy()
        df['claim_date'] = pd.to_datetime(df['claim_date'], errors='coerce')
        df['claim_year'] = df['claim_date'].dt.year

        # Series.mask builds a new Series with NaN where the condition holds.
        # Writing NaN through ``.loc`` into an integer column instead is an
        # upcasting assignment, which pandas 2.1 deprecated.
        bad_birth_year = (df['year_of_born'] < 1900) | (df['year_of_born'] > df['claim_year'])
        df['year_of_born'] = df['year_of_born'].mask(bad_birth_year)

        future_vehicle = df['vehicle_made_year'] > df['claim_year']
        df['vehicle_made_year'] = df['vehicle_made_year'].mask(future_vehicle)
        return df

    def _create_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *df* with the engineered indicator columns added.

        Expects the output of ``_coerce_and_clean`` (it reads ``claim_year``).
        The three income flags are only created as NaN placeholders here so the
        column order is fixed; ``transform`` fills them from the fitted
        quartiles.  The id column is dropped at the end.
        """
        df = df.copy()

        # --- accident type -------------------------------------------------
        df['is_multi_vehicle_clear'] = (df['accident_type'] == 'multi_vehicle_clear').astype(int)
        df['is_multi_vehicle_unclear'] = (df['accident_type'] == 'multi_vehicle_unclear').astype(int)
        df['is_single_car'] = (df['accident_type'] == 'single_car').astype(int)

        df['has_recovery_target'] = (df['is_multi_vehicle_clear'] | df['is_multi_vehicle_unclear']).astype(int)

        # Ordinal clarity: 3 = clear multi-vehicle, 1 = unclear multi-vehicle, 0 = anything else.
        df['recovery_case_clarity'] = 0
        df.loc[df['is_multi_vehicle_clear'] == 1, 'recovery_case_clarity'] = 3
        df.loc[df['is_multi_vehicle_unclear'] == 1, 'recovery_case_clarity'] = 1

        # --- evidence ------------------------------------------------------
        # Values other than 'Y'/'N' map to NaN and therefore match none of the flags below.
        df['witness_present'] = df['witness_present_ind'].map({'Y': 1, 'N': 0})
        df['evidence_none'] = ((df['witness_present'] == 0) & (df['policy_report_filed_ind'] == 0)).astype(int)
        df['evidence_weak'] = (
            ((df['witness_present'] == 1) & (df['policy_report_filed_ind'] == 0)) |
            ((df['witness_present'] == 0) & (df['policy_report_filed_ind'] == 1))
        ).astype(int)
        df['evidence_strong'] = ((df['witness_present'] == 1) & (df['policy_report_filed_ind'] == 1)).astype(int)
        df['evidence_very_strong'] = (
            (df['witness_present'] == 1) &
            (df['policy_report_filed_ind'] == 1) &
            (df['liab_prct'] < 20)
        ).astype(int)

        # --- liability bands -----------------------------------------------
        df['not_at_fault'] = (df['liab_prct'] < 10).astype(int)
        df['minimal_fault'] = ((df['liab_prct'] >= 10) & (df['liab_prct'] <= 20)).astype(int)
        df['shared_fault'] = (df['liab_prct'] > 20).astype(int)

        # --- driver age ----------------------------------------------------
        df['driver_age'] = df['claim_year'] - df['year_of_born']
        bad_age = (df['driver_age'] < self.min_driver_age) | (df['driver_age'] > self.max_driver_age)
        df['driver_age'] = df['driver_age'].mask(bad_age)
        # Comparisons against NaN are False, so an unknown age sets none of these flags.
        df['young_driver_18_25'] = ((df['driver_age'] >= 18) & (df['driver_age'] <= 25)).astype(int)
        df['adult_driver_26_45'] = ((df['driver_age'] >= 26) & (df['driver_age'] <= 45)).astype(int)
        df['middle_age_driver_46_65'] = ((df['driver_age'] >= 46) & (df['driver_age'] <= 65)).astype(int)
        df['senior_driver_65plus'] = (df['driver_age'] > 65).astype(int)

        # --- driving experience --------------------------------------------
        df['driving_experience'] = df['driver_age'] - df['age_of_DL']
        df['driving_experience'] = df['driving_experience'].mask(df['driving_experience'] < 0)
        df['novice_driver'] = (df['driving_experience'] < 2).astype(int)
        df['experienced_2_5y'] = ((df['driving_experience'] >= 2) & (df['driving_experience'] <= 5)).astype(int)
        df['experienced_5_10y'] = ((df['driving_experience'] > 5) & (df['driving_experience'] <= 10)).astype(int)
        df['veteran_driver'] = (df['driving_experience'] > 10).astype(int)

        # --- sales channel -------------------------------------------------
        df['via_broker'] = (df['channel'] == 'Broker').astype(int)
        df['via_online'] = (df['channel'] == 'Online').astype(int)
        df['via_phone'] = (df['channel'] == 'Phone').astype(int)
        df['channel_good_documentation'] = df['channel'].isin(['Broker', 'Online']).astype(int)

        # --- income placeholders (filled in transform) ---------------------
        df['low_income'] = np.nan
        df['middle_income'] = np.nan
        df['high_income'] = np.nan

        # --- policy-holder attributes --------------------------------------
        df['has_high_education'] = df['high_education_ind']
        df['recent_address_change'] = df['address_change_ind']
        df['home_owner'] = (df['living_status'] == 'Own').astype(int)
        df['renter'] = (df['living_status'] == 'Rent').astype(int)
        df['contact_info_available'] = df['email_or_tel_available']
        df['in_network_repair'] = (df['in_network_bodyshop'] == 'yes').astype(int)
        df['out_of_network_repair'] = (df['in_network_bodyshop'] == 'no').astype(int)

        # --- combined rules ------------------------------------------------
        df['high_subrogation_potential'] = (
            (df['liab_prct'] < 20) &
            (df['has_recovery_target'] == 1) &
            (df['evidence_strong'] == 1)
        ).astype(int)
        df['likely_no_subrogation'] = (
            (df['liab_prct'] > 50) |
            (df['is_single_car'] == 1) |
            (df['evidence_none'] == 1)
        ).astype(int)
        # Currently an exact copy of high_subrogation_potential.
        df['potential_subrogation_case'] = (df['high_subrogation_potential'] == 1).astype(int)

        df = df.drop(columns=self.id_column, errors='ignore')
        return df

    def _encode_with_fallback(self, values: pd.Series, encoder) -> pd.Series:
        """Encode *values* with a fitted encoder, tolerating unseen categories.

        ``encoder.classes_`` holds the fitted categories in code order, so a
        category's code is its position in that array.  Categories missing
        from it get ``UNSEEN_CATEGORY_CODE`` instead of raising, which is what
        ``LabelEncoder.transform`` would do.
        """
        code_by_category = {category: code for code, category in enumerate(encoder.classes_)}
        codes = values.astype(str).map(code_by_category)
        return codes.fillna(self.UNSEEN_CATEGORY_CODE).astype('int64')

    def fit(self, df: pd.DataFrame) -> "Preprocessor":
        """Learn income quartiles and label encoders from *df*; return ``self``.

        Any previously fitted encoders are discarded.
        """
        df = self._coerce_and_clean(df)
        # Quantiles (computed on the training set only).
        self.income_q25 = df['annual_income'].quantile(0.25)
        self.income_q75 = df['annual_income'].quantile(0.75)

        df = self._create_features(df)

        self.categorical_cols_ = list(df.select_dtypes(include=['object']).columns)

        self.label_encoders.clear()
        for col in self.categorical_cols_:
            le = LabelEncoder()
            # astype(str) turns missing values into the literal category 'nan'.
            le.fit(df[col].astype(str))
            self.label_encoders[col] = le
        return self

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return the feature frame for *df* using the state learned by ``fit``.

        Raises:
            RuntimeError: if the income quartiles have not been fitted yet.
        """
        if self.income_q25 is None or self.income_q75 is None:
            # Without this guard the comparisons below would run against None
            # and the income flags would carry no information.
            raise RuntimeError("Preprocessor.transform() called before fit()")

        df = self._coerce_and_clean(df)
        df = self._create_features(df)

        q25, q75 = self.income_q25, self.income_q75
        df['low_income'] = (df['annual_income'] <= q25).astype(int)
        df['middle_income'] = ((df['annual_income'] > q25) & (df['annual_income'] <= q75)).astype(int)
        df['high_income'] = (df['annual_income'] > q75).astype(int)

        for col, le in self.label_encoders.items():
            df[col] = self._encode_with_fallback(df[col], le)

        df = df.drop(columns=['claim_date'], errors='ignore')
        return df

    def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fit on *df* and return its transformed feature frame."""
        return self.fit(df).transform(df)

# Learn the preprocessing state from the training partition only, then apply
# the fitted transformation to both partitions.
pre = Preprocessor()
X_train_proc = pre.fit(X_train).transform(X_train)

# Force the hold-out frame onto the training column layout; a column missing
# from it is created and filled with 0.
X_test_proc = pre.transform(X_test).reindex(columns=X_train_proc.columns, fill_value=0)


# ======================================
# Improved version: small-grid hyper-parameter search + F1-optimal threshold search
# ======================================
from sklearn.metrics import precision_recall_curve, roc_auc_score, average_precision_score

# Count of class 0 divided by count of class 1 in the training labels; each
# grid entry scales this ratio to obtain its scale_pos_weight.
class_counts = y_train.value_counts()
ratio = class_counts[0] / class_counts[1]

# Every combination of learning rate, tree depth and class-weight factor,
# enumerated with learning rate varying slowest and the factor fastest.
param_grid = [
    {
        "learning_rate": lr,
        "max_depth": depth,
        "scale_pos_weight": ratio * spw_factor,
    }
    for lr in [0.05, 0.07]
    for depth in [5, 6]
    for spw_factor in [0.7, 1.0, 1.6, 2.0]
]

# Settings shared by every candidate; only the three grid values vary.
fixed_params = dict(
    n_estimators=1200,
    min_child_weight=2,
    gamma=0.1,
    subsample=0.8,
    colsample_bytree=0.8,
    reg_alpha=0.3,
    reg_lambda=1.2,
    random_state=12,
    eval_metric="auc",
    n_jobs=-1,
)

# Running record of the best candidate seen so far.
best_model, best_params, best_threshold = None, None, None
best_f1 = -1

# NOTE(review): the hold-out split both selects the model/threshold and is
# used for the metrics printed below, so those metrics are likely optimistic.
for params in param_grid:
    model = XGBClassifier(
        learning_rate=params["learning_rate"],
        max_depth=params["max_depth"],
        scale_pos_weight=params["scale_pos_weight"],
        **fixed_params,
    )
    model.fit(X_train_proc, y_train, eval_set=[(X_test_proc, y_test)], verbose=False)

    # Positive-class probabilities on the hold-out split.
    preds = model.predict_proba(X_test_proc)[:, 1]
    precisions, recalls, thresholds = precision_recall_curve(y_test, preds)
    # The small constant keeps the division defined when precision and recall are both 0.
    f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-6)

    # Highest F1 along the precision-recall curve for this candidate.
    idx = f1_scores.argmax()
    avg_f1 = f1_scores[idx]
    print(f"{params} -> avg F1: {avg_f1:.4f}")

    if avg_f1 > best_f1:
        best_f1, best_threshold = avg_f1, thresholds[idx]
        best_params, best_model = params, model

print("最优参数组合:", best_params)
print(f"Best threshold={best_threshold:.3f}, Best F1={best_f1:.4f}")

# Threshold-free ranking metrics for the selected model on the hold-out split.
test_pred_proba = best_model.predict_proba(X_test_proc)[:, 1]
roc_auc = roc_auc_score(y_test, test_pred_proba)
print("ROC AUC:", roc_auc)
pr_auc = average_precision_score(y_test, test_pred_proba)
print("PR AUC:", pr_auc)


# Run the testing file through the fitted preprocessing and align it to the
# training column layout (columns it lacks are filled with 0).
X_real_test_proc = pre.transform(real_test).reindex(
    columns=X_train_proc.columns, fill_value=0
)

# Positive-class probability, then a hard label using the selected threshold.
real_pred_proba = best_model.predict_proba(X_real_test_proc)[:, 1]
above_threshold = real_pred_proba >= best_threshold
real_pred_label = above_threshold.astype(int)

# One row per claim: its id and the predicted label.
prediction_columns = {
    "claim_number": real_test["claim_number"],
    "subrogation": real_pred_label,
}
prediction = pd.DataFrame(prediction_columns)

prediction.to_csv("TriGuard_4_prediction.csv", index=False)

print(prediction.head())

import matplotlib.pyplot as plt
import xgboost as xgb

# Bar chart of the selected model's feature importances, limited to 15 features.
xgb.plot_importance(
    booster=best_model,
    max_num_features=15,
)
# Replace the title that plot_importance puts on the axes.
plt.title("Top 15 Feature Importances")
plt.show()