diff --git a/python/tvm/meta_schedule/runner/__init__.py b/python/tvm/meta_schedule/runner/__init__.py
index 47f4557e1d3a..8fc9acaf79cf 100644
--- a/python/tvm/meta_schedule/runner/__init__.py
+++ b/python/tvm/meta_schedule/runner/__init__.py
@@ -20,4 +20,5 @@
 """
 from .config import EvaluatorConfig, RPCConfig
 from .rpc_runner import RPCRunner
+from .local_runner import LocalRunner, LocalRunnerFuture
 from .runner import PyRunner, Runner, RunnerFuture, RunnerInput, RunnerResult
diff --git a/python/tvm/meta_schedule/runner/local_runner.py b/python/tvm/meta_schedule/runner/local_runner.py
new file mode 100644
index 000000000000..f6bc2bcce55d
--- /dev/null
+++ b/python/tvm/meta_schedule/runner/local_runner.py
@@ -0,0 +1,359 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Local Runner"""
+from contextlib import contextmanager
+from typing import Callable, List, Optional, Union
+
+import tvm
+
+from ...contrib.popen_pool import PopenPoolExecutor
+from ...runtime import Device, Module
+from ..utils import get_global_func_with_default_on_worker
+from .config import EvaluatorConfig
+from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult
+from .utils import (
+    T_ARG_INFO_JSON_OBJ_LIST,
+    T_ARGUMENT_LIST,
+    alloc_argument_common,
+    run_evaluator_common,
+)
+
+
+class LocalRunnerFuture(RunnerFuture):
+    """Local based runner future
+
+    Parameters
+    ----------
+    res: Optional[List[float]]
+        The optional result as a list of float.
+    error_message: Optional[str]
+        The optional error message.
+
+    Note
+    ----
+    Exactly one of the two parameters must be None upon the creation
+    of a LocalRunnerFuture object.
+    """
+
+    res: Optional[List[float]]
+    error_message: Optional[str]
+
+    def __init__(
+        self, res: Optional[List[float]] = None, error_message: Optional[str] = None
+    ) -> None:
+        """Constructor
+
+        Parameters
+        ----------
+        res: Optional[List[float]]
+            The result of this LocalRunnerFuture
+        error_message: Optional[str]
+            The stringified error message of any exception during execution
+        """
+        super().__init__()
+        self.res = res
+        self.error_message = error_message
+
+        # Sanity check upon the creation of the LocalRunnerFuture object:
+        # exactly one of `res` and `error_message` must be set.
+        if (res is None and error_message is None) or (
+            res is not None and error_message is not None
+        ):
+            raise AttributeError(
+                "Exactly one of the two parameters must be None upon the creation "
+                "of a LocalRunnerFuture object."
+            )
+
+    def done(self) -> bool:
+        return True
+
+    def result(self) -> RunnerResult:
+        return RunnerResult(self.res, self.error_message)
+
+
+class LocalRunner(PyRunner):
+    """Local runner
+
+    Parameters
+    ----------
+    timeout_sec: float
+        The timeout in seconds for each run.
+    evaluator_config: EvaluatorConfig
+        The evaluator configuration.
+    cooldown_sec: float
+        The cooldown in seconds.
+    alloc_repeat: int
+        The number of times to repeat the allocation.
+    f_alloc_argument: Union[str, Callable, None]
+        The function name to allocate the arguments or the function itself.
+    f_run_evaluator: Union[str, Callable, None]
+        The function name to run the evaluator or the function itself.
+    f_cleanup: Union[str, Callable, None]
+        The function name to clean up the session or the function itself.
+    pool: PopenPoolExecutor
+        The popen pool executor.
+
+    Attributes
+    ----------
+    T_ALLOC_ARGUMENT : typing._GenericAlias
+        The signature of the function `f_alloc_argument`, which is:
+
+        .. code-block:: python
+
+            def default_alloc_argument(
+                device: Device,
+                args_info: T_ARG_INFO_JSON_OBJ_LIST,
+                alloc_repeat: int,
+            ) -> List[T_ARGUMENT_LIST]:
+                ...
+
+    T_RUN_EVALUATOR : typing._GenericAlias
+        The signature of the function `f_run_evaluator`, which is:
+
+        .. code-block:: python
+
+            def default_run_evaluator(
+                rt_mod: Module,
+                device: Device,
+                evaluator_config: EvaluatorConfig,
+                repeated_args: List[T_ARGUMENT_LIST],
+            ) -> List[float]:
+                ...
+
+    T_CLEANUP : typing._GenericAlias
+        The signature of the function `f_cleanup`, which is:
+
+        .. code-block:: python
+
+            def default_cleanup() -> None:
+                ...
+    """
+
+    T_ALLOC_ARGUMENT = Callable[
+        [
+            Device,  # The local device
+            T_ARG_INFO_JSON_OBJ_LIST,  # The metadata information of the arguments to be allocated
+            int,  # The number of repeated allocations to be done
+        ],
+        List[T_ARGUMENT_LIST],  # A list of argument lists
+    ]
+    T_RUN_EVALUATOR = Callable[
+        [
+            Module,  # The Module opened locally
+            Device,  # The local device
+            EvaluatorConfig,  # The evaluator configuration
+            List[T_ARGUMENT_LIST],  # A list of argument lists
+        ],
+        List[float],  # A list of running times
+    ]
+    T_CLEANUP = Callable[
+        [],
+        None,
+    ]
+
+    timeout_sec: float
+    evaluator_config: EvaluatorConfig
+    cooldown_sec: float
+    alloc_repeat: int
+
+    f_alloc_argument: Union[T_ALLOC_ARGUMENT, str, None]
+    f_run_evaluator: Union[T_RUN_EVALUATOR, str, None]
+    f_cleanup: Union[T_CLEANUP, str, None]
+
+    pool: PopenPoolExecutor
+
+    def __init__(
+        self,
+        timeout_sec: float,
+        evaluator_config: Optional[EvaluatorConfig] = None,
+        cooldown_sec: float = 0.0,
+        alloc_repeat: int = 1,
+        f_alloc_argument: Optional[str] = None,
+        f_run_evaluator: Optional[str] = None,
+        f_cleanup: Optional[str] = None,
+        initializer: Optional[Callable[[], None]] = None,
+    ) -> None:
+        super().__init__()
+        self.timeout_sec = timeout_sec
+        self.evaluator_config = EvaluatorConfig._normalized(evaluator_config)
+        self.cooldown_sec = cooldown_sec
+        self.alloc_repeat = alloc_repeat
+        self.f_alloc_argument = f_alloc_argument
+        self.f_run_evaluator = f_run_evaluator
+        self.f_cleanup = f_cleanup
+
+        self.pool = PopenPoolExecutor(
+            max_workers=1,  # one local worker
+            timeout=timeout_sec,
+            initializer=initializer,
+        )
+        self._sanity_check()
+
+    def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]:
+        results: List[RunnerFuture] = []
+        for runner_input in runner_inputs:
+            future = self.pool.submit(
+                LocalRunner._worker_func,
+                self.f_alloc_argument,
+                self.f_run_evaluator,
+                self.f_cleanup,
+                self.evaluator_config,
+                self.alloc_repeat,
+                str(runner_input.artifact_path),
+                str(runner_input.device_type),
+                tuple(arg_info.as_json() for arg_info in runner_input.args_info),
+            )
+            try:
+                result: Optional[List[float]] = future.result()
+                error_message: Optional[str] = None
+            except TimeoutError:
+                result = None
+                error_message = (
+                    f"LocalRunner: Timeout, killed after {self.timeout_sec} seconds\n"
+                )
+            except Exception as exception:  # pylint: disable=broad-except
+                result = None
+                error_message = "LocalRunner: An exception occurred\n" + str(exception)
+            local_future = LocalRunnerFuture(res=result, error_message=error_message)
+            results.append(local_future)
+        return results
+
+    def _sanity_check(self) -> None:
+        def _check(
+            f_alloc_argument,
+            f_run_evaluator,
+            f_cleanup,
+        ) -> None:
+            # Ensure the customized functions and the random-fill routine
+            # can be resolved inside the worker process
+            get_global_func_with_default_on_worker(name=f_alloc_argument, default=None)
+            get_global_func_with_default_on_worker(name=f_run_evaluator, default=None)
+            get_global_func_with_default_on_worker(name=f_cleanup, default=None)
+            get_global_func_with_default_on_worker(
+                name="tvm.contrib.random.random_fill", default=None
+            )
+
+        value = self.pool.submit(
+            _check,
+            self.f_alloc_argument,
+            self.f_run_evaluator,
+            self.f_cleanup,
+        )
+        value.result()
+
+    @staticmethod
+    def _worker_func(
+        _f_alloc_argument: Optional[str],
+        _f_run_evaluator: Optional[str],
+        _f_cleanup: Optional[str],
+        evaluator_config: EvaluatorConfig,
+        alloc_repeat: int,
+        artifact_path: str,
+        device_type: str,
+        args_info: T_ARG_INFO_JSON_OBJ_LIST,
+    ) -> List[float]:
+        f_alloc_argument: LocalRunner.T_ALLOC_ARGUMENT = get_global_func_with_default_on_worker(
+            _f_alloc_argument, default_alloc_argument
+        )
+        f_run_evaluator: LocalRunner.T_RUN_EVALUATOR = get_global_func_with_default_on_worker(
+            _f_run_evaluator, default_run_evaluator
+        )
+        f_cleanup: LocalRunner.T_CLEANUP = get_global_func_with_default_on_worker(
+            _f_cleanup, default_cleanup
+        )
+
+        @contextmanager
+        def resource_handler():
+            try:
+                yield
+            finally:
+                # Final step. Always clean up
+                f_cleanup()
+
+        with resource_handler():
+            # Step 1: create the local runtime module
+            rt_mod = tvm.runtime.load_module(artifact_path)
+            # Step 2: create the local device
+            device = tvm.runtime.device(dev_type=device_type, dev_id=0)
+            # Step 3: Allocate input arguments
+            repeated_args: List[T_ARGUMENT_LIST] = f_alloc_argument(
+                device,
+                args_info,
+                alloc_repeat,
+            )
+            # Step 4: Run time_evaluator
+            costs: List[float] = f_run_evaluator(
+                rt_mod,
+                device,
+                evaluator_config,
+                repeated_args,
+            )
+            return costs
+
+
+def default_alloc_argument(
+    device: Device,
+    args_info: T_ARG_INFO_JSON_OBJ_LIST,
+    alloc_repeat: int,
+) -> List[T_ARGUMENT_LIST]:
+    """Default function to allocate the arguments
+
+    Parameters
+    ----------
+    device: Device
+        The device to allocate the arguments on
+    args_info: T_ARG_INFO_JSON_OBJ_LIST
+        The arguments info
+    alloc_repeat: int
+        The number of times to repeat the allocation
+
+    Returns
+    -------
+    repeated_args: List[T_ARGUMENT_LIST]
+        The allocated arguments
+    """
+    f_random_fill = get_global_func_with_default_on_worker(
+        name="tvm.contrib.random.random_fill", default=None
+    )
+    return alloc_argument_common(f_random_fill, device, args_info, alloc_repeat)
+
+
+def default_run_evaluator(
+    rt_mod: Module,
+    device: Device,
+    evaluator_config: EvaluatorConfig,
+    repeated_args: List[T_ARGUMENT_LIST],
+) -> List[float]:
+    """Default function to run the evaluator
+
+    Parameters
+    ----------
+    rt_mod: Module
+        The runtime module
+    device: Device
+        The device to run the evaluator on
+    evaluator_config: EvaluatorConfig
+        The evaluator config
+    repeated_args: List[T_ARGUMENT_LIST]
+        The repeated arguments
+
+    Returns
+    -------
+    costs: List[float]
+        The evaluator results
+    """
+    return run_evaluator_common(rt_mod, device, evaluator_config, repeated_args)
+
+
+def default_cleanup() -> None:
+    """Default function to clean up the session"""
+    pass  # pylint: disable=unnecessary-pass
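For orientation, here is a minimal usage sketch of the new `LocalRunner` (not part of the patch; the artifact path is hypothetical, and in practice it comes from a `BuilderResult`, as the tests below demonstrate):

.. code-block:: python

    from tvm.meta_schedule.arg_info import TensorInfo
    from tvm.meta_schedule.runner import EvaluatorConfig, LocalRunner, RunnerInput

    runner_input = RunnerInput(
        "./lib_matmul.so",  # hypothetical artifact path produced by a builder
        "llvm",
        [
            TensorInfo("float32", (128, 128)),
            TensorInfo("float32", (128, 128)),
            TensorInfo("float32", (128, 128)),
        ],
    )
    runner = LocalRunner(
        timeout_sec=30,
        evaluator_config=EvaluatorConfig(
            number=1, repeat=1, min_repeat_ms=0, enable_cpu_cache_flush=False
        ),
    )
    (runner_future,) = runner.run([runner_input])
    runner_result = runner_future.result()  # RunnerResult(run_secs, error_msg)

Unlike `RPCRunner`, the future returned here is already resolved: `run()` blocks on the worker and wraps either the costs or the error message in a `LocalRunnerFuture`.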
diff --git a/python/tvm/meta_schedule/runner/rpc_runner.py b/python/tvm/meta_schedule/runner/rpc_runner.py
index 5080fd4c95fc..3ba1c1dccf5f 100644
--- a/python/tvm/meta_schedule/runner/rpc_runner.py
+++ b/python/tvm/meta_schedule/runner/rpc_runner.py
@@ -17,13 +17,12 @@
 """RPC Runner"""
 import concurrent.futures
 from contextlib import contextmanager
-import itertools
 import os.path as osp
-from typing import Any, Callable, Dict, List, Optional, Union
+from typing import Callable, List, Optional, Union
 
 from tvm.contrib.popen_pool import PopenPoolExecutor
 from tvm.rpc import RPCSession
-from tvm.runtime import Device, Module, ndarray
+from tvm.runtime import Device, Module
 
 from ..utils import (
     get_global_func_on_rpc_session,
@@ -31,6 +30,12 @@
 )
 from .config import EvaluatorConfig, RPCConfig
 from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult
+from .utils import (
+    T_ARG_INFO_JSON_OBJ_LIST,
+    T_ARGUMENT_LIST,
+    alloc_argument_common,
+    run_evaluator_common,
+)
 
 
 class RPCRunnerFuture(RunnerFuture):
@@ -80,12 +85,6 @@ def result(self) -> RunnerResult:
         return RunnerResult(run_secs, None)
 
 
-T_ARG_INFO_JSON_OBJ = List[Any]  # pylint: disable=invalid-name
-T_ARG_INFO_JSON_OBJ_LIST = List[T_ARG_INFO_JSON_OBJ]  # pylint: disable=invalid-name
-T_ARGUMENT = Any  # pylint: disable=invalid-name
-T_ARGUMENT_LIST = List[T_ARGUMENT]  # pylint: disable=invalid-name
-
-
 class RPCRunner(PyRunner):
     """RPC based runner
 
@@ -369,7 +368,7 @@ def resource_handler():
             try:
                 yield
             finally:
-                # Step 5. Clean up
+                # Final step. Always clean up
                 f_cleanup(session, remote_path)
 
         with resource_handler():
@@ -454,10 +453,10 @@ def default_alloc_argument(
         The session to allocate the arguments
     device: Device
         The device to allocate the arguments
+    args_info: T_ARG_INFO_JSON_OBJ_LIST
+        The arguments info
     alloc_repeat: int
         The number of times to repeat the allocation
-    args_info: PyArgsInfo
-        The arguments info
 
     Returns
     -------
@@ -470,29 +469,7 @@ def default_alloc_argument(
         "Please make sure 'USE_RANDOM' is turned ON in the config.cmake on the RPC server.",
     )
 
-    def alloc_tensor(_, dtype, shape) -> ndarray.NDArray:
-        arg = ndarray.empty(shape=shape, dtype=dtype, device=device)
-        f_random_fill(arg)
-        return arg
-
-    def alloc_fail(*arg_info) -> None:
-        raise NotImplementedError(arg_info)
-
-    dispatcher: Dict[Any, Callable] = {
-        "TENSOR": alloc_tensor,
-        None: alloc_fail,
-    }
-
-    repeated_args: List[T_ARGUMENT_LIST] = []
-    for _ in range(alloc_repeat):
-        args: T_ARGUMENT_LIST = []
-        arg_info: T_ARG_INFO_JSON_OBJ
-        for arg_info in args_info:
-            arg_type = arg_info[0]
-            arg: Any = dispatcher.get(arg_type, None)(*arg_info)
-            args.append(arg)
-        repeated_args.append(args)
-    return repeated_args
+    return alloc_argument_common(f_random_fill, device, args_info, alloc_repeat)
 
 
 def default_run_evaluator(
@@ -514,7 +491,7 @@ def default_run_evaluator(
         The device to run the evaluator
     evaluator_config: EvaluatorConfig
         The evaluator config
-    repeated_args: List[Args]
+    repeated_args: List[T_ARGUMENT_LIST]
        The repeated arguments
 
     Returns
@@ -522,23 +499,7 @@
     costs: List[float]
         The evaluator results
     """
-    evaluator = rt_mod.time_evaluator(
-        func_name=rt_mod.entry_name,
-        dev=device,
-        number=evaluator_config.number,
-        repeat=evaluator_config.repeat,
-        min_repeat_ms=evaluator_config.min_repeat_ms,
-        f_preproc="cache_flush_cpu_non_first_arg"
-        if evaluator_config.enable_cpu_cache_flush
-        else "",
-    )
-    repeated_costs: List[List[float]] = []
-    for args in repeated_args:
-        device.sync()
-        profile_result = evaluator(*args)
-        repeated_costs.append(profile_result.results)
-    costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)]
-    return costs
+    return run_evaluator_common(rt_mod, device, evaluator_config, repeated_args)
 
 
 def default_cleanup(
diff --git a/python/tvm/meta_schedule/runner/utils.py b/python/tvm/meta_schedule/runner/utils.py
new file mode 100644
index 000000000000..ef0d4b5f98f7
--- /dev/null
+++ b/python/tvm/meta_schedule/runner/utils.py
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Runner utility functions"""
+import itertools
+from typing import Any, Callable, Dict, List
+
+from ...runtime import Device, Module, ndarray
+from .config import EvaluatorConfig
+
+T_ARG_INFO_JSON_OBJ = List[Any]  # pylint: disable=invalid-name
+T_ARG_INFO_JSON_OBJ_LIST = List[T_ARG_INFO_JSON_OBJ]  # pylint: disable=invalid-name
+T_ARGUMENT = Any  # pylint: disable=invalid-name
+T_ARGUMENT_LIST = List[T_ARGUMENT]  # pylint: disable=invalid-name
+
+
+def alloc_argument_common(
+    f_random_fill: Callable,
+    device: Device,
+    args_info: T_ARG_INFO_JSON_OBJ_LIST,
+    alloc_repeat: int,
+) -> List[T_ARGUMENT_LIST]:
+    """Common function to allocate the arguments
+
+    Parameters
+    ----------
+    f_random_fill: Callable
+        The callable function for random fill
+    device: Device
+        The device to allocate the arguments on
+    args_info: T_ARG_INFO_JSON_OBJ_LIST
+        The arguments info
+    alloc_repeat: int
+        The number of times to repeat the allocation
+
+    Returns
+    -------
+    repeated_args: List[T_ARGUMENT_LIST]
+        The allocated arguments
+    """
+
+    def alloc_tensor(_, dtype, shape) -> ndarray.NDArray:
+        arg = ndarray.empty(shape=shape, dtype=dtype, device=device)
+        f_random_fill(arg)
+        return arg
+
+    def alloc_fail(*arg_info) -> None:
+        raise NotImplementedError(arg_info)
+
+    dispatcher: Dict[Any, Callable] = {
+        "TENSOR": alloc_tensor,
+    }
+
+    repeated_args: List[T_ARGUMENT_LIST] = []
+    for _ in range(alloc_repeat):
+        args: T_ARGUMENT_LIST = []
+        arg_info: T_ARG_INFO_JSON_OBJ
+        for arg_info in args_info:
+            arg_type = arg_info[0]
+            # Dispatch on the argument kind; unknown kinds fall back to
+            # alloc_fail, which raises NotImplementedError
+            arg: Any = dispatcher.get(arg_type, alloc_fail)(*arg_info)
+            args.append(arg)
+        repeated_args.append(args)
+    return repeated_args
+
+
+def run_evaluator_common(
+    rt_mod: Module,
+    device: Device,
+    evaluator_config: EvaluatorConfig,
+    repeated_args: List[T_ARGUMENT_LIST],
+) -> List[float]:
+    """Common function to run the evaluator
+
+    Parameters
+    ----------
+    rt_mod: Module
+        The runtime module
+    device: Device
+        The device to run the evaluator on
+    evaluator_config: EvaluatorConfig
+        The evaluator config
+    repeated_args: List[T_ARGUMENT_LIST]
+        The repeated arguments
+
+    Returns
+    -------
+    costs: List[float]
+        The evaluator results
+    """
+    evaluator = rt_mod.time_evaluator(
+        func_name=rt_mod.entry_name,
+        dev=device,
+        number=evaluator_config.number,
+        repeat=evaluator_config.repeat,
+        min_repeat_ms=evaluator_config.min_repeat_ms,
+        f_preproc="cache_flush_cpu_non_first_arg"
+        if evaluator_config.enable_cpu_cache_flush
+        else "",
+    )
+    repeated_costs: List[List[float]] = []
+    for args in repeated_args:
+        device.sync()
+        profile_result = evaluator(*args)
+        repeated_costs.append(profile_result.results)
+    costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)]
+    return costs
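For reference, the argument-info entries consumed by `alloc_argument_common` are the JSON-style lists produced by `ArgInfo.as_json()`, each headed by a kind tag such as `"TENSOR"`. A minimal sketch of calling the shared helper directly (assumes TVM was built with `USE_RANDOM` so that `tvm.contrib.random.random_fill` is registered; shapes are illustrative):

.. code-block:: python

    import tvm
    from tvm.meta_schedule.runner.utils import alloc_argument_common

    f_random_fill = tvm.get_global_func("tvm.contrib.random.random_fill")
    device = tvm.cpu(0)
    # Each entry is ["TENSOR", dtype, shape], as emitted by TensorInfo.as_json()
    args_info = [
        ["TENSOR", "float32", [128, 128]],
        ["TENSOR", "float32", [128, 128]],
    ]
    repeated_args = alloc_argument_common(f_random_fill, device, args_info, alloc_repeat=2)
    assert len(repeated_args) == 2  # one argument list per allocation repeat
    assert len(repeated_args[0]) == 2  # one randomly filled NDArray per entry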
diff --git a/tests/python/unittest/test_meta_schedule_runner.py b/tests/python/unittest/test_meta_schedule_runner.py
index 3c8aee0c6d58..df6c03b07fa1 100644
--- a/tests/python/unittest/test_meta_schedule_runner.py
+++ b/tests/python/unittest/test_meta_schedule_runner.py
@@ -31,6 +31,7 @@
 from tvm.meta_schedule.builder import BuilderInput, LocalBuilder
 from tvm.meta_schedule.runner import (
     EvaluatorConfig,
+    LocalRunner,
     PyRunner,
     RPCConfig,
     RPCRunner,
@@ -39,6 +40,11 @@
 )
 from tvm.meta_schedule.runner.rpc_runner import (
     default_alloc_argument as rpc_default_alloc_argument,
+    T_ARG_INFO_JSON_OBJ_LIST,
+    T_ARGUMENT_LIST,
+)
+from tvm.meta_schedule.runner.local_runner import (
+    default_alloc_argument as local_default_alloc_argument,
 )
 from tvm.meta_schedule.testing import LocalRPC
 from tvm.meta_schedule.utils import get_global_func_with_default_on_worker
@@ -165,6 +171,44 @@ def test_meta_schedule_rpc_single_run():
     _clean_build(builder_result.artifact_path)
 
 
+def test_meta_schedule_local_single_run():
+    """Test meta schedule local runner for a single run"""
+    # Build the module
+    mod = MatmulModule()
+    builder = LocalBuilder()
+    (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))])
+    assert builder_result.artifact_path is not None
+    assert builder_result.error_msg is None
+
+    runner_input = RunnerInput(
+        builder_result.artifact_path,
+        "llvm",
+        [
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+        ],
+    )
+
+    evaluator_config = EvaluatorConfig(
+        number=1,
+        repeat=1,
+        min_repeat_ms=0,
+        enable_cpu_cache_flush=False,
+    )
+    runner = LocalRunner(timeout_sec=100, evaluator_config=evaluator_config)
+    # Run the module
+    (runner_future,) = runner.run([runner_input])
+    runner_result = runner_future.result()
+    assert runner_result.error_msg is None
+    for result in runner_result.run_secs:
+        if isinstance(result, FloatImm):
+            result = result.value
+        assert isinstance(result, float)
+        assert result >= 0.0
+    _clean_build(builder_result.artifact_path)
+
+
 def test_meta_schedule_rpc_multiple_runs():
     """Test meta schedule rpc runner for multiple runs"""
     # Build the module
@@ -234,6 +278,69 @@ def test_meta_schedule_rpc_multiple_runs():
     _clean_build(builder_result.artifact_path)
 
 
+def test_meta_schedule_local_multiple_runs():
+    """Test meta schedule local runner for multiple runs"""
+    # Build the modules
+    mods = [
+        MatmulModule(),
+        MatmulReluModule(),
+        BatchMatmulModule(),
+    ]
+    builder = LocalBuilder()
+    builder_inputs = [BuilderInput(mod, Target("llvm")) for mod in mods]
+    builder_results = builder.build(builder_inputs)
+    for builder_result in builder_results:
+        assert builder_result.artifact_path is not None
+        assert builder_result.error_msg is None
+
+    args_infos = [
+        [
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+        ],
+        [
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+        ],
+        [
+            TensorInfo("float32", [16, MATMUL_M, MATMUL_M]),
+            TensorInfo("float32", [16, MATMUL_M, MATMUL_M]),
+            TensorInfo("float32", [16, MATMUL_M, MATMUL_M]),
+        ],
+    ]
+
+    runner_inputs = [
+        RunnerInput(builder_results[i].artifact_path, "llvm", args_infos[i])
+        for i in range(len(mods))
+    ]
+
+    evaluator_config = EvaluatorConfig(
+        number=1,
+        repeat=1,
+        min_repeat_ms=0,
+        enable_cpu_cache_flush=False,
+    )
+
+    runner = LocalRunner(timeout_sec=100, evaluator_config=evaluator_config)
+
+    # Run the modules
+    runner_futures = runner.run(runner_inputs)
+    runner_results = [runner_future.result() for runner_future in runner_futures]
+
+    for runner_result in runner_results:
+        assert runner_result.error_msg is None
+        for result in runner_result.run_secs:
+            if isinstance(result, FloatImm):
+                result = result.value
+            assert isinstance(result, float)
+            assert result >= 0.0
+
+    for builder_result in builder_results:
+        _clean_build(builder_result.artifact_path)
+
+
 def test_meta_schedule_py_runner():
     """Test meta schedule PyRunner"""
 
@@ -296,6 +403,57 @@ def timeout_session_creator(  # pylint: disable=unused-variable
     assert runner_result.run_secs is None
 
 
+def test_meta_schedule_local_runner_time_out():
+    """Test meta schedule Local Runner time out"""
+    mod = MatmulModule()
+    builder = LocalBuilder()
+    (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))])
+    assert builder_result.artifact_path is not None
+    assert builder_result.error_msg is None
+
+    runner_input = RunnerInput(
+        builder_result.artifact_path,
+        "llvm",
+        [
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+        ],
+    )
+
+    def initializer():
+        @register_func("meta_schedule.runner.test_time_out")
+        def timeout_alloc_argument(  # pylint: disable=unused-variable
+            device: Device,  # pylint: disable=unused-argument
+            args_info: T_ARG_INFO_JSON_OBJ_LIST,  # pylint: disable=unused-argument
+            alloc_repeat: int,  # pylint: disable=unused-argument
+        ) -> None:
+            time.sleep(2)
+
+    evaluator_config = EvaluatorConfig(
+        number=1,
+        repeat=1,
+        min_repeat_ms=0,
+        enable_cpu_cache_flush=False,
+    )
+
+    runner = LocalRunner(
+        timeout_sec=1,
+        evaluator_config=evaluator_config,
+        initializer=initializer,
+        f_alloc_argument="meta_schedule.runner.test_time_out",
+    )
+
+    # Run the module
+    (runner_future,) = runner.run([runner_input])
+    runner_result = runner_future.result()
+
+    assert runner_result.error_msg is not None and runner_result.error_msg.startswith(
+        "LocalRunner: Timeout, killed after"
+    )
+    assert runner_result.run_secs is None
+
+
 def test_meta_schedule_rpc_runner_exception():
     """Test meta schedule RPC Runner exception"""
 
@@ -345,6 +503,57 @@ def exception_session_creator(  # pylint: disable=unused-variable
     assert runner_result.run_secs is None
 
 
+def test_meta_schedule_local_runner_exception():
+    """Test meta schedule Local Runner exception handling"""
+    mod = MatmulModule()
+    builder = LocalBuilder()
+    (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))])
+    assert builder_result.artifact_path is not None
+    assert builder_result.error_msg is None
+
+    runner_input = RunnerInput(
+        builder_result.artifact_path,
+        "llvm",
+        [
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+            TensorInfo("float32", (MATMUL_N, MATMUL_N)),
+        ],
+    )
+
+    def initializer():
+        @register_func("meta_schedule.runner.test_exception")
+        def exception_alloc_argument(  # pylint: disable=unused-variable
+            device: Device,  # pylint: disable=unused-argument
+            args_info: T_ARG_INFO_JSON_OBJ_LIST,  # pylint: disable=unused-argument
+            alloc_repeat: int,  # pylint: disable=unused-argument
+        ) -> None:
+            raise Exception("Test")
+
+    evaluator_config = EvaluatorConfig(
+        number=1,
+        repeat=1,
+        min_repeat_ms=0,
+        enable_cpu_cache_flush=False,
+    )
+
+    runner = LocalRunner(
+        timeout_sec=1,
+        evaluator_config=evaluator_config,
+        initializer=initializer,
+        f_alloc_argument="meta_schedule.runner.test_exception",
+    )
+
+    # Run the module
+    (runner_future,) = runner.run([runner_input])
+    runner_result = runner_future.result()
+
+    assert runner_result.error_msg is not None and runner_result.error_msg.startswith(
+        "LocalRunner: An exception occurred\n"
+    )
+    assert runner_result.run_secs is None
+
+
 def test_meta_schedule_runner_matmul_test():
     """Test meta schedule runner with add module"""
 
@@ -567,5 +776,99 @@ def test_run_evaluator(
     _clean_build(builder_result.artifact_path)
 
 
+def test_meta_schedule_local_runner_add_test():
+    """Test meta schedule local runner with add module"""
+
+    def _check_correct_add(args_before: List[np.ndarray], args_after: List[np.ndarray]) -> None:
+        a_before, b_before, c_before = args_before
+        a_after, b_after, c_after = args_after
+        # The expected output of the add module
+        c_before = a_before + b_before
+        assert (a_before == a_after).all()
+        assert (b_before == b_after).all()
+        assert (c_before == c_after).all()
+
+    def test_alloc_argument(
+        device: Device,
+        args_info: T_ARG_INFO_JSON_OBJ_LIST,  # pylint: disable=unused-argument
+        alloc_repeat: int,
+    ) -> List[T_ARGUMENT_LIST]:
+        global repeated_args_before  # pylint: disable=global-variable-undefined, invalid-name
+        repeated_args_before = []
+        repeated_args = local_default_alloc_argument(device, args_info, alloc_repeat)
+        for args in repeated_args:
+            repeated_args_before.append([arg.asnumpy() for arg in args])
+        return repeated_args
+
+    def test_run_evaluator(
+        rt_mod: Module,
+        device: Device,
+        evaluator_config: EvaluatorConfig,
+        repeated_args: List[Any],
+    ) -> List[float]:
+        global repeated_args_before  # pylint: disable=global-variable-undefined, invalid-name
+        repeated_args_after = []
+        evaluator = rt_mod.time_evaluator(
+            func_name=rt_mod.entry_name,
+            dev=device,
+            number=evaluator_config.number,
+            repeat=evaluator_config.repeat,
+            min_repeat_ms=evaluator_config.min_repeat_ms,
+            f_preproc="cache_flush_cpu_non_first_arg"
+            if evaluator_config.enable_cpu_cache_flush
+            else "",
+        )
+        repeated_costs: List[List[float]] = []
+        for args in repeated_args:
+            device.sync()
+            profile_result = evaluator(*args)
+            repeated_costs.append(profile_result.results)
+            repeated_args_after.append([arg.asnumpy() for arg in args])
+        costs = [float(cost) for cost in itertools.chain.from_iterable(repeated_costs)]
+        for args_before, args_after in zip(repeated_args_before, repeated_args_after):
+            _check_correct_add(args_before, args_after)
+        del repeated_args_before
+        return costs
+
+    # Build the module
+    mod = AddModule()
+    builder = LocalBuilder()
+    (builder_result,) = builder.build([BuilderInput(mod, Target("llvm"))])
+    assert builder_result.artifact_path is not None
+    assert builder_result.error_msg is None
+
+    runner_input = RunnerInput(
+        builder_result.artifact_path,
+        "llvm",
+        [
+            TensorInfo("float32", [MATMUL_M]),
+            TensorInfo("float32", [MATMUL_M]),
+            TensorInfo("float32", [MATMUL_M]),
+        ],
+    )
+
+    evaluator_config = EvaluatorConfig(
+        number=1,
+        repeat=1,
+        min_repeat_ms=0,
+        enable_cpu_cache_flush=False,
+    )
+    runner = LocalRunner(
+        timeout_sec=100,
+        evaluator_config=evaluator_config,
+        f_alloc_argument=test_alloc_argument,
+        f_run_evaluator=test_run_evaluator,
+    )
+    # Run the module
+    (runner_future,) = runner.run([runner_input])
+    runner_result = runner_future.result()
+    assert runner_result.error_msg is None
+    for result in runner_result.run_secs:
+        if isinstance(result, FloatImm):
+            result = result.value
+        assert isinstance(result, float)
+        assert result >= 0.0
+    _clean_build(builder_result.artifact_path)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main([__file__] + sys.argv[1:]))
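As the tests above show, `f_alloc_argument`, `f_run_evaluator`, and `f_cleanup` may be passed either as Python callables or as names of registered global functions. In the latter case the `initializer` hook is the place to perform the registration, since it runs inside the worker process spawned by the `PopenPoolExecutor`, where `_worker_func` resolves the name. A sketch of that pattern (the registered name `meta_schedule.runner.my_alloc` is hypothetical):

.. code-block:: python

    from typing import List

    from tvm._ffi import register_func
    from tvm.meta_schedule.runner import LocalRunner
    from tvm.meta_schedule.runner.local_runner import default_alloc_argument
    from tvm.meta_schedule.runner.utils import T_ARG_INFO_JSON_OBJ_LIST, T_ARGUMENT_LIST
    from tvm.runtime import Device


    def initializer():
        # Runs inside the worker process, so the registration is visible
        # where the name is looked up
        @register_func("meta_schedule.runner.my_alloc")  # hypothetical name
        def my_alloc_argument(  # pylint: disable=unused-variable
            device: Device,
            args_info: T_ARG_INFO_JSON_OBJ_LIST,
            alloc_repeat: int,
        ) -> List[T_ARGUMENT_LIST]:
            # Delegate to the default; a real hook could allocate
            # deterministic inputs or add logging instead
            return default_alloc_argument(device, args_info, alloc_repeat)


    runner = LocalRunner(
        timeout_sec=60,
        f_alloc_argument="meta_schedule.runner.my_alloc",
        initializer=initializer,
    )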