From be8cd977b02e6685ed87c2571fb8fb691d2abb16 Mon Sep 17 00:00:00 2001 From: Yuanjing Shi Date: Thu, 23 Sep 2021 16:02:40 -0700 Subject: [PATCH 1/8] [Meta Schedule][M3a]Local runner (#479) * localrunner * localrunner init * linting * address comments * exception handling * single run testcase * two more cases added * add exception case * one case with AddModule added * address comments * address comments * remove unused dependency * optional arguments --- python/tvm/meta_schedule/runner/__init__.py | 1 + .../tvm/meta_schedule/runner/local_runner.py | 353 ++++++++++++++++++ .../unittest/test_meta_schedule_runner.py | 303 +++++++++++++++ 3 files changed, 657 insertions(+) create mode 100644 python/tvm/meta_schedule/runner/local_runner.py diff --git a/python/tvm/meta_schedule/runner/__init__.py b/python/tvm/meta_schedule/runner/__init__.py index 47f4557e1d3a..8fc9acaf79cf 100644 --- a/python/tvm/meta_schedule/runner/__init__.py +++ b/python/tvm/meta_schedule/runner/__init__.py @@ -20,4 +20,5 @@ """ from .config import EvaluatorConfig, RPCConfig from .rpc_runner import RPCRunner +from .local_runner import LocalRunner, LocalRunnerFuture from .runner import PyRunner, Runner, RunnerFuture, RunnerInput, RunnerResult diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py new file mode 100644 index 000000000000..bdf379991585 --- /dev/null +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -0,0 +1,353 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Local Runner""" +import itertools +from contextlib import contextmanager +from typing import Any, Callable, Dict, List, Optional, Union +import tvm + +from ...contrib.popen_pool import PopenPoolExecutor +from ...runtime import Device, Module, ndarray +from ..arg_info import ArgInfo +from ..utils import get_global_func_with_default_on_worker +from .config import EvaluatorConfig +from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult +from .rpc_runner import T_ARG_INFO_JSON_OBJ, T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT, T_ARGUMENT_LIST + + +class LocalRunnerFuture(RunnerFuture): + res: Optional[List[float]] + error_message: Optional[str] + + def __init__( + self, result: Optional[List[float]] = None, error_message: Optional[str] = None + ) -> None: + """Constructor + + Parameters + ---------- + res: Optional[List[float]] + The result of this LocalRunnerFuture + error_message: Optional[str] + The stringfied error message of any exception during execution + + """ + super().__init__() + self.res = result + self.error_message = error_message + + def done(self) -> bool: + return True + + def result(self) -> RunnerResult: + return RunnerResult(self.res, self.error_message) + + +class LocalRunner(PyRunner): + """Local runner + + Parameters + ---------- + evaluator_config: EvaluatorConfig + The evaluator configuration. + cooldown_sec: float + The cooldown in seconds. + alloc_repeat: int + The number of times to repeat the allocation. + f_alloc_argument: Optional[str, Callable] + The function name to allocate the arguments or the function itself. + f_run_evaluator: Optional[str, Callable] + The function name to run the evaluator or the function itself. + f_cleanup: Optional[str, Callable] + The function name to cleanup the session or the function itself. + pool: PopenPoolExecutor + The popen pool executor. + + Attributes + ---------- + T_ALLOC_ARGUMENT : typing._GenericAlias + The signature of the function `f_alloc_argument`, which is: + + .. code-block:: python + + def default_alloc_argument( + device: Device, + args_info: T_ARG_INFO_JSON_OBJ_LIST, + alloc_repeat: int, + ) -> List[T_ARGUMENT_LIST]: + ... + + T_RUN_EVALUATOR : typing._GenericAlias + The signature of the function `f_run_evaluator`, which is: + + .. code-block:: python + + def default_run_evaluator( + rt_mod: Module, + device: Device, + evaluator_config: EvaluatorConfig, + repeated_args: List[T_ARGUMENT_LIST], + ) -> List[float]: + ... + + T_CLEANUP : typing._GenericAlias + The signature of the function `f_cleanup`, which is: + + .. code-block:: python + + def default_cleanup() -> None: + ... 
+ """ + + T_ALLOC_ARGUMENT = Callable[ + [ + Device, # The device on the remote + T_ARG_INFO_JSON_OBJ_LIST, # The metadata information of the arguments to be allocated + int, # The number of repeated allocations to be done + ], + List[T_ARGUMENT_LIST], # A list of argument lists + ] + T_RUN_EVALUATOR = Callable[ + [ + Module, # The Module opened on the remote + Device, # The device on the remote + EvaluatorConfig, # The evaluator configuration + List[T_ARGUMENT_LIST], # A list of argument lists + ], + List[float], # A list of running time + ] + T_CLEANUP = Callable[ + [], + None, + ] + + timeout_sec: float + evaluator_config: EvaluatorConfig + cooldown_sec: float + alloc_repeat: int + + f_alloc_argument: Union[T_ALLOC_ARGUMENT, str, None] + f_run_evaluator: Union[T_RUN_EVALUATOR, str, None] + f_cleanup: Union[T_CLEANUP, str, None] + + pool: PopenPoolExecutor + + def __init__( + self, + timeout_sec: float, + evaluator_config: Optional[EvaluatorConfig] = None, + cooldown_sec: float = 0.0, + alloc_repeat: int = 1, + f_alloc_argument: Optional[str] = None, + f_run_evaluator: Optional[str] = None, + f_cleanup: Optional[str] = None, + initializer: Optional[Callable[[], None]] = None, + ) -> None: + super().__init__() + self.timeout_sec = timeout_sec + self.evaluator_config = EvaluatorConfig._normalized(evaluator_config) + self.cooldown_sec = cooldown_sec + self.alloc_repeat = alloc_repeat + self.f_alloc_argument = f_alloc_argument + self.f_run_evaluator = f_run_evaluator + self.f_cleanup = f_cleanup + + self.pool = PopenPoolExecutor( + max_workers=1, # one local worker + timeout=timeout_sec, + initializer=initializer, + ) + + def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]: + results: List[RunnerFuture] = [] + for runner_input in runner_inputs: + future = self.pool.submit( + LocalRunner._worker_func, + self.f_alloc_argument, + self.f_run_evaluator, + self.f_cleanup, + self.evaluator_config, + self.alloc_repeat, + str(runner_input.artifact_path), + str(runner_input.device_type), + tuple(arg_info.as_json() for arg_info in runner_input.args_info), + ) + try: + result: List[float] = future.result() + error_message: str = None + except TimeoutError as exception: + result: List[float] = None + error_message: str = ( + f"LocalRunner: Timeout, killed after {self.timeout_sec} seconds\n" + ) + except Exception as exception: # pylint: disable=broad-except + result: List[float] = None + error_message: str = "LocalRunner: An exception occurred\n" + str(exception) + local_future = LocalRunnerFuture(result=result, error_message=error_message) + results.append(local_future) + return results + + @staticmethod + def _worker_func( + _f_alloc_argument: Optional[str], + _f_run_evaluator: Optional[str], + _f_cleanup: Optional[str], + evaluator_config: EvaluatorConfig, + alloc_repeat: int, + artifact_path: str, + device_type: str, + args_info: T_ARG_INFO_JSON_OBJ_LIST, + ) -> List[float]: + f_alloc_argument: LocalRunner.T_ALLOC_ARGUMENT = get_global_func_with_default_on_worker( + _f_alloc_argument, default_alloc_argument + ) + f_run_evaluator: LocalRunner.T_RUN_EVALUATOR = get_global_func_with_default_on_worker( + _f_run_evaluator, default_run_evaluator + ) + f_cleanup: LocalRunner.T_CLEANUP = get_global_func_with_default_on_worker( + _f_cleanup, default_cleanup + ) + + @contextmanager + def resource_handler(): + try: + yield + finally: + # Step 5. 
Clean up + f_cleanup() + + with resource_handler(): + # Step 1: create the local runtime module + rt_mod = tvm.runtime.load_module(artifact_path) + # Step 2: create the local device + device = tvm.runtime.device(dev_type=device_type, dev_id=0) + # Step 3: Allocate input arguments + repeated_args: List[T_ARGUMENT_LIST] = f_alloc_argument( + device, + args_info, + alloc_repeat, + ) + # Step 4: Run time_evaluator + costs: List[float] = f_run_evaluator( + rt_mod, + device, + evaluator_config, + repeated_args, + ) + return costs + + +def default_alloc_argument( + device: Device, + args_info: T_ARG_INFO_JSON_OBJ_LIST, + alloc_repeat: int, +) -> List[T_ARGUMENT_LIST]: + """Default function to allocate the arguments + + Parameters + ---------- + device: Device + The device to allocate the arguments + args_info: T_ARG_INFO_JSON_OBJ_LIST + The arguments info + alloc_repeat: int + The number of times to repeat the allocation + + Returns + ------- + repeated_args: List[T_ARGUMENT_LIST] + The allocation args + """ + try: + f_random_fill = tvm.get_global_func("tvm.contrib.random.random_fill", True) + except AttributeError as error: + raise AttributeError( + 'Unable to find function "tvm.contrib.random.random_fill" on local runner. ' + "Please make sure USE_RANDOM is turned ON in the config.cmake." + ) from error + + def alloc_tensor(_, dtype, shape) -> ndarray.NDArray: + arg = ndarray.empty(shape=shape, dtype=dtype, device=device) + f_random_fill(arg) + return arg + + def alloc_fail(*arg_info) -> None: + raise NotImplementedError(arg_info) + + dispatcher: Dict[Any, Callable] = { + "TENSOR": alloc_tensor, + None: alloc_fail, + } + + repeated_args: List[T_ARGUMENT_LIST] = [] + for _ in range(alloc_repeat): + args: T_ARGUMENT_LIST = [] + arg_info: T_ARG_INFO_JSON_OBJ + for arg_info in args_info: + arg_type = arg_info[0] + arg: Any = dispatcher.get(arg_type, None)(*arg_info) + args.append(arg) + repeated_args.append(args) + return repeated_args + + +def default_run_evaluator( + rt_mod: Module, + device: Device, + evaluator_config: EvaluatorConfig, + repeated_args: List[T_ARGUMENT_LIST], +) -> List[float]: + """Default function to run the evaluator + + Parameters + ---------- + rt_mod: Module + The runtime module + device: Device + The device to run the evaluator + evaluator_config: EvaluatorConfig + The evaluator config + repeated_args: List[T_ARGUMENT_LIST] + The repeated arguments + + Returns + ------- + costs: List[float] + The evaluator results + """ + evaluator = rt_mod.time_evaluator( + func_name=rt_mod.entry_name, + dev=device, + number=evaluator_config.number, + repeat=evaluator_config.repeat, + min_repeat_ms=evaluator_config.min_repeat_ms, + f_preproc="cache_flush_cpu_non_first_arg" + if evaluator_config.enable_cpu_cache_flush + else "", + ) + repeated_costs: List[List[float]] = [] + for args in repeated_args: + device.sync() + profile_result = evaluator(*args) + repeated_costs.append(profile_result.results) + costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)] + return costs + + +def default_cleanup() -> None: + """Default function to clean up the session""" + pass diff --git a/tests/python/unittest/test_meta_schedule_runner.py b/tests/python/unittest/test_meta_schedule_runner.py index 3c8aee0c6d58..df6c03b07fa1 100644 --- a/tests/python/unittest/test_meta_schedule_runner.py +++ b/tests/python/unittest/test_meta_schedule_runner.py @@ -31,6 +31,7 @@ from tvm.meta_schedule.builder import BuilderInput, LocalBuilder from tvm.meta_schedule.runner import ( EvaluatorConfig, + 
LocalRunner, PyRunner, RPCConfig, RPCRunner, @@ -39,6 +40,11 @@ ) from tvm.meta_schedule.runner.rpc_runner import ( default_alloc_argument as rpc_default_alloc_argument, + T_ARG_INFO_JSON_OBJ_LIST, + T_ARGUMENT_LIST, +) +from tvm.meta_schedule.runner.local_runner import ( + default_alloc_argument as local_default_alloc_argument, ) from tvm.meta_schedule.testing import LocalRPC from tvm.meta_schedule.utils import get_global_func_with_default_on_worker @@ -165,6 +171,44 @@ def test_meta_schedule_rpc_single_run(): _clean_build(builder_result.artifact_path) +def test_meta_schedule_local_single_run(): + """Test meta schedule local runner for a single run""" + # Build the module + mod = MatmulModule() + builder = LocalBuilder() + (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))]) + assert builder_result.artifact_path is not None + assert builder_result.error_msg is None + + runner_input = RunnerInput( + builder_result.artifact_path, + "llvm", + [ + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + ], + ) + + evaluator_config = EvaluatorConfig( + number=1, + repeat=1, + min_repeat_ms=0, + enable_cpu_cache_flush=False, + ) + runner = LocalRunner(timeout_sec=100, evaluator_config=evaluator_config) + # Run the module + (runner_future,) = runner.run([runner_input]) + runner_result = runner_future.result() + assert runner_result.error_msg is None + for result in runner_result.run_secs: + if isinstance(result, FloatImm): + result = result.value + assert isinstance(result, float) + assert result >= 0.0 + _clean_build(builder_result.artifact_path) + + def test_meta_schedule_rpc_multiple_runs(): """Test meta schedule rpc runner for multiple runs""" # Build the module @@ -234,6 +278,69 @@ def test_meta_schedule_rpc_multiple_runs(): _clean_build(builder_result.artifact_path) +def test_meta_schedule_local_multiple_runs(): + """Test meta schedule local runner for multiple runs""" + # Build the module + mods = [ + MatmulModule(), + MatmulReluModule(), + BatchMatmulModule(), + ] + builder = LocalBuilder() + builder_inputs = [BuilderInput(mod, Target("llvm")) for mod in mods] + builder_results = builder.build(builder_inputs) + for builder_result in builder_results: + assert builder_result.artifact_path is not None + assert builder_result.error_msg is None + + args_infos = [ + [ + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + ], + [ + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + ], + [ + TensorInfo("float32", [16, MATMUL_M, MATMUL_M]), + TensorInfo("float32", [16, MATMUL_M, MATMUL_M]), + TensorInfo("float32", [16, MATMUL_M, MATMUL_M]), + ], + ] + + runner_inputs = [ + RunnerInput(builder_results[i].artifact_path, "llvm", args_infos[i]) + for i in range(len(mods)) + ] + + evaluator_config = EvaluatorConfig( + number=1, + repeat=1, + min_repeat_ms=0, + enable_cpu_cache_flush=False, + ) + + runner = LocalRunner(timeout_sec=100, evaluator_config=evaluator_config) + + # Run the module + runner_futures = runner.run(runner_inputs) + runner_results = [runner_future.result() for runner_future in runner_futures] + + for runner_result in runner_results: + assert runner_result.error_msg is None + for result in runner_result.run_secs: + if isinstance(result, FloatImm): + result = result.value + assert isinstance(result, float) + 
assert result >= 0.0 + + for builder_result in builder_results: + _clean_build(builder_result.artifact_path) + + def test_meta_schedule_py_runner(): """Test meta schedule PyRunner""" @@ -296,6 +403,57 @@ def timeout_session_creator( # pylint: disable=unused-variable assert runner_result.run_secs is None +def test_meta_schedule_local_runner_time_out(): + """Test meta schedule Local Runner time out""" + mod = MatmulModule() + builder = LocalBuilder() + (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))]) + assert builder_result.artifact_path is not None + assert builder_result.error_msg is None + + runner_input = RunnerInput( + builder_result.artifact_path, + "llvm", + [ + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + ], + ) + + def initializer(): + @register_func("meta_schedule.runner.test_time_out") + def timeout_session_creator( # pylint: disable=unused-variable + device: Device, # pylint: disable=unused-argument + args_info: T_ARG_INFO_JSON_OBJ_LIST, # pylint: disable=unused-argument + alloc_repeat: int, # pylint: disable=unused-argument + ) -> RPCSession: + time.sleep(2) + + evaluator_config = EvaluatorConfig( + number=1, + repeat=1, + min_repeat_ms=0, + enable_cpu_cache_flush=False, + ) + + runner = LocalRunner( + timeout_sec=1, + evaluator_config=evaluator_config, + initializer=initializer, + f_alloc_argument="meta_schedule.runner.test_time_out", + ) + + # Run the module + (runner_future,) = runner.run([runner_input]) + runner_result = runner_future.result() + + assert runner_result.error_msg is not None and runner_result.error_msg.startswith( + "LocalRunner: Timeout, killed after" + ) + assert runner_result.run_secs is None + + def test_meta_schedule_rpc_runner_exception(): """Test meta schedule RPC Runner exception""" @@ -345,6 +503,57 @@ def exception_session_creator( # pylint: disable=unused-variable assert runner_result.run_secs is None +def test_meta_schedule_local_runner_exception(): + """Test meta schedule Local Runner time out""" + mod = MatmulModule() + builder = LocalBuilder() + (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))]) + assert builder_result.artifact_path is not None + assert builder_result.error_msg is None + + runner_input = RunnerInput( + builder_result.artifact_path, + "llvm", + [ + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + TensorInfo("float32", (MATMUL_N, MATMUL_N)), + ], + ) + + def initializer(): + @register_func("meta_schedule.runner.test_exception") + def timeout_session_creator( # pylint: disable=unused-variable + device: Device, # pylint: disable=unused-argument + args_info: T_ARG_INFO_JSON_OBJ_LIST, # pylint: disable=unused-argument + alloc_repeat: int, # pylint: disable=unused-argument + ) -> RPCSession: + raise Exception("Test") + + evaluator_config = EvaluatorConfig( + number=1, + repeat=1, + min_repeat_ms=0, + enable_cpu_cache_flush=False, + ) + + runner = LocalRunner( + timeout_sec=1, + evaluator_config=evaluator_config, + initializer=initializer, + f_alloc_argument="meta_schedule.runner.test_exception", + ) + + # Run the module + (runner_future,) = runner.run([runner_input]) + runner_result = runner_future.result() + + assert runner_result.error_msg is not None and runner_result.error_msg.startswith( + "LocalRunner: An exception occurred\n" + ) + assert runner_result.run_secs is None + + def test_meta_schedule_runner_matmul_test(): """Test meta schedule runner 
with add module""" @@ -567,5 +776,99 @@ def test_run_evaluator( _clean_build(builder_result.artifact_path) +def test_meta_schedule_local_runner_add_test(): + """Test meta schedule local runner with add module""" + + def _check_correct_add(args_before: List[np.array], args_after: List[np.array]) -> None: + a_before, b_before, c_before = args_before + a_after, b_after, c_after = args_after + c_before = a_before + b_before + assert (a_before == a_after).all() + assert (b_before == b_after).all() + assert (c_before == c_after).all() + + def test_alloc_argument( + device: Device, + args_info: T_ARG_INFO_JSON_OBJ_LIST, # pylint: disable=unused-argument + alloc_repeat: int, + ) -> List[T_ARGUMENT_LIST]: + global repeated_args_before # pylint: disable=global-variable-undefined, invalid-name + repeated_args_before = [] + repeated_args = local_default_alloc_argument(device, args_info, alloc_repeat) + for args in repeated_args: + repeated_args_before.append([arg.asnumpy() for arg in args]) + return repeated_args + + def test_run_evaluator( + rt_mod: Module, + device: Device, + evaluator_config: EvaluatorConfig, + repeated_args: List[Any], + ) -> List[float]: + global repeated_args_before # pylint: disable=global-variable-undefined, invalid-name + repeated_args_after = [] + evaluator = rt_mod.time_evaluator( + func_name=rt_mod.entry_name, + dev=device, + number=evaluator_config.number, + repeat=evaluator_config.repeat, + min_repeat_ms=evaluator_config.min_repeat_ms, + f_preproc="cache_flush_cpu_non_first_arg" + if evaluator_config.enable_cpu_cache_flush + else "", + ) + repeated_costs: List[List[float]] = [] + for args in repeated_args: + device.sync() + profile_result = evaluator(*args) + repeated_costs.append(profile_result.results) + repeated_args_after.append([arg.asnumpy() for arg in args]) + costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)] + for args_before, args_after in zip(repeated_args_before, repeated_args_after): + _check_correct_add(args_before, args_after) + del repeated_args_before + return costs + + # Build the module + mod = AddModule() + builder = LocalBuilder() + (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))]) + assert builder_result.artifact_path is not None + assert builder_result.error_msg is None + + runner_input = RunnerInput( + builder_result.artifact_path, + "llvm", + [ + TensorInfo("float32", [MATMUL_M]), + TensorInfo("float32", [MATMUL_M]), + TensorInfo("float32", [MATMUL_M]), + ], + ) + + evaluator_config = EvaluatorConfig( + number=1, + repeat=1, + min_repeat_ms=0, + enable_cpu_cache_flush=False, + ) + runner = LocalRunner( + timeout_sec=100, + evaluator_config=evaluator_config, + f_alloc_argument=test_alloc_argument, + f_run_evaluator=test_run_evaluator, + ) + # Run the module + (runner_future,) = runner.run([runner_input]) + runner_result = runner_future.result() + assert runner_result.error_msg is None + for result in runner_result.run_secs: + if isinstance(result, FloatImm): + result = result.value + assert isinstance(result, float) + assert result >= 0.0 + _clean_build(builder_result.artifact_path) + + if __name__ == "__main__": sys.exit(pytest.main([__file__] + sys.argv[1:])) From 2f914222c2260776988c0d2596055d1458d65c93 Mon Sep 17 00:00:00 2001 From: shingjan Date: Wed, 29 Sep 2021 15:08:46 -0700 Subject: [PATCH 2/8] linting --- .../tvm/meta_schedule/runner/local_runner.py | 20 ++++++++++++++++--- python/tvm/meta_schedule/runner/rpc_runner.py | 6 +++--- 2 files changed, 20 insertions(+), 6 deletions(-) diff 
--git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index bdf379991585..53ee38e7a60c 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -22,14 +22,28 @@ from ...contrib.popen_pool import PopenPoolExecutor from ...runtime import Device, Module, ndarray -from ..arg_info import ArgInfo from ..utils import get_global_func_with_default_on_worker from .config import EvaluatorConfig from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult -from .rpc_runner import T_ARG_INFO_JSON_OBJ, T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT, T_ARGUMENT_LIST +from .rpc_runner import T_ARG_INFO_JSON_OBJ, T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT_LIST class LocalRunnerFuture(RunnerFuture): + """Local based runner future + + Parameters + ---------- + res: Optional[List[float]] + The optional result as a list of float. + error_message: Optional[str] + The optional error message. + + Note + ---- + Either one of the parameters will be None upon the creation + of LocalRunnerFuture object + """ + res: Optional[List[float]] error_message: Optional[str] @@ -350,4 +364,4 @@ def default_run_evaluator( def default_cleanup() -> None: """Default function to clean up the session""" - pass + pass # pylint: disable=unnecessary-pass diff --git a/python/tvm/meta_schedule/runner/rpc_runner.py b/python/tvm/meta_schedule/runner/rpc_runner.py index 5080fd4c95fc..e07ff372d373 100644 --- a/python/tvm/meta_schedule/runner/rpc_runner.py +++ b/python/tvm/meta_schedule/runner/rpc_runner.py @@ -454,10 +454,10 @@ def default_alloc_argument( The session to allocate the arguments device: Device The device to allocate the arguments + args_info: T_ARG_INFO_JSON_OBJ_LIST + The arguments info alloc_repeat: int The number of times to repeat the allocation - args_info: PyArgsInfo - The arguments info Returns ------- @@ -514,7 +514,7 @@ def default_run_evaluator( The device to run the evaluator evaluator_config: EvaluatorConfig The evaluator config - repeated_args: List[Args] + repeated_args: List[T_ARGUMENT_LIST] The repeated arguments Returns From cb3c7d731407399b4e01ef307f65254867fac053 Mon Sep 17 00:00:00 2001 From: shingjan Date: Wed, 29 Sep 2021 17:25:26 -0700 Subject: [PATCH 3/8] add utils --- .../tvm/meta_schedule/runner/local_runner.py | 51 ++------ python/tvm/meta_schedule/runner/rpc_runner.py | 55 ++------ python/tvm/meta_schedule/runner/utils.py | 119 ++++++++++++++++++ 3 files changed, 137 insertions(+), 88 deletions(-) create mode 100644 python/tvm/meta_schedule/runner/utils.py diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index 53ee38e7a60c..be12660091d7 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -25,7 +25,13 @@ from ..utils import get_global_func_with_default_on_worker from .config import EvaluatorConfig from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult -from .rpc_runner import T_ARG_INFO_JSON_OBJ, T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT_LIST +from .utils import ( + T_ARG_INFO_JSON_OBJ, + T_ARG_INFO_JSON_OBJ_LIST, + T_ARGUMENT_LIST, + alloc_argument_common, + run_evaluator_common, +) class LocalRunnerFuture(RunnerFuture): @@ -293,30 +299,7 @@ def default_alloc_argument( 'Unable to find function "tvm.contrib.random.random_fill" on local runner. ' "Please make sure USE_RANDOM is turned ON in the config.cmake." 
) from error - - def alloc_tensor(_, dtype, shape) -> ndarray.NDArray: - arg = ndarray.empty(shape=shape, dtype=dtype, device=device) - f_random_fill(arg) - return arg - - def alloc_fail(*arg_info) -> None: - raise NotImplementedError(arg_info) - - dispatcher: Dict[Any, Callable] = { - "TENSOR": alloc_tensor, - None: alloc_fail, - } - - repeated_args: List[T_ARGUMENT_LIST] = [] - for _ in range(alloc_repeat): - args: T_ARGUMENT_LIST = [] - arg_info: T_ARG_INFO_JSON_OBJ - for arg_info in args_info: - arg_type = arg_info[0] - arg: Any = dispatcher.get(arg_type, None)(*arg_info) - args.append(arg) - repeated_args.append(args) - return repeated_args + return alloc_argument_common(f_random_fill, device, args_info, alloc_repeat) def default_run_evaluator( @@ -343,23 +326,7 @@ def default_run_evaluator( costs: List[float] The evaluator results """ - evaluator = rt_mod.time_evaluator( - func_name=rt_mod.entry_name, - dev=device, - number=evaluator_config.number, - repeat=evaluator_config.repeat, - min_repeat_ms=evaluator_config.min_repeat_ms, - f_preproc="cache_flush_cpu_non_first_arg" - if evaluator_config.enable_cpu_cache_flush - else "", - ) - repeated_costs: List[List[float]] = [] - for args in repeated_args: - device.sync() - profile_result = evaluator(*args) - repeated_costs.append(profile_result.results) - costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)] - return costs + return run_evaluator_common(rt_mod, device, evaluator_config, repeated_args) def default_cleanup() -> None: diff --git a/python/tvm/meta_schedule/runner/rpc_runner.py b/python/tvm/meta_schedule/runner/rpc_runner.py index e07ff372d373..ae68d517c7a6 100644 --- a/python/tvm/meta_schedule/runner/rpc_runner.py +++ b/python/tvm/meta_schedule/runner/rpc_runner.py @@ -31,6 +31,13 @@ ) from .config import EvaluatorConfig, RPCConfig from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult +from .utils import ( + T_ARG_INFO_JSON_OBJ, + T_ARG_INFO_JSON_OBJ_LIST, + T_ARGUMENT_LIST, + alloc_argument_common, + run_evaluator_common, +) class RPCRunnerFuture(RunnerFuture): @@ -80,12 +87,6 @@ def result(self) -> RunnerResult: return RunnerResult(run_secs, None) -T_ARG_INFO_JSON_OBJ = List[Any] # pylint: disable=invalid-name -T_ARG_INFO_JSON_OBJ_LIST = List[T_ARG_INFO_JSON_OBJ] # pylint: disable=invalid-name -T_ARGUMENT = Any # pylint: disable=invalid-name -T_ARGUMENT_LIST = List[T_ARGUMENT] # pylint: disable=invalid-name - - class RPCRunner(PyRunner): """RPC based runner @@ -470,29 +471,7 @@ def default_alloc_argument( "Please make sure 'USE_RANDOM' is turned ON in the config.cmake on the RPC server.", ) - def alloc_tensor(_, dtype, shape) -> ndarray.NDArray: - arg = ndarray.empty(shape=shape, dtype=dtype, device=device) - f_random_fill(arg) - return arg - - def alloc_fail(*arg_info) -> None: - raise NotImplementedError(arg_info) - - dispatcher: Dict[Any, Callable] = { - "TENSOR": alloc_tensor, - None: alloc_fail, - } - - repeated_args: List[T_ARGUMENT_LIST] = [] - for _ in range(alloc_repeat): - args: T_ARGUMENT_LIST = [] - arg_info: T_ARG_INFO_JSON_OBJ - for arg_info in args_info: - arg_type = arg_info[0] - arg: Any = dispatcher.get(arg_type, None)(*arg_info) - args.append(arg) - repeated_args.append(args) - return repeated_args + return alloc_argument_common(f_random_fill, device, args_info, alloc_repeat) def default_run_evaluator( @@ -522,23 +501,7 @@ def default_run_evaluator( costs: List[float] The evaluator results """ - evaluator = rt_mod.time_evaluator( - func_name=rt_mod.entry_name, - 
dev=device, - number=evaluator_config.number, - repeat=evaluator_config.repeat, - min_repeat_ms=evaluator_config.min_repeat_ms, - f_preproc="cache_flush_cpu_non_first_arg" - if evaluator_config.enable_cpu_cache_flush - else "", - ) - repeated_costs: List[List[float]] = [] - for args in repeated_args: - device.sync() - profile_result = evaluator(*args) - repeated_costs.append(profile_result.results) - costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)] - return costs + return run_evaluator_common(rt_mod, device, evaluator_config, repeated_args) def default_cleanup( diff --git a/python/tvm/meta_schedule/runner/utils.py b/python/tvm/meta_schedule/runner/utils.py new file mode 100644 index 000000000000..dae93e68c14c --- /dev/null +++ b/python/tvm/meta_schedule/runner/utils.py @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import itertools +from typing import Any, Callable, Dict, List, Optional, Union + +from ...runtime import Device, Module, ndarray +from .config import EvaluatorConfig + +T_ARG_INFO_JSON_OBJ = List[Any] # pylint: disable=invalid-name +T_ARG_INFO_JSON_OBJ_LIST = List[T_ARG_INFO_JSON_OBJ] # pylint: disable=invalid-name +T_ARGUMENT = Any # pylint: disable=invalid-name +T_ARGUMENT_LIST = List[T_ARGUMENT] # pylint: disable=invalid-name + + +def alloc_argument_common( + f_random_fill: Callable, + device: Device, + args_info: T_ARG_INFO_JSON_OBJ_LIST, + alloc_repeat: int, +) -> List[T_ARGUMENT_LIST]: + """Common function to allocate the arguments + + Parameters + ---------- + f_random_fill: Callable + The callable function for random fill + device: Device + The device to allocate the arguments + args_info: T_ARG_INFO_JSON_OBJ_LIST + The arguments info + alloc_repeat: int + The number of times to repeat the allocation + + Returns + ------- + repeated_args: List[T_ARGUMENT_LIST] + The allocation args + """ + + def alloc_tensor(_, dtype, shape) -> ndarray.NDArray: + arg = ndarray.empty(shape=shape, dtype=dtype, device=device) + f_random_fill(arg) + return arg + + def alloc_fail(*arg_info) -> None: + raise NotImplementedError(arg_info) + + dispatcher: Dict[Any, Callable] = { + "TENSOR": alloc_tensor, + None: alloc_fail, + } + + repeated_args: List[T_ARGUMENT_LIST] = [] + for _ in range(alloc_repeat): + args: T_ARGUMENT_LIST = [] + arg_info: T_ARG_INFO_JSON_OBJ + for arg_info in args_info: + arg_type = arg_info[0] + arg: Any = dispatcher.get(arg_type, None)(*arg_info) + args.append(arg) + repeated_args.append(args) + return repeated_args + + +def run_evaluator_common( + rt_mod: Module, + device: Device, + evaluator_config: EvaluatorConfig, + repeated_args: List[T_ARGUMENT_LIST], +) -> List[float]: + """Common function to run the evaluator + + Parameters + ---------- + rt_mod: Module + The runtime module + 
device: Device + The device to run the evaluator + evaluator_config: EvaluatorConfig + The evaluator config + repeated_args: List[T_ARGUMENT_LIST] + The repeated arguments + + Returns + ------- + costs: List[float] + The evaluator results + """ + evaluator = rt_mod.time_evaluator( + func_name=rt_mod.entry_name, + dev=device, + number=evaluator_config.number, + repeat=evaluator_config.repeat, + min_repeat_ms=evaluator_config.min_repeat_ms, + f_preproc="cache_flush_cpu_non_first_arg" + if evaluator_config.enable_cpu_cache_flush + else "", + ) + repeated_costs: List[List[float]] = [] + for args in repeated_args: + device.sync() + profile_result = evaluator(*args) + repeated_costs.append(profile_result.results) + costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)] + return costs From 04ac2925e6ab1326052fe181532ed7f0e4e724a5 Mon Sep 17 00:00:00 2001 From: shingjan Date: Wed, 29 Sep 2021 17:30:24 -0700 Subject: [PATCH 4/8] linting --- python/tvm/meta_schedule/runner/local_runner.py | 6 ++---- python/tvm/meta_schedule/runner/rpc_runner.py | 6 ++---- python/tvm/meta_schedule/runner/utils.py | 3 ++- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index be12660091d7..7321ee724503 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -15,18 +15,16 @@ # specific language governing permissions and limitations # under the License. """Local Runner""" -import itertools from contextlib import contextmanager -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Callable, List, Optional, Union import tvm from ...contrib.popen_pool import PopenPoolExecutor -from ...runtime import Device, Module, ndarray +from ...runtime import Device, Module from ..utils import get_global_func_with_default_on_worker from .config import EvaluatorConfig from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult from .utils import ( - T_ARG_INFO_JSON_OBJ, T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT_LIST, alloc_argument_common, diff --git a/python/tvm/meta_schedule/runner/rpc_runner.py b/python/tvm/meta_schedule/runner/rpc_runner.py index ae68d517c7a6..f32063e05e61 100644 --- a/python/tvm/meta_schedule/runner/rpc_runner.py +++ b/python/tvm/meta_schedule/runner/rpc_runner.py @@ -17,13 +17,12 @@ """RPC Runner""" import concurrent.futures from contextlib import contextmanager -import itertools import os.path as osp -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Callable, List, Optional, Union from tvm.contrib.popen_pool import PopenPoolExecutor from tvm.rpc import RPCSession -from tvm.runtime import Device, Module, ndarray +from tvm.runtime import Device, Module from ..utils import ( get_global_func_on_rpc_session, @@ -32,7 +31,6 @@ from .config import EvaluatorConfig, RPCConfig from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult from .utils import ( - T_ARG_INFO_JSON_OBJ, T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT_LIST, alloc_argument_common, diff --git a/python/tvm/meta_schedule/runner/utils.py b/python/tvm/meta_schedule/runner/utils.py index dae93e68c14c..ef0d4b5f98f7 100644 --- a/python/tvm/meta_schedule/runner/utils.py +++ b/python/tvm/meta_schedule/runner/utils.py @@ -14,8 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+"""Runner utility functions""" import itertools -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List from ...runtime import Device, Module, ndarray from .config import EvaluatorConfig From a0cb3dac1e1b206d9166cb29ceffb813e0c26999 Mon Sep 17 00:00:00 2001 From: shingjan Date: Wed, 29 Sep 2021 18:34:32 -0700 Subject: [PATCH 5/8] address comments --- .../tvm/meta_schedule/runner/local_runner.py | 19 ++++++++++++++----- python/tvm/meta_schedule/runner/rpc_runner.py | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index 7321ee724503..8711de3be7d5 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -44,7 +44,7 @@ class LocalRunnerFuture(RunnerFuture): Note ---- - Either one of the parameters will be None upon the creation + Only one of the parameters should be None upon the creation of LocalRunnerFuture object """ @@ -52,7 +52,7 @@ class LocalRunnerFuture(RunnerFuture): error_message: Optional[str] def __init__( - self, result: Optional[List[float]] = None, error_message: Optional[str] = None + self, res: Optional[List[float]] = None, error_message: Optional[str] = None ) -> None: """Constructor @@ -65,9 +65,18 @@ def __init__( """ super().__init__() - self.res = result + self.res = res self.error_message = error_message + # sanity check upon the creation of LocalRunnerFuture object + if (res is None and error_message is None) or ( + res is not None and error_message is not None + ): + raise AttributeError( + "Only one of the two parameters should be None upon the creation" + "of LocalRunnerFuture object." + ) + def done(self) -> bool: return True @@ -215,7 +224,7 @@ def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]: except Exception as exception: # pylint: disable=broad-except result: List[float] = None error_message: str = "LocalRunner: An exception occurred\n" + str(exception) - local_future = LocalRunnerFuture(result=result, error_message=error_message) + local_future = LocalRunnerFuture(res=result, error_message=error_message) results.append(local_future) return results @@ -245,7 +254,7 @@ def resource_handler(): try: yield finally: - # Step 5. Clean up + # Final step. Always clean up… f_cleanup() with resource_handler(): diff --git a/python/tvm/meta_schedule/runner/rpc_runner.py b/python/tvm/meta_schedule/runner/rpc_runner.py index f32063e05e61..c89da269ea94 100644 --- a/python/tvm/meta_schedule/runner/rpc_runner.py +++ b/python/tvm/meta_schedule/runner/rpc_runner.py @@ -368,7 +368,7 @@ def resource_handler(): try: yield finally: - # Step 5. Clean up + # Final step. Always clean up… f_cleanup(session, remote_path) with resource_handler(): From f91c66e7ea8661dd535559ded1c4e9421a518a58 Mon Sep 17 00:00:00 2001 From: shingjan Date: Thu, 30 Sep 2021 10:54:00 -0700 Subject: [PATCH 6/8] remove non-ascii commennt --- python/tvm/meta_schedule/runner/local_runner.py | 2 +- python/tvm/meta_schedule/runner/rpc_runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index 8711de3be7d5..e0e358aeefa2 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -254,7 +254,7 @@ def resource_handler(): try: yield finally: - # Final step. Always clean up… + # Final step. 
Always clean up f_cleanup() with resource_handler(): diff --git a/python/tvm/meta_schedule/runner/rpc_runner.py b/python/tvm/meta_schedule/runner/rpc_runner.py index c89da269ea94..3ba1c1dccf5f 100644 --- a/python/tvm/meta_schedule/runner/rpc_runner.py +++ b/python/tvm/meta_schedule/runner/rpc_runner.py @@ -368,7 +368,7 @@ def resource_handler(): try: yield finally: - # Final step. Always clean up… + # Final step. Always clean up f_cleanup(session, remote_path) with resource_handler(): From 2d80cdcb424018272e93846ce6cdacc0522522fc Mon Sep 17 00:00:00 2001 From: shingjan Date: Thu, 30 Sep 2021 10:58:39 -0700 Subject: [PATCH 7/8] add sanity check --- .../tvm/meta_schedule/runner/local_runner.py | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index e0e358aeefa2..45cd90204443 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -198,6 +198,7 @@ def __init__( timeout=timeout_sec, initializer=initializer, ) + self._sanity_check() def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]: results: List[RunnerFuture] = [] @@ -228,6 +229,25 @@ def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]: results.append(local_future) return results + def _sanity_check(self) -> None: + def _check( + f_alloc_argument, + f_run_evaluator, + f_cleanup, + ) -> None: + get_global_func_with_default_on_worker(name=f_alloc_argument, default=None) + get_global_func_with_default_on_worker(name=f_run_evaluator, default=None) + get_global_func_with_default_on_worker(name=f_cleanup, default=None) + tvm.get_global_func("tvm.contrib.random.random_fill", True) + + value = self.pool.submit( + _check, + self.f_alloc_argument, + self.f_run_evaluator, + self.f_cleanup, + ) + value.result() + @staticmethod def _worker_func( _f_alloc_argument: Optional[str], From 9f293d54761be471c925755adac57ffcbd8c656a Mon Sep 17 00:00:00 2001 From: shingjan Date: Thu, 30 Sep 2021 11:11:02 -0700 Subject: [PATCH 8/8] address comments --- python/tvm/meta_schedule/runner/local_runner.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py index 45cd90204443..f6bc2bcce55d 100644 --- a/python/tvm/meta_schedule/runner/local_runner.py +++ b/python/tvm/meta_schedule/runner/local_runner.py @@ -238,7 +238,9 @@ def _check( get_global_func_with_default_on_worker(name=f_alloc_argument, default=None) get_global_func_with_default_on_worker(name=f_run_evaluator, default=None) get_global_func_with_default_on_worker(name=f_cleanup, default=None) - tvm.get_global_func("tvm.contrib.random.random_fill", True) + get_global_func_with_default_on_worker( + name="tvm.contrib.random.random_fill", default=None + ) value = self.pool.submit( _check, @@ -319,13 +321,9 @@ def default_alloc_argument( repeated_args: List[T_ARGUMENT_LIST] The allocation args """ - try: - f_random_fill = tvm.get_global_func("tvm.contrib.random.random_fill", True) - except AttributeError as error: - raise AttributeError( - 'Unable to find function "tvm.contrib.random.random_fill" on local runner. ' - "Please make sure USE_RANDOM is turned ON in the config.cmake." - ) from error + f_random_fill = get_global_func_with_default_on_worker( + name="tvm.contrib.random.random_fill", default=None + ) return alloc_argument_common(f_random_fill, device, args_info, alloc_repeat)
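
Usage sketch: the snippet below shows, end to end, how the LocalRunner added in this series is intended to be driven. It is a minimal illustration modeled on test_meta_schedule_local_single_run in the test file above; MatmulModule, MATMUL_N, and _clean_build are helpers defined in that test module and are assumed here rather than provided by the runner API, so treat this as an example of the call sequence, not as part of the patch.

from tvm.meta_schedule.arg_info import TensorInfo
from tvm.meta_schedule.builder import BuilderInput, LocalBuilder
from tvm.meta_schedule.runner import EvaluatorConfig, LocalRunner, RunnerInput
from tvm.target import Target

# Step 1. Build the module locally; the artifact path points at the exported library.
(builder_result,) = LocalBuilder().build([BuilderInput(MatmulModule(), Target("llvm"))])
assert builder_result.error_msg is None

# Step 2. Describe the run: artifact path, device type, and argument metadata.
runner_input = RunnerInput(
    builder_result.artifact_path,
    "llvm",
    [
        TensorInfo("float32", (MATMUL_N, MATMUL_N)),
        TensorInfo("float32", (MATMUL_N, MATMUL_N)),
        TensorInfo("float32", (MATMUL_N, MATMUL_N)),
    ],
)

# Step 3. Configure the evaluator and create the runner; f_alloc_argument,
# f_run_evaluator, and f_cleanup fall back to the defaults defined in local_runner.py.
evaluator_config = EvaluatorConfig(
    number=1,
    repeat=1,
    min_repeat_ms=0,
    enable_cpu_cache_flush=False,
)
runner = LocalRunner(timeout_sec=100, evaluator_config=evaluator_config)

# Step 4. Submit the input and block on the future; run_secs is None when the
# worker timed out or raised, in which case error_msg explains why.
(runner_future,) = runner.run([runner_input])
runner_result = runner_future.result()
assert runner_result.error_msg is None
print([float(getattr(sec, "value", sec)) for sec in runner_result.run_secs])

_clean_build(builder_result.artifact_path)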