Working with operators using Tensor Expression: Relay describes a computation as a set of operators, each of these operators cab be represented as TE which is domain-specific.

Writing and Scheduling Vector Addition in TE for CPU:

In [1]:
%%shell
pip install apache-tvm --pre





In [2]:
import tvm
import tvm.testing
from tvm import te
import numpy as np

In [3]:
tgt = tvm.target.Target(target="llvm", host="llvm")

In [4]:
n = te.var("n") #Defining symbolic variable - represents the dim of tensir
A = te.placeholder((n,), name="A") #represents an input tensor, defines the shape(here 1D with n elements each)
B = te.placeholder((n,), name="B")
C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")
#te.compute - creates a computational graph describing how the addition should be performed when the computation is executed.
#shape of result tensor, computation rule

In [5]:
s = te.create_schedule(C.op) # creating a default schedule, a schedule defines the computational stategy - how a TE should be executed on hardware.

In [6]:
fadd = tvm.build(s, [A, B, C], tgt, name="myadd") #compiles a TE and its schedule into executable code input(schedule,TE,target,name of func)

In [7]:
# creates a TVM device object, specifies the type of device, 0 - specifies device index, first available CPU
dev = tvm.device(tgt.kind.name, 0)

n = 1024
#a,b,c are initialied. Created on specified device (dev) ensuring they are accessible for the computation to be run on the CPU.
a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)# arrays are converted to TVM NDArrays and allocated on the device
b = tvm.nd.array(np.random.uniform(size=n).astype(B.dtype), dev)# generates random floating_point numbers btw 0 & 1 of length 1024
c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)
fadd(a, b, c)
#compares the elements of TVM computation result (c.numpy) with numpy reference computation
tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

In [8]:
#Timing the numpy implementation
import timeit

np_repeat = 100
np_running_time = timeit.timeit(
    setup="import numpy\n"
    "n = 32768\n"
    'dtype = "float32"\n'
    "a = numpy.random.rand(n, 1).astype(dtype)\n"
    "b = numpy.random.rand(n, 1).astype(dtype)\n",
    stmt="answer = a + b",
    number=np_repeat,
)
print("Numpy running time: %f" % (np_running_time / np_repeat))

#Evaluating the TVM implementation
def evaluate_addition(func, target, optimization, log):
    dev = tvm.device(target.kind.name, 0)
    n = 32768
    a = tvm.nd.array(np.random.uniform(size=n).astype(A.dtype), dev)
    b = tvm.nd.array(np.random.uniform(size=n).astype(B.dtype), dev)
    c = tvm.nd.array(np.zeros(n, dtype=C.dtype), dev)

    evaluator = func.time_evaluator(func.entry_name, dev, number=10)
    mean_time = evaluator(a, b, c).mean                             #runs the evaluation and return the mean time for the addition
    print("%s: %f" % (optimization, mean_time))

    log.append((optimization, mean_time))


log = [("numpy", np_running_time / np_repeat)]
evaluate_addition(fadd, tgt, "naive", log=log)

Numpy running time: 0.000014
naive: 0.000008


Updating the Schedule to Use Parallelism

In [9]:
s[C].parallel(C.op.axis[0]) #schedule for parallelism - specifies 1st axis of the computation - outer loop

In [10]:
#how the schedule affects the computation, Lower converting provides a way to inspect the generated IR of the computation.
print(tvm.lower(s, [A, B, C], simple_mode=True))

# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.handle, B: T.handle, C: T.handle):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        n = T.int32()
        A_1 = T.match_buffer(A, (n,), strides=("stride",), buffer_type="auto")
        B_1 = T.match_buffer(B, (n,), strides=("stride",), buffer_type="auto")
        C_1 = T.match_buffer(C, (n,), strides=("stride",), buffer_type="auto")
        for i in T.parallel(n):
            C_2 = T.Buffer((C_1.strides[0] * n,), data=C_1.data, buffer_type="auto")
            A_2 = T.Buffer((A_1.strides[0] * n,), data=A_1.data, buffer_type="auto")
            B_2 = T.Buffer((B_1.strides[0] * n,), data=B_1.data, buffer_type="auto")
            C_2[i * C_1.strides[0]] = A_2[i * A_1.strides[0]] + B_2[i * B_1.strides[0]]


In [11]:
fadd_parallel = tvm.build(s, [A, B, C], tgt, name="myadd_parallel")
fadd_parallel(a, b, c)

tvm.testing.assert_allclose(c.numpy(), a.numpy() + b.numpy())

evaluate_addition(fadd_parallel, tgt, "parallel", log=log)

parallel: 0.000015


Updating the Schedule to Use Vectorization

Vectorization is a powerful technique that allows CPU to perform SIMD - Single Instruction Multiple Data, enabling multiple data points to be proccessed in parallel with single instruction

In [12]:
n = te.var("n")
A = te.placeholder((n,), name="A")
B = te.placeholder((n,), name="B")
C = te.compute(A.shape, lambda i: A[i] + B[i], name="C")

s = te.create_schedule(C.op)

In [13]:
factor = 4 # split factor - number of CPU cores

outer, inner = s[C].split(C.op.axis[0], factor=factor)
s[C].parallel(outer)
s[C].vectorize(inner)

fadd_vector = tvm.build(s, [A, B, C], tgt, name="myadd_parallel")

evaluate_addition(fadd_vector, tgt, "vector", log=log)

print(tvm.lower(s, [A, B, C], simple_mode=True))

vector: 0.000113
# from tvm.script import ir as I
# from tvm.script import tir as T

@I.ir_module
class Module:
    @T.prim_func
    def main(A: T.handle, B: T.handle, C: T.handle):
        T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
        n = T.int32()
        A_1 = T.match_buffer(A, (n,), strides=("stride",), buffer_type="auto")
        B_1 = T.match_buffer(B, (n,), strides=("stride",), buffer_type="auto")
        C_1 = T.match_buffer(C, (n,), strides=("stride",), buffer_type="auto")
        for i_outer in T.parallel((n + 3) // 4):
            for i_inner_s in range(4):
                if T.likely(i_outer * 4 + i_inner_s < n):
                    C_2 = T.Buffer((C_1.strides[0] * n,), data=C_1.data, buffer_type="auto")
                    A_2 = T.Buffer((A_1.strides[0] * n,), data=A_1.data, buffer_type="auto")
                    B_2 = T.Buffer((B_1.strides[0] * n,), data=B_1.data, buffer_type="auto")
                    cse_var_1: T.int32 = i

Comparing the Different Schedules

In [14]:
baseline = log[0][1]
print("%s\t%s\t%s" % ("Operator".rjust(20), "Timing".rjust(20), "Performance".rjust(20)))
for result in log:
    print(
        "%s\t%s\t%s"
        % (result[0].rjust(20), str(result[1]).rjust(20), str(result[1] / baseline).rjust(20))
    )

            Operator	              Timing	         Performance
               numpy	1.3779510002223106e-05	                 1.0
               naive	          8.3261e-06	  0.6042377413026091
            parallel	         1.49822e-05	  1.0872810424741417
              vector	0.00011327570000000001	   8.220589845482515
