# Import & Config

In [1]:
import json
import logging
import os
import re
import shutil
import subprocess
from collections import defaultdict
from pathlib import Path

import gradio as gr
import numpy as np
import onnx
import onnxruntime
import onnxdumper
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("[MindAcc]")

# MindSpore Lite environment setup.
# doc: https://www.mindspore.cn/lite/docs/zh-CN/master/train/converter_train.html
PACKAGE_ROOT_PATH = Path("mindspore-lite-2.3.1-linux-x64")
converter = PACKAGE_ROOT_PATH / "tools" / "converter" / "converter" / "converter_lite"
benchmark = PACKAGE_ROOT_PATH / "tools" / "benchmark" / "benchmark"
# Both the runtime and the converter ship shared libraries that must be
# discoverable by the child processes started later.
os.environ["LD_LIBRARY_PATH"] = ":".join(
    str(PACKAGE_ROOT_PATH / lib_dir)
    for lib_dir in ("runtime/lib", "tools/converter/lib")
)

# MindSpore Lite benchmark dump configuration.
# doc: https://www.mindspore.cn/lite/docs/zh-CN/master/tools/benchmark_tool.html#dump功能
mslite_benchmark_config = dict(
    common_dump_settings=dict(
        dump_mode=0,
        path=f"{os.getcwd()}/output",
        net_name="resnet50",
        input_output=0,
        kernels=[],
    ),
)

# Persist the configuration and point the benchmark tool at it.
with open('dump_config.json', 'w') as config_file:
    config_file.write(json.dumps(mslite_benchmark_config))
os.environ["MINDSPORE_DUMP_CONFIG"] = os.getcwd() + "/dump_config.json"
ms_dump_path = Path(
    "output", mslite_benchmark_config["common_dump_settings"]["net_name"]
)

# Make sure every working directory exists.
for required_dir in (Path("input"), Path("model"), Path("output"), ms_dump_path):
    if required_dir.exists():
        continue
    required_dir.mkdir(parents=True)

# Snapshot the dump directories present at start-up so that a later run can
# identify the directory it created.
last_dump_dirs = list(filter(lambda entry: entry.is_dir(), ms_dump_path.iterdir()))

# MindAcc Model

In [2]:
class MindaccModel:
    """Shared state for one ONNX model and its MindSpore Lite conversion.

    The object keeps the loaded ONNX session, the current input tensor and
    the locations of the dumps produced by the MindSpore Lite benchmark tool
    and by ``onnxdumper``. Only the first model input is handled.
    """

    def __init__(self):
        self.path: Path = None  # ONNX model file
        self.name: str = None  # model file stem
        self.onnx_session: onnxdumper.InferenceSession = None
        self.input_nodes: list[onnxruntime.NodeArg] = None
        self.output_nodes: list[onnxruntime.NodeArg] = None
        self.precision: np.dtype = None  # numpy dtype of the first input
        self.onnx_input: dict = None  # {input name: array} fed to the ONNX session
        self.ms_output_path: Path = None  # directory of the latest MSLite dump
        self.onnx_output_path: Path = "onnx_dumpinrun.npz"
        self.compare_map = {}  # {MSLite dump file name: ONNX tensor name}
        self.compare_result: pd.DataFrame = None

    @staticmethod
    def _run_command(cmd: list) -> int:
        """Run ``cmd`` without a shell and return its exit code.

        Passing an argument list keeps file names with spaces or shell
        metacharacters from being interpreted by a shell. A failure to start
        the program is logged and reported as exit code ``-1``.
        """
        try:
            return subprocess.run(cmd, check=False).returncode
        except OSError as err:
            logger.error("Failed to start {}: {}".format(cmd[0], err))
            return -1

    @staticmethod
    def _concrete_shape(shape) -> tuple:
        """Return ``shape`` with every non-static dimension replaced by 1.

        Any entry that is not a non-negative ``int`` (symbolic name, ``None``,
        negative placeholder) is treated as dynamic.
        """
        return tuple(
            dim if isinstance(dim, int) and dim >= 0 else 1 for dim in shape
        )

    @staticmethod
    def _ms_flat_to_onnx(flat: np.ndarray, onnx_shape) -> np.ndarray:
        """Reshape flat MSLite-layout data into the ONNX input layout.

        For 4-D inputs the file is interpreted as NHWC and converted to the
        NCHW ``onnx_shape``; other ranks are reshaped directly. A single
        dynamic dimension is inferred from the data size; with more than one
        the data is returned flat because the shape cannot be recovered.

        Raises:
            ValueError: if the number of elements does not fit the shape.
        """
        dims = [dim if isinstance(dim, int) and dim >= 0 else -1 for dim in onnx_shape]
        if len(dims) == 4:
            # The file is stored NHWC, so reshape in that order first.
            dims = [dims[0], dims[2], dims[3], dims[1]]
        if dims.count(-1) > 1:
            logger.warning(
                "Input shape {} has several dynamic dims, data kept flat".format(onnx_shape)
            )
            return flat
        try:
            shaped = flat.reshape(dims)
        except ValueError as err:
            raise ValueError(
                f"Input data with {flat.size} elements does not fit model input shape {onnx_shape}"
            ) from err
        if shaped.ndim == 4:
            # NHWC -> NCHW; made contiguous before it is handed to the session.
            return np.ascontiguousarray(np.transpose(shaped, (0, 3, 1, 2)))
        return shaped

    def load(self, model_file: str):
        """Load the ONNX model at ``model_file`` and read its I/O metadata."""
        self.path = Path(model_file)
        self.name = self.path.stem
        # onnxdumper provides an inference session that can also dump tensors.
        self.onnx_session = onnxdumper.InferenceSession(self.path)
        self.input_nodes = self.onnx_session.get_inputs()
        self.output_nodes = self.onnx_session.get_outputs()
        # Take the dtype from the graph input that matches the first session
        # input; graph.input[0] is only a fallback because the graph input
        # list may also contain initializers.
        graph_inputs = onnx.load(self.path).graph.input
        first_input = next(
            (item for item in graph_inputs if item.name == self.input_nodes[0].name),
            graph_inputs[0],
        )
        self.precision = onnx.helper.tensor_dtype_to_np_dtype(
            first_input.type.tensor_type.elem_type
        )

    def run_ms_converter(self, optimize = "") -> Path:
        """Convert the loaded ONNX model with ``converter_lite``.

        Args:
            optimize: value for ``--optimize``; the flag is omitted when empty.

        Returns:
            Path of the converted model (``<onnx path>.ms``). A failed
            conversion is logged; the path is returned regardless.
        """
        cmd = [
            str(converter),
            "--fmk=ONNX",
            f"--modelFile={self.path}",
            f"--outputFile={self.path}",
        ]
        if optimize:
            cmd.append("--optimize=" + optimize)
        logger.info("Run convert cmd:{}".format(" ".join(cmd)))
        return_code = self._run_command(cmd)
        if return_code != 0:
            logger.error("Converter exited with code {}".format(return_code))
        logger.info("Convert done, output model:{}".format(self.path))
        return Path(str(self.path) + ".ms")

    def input_generate(self, seed = 0) -> dict:
        """Generate a random tensor for the first model input.

        The tensor is written to ``input/mslite_input.bin`` (NHWC for 4-D
        inputs) and stored as the ONNX input in its original layout.

        Args:
            seed: random seed; non-integer numbers are truncated to ``int``
                and ``None`` lets numpy pick a seed.

        Returns:
            ``{input name: array}`` as stored in ``self.onnx_input``.
        """
        np.random.seed(None if seed is None else int(seed))
        # Only the single-input case is supported for now.
        input_node = self.input_nodes[0]
        input_data = np.random.random(
            self._concrete_shape(input_node.shape)
        ).astype(self.precision)
        # 4-D inputs: NCHW -> NHWC for MindSpore Lite.
        if input_data.ndim == 4:
            ms_input_data = np.transpose(input_data, (0, 2, 3, 1))
        else:
            ms_input_data = input_data
        Path("input").mkdir(parents=True, exist_ok=True)
        ms_input_data.tofile("input/mslite_input.bin")
        random_input = {input_node.name: input_data}
        logger.info(
            "input node generated: {} {}".format(input_node.name, input_node.shape)
        )
        self.onnx_input = random_input
        return random_input

    def input_load(self, input_data: str) -> None:
        """Load input data stored in the MindSpore Lite input format.

        The raw file is copied to ``input/mslite_input.bin`` and reshaped
        (NHWC -> NCHW for 4-D inputs) into the ONNX input.

        Args:
            input_data: path of a raw binary file with elements of
                ``self.precision``.

        Raises:
            ValueError: if the file size does not fit the model input shape.
        """
        ms_input_data = np.fromfile(input_data, dtype=self.precision)
        Path("input").mkdir(parents=True, exist_ok=True)
        ms_input_data.tofile("input/mslite_input.bin")
        input_node = self.input_nodes[0]
        self.onnx_input = {
            input_node.name: self._ms_flat_to_onnx(ms_input_data, input_node.shape)
        }

    def run_ms_dump(self) -> Path:
        """Run the MSLite benchmark tool and locate the dump it produced.

        Returns:
            The dump directory created by this run, or ``None`` when no new
            directory appeared under ``ms_dump_path``.
        """
        global last_dump_dirs
        cmd = [
            str(benchmark.resolve()),
            f"--modelFile={self.path}.ms",
            "--inDataFile=input/mslite_input.bin",
        ]
        logger.info("Run benchmark cmd:{}".format(" ".join(cmd)))
        return_code = self._run_command(cmd)
        if return_code != 0:
            logger.error("Benchmark exited with code {}".format(return_code))
        logger.info("Benchmark done")
        now_dump_dirs = [f for f in ms_dump_path.iterdir() if f.is_dir()]
        # Directories that did not exist before this run.
        new_dump_dir = list(set(now_dump_dirs) - set(last_dump_dirs))
        logger.info("Dump dir:{}".format(new_dump_dir))
        last_dump_dirs = now_dump_dirs
        # With several candidates pick the most recently modified one instead
        # of relying on arbitrary set ordering.
        self.ms_output_path = (
            max(new_dump_dir, key=lambda d: d.stat().st_mtime) if new_dump_dir else None
        )
        return self.ms_output_path

    def run_onnx_dump(self, dump_path = "onnx_dumpinrun.npz") -> dict:
        """Run ONNX inference on the current input and dump tensors to ``dump_path``.

        Returns:
            The value returned by the session's ``run`` call.
        """
        self.onnx_output_path = dump_path
        output_names = [x.name for x in self.output_nodes]
        onnx_outputs = self.onnx_session.run(
            output_names, self.onnx_input, dump_path=dump_path
        )
        logger.info("onnx inference done")
        return onnx_outputs

# Singleton: the one MindaccModel instance shared by the Gradio callbacks below
mindacc_model = MindaccModel()

# MindAcc Mapper

In [3]:
class MindAccMapper:
    """Map MindSpore Lite dump files to tensors dumped from the ONNX model.

    MSLite dump files are named
    ``{op_name}_{input|output}_{index}_shape_{dims}_{data_type}_{format}.bin``;
    each output file is matched to an output of the ONNX node whose name is
    contained in ``op_name`` by comparing shapes and dtypes.
    """

    # MindSpore dtype names as they appear in dump file names -> numpy dtypes.
    MS_DATATYPE_TO_NP = {
        "Float64": np.float64,
        "Float32": np.float32,
        "Float16": np.float16,
        "Int64": np.int64,
        "Int32": np.int32,
        "Int16": np.int16,
        "Int8": np.int8,
        "UInt64": np.uint64,
        "UInt32": np.uint32,
        "UInt16": np.uint16,
        "UInt8": np.uint8,
        "Bool": np.bool_,
    }
    # Compiled once; the shape part is optional so scalars (no dims) parse too.
    MS_OUT_PATTERN = re.compile(
        r"(?P<op_name>\w+)_(?P<io_flag>(input|output))_(?P<io_index>\d+)_(shape)(_(?P<shape>(\d+(_\d+)*)))?_(?P<data_type>[^_]+)(_(?P<layout>\w+))?"
    )

    @classmethod
    def get_ms_bin_info(cls, ms_bin_file: Path):
        """Parse op name, I/O index, shape, dtype and layout from a dump file name.

        Returns:
            A dict with keys ``op_name``, ``io_flag``, ``io_index`` (str),
            ``shape`` (tuple of int), ``data_type`` (numpy type) and
            ``layout``; ``None`` (after logging a warning) when the name does
            not follow the naming scheme or the dtype is unknown.
        """
        match = cls.MS_OUT_PATTERN.match(ms_bin_file.stem)
        if match is None:
            logger.warning(
                f"Can't parse file {ms_bin_file.name}, error: unexpected file name format"
            )
            return None
        info = match.groupdict()
        np_dtype = cls.MS_DATATYPE_TO_NP.get(info["data_type"])
        if np_dtype is None:
            logger.warning(
                f"Can't parse file {ms_bin_file.name}, error: unsupported data type {info['data_type']}"
            )
            return None
        shape_str = info["shape"]
        info["shape"] = tuple(map(int, shape_str.split("_"))) if shape_str else ()
        info["data_type"] = np_dtype
        return info

    @classmethod
    def read_ms_output(cls, ms_output_path: Path):
        """Index the output dump files found in ``ms_output_path``.

        Returns:
            ``{op_name: {output_index: {"shape", "data_type", "layout",
            "file"}}}``, for example::

                {"TopK_717": {1: {"shape": (6,), "data_type": numpy.int32,
                                  "layout": "NCHW",
                                  "file": Path(".../TopK_717_output_1_shape_6_Int32_NCHW.bin")}}}

            Input dumps, non-``.bin`` files and files whose name cannot be
            parsed are skipped. Tensor data is not read here; it is loaded
            when the comparison runs.
        """
        ms_output = defaultdict(dict)
        # Sorted so the resulting mapping order is reproducible.
        for file in sorted(ms_output_path.iterdir()):
            if not (file.is_file() and file.suffix == ".bin"):
                continue
            info = cls.get_ms_bin_info(file)
            if info is None or info["io_flag"] != "output":
                continue
            ms_output[info["op_name"]][int(info["io_index"])] = {
                "shape": info["shape"],
                "data_type": info["data_type"],
                "layout": info["layout"],
                "file": file,
            }
        return ms_output

    def __init__(
        self,
        onnx_model: str,
        onnx_dump_file: Path,
        ms_dump_dir: Path,
        extra_rules: dict = None,
    ) -> None:
        """Load the ONNX model, the ONNX tensor dump and the MSLite dump index.

        Args:
            onnx_model: path of the ONNX model file.
            onnx_dump_file: ``.npz`` file with the dumped ONNX tensors.
            ms_dump_dir: directory holding the MSLite ``.bin`` dump files.
            extra_rules: stored on the instance; not used by ``simple_map``.
        """
        self.onnx_model = onnx.load(onnx_model)
        self.onnx_dump = np.load(onnx_dump_file)
        self.ms_dump = MindAccMapper.read_ms_output(Path(ms_dump_dir))
        # A fresh dict per instance instead of a shared mutable default.
        self.extra_rules = {} if extra_rules is None else extra_rules
        self.map = {}

    @staticmethod
    def _shapes_compatible(ms_shape, onnx_shape) -> bool:
        """Return True for equal shapes or 4-D shapes equal up to NHWC/NCHW permutation."""
        ms_shape, onnx_shape = tuple(ms_shape), tuple(onnx_shape)
        if ms_shape == onnx_shape:
            return True
        if len(ms_shape) != 4 or len(onnx_shape) != 4:
            return False
        n, d1, d2, d3 = ms_shape
        return onnx_shape in ((n, d3, d1, d2), (n, d2, d3, d1))

    @staticmethod
    def _dtypes_compatible(ms_dtype, onnx_dtype) -> bool:
        """Return True for equal dtypes; int32 and int64 are interchangeable."""
        ms_dtype, onnx_dtype = np.dtype(ms_dtype), np.dtype(onnx_dtype)
        wide_ints = (np.dtype(np.int32), np.dtype(np.int64))
        return ms_dtype == onnx_dtype or (
            ms_dtype in wide_ints and onnx_dtype in wide_ints
        )

    def _find_onnx_node(self, ms_node: str):
        """Find the ONNX node whose name is contained in the MSLite op name.

        An exact name match wins; otherwise the longest contained name is
        used, so ``Conv_10_fusion`` prefers ``Conv_10`` over ``Conv_1``.
        Unnamed nodes are ignored because an empty name is contained in
        every string.
        """
        best = None
        for node in self.onnx_model.graph.node:
            if not node.name:
                continue
            if node.name == ms_node:
                return node
            if node.name in ms_node and (
                best is None or len(node.name) > len(best.name)
            ):
                best = node
        return best

    def simple_map(self):
        """Match every MSLite output file to an ONNX tensor by shape and dtype.

        Outputs of one op are processed in index order, and each takes the
        first not-yet-used ONNX output of the matching node that is present in
        the ONNX dump and has a compatible shape and dtype.

        Returns:
            ``{MSLite dump file name: ONNX tensor name}``, also stored in
            ``self.map``.
        """
        for ms_node, ms_outputs in self.ms_dump.items():
            onnx_node = self._find_onnx_node(ms_node)
            if onnx_node is None:
                logger.warning(f"Can't find node {ms_node} in onnx model")
                continue
            matched_onnx = set()
            for index in sorted(ms_outputs):
                ms_output_info = ms_outputs[index]
                # Drop a match left over from an earlier call.
                ms_output_info.pop("onnx_output", None)
                for onnx_output in onnx_node.output:
                    if onnx_output in matched_onnx or onnx_output not in self.onnx_dump:
                        continue
                    # Read the tensor once; every NpzFile access reloads it.
                    onnx_tensor = self.onnx_dump[onnx_output]
                    if self._shapes_compatible(
                        ms_output_info["shape"], onnx_tensor.shape
                    ) and self._dtypes_compatible(
                        ms_output_info["data_type"], onnx_tensor.dtype
                    ):
                        matched_onnx.add(onnx_output)
                        ms_output_info["onnx_output"] = onnx_output
                        # One ONNX tensor per MSLite output.
                        break

        self.map = {}
        for ms_outputs in self.ms_dump.values():
            for ms_output_info in ms_outputs.values():
                if "onnx_output" in ms_output_info:
                    self.map[ms_output_info["file"].name] = ms_output_info["onnx_output"]
        return self.map

    def get_map_result(self):
        """Summarize the mapping produced by ``simple_map``.

        Returns:
            ``(mapped_count, all_count, map_rate, unmap_list, map_list)``
            where ``map_rate`` is 0.0 when there are no dump outputs,
            ``unmap_list`` holds the unmapped dump file names and
            ``map_list`` maps each mapped file name to
            ``"{onnx name}_{shape}_{dtype}"``.
        """
        maped_count = len(self.map)
        all_count = sum(len(outputs) for outputs in self.ms_dump.values())
        map_rate = maped_count / all_count if all_count else 0.0
        unmap_list = [
            ms_output_info["file"].name
            for ms_outputs in self.ms_dump.values()
            for ms_output_info in ms_outputs.values()
            if "onnx_output" not in ms_output_info
        ]
        map_list = {}
        for file_name, onnx_name in self.map.items():
            onnx_tensor = self.onnx_dump[onnx_name]
            shape_str = "_".join(map(str, onnx_tensor.shape))
            map_list[file_name] = f"{onnx_name}_{shape_str}_{onnx_tensor.dtype}"
        return maped_count, all_count, map_rate, unmap_list, map_list
            

        

# MindAcc Analyzer

In [4]:
def _float_metric(metric):
    """Wrap ``metric(a, b)`` so that it always computes in float64.

    Casting first keeps bool tensors from raising on subtraction, unsigned
    integers from wrapping around and float16 sums from overflowing. Empty
    inputs yield NaN instead of an exception. Division warnings are silenced;
    the resulting inf/nan values are still returned to the caller.
    """
    def wrapped(a, b):
        a = np.asarray(a, dtype=np.float64)
        b = np.asarray(b, dtype=np.float64)
        if a.size == 0 or b.size == 0:
            return np.float64("nan")
        with np.errstate(divide="ignore", invalid="ignore"):
            return metric(a, b)
    return wrapped


def _relative_error(a, b):
    """Element-wise ``|a - b| / |a|`` where an exact 0/0 counts as zero error.

    A non-zero error against a zero reference still yields ``inf``.
    """
    abs_error = np.abs(a - b)
    reference = np.abs(a)
    return np.where((abs_error == 0) & (reference == 0), 0.0, abs_error / reference)


class MindAccAnalyzer:
    """Registry of comparison metrics, each called as ``metric(a, b)``.

    In the relative metrics ``a`` is the reference in the denominator; both
    arguments are expected to be flat arrays of equal length.
    """

    analyzers = {
        "cosine_similarity": _float_metric(
            lambda a, b: np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
        ),
        "relative_euclidean_distance": _float_metric(
            lambda a, b: np.linalg.norm(a - b) / np.linalg.norm(a)
        ),
        "max_absolute_error": _float_metric(lambda a, b: np.max(np.abs(a - b))),
        "mean_absolute_error": _float_metric(lambda a, b: np.mean(np.abs(a - b))),
        "root_mean_square_error": _float_metric(
            lambda a, b: np.sqrt(np.mean(np.square(a - b)))
        ),
        "max_relative_error": _float_metric(lambda a, b: np.max(_relative_error(a, b))),
        "mean_relative_error": _float_metric(lambda a, b: np.mean(_relative_error(a, b))),
        # NOTE(review): computed exactly like mean_relative_error; confirm the
        # intended definition of "accumulated" before changing the formula.
        "accumulated_relative_error": _float_metric(
            lambda a, b: np.mean(_relative_error(a, b))
        ),
        "standard_deviation": _float_metric(lambda a, b: np.std(_relative_error(a, b))),
        # Only meaningful when both inputs are strictly positive; other
        # inputs produce nan/inf.
        "kullback_leibler_divergence": _float_metric(
            lambda a, b: np.sum(a * np.log(a / b))
        ),
    }
    # Reading MeanAbsoluteError (MAE) together with RootMeanSquareError (RMSE):
    # - MAE near 0 and RMSE near 0: the measured values are close to the reference.
    # - MAE near 0 but RMSE large: there are a few locally large outliers.
    # - MAE large and RMSE equal or close to MAE: the deviation is evenly spread.
    # - MAE large and RMSE much larger than MAE: there is an overall deviation
    #   and it is widely dispersed.

# Gradio Interface

In [None]:
def process_model(model_file: gr.File, optimize: str):
    """Handle a model upload: store it, load it and convert it to MSLite.

    Args:
        model_file: the uploaded model as delivered by the upload component.
        optimize: optimisation option forwarded to the converter.

    Returns:
        ``(model description text, file component for the converted model)``,
        or ``(None, None)`` when nothing was uploaded.
    """
    if not model_file:
        return None, None

    # Keep a copy of the uploaded model under ./model and work on that copy.
    stored_model = shutil.copy(model_file, "./model")
    mindacc_model.load(stored_model)

    input_names = [str(x) for x in mindacc_model.input_nodes]
    output_names = [str(x) for x in mindacc_model.output_nodes]
    model_info = "\n".join(
        (
            f"模型路径: {mindacc_model.path}",
            f"模型名称: {mindacc_model.name}",
            f"输入节点: {input_names}",
            f"输出节点: {output_names}",
            f"输入精度: {mindacc_model.precision}",
        )
    )

    converted_model = str(mindacc_model.run_ms_converter(optimize=optimize))
    return model_info, gr.File(label="转换后的模型", value=converted_model, visible=True)

def process_input(input_file: gr.File):
    """Forward an uploaded MSLite-format input file to the shared model state."""
    mindacc_model.input_load(input_file)

def random_input(seed):
    """Generate a random model input and report where and how it was stored.

    Returns:
        ``(path of the MSLite input file, shape of the data in that file)``.
    """
    mindacc_model.input_generate(seed=seed)
    onnx_shape = mindacc_model.input_nodes[0].shape
    if len(onnx_shape) == 4:
        # The MSLite file holds 4-D inputs as NHWC while ONNX reports NCHW.
        batch, channels, height, width = onnx_shape
        ms_shape = (batch, height, width, channels)
    else:
        ms_shape = onnx_shape
    return "input/mslite_input.bin", ms_shape

def run_infer():
    """Run MSLite and ONNX inference and report where the dumps were written.

    Returns:
        A status line with the MSLite dump directory and the ONNX dump file,
        or a failure message when the benchmark produced no new dump
        directory.
    """
    ms_bench_dir = mindacc_model.run_ms_dump()
    if ms_bench_dir is None:
        # Without a dump directory there is nowhere to place the ONNX dump.
        logger.error("MSLite benchmark produced no new dump directory")
        return "推理失败: 未检测到新的mslite输出文件夹, 请检查模型转换结果与输入数据"
    onnx_dump_file = ms_bench_dir / "onnx_dump.npz"
    mindacc_model.run_onnx_dump(dump_path=onnx_dump_file)
    return f"mslite模型输出文件夹: {ms_bench_dir}, onnx模型输出文件: {onnx_dump_file}"

def run_map():
    """Build the MSLite-to-ONNX output mapping and format it for display.

    Returns:
        ``(summary text with the mapping rate, DataFrame listing mapped and
        unmapped MSLite outputs)``.
    """
    mapper = MindAccMapper(
        onnx_model=mindacc_model.path,
        onnx_dump_file=mindacc_model.onnx_output_path,
        ms_dump_dir=mindacc_model.ms_output_path,
    )
    mapping = mapper.simple_map()
    mindacc_model.compare_map = mapping

    # Rows for the outputs that were mapped successfully.
    mapped_rows = pd.DataFrame(mapping.items(), columns=["MSlite输出节点", "ONNX输出节点"])
    # Rows for the outputs that could not be mapped.
    maped_count, all_count, map_rate, unmap_list, _ = mapper.get_map_result()
    unmapped_rows = pd.DataFrame({"MSlite输出节点": unmap_list, "ONNX输出节点": "未映射"})

    summary = f"映射成功率: {maped_count}/{all_count}={map_rate}"
    return summary, pd.concat([mapped_rows, unmapped_rows], ignore_index=True)
    

def run_compare(analyzers, model_mapping, progress=gr.Progress()):
    """Compare mapped MSLite and ONNX tensors with the selected metrics.

    Args:
        analyzers: names of the metrics in ``MindAccAnalyzer.analyzers`` to run.
        model_mapping: the (possibly user-edited) mapping table with the
            columns ``MSlite输出节点`` and ``ONNX输出节点``.
        progress: Gradio progress tracker used to iterate the mapping.

    Returns:
        ``(result DataFrame, download button pointing at the saved CSV)``.
        The result is empty when no metric is selected. Rows that cannot be
        compared (unparsable file name, missing file, unknown ONNX tensor,
        element count mismatch) are skipped with a warning.
    """
    # Refresh the mapping from the table, dropping unmapped or blank rows.
    compare_map = {}
    for row in model_mapping.to_dict(orient='records'):
        ms_file, onnx_name = row["MSlite输出节点"], row["ONNX输出节点"]
        if not isinstance(ms_file, str) or not isinstance(onnx_name, str):
            continue
        if ms_file and onnx_name not in ("", "未映射"):
            compare_map[ms_file] = onnx_name
    mindacc_model.compare_map = compare_map

    onnx_dump = np.load(mindacc_model.onnx_output_path)
    rows, row_names = [], []
    for ms_file in progress.tqdm(compare_map):
        onnx_name = compare_map[ms_file]
        ms_path = mindacc_model.ms_output_path / ms_file
        ms_info = MindAccMapper.get_ms_bin_info(ms_path)
        if ms_info is None or not ms_path.is_file():
            logger.warning(f"Skip {ms_file}: not a readable MSLite dump file")
            continue
        if onnx_name not in onnx_dump:
            logger.warning(f"Skip {ms_file}: {onnx_name} is not in the ONNX dump")
            continue
        a = np.fromfile(ms_path, dtype=ms_info['data_type']).reshape(ms_info['shape'])
        b = onnx_dump[onnx_name]
        # When the shapes differ, a 4-D MSLite tensor is taken to be NHWC and
        # converted to NCHW.
        if a.shape != b.shape and a.ndim == 4:
            a = np.transpose(a, (0, 3, 1, 2))
        if a.size != b.size:
            logger.warning(
                f"Skip {ms_file}: shape {a.shape} does not match {onnx_name} {b.shape}"
            )
            continue
        if not analyzers:
            continue
        flat_a, flat_b = a.flatten(), b.flatten()
        row = {"节点": ms_file}
        for analyzer in analyzers:
            row[analyzer] = MindAccAnalyzer.analyzers[analyzer](flat_a, flat_b)
        rows.append(row)
        row_names.append(ms_file)

    # Build the table in one go instead of enlarging it cell by cell.
    compare_result = pd.DataFrame(rows, index=row_names) if rows else pd.DataFrame()
    mindacc_model.compare_result = compare_result

    # Save the result next to the MSLite dump.
    date_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S")
    file_path = mindacc_model.ms_output_path / f"compare_result_{date_time}.csv"
    mindacc_model.compare_result.to_csv(file_path)
    return compare_result, gr.DownloadButton(label=f"下载对比结果 {file_path.name}", value=file_path, visible=True)



# Gradio UI: page layout, the chart renderer and the event wiring.
with gr.Blocks(theme=gr.themes.Soft()) as mindacc:
    # Page header.
    gr.HTML("""
        <div style="text-align: center; margin-bottom: 200px;">
            <h1 style="font-size: 3em;">MindAcc</h1>
            <p style="font-size: 1.5em;">MindAcc是一个用于对比MindSpore Lite与ONNX模型推理结果的工具，</p>
            <p style="font-size: 1.5em;">支持模型转换、随机输入生成、推理运行、对比分析等功能。</p>
            <p style="font-size: 1.5em;"><a href="https://gitee.com/noiatrio/mindacc" target="_blank">访问Gitee地址</a></p>
        </div>
    """)
    # Usage instructions.
    with gr.Accordion(label="使用说明"):
        gr.HTML("""
            <div>
            <p style="font-size: 12px;">1. 上传ONNX模型文件，将自动转换为MSlite模型并读取信息</p>
            <p style="font-size: 12px;">2. 上传MSlite输入数据或点击随机生成输入按钮</p>
            <p style="font-size: 12px;">3. 点击运行推理按钮，将使用benchmark工具获取MSlite输出，使用onnxdumper获取ONNX输出</p>
            <p style="font-size: 12px;">4. 点击生成映射按钮，将自动匹配MSlite输出节点与ONNX输出节点，若需修改映射可在节点映射表中直接修改</p>
            <p style="font-size: 12px;">5. 点击运行对比按钮，选择对比分析指标，获取对比结果</p>
        </div>
        """)
    # Model upload, conversion options and input data.
    with gr.Row():
        random_button = gr.Button("随机生成输入", variant="primary")
        with gr.Column(scale=1):
            random_seed_input = gr.Number(label="随机数种子")
            optimize_input = gr.Dropdown(label="convert-lite优化选项", choices=["none", "general", "gpu_oriented", "ascend_oriented"], value="none")
            model_info_output = gr.Textbox(label="模型信息")
        with gr.Column(scale=1):
            with gr.Group():
                model_input = gr.File(label="上传模型")
                converted_model_output = gr.File(label="转换后的模型", visible=False)
        with gr.Column(scale=1):
            with gr.Group():
                data_input = gr.File(label="上传 MSlite 输入数据, 或使用随机生成")
                ms_shape_input = gr.Textbox(label="MSlite 输入形状")

    # Inference.
    with gr.Row():
        run_infer_button = gr.Button("运行推理", variant="primary", scale=1)
        infer_output = gr.Textbox(label="推理结果", scale=2)
    # Output mapping; the table can be edited before the comparison runs.
    with gr.Row():
        run_map_button = gr.Button("生成映射", variant="primary", scale=1)
        mapping_outline = gr.Textbox(label="映射概览", scale=2)
    with gr.Row():
        model_mapping = gr.DataFrame(label="节点映射", headers=["MSlite输出节点", "ONNX输出节点"], datatype=["str", "str"])
    # Comparison.
    with gr.Row():
        run_compare_button = gr.Button("运行对比", variant="primary", scale=1)
        analysis_tool = gr.CheckboxGroup([x for x in MindAccAnalyzer.analyzers.keys()], label="对比分析指标", scale=2)
    compare_output = gr.DataFrame(label="对比结果", headers=["节点", *MindAccAnalyzer.analyzers.keys()])

    with gr.Row():
        run_draw_button = gr.Button("绘制统计图", variant="primary")
        download_compare_button = gr.DownloadButton("下载对比结果", variant="primary")

    # Render one bar chart per metric of the latest comparison result.
    @gr.render(triggers=[run_draw_button.click])
    def create_plot():
        """Draw a bar chart per metric, or a placeholder when there is no result yet."""
        dataframe = mindacc_model.compare_result

        # compare_result stays None until a comparison has been run.
        if dataframe is None or '节点' not in dataframe.columns or len(dataframe['节点']) == 0:
            print("No data")
            gr.Markdown("## 等待对比结果")
        else:
            # Every column other than the node name holds one metric.
            metric_columns = [column for column in dataframe.columns if column != '节点']
            for analyzer in metric_columns:
                data = pd.DataFrame({"节点": dataframe['节点'], analyzer: dataframe[analyzer]})
                gr.BarPlot(data, x="节点", y=analyzer)

    # Event wiring.
    model_input.upload(
        fn=process_model,
        inputs=[model_input, optimize_input],
        outputs=[model_info_output, converted_model_output],
    )
    data_input.upload(
        fn=process_input,
        inputs=[data_input],
        outputs=[],
    )
    random_button.click(fn=random_input, inputs=random_seed_input, outputs=[data_input, ms_shape_input])
    run_infer_button.click(fn=run_infer, inputs=None, outputs=[infer_output])
    run_map_button.click(fn=run_map, inputs=None, outputs=[mapping_outline, model_mapping])
    run_compare_button.click(fn=run_compare, inputs=[analysis_tool, model_mapping], outputs=[compare_output, download_compare_button])

mindacc.launch()