[Ansor][AutoTVM v2.0] Phase 1: Add RPC Runner #6077

Merged (7 commits) on Jul 20, 2020
Changes from 5 commits
3 changes: 2 additions & 1 deletion python/tvm/auto_scheduler/__init__.py
@@ -28,7 +28,8 @@
from .compute_dag import ComputeDAG
from .auto_schedule import SearchTask, TuningOptions, HardwareParams, \
auto_schedule, EmptyPolicy
-from .measure import MeasureInput, LocalBuilder, LocalRunner
+from .measure import MeasureInput, LocalBuilder, LocalRunner, RPCRunner, \
+    LocalRPCMeasureContext
from .measure_record import RecordToFile, RecordReader, load_best, \
load_records, save_records
from .workload_registry import register_workload, make_workload_key
295 changes: 291 additions & 4 deletions python/tvm/auto_scheduler/measure.py
@@ -42,17 +42,22 @@
from tvm.runtime import Object, module, ndarray
from tvm.driver import build_module
from tvm.ir import transform
from tvm.rpc.tracker import Tracker
from tvm.rpc.server import Server
from tvm.autotvm.measure.measure_methods import set_cuda_target_arch
from tvm.contrib import tar, ndk

from . import _ffi_api
-from .utils import get_const_tuple, NoDaemonPool, call_func_with_timeout
+from .utils import get_const_tuple, NoDaemonPool, call_func_with_timeout, request_remote, \
+    check_remote

# The maximum length of an error message
MAX_ERROR_MSG_LEN = 512

# We use fork and a global variable to copy arguments between processes.
# This avoids the expensive serialization of TVM IR when using multiprocessing.Pool.
GLOBAL_BUILD_ARGUMENTS = None
GLOBAL_RUN_ARGUMENTS = None
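
As a minimal standalone illustration of this fork-and-global pattern (hypothetical names; assumes the 'fork' start method, where pool workers inherit the parent's memory and the large arguments are never pickled):

    import multiprocessing

    GLOBAL_ARGS = None  # set in the parent before the pool forks

    def _worker(index):
        # The child reads the inherited global; only the small index is pickled.
        return len(GLOBAL_ARGS[index])

    if __name__ == "__main__":
        GLOBAL_ARGS = ["a" * 1000, "b" * 2000]
        with multiprocessing.Pool(2) as pool:
            print(pool.map(_worker, range(2)))  # [1000, 2000]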

@tvm._ffi.register_object("auto_scheduler.MeasureCallback")
class MeasureCallback(Object):
@@ -195,6 +200,8 @@ def __init__(self,
class LocalRunner(ProgramRunner):
""" LocalRunner that uses local CPU/GPU to measures the time cost of programs.

TODO(FrozenGene): Add cpu cache flush to this runner.

Parameters
----------
timeout : int = 10
@@ -230,6 +237,124 @@ def __init__(self,
_ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval)


@tvm._ffi.register_object("auto_scheduler.RPCRunner")
class RPCRunner(ProgramRunner):
""" RPCRunner that uses RPC call to measures the time cost of programs on remote devices.
Or sometime we may need to use RPC even in local running to insulate the thread environment.
(e.g. running CUDA programs)

TODO(FrozenGene): Add cpu cache flush to this runner.

Parameters
----------
key : str
The key of the device registered in the RPC tracker.
host : str
The host address of the RPC Tracker.
port : int
The port of the RPC Tracker.
priority : int = 1
The priority of this run request; a larger value means a higher priority.
n_parallel : int = 1
The number of tasks run in parallel.
timeout : int = 10
The timeout limit (in seconds) for each run.
This is used in a wrapper of the multiprocessing.Process.join().
number : int = 3
The number of times to run the generated code for taking the average.
We call these runs one `repeat` of the measurement.
repeat : int = 1
The number of times to repeat the measurement.
In total, the generated code will be run (1 + number x repeat) times,
where the first "1" is a warm-up run and will be discarded.
The returned result contains `repeat` costs,
each of which is an average of `number` costs.
min_repeat_ms : int = 0
The minimum duration of one `repeat` in milliseconds.
By default, one `repeat` contains `number` runs. If this parameter is set,
the parameter `number` will be dynamically adjusted to meet the
minimum duration requirement of one `repeat`,
i.e., when the run time of one `repeat` falls below this time, the `number` parameter
will be automatically increased.
cooldown_interval : float = 0.0
The cooldown interval (in seconds) between two measurements.
"""

def __init__(self, key, host, port,
priority=1, n_parallel=1, timeout=10, number=3, repeat=1,
min_repeat_ms=0, cooldown_interval=0.0):
self.__init_handle_by_constructor__(
_ffi_api.RPCRunner, key, host, port, priority, n_parallel, timeout,
number, repeat, min_repeat_ms, cooldown_interval)

if check_remote(key, host, port, priority, timeout):
    print("Got devices for measurement successfully!")
else:
    raise RuntimeError("Cannot get remote devices from the tracker. "
                       "Please check the status of the tracker with "
                       "'python -m tvm.exec.query_rpc_tracker --port [THE PORT YOU USE]' "
                       "and make sure there are free devices in the queue.")


class LocalRPCMeasureContext:
""" A context wrapper for running RPCRunner locally.
This will launch a local RPC Tracker and local RPC Server.

TODO(FrozenGene): Add cpu cache flush to this RPC context.

Parameters
----------
priority : int = 1
The priority of this run request; a larger value means a higher priority.
n_parallel : int = 1
The number of tasks run in parallel.
timeout : int = 10
The timeout limit (in seconds) for each run.
This is used in a wrapper of the multiprocessing.Process.join().
number : int = 3
The number of times to run the generated code for taking the average.
We call these runs one `repeat` of the measurement.
repeat : int = 1
The number of times to repeat the measurement.
In total, the generated code will be run (1 + number x repeat) times,
where the first "1" is a warm-up run and will be discarded.
The returned result contains `repeat` costs,
each of which is an average of `number` costs.
min_repeat_ms : int = 0
The minimum duration of one `repeat` in milliseconds.
By default, one `repeat` contains `number` runs. If this parameter is set,
the parameter `number` will be dynamically adjusted to meet the
minimum duration requirement of one `repeat`,
i.e., when the run time of one `repeat` falls below this time, the `number` parameter
will be automatically increased.
cooldown_interval : float = 0.0
The cooldown interval (in seconds) between two measurements.
"""

def __init__(self, priority=1, n_parallel=1, timeout=10, number=3, repeat=1,
min_repeat_ms=0, cooldown_interval=0.0):
ctx = tvm.context("cuda", 0)
if ctx.exist:
cuda_arch = "sm_" + "".join(ctx.compute_version.split('.'))
set_cuda_target_arch(cuda_arch)
host = '0.0.0.0'
self.tracker = Tracker(host, port=9000, port_end=10000, silent=True)
device_key = '$local$device$%d' % self.tracker.port
self.server = Server(host, port=self.tracker.port, port_end=10000,
key=device_key, use_popen=True, silent=True,
tracker_addr=(self.tracker.host, self.tracker.port))
self.runner = RPCRunner(device_key, host, self.tracker.port, priority,
n_parallel, timeout, number, repeat,
min_repeat_ms, cooldown_interval)
# Wait for the processes to start
time.sleep(0.5)

def __del__(self):
# Close the tracker and server before exit
self.tracker.terminate()
self.server.terminate()
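
A short lifetime sketch (assuming `from tvm import auto_scheduler` as in the previous sketch; keep the context object alive for the whole tuning run, since deleting it tears the tracker and server down):

    measure_ctx = auto_scheduler.LocalRPCMeasureContext(min_repeat_ms=300)
    runner = measure_ctx.runner  # an RPCRunner wired to the local tracker
    # ... run measurements through `runner` ...
    del measure_ctx              # terminates the local tracker and server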


class MeasureErrorNo(object):
""" Error type for MeasureResult. """
NO_ERROR = 0 # No error
@@ -307,7 +432,8 @@ def timed_func():
dirname, "tmp_func." + build_func.output_format)

try:
-with transform.PassContext(): # todo(lmzheng): port the unroll pass
+# TODO(merrymercy): Port the unroll pass.
+with transform.PassContext():
func = build_module.build(
sch, args, target=task.target, target_host=task.target_host)
func.export_library(filename, build_func)
@@ -376,8 +502,10 @@ def local_builder_build(inputs, timeout, n_parallel, build_func='default', verbo

return results


@tvm._ffi.register_func("auto_scheduler.local_runner.run")
-def local_run(inputs, build_results, timeout, number, repeat, min_repeat_ms, cooldown_interval,
+def local_run(inputs, build_results,
+              timeout=10, number=3, repeat=1, min_repeat_ms=0, cooldown_interval=0,
verbose=1):
"""
Run function of LocalRunner to test the performance of the input BuildResults.
@@ -388,7 +516,7 @@ def local_run(inputs, build_results, timeout, number, repeat, min_repeat_ms, coo
The MeasureInputs to be measured.
build_results : List[BuildResult]
The BuildResults to be measured.
-timeout : int
+timeout : int = 10
The timeout limit (in seconds) for each run.
This is used in a wrapper of the multiprocessing.Process.join().
number : int = 3
@@ -426,6 +554,7 @@ def timed_func(inp, build_res):
try:
func = module.load_module(build_res.filename)
ctx = ndarray.context(str(inp.task.target), 0)
# TODO(FrozenGene): Add cpu cache flush to this function.
time_f = func.time_evaluator(
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
# pylint: disable=broad-except
@@ -436,6 +565,7 @@

if error_no == 0:
try:
# TODO(FrozenGene): Update to ndarray.non-empty.
args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in
build_res.args]
ctx.sync()
@@ -478,3 +608,160 @@ def timed_func(inp, build_res):
print("")

return measure_results
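
As a concrete reading of the `number`/`repeat` semantics documented above (values arbitrary):

    number, repeat = 3, 2
    total_device_runs = 1 + number * repeat  # 7: one discarded warm-up, then 3 runs x 2 repeats
    # time_f(*args).results would hold `repeat` (= 2) costs,
    # each the average of `number` (= 3) runs.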


def rpc_run_worker(index):
""" Function to be ran in the RPCRunner thread pool.

Parameters
----------
index : int
The index of the (MeasureInput, BuildResult) pair to be processed by the current Runner thread.

Returns
-------
res : MeasureResult
The measure result of this Runner thread.
"""
global GLOBAL_RUN_ARGUMENTS
inputs, build_results, key, host, port, priority, timeout, number, \
repeat, min_repeat_ms, cooldown_interval, verbose = GLOBAL_RUN_ARGUMENTS

max_float = 1e10  # We use 1e10 instead of sys.float_info.max for better readability in the log
inp = inputs[index]
build_res = build_results[index]

if build_res.error_no != MeasureErrorNo.NO_ERROR:
return (max_float,), build_res.error_no, build_res.error_msg, build_res.time_cost, \
time.time()

def timed_func():
tic = time.time()
error_no = 0
error_msg = None
try:
# Upload the built module to the remote device
remote = request_remote(key, host, port, priority, timeout)
remote.upload(build_res.filename)
func = remote.load_module(os.path.split(build_res.filename)[1])
ctx = remote.context(str(inp.task.target), 0)
# TODO(FrozenGene): Add cpu cache flush to this function.
time_f = func.time_evaluator(
func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms)
# pylint: disable=broad-except
except Exception:
costs = (max_float,)
error_no = MeasureErrorNo.COMPILE_DEVICE
error_msg = make_error_msg()

if error_no == 0:
try:
# TODO(FrozenGene): Update to ndarray.non-empty.
args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in
build_res.args]
ctx.sync()

costs = time_f(*args).results
# clean up remote files
remote.remove(build_res.filename)
remote.remove(os.path.splitext(build_res.filename)[0] + '.so')
remote.remove('')
# pylint: disable=broad-except
except Exception:
costs = (max_float,)
error_no = MeasureErrorNo.RUNTIME_DEVICE
error_msg = make_error_msg()

shutil.rmtree(os.path.dirname(build_res.filename))
toc = time.time()

time.sleep(cooldown_interval)
if verbose >= 1:
if error_no == MeasureErrorNo.NO_ERROR:
print("*", end="")
else:
print("*E", end="") # Run error

return costs, error_no, error_msg, toc - tic + build_res.time_cost, toc

res = call_func_with_timeout(timeout, timed_func)

if isinstance(res, TimeoutError):
if verbose >= 1:
print("*T", end="") # Run timeout
res = (max_float,), MeasureErrorNo.RUN_TIMEOUT, None, build_res.time_cost + \
timeout, time.time()
return res
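
For reference, a generic sketch of a process-based timeout wrapper in the spirit of `call_func_with_timeout` (this is not the actual `.utils` implementation; it assumes `func` and its return value are compatible with fork/pickling):

    import multiprocessing

    def call_with_timeout_sketch(timeout, func):
        def _target(queue):
            try:
                queue.put(func())
            except Exception as exc:  # propagate worker errors as values
                queue.put(exc)
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_target, args=(queue,))
        proc.start()
        proc.join(timeout)
        if proc.is_alive():           # still running after `timeout` seconds
            proc.terminate()
            proc.join()
            return TimeoutError()     # returned, not raised, so callers can isinstance-check
        return queue.get()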


@tvm._ffi.register_func("auto_scheduler.rpc_runner.run")
def rpc_runner_run(inputs, build_results, key, host, port,
priority=1, n_parallel=1, timeout=10, number=3, repeat=1, min_repeat_ms=0,
cooldown_interval=0.0, verbose=1):
""" Run function of RPCRunner to test the performance of the input BuildResults.

Parameters
----------
inputs : List[MeasureInput]
The MeasureInputs to be measured.
build_results : List[BuildResult]
The BuildResults to be measured.
key : str
The key of the device registered in the RPC tracker.
host : str
The host address of the RPC Tracker.
port : int
The port of the RPC Tracker.
priority : int = 1
The priority of this run request; a larger value means a higher priority.
n_parallel : int = 1
The number of tasks run in parallel.
timeout : int = 10
The timeout limit (in seconds) for each run.
This is used in a wrapper of the multiprocessing.Process.join().
number : int = 3
The number of times to run the generated code for taking the average.
We call these runs one `repeat` of the measurement.
repeat : int = 1
The number of times to repeat the measurement.
In total, the generated code will be run (1 + number x repeat) times,
where the first "1" is a warm-up run and will be discarded.
The returned result contains `repeat` costs,
each of which is an average of `number` costs.
min_repeat_ms : int = 0
The minimum duration of one `repeat` in milliseconds.
By default, one `repeat` contains `number` runs. If this parameter is set,
the parameter `number` will be dynamically adjusted to meet the
minimum duration requirement of one `repeat`,
i.e., when the run time of one `repeat` falls below this time, the `number` parameter
will be automatically increased.
cooldown_interval : float = 0.0
The cooldown interval (in seconds) between two measurements.
verbose: int = 1
Verbosity level. 0 for silent, 1 to output information during program measurement.

Returns
-------
res : List[MeasureResult]
The measure results of these MeasureInputs.
"""
global GLOBAL_RUN_ARGUMENTS
GLOBAL_RUN_ARGUMENTS = (inputs, build_results, key, host, port, priority, timeout, number,
repeat, min_repeat_ms, cooldown_interval, verbose)

assert len(inputs) == len(build_results), \
"Measure input size should be equal to build results"
pool = NoDaemonPool(n_parallel)
tuple_res = pool.map(rpc_run_worker, range(len(build_results)))
pool.terminate()
pool.join()
del pool

results = []
for res in tuple_res:
results.append(MeasureResult(*res))

if verbose >= 1:
print("")

return results
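
Putting the pieces together, a hedged end-to-end sketch of driving these runners through the auto-scheduler (task construction elided; `TuningOptions`, `RecordToFile`, and `auto_schedule` are the exports shown in `__init__.py` above, and the exact `TuningOptions` parameter names are an assumption here):

    from tvm import auto_scheduler

    measure_ctx = auto_scheduler.LocalRPCMeasureContext(number=3, repeat=1, min_repeat_ms=100)
    tune_option = auto_scheduler.TuningOptions(
        num_measure_trials=64,
        runner=measure_ctx.runner,
        measure_callbacks=[auto_scheduler.RecordToFile("records.json")])
    # sch, args = auto_scheduler.auto_schedule(task, tuning_options=tune_option)
    del measure_ctx  # shut down the local tracker and server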