# Question 2
导入必要的库

In [None]:
from random_forest_Q2 import *
from scipy.optimize import minimize
import itertools
import numpy as np
from collections import defaultdict
import pandas as pd
from itertools import product
import os
from contextlib import redirect_stdout
from datetime import datetime

## 预测人数
### 随机森林模型主流程

In [None]:
# 1. 加载原始数据
original_data = pd.read_excel('D题附件：法医物证多人身份鉴定问题数据集/附件2：不同混合比例的STR图谱数据_processed.xlsx')

# 2. 加载并预处理模型数据
df = load_and_preprocess(data_path="data/enhanced_processed_Q2_data.xlsx")
X = df.drop(['people', 'gene_from', 'proportion'], axis=1)
y = df['people']

# 划分训练集/测试集
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    stratify=y,
    random_state=42
)
test_sample_files = X_test['sample_file']

# 3. 模型训练
rf_model, feature_selector = train_random_forest(
    X_train.drop('sample_file', axis=1),
    y_train
)

# 4. 模型预测
X_test_selected = feature_selector.transform(X_test.drop('sample_file', axis=1))
y_pred = rf_model.predict(X_test_selected)
y_proba = rf_model.predict_proba(X_test_selected)

# 5. 模型评估
print("=" * 40)
print("模型评估指标：")
print(f"准确率: {accuracy_score(y_test, y_pred):.4f}")
print(f"精确率: {precision_score(y_test, y_pred, average='macro'):.4f}")
print(f"召回率: {recall_score(y_test, y_pred, average='macro'):.4f}")
print(f"F1分数: {f1_score(y_test, y_pred, average='macro'):.4f}")
print(f"AUC值: {roc_auc_score(y_test, y_proba, multi_class='ovo'):.4f}")

# 6. 可视化输出
# 特征重要性图
plot_feature_importance(rf_model, X.columns.tolist(), "images/Q2/feature_importance.png")

# ROC曲线
plot_roc_curve(y_test, y_proba, "images/Q2/roc_curve.png")

# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=rf_model.classes_)
disp.plot(cmap=plt.cm.Blues)
plt.title("混淆矩阵")
plt.savefig("images/Q2/confusion_matrix.png", dpi=300)
plt.show()
plt.close()

### 提取测试集数据结果

In [None]:
#合并到原始数据
predictions = pd.DataFrame({
    'Sample File': test_sample_files,
    'predicted_people': y_pred
})
data_test = original_data[original_data['Sample File'].isin(test_sample_files)]
data_test = data_test.merge(predictions, on='Sample File', how='left')

# 将predicted_people列移动到第5列位置
cols = data_test.columns.tolist()
cols.remove('predicted_people')
cols.insert(4, 'predicted_people')  # 插入到第5列位置(索引4)
data_test = data_test[cols]

# 验证每个样本的预测值唯一性
assert data_test.groupby('Sample File')['predicted_people'].nunique().max() == 1

# 创建结果目录
if not os.path.exists('result'):
    os.makedirs('result')

# 输出结果
data_test.to_excel('result/Q2_data_test.xlsx', index=False)

## 预测比例
### 生成贡献矩阵A_i，归一化样本矩阵C

In [None]:
# 基因座数据配置
GENE_LOCI = {
    'D8S1179': [10, 11, 12, 13, 14, 15, 16],
    'D21S11': [24.3, 27, 28, 29, 30, 31, 31.2, 32, 32.2, 33, 33.2, 35, 36],
    'D7S820': [7, 8, 9, 10, 11, 12, 13],
    'CSF1PO': [7, 8, 9, 10, 11, 12, 13],
    'D3S1358': [14, 15, 16, 17, 18],
    'TH01': [5, 6, 7, 8, 9, 9.3],
    'D13S317': [8, 9, 10, 11, 12, 13, 14],
    'D16S539': [8, 9, 10, 11, 12, 13, 14],
    'D2S1338': [16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26],
    'D19S433': [10, 10.2, 11, 12, 12.2, 13, 13.2, 14, 14.2, 15, 15.2, 16, 16.2],
    'vWA': [11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
    'TPOX': [6, 7, 8, 9, 10, 11, 12],
    'D18S51': [10, 12, 13, 14, 15, 16, 17, 18, 19, 20, 22, 23],
    'AMEL': ['X', 'Y'],
    'D5S818': [7, 8, 9, 10, 11, 12, 13, 14],
    'FGA': [19, 20, 21, 21.2, 22, 22.2, 23, 24, 25, 26, 27, 28]
}

In [None]:
# 生成C矩阵
def generate_C_matrix(sample_name, data_test, GENE_LOCI):
    """
    根据C_vector的非零位置生成该基因座的合法等位基因组合（去重）并应用动态噪声过滤

    参数:
    C_vector: 当前基因座的峰高向量
    alleles: 当前基因座的等位基因列表

    返回:
    该基因座所有合法组合的列表（去重）
    """

    # 初始化结果容器
    matrix_data = []          # 归一化的峰高比例向量
    locus_raw_heights = []    # 原始峰高列表（每个等位基因的原始峰高）
    locus_total_heights = []  # 每个基因座的原始总峰高（过滤噪声后）

    # 获取基因座顺序列表
    loci_order = list(GENE_LOCI.keys())

    # 处理每个基因座
    for locus_name in loci_order:
        alleles = GENE_LOCI[locus_name]
        # 转换标准等位基因格式
        if locus_name == 'AMEL':
            std_alleles = [str(a).upper() for a in alleles]
        else:
            std_alleles = [f"{float(a):g}" for a in alleles]

        # 获取当前基因座数据
        sample_mask = (
            (data_test['Sample File'] == sample_name) &
            (data_test['Marker'].str.upper() == locus_name.upper())
        )
        locus_data = data_test[sample_mask].copy()

        # 初始化峰高向量
        locus_vector = [0.0] * len(std_alleles)
        raw_heights = [0.0] * len(std_alleles)  # 存储原始峰高
        total_height = 0.0

        if not locus_data.empty:
            # 第一遍：收集所有峰高数据
            allele_counts = defaultdict(float)

            # 遍历所有Allele列
            for col in [c for c in locus_data.columns if c.startswith('Allele')]:
                col_num = col.split()[-1]
                height_col = f'Height {col_num}'

                # 提取等位基因和峰高
                allele = str(locus_data[col].iloc[0]).strip()
                height = locus_data[height_col].iloc[0]

                # 过滤无效数据
                if pd.isna(allele) or pd.isna(height) or 'OL' in allele or 'off-ladder' in allele.lower():
                    continue

                # 格式处理
                try:
                    processed_allele = (
                        allele.upper() if locus_name == 'AMEL'
                        else f"{float(allele):g}"
                    )
                except ValueError:
                    continue

                # 累加峰高值
                if processed_allele in std_alleles:
                    allele_index = std_alleles.index(processed_allele)
                    raw_heights[allele_index] += float(height)
                    total_height += float(height)
                    allele_counts[processed_allele] += float(height)

            # 动态噪声过滤
            if total_height > 0 and any(h > 0 for h in raw_heights):
                # 获取有效峰高值（非零）
                valid_heights = [h for h in raw_heights if h > 0]

                if valid_heights:
                    # 计算最大峰高和总峰高
                    max_peak = max(valid_heights)
                    total_valid = sum(valid_heights)

                    # 设置动态阈值
                    threshold = max(0.1 * max_peak, 0.01 * total_valid)

                    # 应用阈值过滤
                    for i in range(len(raw_heights)):
                        if raw_heights[i] < threshold:
                            # 将低于阈值的峰高设为0
                            total_height -= raw_heights[i]
                            raw_heights[i] = 0.0
                else:
                    # 如果没有有效峰高，重置总峰高
                    total_height = 0.0
            else:
                # 如果没有峰高数据，保持为0
                total_height = 0.0

        # 计算归一化比例（仅当总峰高大于0）
        if total_height > 0:
            # 填充归一化峰高比例
            for i in range(len(std_alleles)):
                locus_vector[i] = raw_heights[i] / total_height
        else:
            # 如果没有峰高数据，保持为0
            locus_vector = [0.0] * len(std_alleles)

        matrix_data.append(locus_vector)
        locus_raw_heights.append(raw_heights)       # 保存过滤后的原始峰高列表
        locus_total_heights.append(total_height)    # 保存过滤后的原始总峰高

    return matrix_data, locus_total_heights, locus_raw_heights

In [None]:
# 生成A矩阵
def generate_valid_allele_combinations_for_locus(C_vector, alleles):
    """
    根据C_vector的非零位置生成该基因座的合法等位基因组合（去重）

    参数:
    C_vector: 当前基因座的峰高向量
    alleles: 当前基因座的等位基因列表

    返回:
    该基因座所有合法组合的列表（去重）
    """

    n = len(alleles)
    if n == 0:
        return []

    # 1. 找出C_vector中非零位置的索引
    non_zero_indices = [i for i, height in enumerate(C_vector) if height > 0]

    # 2. 特殊处理性染色体基因座（AMEL）
    # 检测是否为AMEL基因座：检查是否包含X或Y等位基因
    is_amel = False
    for allele in alleles:
        if isinstance(allele, str) and any(char in allele.upper() for char in ['X', 'Y']):
            is_amel = True
            break

    if is_amel:
        # 对于性染色体基因座，只允许特定组合
        valid_combinations = []

        # 找出X和Y的位置
        x_index = None
        y_index = None

        for i, allele in enumerate(alleles):
            if isinstance(allele, str):
                allele_upper = allele.upper()
                if 'X' in allele_upper:
                    x_index = i
                if 'Y' in allele_upper:
                    y_index = i

        # 女性组合：XX（纯合子）
        if x_index is not None:
            female_vec = [0] * n
            female_vec[x_index] = 2
            valid_combinations.append(female_vec)

        # 男性组合：XY（杂合子）
        if x_index is not None and y_index is not None and x_index != y_index:
            male_vec = [0] * n
            male_vec[x_index] = 1
            male_vec[y_index] = 1
            valid_combinations.append(male_vec)

        return valid_combinations

    # 3. 对于普通基因座，使用标准生成方法
    # 如果没有非零位置，返回空列表
    if not non_zero_indices:
        return []

    valid_combinations = []

    # 生成纯合子组合（只考虑非零位置）
    for i in non_zero_indices:
        vec = [0] * n
        vec[i] = 2
        valid_combinations.append(vec)

    # 生成杂合子组合（只考虑非零位置）
    for i in range(len(non_zero_indices)):
        for j in range(i+1, len(non_zero_indices)):
            idx_i = non_zero_indices[i]
            idx_j = non_zero_indices[j]
            vec = [0] * n
            vec[idx_i] = 1
            vec[idx_j] = 1
            valid_combinations.append(vec)

    return valid_combinations


def get_sorted_combinations(valid_combs, k):
    """
    生成去重后的k个贡献者组合（考虑组合相同但顺序不同的情况）

    参数:
    valid_combs: 合法基因型组合列表
    k: 贡献者数量

    返回:
    去重后的组合列表
    """
    if not valid_combs:
        return []

    # 使用集合去除重复组合
    unique_combs = set()

    try:
        # 生成所有可能的组合
        for comb_group in product(valid_combs, repeat=k):
            # 对组内的基因型进行排序（不考虑顺序）
            sorted_group = tuple(sorted(tuple(comb) for comb in comb_group))
            unique_combs.add(sorted_group)

        # 转换回原始形式
        result = []
        for comb_tuple in unique_combs:
            group = []
            for genotype in comb_tuple:
                # 查找原始组合（可能需要检查是否匹配）
                group.append(list(genotype))
            if group:
                result.append(group)

        return result
    except Exception as e:
        print(f"  生成去重组合时出错: {str(e)}")
        return []

### 范数最优化求解

In [None]:
def estimate_contributors_by_locus(k, C_matrix, locus_total_heights, locus_raw_heights, GENE_LOCI):
    """
    按基因座优化并加权平均的贡献者比例估计（使用首个基因座作为全局参考）
    """

    loci_order = list(GENE_LOCI.keys())
    n_loci = len(loci_order)

    # 存储每个基因座的结果
    locus_results = []
    best_combinations = []
    reference_p = None  # 全局参考比例向量

    # 1. 计算权重 - 使用原始总峰高
    weights = []
    for i, locus_name in enumerate(loci_order):
        # 使用原始总峰高计算权重
        total_allele_height = locus_total_heights[i]
        weight_i = np.sqrt(total_allele_height) if total_allele_height > 0 else 0
        weights.append(weight_i)

    # 归一化权重
    total_omega = sum(weights)
    if total_omega > 0:
        normalized_weights = [w / total_omega for w in weights]
    else:
        normalized_weights = [1/n_loci] * n_loci

    # 2. 初始化全局参考（第一个基因座）
    print(f"开始按基因座优化，贡献者人数k={k}")
    print(f"将使用第一个基因座作为全局参考基准")

    for i, locus_name in enumerate(loci_order):
        C_vector = np.array(C_matrix[i])  # 归一化峰高比例
        raw_heights = locus_raw_heights[i]  # 原始峰高列表
        total_height = locus_total_heights[i]  # 原始总峰高
        alleles = GENE_LOCI[locus_name]

        # 动态生成合法组合（基于原始峰高）
        valid_combs = generate_valid_allele_combinations_for_locus(C_vector, alleles)

        # 打印详细峰高信息
        non_zero_alleles = []
        for j, h in enumerate(raw_heights):
            if h > 0:
                non_zero_alleles.append(f"{alleles[j]}: {h:.1f}")

        print(f"\n处理基因座 '{locus_name}'，原始总峰高={total_height:.1f}")

        min_residual = float('inf')
        best_p = None
        best_combo = None

        # 检查无效数据情况 - 提供默认值
        if not valid_combs or total_height <= 0:
            uniform_p = np.ones(k) / k
            locus_results.append(uniform_p)
            best_combinations.append([np.zeros(len(alleles))] * k)
            print(f"  无有效数据，使用均匀分布: {uniform_p.round(3)}")
            continue

        # 生成去重后的组合
        comb_product = get_sorted_combinations(valid_combs, k)
        total_combs = len(comb_product) if comb_product is not None else 0
        print(f"  考虑{total_combs}种组合（去重后）...", end='', flush=True)

        # 如果组合数量为0，跳过优化
        if total_combs == 0:
            uniform_p = np.ones(k) / k
            best_combo = [valid_combs[0]] * k if valid_combs else [np.zeros(len(alleles))] * k
            best_p = uniform_p
            min_residual = 0.0
        else:
            for comb in comb_product:
                A_matrix = np.array(comb).T

                def objective(p):
                    return np.linalg.norm(A_matrix @ p - C_vector)

                constraints = (
                    {'type': 'eq', 'fun': lambda p: np.sum(p) - 1},
                    {'type': 'ineq', 'fun': lambda p: p}
                )

                p0 = np.ones(k) / k
                try:
                    res = minimize(objective, p0, method='SLSQP', constraints=constraints)

                    if res.success and res.fun < min_residual:
                        min_residual = res.fun
                        best_p = res.x
                        best_combo = comb
                except Exception as e:
                    # 优化可能失败，但继续尝试其他组合
                    continue

        # 如果没有找到有效组合，使用均匀分布
        if best_p is None:
            print(f"\n  警告: 未找到有效组合，使用均匀分布")
            best_p = np.ones(k) / k
            best_combo = [valid_combs[0]] * k if valid_combs else [np.zeros(len(alleles))] * k

        # 归一化比例 - 添加空值检查
        best_p = np.clip(best_p, 0, None) if best_p is not None else np.ones(k) / k
        best_p_sum = np.sum(best_p)
        if best_p_sum > 0:
            best_p /= best_p_sum

        # 如果是第一个基因座，设为全局参考
        if i == 0:
            reference_p = best_p.copy()
            print(f"完成! 最小残差={min_residual:.4f}")
            print(f"  设为全局参考比例: {np.round(best_p, 4)}")
        else:
            # 与参考比例调整顺序 - 添加空值检查
            if best_combo and all(combo is not None for combo in best_combo):
                best_p, best_combo = match_to_reference(reference_p, best_p, best_combo)
            print(f"完成! 最小残差={min_residual:.4f}")
            print(f"  调整后比例: {np.round(best_p, 4)}")

        # 输出基因型组合 - 添加空值检查
        print(f"  贡献者基因型组合:")
        if best_combo:
            for p_idx in range(k):
                if p_idx < len(best_combo):  # 防止索引越界
                    combo_vec = best_combo[p_idx]
                    allele_strs = []
                    for idx, count in enumerate(combo_vec):
                        if count > 0:
                            # 获取原始峰高值（如果可用）
                            raw_height = ""
                            if len(raw_heights) > idx and raw_heights[idx] > 0:
                                raw_height = f"({raw_heights[idx]:.1f})"

                            allele_val = alleles[idx]
                            if isinstance(allele_val, float) and allele_val.is_integer():
                                allele_strs.append(f"{int(allele_val)}×{int(count)}")
                            else:
                                allele_strs.append(f"{allele_val}×{int(count)}")
                    print(f"    贡献者{p_idx+1}: {' + '.join(allele_strs)}")
                else:
                    print(f"    贡献者{p_idx+1}: 无数据")
        else:
            print("    无基因型组合数据")

        # 保存结果
        locus_results.append(best_p)
        best_combinations.append(best_combo)

    # 3. 加权平均
    final_p = np.zeros(k)
    for j in range(k):
        for i in range(n_loci):
            if i < len(locus_results) and locus_results[i] is not None and j < len(locus_results[i]):
                final_p[j] += normalized_weights[i] * locus_results[i][j]

    final_p_sum = np.sum(final_p)
    if final_p_sum > 0:
        final_p /= final_p_sum

    # 输出最终结果
    print("\n所有基因座处理完成，开始加权平均")
    print(f"最终混合比例: {np.round(final_p, 4)}")
    print(f"比例总和: {np.sum(final_p):.4f}")

    return final_p, locus_results, normalized_weights

def match_to_reference(reference_p, best_p, best_combo):
    k = len(reference_p)
    if k == 1 or reference_p is None or best_p is None:
        return best_p, best_combo

    try:
        # 计算原始顺序相关性
        corr_original = np.corrcoef(reference_p, best_p)[0, 1]

        # 当k=2时尝试交换顺序
        if k == 2:
            swapped_p = best_p[::-1].copy()
            swapped_combo = best_combo[::-1]
            corr_swapped = np.corrcoef(reference_p, swapped_p)[0, 1]

            if corr_swapped > corr_original:
                # print(f"  调整顺序以匹配参考（原始相关性={corr_original:.3f}, 调整后={corr_swapped:.3f})")
                return swapped_p, swapped_combo

        # 当k>2时尝试所有排列
        else:
            best_perm = list(range(k))
            best_corr = corr_original

            # 尝试所有可能的排列
            for perm in itertools.permutations(range(k)):
                perm = list(perm)
                permuted_p = [best_p[i] for i in perm]
                permuted_combo = [best_combo[i] for i in perm]
                corr = np.corrcoef(reference_p, permuted_p)[0, 1]

                if corr > best_corr:
                    best_corr = corr
                    best_perm = perm
                    # print(f"  发现更好顺序: 相关性={corr:.3f}")

            # 应用最佳排列
            if best_perm != list(range(k)):
                permuted_p = [best_p[i] for i in best_perm]
                permuted_combo = [best_combo[i] for i in best_perm]
                # print(f"  应用最佳排列: {best_perm}, 相关性={best_corr:.3f}")
                return permuted_p, permuted_combo

        return best_p, best_combo
    except Exception as e:
        print(f"  匹配顺序时出错: {str(e)}")
        return best_p, best_combo


### 主流程

In [None]:
# 创建汇总结果DataFrame
summary_data = []
sample_names = data_test['Sample File'].unique()

# 遍历每个样本
for i in range(0, len(sample_names)):
    sample_name = sample_names[i]
    print(f"\n开始处理样本: {sample_name}")

    # 创建日志文件路径
    log_filename = f"result/{sample_name.replace('/', '_').replace(':', '_')}.log"

    # 打开日志文件用于记录输出
    with open(log_filename, 'w') as log_file:
        with redirect_stdout(log_file):
            print(f"处理样本: {sample_name}")
            print(f"开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print("="*80)

            try:
                # 1. 生成C矩阵、总峰高和原始峰高
                print(f"步骤1: 生成基因座峰高矩阵")
                C_matrix, locus_total_heights, locus_raw_heights = generate_C_matrix(sample_name, data_test, GENE_LOCI)

                # 2. 获取贡献者人数
                print(f"步骤2: 获取预测贡献者数量")
                k = data_test[data_test['Sample File'] == sample_name]['predicted_people'].iloc[0]
                print(f"  预测贡献者人数: {k}")

                # 3. 执行优化
                print(f"步骤3: 开始优化计算")
                final_p, locus_results, weights = estimate_contributors_by_locus(
                    k, C_matrix, locus_total_heights, locus_raw_heights, GENE_LOCI
                )

                # 4. 记录结果
                print(f"步骤4: 记录优化结果")
                print(f"最终估计比例: {np.round(final_p, 4)}")
                print(f"比例总和: {np.sum(final_p):.4f}")
                print(f"权重分布: {np.round(weights, 4)}")

                # 添加到汇总数据
                summary_row = {
                    'Sample': sample_name,
                    'Contributors': k,
                    'Processing_Time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                }

                # 添加每个贡献者的比例
                for i in range(k):
                    summary_row[f'Contributor_{i+1}_P'] = final_p[i]

                # 添加权重信息
                for i, locus_name in enumerate(GENE_LOCI.keys()):
                    weight = weights[i] if i < len(weights) else 0
                    summary_row[f'Weight_{locus_name}'] = weight

                summary_data.append(summary_row)

                # 5. 保存基因座结果到CSV
                locus_results_df = pd.DataFrame({
                    'Locus': list(GENE_LOCI.keys()),
                    'Total_Raw_Height': locus_total_heights,
                    'Final_Weight': weights
                })

                # 添加每个贡献者的比例
                for i in range(k):
                    locus_results_df[f'Contributor_{i+1}_P'] = [p[i] if i < len(p) else 0 for p in locus_results]

                locus_results_df.to_csv(f"result/{sample_name.replace('/', '_').replace(':', '_')}_locus_results.csv", index=False)

            except Exception as e:
                print(f"处理样本 {sample_name} 时出错: {str(e)}")
                # 添加到汇总数据（错误标记）
                summary_data.append({
                    'Sample': sample_name,
                    'Contributors': 'ERROR',
                    'Processing_Time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    'Error': str(e)
                })

            print("="*80)
            print(f"结束时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"结果已保存到: {log_filename}")

    print(f"完成样本 {sample_name} 的处理")

# 创建汇总DataFrame
if summary_data:
    summary_df = pd.DataFrame(summary_data)

    # 重新排列列（样本信息在前）
    col_order = ['Sample', 'Contributors', 'Processing_Time', 'Error']
    for i in range(1, 6):  # 最多支持5个贡献者
        col_name = f'Contributor_{i}_P'
        if col_name in summary_df.columns:
            col_order.append(col_name)

    # 添加权重列
    locus_order = list(GENE_LOCI.keys())
    for locus_name in locus_order:
        col_name = f'Weight_{locus_name}'
        if col_name in summary_df.columns:
            col_order.append(col_name)

    # 修改列选择逻辑，确保只选择存在的列
    col_order = [col_name for col_name in col_order if col_name in summary_df.columns]

    # 然后进行列选择
    summary_df = summary_df[col_order]

    # 保存汇总结果
    summary_df.to_csv("result/summary_results.csv", index=False)
    print(f"所有样本处理完成！汇总结果已保存到: result/summary_results.csv")
else:
    print("未处理任何样本")