In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Concatenate, Dropout
import matplotlib.pyplot as plt

from subway_yolo_multiline import create_subway_network_data, extract_subway_features
from subway_lstm_multiline_complete import create_temporal_data, prepare_sequences, extract_temporal_features

In [None]:
def build_defect_detection_model(spatial_feature_dim=128, temporal_feature_dim=64, n_lines=4, n_defect_types=5):
    """
    构建地铁线路障碍检测模型，融合YOLO空间特征和LSTM时间特征

    参数:
    - spatial_feature_dim: 空间特征向量维度
    - temporal_feature_dim: 时间特征向量维度
    - n_lines: 线路数量
    - n_defect_types: 需要检测的缺陷类型数量

    返回:
    - 融合模型列表，每条线路一个
    """
    models = []

    # 为每条线路构建一个模型
    for line_id in range(1, n_lines + 1):
        # 空间特征输入
        spatial_input = Input(shape=(spatial_feature_dim,), name=f'line_{line_id}_spatial_input')

        # 时间特征输入
        temporal_input = Input(shape=(temporal_feature_dim,), name=f'line_{line_id}_temporal_input')

        # 特征融合
        combined_features = Concatenate()([spatial_input, temporal_input])

        # 全连接层
        x = Dense(128, activation='relu')(combined_features)
        x = Dropout(0.3)(x)
        x = Dense(64, activation='relu')(x)
        x = Dropout(0.2)(x)

        # 输出层 - 多缺陷类型检测
        defect_outputs = Dense(n_defect_types, activation='sigmoid', name=f'line_{line_id}_defects')(x)

        # 创建单线路模型
        model = Model(
            inputs=[spatial_input, temporal_input],
            outputs=defect_outputs,
            name=f'line_{line_id}_defect_model'
        )

        # 编译模型
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=['accuracy']
        )

        models.append(model)

    return models

In [None]:
def create_integrated_model(spatial_feature_dim=128, temporal_feature_dim=64, n_lines=4, n_defect_types=5):
    """
    创建集成模型，同时处理所有线路

    参数:
    - spatial_feature_dim: 空间特征向量维度
    - temporal_feature_dim: 时间特征向量维度
    - n_lines: 线路数量
    - n_defect_types: 需要检测的缺陷类型数量

    返回:
    - 集成模型
    """
    # 空间特征输入
    spatial_input = Input(shape=(spatial_feature_dim,), name='spatial_input')

    # 每条线路的时间特征输入
    temporal_inputs = [
        Input(shape=(temporal_feature_dim,), name=f'line_{i + 1}_temporal_input')
        for i in range(n_lines)
    ]

    # 线路特定的特征处理
    line_outputs = []
    for i in range(n_lines):
        # 组合该线路的空间和时间特征
        combined = Concatenate()([spatial_input, temporal_inputs[i]])

        # 线路特定的处理层
        x = Dense(64, activation='relu')(combined)
        x = Dropout(0.2)(x)

        # 线路特定的缺陷预测
        line_output = Dense(n_defect_types, activation='sigmoid', name=f'line_{i + 1}_defects')(x)
        line_outputs.append(line_output)

    # 创建集成模型
    model = Model(
        inputs=[spatial_input] + temporal_inputs,
        outputs=line_outputs,
        name='integrated_defect_model'
    )

    # 编译模型
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    return model

In [None]:
def generate_defect_labels(n_lines=4, n_defect_types=5):
    """
    生成样本缺陷标签，用于演示模型训练和评估

    参数:
    - n_lines: 线路数量
    - n_defect_types: 缺陷类型数量

    返回:
    - 缺陷标签数组，形状为(n_lines, n_defect_types)
    """
    defect_labels = np.zeros((n_lines, n_defect_types))

    # 随机添加一些缺陷
    for i in range(n_lines):
        # 每条线路可能有0-2个缺陷类型
        n_defects = np.random.randint(0, 3)
        if n_defects > 0:
            # 随机选择n_defects个缺陷类型
            defect_indices = np.random.choice(n_defect_types, n_defects, replace=False)
            defect_labels[i, defect_indices] = 1

    return defect_labels

In [None]:
def predict_line_defects(models, subway_features, line_features, defect_types):
    """
    使用训练好的模型预测各线路的缺陷

    参数:
    - models: 线路缺陷检测模型列表
    - subway_features: 地铁网络空间特征
    - line_features: 线路时间特征列表
    - defect_types: 缺陷类型名称列表

    返回:
    - 预测结果
    """
    predictions = []

    for i, model in enumerate(models):
        # 准备输入数据
        spatial_input = np.expand_dims(subway_features, axis=0)
        temporal_input = np.expand_dims(line_features[i][0], axis=0)  # 使用第一个时间点

        # 预测
        pred = model.predict([spatial_input, temporal_input])
        predictions.append(pred[0])

        # 打印预测结果
        print(f"\n线路 {i + 1} 预测结果:")
        for j, defect_type in enumerate(defect_types):
            prob = pred[0][j]
            status = "✓" if prob > 0.5 else "✗"
            print(f"  - {defect_type}: {prob:.4f} {'有' if prob > 0.5 else '无'} {status}")

    return predictions

In [None]:
def plot_prediction_results(predictions, defect_labels, defect_types, n_lines=4):
    """
    可视化预测结果与实际标签的对比

    参数:
    - predictions: 预测的缺陷概率
    - defect_labels: 实际缺陷标签
    - defect_types: 缺陷类型名称列表
    - n_lines: 线路数量
    """
    fig, axes = plt.subplots(n_lines, 1, figsize=(10, 3 * n_lines))

    for i in range(n_lines):
        ax = axes[i] if n_lines > 1 else axes

        # 设置x轴标签和位置
        x = np.arange(len(defect_types))
        width = 0.35

        # 绘制条形图
        ax.bar(x - width / 2, predictions[i], width, label='预测概率')
        ax.bar(x + width / 2, defect_labels[i], width, label='实际标签')

        # 设置图表属性
        ax.set_ylabel('概率/标签')
        ax.set_title(f'线路 {i + 1} 缺陷预测与实际对比')
        ax.set_xticks(x)
        ax.set_xticklabels(defect_types)
        ax.legend()

        # 旋转x轴标签以避免重叠
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    plt.tight_layout()
    plt.show()

In [None]:
def apply_maintenance_strategy(predictions, threshold=0.5, budget_constraint=2):
    """
    基于预测结果应用维护策略

    参数:
    - predictions: 预测的缺陷概率
    - threshold: 缺陷概率阈值
    - budget_constraint: 预算约束（可以维修的最大线路数量）

    返回:
    - 维护优先级列表
    """
    # 计算每条线路的总体风险分数
    line_risks = []
    for i, line_pred in enumerate(predictions):
        # 计算平均风险（也可以使用更复杂的权重计算）
        risk_score = np.mean(line_pred)
        line_risks.append((i + 1, risk_score))

    # 按风险分数排序
    line_risks.sort(key=lambda x: x[1], reverse=True)

    # 应用预算约束
    maintenance_priority = line_risks[:budget_constraint]

    print("\n维护策略建议（按风险优先级）:")
    for line_id, risk in maintenance_priority:
        print(f"线路 {line_id}: 风险分数 = {risk:.4f}")

    return maintenance_priority

In [None]:
def generate_maintenance_report(predictions, defect_types, n_lines=4):
    """
    生成详细的维护报告。

    参数:
    - predictions: 预测的缺陷概率
    - defect_types: 缺陷类型名称列表
    - n_lines: 线路数量

    返回:
    - 维护报告DataFrame
    """
    report_data = []

    for line_id in range(1, n_lines + 1):
        line_pred = predictions[line_id - 1]

        # 找出超过阈值的缺陷
        for defect_id, prob in enumerate(line_pred):
            severity = "高" if prob > 0.7 else "中" if prob > 0.5 else "低"

            if prob > 0.3:  # 只报告可能性较大的缺陷
                report_data.append({
                    "线路ID": line_id,
                    "缺陷类型": defect_types[defect_id],
                    "缺陷概率": prob,
                    "风险等级": severity,
                    "建议操作": "立即检修" if prob > 0.7 else "计划检修" if prob > 0.5 else "监控"
                })

    # 创建DataFrame并按风险等级和缺陷概率排序
    report_df = pd.DataFrame(report_data)
    if not report_df.empty:
        # 定义排序键
        severity_order = {"高": 0, "中": 1, "低": 2}
        report_df["排序键"] = report_df["风险等级"].map(severity_order)

        # 排序并删除辅助列
        report_df = report_df.sort_values(by=["排序键", "缺陷概率"], ascending=[True, False]).drop("排序键", axis=1)

    return report_df

In [None]:
def main():
    # 设置参数
    n_lines = 4
    n_defect_types = 5
    defect_types = ['连通性问题', '容量瓶颈', '信号故障', '轨道磨损', '换乘障碍']

    print("===== 地铁线路障碍检测系统 =====")
    print(f"支持 {n_lines} 条线路和 {n_defect_types} 种缺陷类型检测")

    # 在实际应用中，以下代码会从文件中导入处理好的特征
    # subway_image, stations, lines = create_subway_network_data()
    # subway_features = extract_subway_features(subway_image)
    # time_df = create_temporal_data(n_days=90, n_lines=n_lines)
    # line_sequences = prepare_sequences(time_df, 24, n_lines)
    # line_features = extract_temporal_features(line_sequences)

    # 为演示目的，生成模拟特征
    print("\n生成模拟特征数据...")
    subway_features = np.random.normal(0, 1, 128)  # 模拟空间特征
    line_features = []
    for _ in range(n_lines):
        # 每条线路一组时间特征
        line_feature = np.random.normal(0, 1, (100, 64))  # 100个时间点，64维特征
        line_features.append(line_feature)

    # 为演示目的，生成一些缺陷标签
    defect_labels = generate_defect_labels(n_lines, n_defect_types)

    print("\n构建线路障碍检测模型...")
    models = build_defect_detection_model(
        spatial_feature_dim=len(subway_features),
        temporal_feature_dim=line_features[0].shape[1],
        n_lines=n_lines,
        n_defect_types=n_defect_types
    )

    # 显示模型摘要
    models[0].summary()

    print("\n在实际应用中，我们会基于真实数据训练模型...")
    print("此处仅为演示，使用随机初始化的模型进行预测")

    # 模拟实际标签
    print("\n模拟的线路缺陷标签:")
    for i, defects in enumerate(defect_labels):
        defect_names = [defect_types[j] for j, has_defect in enumerate(defects) if has_defect > 0.5]
        if defect_names:
            print(f"线路 {i + 1}: {', '.join(defect_names)}")
        else:
            print(f"线路 {i + 1}: 无缺陷")

    # 使用模型预测
    print("\n执行线路障碍检测...")
    predictions = predict_line_defects(models, subway_features, line_features, defect_types)

    # 应用维护策略
    maintenance_priority = apply_maintenance_strategy(predictions)

    # 生成维护报告
    report_df = generate_maintenance_report(predictions, defect_types, n_lines)

    print("\n详细维护报告:")
    if report_df.empty:
        print("没有检测到需要维护的线路。")
    else:
        print(report_df)

    # 在实际应用中可以启用可视化
    # plot_prediction_results(predictions, defect_labels, defect_types, n_lines)

    print("\n集成模型示例...")
    integrated_model = create_integrated_model(
        spatial_feature_dim=len(subway_features),
        temporal_feature_dim=line_features[0].shape[1],
        n_lines=n_lines,
        n_defect_types=n_defect_types
    )
    integrated_model.summary()

    # print("\n多线路站点支持的优势:")
    # print("1. 准确捕捉站点在多条线路上的影响")
    # print("2. 同时考虑站点的所有线路关系进行缺陷检测")
    # print("3. 支持线路间相互影响的分析")
    # print("4. 更接近实际地铁网络的复杂拓扑结构")
    # print("5. 能够识别由线路交叉引起的特定类型缺陷")

In [None]:
if __name__ == "__main__":
    main()