# 性能度量

In [3]:
from tvm.ir import IRModule
from tvm import relay
from tvm.ir.transform import Pass
import numpy as np
import tvm
from tvm import te
from tvm.relay import analysis, transform, Function

In [4]:
def run_opt_pass(expr, opt_pass):
    """Apply ``opt_pass`` to ``expr`` and hand back the transformed expression.

    ``expr`` is wrapped in an ``IRModule``, ``InferType`` is run on it, and
    then ``opt_pass`` is applied. The module's ``"main"`` entry is returned
    as-is when ``expr`` was a ``Function``; otherwise only its body is
    returned.
    """
    assert isinstance(opt_pass, Pass)
    typed_mod = transform.InferType()(IRModule.from_expr(expr))
    main_fn = opt_pass(typed_mod)["main"]
    if isinstance(expr, Function):
        return main_fn
    return main_fn.body

In [5]:
def test_gemm():
    """Check the MAC count reported for a single ``nn.dense`` op."""
    rows, reduce_dim, units = 512, 1024, 256
    lhs = relay.var("data1", shape=(rows, reduce_dim))
    rhs = relay.var("data2", shape=(units, reduce_dim))
    outputs = relay.Tuple(tvm.runtime.convert([relay.nn.dense(lhs, rhs)]))
    typed_func = run_opt_pass(relay.Function([lhs, rhs], outputs), transform.InferType())
    mac_total = analysis.get_total_mac_number(typed_func)
    # One multiply-accumulate per (row, unit, reduction index) triple.
    assert mac_total == rows * units * reduce_dim

def test_conv():
    """Check the MAC count reported for a single padded 7x7 ``nn.conv2d``."""
    batch, in_ch, out_ch = 1, 3, 64
    height = width = 224
    k_h = k_w = 7
    pad_h = pad_w = 1
    # Output extent; no stride or dilation is passed to conv2d below, so this
    # assumes the op's defaults leave the sliding window at unit step.
    out_h = height + pad_h * 2 - k_h + 1
    out_w = width + pad_w * 2 - k_w + 1

    kernel = relay.var("weight", shape=(out_ch, in_ch, k_h, k_w))
    image = relay.var("data", shape=(batch, in_ch, height, width))
    conv_out = relay.nn.conv2d(
        image,
        kernel,
        channels=out_ch,
        kernel_size=(k_h, k_w),
        padding=(pad_h, pad_w),
    )
    wrapped = relay.Function([image, kernel], relay.Tuple(tvm.runtime.convert([conv_out])))
    typed_func = run_opt_pass(wrapped, transform.InferType())
    mac_total = analysis.get_total_mac_number(typed_func)
    assert mac_total == batch * in_ch * out_h * out_w * out_ch * k_h * k_w



def test_simple_network():
    """Check the total MAC count of a small conv/conv/add/flatten/dense graph.

    The graph is run through ``AlterOpLayout`` before counting, so the count
    is taken on the layout-altered function.
    """
    batch = 1
    feature_shape = (batch, 64, 56, 56)
    conv_kernel = relay.var("weight_conv", shape=(64, 64, 3, 3))
    in_a = relay.var("data1", shape=feature_shape)
    in_b = relay.var("data2", shape=feature_shape)
    dense_kernel = relay.var("weight_dense", shape=(1, 56 * 56 * 64))

    conv_kwargs = dict(channels=64, kernel_size=(3, 3), padding=(1, 1))
    conv_a = relay.nn.conv2d(in_a, conv_kernel, **conv_kwargs)
    conv_b = relay.nn.conv2d(in_b, conv_kernel, **conv_kwargs)
    summed = relay.add(conv_a, conv_b)
    flat = relay.nn.batch_flatten(summed)
    dense_out = relay.nn.dense(flat, dense_kernel)

    outputs = relay.Tuple(tvm.runtime.convert([conv_a, conv_b, dense_out, summed, flat]))
    network = relay.Function([in_a, in_b, conv_kernel, dense_kernel], outputs)
    # Alter the conv2d data layout before counting.
    network = run_opt_pass(network, transform.AlterOpLayout())
    mac_total = analysis.get_total_mac_number(network)
    # 231411712 == 2 * (64 * 64 * 3 * 3 * 56 * 56) + 56 * 56 * 64,
    # i.e. the two convolutions plus the dense layer.
    assert mac_total == 231411712


def test_depthwise_conv2d():
    """Check the MAC count for two 3x3 ``conv2d`` ops built with ``groups=64``."""
    batch = 1
    dshape = (batch, 64, 56, 56)
    dw_kernel = relay.var("weight_depthwiseconv", shape=(64, 1, 3, 3))
    in_a = relay.var("data1", shape=dshape)
    in_b = relay.var("data2", shape=dshape)

    dw_kwargs = dict(kernel_size=(3, 3), padding=(1, 1), groups=64)
    dw_a = relay.nn.conv2d(in_a, dw_kernel, **dw_kwargs)
    dw_b = relay.nn.conv2d(in_b, dw_kernel, **dw_kwargs)
    summed = relay.add(dw_a, dw_b)

    outputs = relay.Tuple(tvm.runtime.convert([dw_a, dw_b, summed]))
    typed_func = run_opt_pass(
        relay.Function([in_a, in_b, dw_kernel], outputs), transform.InferType()
    )
    mac_total = analysis.get_total_mac_number(typed_func)
    # Expected: a 3x3 window per element of the input shape, for each of the
    # two convolutions.
    assert mac_total == 2 * np.prod(dshape) * 3 * 3


def test_conv_2d_transpose():
    """Check the MAC count reported for a single ``nn.conv2d_transpose``."""
    batch, in_ch, out_ch = 1, 3, 64
    height = width = 224
    k_h = k_w = 7
    pad_h = pad_w = 1
    # Output extent the test expects from the transposed convolution:
    # padding shrinks the output and the kernel grows it.
    out_h = height - pad_h * 2 + k_h - 1
    out_w = width - pad_w * 2 + k_w - 1

    kernel = relay.var("weight", shape=(in_ch, out_ch, k_h, k_w))
    image = relay.var("data", shape=(batch, in_ch, height, width))
    deconv_out = relay.nn.conv2d_transpose(
        image,
        kernel,
        channels=out_ch,
        kernel_size=(k_h, k_w),
        padding=(pad_h, pad_w),
    )
    wrapped = relay.Function([image, kernel], relay.Tuple(tvm.runtime.convert([deconv_out])))
    typed_func = run_opt_pass(wrapped, transform.InferType())
    mac_total = analysis.get_total_mac_number(typed_func)
    assert mac_total == batch * in_ch * out_h * out_w * out_ch * k_h * k_w


if __name__ == "__main__":
    # Run every MAC-count check, in a fixed order, when executed as a script.
    for mac_check in (
        test_conv,
        test_gemm,
        test_simple_network,
        test_depthwise_conv2d,
        test_conv_2d_transpose,
    ):
        mac_check()



[08:58:06] /media/pc/data/4tb/lxw/books/tvm/src/relay/analysis/mac_count.cc:172: This pass only counts MACs in direct conv2d, conv2d_transpose, dense, and batch_matmul ops
[08:58:06] /media/pc/data/4tb/lxw/books/tvm/src/relay/analysis/mac_count.cc:172: This pass only counts MACs in direct conv2d, conv2d_transpose, dense, and batch_matmul ops
[08:58:06] /media/pc/data/4tb/lxw/books/tvm/src/relay/analysis/mac_count.cc:172: This pass only counts MACs in direct conv2d, conv2d_transpose, dense, and batch_matmul ops
[08:58:06] /media/pc/data/4tb/lxw/books/tvm/src/relay/analysis/mac_count.cc:172: This pass only counts MACs in direct conv2d, conv2d_transpose, dense, and batch_matmul ops
[08:58:06] /media/pc/data/4tb/lxw/books/tvm/src/relay/analysis/mac_count.cc:172: This pass only counts MACs in direct conv2d, conv2d_transpose, dense, and batch_matmul ops


## params

In [None]:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import numpy as np
import tvm
from tvm import te, runtime
import json
import base64
from tvm._ffi.base import py_str
from tvm.relay.op import add
from tvm import relay
from tvm import rpc
from tvm.contrib import utils, graph_executor


def test_save_load():
    """Round-trip two float32 arrays through save/load of a param dict."""
    expected = {
        "x": np.ones((10, 2)).astype("float32"),
        "y": np.ones((1, 2, 3)).astype("float32"),
    }
    blob = runtime.save_param_dict(expected)
    assert isinstance(blob, bytearray)
    restored = relay.load_param_dict(blob)
    assert len(restored) == 2
    # Each restored entry must hold exactly the values that were saved.
    for key, original in expected.items():
        np.testing.assert_equal(restored[key].numpy(), original)


def test_ndarray_reflection():
    """Save and reload a param dict whose two keys share one TVM array."""
    source = np.random.uniform(size=(10, 2)).astype("float32")
    shared = tvm.nd.array(source)
    # Both entries reference the very same array object.
    aliased = {"x": shared, "y": shared}
    assert aliased["x"].same_as(aliased["y"])

    # Serialize, then deserialize.
    restored = relay.load_param_dict(runtime.save_param_dict(aliased))
    restored_x = restored["x"].numpy()
    # The reloaded "x" must match the original contents ...
    np.testing.assert_equal(restored_x, shared.numpy())
    # ... and "x" and "y" must still agree with each other.
    np.testing.assert_equal(restored["x"].numpy(), restored["y"].numpy())


def test_bigendian_rpc_param():
    """Test big endian rpc when there is a PowerPC RPC server available.

    The server address is read from the ``TVM_POWERPC_TEST_HOST`` and
    ``TVM_POWERPC_TEST_PORT`` environment variables (the port defaults to
    9090). When no host is configured the test returns immediately without
    doing anything.
    """
    host = os.environ.get("TVM_POWERPC_TEST_HOST", None)
    if host is None:
        return
    # Environment values are always strings, so coerce the port to an int;
    # otherwise a configured port would be passed on as text.
    port = int(os.environ.get("TVM_POWERPC_TEST_PORT", 9090))

    def verify_graph_executor(remote, target, shape, dtype):
        """Build ``x + 1`` for ``target``, run it on ``remote``, check the output."""
        # Explicit import so `tvm.testing` is available even if a plain
        # `import tvm` did not already load the submodule.
        import tvm.testing

        x = relay.var("x")
        y = relay.const(1)
        z = relay.add(x, y)
        func = relay.Function([x], z)

        x_in = np.ones(shape).astype(dtype)
        params = {"x": x_in}
        graph, lib, params = relay.build(func, target=target, params=params)

        # Save the compiled library locally, upload it, and load it remotely.
        temp = utils.tempdir()
        path_dso = temp.relpath("dev_lib.o")
        lib.save(path_dso)
        remote.upload(path_dso)
        lib = remote.load_module("dev_lib.o")
        dev = remote.cpu(0)
        mod = graph_executor.create(graph, lib, dev)
        # Parameters travel to the remote side in serialized form.
        mod.load_params(runtime.save_param_dict(params))
        mod.run()
        out = mod.get_output(0, tvm.nd.empty(shape, dtype=dtype, device=dev))
        tvm.testing.assert_allclose(x_in + 1, out.numpy())

    print("Test RPC connection to PowerPC...")
    remote = rpc.connect(host, port)
    target = "llvm -mtriple=powerpc-linux-gnu"
    for dtype in ["float32", "float64", "int32", "int8"]:
        verify_graph_executor(remote, target, (10,), dtype)


if __name__ == "__main__":
    # Run the param-dict checks, in a fixed order, when executed as a script.
    for param_check in (test_save_load, test_ndarray_reflection, test_bigendian_rpc_param):
        param_check()
