# 数据清洗，上传到MySQL

In [None]:
import json
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from matplotlib import rcParams
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.metrics import accuracy_score, r2_score, mean_squared_error, precision_score, recall_score, f1_score, confusion_matrix, mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sqlalchemy import create_engine

from config import MySQLConfig

# Use the SimHei font for sans-serif text so Chinese labels can be rendered
# in matplotlib figures, and disable the Unicode minus glyph (matplotlib then
# draws an ASCII hyphen for negative tick labels).
rcParams['font.sans-serif'] = ['SimHei']
rcParams['axes.unicode_minus'] = False

db = MySQLConfig()
# Percent-encode the credentials before embedding them in the URL: characters
# such as "@", ":", "/" or "%" in a password would otherwise be parsed as URL
# delimiters. Assumes MySQLConfig stores the raw (not already URL-encoded)
# user name and password — TODO confirm.
engine_str = (
    f"mysql+pymysql://{quote_plus(str(db.user))}:{quote_plus(str(db.password))}"
    f"@{db.host}:{db.port}/{db.database}?charset={db.charset}"
)
engine = create_engine(engine_str)
# NOTE: create_engine() is lazy and does not open a connection here; the
# message below only confirms that the engine object was built.
print("MySQL连接成功")

In [None]:
def showinfo(df):
    """Print a quick diagnostic summary of a DataFrame.

    Outputs, in order: per-column missing-value counts, the value
    distribution of the "流失情况" column (only when that column exists),
    the first rows, and the dtype/memory summary.

    Args:
        df: pandas DataFrame to inspect.

    Returns:
        None. Everything is written to the notebook output.
    """
    # Missing-value count per column
    print("缺失值统计：")
    print(df.isna().sum())

    # Distribution of the churn label, counting missing values as their own
    # bucket. Guarded so the helper also works on frames without this column
    # instead of raising KeyError.
    if "流失情况" in df.columns:
        print("\n流失情况分布：")
        print(df["流失情况"].value_counts(dropna=False))

    # Data preview; `display` is expected to be provided by the notebook
    # environment.
    print("\n数据概览：")
    display(df.head())

    # Dtype information. DataFrame.info() prints its report itself and
    # returns None, so it must not be wrapped in print() (that emitted a
    # stray "None" line).
    print("\n数据类型信息：")
    df.info()


## customer.csv

In [None]:
# Read ./data/customer.csv into the working DataFrame `df`.
file_path = "./data/customer.csv"
df = pd.read_csv(file_path)

# Optional preview (uncomment to inspect the first rows and the column dtypes):
# print("数据概览：")
# display(df.head(20))
# print("\n数据类型信息：")
# print(df.info())

### 类型转换

In [None]:
# Parse the two date columns; values that cannot be parsed become NaT.
for col in ("近期合作月份_time", "近期合作日期_time"):
    df[col] = pd.to_datetime(df[col], errors="coerce")

# Coerce the numeric measure columns one by one; values that cannot be
# parsed become NaN.
num_cols = ["运单数", "业务量", "体积", "计费重量", "收益"]
for col in num_cols:
    df[col] = pd.to_numeric(df[col], errors="coerce")

# ========== Boolean conversion (missing values are preserved) ==========
def to_bool(series):
    """Convert churn labels to a nullable boolean Series.

    Each value is cast to a string and stripped of surrounding whitespace;
    "流失" becomes True and "未流失" becomes False. Any other value,
    including missing entries, ends up as <NA>.

    Args:
        series: pandas Series holding the raw labels.

    Returns:
        A Series with pandas' nullable "boolean" dtype.
    """
    churn_flags = {'流失': True, '未流失': False}
    cleaned = series.astype(str).str.strip()
    flags = cleaned.map(churn_flags)
    return flags.astype("boolean")

# Convert the "流失情况" column in place to nullable booleans via to_bool().
df["流失情况"] = to_bool(df["流失情况"])
# showinfo(df)

### 标签编码

In [None]:
# Label-encode every object-dtype column of `df` into a copy (`df_encoded`)
# and persist the value -> code mapping so the codes can be decoded later.
df_encoded = df.copy()
label_encoders = {}   # column name -> fitted LabelEncoder
label_mappings = {}   # column name -> {original value (str): integer code}

for col in df.select_dtypes(include=["object"]).columns:
    le = LabelEncoder()
    # Fill missing values BEFORE casting to str. The previous order
    # (astype(str) then fillna) turned NaN into the literal string "nan",
    # so the "缺失值" placeholder was never applied.
    df_encoded[col] = df_encoded[col].fillna("缺失值").astype(str)
    df_encoded[col] = le.fit_transform(df_encoded[col])
    label_encoders[col] = le
    # LabelEncoder assigns codes by position in classes_, so enumerating
    # classes_ yields the same mapping as transform(classes_).
    label_mappings[col] = {str(label): code for code, label in enumerate(le.classes_)}

# To inspect a mapping interactively:
#   display(pd.DataFrame(list(label_mappings[col].items()), columns=["原始值", "编码值"]))

# Save all mappings as UTF-8 JSON (non-ASCII labels kept readable).
os.makedirs("label", exist_ok=True)
with open("label/customer_label_mappings.json", "w", encoding="utf-8") as f:
    json.dump(label_mappings, f, ensure_ascii=False, indent=4)

# display(df_encoded.head(5))
print("标签映射已保存到 customer_label_mappings.json")

### 处理缺失值


In [None]:
# Model-based imputation: for each target column, train a random forest on
# the rows where the column is present and predict it for the rows where it
# is missing. Columns are processed in order, so a column imputed earlier is
# used (already filled) as a feature for the later ones.
results = []  # one evaluation record (dict) per processed column
target_cols = ["运单数", "业务量", "体积"]


def _make_imputer(target):
    """Return ``(model, model_type)`` used to impute ``target``.

    "重泡标识" is modelled as a classification target; every other column is
    modelled as a regression target. Both use 200 trees and a fixed seed.
    """
    if target == "重泡标识":
        return RandomForestClassifier(random_state=42, n_estimators=200), "分类"
    return RandomForestRegressor(random_state=42, n_estimators=200), "回归"


for target in target_cols:
    print(f"\n正在处理缺失值：{target}")

    missing_mask = df_encoded[target].isna()
    df_train = df_encoded[~missing_mask]
    df_pred = df_encoded[missing_mask]

    if df_pred.empty:
        print(f"{target} 无缺失值，跳过")
        continue

    if df_train.empty:
        # Every value is missing: there is nothing to learn from, and
        # fitting on zero samples would raise. Leave the column untouched.
        print(f"{target} 全部缺失，无训练样本，跳过")
        results.append({
            "字段": target, "样本数": 0,
            "评估": "无训练样本，未填补"
        })
        continue

    # All columns except the target itself and the two datetime columns.
    features = [c for c in df_encoded.columns if c not in [target, "近期合作月份_time", "近期合作日期_time"]]
    X = df_train[features]
    y = df_train[target]
    X_pred = df_pred[features]

    model, model_type = _make_imputer(target)

    if len(df_train) < 3:
        # Too few rows for a train/validation split: fit on everything and
        # impute without computing metrics.
        print(f"训练样本太少（{len(df_train)}），跳过评估，直接用全量训练并预测")
        model.fit(X, y)
        df_encoded.loc[missing_mask, target] = model.predict(X_pred)
        results.append({
            "字段": target, "模型类型": model_type, "样本数": len(df_train),
            "评估": "样本不足无法做验证"
        })
        continue

    # Hold out 20% of the known rows to estimate imputation quality.
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    model.fit(X_train, y_train)
    y_val_pred = model.predict(X_val)

    if model_type == "分类":
        acc = accuracy_score(y_val, y_val_pred)
        prec = precision_score(y_val, y_val_pred, average="weighted", zero_division=0)
        rec = recall_score(y_val, y_val_pred, average="weighted", zero_division=0)
        f1 = f1_score(y_val, y_val_pred, average="weighted", zero_division=0)
        cm = confusion_matrix(y_val, y_val_pred)

        print(f"分类评估 — Accuracy: {acc:.4f}, Precision: {prec:.4f}, Recall: {rec:.4f}, F1: {f1:.4f}")
        print("混淆矩阵：")
        print(cm)

        # Same key set as the regression record so the records line up
        # when turned into a table.
        results.append({
            "字段": target, "模型类型": model_type, "样本数": len(df_train),
            "Accuracy": acc, "Precision": prec, "Recall": rec, "F1": f1,
            "RMSE": np.nan, "R2": np.nan, "MAE": np.nan
        })
    else:
        mse = mean_squared_error(y_val, y_val_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_val, y_val_pred)
        r2 = r2_score(y_val, y_val_pred)

        print(f"回归评估 — R²: {r2:.4f}, RMSE: {rmse:.4f}, MAE: {mae:.4f}")

        results.append({
            "字段": target, "模型类型": model_type, "样本数": len(df_train),
            "Accuracy": np.nan, "Precision": np.nan, "Recall": np.nan, "F1": np.nan,
            "RMSE": rmse, "R2": r2, "MAE": mae
        })

    # Refit on all known rows (train + validation) before filling the gaps.
    model.fit(X, y)
    df_encoded.loc[missing_mask, target] = model.predict(X_pred)

    print(f"已使用 {model_type} 模型填补 {target} 缺失值")

### 上传数据库

In [None]:
# Show a final summary of the cleaned frame, then write it to the
# `customer` table, replacing any existing table of that name.
showinfo(df_encoded)
df_encoded.to_sql(
    name="customer",
    con=engine,
    if_exists="replace",
    index=False,
    chunksize=1000
)

# Report the row count of the frame that was actually uploaded
# (df_encoded), not of the pre-encoding frame.
print(f"已成功上传 {len(df_encoded)} 条数据至表 customer")

In [None]:
# Read the uploaded table back to verify the upload. The table name is left
# unqualified so the query runs against the database the engine is
# configured for (the one to_sql() wrote to) instead of a hard-coded schema.
sql = """SELECT * FROM customer"""
with engine.connect() as conn:
    result = pd.read_sql(sql, conn)
display(result)
print(result.head())

## order.csv

In [None]:
# Read ./data/order.csv into the working DataFrame `df` and preview it.
file_path = "./data/order.csv"
df = pd.read_csv(file_path)

print("数据概览：")
display(df.head())
print("\n数据类型信息：")
# DataFrame.info() prints its own report and returns None, so it is called
# directly rather than through print() (which emitted a stray "None").
df.info()


In [None]:
# Parse the date columns; values that cannot be parsed become NaT.
date_cols = ["收入月份_time", "录入时间_time", "录入日期_time", "签字日期_time"]
for col in date_cols:
    df[col] = pd.to_datetime(df[col], errors="coerce")

# Coerce the numeric columns; values that cannot be parsed become NaN.
num_cols = ["件数", "毛重", "体积", "计费重量", "收入金额", "lat", "lng"]
df[num_cols] = df[num_cols].apply(pd.to_numeric, errors="coerce")

print("缺失值统计：")
display(df.isna().sum())

print("清洗后数据概览：")
display(df.head())
print("\n数据类型信息：")
# DataFrame.info() prints its own report and returns None, so it is called
# directly rather than through print() (which emitted a stray "None").
df.info()




In [None]:
# Upload the cleaned order frame and then count the rows in the table.
# `table_name` is the single source of truth for the target table: the
# upload, the success message and the verification query all use it.
# `order` is a reserved word in SQL, hence the backticks in the raw query.
table_name = "order"
try:
    # engine.begin() wraps the upload in a transaction that is committed on
    # success and rolled back if an exception is raised.
    with engine.begin() as conn:
        df.to_sql(
            name=table_name,
            con=conn,
            if_exists="replace",  # use "replace" for the first upload, switch to "append" afterwards
            index=False,
            chunksize=1000
        )
    print(f"已成功上传 {len(df)} 条数据至表 `{table_name}`")
except Exception as e:
    # Notebook-level boundary: report the failure and let the cell continue.
    print("上传失败：", e)

try:
    sql = f"SELECT COUNT(*) AS total_rows FROM `{table_name}`"
    with engine.connect() as conn:
        result = pd.read_sql(sql, conn)
    print("数据库中记录数：")
    display(result)
except Exception as e:
    print("查询失败：", e)