In [None]:
import pandas as pd
import numpy as np

# 1. Prepare Data
# File name aligned with the other cells of this notebook, which all load
# "傳真社製首個樓宇維修公開資料庫.xlsx" (the leading character was missing here).
file_path = "傳真社製首個樓宇維修公開資料庫.xlsx"
df = pd.read_excel(file_path)

# Clean bid amount: strip everything except digits and dots, then parse.
df['bid_amount'] = pd.to_numeric(df['涉及費用'].astype(str).str.replace(r'[^\d.]', '', regex=True), errors='coerce')
# log(0) is -inf, which dropna() would keep; a single -inf turns the whole
# project's mean into -inf and its residuals into inf/NaN. Mask non-positive
# amounts to NaN first so those rows are dropped together with missing bids.
df['log_bid'] = np.log(df['bid_amount'].where(df['bid_amount'] > 0))
df = df.dropna(subset=['log_bid', '公司名稱', '大廈/屋苑名稱(入標年份)']).copy()

# 2. Calculate Residuals (De-centering)
# Remove project-specific cost factors by subtracting the project mean
df['project_mean'] = df.groupby('大廈/屋苑名稱(入標年份)')['log_bid'].transform('mean')
df['residual'] = df['log_bid'] - df['project_mean']

# 3. Filter Configuration
MIN_JOINT_PROJECTS = 5      # Minimum joint bids required to calculate correlation
CORRELATION_THRESHOLD = 0.8 # High risk threshold

# 4. Build Pivot Matrix (Index=Project, Columns=Company, Values=Residual)
# pivot_table averages the residuals if a company appears more than once in a project.
pivot_matrix = df.pivot_table(index='大廈/屋苑名稱(入標年份)', columns='公司名稱', values='residual')

# 5. Detection Algorithm
suspicious_pairs = []
companies = pivot_matrix.columns
n_companies = len(companies)

print(f"Scanning interactions for {n_companies} companies...")

# Pairwise Pearson correlation over the projects both firms bid on.
# min_periods leaves pairs with too few joint projects as NaN, which replaces
# the per-pair dropna() that the nested loop used to perform.
corr_values = pivot_matrix.corr(min_periods=MIN_JOINT_PROJECTS).to_numpy()

# Co-occurrence counts: entry (a, b) = number of projects where both firms have a residual.
presence = pivot_matrix.notna().astype(int)
count_values = presence.T.dot(presence).to_numpy()

# Upper triangle (i < j) enumerates each unordered pair once, in the same
# order as a nested "for i / for j in range(i + 1, n)" loop.
rows_i, cols_j = np.triu_indices(n_companies, k=1)
pair_corr = corr_values[rows_i, cols_j]
pair_count = count_values[rows_i, cols_j]
# NaN correlations compare False, so undefined pairs are never flagged.
is_suspicious = (pair_count >= MIN_JOINT_PROJECTS) & (pair_corr > CORRELATION_THRESHOLD)

for i, j in zip(rows_i[is_suspicious], cols_j[is_suspicious]):
    suspicious_pairs.append({
        'Firm_A': companies[i],
        'Firm_B': companies[j],
        'Joint_Projects': int(count_values[i, j]),
        'Correlation': float(corr_values[i, j])
    })

# 6. Output Results
suspicious_df = pd.DataFrame(suspicious_pairs)

if not suspicious_df.empty:
    suspicious_df = suspicious_df.sort_values(by='Correlation', ascending=False)
    print("\n[High Risk Syndicates Found]")
    print(suspicious_df.head(100).to_markdown(index=False))
    
    # Save to CSV
    suspicious_df.to_csv('suspicious_syndicates.csv', index=False)
    print("\nSaved to 'suspicious_syndicates.csv'")
else:
    print("No pairs found exceeding the correlation threshold.")

In [None]:
import pandas as pd
import numpy as np
import re

# ------------------------------------------
# Stage 1: load the workbook and derive the analysis columns
# ------------------------------------------
file_path = "傳真社製首個樓宇維修公開資料庫.xlsx"
df = pd.read_excel(file_path)

# Winner flag: 1 = winner, 0 = lost. Values that do not parse as a number count as lost.
win_flag = pd.to_numeric(df['中標'], errors='coerce')
df['is_winner'] = win_flag.fillna(0).astype(int)

# Price rank: 1 is the cheapest bid, larger numbers are more expensive.
df['rank_num'] = pd.to_numeric(df['排名(平至貴)'], errors='coerce')

# Facility flags are keyword searches (case-insensitive) in the free-text facilities column.
facilities_text = df['其他設施'].astype(str)
df['has_mall'] = facilities_text.str.contains('商場|mall', case=False, na=False).astype(int)
df['has_club'] = facilities_text.str.contains('會所|club', case=False, na=False).astype(int)
# Unit count: the first run of digits found in the units column.
df['units_num'] = df['單位'].astype(str).str.extract(r'(\d+)').astype(float)

print(f"Data cleaning complete. Total records: {len(df)}")
print("-" * 30)

# ------------------------------------------
# Stage 2: per-company profile of winning bids
# ------------------------------------------

# Only winning bids feed the profile.
winning_bids = df.loc[df['is_winner'] == 1].copy()

# One row per company: number of wins, mean winning rank, share of wins with
# a mall / a club, mean unit count, and the first recorded company type.
company_profile = winning_bids.groupby('公司名稱').agg(
    total_wins=('大廈/屋苑名稱(入標年份)', 'count'),
    rank_num=('rank_num', 'mean'),
    has_mall=('has_mall', 'mean'),
    has_club=('has_club', 'mean'),
    units_num=('units_num', 'mean'),
    **{'公司性質': ('公司性質', 'first')},
)

# Benchmarks over all winning bids, for context in the printed report.
global_avg_rank = winning_bids['rank_num'].mean()
global_avg_units = winning_bids['units_num'].mean()
print(f"Global Benchmarks: Avg Rank={global_avg_rank:.2f}, Avg Units={global_avg_units:.0f}")

# ------------------------------------------
# Stage 3: risk indicators
# ------------------------------------------

# Restrict to companies that won at least twice.
suspects = company_profile.loc[company_profile['total_wins'] >= 2].copy()

# Indicator 1: average winning rank above 3, i.e. winning without being among the cheapest.
suspects['Risk_HighPriceWin'] = suspects['rank_num'].gt(3.0)

# Indicator 2: average estate above 1000 units and more than 30% of wins at estates with a club.
large_estates = suspects['units_num'].gt(1000)
club_heavy = suspects['has_club'].gt(0.3)
suspects['Risk_BigTarget'] = large_estates & club_heavy

# A company is high risk when either indicator fires.
suspects['High_Risk_Flag'] = suspects['Risk_HighPriceWin'] | suspects['Risk_BigTarget']

# ------------------------------------------
# Stage 4: report and export
# ------------------------------------------

flagged = suspects.loc[suspects['High_Risk_Flag']]
top_suspects = flagged.sort_values(by='rank_num', ascending=False)

columns_to_show = ['total_wins', 'rank_num', 'units_num', 'has_club', 'Risk_HighPriceWin', 'Risk_BigTarget', '公司性質']
print("\n[High Risk Companies (Based on Feature Analysis)]")
print(top_suspects[columns_to_show])

# The company name is the index, so it is written as the first CSV column.
top_suspects[columns_to_show].to_csv('high_risk_bidders_analysis.csv')
print("\nResults saved to 'high_risk_bidders_analysis.csv'")

In [None]:
import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns

# 1. Data Preparation
file_path = "傳真社製首個樓宇維修公開資料庫.xlsx"
df = pd.read_excel(file_path)
# Bid amount: strip everything except digits and dots, then parse (unparseable -> NaN).
df['bid_amount'] = pd.to_numeric(df['涉及費用'].astype(str).str.replace(r'[^\d.]', '', regex=True), errors='coerce')
# log(0) is -inf and would survive the later dropna(), poisoning the project
# mean and every residual of that project. Mask non-positive amounts to NaN
# so they are treated like missing bids.
df['log_bid'] = np.log(df['bid_amount'].where(df['bid_amount'] > 0))
# Winner flag: 1 = winner, anything non-numeric or missing = 0.
df['is_winner'] = pd.to_numeric(df['中標'], errors='coerce').fillna(0).astype(int)
# Price rank (1 = cheapest).
df['rank_num'] = pd.to_numeric(df['排名(平至貴)'], errors='coerce')
# Unit count: first run of digits in the units column.
df['units_num'] = df['單位'].astype(str).str.extract(r'(\d+)').astype(float)

# Extract district
def extract_district(name):
    """Return the first known district name contained in ``name``.

    ``name`` is converted with ``str()`` and searched for each district in
    the fixed order below; the first hit wins. ``'Other'`` is returned when
    none of them occurs in the text.
    """
    label = str(name)
    known_districts = (
        '沙田', '荃灣', '土瓜灣', '深水埗', '灣仔', '西環', '大圍',
        '元朗', '青衣', '大埔', '九龍城', '上環', '中環',
    )
    return next((district for district in known_districts if district in label), 'Other')
df['district'] = df['大廈/屋苑名稱(入標年份)'].apply(extract_district)

# Thresholds for drawing an edge between two companies.
MIN_COMMON_PROJECTS = 3      # minimum number of projects both firms bid on
LINK_CORR_THRESHOLD = 0.8    # residual correlation above which a pair is linked
TOP_ACTIVE_COMPANIES = 30    # how many of the most frequent bidders to include

# 2. Build Network Graph Data (based on residual correlation)
# Residual = log bid minus the mean log bid of the same project.
df_reg = df.dropna(subset=['log_bid', '公司名稱', '大廈/屋苑名稱(入標年份)']).copy()
df_reg['project_mean'] = df_reg.groupby('大廈/屋苑名稱(入標年份)')['log_bid'].transform('mean')
df_reg['residual'] = df_reg['log_bid'] - df_reg['project_mean']

# Pivot table (Index=Project, Columns=Company, Values=Residual)
pivot_matrix = df_reg.pivot_table(index='大廈/屋苑名稱(入標年份)', columns='公司名稱', values='residual')

links = []
companies = pivot_matrix.columns
# For demonstration, take only active companies to avoid cluttered graph
active_companies = df['公司名稱'].value_counts().head(TOP_ACTIVE_COMPANIES).index.tolist()
# Ensure companies involved in the case are included (NaN names are dropped)
garden_vista_companies = df[df['涉翠湖案'] == 1]['公司名稱'].dropna().unique()
garden_vista_set = set(garden_vista_companies)

# The candidate names come from the raw frame, but the pivot only has columns
# for companies with at least one usable bid. Keep the candidates that exist
# in the pivot (indexing with a missing name raises KeyError), in pivot-column
# order so the graph is built identically on every run (set order is not stable).
wanted_companies = set(active_companies) | garden_vista_set
target_companies = [name for name in pivot_matrix.columns if name in wanted_companies]

target_residuals = pivot_matrix[target_companies]
# min_periods leaves pairs with too few common projects as NaN.
filtered_corr = target_residuals.corr(min_periods=MIN_COMMON_PROJECTS)
# Co-occurrence counts: entry (a, b) = projects where both firms have a residual.
presence = target_residuals.notna().astype(int)
common_counts = presence.T.dot(presence)

# Build edge list over unordered pairs (i < j)
for i in range(len(target_companies)):
    for j in range(i + 1, len(target_companies)):
        # Check number of common projects
        if common_counts.iat[i, j] < MIN_COMMON_PROJECTS:
            continue
        corr = filtered_corr.iat[i, j]
        if corr > LINK_CORR_THRESHOLD:  # NaN compares False and is skipped
            links.append((target_companies[i], target_companies[j], corr))

# Set font for Chinese display before any text is created
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

# 3. Plot 1: Collusion Network Graph
plt.figure(figsize=(12, 12))
G = nx.Graph()
for firm_a, firm_b, corr in links:
    G.add_edge(firm_a, firm_b, weight=corr)

# Node colors: case-involved companies in red, others in teal
node_colors = ['#FF6B6B' if node in garden_vista_set else '#4ECDC4' for node in G.nodes()]

# Layout (fixed seed so the picture is reproducible)
pos = nx.spring_layout(G, k=0.3, iterations=50, seed=42)
nx.draw_networkx_nodes(G, pos, node_size=300, node_color=node_colors, alpha=0.9)
nx.draw_networkx_edges(G, pos, width=1.5, alpha=0.5, edge_color='gray')
# Label only nodes with more than one link to keep the figure readable
degrees = dict(G.degree)
labels = {node: node for node in G.nodes() if degrees[node] > 1}
nx.draw_networkx_labels(G, pos, labels=labels, font_size=8)

plt.title("Collusion Network: High Correlation Pairs (>0.8)\n(Red nodes = Known Bad Companies)", fontsize=15)
plt.axis('off')
plt.savefig('collusion_network.png')

# 4. Plot 2: Scatter Plot (Winning Rank vs Project Size)
plt.figure(figsize=(10, 6))
# .copy() so that adding the 'Type' column never writes into a view of df
winners = df[df['is_winner'] == 1].dropna(subset=['units_num', 'rank_num']).copy()
# Mark case-involved companies
winners['Type'] = winners['公司名稱'].apply(lambda x: 'High Risk (Case Involved)' if x in garden_vista_set else 'Normal')

sns.scatterplot(data=winners, x='units_num', y='rank_num', hue='Type', style='Type', 
                palette={'High Risk (Case Involved)': '#FF4500', 'Normal': '#1f77b4'}, s=100, alpha=0.7)

# Reference lines for the thresholds used in the feature analysis
plt.axhline(y=3, color='gray', linestyle='--', alpha=0.5, label='Risk Threshold (Rank > 3)')
plt.axvline(x=1000, color='gray', linestyle='--', alpha=0.5, label='Target Threshold (Units > 1000)')
plt.title("Winning Strategy Analysis: Are they targeting 'Fat Sheep'?", fontsize=14)
plt.xlabel("Project Size (Number of Units)")
plt.ylabel("Winning Price Rank (1=Cheapest)")
plt.legend(title='Company Type')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('winning_strategy_scatter.png')

print("Charts generated: collusion_network.png, winning_strategy_scatter.png")

In [None]:
import pandas as pd

def map_gvkey_to_cik(main_path='data_FraudDetection_JAR2020.csv',
                     aaer_path='AAER_firm_year.csv',
                     output_path='data_with_cik.csv'):
    """Attach a CIK column to the main dataset via the AAER number.

    Args:
        main_path: CSV with the main dataset; must contain ``p_aaer`` (and
            ``gvkey`` / ``misstate`` for the printed sample).
        aaer_path: CSV with the auxiliary data; must contain ``P_AAER`` and ``CIK``.
        output_path: where the merged CSV is written. Defaults to a path
            relative to the working directory (the message printed after
            saving names this file); pass ``None`` to skip saving.

    Returns:
        The merged DataFrame (one row per row of the main dataset), or
        ``None`` when an input file is missing.
    """
    # 1. Read data
    print("Reading data...")
    try:
        # Read main dataset
        df_main = pd.read_csv(main_path)
        # Read auxiliary dataset containing CIK
        df_aaer = pd.read_csv(aaer_path)
    except FileNotFoundError as e:
        print(f"Error: File not found. Please ensure the CSV files are in the current directory.\nDetails: {e}")
        return None

    # 2. Data preprocessing
    # Force both join keys to numeric so that e.g. "12" and 12 compare equal.
    df_main['p_aaer'] = pd.to_numeric(df_main['p_aaer'], errors='coerce')
    df_aaer['P_AAER'] = pd.to_numeric(df_aaer['P_AAER'], errors='coerce')

    # 3. Prepare mapping table
    # pandas merge treats NaN keys as equal to each other, so a mapping row
    # with a missing P_AAER would attach its CIK to every main row whose
    # p_aaer is missing. Drop those rows before joining.
    mapping_table = (
        df_aaer[['P_AAER', 'CIK']]
        .dropna(subset=['P_AAER'])
        .drop_duplicates()
    )
    # The AAER file may list the same P_AAER for several years. If one P_AAER
    # carries more than one distinct CIK, a plain join would duplicate main
    # rows; keep the first CIK and say so.
    conflicting = mapping_table['P_AAER'].duplicated(keep=False)
    if conflicting.any():
        n_conflicts = mapping_table.loc[conflicting, 'P_AAER'].nunique()
        print(f"Warning: {n_conflicts} P_AAER value(s) map to more than one CIK; keeping the first CIK for each.")
        mapping_table = mapping_table.drop_duplicates(subset='P_AAER', keep='first')

    # 4. Perform merge (Merge/Join)
    # Left join keeps every main row; validate guards against row inflation.
    print("Merging data...")
    df_result = pd.merge(
        df_main,
        mapping_table,
        left_on='p_aaer',   # Main table join key
        right_on='P_AAER',  # Mapping table join key
        how='left',         # Retain all rows from main data even if CIK is not found
        validate='many_to_one'
    )

    # 5. Result statistics
    total_rows = len(df_result)
    matched_rows = df_result['CIK'].notna().sum()

    print("-" * 30)
    print(f"Processing completed!")
    print(f"Total rows: {total_rows}")
    print(f"Rows successfully matched to CIK (fraud samples): {matched_rows}")
    print(f"Rows not matched to CIK (non-fraud samples): {total_rows - matched_rows}")
    unique_cik_count = df_result['CIK'].dropna().nunique()
    print(f"Unique CIK count (deduplicated fraud samples): {unique_cik_count}")
    print("-" * 30)

    # 6. View sample data
    # Print a few rows of data successfully matched to CIK
    print("\nSample of successfully matched records (first 5 rows):")
    print(df_result[df_result['CIK'].notna()][['gvkey', 'p_aaer', 'CIK', 'misstate']].head())

    # 7. Save results (optional)
    if output_path is not None:
        df_result.to_csv(output_path, index=False)
        print(f"\nFile saved as {output_path}")

    return df_result

if __name__ == "__main__":
    map_gvkey_to_cik()

In [None]:
    # NOTE(review): every line of this cell is indented and it reads `df_result`
    # without defining it. It looks like a fragment meant to sit inside a function
    # that builds `df_result` — confirm where it belongs before running it alone.
    # Reorder columns to place 'CIK' right after 'gvkey'
    cols = list(df_result.columns)
    if 'gvkey' in cols and 'CIK' in cols:
        # Remove 'CIK' first so the index of 'gvkey' is computed on the list without it.
        cols.remove('CIK')
        gvkey_index = cols.index('gvkey')
        cols.insert(gvkey_index + 1, 'CIK')
    df_result = df_result[cols]
    
    # Write the full, re-ordered table to the working directory.
    df_result.to_csv('data_with_cik.csv', index=False)
    print("\nFile saved as data_with_cik.csv")
    
    # Save fraud-only rows to a new CSV (rows where 'misstate' equals 1)
    fraud_df = df_result[df_result['misstate'] == 1]
    fraud_df.to_csv('fraud_only_data.csv', index=False)
    print("Fraud-only data saved as fraud_only_data.csv")



In [None]:
import pandas as pd
import matplotlib.pyplot as plt

def analyze_ticker_matches(df):
    """Print a match report for the ``Ticker`` column and plot matches per fiscal year.

    Args:
        df: DataFrame with ``Ticker``, ``CIK`` and ``fyear`` columns; a
            ``misstate`` column is optional and enables the fraud-sample section.

    Returns:
        None. The report is printed and a bar chart is shown.
    """
    # 1. Keep the rows whose Ticker is present (unmatched rows hold None / NaN).
    mask_matched = df['Ticker'].notna()
    df_matched = df[mask_matched]

    # -------------------------------------------------------
    # Statistic 1: overall match figures
    # -------------------------------------------------------
    total_rows = len(df)
    matched_count = len(df_matched)
    unique_companies_matched = df_matched['CIK'].nunique()
    # An empty frame would otherwise raise ZeroDivisionError; report 0% instead.
    match_rate = matched_count / total_rows if total_rows else 0.0

    print("="*40)
    print("DATASET TICKER 匹配统计报告")
    print("="*40)
    print(f"总行数 (Total Rows): {total_rows}")
    print(f"成功匹配 Ticker 的行数: {matched_count}")
    print(f"匹配率: {match_rate:.2%}")
    print(f"成功匹配的唯一公司数 (Unique Companies): {unique_companies_matched}")
    print("-" * 40)

    # -------------------------------------------------------
    # Statistic 2: match figures for fraud samples (misstate == 1)
    # -------------------------------------------------------
    # Compared separately because the match rate of fraud rows may differ
    # from the overall rate (e.g. older firms that no longer have a ticker).
    if 'misstate' in df.columns:
        fraud_df = df[df['misstate'] == 1]
        fraud_matched = fraud_df[fraud_df['Ticker'].notna()]
        fraud_rate = len(fraud_matched) / len(fraud_df) if len(fraud_df) > 0 else 0

        print(f"造假样本 (Fraud) 总数: {len(fraud_df)}")
        print(f"造假样本匹配到 Ticker 数: {len(fraud_matched)}")
        print(f"造假样本匹配率: {fraud_rate:.2%}")
        print("-" * 40)

    # -------------------------------------------------------
    # Statistic 3: distribution of matched rows by fiscal year
    # -------------------------------------------------------
    print("匹配成功的年份分布 (前10个年份):")
    year_dist = df_matched['fyear'].value_counts().sort_index()
    print(year_dist.head(10))
    print("...")
    print(year_dist.tail(5))

    # -------------------------------------------------------
    # Visualisation: bar chart of matched rows per fiscal year
    # -------------------------------------------------------
    plt.figure(figsize=(12, 6))
    plt.bar(year_dist.index, year_dist.values, color='skyblue', label='Matched Rows')
    plt.title('Distribution of Rows with Matched Tickers by Fiscal Year')
    plt.xlabel('Fiscal Year (fyear)')
    plt.ylabel('Count of Matched Rows')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.legend()
    plt.show()

# Run the analysis.
# NOTE(review): `df_final` is not defined in this cell; it has to exist in the
# kernel already — confirm which cell creates it so Restart & Run All works.
analyze_ticker_matches(df_final)

In [None]:
import pandas as pd

def _prepare_matched_rows(frame, label):
    """Keep rows that have a Ticker and a parseable CIK, and add a 'CIK_fyear' key column."""
    matched = frame[frame['Ticker'].notna()].copy()

    # A CIK that cannot be parsed is dropped rather than replaced by 0:
    # otherwise all such rows would be reported as one fake company "0".
    cik_numeric = pd.to_numeric(matched['CIK'], errors='coerce')
    n_dropped = int(cik_numeric.isna().sum())
    if n_dropped:
        print(f"[{label}] 已忽略 {n_dropped} 行无法解析 CIK 的记录")
    matched = matched[cik_numeric.notna()].copy()
    matched['CIK'] = cik_numeric[cik_numeric.notna()].astype(int)

    # Excel may load fyear as float in one file and int in the other, so plain
    # str() would give "2001.0" vs "2001" and identical rows would never match.
    # Use the integer form when the value is numeric, the raw text otherwise.
    fyear_numeric = pd.to_numeric(matched['fyear'], errors='coerce')
    fyear_text = fyear_numeric.round().astype('Int64').astype(str)
    fyear_text = fyear_text.where(fyear_numeric.notna(), matched['fyear'].astype(str))
    matched['key'] = matched['CIK'].astype(str) + "_" + fyear_text
    return matched


def thorough_comparison(my_file, colleague_file):
    """Compare two Ticker-annotated Excel files and print a three-part report.

    The report covers (1) which companies (CIK) each file matched, (2) which
    CIK-fyear rows each file matched, and (3) rows present in both files whose
    Ticker differs. Only rows with a non-missing Ticker are compared.

    Args:
        my_file: path of the first Excel file ("mine" in the report).
        colleague_file: path of the second Excel file ("theirs" in the report).

    Returns:
        None. Everything is printed.
    """
    df_mine = pd.read_excel(my_file)
    df_theirs = pd.read_excel(colleague_file)

    df_m = _prepare_matched_rows(df_mine, "你")
    df_t = _prepare_matched_rows(df_theirs, "同事")

    # --- Part 1: company-level comparison ---
    ciks_mine = set(df_m['CIK'])
    ciks_theirs = set(df_t['CIK'])

    only_mine_ciks = ciks_mine - ciks_theirs
    only_theirs_ciks = ciks_theirs - ciks_mine
    common_ciks = ciks_mine & ciks_theirs

    print("="*60)
    print("【1. 唯一公司 (CIK) 维度对比】")
    print(f"你的唯一公司数: {len(ciks_mine)}")
    print(f"同事的唯一公司数: {len(ciks_theirs)}")
    print(f"共同匹配到的公司数: {len(common_ciks)}")
    print(f"仅你匹配到的公司数: {len(only_mine_ciks)}")
    print(f"仅同事匹配到的公司数: {len(only_theirs_ciks)}")

    if only_mine_ciks:
        print("\n>>> 仅在你结果中出现的公司 (示例):")
        print(df_m[df_m['CIK'].isin(only_mine_ciks)].drop_duplicates('CIK')[['CIK', 'Ticker']].head(10))

    if only_theirs_ciks:
        print("\n>>> 仅在同事结果中出现的公司 (示例):")
        print(df_t[df_t['CIK'].isin(only_theirs_ciks)].drop_duplicates('CIK')[['CIK', 'Ticker']].head(10))

    # --- Part 2: row-level (CIK-fyear) comparison ---
    keys_mine = set(df_m['key'])
    keys_theirs = set(df_t['key'])

    only_mine_keys = keys_mine - keys_theirs
    only_theirs_keys = keys_theirs - keys_mine

    print("\n" + "="*60)
    print("【2. 样本行 (CIK-fyear) 维度对比】")
    print(f"仅你有的样本行数: {len(only_mine_keys)}")
    print(f"仅同事有的样本行数: {len(only_theirs_keys)}")

    # --- Part 3: Ticker conflicts on rows present in both files ---
    merged = pd.merge(df_m[['key', 'Ticker', 'fyear', 'CIK']],
                      df_t[['key', 'Ticker']],
                      on='key', suffixes=('_mine', '_theirs'))

    conflicts = merged[merged['Ticker_mine'] != merged['Ticker_theirs']]

    print("\n" + "="*60)
    print("【3. Ticker 标注冲突对比】")
    if not conflicts.empty:
        print(f"发现 {len(conflicts)} 行数据的 Ticker 标注不一致：")
        print(conflicts[['fyear', 'CIK', 'Ticker_mine', 'Ticker_theirs']])
    else:
        print("恭喜！所有共同样本的 Ticker 标注完全一致。")

# Usage: compare my Ticker-annotated workbook with the colleague's workbook.
my_file = 'data_with_ticker_and_name.xlsx'
colleague_file = 'colleague_data.xlsx'
thorough_comparison(my_file, colleague_file)

In [None]:
import pandas as pd
import numpy as np

batches = ['1', '2', '3']
target_variables = ['a_1', 'b_2', 'c_3', 'is_active'] 
output_file = 'variable_stats_analysis.xlsx'

# One dict per (variable, batch, group); turned into a DataFrame at the end.
results_list = []

for run_dt in batches:
    print(f"Processing batch: {run_dt}")

    non_fraud_df = mock_load_data(run_dt, is_fraud=False)

    # Groups to summarise for this batch; only the non-fraud group is active.
    data_groups = {'non-fraud': non_fraud_df}

    # Frame used for the column-existence and dtype checks below.
    combined_df = non_fraud_df

    # Fraud part
    # fraud_df = mock_load_data(run_dt, is_fraud=True)
    # data_groups['fraud'] = fraud_df
    # combined_df = pd.concat([non_fraud_df, fraud_df], ignore_index=True)

    # Total part
    # data_groups['total'] = combined_df

    for var in target_variables:
        if var not in combined_df.columns:
            print(f"Warning: Col {var} is not existing, skipping...")
            continue

        column = combined_df[var]
        # Boolean columns are excluded explicitly even if the numeric check accepts them.
        summarisable = (
            pd.api.types.is_numeric_dtype(column)
            and not pd.api.types.is_bool_dtype(column)
        )
        if not summarisable:
            print(f"Col {var} is not numeric, skipping...")
            continue

        for group_name, df_group in data_groups.items():
            series = df_group[var]
            stats = {'var': var, 'batch': run_dt, 'Flag_fraud': group_name}
            # Each statistic is the Series method of the same name.
            for stat_name in ('mean', 'std', 'min', 'median', 'max'):
                stats[stat_name] = getattr(series, stat_name)()
            results_list.append(stats)

if not results_list:
    print("No valid results to display or save.")
else:
    columns_order = ['var', 'batch', 'Flag_fraud', 'mean', 'std', 'min', 'median', 'max']
    final_df = pd.DataFrame(results_list)[columns_order]

    # Temporary rank column so groups sort as fraud, non-fraud, total.
    group_order_map = {'fraud': 1, 'non-fraud': 2, 'total': 3}
    final_df['group_rank'] = final_df['Flag_fraud'].map(group_order_map)
    final_df = (
        final_df
        .sort_values(by=['var', 'batch', 'group_rank'])
        .drop(columns=['group_rank'])
    )

    # Show the complete table without truncation.
    pd.set_option('display.max_rows', None)
    pd.set_option('display.width', 1000)

    print(final_df)

    try:
        final_df.to_excel(output_file, index=False)
        print(f"\nResults are saved in {output_file}")
    except Exception as e:
        print(f"\nFailed to save Excel: {e}")

In [None]:
import pandas as pd
import numpy as np
import re

# ==========================================
# 1. 辅助工具函数
# ==========================================

def natural_sort_key(s):
    """Build a natural-sort key so that e.g. 'x2' sorts before 'x10'.

    The value is converted with ``str()`` and split into alternating text and
    digit runs; digit runs become ``int`` and text runs are lower-cased. A
    tuple is returned (not a list) so the key is hashable.
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as '²' that int() cannot parse, which raised ValueError.
    return tuple(int(part) if part.isdecimal() else part.lower() for part in re.split(r'(\d+)', str(s)))

def get_top_5_values(series):
    """Return the five most frequent values of ``series`` as a string.

    The result is the ``str()`` of a list, used as evidence when judging a
    column's type. If counting fails, the text ``"Error getting values"``
    is returned instead of raising.
    """
    try:
        # value_counts() is ordered by frequency, so the first five index
        # entries are the five most common values.
        return str(series.value_counts().head(5).index.tolist())
    except Exception:
        # Exception, not a bare except: Ctrl-C / SystemExit must still propagate.
        return "Error getting values"

# ==========================================
# 2. 模拟数据生成 (增加了一些 tricky 的情况)
# ==========================================
def mock_load_data(run_dt, is_fraud=False, n_rows=50):
    """Generate a small, reproducible mock dataset for one batch.

    Args:
        run_dt: batch identifier; must be convertible with ``int()``.
        is_fraud: whether to generate the fraud or the non-fraud sample.
        n_rows: number of rows to generate (default 50).

    Returns:
        DataFrame with numeric, boolean and deliberately ambiguous columns.
    """
    # Seed with the (batch, fraud) pair instead of their sum: with the sum,
    # batch 1 fraud and batch 2 non-fraud shared a seed and were identical.
    # A local generator also leaves NumPy's global random state untouched.
    rng = np.random.default_rng([int(run_dt), int(is_fraud)])
    data = {
        # --- Numeric variables ---
        'amount': rng.uniform(10, 100, n_rows),
        'age': rng.integers(20, 60, n_rows),

        # --- Boolean variables ---
        'is_active': rng.choice([True, False], n_rows),
        'has_email': rng.choice([True, False, None], n_rows),  # contains nulls

        # --- Ambiguous variables (exercise the diagnosis step) ---
        'flag_str_bool': rng.choice(['True', 'False'], n_rows),   # booleans stored as strings
        'category_code': rng.choice(['A1', 'B2', 'C3'], n_rows),  # plain strings
        'mixed_num': rng.choice([1, 0, 1, 0], n_rows)  # integer 0/1: numeric dtype, possibly boolean in meaning
    }
    return pd.DataFrame(data)

# ==========================================
# 3. Configuration
# ==========================================
batches = ['1', '2', '3']
output_file = 'comprehensive_analysis.xlsx'

# Accumulators for the final report: one list of row dicts per output sheet
numeric_results = []
boolean_results = []
diagnosis_results = []

# ==========================================
# 4. Main processing loop
# ==========================================

for run_dt in batches:
    print(f"Processing batch: {run_dt}")
    
    # 1. Load data
    non_fraud_df = mock_load_data(run_dt, is_fraud=False)
    
    # Groups to summarise (can be extended with fraud / total)
    data_groups = {'non-fraud': non_fraud_df}
    combined_df = non_fraud_df  # frame used for dtype checks and diagnosis
    
    # Fraud / Total part (enable when needed)
    # fraud_df = mock_load_data(run_dt, is_fraud=True)
    # data_groups['fraud'] = fraud_df
    # combined_df = pd.concat([non_fraud_df, fraud_df], ignore_index=True)
    # data_groups['total'] = combined_df

    # -------------------------------------------------------
    # Step A: variable diagnosis (every column of this batch)
    # -------------------------------------------------------
    # Record dtype and sample values for each column so that columns skipped
    # by the analysis in Step B are still visible in the report.
    for col in combined_df.columns:
        # Type checks
        is_num = pd.api.types.is_numeric_dtype(combined_df[col])
        is_bool = pd.api.types.is_bool_dtype(combined_df[col])
        dtype_name = str(combined_df[col].dtype)
        
        # Evidence: the most frequent values
        evidence = get_top_5_values(combined_df[col])
        
        diag_info = {
            'batch': run_dt,
            'var': col,
            'pandas_dtype': dtype_name,
            'is_numeric_check': is_num,
            'is_boolean_check': is_bool,
            'top_5_values_evidence': evidence
        }
        diagnosis_results.append(diag_info)

    # -------------------------------------------------------
    # Step B: variable analysis (routes each column to Numeric or Boolean)
    # -------------------------------------------------------
    # All columns are analysed; the dtype decides which result table a column goes to.
    
    target_vars = combined_df.columns
    
    for var in target_vars:
        
        # Series used for the dtype decision
        series_check = combined_df[var]
        
        is_bool = pd.api.types.is_bool_dtype(series_check)
        is_numeric = pd.api.types.is_numeric_dtype(series_check)
        
        # --- Branch 1: Boolean variables ---
        # Checked before the numeric branch so bool columns never land in the numeric table.
        if is_bool:
            for group_name, df_group in data_groups.items():
                s = df_group[var].dropna()  # statistics ignore missing values
                
                if len(s) == 0:
                    continue

                # Share of True / False
                val_counts = s.value_counts(normalize=True)
                count_unique = s.nunique()
                
                try:
                    most_freq = s.value_counts().idxmax()
                except ValueError:
                    # idxmax raises ValueError on an empty result; anything else should surface.
                    most_freq = np.nan
                
                # .get with a default: a column may be all True or all False
                pct_true = val_counts.get(True, 0.0)
                pct_false = val_counts.get(False, 0.0)

                bool_stats = {
                    'var': var,
                    'batch': run_dt,
                    'Flag_fraud': group_name,
                    'number_of_unique': count_unique,
                    'most_frequent_value': most_freq,
                    '%_of_True': pct_true,
                    '%_of_False': pct_false
                }
                boolean_results.append(bool_stats)

        # --- Branch 2: Numeric variables (booleans already handled above) ---
        elif is_numeric: 
            for group_name, df_group in data_groups.items():
                s = df_group[var]
                
                num_stats = {
                    'var': var,
                    'batch': run_dt,
                    'Flag_fraud': group_name,
                    'mean': s.mean(),
                    'std': s.std(),
                    'min': s.min(),
                    'median': s.median(),
                    'max': s.max()
                }
                numeric_results.append(num_stats)
        
        # --- Branch 3: other dtypes (string / object) ---
        else:
            # Skipped here; these columns are still listed in the diagnosis results.
            pass

# ==========================================
# 5. Assemble results and save
# ==========================================

print("正在生成报告...")

# Sort rank of each group. Defined before both branches below: it used to be
# created only inside the numeric branch, so a run with boolean results but
# no numeric results failed with NameError.
group_map = {'fraud': 1, 'non-fraud': 2, 'total': 3}

# --- Numerical results ---
if numeric_results:
    df_num = pd.DataFrame(numeric_results)
    
    # Format the statistics as strings with two decimals
    for col in ['mean', 'std', 'min', 'median', 'max']:
        df_num[col] = df_num[col].map('{:.2f}'.format)
        
    # Sort by variable (natural order), batch, then group rank
    df_num['sort_key'] = df_num['var'].apply(natural_sort_key)
    df_num['g_rank'] = df_num['Flag_fraud'].map(group_map)
    df_num = df_num.sort_values(by=['sort_key', 'batch', 'g_rank']).drop(columns=['sort_key', 'g_rank'])
else:
    df_num = pd.DataFrame()

# --- Boolean results ---
if boolean_results:
    df_bool = pd.DataFrame(boolean_results)
    
    cols_order_bool = ['var', 'batch', 'Flag_fraud', 'number_of_unique', 'most_frequent_value', '%_of_True', '%_of_False']
    df_bool = df_bool[cols_order_bool]
    
    # Format shares as percentages (e.g. 0.5 -> "50.00%")
    for col in ['%_of_True', '%_of_False']:
        df_bool[col] = df_bool[col].apply(lambda x: "{:.2f}%".format(x * 100))
    
    # Sort by variable (natural order), batch, then group rank
    df_bool['sort_key'] = df_bool['var'].apply(natural_sort_key)
    df_bool['g_rank'] = df_bool['Flag_fraud'].map(group_map)
    df_bool = df_bool.sort_values(by=['sort_key', 'batch', 'g_rank']).drop(columns=['sort_key', 'g_rank'])
else:
    df_bool = pd.DataFrame()

# --- Diagnosis results ---
if diagnosis_results:
    df_diag = pd.DataFrame(diagnosis_results)
    cols_diag = ['var', 'batch', 'pandas_dtype', 'is_numeric_check', 'is_boolean_check', 'top_5_values_evidence']
    df_diag = df_diag[cols_diag]
    # Sort by variable name (natural order), then batch
    df_diag['sort_key'] = df_diag['var'].apply(natural_sort_key)
    df_diag = df_diag.sort_values(by=['sort_key', 'batch']).drop(columns=['sort_key'])
else:
    df_diag = pd.DataFrame()

# --- Write Excel (one sheet per result table) ---
try:
    with pd.ExcelWriter(output_file, engine='xlsxwriter') as writer:
        
        # Sheet 1: numerical analysis
        if not df_num.empty:
            df_num.to_excel(writer, sheet_name='Numerical_Analysis', index=False)
            print(f"- Sheet 'Numerical_Analysis' created ({len(df_num)} rows).")
        
        # Sheet 2: boolean analysis
        if not df_bool.empty:
            df_bool.to_excel(writer, sheet_name='Boolean_Analysis', index=False)
            print(f"- Sheet 'Boolean_Analysis' created ({len(df_bool)} rows).")
            
        # Sheet 3: variable diagnosis (evidence)
        if not df_diag.empty:
            df_diag.to_excel(writer, sheet_name='Variable_Diagnosis', index=False)
            # Widen column F ('top_5_values_evidence') so the evidence is readable
            worksheet = writer.sheets['Variable_Diagnosis']
            worksheet.set_column('F:F', 50)
            print(f"- Sheet 'Variable_Diagnosis' created ({len(df_diag)} rows).")
            
    print(f"\n成功! 文件已保存至: {output_file}")
    
    # Short preview of the diagnosis table
    print("\n--- 变量类型诊断预览 (部分) ---")
    print(df_diag.head(10).to_string(index=False))

except Exception as e:
    print(f"保存失败: {e}")