In [None]:
# ===== 通用工具 =====
import os
import sys
import numpy as np
import pandas as pd

# ===== 数据处理与预处理 =====
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder, label_binarize

# ===== 聚类与降维 =====
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.mixture import GaussianMixture

# ===== 分类模型 =====
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from xgboost import XGBClassifier

# ===== 超参数搜索 =====
from scipy.stats import loguniform

# ===== 模型评估 =====
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    roc_curve,
    auc
)

# ===== 可视化 =====
import matplotlib.pyplot as plt
import seaborn as sns
from mpl_toolkits.mplot3d import Axes3D  # 用于 3D 可视化


In [None]:
# Location of the raw Dry Bean spreadsheet, relative to the notebook's working directory.
# The path is assembled with os.path.join so it resolves on POSIX systems as well as on
# Windows; the previous backslash-only literal was only a valid path on Windows.
input_file = os.path.join("..", "data", "DryBeanDataset", "Dry_Bean_Dataset.xlsx")
df = pd.read_excel(input_file)

In [None]:
# Report the number of missing values in every column
print("缺失值统计：")
print(df.isnull().sum())

# Report rows that are exact duplicates of an earlier row
print("\n重复数据统计：")
duplicates = df.duplicated()
print(f"整行完全重复的行数: {duplicates.sum()}")
if duplicates.sum() > 0:
    print("\n重复数据示例（显示两条）：")
    duplicate_rows = df[duplicates].head(2)
    print(duplicate_rows)

    # Show each sampled duplicate together with its index label
    print("\n这些重复行的原始行号：")
    for idx in duplicate_rows.index:
        print(f"行号 {idx}:")
        # idx is an index *label*, so it must be looked up with .loc; positional
        # .iloc only gave the same row while the index was the default 0..n-1 range
        print(df.loc[idx])
        print("-" * 50)

# Drop duplicated rows. The explicit .copy() makes df_cleaned an independent frame,
# so adding the ID column below does not raise SettingWithCopyWarning.
df_cleaned = df.drop_duplicates().copy()
print(f"\n删除重复数据后，数据集大小从 {len(df)} 减少到 {len(df_cleaned)}")

# Add a 1-based sequential ID column
df_cleaned['ID'] = range(1, len(df_cleaned) + 1)
print("\n已添加ID列")

In [None]:
# ========== One-hot encoding ==========
# Fail early with a clear message; previously a missing 'Class' column silently skipped
# the encoding and the cell crashed later with a NameError on df_encoded.
if 'Class' not in df_cleaned.columns:
    raise KeyError("数据中未找到 'Class' 列。请检查输入数据格式。")

encoder = OneHotEncoder(sparse_output=False, drop=None)  # drop='first' would avoid the dummy-variable trap

# Encode the 'Class' column
class_encoded = encoder.fit_transform(df_cleaned[['Class']]).astype(int)
class_encoded_cols = encoder.get_feature_names_out(['Class'])
onehot_columns = list(class_encoded_cols)

# Turn the encoded array into a DataFrame aligned on df_cleaned's index and merge it in
class_encoded_df = pd.DataFrame(class_encoded, columns=class_encoded_cols, index=df_cleaned.index)
df_encoded = pd.concat([df_cleaned.drop(columns=['Class']), class_encoded_df], axis=1)

# Move the 'ID' column to the first position
col = df_encoded.pop('ID')
df_encoded.insert(0, 'ID', col)

# ========== Standardization ==========
# Select the feature columns by name: everything except ID and the one-hot columns.
# The previous positional slice columns[0:16] counted ID as one of the 16 columns, so
# after ID was removed only 15 features were scaled and the last one was left raw.
# NOTE(review): the scaler is fitted on the full dataset before the train/test split
# performed later in the notebook, so test-set statistics influence the scaling.
excluded_columns = {'ID', *onehot_columns}
numeric_columns = pd.Index([c for c in df_encoded.columns if c not in excluded_columns])

scaler = StandardScaler()
df_encoded[numeric_columns] = scaler.fit_transform(df_encoded[numeric_columns])

print("\n已完成数据标准化（使用StandardScaler）")
print("标准化后的数据统计：")
print(df_encoded[numeric_columns].describe())

# Save the processed data
output_file = 'processed_data_onehot.csv'
df_encoded.to_csv(output_file, index=False)
print(f"\n处理后的数据已保存到 {output_file}")

# ========== Per-column duplicate-value counts ==========
print("\n各列重复值统计：")
for column in df_cleaned.columns:
    duplicate_count = df_cleaned[column].duplicated().sum()
    print(f"{column}: {duplicate_count} 个重复值")

# ========== Derive integer labels from the one-hot columns ==========
data = pd.read_csv(output_file)

# Address the one-hot columns by name instead of assuming they are the last 7 columns
onehot_labels = data[onehot_columns].to_numpy()
y_true = onehot_labels.argmax(axis=1)
data['encoded_label'] = y_true

print("\n添加整数标签后的数据预览：")
print(data.head())

# Remove the one-hot columns, keeping only the integer 'encoded_label' column.
# A fresh frame is produced instead of mutating in place.
data = data.drop(columns=onehot_columns)

print("\n删除部分 One-Hot 列后的数据预览：")
print(data.head())

# Save the label-encoded dataset
data.to_csv('processed_data_label_encoding.csv', index=False)
print("\n整数标签数据已保存为 'processed_data_label_encoding.csv'")

In [None]:
#### Visualization

In [None]:
def tsne_visualize(data_file, output_dir='tsne_plots', perplexity=30, n_iter=1000, random_state=42):
    """Embed a labelled dataset with 3-component t-SNE and save 2D and 3D scatter plots.

    Parameters
    ----------
    data_file : str
        CSV file that contains an ``encoded_label`` column. Every column other than
        ``ID`` and ``encoded_label`` is used as a feature.
    output_dir : str
        Directory that receives ``tsne_2d.png`` and ``tsne_3d.png``; created if missing.
    perplexity : float
        Passed through to ``TSNE``.
    n_iter : int
        Number of t-SNE optimisation iterations.
    random_state : int
        Passed through to ``TSNE``.

    Raises
    ------
    ValueError
        If ``data_file`` has no ``encoded_label`` column.
    """
    df = pd.read_csv(data_file)
    if 'encoded_label' not in df.columns:
        raise ValueError("数据中未找到 'encoded_label' 列。请检查输入数据格式。")

    # Features are all columns except the identifier and the label
    feature_columns = [col for col in df.columns if col not in ['ID', 'encoded_label']]
    X = df[feature_columns].values
    y = df['encoded_label'].astype(str).values

    print("执行 t-SNE 降维...")
    # scikit-learn renamed TSNE's `n_iter` to `max_iter` (deprecated in 1.5, removed in
    # 1.7). Use whichever keyword the installed version exposes so the call works on both.
    iter_kwarg = 'max_iter' if 'max_iter' in TSNE().get_params() else 'n_iter'
    tsne = TSNE(n_components=3, perplexity=perplexity, random_state=random_state,
                **{iter_kwarg: n_iter})
    X_embedded = tsne.fit_transform(X)

    os.makedirs(output_dir, exist_ok=True)
    unique_labels = sorted(set(y))

    # 2D figure: the first two components of the 3-component embedding
    plt.figure(figsize=(10, 8))
    for label in unique_labels:
        idx = y == label
        plt.scatter(X_embedded[idx, 0], X_embedded[idx, 1], label=label, s=10)
    plt.legend()
    plt.title('t-SNE 2D Visualization')
    plt.xlabel('TSNE1')
    plt.ylabel('TSNE2')
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'tsne_2d.png'))
    plt.close()

    # 3D figure: all three components (Axes3D is already imported at the top of the notebook)
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    for label in unique_labels:
        idx = y == label
        ax.scatter(X_embedded[idx, 0], X_embedded[idx, 1], X_embedded[idx, 2], label=label, s=5)
    ax.set_title('t-SNE 3D Visualization')
    ax.set_xlabel('TSNE1')
    ax.set_ylabel('TSNE2')
    ax.set_zlabel('TSNE3')
    ax.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, 'tsne_3d.png'))
    plt.close()

    print(f"PNG 格式的可视化图已保存至：{output_dir}")
    
def tsne_cluster_visualize(X, labels, output_dir='tsne_cluster_plots', name='kmeans', perplexity=30, max_iter=1000, random_state=42):
    """Embed ``X`` with 3-component t-SNE and save 2D/3D scatter plots coloured by cluster.

    Parameters
    ----------
    X : array-like
        Feature matrix handed to ``TSNE.fit_transform``.
    labels : array-like
        One cluster label per row of ``X``; a list, Series or ndarray is accepted.
    output_dir : str
        Directory that receives ``{name}_tsne_2d.png`` and ``{name}_tsne_3d.png``;
        created if missing.
    name : str
        Used in the plot titles and as the file-name prefix.
    perplexity : float
        Passed through to ``TSNE``.
    max_iter : int
        Number of t-SNE optimisation iterations.
    random_state : int
        Passed through to ``TSNE``.
    """
    print(f"执行 t-SNE 降维并保存 PNG 图像: {name}")

    # scikit-learn renamed TSNE's `n_iter` to `max_iter` (deprecated in 1.5, removed in
    # 1.7). Use whichever keyword the installed version exposes so the call works on both.
    iter_kwarg = 'max_iter' if 'max_iter' in TSNE().get_params() else 'n_iter'
    tsne = TSNE(n_components=3, perplexity=perplexity, random_state=random_state,
                **{iter_kwarg: max_iter})
    X_embedded = tsne.fit_transform(X)

    # np.asarray lets callers pass plain lists or pandas Series; np.unique yields the
    # labels in their natural sort order (numeric for integer cluster ids) rather than
    # the lexicographic order of their string form.
    labels = np.asarray(labels)
    unique_labels = np.unique(labels)

    os.makedirs(output_dir, exist_ok=True)

    # 2D figure: the first two components of the 3-component embedding
    plt.figure(figsize=(10, 8))
    for label in unique_labels:
        idx = labels == label
        plt.scatter(X_embedded[idx, 0], X_embedded[idx, 1], label=str(label), s=10)
    plt.legend()
    plt.title(f't-SNE 2D Clustering Visualization - {name}')
    plt.xlabel('TSNE1')
    plt.ylabel('TSNE2')
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f'{name}_tsne_2d.png'))
    plt.close()

    # 3D figure: all three components
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    for label in unique_labels:
        idx = labels == label
        ax.scatter(X_embedded[idx, 0], X_embedded[idx, 1], X_embedded[idx, 2], label=str(label), s=5)
    ax.set_title(f't-SNE 3D Clustering Visualization - {name}')
    ax.set_xlabel('TSNE1')
    ax.set_ylabel('TSNE2')
    ax.set_zlabel('TSNE3')
    ax.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(output_dir, f'{name}_tsne_3d.png'))
    plt.close()

    print(f"{name} 聚类的 PNG 可视化图已保存至：{output_dir}")




In [None]:
# Plot the label-encoded dataset with t-SNE; the PNG files are written to `data_output`.
data_path = "processed_data_label_encoding.csv"
data_output = "tsne_plots"
tsne_visualize(data_path, output_dir=data_output)


In [None]:
### Clustering  

In [None]:

# Load the label-encoded dataset. Columns 1..16 are taken as the feature matrix and the
# last column as the reference labels.
df = pd.read_csv("processed_data_label_encoding.csv")
X = df.iloc[:, 1:17].values
y_true = df.iloc[:, -1].values

# Fit three clustering models on the same feature matrix
kmeans = KMeans(n_clusters=8, random_state=42)
labels_kmeans = kmeans.fit_predict(X)

gmm = GaussianMixture(n_components=7, random_state=42)
labels_gmm = gmm.fit_predict(X)

hierarchical = AgglomerativeClustering(n_clusters=6, metric='euclidean', linkage='ward')
labels_hierarchical = hierarchical.fit_predict(X)

# Save a t-SNE visualization for each clustering result
for cluster_name, cluster_labels in (
    ('KMeans', labels_kmeans),
    ('GMM', labels_gmm),
    ('Hierarchical', labels_hierarchical),
):
    tsne_cluster_visualize(X, cluster_labels, output_dir='clustering_plots', name=cluster_name)


In [None]:
## Train and Test

In [None]:
# Load the label-encoded dataset produced earlier in the notebook
data = pd.read_csv("processed_data_label_encoding.csv")

# Hold out 30% of the rows as the test set (fixed seed for a repeatable split)
train_data, test_data = train_test_split(data, test_size=0.3, random_state=42)

# Persist both splits next to the notebook
for split_frame, split_file in ((train_data, "train_data.csv"), (test_data, "test_data.csv")):
    split_frame.to_csv(split_file, index=False)

print("划分完成，训练集和测试集已保存。")


In [None]:
def evaluate_model(model, classes, name, result_dir="evaluation_results", test_path=None):
    """Evaluate a fitted classifier on the held-out test CSV and save metrics and plots.

    Writes ``metrics.txt``, ``roc_curve.png`` and ``confusion_matrix_per_class.png``
    into ``result_dir``.

    Parameters
    ----------
    model : estimator
        Fitted classifier exposing ``predict`` and ``predict_proba``.
    classes : array-like
        Class labels. Column ``i`` of ``predict_proba`` is scored against
        ``classes[i]``, so the order must match (callers pass ``model.classes_``).
    name : str
        Model name used in the plot titles.
    result_dir : str
        Output directory; created if missing.
    test_path : str, optional
        Test-set CSV containing ``ID`` and ``encoded_label`` columns. When omitted,
        ``test_data.csv`` in the working directory is used if it exists (that is where
        the split cell writes it); otherwise the legacy ``../test_data.csv`` is read.
    """
    os.makedirs(result_dir, exist_ok=True)

    if test_path is None:
        # The notebook saves the test split as "test_data.csv" in the working directory,
        # while this function historically read "../test_data.csv". Prefer the file the
        # notebook actually produces and keep the old location as a fallback.
        test_path = "test_data.csv" if os.path.exists("test_data.csv") else "../test_data.csv"

    df_test = pd.read_csv(test_path)
    X_test = df_test.drop(columns=["ID", "encoded_label"])
    y_test = df_test["encoded_label"]
    n_classes = len(classes)

    # One indicator column per class for the one-vs-rest ROC curves
    y_bin = label_binarize(y_test, classes=classes)
    if n_classes == 2 and y_bin.shape[1] == 1:
        # label_binarize returns a single column for binary problems; expand it so that
        # column i lines up with predict_proba[:, i]
        y_bin = np.hstack([1 - y_bin, y_bin])

    y_pred = model.predict(X_test)
    y_pred_prob = model.predict_proba(X_test)

    # Aggregate metrics, weighted by class support
    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, average='weighted')
    rec = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Per-class one-vs-rest ROC curve and AUC
    fpr, tpr, roc_auc = {}, {}, {}
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_bin[:, i], y_pred_prob[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Write all metrics in a single pass
    with open(os.path.join(result_dir, "metrics.txt"), "w", encoding="utf-8") as f:
        f.write("模型在测试集上的评估结果：\n")
        f.write(f"Accuracy：{acc:.4f}\n")
        f.write(f"Precision：{prec:.4f}\n")
        f.write(f"Recall：{rec:.4f}\n")
        f.write(f"F1 score：{f1:.4f}\n\n")
        f.write("各类 AUC 值：\n")
        for i in range(n_classes):
            f.write(f"  Class {classes[i]}: AUC = {roc_auc[i]:.2f}\n")

    # ROC curve plot
    plt.figure(figsize=(6, 5))
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], label=f"Class {classes[i]} (AUC = {roc_auc[i]:.2f})")
    plt.plot([0, 1], [0, 1], 'k--')
    plt.title(f"{name} Test Set - Multi-class ROC Curve")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, "roc_curve.png"))
    plt.close()

    # Confusion matrix. Passing labels=classes fixes the row/column order so it matches
    # the tick labels even when a class is absent from the test set or the predictions.
    cm = confusion_matrix(y_test, y_pred, labels=classes)
    plt.figure(figsize=(7, 6))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=classes, yticklabels=classes)
    plt.title(f'{name} Confusion Matrix - Per Class')
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, "confusion_matrix_per_class.png"))
    plt.close()


In [None]:
def logistic_regression(data_path="../train_data.csv"):
    """Tune a logistic-regression classifier with randomized search and evaluate it.

    Reads the training CSV at ``data_path``, uses every column except ``ID`` and
    ``encoded_label`` as features, runs a 20-candidate, 5-fold ``RandomizedSearchCV``,
    prints the best parameters, and calls ``evaluate_model`` with
    ``result_dir='logistic_result'``.

    Parameters
    ----------
    data_path : str
        Training-set CSV containing ``ID`` and ``encoded_label`` columns.

    Returns
    -------
    LogisticRegression
        The best estimator found by the search.
    """
    df = pd.read_csv(data_path)

    X = df.drop(columns=["ID", "encoded_label"])
    y = df["encoded_label"]

    # Search space; all listed solvers support the l2 penalty
    param_dist = {
        'C': loguniform(1e-4, 1e4),
        'solver': ['lbfgs', 'newton-cg', 'sag', 'saga'],
        'penalty': ['l2'],
        'max_iter': [500, 1000, 2000]
    }

    # The 'sag' and 'saga' solvers shuffle the data, so seed the estimator to make the
    # fitted model reproducible (the other models in this notebook use the same seed).
    base_clf = LogisticRegression(random_state=42)
    random_search = RandomizedSearchCV(
        estimator=base_clf,
        param_distributions=param_dist,
        n_iter=20,
        cv=5,
        verbose=1,
        n_jobs=-1,
        random_state=42
    )

    # Fit on the whole training file; cross-validation handles the internal splits
    random_search.fit(X, y)

    print("Best Parameters from Random Search:")
    print(random_search.best_params_)

    best_model = random_search.best_estimator_

    # Score the tuned model on the held-out test set
    evaluate_model(best_model, classes=best_model.classes_, name='Logistic', result_dir='logistic_result')

    return best_model

def random_forest_classifier(data_path="../train_data.csv"):
    """Tune a random forest with randomized search, evaluate it, and return it.

    The training CSV at ``data_path`` is loaded, ``ID`` and ``encoded_label`` are
    removed to form the feature matrix, and a 20-candidate, 5-fold
    ``RandomizedSearchCV`` is fitted. The best parameters are printed and the best
    estimator is passed to ``evaluate_model`` with ``result_dir='random_forest_result'``.
    """
    train_df = pd.read_csv(data_path)
    y = train_df["encoded_label"]
    X = train_df.drop(columns=["ID", "encoded_label"])

    # Candidate hyper-parameter values
    search_space = dict(
        n_estimators=[100, 200, 300, 500],
        max_features=['sqrt', 'log2'],
        max_depth=[None, 10, 20, 30, 50],
        min_samples_split=[2, 5, 10],
        min_samples_leaf=[1, 2, 4],
        bootstrap=[True, False],
    )

    search = RandomizedSearchCV(
        RandomForestClassifier(random_state=42),
        search_space,
        n_iter=20,
        cv=5,
        verbose=1,
        n_jobs=-1,
        random_state=42,
    )
    search.fit(X, y)

    print("Best Parameters from Random Search:")
    print(search.best_params_)

    tuned_model = search.best_estimator_
    evaluate_model(
        tuned_model,
        classes=tuned_model.classes_,
        name='random_forest',
        result_dir='random_forest_result',
    )
    return tuned_model

def mlp_classifier(data_path="../train_data.csv"):
    """Tune an MLP classifier with randomized search, evaluate it, and return it.

    The training CSV at ``data_path`` is loaded, ``ID`` and ``encoded_label`` are
    removed to form the feature matrix, and a 20-candidate, 5-fold
    ``RandomizedSearchCV`` is fitted. The best parameters are printed and the best
    estimator is passed to ``evaluate_model`` with ``result_dir='mlp_result'``.
    """
    train_df = pd.read_csv(data_path)
    y = train_df["encoded_label"]
    X = train_df.drop(columns=["ID", "encoded_label"])

    # Candidate hyper-parameter values; alpha is drawn log-uniformly from [1e-4, 1e4]
    search_space = dict(
        hidden_layer_sizes=[(50,), (100,), (200,)],
        activation=['relu', 'tanh'],
        solver=['adam', 'sgd'],
        alpha=loguniform(1e-4, 1e4),
        learning_rate=['constant', 'invscaling', 'adaptive'],
        max_iter=[500, 1000, 2000],
    )

    search = RandomizedSearchCV(
        MLPClassifier(random_state=42),
        search_space,
        n_iter=20,
        cv=5,
        verbose=1,
        n_jobs=-1,
        random_state=42,
    )
    search.fit(X, y)

    print("Best Parameters from Random Search:")
    print(search.best_params_)

    tuned_model = search.best_estimator_
    evaluate_model(
        tuned_model,
        classes=tuned_model.classes_,
        name="mlp",
        result_dir='mlp_result',
    )
    return tuned_model

def svm_classifier(data_path="../train_data.csv"):
    """Tune an SVM classifier with randomized search, evaluate it, and return it.

    The training CSV at ``data_path`` is loaded, ``ID`` and ``encoded_label`` are
    removed to form the feature matrix, and a 20-candidate, 5-fold
    ``RandomizedSearchCV`` is fitted over ``SVC(probability=True)``. The best
    parameters are printed and the best estimator is passed to ``evaluate_model``
    with ``result_dir='svm_result'``.
    """
    train_df = pd.read_csv(data_path)
    y = train_df["encoded_label"]
    X = train_df.drop(columns=["ID", "encoded_label"])

    # Narrowed search space: smaller C range, poly and sigmoid kernels removed,
    # and only the common gamma presets kept.
    search_space = dict(
        C=loguniform(1e-1, 1e2),
        kernel=['linear', 'rbf'],
        degree=[3, 4],        # applies to the poly kernel
        gamma=['scale', 'auto'],
        coef0=[0, 0.1],       # applies to the poly and sigmoid kernels
    )

    # probability=True is required because evaluate_model calls predict_proba
    search = RandomizedSearchCV(
        SVC(probability=True, random_state=42),
        search_space,
        n_iter=20,
        cv=5,
        verbose=1,
        n_jobs=-1,
        random_state=42,
    )
    search.fit(X, y)

    print("Best Parameters from Random Search:")
    print(search.best_params_)

    tuned_model = search.best_estimator_
    evaluate_model(
        tuned_model,
        classes=tuned_model.classes_,
        name='svm',
        result_dir='svm_result',
    )
    return tuned_model

def xgboost_classifier(data_path="../train_data.csv"):
    """Tune an XGBoost classifier with randomized search, evaluate it, and return it.

    The training CSV at ``data_path`` is loaded, ``ID`` and ``encoded_label`` are
    removed to form the feature matrix, and a 20-candidate, 5-fold
    ``RandomizedSearchCV`` is fitted. The best parameters are printed and the best
    estimator is passed to ``evaluate_model`` with ``result_dir='xgboost_result'``.
    """
    train_df = pd.read_csv(data_path)
    y = train_df["encoded_label"]
    X = train_df.drop(columns=["ID", "encoded_label"])

    # Candidate hyper-parameter values; learning_rate is drawn log-uniformly
    # from [1e-4, 1e-1]
    search_space = dict(
        n_estimators=[100, 200, 300, 500],
        max_depth=[3, 6, 10, 15],
        learning_rate=loguniform(1e-4, 1e-1),
        subsample=[0.6, 0.8, 1.0],
        colsample_bytree=[0.6, 0.8, 1.0],
        gamma=[0, 0.1, 0.2],
        reg_alpha=[0, 0.1, 1],
        reg_lambda=[0, 0.1, 1],
    )

    search = RandomizedSearchCV(
        XGBClassifier(random_state=42),
        search_space,
        n_iter=20,
        cv=5,
        verbose=1,
        n_jobs=-1,
        random_state=42,
    )
    search.fit(X, y)

    print("Best Parameters from Random Search:")
    print(search.best_params_)

    tuned_model = search.best_estimator_
    evaluate_model(
        tuned_model,
        classes=tuned_model.classes_,
        name='xgboost',
        result_dir='xgboost_result',
    )
    return tuned_model

In [None]:
# Train and evaluate every classifier on the training split, keeping each tuned model
# under its own key. Previously every call was assigned to `best_model`, so each result
# overwrote the one before it and only the last model remained available.
# Dict values are evaluated in the order written, so the training order is unchanged.
trained_models = {
    "logistic": logistic_regression(data_path="train_data.csv"),
    "mlp": mlp_classifier(data_path="train_data.csv"),
    "random_forest": random_forest_classifier(data_path="train_data.csv"),
    "svm": svm_classifier(data_path="train_data.csv"),
    "xgboost": xgboost_classifier(data_path="train_data.csv"),
}

# Kept for backward compatibility: as before, `best_model` is the model trained last.
best_model = trained_models["xgboost"]