In [None]:
# 压力图检测系统 - MindSpore版本

这个notebook允许您输入文件路径，处理txt文件中的压力图数据，展示热力图并给出模型的睡姿判断结果。
本版本使用MindSpore框架，适配华为Ascend AI处理器。

## 导入必要的库

# 导入MindSpore相关库
import mindspore
import mindspore.nn as nn
import mindspore.ops as ops
from mindspore import Tensor
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import seaborn as sns
import os
import sys
from pathlib import Path

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False

# 添加项目路径
project_path = '/workspaces/codespaces-jupyter/project'
if project_path not in sys.path:
    sys.path.append(project_path)

print("MindSpore库导入完成！")
print(f"MindSpore版本: {mindspore.__version__}")

## 模型定义和设备配置

# 从训练脚本导入MindSpore模型定义
from train_mindspore import SimpleCNN

# 定义标签映射
LABEL_NAMES = ['Supine', 'Prone', 'Left Side', 'Right Side']

def label_to_name(label):
    """标签转换为姿态名称"""
    return LABEL_NAMES[label] if 0 <= label < len(LABEL_NAMES) else '未知'

print("MindSpore模型定义导入完成！")

## 加载训练好的模型

def load_mindspore_model(model_path):
    """加载训练好的MindSpore模型"""
    print(f"正在加载MindSpore模型: {model_path}")
    
    # 设置MindSpore上下文
    mindspore.set_context(
        mode=mindspore.PYNATIVE_MODE,  # 推理使用动态图模式
        device_target="Ascend",  # 使用Ascend设备
        device_id=0
    )
    
    model = SimpleCNN(num_classes=4)
    
    # 加载检查点
    if os.path.exists(model_path):
        param_dict = mindspore.load_checkpoint(model_path)
        mindspore.load_param_into_net(model, param_dict)
        model.set_train(False)  # 设置为推理模式
        print("MindSpore模型加载完成！")
        return model
    else:
        print(f"警告: 模型文件不存在: {model_path}")
        return None

# 设置Ascend设备
print("使用设备: Ascend")

# 加载MindSpore模型
model_path = "/workspaces/codespaces-jupyter/project/best_model_converted.ckpt"
if os.path.exists(model_path):
    model = load_mindspore_model(model_path)
else:
    print(f"警告: MindSpore模型文件不存在: {model_path}")
    print("请先运行模型转换工具 convert_model.py")
    model = None

## 数据加载函数

def load_pressure_data(file_path):
    """加载压力分布数据，支持逗号分隔和空格分隔的文件，并处理时间序列数据"""
    try:
        # 首先尝试逗号分隔格式
        try:
            data = np.loadtxt(file_path, delimiter=',')
        except ValueError:
            # 如果失败，尝试空格分隔格式
            data = np.loadtxt(file_path)
        
        print(f"原始数据形状: {data.shape}")
        
        # 检查数据形状并处理
        if data.shape == (40, 26):
            # 已经是正确的形状
            print("数据形状正确，无需处理")
            return data
        elif data.shape[1] == 26:
            # 如果列数正确但行数不是40，尝试重塑
            if data.shape[0] % 40 == 0:
                # 如果行数是40的倍数，取平均值
                n_frames = data.shape[0] // 40
                reshaped_data = data.reshape(n_frames, 40, 26)
                # 计算每个位置的平均值
                averaged_data = np.mean(reshaped_data, axis=0)
                print(f"将 {data.shape} 重塑为 (40, 26)，使用 {n_frames} 帧的平均值")
                return averaged_data
            else:
                # 如果不是40的倍数，截取前面的部分
                n_rows = (data.shape[0] // 40) * 40
                if n_rows >= 40:
                    truncated_data = data[:n_rows]
                    n_frames = n_rows // 40
                    reshaped_data = truncated_data.reshape(n_frames, 40, 26)
                    averaged_data = np.mean(reshaped_data, axis=0)
                    print(f"截取 {n_rows} 行，重塑为 (40, 26)，使用 {n_frames} 帧的平均值")
                    return averaged_data
                else:
                    print(f"警告: 文件行数不足40行，实际形状: {data.shape}")
                    return None
        else:
            print(f"警告: 文件列数不是26，实际形状: {data.shape}")
            return None
            
    except Exception as e:
        print(f"加载文件时出错: {e}")
        return None

print("数据加载函数定义完成！")

## 预测函数

def predict_posture(model, pressure_data):
    """使用MindSpore模型预测睡姿"""
    if model is None:
        print("模型未加载，无法进行预测")
        return None, None
    
    # 转换数据为MindSpore Tensor并添加batch和channel维度
    # (40, 26) -> (1, 1, 40, 26)
    tensor_data = Tensor(pressure_data, mindspore.float32)
    tensor_data = ops.expand_dims(tensor_data, 0)  # 添加batch维度
    tensor_data = ops.expand_dims(tensor_data, 0)  # 添加channel维度
    
    # 模型推理
    model.set_train(False)
    outputs = model(tensor_data)
    
    # 获取预测结果和置信度
    softmax = nn.Softmax(axis=1)
    probabilities = softmax(outputs)
    
    confidence = ops.max(probabilities, axis=1)[0]
    predicted = ops.argmax(probabilities, axis=1)
    
    # 获取所有类别的概率
    all_probs = probabilities.asnumpy()[0]
    
    return predicted.asnumpy().item(), confidence.asnumpy().item(), all_probs

print("MindSpore预测函数定义完成！")

## 可视化函数

def plot_pressure_heatmap(pressure_data, title="pressure distribution heatmap"):
    """绘制压力分布热力图"""
    plt.figure(figsize=(8, 12))
    
    # 使用seaborn绘制热力图
    sns.heatmap(pressure_data, 
                cmap='YlOrRd',  # 黄橙红色调
                cbar_kws={'label': 'pressure value'},
                xticklabels=False,
                yticklabels=False)
    
    plt.title(title, fontsize=16, fontweight='bold')
    plt.xlabel('sensor column', fontsize=12)
    plt.ylabel('sensor row', fontsize=12)
    
    # 显示数据统计信息
    plt.figtext(0.02, 0.02, 
                f'data shape: {pressure_data.shape}\n'
                f'max value: {pressure_data.max():.2f}\n'
                f'min value: {pressure_data.min():.2f}\n'
                f'mean value: {pressure_data.mean():.2f}',
                fontsize=10, 
                bbox=dict(boxstyle="round,pad=0.3", facecolor="lightgray", alpha=0.8))
    
    plt.tight_layout()
    plt.show()

def plot_prediction_results(predicted_label, confidence, all_probs):
    """绘制预测结果"""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # 左图：预测结果
    predicted_name = label_to_name(predicted_label)
    ax1.text(0.5, 0.6, f'Predicted Result: {predicted_name}', 
             fontsize=24, fontweight='bold', ha='center', va='center')
    ax1.text(0.5, 0.4, f'Confidence: {confidence:.3f}', 
             fontsize=18, ha='center', va='center')
    ax1.set_xlim(0, 1)
    ax1.set_ylim(0, 1)
    ax1.axis('off')
    ax1.set_title('MindSpore Prediction Result', fontsize=16, fontweight='bold')
    
    # 右图：各类别概率柱状图
    colors = ['#ff9999', '#66b3ff', '#99ff99', '#ffcc99']
    bars = ax2.bar(LABEL_NAMES, all_probs, color=colors, alpha=0.8)
    
    # 高亮预测结果
    bars[predicted_label].set_color(colors[predicted_label])
    bars[predicted_label].set_alpha(1.0)
    bars[predicted_label].set_edgecolor('red')
    bars[predicted_label].set_linewidth(3)
    
    ax2.set_ylabel('Probability', fontsize=12)
    ax2.set_title('Probability of All Labels', fontsize=16, fontweight='bold')
    ax2.set_ylim(0, 1)
    
    # 在柱子上显示概率值
    for i, (bar, prob) in enumerate(zip(bars, all_probs)):
        ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.01, 
                f'{prob:.3f}', ha='center', va='bottom', fontsize=10, fontweight='bold')
    
    plt.tight_layout()
    plt.show()

print("可视化函数定义完成！")

## 完整分析流程

def analyze_pressure_file(file_path):
    """分析单个压力文件的完整流程 - MindSpore版本"""
    print(f"正在分析文件: {file_path}")
    print("="*60)
    
    # 检查文件是否存在
    if not os.path.exists(file_path):
        print(f"错误: 文件不存在: {file_path}")
        return
    
    # 1. 加载压力数据
    print("1. 加载压力数据...")
    pressure_data = load_pressure_data(file_path)
    
    if pressure_data is None:
        print("数据加载失败！")
        return
    
    print(f"数据加载成功！最终形状: {pressure_data.shape}")
    
    # 2. 绘制热力图
    print("\n2. 绘制压力分布热力图...")
    filename = os.path.basename(file_path)
    plot_pressure_heatmap(pressure_data, f"Pressure Heatmap - {filename} (MindSpore)")

    # 3. MindSpore模型预测
    print("\n3. 进行睡姿预测 (使用MindSpore + Ascend)...")
    if model is not None:
        predicted_label, confidence, all_probs = predict_posture(model, pressure_data)
        
        if predicted_label is not None:
            print(f"预测完成！")
            print(f"预测结果: {label_to_name(predicted_label)}")
            print(f"置信度: {confidence:.3f}")
            
            # 4. 绘制预测结果
            print("\n4. 显示预测结果...")
            plot_prediction_results(predicted_label, confidence, all_probs)
            
            # 5. 详细概率信息
            print("\n5. 各类别详细概率:")
            for i, (name, prob) in enumerate(zip(LABEL_NAMES, all_probs)):
                mark = " ← 预测结果" if i == predicted_label else ""
                print(f"  {name}: {prob:.4f}{mark}")
        else:
            print("预测失败！")
    else:
        print("MindSpore模型未加载，跳过预测步骤")
    
    print("\n分析完成！")
    print("="*60)

print("完整分析函数定义完成！")

## 使用说明

在下面的单元格中，修改 `file_path` 变量为您要分析的txt文件路径，然后运行单元格即可。

**示例路径:**
- `/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_1.txt`
- `/workspaces/codespaces-jupyter/project/data/0902数据/hjj0902/hjj_15.txt`

**注意事项:**
1. 文件应包含40×26的压力数据
2. 支持逗号分隔或空格分隔的格式
3. 如果数据有多个时间帧，会自动计算平均值
4. 使用MindSpore + Ascend进行推理

## 单文件分析

# 在这里输入您要分析的文件路径
file_path = "/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_1.txt"

# 运行分析
analyze_pressure_file(file_path)

## 批量测试示例

如果您想测试多个文件，可以使用下面的代码：

# 批量测试多个文件
test_files = [
    "/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_1.txt",   # 应该是仰卧
    "/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_8.txt",   # 应该是俯卧
    "/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_12.txt",  # 应该是左侧卧
    "/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_18.txt",  # 应该是右侧卧
]

for file_path in test_files:
    if os.path.exists(file_path):
        analyze_pressure_file(file_path)
        print("\n" + "="*80 + "\n")
    else:
        print(f"文件不存在: {file_path}")

## 性能对比测试

如果您想测试MindSpore在Ascend上的推理性能，可以运行下面的代码：

def performance_test(model, test_data_path, num_runs=10):
    """测试MindSpore模型的推理性能"""
    if model is None:
        print("模型未加载，无法进行性能测试")
        return
    
    print(f"开始性能测试 - 运行 {num_runs} 次推理")
    
    # 加载测试数据
    pressure_data = load_pressure_data(test_data_path)
    if pressure_data is None:
        print("测试数据加载失败")
        return
    
    # 预处理数据
    tensor_data = Tensor(pressure_data, mindspore.float32)
    tensor_data = ops.expand_dims(tensor_data, 0)
    tensor_data = ops.expand_dims(tensor_data, 0)
    
    import time
    
    # 预热
    for _ in range(3):
        _ = model(tensor_data)
    
    # 计时测试
    start_time = time.time()
    for i in range(num_runs):
        outputs = model(tensor_data)
    end_time = time.time()
    
    total_time = end_time - start_time
    avg_time = total_time / num_runs
    
    print(f"性能测试结果:")
    print(f"  总时间: {total_time:.4f} 秒")
    print(f"  平均推理时间: {avg_time:.4f} 秒")
    print(f"  FPS: {1/avg_time:.2f}")
    print(f"  使用设备: Ascend")

# 运行性能测试
if model is not None:
    test_file = "/workspaces/codespaces-jupyter/project/data/0902数据/hfs0902/hfs_1.txt"
    if os.path.exists(test_file):
        performance_test(model, test_file, num_runs=50)

## 系统信息

def print_system_info():
    """打印系统和MindSpore配置信息"""
    print("系统配置信息:")
    print(f"  MindSpore版本: {mindspore.__version__}")
    print(f"  设备类型: Ascend")
    print(f"  运行模式: PYNATIVE_MODE")
    
    # 打印模型信息
    if model is not None:
        total_params = sum(p.size for p in model.trainable_params())
        print(f"  模型参数数量: {total_params:,}")
        print(f"  模型类型: SimpleCNN (MindSpore)")

print_system_info()