In [1]:
import os

import numpy as np
import tvm
from tvm import te, auto_scheduler

In [2]:
@auto_scheduler.register_workload  # expose this function to auto_scheduler as a workload
def matmul_add(N, L, M, dtype):
    """Describe ``out = matmul(A, B) + C`` as a tensor-expression workload.

    Parameters
    ----------
    N, L, M : int
        Problem sizes: ``A`` has shape (N, L), ``B`` has shape (L, M), and
        both ``C`` and the result have shape (N, M).
    dtype : str
        Element type used for all three input placeholders.

    Returns
    -------
    list
        ``[A, B, C, out]``: the three input placeholders followed by the
        output tensor.
    """
    result_shape = (N, M)

    # Input placeholders; the ``name=`` strings are kept exactly as before.
    lhs = te.placeholder((N, L), name="A", dtype=dtype)
    rhs = te.placeholder((L, M), name="B", dtype=dtype)
    bias = te.placeholder(result_shape, name="C", dtype=dtype)

    # Reduction over the shared dimension of length L.
    red = te.reduce_axis((0, L), name="k")

    # NOTE(review): the printed compute DAG shows axes named i and j, which
    # presumably come from the lambda parameter names -- keep them unchanged.
    product = te.compute(
        result_shape,
        lambda i, j: te.sum(lhs[i, red] * rhs[red, j], axis=red),
        name="matmul",
        # enable automatic layout transform for tensor B
        attrs={"layout_free_placeholders": [rhs]},
    )

    # Element-wise add of C on top of the matmul result.
    total = te.compute(result_shape, lambda i, j: product[i, j] + bias[i, j], name="out")

    return [lhs, rhs, bias, total]

In [5]:
# Instantiate the workload tensors for a 1024 x 1024 x 1024 float32 problem.
A, B, C, out = matmul_add(N=1024, L=1024, M=1024, dtype="float32")

In [8]:
# Build a PrimFunc from the TE tensors. The list order [A, B, C, out] matches
# the parameter order printed below (var_A, var_B, var_C, var_out).
func = te.create_prim_func([A, B, C, out])

In [14]:
# Show the attributes available on the PrimFunc, then its parameter list
# (one item per output line, exactly as two separate prints would produce).
print(dir(func), func.params, sep="\n")

['__annotations__', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_handle_by_constructor__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__weakref__', '_checked_type_', '_move', '_relax_script', 'attrs', 'attrs', 'body', 'buffer_map', 'checked_type', 'handle', 'legacy_repr', 'params', 'ret_type', 'same_as', 'script', 'show', 'span', 'specialize', 'struct_info', 'with_attr', 'with_attrs', 'with_body', 'without_attr']
[var_A, var_B, var_C, var_out]


In [36]:
# NOTE(review): `task` is only created in a later cell of this notebook
# (the SearchTask cell), so on a fresh top-to-bottom run this cell raises
# NameError unless that cell has been executed first.
print(task.workload_key)
# The recorded output shows the loop yielding one character per iteration,
# i.e. workload_key behaves as a single string here, not as a list of fields.
for ele in task.workload_key:
    print(ele)

["matmul_add", 1024, 1024, 1024, "float32"]
[
"
m
a
t
m
u
l
_
a
d
d
"
,
 
1
0
2
4
,
 
1
0
2
4
,
 
1
0
2
4
,
 
"
f
l
o
a
t
3
2
"
]


In [15]:
# Problem sizes shared by the tuning task and the later verification cell.
N = 1024
L = 1024
M = 1024

# Tuning task: the registered workload, its concrete arguments, and the llvm target.
target = tvm.target.Target("llvm")
task = auto_scheduler.SearchTask(
    func=matmul_add,
    args=(N, L, M, "float32"),
    target=target,
)

# Inspect the computational graph
print("Computational DAG:")
print(task.compute_dag)

Computational DAG:
A = PLACEHOLDER [1024, 1024]
B = PLACEHOLDER [1024, 1024]
matmul(i, j) += (A[i, k]*B[k, j])
C = PLACEHOLDER [1024, 1024]
out(i, j) = (matmul[i, j] + C[i, j])



In [32]:
# Tuning configuration: 10 measurement trials, with every measurement passed
# to a RecordToFile callback that targets `log_file`.
log_file = "matmul.json"
tune_option = auto_scheduler.TuningOptions(
    verbose=2,
    num_measure_trials=10,
    measure_callbacks=[auto_scheduler.RecordToFile(log_file)],
)

In [33]:
# Run auto-tuning (search) with the options configured above.
task.tune(tune_option)
# Apply the best schedule, looked up from the records in `log_file`.
sch, args = task.apply_best(log_file)

----------------------------------------------------------------------
------------------------------  [ Search ]
----------------------------------------------------------------------
Generate Sketches		#s: 3
Sample Initial Population	#s: 2014	fail_ct: 3	Time elapsed: 1.58
GA Iter: 0	Max score: 0.9989	Min score: 0.9405	#Pop: 128	#M+: 0	#M-: 0
GA Iter: 4	Max score: 0.9999	Min score: 0.9883	#Pop: 128	#M+: 1371	#M-: 72
EvolutionarySearch		#s: 128	Time elapsed: 6.93
----------------------------------------------------------------------
------------------------------  [ Measure ]
----------------------------------------------------------------------
Get 10 programs to measure:
..........**********
No: 1	GFLOPS: 60.02 / 60.02	results: MeasureResult(cost:[0.0358], error_no:0, all_cost:0.98, Tstamp:1732991436.56)
Placeholder: A, B, C
parallel i.0@j.0@ (0,64)
  for j.1 (0,4)
    for k.0 (0,1024)
      for i.2 (0,16)
        for j.2 (0,64)
          for i.3 (0,2)
            vectorize j.3 (0,2)

In [34]:
# Display the task's compute DAG as the cell result; the recorded output is
# the same DAG that was printed when the task was created.
task.compute_dag

A = PLACEHOLDER [1024, 1024]
B = PLACEHOLDER [1024, 1024]
matmul(i, j) += (A[i, k]*B[k, j])
C = PLACEHOLDER [1024, 1024]
out(i, j) = (matmul[i, j] + C[i, j])

In [9]:
# Lower the tuned schedule (`sch`, `args` come from task.apply_best) and
# display the lowered result.
tvm.lower(sch, args).show()

In [12]:
# Compile the tuned schedule for the llvm target.
# NOTE: this rebinds `func`, which an earlier cell used for the PrimFunc
# returned by te.create_prim_func; re-running that inspection cell after this
# one will see the built module instead.
func = tvm.build(sch, args, target)

# Fixed seed so the random inputs -- and therefore the correctness check --
# are reproducible from run to run.
rng = np.random.default_rng(0)
a_np = rng.uniform(size=(N, L)).astype(np.float32)
b_np = rng.uniform(size=(L, M)).astype(np.float32)
c_np = rng.uniform(size=(N, M)).astype(np.float32)
# NumPy reference for out = A . B + C.
out_np = a_np.dot(b_np) + c_np

# Copy the inputs to the CPU device and allocate the output buffer.
dev = tvm.cpu()
a_tvm = tvm.nd.array(a_np, device=dev)
b_tvm = tvm.nd.array(b_np, device=dev)
c_tvm = tvm.nd.array(c_np, device=dev)
out_tvm = tvm.nd.empty(out_np.shape, device=dev)
func(a_tvm, b_tvm, c_tvm, out_tvm)

# Check results.
# The tuned kernel accumulates the L float32 products in a different order
# than NumPy, so rounding differs slightly; rtol=1e-3 (the tolerance used by
# the upstream TVM tutorial) avoids spurious failures that 1e-5 can trigger.
np.testing.assert_allclose(out_np, out_tvm.numpy(), rtol=1e-3)

# Evaluate execution time.
evaluator = func.time_evaluator(func.entry_name, dev, min_repeat_ms=500)
print(
    "Execution time of this operator: %.3f ms"
    % (np.median(evaluator(a_tvm, b_tvm, c_tvm, out_tvm).results) * 1000)
)

Execution time of this operator: 9.953 ms
