# 模型部署示例

本笔记本展示如何将训练好的TensorFlow模型转换为嵌入式系统可用的格式，包括：
1. 模型加载和分析
2. TensorFlow Lite转换
3. 量化优化
4. C代码生成
5. 性能评估
6. STM32部署准备

## 1. 设置环境

首先，我们需要导入必要的库并设置路径。

In [None]:
import os
import sys
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional, Union, Any

# Extend the import path so the ml_pipeline package can be imported below.
# NOTE(review): dirname() is applied twice to abspath('.'), so this appends the
# directory two levels above the current working directory (not the immediate
# parent) — confirm that is where the ml_pipeline package lives.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath('.'))))

# Import the ML Pipeline helpers used throughout this notebook
from ml_pipeline.models.model import load_model_with_metadata
from ml_pipeline.models.convert_model import (
    convert_to_tflite, generate_c_array, analyze_model
)
from ml_pipeline.data_processing.data_processing import load_dataset

# Plot styling: seaborn whitegrid theme and a 12x6 default figure size
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

## 2. 加载模型

加载训练好的模型和相关元数据。

In [None]:
# Load the trained model together with its metadata
model_path = 'trained_models/lstm_model.h5'
model, metadata = load_model_with_metadata(model_path)

# Report selected metadata entries, one "caption: value" line per entry.
# Each tuple pairs the printed caption with the metadata key it reads.
print("模型信息:")
metadata_fields = (
    ("架构", 'architecture'),
    ("输入形状", 'input_shape'),
    ("类别数", 'num_classes'),
    ("类别名称", 'class_names'),
    ("特征名称", 'feature_names'),
)
for caption, key in metadata_fields:
    print(f"{caption}: {metadata[key]}")

# Show the model structure
model.summary()

## 3. 准备代表性数据集

为量化校准准备代表性数据集。

In [None]:
# Load the processed dataset.
# load_dataset returns three values; only the first two are kept here.
# presumably `windows` holds the sample windows and `labels` the matching
# class labels — TODO confirm against load_dataset
data_path = 'processed_data.npz'
windows, labels, _ = load_dataset(data_path)

# Calibration-data generator handed to the converter for quantization
def representative_dataset():
    """Yield up to 100 randomly chosen windows, one per step.

    Each yielded item is a one-element list holding a float32 array that
    keeps a leading batch axis of length 1 (the slice ``windows[i:i + 1]``).
    Indices are drawn without replacement from the module-level ``windows``.
    """
    n_calibration = min(100, len(windows))
    for pick in np.random.choice(len(windows), n_calibration, replace=False):
        yield [windows[pick:pick + 1].astype(np.float32)]

# Summarise the dataset: sample count and per-sample shape
print(f"数据集大小: {len(windows)} 个样本")
print(f"样本形状: {windows.shape[1:]}")

# Plot the first window in two side-by-side panels:
# channels 0-2 on the left, channels 3-5 on the right.
plt.figure(figsize=(12, 4))

# (subplot position, channel indices, panel title, y-axis label)
panel_specs = (
    (1, range(3), '加速度数据示例', '加速度'),
    (2, range(3, 6), '陀螺仪数据示例', '角速度'),
)
for panel, channels, panel_title, y_label in panel_specs:
    plt.subplot(1, 2, panel)
    for ch in channels:
        plt.plot(windows[0, :, ch], label=metadata['feature_names'][ch])
    plt.title(panel_title)
    plt.xlabel('时间点')
    plt.ylabel(y_label)
    plt.legend()
    plt.grid(True)

plt.tight_layout()
plt.show()

## 4. 转换为TFLite格式

尝试不同的量化选项，并比较各选项生成的模型大小（推理性能在第6节中分析）。

In [None]:
# Make sure the output directory exists
output_dir = 'embedded'
os.makedirs(output_dir, exist_ok=True)

# Quantization variants to try, plus bookkeeping for their outcomes:
# model_sizes maps variant -> size in bytes (successful conversions only),
# conversion_results maps variant -> 'success' or 'failed'.
quantization_options = ['none', 'float16', 'int8', 'full_int8']
model_sizes = {}
conversion_results = {}

# Convert once per variant. A failure is printed and recorded, and the loop
# moves on to the next variant.
for quantize in quantization_options:
    print(f"\n转换模型 (量化: {quantize})...")

    try:
        # Only the integer variants are handed the calibration generator
        if quantize in ['int8', 'full_int8']:
            calibration_source = representative_dataset
        else:
            calibration_source = None

        tflite_model = convert_to_tflite(
            model=model,
            quantize=quantize,
            optimize=True,
            representative_dataset=calibration_source,
        )

        # Persist the converted model next to the other generated artifacts
        tflite_path = os.path.join(output_dir, f"model_{quantize}.tflite")
        with open(tflite_path, 'wb') as model_file:
            model_file.write(tflite_model)

        # Record size and outcome
        n_bytes = len(tflite_model)
        model_sizes[quantize] = n_bytes
        conversion_results[quantize] = 'success'

        print(f"模型已保存到 {tflite_path}")
        print(f"模型大小: {n_bytes / 1024:.2f} KB")

    except Exception as err:
        print(f"转换失败: {str(err)}")
        conversion_results[quantize] = 'failed'

# Bar chart of the sizes (KB) of the variants that converted successfully
plt.figure(figsize=(10, 5))
sizes_kb = [byte_count / 1024 for byte_count in model_sizes.values()]
plt.bar(model_sizes.keys(), sizes_kb)
plt.title('不同量化选项的模型大小比较')
plt.xlabel('量化方法')
plt.ylabel('大小 (KB)')
# Annotate each bar with its value just above the bar top
for bar_pos, kb in enumerate(sizes_kb):
    plt.text(bar_pos, kb, f'{kb:.2f}KB', ha='center', va='bottom')
plt.grid(True)
plt.show()

## 5. 生成C代码

将选定的TFLite模型转换为C数组。

In [None]:
# Pick the quantization variant to embed (adjust to the deployment's needs)
best_quantize = 'float16'
tflite_path = os.path.join(output_dir, f"model_{best_quantize}.tflite")

# Read the converted model back from disk
with open(tflite_path, 'rb') as f:
    tflite_model = f.read()

# Render the model bytes as C source text
c_code = generate_c_array(
    tflite_model=tflite_model,
    variable_name='g_model'
)

# Save the C file. The encoding is pinned so the written bytes do not depend
# on the platform's default text encoding.
c_path = os.path.join(output_dir, 'model.c')
with open(c_path, 'w', encoding='utf-8') as f:
    f.write(c_code)

# Header declaring the model symbols. It is a plain string (no placeholders
# are interpolated). The extern "C" guards let the same header be included
# from C++ translation units while the definitions live in a C file.
# NOTE(review): assumes generate_c_array defines both g_model and
# g_model_len with these exact types — confirm against its output.
h_code = """#ifndef MODEL_DATA_H
#define MODEL_DATA_H

#ifdef __cplusplus
extern "C" {
#endif

extern const unsigned char g_model[];
extern const unsigned int g_model_len;

#ifdef __cplusplus
}
#endif

#endif // MODEL_DATA_H
"""

h_path = os.path.join(output_dir, 'model.h')
with open(h_path, 'w', encoding='utf-8') as f:
    f.write(h_code)

print(f"C文件已保存到 {c_path}")
print(f"头文件已保存到 {h_path}")

# Preview the first ten lines of the generated C source
print("\nC代码预览:")
print("===========")
print('\n'.join(c_code.split('\n')[:10]) + '\n...')

## 6. 性能分析

分析转换后模型的性能。

In [None]:
# Run the pipeline's analysis helper on the original / converted model pair
results = analyze_model(
    model_path=model_path,
    tflite_path=tflite_path,
    output_dir=output_dir
)

# Report the size figures; the two size entries are divided by 1024 and
# printed as KB, the reduction is scaled by 100 and printed as a percentage.
print("性能分析结果:")
print("=============")
size_rows = (
    ("原始模型大小", 'original_size'),
    ("TFLite模型大小", 'tflite_size'),
)
for caption, size_key in size_rows:
    print(f"{caption}: {results[size_key] / 1024:.2f} KB")
print(f"大小减少: {results['size_reduction'] * 100:.2f}%")

# Timing figures are printed only when the analysis produced a benchmark entry
if 'benchmark' in results:
    print("\n推理性能:")
    timing = results['benchmark']
    print(f"平均推理时间: {timing['average_time_ms']:.2f} ms")
    print(f"每秒推理次数: {timing['inference_per_second']:.2f}")

## 7. 验证转换后的模型

比较原始模型和转换后模型的预测结果。

In [None]:
# Load the converted model into the TFLite interpreter
interpreter = tf.lite.Interpreter(model_path=tflite_path)
interpreter.allocate_tensors()

# Descriptions of the model's input and output tensors
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

print("TFLite模型细节:")
print("输入:")
print(f"  形状: {input_details[0]['shape']}")
print(f"  类型: {input_details[0]['dtype']}")
print("输出:")
print(f"  形状: {output_details[0]['shape']}")
print(f"  类型: {output_details[0]['dtype']}")

# Compare on the first n_samples windows (fewer if the dataset is smaller)
n_samples = 100
test_samples = windows[:n_samples]
test_labels = labels[:n_samples]

# Reference predictions from the original model: argmax for multi-output
# models, a 0.5 threshold for single-output models
original_predictions = model.predict(test_samples)
if original_predictions.shape[1] > 1:
    original_predictions = np.argmax(original_predictions, axis=1)
else:
    original_predictions = (original_predictions > 0.5).astype(int).flatten()

# Loop-invariant tensor properties. 'quantization' is a (scale, zero_point)
# pair; a scale of 0 means the tensor is not quantized.
input_index = input_details[0]['index']
output_index = output_details[0]['index']
input_dtype = input_details[0]['dtype']
input_scale, input_zero_point = input_details[0]['quantization']
output_scale, output_zero_point = output_details[0]['quantization']

# Predictions from the TFLite model, one sample at a time
tflite_predictions = []
for sample in test_samples:
    # Add the batch axis the interpreter expects
    batch = np.asarray(sample, dtype=np.float32)[np.newaxis, ...]

    # Integer-quantized inputs take q = real / scale + zero_point, rounded
    # and clipped to the dtype's range; float inputs pass through unchanged
    if np.issubdtype(input_dtype, np.integer) and input_scale:
        limits = np.iinfo(input_dtype)
        batch = np.clip(
            np.round(batch / input_scale + input_zero_point),
            limits.min,
            limits.max,
        )
    interpreter.set_tensor(input_index, batch.astype(input_dtype))

    # Run inference
    interpreter.invoke()

    # Reduce the output to a scalar class id so the collected predictions
    # form a 1-D array that lines up element-wise with original_predictions
    output = interpreter.get_tensor(output_index)
    if output.shape[1] > 1:
        # argmax is unaffected by a positive-scale quantization, so no
        # dequantization is needed here
        pred = int(np.argmax(output))
    else:
        score = float(output.reshape(-1)[0])
        if output_scale:
            # Dequantize before comparing against the 0.5 threshold
            score = output_scale * (score - output_zero_point)
        pred = int(score > 0.5)

    tflite_predictions.append(pred)

tflite_predictions = np.array(tflite_predictions)

# Element-wise agreement between the two models
matches = (original_predictions == tflite_predictions)
accuracy = np.mean(matches)

print(f"\n预测比较:")
print(f"预测匹配率: {accuracy * 100:.2f}%")

# Show up to five samples on which the two models disagree
mismatches = np.where(~matches)[0]
if len(mismatches) > 0:
    print("\n预测不匹配的样本:")
    for idx in mismatches[:5]:
        print(f"样本 {idx}:")
        print(f"  真实标签: {metadata['class_names'][int(test_labels[idx])]}")
        print(f"  原始预测: {metadata['class_names'][int(original_predictions[idx])]}")
        print(f"  TFLite预测: {metadata['class_names'][int(tflite_predictions[idx])]}")

## 8. STM32部署准备

生成STM32项目所需的文件和配置。

In [None]:
import shutil

# Create the STM32 project directory
stm32_dir = os.path.join(output_dir, 'stm32')
os.makedirs(stm32_dir, exist_ok=True)

# C source for input normalisation.
# NOTE(review): the means/stds below are hard-coded in the template rather
# than computed from the dataset loaded in this notebook, and both arrays have
# 6 entries, so calling preprocess_data with n_features > 6 would read past
# their end — replace them with the statistics used at training time.
preprocess_code = f"""
void preprocess_data(float* input_data, int length, int n_features) {{
    // 标准化参数
    const float means[6] = {{0.0f, 0.0f, 1.0f, 0.0f, 0.0f, 0.0f}};
    const float stds[6] = {{1.0f, 1.0f, 1.0f, 1.0f, 1.0f, 1.0f}};
    
    // 对每个特征进行标准化
    for (int i = 0; i < length; i++) {{
        for (int j = 0; j < n_features; j++) {{
            input_data[i * n_features + j] = 
                (input_data[i * n_features + j] - means[j]) / stds[j];
        }}
    }}
}}
"""

# C source for output post-processing: index of the largest output value
postprocess_code = f"""
int postprocess_output(float* output_data, int n_classes) {{
    // 找到最大概率的类别
    float max_prob = output_data[0];
    int pred_class = 0;
    
    for (int i = 1; i < n_classes; i++) {{
        if (output_data[i] > max_prob) {{
            max_prob = output_data[i];
            pred_class = i;
        }}
    }}
    
    return pred_class;
}}
"""

# C++ source for the model interface (initialisation + prediction).
# The extern "C" block declares the two helpers generated above: they live in
# .c files, so the C++ translation unit needs both a declaration and C linkage
# to compile and link against them.
# NOTE(review): the op resolver registers only FullyConnected, Reshape and
# Softmax, while the model loaded earlier comes from 'lstm_model.h5' —
# confirm these cover every operator in the converted model.
main_code = f"""
#include <cstddef>

#include "model.h"
#include "tensorflow/lite/micro/all_ops_resolver.h"
#include "tensorflow/lite/micro/micro_interpreter.h"
#include "tensorflow/lite/micro/micro_mutable_op_resolver.h"
#include "tensorflow/lite/schema/schema_generated.h"

// Helpers implemented in preprocess.c / postprocess.c (C linkage)
extern "C" {{
void preprocess_data(float* input_data, int length, int n_features);
int postprocess_output(float* output_data, int n_classes);
}}

// 全局变量
constexpr int kTensorArenaSize = 32768;  // 根据需要调整
uint8_t tensor_arena[kTensorArenaSize];

// 模型接口
tflite::MicroInterpreter* interpreter = nullptr;
TfLiteTensor* input = nullptr;
TfLiteTensor* output = nullptr;

// 初始化函数
bool initialize_model() {{
    // 加载模型
    const tflite::Model* model = tflite::GetModel(g_model);
    if (model->version() != TFLITE_SCHEMA_VERSION) {{
        return false;
    }}
    
    // 创建操作解析器
    static tflite::MicroMutableOpResolver<4> micro_op_resolver;
    micro_op_resolver.AddFullyConnected();
    micro_op_resolver.AddReshape();
    micro_op_resolver.AddSoftmax();
    
    // 创建解释器
    static tflite::MicroInterpreter static_interpreter(
        model, micro_op_resolver, tensor_arena, kTensorArenaSize);
    interpreter = &static_interpreter;
    
    // 分配张量
    TfLiteStatus allocate_status = interpreter->AllocateTensors();
    if (allocate_status != kTfLiteOk) {{
        return false;
    }}
    
    // 获取输入输出张量
    input = interpreter->input(0);
    output = interpreter->output(0);
    
    return true;
}}

// 预测函数
int predict(float* data, int length, int n_features) {{
    // Refuse to run before initialize_model() has succeeded
    if (interpreter == nullptr || input == nullptr || output == nullptr) {{
        return -1;
    }}
    
    // The copy below writes floats: both tensors must be float32 and the
    // input tensor must be large enough for length * n_features values
    if (data == nullptr || length <= 0 || n_features <= 0 ||
        input->type != kTfLiteFloat32 || output->type != kTfLiteFloat32 ||
        static_cast<size_t>(length) * static_cast<size_t>(n_features) *
            sizeof(float) > input->bytes) {{
        return -1;
    }}
    
    // 预处理
    preprocess_data(data, length, n_features);
    
    // 复制数据到输入张量
    for (int i = 0; i < length * n_features; i++) {{
        input->data.f[i] = data[i];
    }}
    
    // 运行推理
    TfLiteStatus invoke_status = interpreter->Invoke();
    if (invoke_status != kTfLiteOk) {{
        return -1;
    }}
    
    // 后处理
    return postprocess_output(output->data.f, {metadata['num_classes']});
}}
"""

# Write the generated sources. UTF-8 is pinned because the templates contain
# non-ASCII comments, which a non-UTF-8 platform default encoding may be
# unable to encode.
generated_sources = {
    'preprocess.c': preprocess_code,
    'postprocess.c': postprocess_code,
    'model_interface.cpp': main_code,
}
for source_name, source_text in generated_sources.items():
    with open(os.path.join(stm32_dir, source_name), 'w', encoding='utf-8') as f:
        f.write(source_text)

# Copy the model array and its header produced by the C code generation step
for model_file in ('model.c', 'model.h'):
    shutil.copy2(os.path.join(output_dir, model_file), os.path.join(stm32_dir, model_file))

print("STM32项目文件已生成:")
print(f"  - {os.path.join(stm32_dir, 'preprocess.c')}")
print(f"  - {os.path.join(stm32_dir, 'postprocess.c')}")
print(f"  - {os.path.join(stm32_dir, 'model_interface.cpp')}")
print(f"  - {os.path.join(stm32_dir, 'model.c')}")
print(f"  - {os.path.join(stm32_dir, 'model.h')}")

## 9. 使用命令行工具

展示如何使用命令行工具进行模型转换。

In [None]:
# Print an example invocation of the command-line converter.
# The command is emitted on a single line with a five-space gap between
# argument groups: that is exactly what a string literal using
# backslash-newline continuations followed by four-space indents produces,
# because Python drops each backslash-newline pair inside the literal.
cli_argument_groups = (
    "convert-model",
    "--model trained_models/lstm_model.h5",
    "--output-dir embedded",
    "--quantize float16",
    "--format both",
    "--optimize",
    "--representative-data processed_data.npz",
    "--target-device stm32f4",
    "--analyze",
)
print("模型转换命令示例:")
print((" " * 5).join(cli_argument_groups))

## 10. 总结

在本笔记本中，我们展示了如何将训练好的模型转换为嵌入式系统可用的格式：

1. 加载和分析模型
2. 准备代表性数据集
3. 尝试不同的量化选项
4. 生成C代码
5. 分析模型性能
6. 验证转换后的模型
7. 准备STM32部署文件

这个流程可以作为将机器学习模型部署到嵌入式系统的参考。通过调整量化参数和优化选项，你可以在模型大小和性能之间找到最佳平衡点。