From 52be95d45418ad5cdcf4ae85550354a32a420267 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Thu, 16 Jul 2020 17:16:50 +0800 Subject: [PATCH 1/7] Add rpc runner --- python/tvm/auto_scheduler/__init__.py | 3 +- python/tvm/auto_scheduler/measure.py | 185 +++++++++++++++++- python/tvm/auto_scheduler/utils.py | 68 +++++++ python/tvm/rpc/server.py | 3 +- src/auto_scheduler/measure.cc | 41 ++++ src/auto_scheduler/measure.h | 38 ++++ .../unittest/test_auto_scheduler_measure.py | 20 ++ 7 files changed, 355 insertions(+), 3 deletions(-) diff --git a/python/tvm/auto_scheduler/__init__.py b/python/tvm/auto_scheduler/__init__.py index 90bec8665cef..c3a3712e55b4 100644 --- a/python/tvm/auto_scheduler/__init__.py +++ b/python/tvm/auto_scheduler/__init__.py @@ -28,7 +28,8 @@ from .compute_dag import ComputeDAG from .auto_schedule import SearchTask, TuningOptions, HardwareParams, \ auto_schedule, EmptyPolicy -from .measure import MeasureInput, LocalBuilder, LocalRunner +from .measure import MeasureInput, LocalBuilder, LocalRunner, RPCRunner, \ + LocalRPCMeasureContext from .measure_record import RecordToFile, RecordReader, load_best, \ load_records, save_records from .workload_registry import register_workload, make_workload_key diff --git a/python/tvm/auto_scheduler/measure.py b/python/tvm/auto_scheduler/measure.py index e99c47e6262c..0cf3083c1737 100644 --- a/python/tvm/auto_scheduler/measure.py +++ b/python/tvm/auto_scheduler/measure.py @@ -42,10 +42,14 @@ from tvm.runtime import Object, module, ndarray from tvm.driver import build_module from tvm.ir import transform +from tvm.rpc.tracker import Tracker +from tvm.rpc.server import Server +from tvm.autotvm.measure.measure_methods import set_cuda_target_arch from tvm.contrib import tar, ndk from . import _ffi_api -from .utils import get_const_tuple, NoDaemonPool, call_func_with_timeout +from .utils import get_const_tuple, NoDaemonPool, call_func_with_timeout, request_remote, \ + check_remote # The maximum length of error message MAX_ERROR_MSG_LEN = 512 @@ -230,6 +234,88 @@ def __init__(self, _ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval) + +@tvm._ffi.register_object("auto_scheduler.RPCRunner") +class RPCRunner(ProgramRunner): + """ + Parameters + ---------- + key : Str + host : Str + port : Int + priority : Int + n_parallel : Int + timeout : Int + number : Int + repeat : Int + min_repeat_ms : Int + cooldown_interval : Float + """ + + def __init__(self, key, host, port, priority=1, + n_parallel=1, + timeout=10, + number=3, + repeat=1, + min_repeat_ms=0, + cooldown_interval=0.0): + self.__init_handle_by_constructor__( + _ffi_api.RPCRunner, key, host, port, priority, timeout, n_parallel, + number, repeat, min_repeat_ms, cooldown_interval) + + if check_remote(key, host, port, priority, timeout): + print("Get devices for measurement successfully!") + else: + raise RuntimeError("Cannot get remote devices from the tracker. " + "Please check the status of tracker by " + "'python -m tvm.exec.query_rpc_tracker --port [THE PORT YOU USE]' " + "and make sure you have free devices on the queue status.") + + +class LocalRPCMeasureContext: + """ A context wrapper for running RPCRunner locally. + This will launch a local RPC Tracker and local RPC Server. + + Parameters + ---------- + priority : Int + n_parallel : Int + timeout : Int + number : Int + repeat : Int + min_repeat_ms : Int + cooldown_interval : Float + """ + + def __init__(self, + priority=1, + n_parallel=1, + timeout=10, + number=10, + repeat=1, + min_repeat_ms=0, + cooldown_interval=0.0): + ctx = tvm.context("cuda", 0) + if ctx.exist: + cuda_arch = "sm_" + "".join(ctx.compute_version.split('.')) + set_cuda_target_arch(cuda_arch) + host = '0.0.0.0' + self.tracker = Tracker(host, port=9000, port_end=10000, silent=True) + device_key = '$local$device$%d' % self.tracker.port + self.server = Server(host, port=self.tracker.port, port_end=10000, + key=device_key, use_popen=True, silent=True, + tracker_addr=(self.tracker.host, self.tracker.port)) + self.runner = RPCRunner(device_key, host, self.tracker.port, priority, + n_parallel, timeout, number, repeat, + min_repeat_ms, cooldown_interval) + # wait for the processes to start + time.sleep(0.5) + + def __del__(self): + self.tracker.terminate() + self.server.terminate() + + class MeasureErrorNo(object): """ Error type for MeasureResult. """ NO_ERROR = 0 # No error @@ -376,6 +462,7 @@ def local_builder_build(inputs, timeout, n_parallel, build_func='default', verbo return results + @tvm._ffi.register_func("auto_scheduler.local_runner.run") def local_run(inputs, build_results, timeout, number, repeat, min_repeat_ms, cooldown_interval, verbose=1): @@ -478,3 +565,99 @@ def timed_func(inp, build_res): print("") return measure_results + + +def rpc_run_worker(index): + """ ... + """ + inputs, build_results, key, host, port, priority, timeout, number, \ + repeat, min_repeat_ms, cooldown_interval, verbose = global_run_arguments + + MAX_FLOAT = 1e10 # We use 1e10 instead of sys.float_info.max for better readability in log + inp = inputs[index] + build_res = build_results[index] + + if build_res.error_no != MeasureErrorNo.NO_ERROR: + return (MAX_FLOAT,), build_res.error_no, build_res.error_msg, build_res.time_cost, \ + time.time() + + def timed_func(): + tic = time.time() + error_no = 0 + error_msg = None + try: + # upload built module + remote = request_remote(key, host, port, priority, timeout) + remote.upload(build_res.filename) + func = remote.load_module(os.path.split(build_res.filename)[1]) + ctx = remote.context(str(inp.task.target), 0) + time_f = func.time_evaluator( + func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms) + except Exception: + costs = (MAX_FLOAT,) + error_no = MeasureErrorNo.COMPILE_DEVICE + error_msg = make_error_msg() + + if error_no == 0: + try: + args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in + build_res.args] + ctx.sync() + + costs = time_f(*args).results + # clean up remote files + remote.remove(build_res.filename) + remote.remove(os.path.splitext(build_res.filename)[0] + '.so') + remote.remove('') + except Exception: + costs = (MAX_FLOAT,) + error_no = MeasureErrorNo.RUNTIME_DEVICE + error_msg = make_error_msg() + + shutil.rmtree(os.path.dirname(build_res.filename)) + toc = time.time() + + time.sleep(cooldown_interval) + if verbose >= 1: + if error_no == MeasureErrorNo.NO_ERROR: + print("*", end="") + else: + print("*E", end="") # Run error + + return costs, error_no, error_msg, toc - tic + build_res.time_cost, toc + + res = call_func_with_timeout(timeout, timed_func) + + if isinstance(res, TimeoutError): + if verbose >= 1: + print("*T", end="") # Run timeout + res = (MAX_FLOAT,), MeasureErrorNo.RUN_TIMEOUT, None, build_res.time_cost + \ + timeout, time.time() + return res + + +@tvm._ffi.register_func("auto_scheduler.rpc_runner.run") +def rpc_runner_run(inputs, build_results, + key: str, host: str, port: int, priority: int, timeout: float, + n_parallel: int, number: int, repeat: int, min_repeat_ms: int, + cooldown_interval: float, verbose: int): + global global_run_arguments + global_run_arguments = (inputs, build_results, key, host, port, priority, timeout, number, + repeat, min_repeat_ms, cooldown_interval, verbose) + + assert len(inputs) == len(build_results), \ + "Measure input size should be equal to build results" + pool = NoDaemonPool(n_parallel) + tuple_res = pool.map(rpc_run_worker, range(len(build_results))) + pool.terminate() + pool.join() + del pool + + results = [] + for res in tuple_res: + results.append(MeasureResult(*res)) + + if verbose >= 1: + print("") + + return results diff --git a/python/tvm/auto_scheduler/utils.py b/python/tvm/auto_scheduler/utils.py index f7b12028fc36..68174aaca65a 100644 --- a/python/tvm/auto_scheduler/utils.py +++ b/python/tvm/auto_scheduler/utils.py @@ -22,12 +22,14 @@ import multiprocessing.pool import queue import signal +import threading try: import psutil except ImportError: raise ImportError("psutil not found, try `pip install psutil` to fix this") +from tvm import rpc from tvm.tir import expr from tvm.tir.transform import Simplify from tvm.ir.transform import Sequential @@ -193,3 +195,69 @@ def func_wrapper(que): del que return res + + +def request_remote(device_key, host=None, port=None, priority=1, timeout=60): + """Request a remote session + + Parameters + ---------- + device_key: string + The device key of registered device in tracker + host: host, optional + The host address of rpc tracker. + If is none, will use environment variable "TVM_TRACKER_HOST" + port: int, optional + The port of rpc tracker. + If is none, will use environment variable "TVM_TRACKER_PORT" + priority: int, optional + The priority of this request, larger is more prior + timeout: float, optional + The timeout of this session (units: second) + + Returns + ------ + session: RPCSession + """ + # connect to the tracker + host = host or os.environ['TVM_TRACKER_HOST'] + port = port or int(os.environ['TVM_TRACKER_PORT']) + + tracker = rpc.connect_tracker(host, port) + remote = tracker.request(device_key, priority=priority, + session_timeout=timeout) + return remote + + +def check_remote(device_key, host=None, port=None, priority=100, timeout=10): + """ + Check the availability of a remote device + + Parameters + ---------- + device_key: string + device key of registered device in tracker + host: host, optional + The host address of rpc tracker. + If is none, will use environment variable "TVM_TRACKER_HOST" + port: int, optional + The port address of rpc tracker. + If is none, will use environment variable "TVM_TRACKER_PORT" + priority: int, optional + The priority of this request, larger is more prior + timeout: float, optional + The timeout of this check (units: seconds). + + Returns + ------- + available: bool + True if can find available device + """ + + def _check(): + remote = request_remote(device_key, host, port, priority) + + t = threading.Thread(target=_check, ) + t.start() + t.join(timeout) + return not t.is_alive() diff --git a/python/tvm/rpc/server.py b/python/tvm/rpc/server.py index 15a3c7de789d..42bcb00a9117 100644 --- a/python/tvm/rpc/server.py +++ b/python/tvm/rpc/server.py @@ -348,7 +348,8 @@ def __init__(self, cmd = [sys.executable, "-m", "tvm.exec.rpc_server", "--host=%s" % host, - "--port=%s" % port] + "--port=%s" % port, + "--port-end=%s" % port_end] if tracker_addr: assert key cmd += ["--tracker=%s:%d" % tracker_addr, diff --git a/src/auto_scheduler/measure.cc b/src/auto_scheduler/measure.cc index 3c545525f4e6..9f14cbcff969 100644 --- a/src/auto_scheduler/measure.cc +++ b/src/auto_scheduler/measure.cc @@ -41,6 +41,7 @@ TVM_REGISTER_OBJECT_TYPE(ProgramRunnerNode); TVM_REGISTER_OBJECT_TYPE(ProgramBuilderNode); TVM_REGISTER_OBJECT_TYPE(LocalBuilderNode); TVM_REGISTER_OBJECT_TYPE(LocalRunnerNode); +TVM_REGISTER_OBJECT_TYPE(RPCRunnerNode); static const char* ErrorNoToStr[] = { "NoError", @@ -146,6 +147,38 @@ Array LocalRunnerNode::Run(const Array& inputs, throw; } +/********** RPCRunner **********/ +RPCRunner::RPCRunner(const std::string& key, const std::string& host, int port, + int priority, int timeout, int n_parallel, int number, + int repeat, int min_repeat_ms, double cooldown_interval) { + auto node = make_object(); + node->key = key; + node->host = host; + node->port = port; + node->priority = priority; + node->timeout = timeout; + node->n_parallel = n_parallel; + node->number = number; + node->repeat = repeat; + node->min_repeat_ms = min_repeat_ms; + node->cooldown_interval = cooldown_interval; + data_ = std::move(node); +} + +Array RPCRunnerNode::Run(const Array& inputs, + const Array& build_results, + int verbose) { + if (const auto* f = runtime::Registry::Get("auto_scheduler.rpc_runner.run")) { + Array results = (*f)( + inputs, build_results, key, host, port, priority, timeout, n_parallel, + number, repeat, min_repeat_ms, cooldown_interval, verbose); + return results; + } else { + LOG(FATAL) << "auto_scheduler.rpc_runner.run is not registered"; + } + return Array(); +} + /********** ProgramMeasurer **********/ ProgramMeasurer::ProgramMeasurer(ProgramBuilder builder, ProgramRunner runner, Optional> callbacks, int verbose, @@ -327,5 +360,13 @@ TVM_REGISTER_GLOBAL("auto_scheduler.LocalRunner") return LocalRunner(timeout, number, repeat, min_repeat_ms, cooldown_interval); }); +TVM_REGISTER_GLOBAL("auto_scheduler.RPCRunner") + .set_body_typed([](const std::string& key, const std::string& host, int port, + int priority, int timeout, int n_parallel, int number, + int repeat, int min_repeat_ms, double cooldown_interval){ + return RPCRunner(key, host, port, priority, timeout, n_parallel, number, + repeat, min_repeat_ms, cooldown_interval); + }); + } // namespace auto_scheduler } // namespace tvm diff --git a/src/auto_scheduler/measure.h b/src/auto_scheduler/measure.h index 50b6fcfd6520..2e7d30ceb614 100644 --- a/src/auto_scheduler/measure.h +++ b/src/auto_scheduler/measure.h @@ -363,6 +363,44 @@ class LocalRunner : public ProgramRunner { TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(LocalRunner, ProgramRunner, LocalRunnerNode); }; +/*! + * \brief RPCRunner that uses RPC call to measures the time cost of programs + * on remote devices. + */ +class RPCRunnerNode : public ProgramRunnerNode { + public: + std::string key; + std::string host; + int port; + int priority; + int n_parallel; + int number; + int repeat; + int min_repeat_ms; + double cooldown_interval; + + /*! \biref Run measurement and return results */ + Array Run(const Array& inputs, + const Array& build_results, + int verbose) final; + + static constexpr const char* _type_key = "auto_scheduler.RPCRunner"; + TVM_DECLARE_FINAL_OBJECT_INFO(RPCRunnerNode, ProgramRunnerNode); +}; + +/*! + * \brief Managed reference to RPCRunnerNode. + * \sa RPCRunnerNode + */ +class RPCRunner : public ProgramRunner { + public: + RPCRunner(const std::string& key, const std::string& host, int port, + int priority, int timeout, int n_parallel, int number, + int repeat, int min_repeat_ms, double cooldown_interval); + + TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(RPCRunner, ProgramRunner, RPCRunnerNode); +}; + /*! * \brief Measurer that measures the time costs of tvm programs * This class combines ProgramBuilder and ProgramRunner, and provides a simpler API */ diff --git a/tests/python/unittest/test_auto_scheduler_measure.py b/tests/python/unittest/test_auto_scheduler_measure.py index 1bcd0540ecb4..4309a9adfb77 100644 --- a/tests/python/unittest/test_auto_scheduler_measure.py +++ b/tests/python/unittest/test_auto_scheduler_measure.py @@ -67,6 +67,26 @@ def test_measure_local_builder_runner(): assert mress[0].error_no == 0 +def test_measure_local_builder_rpc_runner(): + dag, s0 = get_tiled_matmul() + + if not tvm.runtime.enabled("llvm"): + return + tgt = tvm.target.create("llvm") + task = auto_scheduler.SearchTask(dag, "test", tgt) + + minp = auto_scheduler.MeasureInput(task, s0) + local_builder = auto_scheduler.LocalBuilder() + measure_ctx = auto_scheduler.LocalRPCMeasureContext() + rpc_runner = measure_ctx.runner + + bress = local_builder.build([minp]) + assert bress[0].error_no == 0 + mress = rpc_runner.run([minp], bress) + assert mress[0].error_no == 0 + + if __name__ == "__main__": test_record() test_measure_local_builder_runner() + test_measure_local_builder_rpc_runner() From 82584b47c6592d05bfae4857e4567a75c1fd9630 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Thu, 16 Jul 2020 20:13:01 +0800 Subject: [PATCH 2/7] Update --- python/tvm/auto_scheduler/measure.py | 194 ++++++++++++++++++++------- python/tvm/auto_scheduler/utils.py | 2 +- src/auto_scheduler/measure.cc | 29 ++-- src/auto_scheduler/measure.h | 62 +++++---- 4 files changed, 197 insertions(+), 90 deletions(-) diff --git a/python/tvm/auto_scheduler/measure.py b/python/tvm/auto_scheduler/measure.py index 0cf3083c1737..985ac1d7bb64 100644 --- a/python/tvm/auto_scheduler/measure.py +++ b/python/tvm/auto_scheduler/measure.py @@ -57,6 +57,7 @@ # We use fork and a global variable to copy arguments between processings. # This can avoid expensive serialization of TVM IR when using multiprocessing.Pool GLOBAL_BUILD_ARGUMENTS = None +GLOBAL_RUN_ARGUMENTS = None @tvm._ffi.register_object("auto_scheduler.MeasureCallback") class MeasureCallback(Object): @@ -237,30 +238,50 @@ def __init__(self, @tvm._ffi.register_object("auto_scheduler.RPCRunner") class RPCRunner(ProgramRunner): - """ + """ RPCRunner that uses RPC call to measures the time cost of programs on remote devices. + Or sometime we may need to use RPC even in local running to insulate the thread environment. + (e.g. running CUDA programs) + Parameters ---------- - key : Str - host : Str - port : Int - priority : Int - n_parallel : Int - timeout : Int - number : Int - repeat : Int - min_repeat_ms : Int - cooldown_interval : Float + key : str + The key of the device registered in the RPC tracker. + host : str + The host address of the RPC Tracker. + port : int + The port of RPC Tracker. + priority : int = 1 + The priority of this run request, larger is more prior. + n_parallel : int = 1 + The number of tasks run in parallel. + timeout : int = 10 + The timeout limit (in second) for each run. + This is used in a wrapper of the multiprocessing.Process.join(). + number : int = 3 + The number of times to run the generated code for taking average. + We call these runs as one `repeat` of measurement. + repeat : int = 1 + The number of times to repeat the measurement. + In total, the generated code will be run (1 + number x repeat) times, + where the first "1" is warm up and will be discarded. + The returned result contains `repeat` costs, + each of which is an average of `number` costs. + min_repeat_ms : int = 0 + The minimum duration of one `repeat` in milliseconds. + By default, one `repeat` contains `number` runs. If this parameter is set, + the parameters `number` will be dynamically adjusted to meet the + minimum duration requirement of one `repeat`. + i.e., When the run time of one `repeat` falls below this time, the `number` parameter + will be automatically increased. + cooldown_interval : float = 0.0 + The cool down interval between two measurements. """ - def __init__(self, key, host, port, priority=1, - n_parallel=1, - timeout=10, - number=3, - repeat=1, - min_repeat_ms=0, - cooldown_interval=0.0): + def __init__(self, key, host, port, + priority=1, n_parallel=1, timeout=10, number=3, repeat=1, + min_repeat_ms=0, cooldown_interval=0.0): self.__init_handle_by_constructor__( - _ffi_api.RPCRunner, key, host, port, priority, timeout, n_parallel, + _ffi_api.RPCRunner, key, host, port, priority, n_parallel, timeout, number, repeat, min_repeat_ms, cooldown_interval) if check_remote(key, host, port, priority, timeout): @@ -278,23 +299,35 @@ class LocalRPCMeasureContext: Parameters ---------- - priority : Int - n_parallel : Int - timeout : Int - number : Int - repeat : Int - min_repeat_ms : Int - cooldown_interval : Float + priority : int = 1 + The priority of this run request, larger is more prior. + n_parallel : int = 1 + The number of tasks run in parallel. + timeout : int = 10 + The timeout limit (in second) for each run. + This is used in a wrapper of the multiprocessing.Process.join(). + number : int = 3 + The number of times to run the generated code for taking average. + We call these runs as one `repeat` of measurement. + repeat : int = 1 + The number of times to repeat the measurement. + In total, the generated code will be run (1 + number x repeat) times, + where the first "1" is warm up and will be discarded. + The returned result contains `repeat` costs, + each of which is an average of `number` costs. + min_repeat_ms : int = 0 + The minimum duration of one `repeat` in milliseconds. + By default, one `repeat` contains `number` runs. If this parameter is set, + the parameters `number` will be dynamically adjusted to meet the + minimum duration requirement of one `repeat`. + i.e., When the run time of one `repeat` falls below this time, the `number` parameter + will be automatically increased. + cooldown_interval : float = 0.0 + The cool down interval between two measurements. """ - def __init__(self, - priority=1, - n_parallel=1, - timeout=10, - number=10, - repeat=1, - min_repeat_ms=0, - cooldown_interval=0.0): + def __init__(self, priority=1, n_parallel=1, timeout=10, number=3, repeat=1, + min_repeat_ms=0, cooldown_interval=0.0): ctx = tvm.context("cuda", 0) if ctx.exist: cuda_arch = "sm_" + "".join(ctx.compute_version.split('.')) @@ -308,10 +341,11 @@ def __init__(self, self.runner = RPCRunner(device_key, host, self.tracker.port, priority, n_parallel, timeout, number, repeat, min_repeat_ms, cooldown_interval) - # wait for the processes to start + # Wait for the processes to start time.sleep(0.5) def __del__(self): + # Close the tracker and server before exit self.tracker.terminate() self.server.terminate() @@ -464,7 +498,8 @@ def local_builder_build(inputs, timeout, n_parallel, build_func='default', verbo @tvm._ffi.register_func("auto_scheduler.local_runner.run") -def local_run(inputs, build_results, timeout, number, repeat, min_repeat_ms, cooldown_interval, +def local_run(inputs, build_results, + timeout=10, number=3, repeat=1, min_repeat_ms=0, cooldown_interval=0, verbose=1): """ Run function of LocalRunner to test the performance of the input BuildResults. @@ -475,7 +510,7 @@ def local_run(inputs, build_results, timeout, number, repeat, min_repeat_ms, coo The MeasureInputs to be measured. build_results : List[BuildResult] The BuildResults to be measured. - timeout : int + timeout : int = 10 The timeout limit (in second) for each run. This is used in a wrapper of the multiprocessing.Process.join(). number : int = 3 @@ -568,17 +603,28 @@ def timed_func(inp, build_res): def rpc_run_worker(index): - """ ... + """ Function to be ran in the RPCRunner thread pool. + + Parameters + ---------- + index : int + The MeasureInput and BuildResult index to be processed by the current Runner thread. + + Returns + ------- + res : MeasureResult + The measure result of this Runner thread. """ + global GLOBAL_RUN_ARGUMENTS inputs, build_results, key, host, port, priority, timeout, number, \ - repeat, min_repeat_ms, cooldown_interval, verbose = global_run_arguments + repeat, min_repeat_ms, cooldown_interval, verbose = GLOBAL_RUN_ARGUMENTS - MAX_FLOAT = 1e10 # We use 1e10 instead of sys.float_info.max for better readability in log + max_float = 1e10 # We use 1e10 instead of sys.float_info.max for better readability in log inp = inputs[index] build_res = build_results[index] if build_res.error_no != MeasureErrorNo.NO_ERROR: - return (MAX_FLOAT,), build_res.error_no, build_res.error_msg, build_res.time_cost, \ + return (max_float,), build_res.error_no, build_res.error_msg, build_res.time_cost, \ time.time() def timed_func(): @@ -593,8 +639,9 @@ def timed_func(): ctx = remote.context(str(inp.task.target), 0) time_f = func.time_evaluator( func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms) + # pylint: disable=broad-except except Exception: - costs = (MAX_FLOAT,) + costs = (max_float,) error_no = MeasureErrorNo.COMPILE_DEVICE error_msg = make_error_msg() @@ -609,8 +656,9 @@ def timed_func(): remote.remove(build_res.filename) remote.remove(os.path.splitext(build_res.filename)[0] + '.so') remote.remove('') + # pylint: disable=broad-except except Exception: - costs = (MAX_FLOAT,) + costs = (max_float,) error_no = MeasureErrorNo.RUNTIME_DEVICE error_msg = make_error_msg() @@ -631,18 +679,64 @@ def timed_func(): if isinstance(res, TimeoutError): if verbose >= 1: print("*T", end="") # Run timeout - res = (MAX_FLOAT,), MeasureErrorNo.RUN_TIMEOUT, None, build_res.time_cost + \ + res = (max_float,), MeasureErrorNo.RUN_TIMEOUT, None, build_res.time_cost + \ timeout, time.time() return res @tvm._ffi.register_func("auto_scheduler.rpc_runner.run") -def rpc_runner_run(inputs, build_results, - key: str, host: str, port: int, priority: int, timeout: float, - n_parallel: int, number: int, repeat: int, min_repeat_ms: int, - cooldown_interval: float, verbose: int): - global global_run_arguments - global_run_arguments = (inputs, build_results, key, host, port, priority, timeout, number, +def rpc_runner_run(inputs, build_results, key, host, port, + priority=1, n_parallel=1, timeout=10, number=3, repeat=1, min_repeat_ms=0, + cooldown_interval=0.0, verbose=1): + """ Run function of RPCRunner to test the performance of the input BuildResults. + + Parameters + ---------- + inputs : List[MeasureInput] + The MeasureInputs to be measured. + build_results : List[BuildResult] + The BuildResults to be measured. + key : str + The key of the device registered in the RPC tracker. + host : str + The host address of the RPC Tracker. + port : int + The port of RPC Tracker. + priority : int = 1 + The priority of this run request, larger is more prior. + n_parallel : int = 1 + The number of tasks run in parallel. + timeout : int = 10 + The timeout limit (in second) for each run. + This is used in a wrapper of the multiprocessing.Process.join(). + number : int = 3 + The number of times to run the generated code for taking average. + We call these runs as one `repeat` of measurement. + repeat : int = 1 + The number of times to repeat the measurement. + In total, the generated code will be run (1 + number x repeat) times, + where the first "1" is warm up and will be discarded. + The returned result contains `repeat` costs, + each of which is an average of `number` costs. + min_repeat_ms : int = 0 + The minimum duration of one `repeat` in milliseconds. + By default, one `repeat` contains `number` runs. If this parameter is set, + the parameters `number` will be dynamically adjusted to meet the + minimum duration requirement of one `repeat`. + i.e., When the run time of one `repeat` falls below this time, the `number` parameter + will be automatically increased. + cooldown_interval : float = 0.0 + The cool down interval between two measurements. + verbose: int = 1 + Verbosity level. 0 for silent, 1 to output information during program measuring. + + Returns + ------- + res : List[MeasureResult] + The measure results of these MeasureInputs. + """ + global GLOBAL_RUN_ARGUMENTS + GLOBAL_RUN_ARGUMENTS = (inputs, build_results, key, host, port, priority, timeout, number, repeat, min_repeat_ms, cooldown_interval, verbose) assert len(inputs) == len(build_results), \ diff --git a/python/tvm/auto_scheduler/utils.py b/python/tvm/auto_scheduler/utils.py index 68174aaca65a..8177c9a4b8b1 100644 --- a/python/tvm/auto_scheduler/utils.py +++ b/python/tvm/auto_scheduler/utils.py @@ -255,7 +255,7 @@ def check_remote(device_key, host=None, port=None, priority=100, timeout=10): """ def _check(): - remote = request_remote(device_key, host, port, priority) + request_remote(device_key, host, port, priority) t = threading.Thread(target=_check, ) t.start() diff --git a/src/auto_scheduler/measure.cc b/src/auto_scheduler/measure.cc index 9f14cbcff969..6198f60da5a6 100644 --- a/src/auto_scheduler/measure.cc +++ b/src/auto_scheduler/measure.cc @@ -148,9 +148,9 @@ Array LocalRunnerNode::Run(const Array& inputs, } /********** RPCRunner **********/ -RPCRunner::RPCRunner(const std::string& key, const std::string& host, int port, - int priority, int timeout, int n_parallel, int number, - int repeat, int min_repeat_ms, double cooldown_interval) { +RPCRunner::RPCRunner(const String& key, const String& host, int port, int priority, int n_parallel, + int timeout, int number, int repeat, int min_repeat_ms, + double cooldown_interval) { auto node = make_object(); node->key = key; node->host = host; @@ -166,15 +166,16 @@ RPCRunner::RPCRunner(const std::string& key, const std::string& host, int port, } Array RPCRunnerNode::Run(const Array& inputs, - const Array& build_results, - int verbose) { + const Array& build_results, int verbose) { if (const auto* f = runtime::Registry::Get("auto_scheduler.rpc_runner.run")) { - Array results = (*f)( - inputs, build_results, key, host, port, priority, timeout, n_parallel, - number, repeat, min_repeat_ms, cooldown_interval, verbose); + Array results = + (*f)(inputs, build_results, key, host, port, priority, n_parallel, timeout, number, repeat, + min_repeat_ms, cooldown_interval, verbose); return results; } else { - LOG(FATAL) << "auto_scheduler.rpc_runner.run is not registered"; + LOG(FATAL) << "auto_scheduler.rpc_runner.run is not registered. " + << "This is a function registered in Python, " + << "make sure the TVM Python runtime has been loaded successfully."; } return Array(); } @@ -361,11 +362,11 @@ TVM_REGISTER_GLOBAL("auto_scheduler.LocalRunner") }); TVM_REGISTER_GLOBAL("auto_scheduler.RPCRunner") - .set_body_typed([](const std::string& key, const std::string& host, int port, - int priority, int timeout, int n_parallel, int number, - int repeat, int min_repeat_ms, double cooldown_interval){ - return RPCRunner(key, host, port, priority, timeout, n_parallel, number, - repeat, min_repeat_ms, cooldown_interval); + .set_body_typed([](const String& key, const String& host, int port, int priority, + int n_parallel, int timeout, int number, int repeat, int min_repeat_ms, + double cooldown_interval) { + return RPCRunner(key, host, port, priority, n_parallel, timeout, number, repeat, + min_repeat_ms, cooldown_interval); }); } // namespace auto_scheduler diff --git a/src/auto_scheduler/measure.h b/src/auto_scheduler/measure.h index 2e7d30ceb614..02d6e879a1cd 100644 --- a/src/auto_scheduler/measure.h +++ b/src/auto_scheduler/measure.h @@ -266,6 +266,14 @@ class ProgramRunnerNode : public Object { public: /*! \brief Timeout of a run. */ int timeout; + /*! \brief The number of times to run the generated code for taking average. */ + int number; + /*! \brief The number of times to repeat the measurement. */ + int repeat; + /*! \brief The minimum duration of one repeat in milliseconds. */ + int min_repeat_ms; + /*! \brief The cool down interval between two measurements. */ + double cooldown_interval; /*! * \brief Run measurement and return results. @@ -326,15 +334,6 @@ class LocalBuilder : public ProgramBuilder { /*! \brief LocalRunner that uses local CPU/GPU to measures the time cost of programs */ class LocalRunnerNode : public ProgramRunnerNode { public: - /*! \brief Number of measure times. */ - int number; - /*! \brief Number of repeat times in each measure. */ - int repeat; - /*! \brief The minimum duration of one repeat in milliseconds. */ - int min_repeat_ms; - /*! \brief The cool down interval between two measurements. */ - double cooldown_interval; - Array Run(const Array& inputs, const Array& build_results, int verbose) final; @@ -353,8 +352,8 @@ class LocalRunner : public ProgramRunner { * for more detailed parameter explaination. * \param timeout The timeout limit (in second) for each run. * This is used in a wrapper of the multiprocessing.Process.join(). - * \param number Number of measure times. - * \param repeat Number of repeat times in each measure. + * \param number The number of times to run the generated code for taking average. + * \param repeat The number of times to repeat the measurement. * \param min_repeat_ms The minimum duration of one repeat in milliseconds. * \param cooldown_interval The cool down interval between two measurements. */ @@ -364,25 +363,26 @@ class LocalRunner : public ProgramRunner { }; /*! - * \brief RPCRunner that uses RPC call to measures the time cost of programs - * on remote devices. + * \brief RPCRunner that uses RPC call to measures the time cost of programs on remote devices. + * Or sometime we may need to use RPC even in local running to insulate the thread environment. + * (e.g. running CUDA programs) */ class RPCRunnerNode : public ProgramRunnerNode { public: - std::string key; - std::string host; + /*! \brief The key of the device registered in the RPC tracker. */ + String key; + /*! \brief The host address of the RPC Tracker. */ + String host; + /*! \brief The port of RPC Tracker. */ int port; + /*! \brief The priority of this run request, larger is more prior. */ int priority; + /*! \brief The number of tasks run in parallel. */ int n_parallel; - int number; - int repeat; - int min_repeat_ms; - double cooldown_interval; + /*! \brief The number of times to run the generated code for taking average. */ - /*! \biref Run measurement and return results */ Array Run(const Array& inputs, - const Array& build_results, - int verbose) final; + const Array& build_results, int verbose) final; static constexpr const char* _type_key = "auto_scheduler.RPCRunner"; TVM_DECLARE_FINAL_OBJECT_INFO(RPCRunnerNode, ProgramRunnerNode); @@ -394,9 +394,21 @@ class RPCRunnerNode : public ProgramRunnerNode { */ class RPCRunner : public ProgramRunner { public: - RPCRunner(const std::string& key, const std::string& host, int port, - int priority, int timeout, int n_parallel, int number, - int repeat, int min_repeat_ms, double cooldown_interval); + /*! + * \brief The constructor. + * \param key The key of the device registered in the RPC tracker. + * \param host The host address of the RPC Tracker. + * \param prot The port of RPC Tracker. + * \param priority The priority of this run request, larger is more prior. + * \param n_parallel The number of tasks run in parallel. + * \param timeout Timeout of a run. + * \param number The number of times to run the generated code for taking average. + * \param repeat The number of times to repeat the measurement. + * \param min_repeat_ms The minimum duration of one repeat in milliseconds. + * \param cooldown_interval The cool down interval between two measurements. + */ + RPCRunner(const String& key, const String& host, int port, int priority, int n_parallel, + int timeout, int number, int repeat, int min_repeat_ms, double cooldown_interval); TVM_DEFINE_MUTABLE_OBJECT_REF_METHODS(RPCRunner, ProgramRunner, RPCRunnerNode); }; From c2d72f9d6f5459f0b377f88a70d629ff8692638a Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Thu, 16 Jul 2020 20:29:32 +0800 Subject: [PATCH 3/7] Update --- python/tvm/auto_scheduler/utils.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/python/tvm/auto_scheduler/utils.py b/python/tvm/auto_scheduler/utils.py index 8177c9a4b8b1..f34db44fc4b0 100644 --- a/python/tvm/auto_scheduler/utils.py +++ b/python/tvm/auto_scheduler/utils.py @@ -23,6 +23,7 @@ import queue import signal import threading +import os try: import psutil @@ -202,22 +203,22 @@ def request_remote(device_key, host=None, port=None, priority=1, timeout=60): Parameters ---------- - device_key: string + device_key : str The device key of registered device in tracker - host: host, optional + host : Optional[str] The host address of rpc tracker. If is none, will use environment variable "TVM_TRACKER_HOST" - port: int, optional + port : Optional[int] The port of rpc tracker. If is none, will use environment variable "TVM_TRACKER_PORT" - priority: int, optional + priority : int = 1 The priority of this request, larger is more prior - timeout: float, optional + timeout : int = 60 The timeout of this session (units: second) Returns - ------ - session: RPCSession + ------- + session : RPCSession """ # connect to the tracker host = host or os.environ['TVM_TRACKER_HOST'] @@ -235,17 +236,17 @@ def check_remote(device_key, host=None, port=None, priority=100, timeout=10): Parameters ---------- - device_key: string + device_key: str device key of registered device in tracker - host: host, optional + host: Optional[str] The host address of rpc tracker. If is none, will use environment variable "TVM_TRACKER_HOST" - port: int, optional + port: Optional[int] The port address of rpc tracker. If is none, will use environment variable "TVM_TRACKER_PORT" - priority: int, optional + priority: int = 100 The priority of this request, larger is more prior - timeout: float, optional + timeout: int = 10 The timeout of this check (units: seconds). Returns From 168c403fdc4c1e5a962a3f99bb7c052bba06817a Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 17 Jul 2020 10:34:25 +0800 Subject: [PATCH 4/7] Add clflush & non-empty ndarray TODO hints --- python/tvm/auto_scheduler/measure.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/tvm/auto_scheduler/measure.py b/python/tvm/auto_scheduler/measure.py index 985ac1d7bb64..3c4aeea5af67 100644 --- a/python/tvm/auto_scheduler/measure.py +++ b/python/tvm/auto_scheduler/measure.py @@ -200,6 +200,8 @@ def __init__(self, class LocalRunner(ProgramRunner): """ LocalRunner that uses local CPU/GPU to measures the time cost of programs. + TODO(FrozenGene): Add cpu cache flush to this runner + Parameters ---------- timeout : int = 10 @@ -242,6 +244,8 @@ class RPCRunner(ProgramRunner): Or sometime we may need to use RPC even in local running to insulate the thread environment. (e.g. running CUDA programs) + TODO(FrozenGene): Add cpu cache flush to this runner. + Parameters ---------- key : str @@ -297,6 +301,8 @@ class LocalRPCMeasureContext: """ A context wrapper for running RPCRunner locally. This will launch a local RPC Tracker and local RPC Server. + TODO(FrozenGene): Add cpu cache flush to this RPC context. + Parameters ---------- priority : int = 1 @@ -548,6 +554,7 @@ def timed_func(inp, build_res): try: func = module.load_module(build_res.filename) ctx = ndarray.context(str(inp.task.target), 0) + # TODO(FrozenGene): Add cpu cache flush to this function. time_f = func.time_evaluator( func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms) # pylint: disable=broad-except @@ -558,6 +565,7 @@ def timed_func(inp, build_res): if error_no == 0: try: + # TODO(FrozenGene): Update to ndarray.non-empty args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in build_res.args] ctx.sync() @@ -637,6 +645,7 @@ def timed_func(): remote.upload(build_res.filename) func = remote.load_module(os.path.split(build_res.filename)[1]) ctx = remote.context(str(inp.task.target), 0) + # TODO(FrozenGene): Add cpu cache flush to this function. time_f = func.time_evaluator( func.entry_name, ctx, number=number, repeat=repeat, min_repeat_ms=min_repeat_ms) # pylint: disable=broad-except @@ -647,6 +656,7 @@ def timed_func(): if error_no == 0: try: + # TODO(FrozenGene): Update to ndarray.non-empty args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in build_res.args] ctx.sync() From 7f08fbb8aae3d7dada412837549161c3061ef8f3 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Fri, 17 Jul 2020 17:44:30 +0800 Subject: [PATCH 5/7] Update --- python/tvm/auto_scheduler/measure.py | 10 +++++----- python/tvm/auto_scheduler/utils.py | 29 ++++++++++++++-------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/python/tvm/auto_scheduler/measure.py b/python/tvm/auto_scheduler/measure.py index 3c4aeea5af67..03ad23ef6a62 100644 --- a/python/tvm/auto_scheduler/measure.py +++ b/python/tvm/auto_scheduler/measure.py @@ -200,7 +200,7 @@ def __init__(self, class LocalRunner(ProgramRunner): """ LocalRunner that uses local CPU/GPU to measures the time cost of programs. - TODO(FrozenGene): Add cpu cache flush to this runner + TODO(FrozenGene): Add cpu cache flush to this runner. Parameters ---------- @@ -237,7 +237,6 @@ def __init__(self, _ffi_api.LocalRunner, timeout, number, repeat, min_repeat_ms, cooldown_interval) - @tvm._ffi.register_object("auto_scheduler.RPCRunner") class RPCRunner(ProgramRunner): """ RPCRunner that uses RPC call to measures the time cost of programs on remote devices. @@ -433,7 +432,8 @@ def timed_func(): dirname, "tmp_func." + build_func.output_format) try: - with transform.PassContext(): # todo(lmzheng): port the unroll pass + # TODO(merrymercy): Port the unroll pass. + with transform.PassContext(): func = build_module.build( sch, args, target=task.target, target_host=task.target_host) func.export_library(filename, build_func) @@ -565,7 +565,7 @@ def timed_func(inp, build_res): if error_no == 0: try: - # TODO(FrozenGene): Update to ndarray.non-empty + # TODO(FrozenGene): Update to ndarray.non-empty. args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in build_res.args] ctx.sync() @@ -656,7 +656,7 @@ def timed_func(): if error_no == 0: try: - # TODO(FrozenGene): Update to ndarray.non-empty + # TODO(FrozenGene): Update to ndarray.non-empty. args = [ndarray.empty(get_const_tuple(x.shape), x.dtype, ctx) for x in build_res.args] ctx.sync() diff --git a/python/tvm/auto_scheduler/utils.py b/python/tvm/auto_scheduler/utils.py index f34db44fc4b0..f5b53fb2a446 100644 --- a/python/tvm/auto_scheduler/utils.py +++ b/python/tvm/auto_scheduler/utils.py @@ -199,26 +199,27 @@ def func_wrapper(que): def request_remote(device_key, host=None, port=None, priority=1, timeout=60): - """Request a remote session + """ Request a remote session. Parameters ---------- device_key : str - The device key of registered device in tracker + The device key of registered device in tracker. host : Optional[str] The host address of rpc tracker. - If is none, will use environment variable "TVM_TRACKER_HOST" + If is none, will use environment variable "TVM_TRACKER_HOST". port : Optional[int] The port of rpc tracker. - If is none, will use environment variable "TVM_TRACKER_PORT" + If is none, will use environment variable "TVM_TRACKER_PORT". priority : int = 1 - The priority of this request, larger is more prior + The priority of this request, larger is more prior. timeout : int = 60 - The timeout of this session (units: second) + The timeout of this session in second. Returns ------- - session : RPCSession + remote : RPCSession + The connected remote RPCSession. """ # connect to the tracker host = host or os.environ['TVM_TRACKER_HOST'] @@ -232,27 +233,27 @@ def request_remote(device_key, host=None, port=None, priority=1, timeout=60): def check_remote(device_key, host=None, port=None, priority=100, timeout=10): """ - Check the availability of a remote device + Check the availability of a remote device. Parameters ---------- device_key: str - device key of registered device in tracker + device key of registered device in tracker. host: Optional[str] The host address of rpc tracker. - If is none, will use environment variable "TVM_TRACKER_HOST" + If is none, will use environment variable "TVM_TRACKER_HOST". port: Optional[int] The port address of rpc tracker. - If is none, will use environment variable "TVM_TRACKER_PORT" + If is none, will use environment variable "TVM_TRACKER_PORT". priority: int = 100 - The priority of this request, larger is more prior + The priority of this request, larger is more prior. timeout: int = 10 - The timeout of this check (units: seconds). + The timeout of this check in seconds. Returns ------- available: bool - True if can find available device + True if can find available device. """ def _check(): From 00e1f69ad0323847a5d97781f3a279efd8b1e7a4 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Sat, 18 Jul 2020 16:32:00 +0800 Subject: [PATCH 6/7] UT Update --- tests/python/unittest/test_auto_scheduler_measure.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/python/unittest/test_auto_scheduler_measure.py b/tests/python/unittest/test_auto_scheduler_measure.py index 4309a9adfb77..8232d3e6d017 100644 --- a/tests/python/unittest/test_auto_scheduler_measure.py +++ b/tests/python/unittest/test_auto_scheduler_measure.py @@ -25,10 +25,10 @@ def test_record(): - dag, s = get_tiled_matmul() - if not tvm.runtime.enabled("llvm"): return + + dag, s = get_tiled_matmul() target = tvm.target.create("llvm") task = auto_scheduler.SearchTask(dag, "test", target) @@ -50,10 +50,10 @@ def test_record(): def test_measure_local_builder_runner(): - dag, s0 = get_tiled_matmul() - if not tvm.runtime.enabled("llvm"): return + + dag, s0 = get_tiled_matmul() tgt = tvm.target.create("llvm") task = auto_scheduler.SearchTask(dag, "test", tgt) @@ -68,10 +68,10 @@ def test_measure_local_builder_runner(): def test_measure_local_builder_rpc_runner(): - dag, s0 = get_tiled_matmul() - if not tvm.runtime.enabled("llvm"): return + + dag, s0 = get_tiled_matmul() tgt = tvm.target.create("llvm") task = auto_scheduler.SearchTask(dag, "test", tgt) From b6b25e0dfd485ef6631acabe24353a9f2e3bd832 Mon Sep 17 00:00:00 2001 From: "chengfan.jcf" Date: Mon, 20 Jul 2020 09:41:16 +0800 Subject: [PATCH 7/7] Update timeout in UT --- tests/python/unittest/test_auto_scheduler_measure.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/python/unittest/test_auto_scheduler_measure.py b/tests/python/unittest/test_auto_scheduler_measure.py index 8232d3e6d017..d6e6c51a28ba 100644 --- a/tests/python/unittest/test_auto_scheduler_measure.py +++ b/tests/python/unittest/test_auto_scheduler_measure.py @@ -59,7 +59,7 @@ def test_measure_local_builder_runner(): minp = auto_scheduler.MeasureInput(task, s0) local_builder = auto_scheduler.LocalBuilder() - local_runner = auto_scheduler.LocalRunner() + local_runner = auto_scheduler.LocalRunner(timeout=60) bress = local_builder.build([minp]) assert bress[0].error_no == 0 @@ -77,7 +77,7 @@ def test_measure_local_builder_rpc_runner(): minp = auto_scheduler.MeasureInput(task, s0) local_builder = auto_scheduler.LocalBuilder() - measure_ctx = auto_scheduler.LocalRPCMeasureContext() + measure_ctx = auto_scheduler.LocalRPCMeasureContext(timeout=60) rpc_runner = measure_ctx.runner bress = local_builder.build([minp])