diff --git a/orchestrator/utilities/ray_env/ordered_pip.py b/orchestrator/utilities/ray_env/ordered_pip.py index aade2139..51e16004 100644 --- a/orchestrator/utilities/ray_env/ordered_pip.py +++ b/orchestrator/utilities/ray_env/ordered_pip.py @@ -4,8 +4,10 @@ import contextlib import logging import os +import threading import typing +import ray._private.runtime_env.packaging from ray._private.runtime_env import virtualenv_utils from ray._private.runtime_env.pip import PipPlugin from ray._private.runtime_env.plugin import RuntimeEnvPlugin @@ -29,18 +31,24 @@ async def create_or_get_virtualenv(path: str, cwd: str, logger: logging.Logger): await original_create_or_get_virtualenv(path=path, cwd=cwd, logger=logger) +_monkey_patch_lock = threading.RLock() + + @contextlib.contextmanager def patch_create_or_get_virtualenv(phase_index: int): - if phase_index > 0: - setattr(virtualenv_utils, "create_or_get_virtualenv", create_or_get_virtualenv) - try: - yield - finally: - setattr( - virtualenv_utils, - "create_or_get_virtualenv", - original_create_or_get_virtualenv, - ) + with _monkey_patch_lock: + if phase_index > 0: + setattr( + virtualenv_utils, "create_or_get_virtualenv", create_or_get_virtualenv + ) + try: + yield + finally: + setattr( + virtualenv_utils, + "create_or_get_virtualenv", + original_create_or_get_virtualenv, + ) class OrderedPipPlugin(RuntimeEnvPlugin): @@ -93,27 +101,81 @@ def try_import_torch(): """ name = "ordered_pip" + + # VV: Configure Ray to use this RuntimeEnvPlugin last + priority = 100 ClassPath = "orchestrator.utilities.ray_env.ordered_pip.OrderedPipPlugin" def __init__(self, resources_dir: str | None = None): - if resources_dir is None: - import ray._private.ray_constants as ray_constants + self._global_mtx = threading.RLock() + self._create_env_mtx: dict[str, threading.RLock] = {} + self._pip_resources_dir = resources_dir - resources_dir = os.environ.get( - ray_constants.RAY_RUNTIME_ENV_CREATE_WORKING_DIR_ENV_VAR - ) + # VV: Maintains a cache 
of the environments that have been built thus far + self._cache = {} - if not resources_dir: - import tempfile + def _try_switch_resources_dir_from_context( + self, + context: "RuntimeEnvContext", # noqa: F821 + logger: logging.Logger | None = default_logger, + ): + # VV: When ray instantiates custom RuntimeEnvPlugins it does not provide a resources_dir path. + # This method is a HACK that derives the resources_dir from the RuntimeEnvContext, which is only known + # at the time of CREATING a virtual environment i.e. **after** the RuntimeEnvPlugin is initialized. + + with self._global_mtx: + # VV: Stick with whatever resources dir we've already picked + if self._pip_resources_dir: + return + + logger.info("Generating resources dir") + unique = set() + if "PYTHONPATH" in context.env_vars: + # VV: This is a HACK to find the "runtime_resources" path inside the PYTHONPATH env-var + # This is an env-var that the WorkingDirPlugin inserts. + # I noticed that sometimes the PYTHONPATH contains multiple copies of the same PATH. 
+ # The PYTHONPATH looks like this: + # /tmp/ray/session_$timestamp/runtime_resources/working_dir_files/_ray_pkg_$uid + many = context.env_vars["PYTHONPATH"].split(os.pathsep) + logger.info(f"Current PYTHONPATH {many}") + runtime_resources_followup = f"{os.sep}working_dir_files{os.sep}" + unique.update( + [ + os.path.join( + x.split(runtime_resources_followup, 1)[0], "ordered_pip" + ) + for x in many + if runtime_resources_followup in x + ] + ) - resources_dir = tempfile.mkdtemp(prefix="ordered_pip_", dir="/tmp/ray") + logger.info(f"The candidate locations of runtime_resources: {list(unique)}") - self._pip_resources_dir = resources_dir + if len(unique) != 1: + import tempfile + + unique.clear() + unique.add(tempfile.mkdtemp(prefix="ordered_pip_", dir="/tmp/ray")) - from ray._common.utils import try_to_create_directory + self._switch_resources_dir(unique.pop()) - try_to_create_directory(self._pip_resources_dir) - self._pip_plugin = PipPlugin(self._pip_resources_dir) + def _switch_resources_dir(self, resources_dir: str): + with self._global_mtx: + from ray._common.utils import try_to_create_directory + + self._pip_resources_dir = resources_dir + try_to_create_directory(self._pip_resources_dir) + + @property + def _pip_plugin(self) -> PipPlugin: + # The PipPlugin keeps an internal cache of virtual environments it has created but not yet deleted. + # When .create() is called, it checks this cache for a venv matching the given URI. + # If a match is found, it assumes the venv already exists and skips re-creation. + # However, ordered_pip needs to reuse the same venv multiple times (once per "phase"). + # Thus, we create a new PipPlugin instance on demand for each phase of ordered_pip. + # Also, we maintain our own record of venvs to decide whether to create a new "ordered_pip" + # venv or reuse an existing one. 
+ return PipPlugin(self._pip_resources_dir) @staticmethod def validate(runtime_env_dict: dict[str, typing.Any]) -> RuntimeEnv: @@ -132,14 +194,14 @@ def validate(runtime_env_dict: dict[str, typing.Any]) -> RuntimeEnv: raise ValueError("runtime_env must be a dictionary") if "ordered_pip" not in runtime_env_dict: - raise ValueError("missing the 'ordered_pip' key", runtime_env_dict) + return RuntimeEnv(**runtime_env_dict) if not isinstance(runtime_env_dict["ordered_pip"], dict): raise ValueError("runtime_env['ordered_pip'] must be a dictionary") if not isinstance(runtime_env_dict["ordered_pip"]["phases"], list): raise ValueError( - "runtime_env['ordered_pip']['phases'] must be a dictionary consistent with pip" + "runtime_env['ordered_pip']['phases'] must be an array of pip entries" ) phases = [] @@ -164,6 +226,9 @@ def validate(runtime_env_dict: dict[str, typing.Any]) -> RuntimeEnv: return result def get_uris(self, runtime_env: "RuntimeEnv") -> list[str]: + if not self.is_ordered_pip_runtimeenv(runtime_env): + return [] + # VV: We want the hash to be invariant to the order of package names within a phase, # and we also want the order of phases to be reflected in the hash. 
aggregate_packages = [ @@ -178,6 +243,9 @@ def get_uris(self, runtime_env: "RuntimeEnv") -> list[str]: "pip://" + hashlib.sha1(str(aggregate_packages).encode("utf-8")).hexdigest() ] + def is_ordered_pip_runtimeenv(self, runtime_env: "RuntimeEnv") -> bool: + return bool(self.validate(runtime_env).get("ordered_pip")) + async def create( self, uri: str, @@ -185,24 +253,66 @@ async def create( context: "RuntimeEnvContext", # noqa: F821 logger: logging.Logger | None = default_logger, ) -> int: + self._try_switch_resources_dir_from_context(context, logger) + + if not self.is_ordered_pip_runtimeenv(runtime_env): + return 0 + uri = self.get_uris(runtime_env)[0] - total_bytes = 0 - - for idx, pip in enumerate(self.validate(runtime_env)["ordered_pip"]["phases"]): - with patch_create_or_get_virtualenv(idx): - total_bytes += await self._pip_plugin.create( - uri=uri, - runtime_env=RuntimeEnv(pip=pip), - context=context, - logger=logger, - ) - return total_bytes + with self._global_mtx: + if uri not in self._create_env_mtx: + self._create_env_mtx[uri] = threading.RLock() + + with self._create_env_mtx[uri]: + logger.info(f"Creating {uri} for {runtime_env}") + try: + if os.path.isdir(self.get_path_to_pip_venv(uri)): + logger.info(f"Virtual environment for {uri} already exists") + return self._cache[uri] + except KeyError: + pass + + self._cache[uri] = 0 + for idx, pip in enumerate( + self.validate(runtime_env)["ordered_pip"]["phases"] + ): + with patch_create_or_get_virtualenv(idx): + logger.info(f"Creating {idx} for {uri}") + + self._cache[uri] += await self._pip_plugin.create( + uri=uri, + runtime_env=RuntimeEnv(pip=pip), + context=context, + logger=logger, + ) + logger.info(f"Done creating {idx} for {uri}") + + return self._cache[uri] + + def get_path_to_pip_venv(self, uri: str) -> str: + _, env_hash = ray._private.runtime_env.packaging.parse_uri(uri) + return os.path.join(self._pip_resources_dir, "pip", env_hash) def delete_uri( self, uri: str, logger: logging.Logger | None = 
default_logger ) -> int: - return self._pip_plugin.delete_uri(uri=uri, logger=logger) + logger.info(f"Cleaning up {uri}") + del self._cache[uri] + + import shutil + + import ray._private.utils + + env_dir = self.get_path_to_pip_venv(uri) + num_bytes = ray._private.utils.get_directory_size_bytes(env_dir) + + try: + shutil.rmtree(env_dir) + except Exception as e: + logger.warning(f"Exception while cleaning up {env_dir} {e!s} - will ignore") + + return num_bytes def modify_context( self, @@ -211,7 +321,14 @@ def modify_context( context: "RuntimeEnvContext", # noqa: F821 logger: logging.Logger = default_logger, ): - phases = self.validate(runtime_env)["ordered_pip"]["phases"] + self._try_switch_resources_dir_from_context(context) + + runtime_env = self.validate(runtime_env) + if not runtime_env.get("ordered_pip"): + return + + logger.info(f"Modifying the context for {uris} and {runtime_env}") + phases = runtime_env["ordered_pip"]["phases"] if not len(phases): return @@ -222,3 +339,18 @@ def modify_context( context=context, logger=logger, ) + + if "PYTHONPATH" in context.env_vars: + # VV: Ensure unique paths in PYTHONPATH + paths = context.env_vars["PYTHONPATH"].split(os.pathsep) + + unique = [] + for k in paths: + if k not in unique: + unique.append(k) + + context.env_vars["PYTHONPATH"] = os.pathsep.join(unique) + + logger.info( + f"Modified the context for {uris} and {runtime_env} with {context.py_executable} {context.env_vars}" + ) diff --git a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/actuators.py b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/actuators.py index 008a292c..f7a1d1c9 100644 --- a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/actuators.py +++ b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/actuators.py @@ -204,6 +204,120 @@ def upgrade_simple_model_map( return values +def prepare_runtime_environment( + actuator_parameters: ActuatorParameters, + log: logging.Logger, + space: EntitySpace, + args: 
"finetune.FineTuneArgs", +) -> dict[str, typing.Any]: + exclude_packages = [] + + if not actuator_parameters.match_exact_dependencies: + if ray_env_utils.is_using_arm_cpu(): + exclude_packages.append("bitsandbytes") + + if not ray_env_utils.is_nvcc_available(): + exclude_packages.extend( + ray_env_utils.packages_requiring_nvidia_development_binaries() + ) + + if exclude_packages: + log.info( + f"Because match_exact_dependencies=False we will exclude the packages {exclude_packages}" + ) + + # VV: Users that switch off match_exact_dependencies are likely in an "exploration" mode. + # Help them out a bit by explaining why their measurement cannot work instead of asking them to + # manually investigate the exception that pip raises. + msg_unsupported_feat = ( + "The measurement requires {feature}, but the required NVIDIA development " + "binaries are not supported on this platform. When using match_exact_dependencies=False you cannot use: " + "fast_moe, fast_kernels, flash_attn" + ) + if "fms-acceleration-foak" in exclude_packages and "true" in [ + str(x).lower() for x in space.fast_kernels or [] + ]: + raise ValueError(msg_unsupported_feat.format(feature="fast_kernels")) + + if "fms-acceleration-moe" in exclude_packages and space.fast_moe not in [ + [0], + 0, + None, + ]: + raise ValueError(msg_unsupported_feat.format(feature="fast_moe")) + + if "flash_attn" in exclude_packages and space.flash_attn: + raise ValueError(msg_unsupported_feat.format(feature="flash_attn")) + + log.info(f"Excluded packages {exclude_packages}") + packages = ray_env_utils.get_pinned_packages( + path_requirements=PATH_PINNED_PACKAGES[space.fms_hf_tuning_version], + override_fms_hf_tuning=get_fms_hf_tuning_package( + commit=FMS_HF_TUNING_COMMIT[space.fms_hf_tuning_version] + ), + exclude_packages=exclude_packages, + ensure_aim=True, + ) + + # VV: Detect any extra wheels and propagate them to the job e.g. 
sfttrainer + import ray.runtime_context + + context = ray.get_runtime_context() + + pip_config_packages = context.runtime_env.pip_config().get("packages", []) + uv_config_packages = context.runtime_env.uv_config().get("packages", []) + + ordered_pip = context.runtime_env.get("ordered_pip", {}) + ordered_pip_packages = [] + for phase in ordered_pip.get("phases", []): + if isinstance(phase, dict): + ordered_pip_packages.extend(phase.get("packages", [])) + elif isinstance(phase, list): + ordered_pip_packages.extend(phase) + + additional_packages = ( + pip_config_packages + uv_config_packages + ordered_pip_packages + ) + + additional_wheels = [ + x + for x in additional_packages + # VV: Do not install the ado_core wheel. Its dependencies may conflict with those in fms-hf-tuning + if x.endswith(".whl") and not os.path.basename(x).startswith("ado_core-") + ] + + if additional_wheels: + log.info( + "Discovered custom wheels which will be propagated to dynamic virtual environment: " + f"{[os.path.basename(x) for x in additional_wheels]}" + ) + packages.extend(additional_wheels) + + # VV: Get a ray runtime-environment which contains packages that this version of fms-hf-tuning imports + + env_vars = {} + for key, name in os.environ.items(): + # VV: Propagate environment variables that are related to pip + # for example, PIP_FIND_LINKS for installing packages from a URL/directory. 
+ # This is useful for packages that take too long to compile from source like mamba-ssm + if key.startswith("PIP_"): + env_vars[key] = name + + runtime_env = ray_env_utils.get_ray_environment( + packages=packages, + packages_requiring_extra_phase=[ray_env_utils.packages_depending_on_torch()], + env_vars=env_vars, + ) + + # VV: Need HF_HOME set so the tokenize_text() method in finetune.py can access + # the same transformers cache that the fms-hf-tuning.sft_trainer.py script uses + # This is useful for handling models we download from huggingface + runtime_env["env_vars"] = runtime_env.get("env_vars", {}) + runtime_env["env_vars"]["HF_HOME"] = args.hf_home + + return runtime_env + + def dynamic_name_function(function: typing.Callable[..., typing.Any], new_name: str): """Returns a new function identical to the original, but with a new name. @@ -662,67 +776,13 @@ def prepare_finetune_context( f"and args {dataclasses.asdict(args)}" ) - exclude_packages = [] - - if not self.typed_parameters.match_exact_dependencies: - if ray_env_utils.is_using_arm_cpu(): - exclude_packages.append("bitsandbytes") - - if not ray_env_utils.is_nvcc_available(): - exclude_packages.extend( - ray_env_utils.packages_requiring_nvidia_development_binaries() - ) - - if exclude_packages: - self.log.info( - f"Because match_exact_dependencies=False we will exclude the packages {exclude_packages}" - ) - - # VV: Users that switch off match_exact_dependencies are likely in an "exploration" mode. - # Help them out a bit by explaining why their measurement cannot work instead of asking them to - # manually investigate the exception that pip raises. - msg_unsupported_feat = ( - "The measurement requires {feature}, but the required NVIDIA development " - "binaries are not supported on this platform. 
When using match_exact_dependencies=False you cannot use: " - "fast_moe, fast_kernels, flash_attn" - ) - if "fms-acceleration-foak" in exclude_packages and "true" in [ - str(x).lower() for x in space.fast_kernels or [] - ]: - raise ValueError(msg_unsupported_feat.format(feature="fast_kernels")) - - if "fms-acceleration-moe" in exclude_packages and space.fast_moe not in [ - [0], - 0, - None, - ]: - raise ValueError(msg_unsupported_feat.format(feature="fast_moe")) - - if "flash_attn" in exclude_packages and space.flash_attn: - raise ValueError(msg_unsupported_feat.format(feature="flash_attn")) - - self.log.info(f"Excluded packages {exclude_packages}") - packages = ray_env_utils.get_pinned_packages( - path_requirements=PATH_PINNED_PACKAGES[space.fms_hf_tuning_version], - override_fms_hf_tuning=get_fms_hf_tuning_package( - commit=FMS_HF_TUNING_COMMIT[space.fms_hf_tuning_version] - ), - exclude_packages=exclude_packages, - ensure_aim=True, - ) - - # VV: Get a ray runtime-environment which contains packages that this version of fms-hf-tuning imports - runtime_env = ray_env_utils.get_ray_environment( - packages=packages, - packages_requiring_extra_phase=[["flash_attn", "mamba-ssm"]], + runtime_env = prepare_runtime_environment( + actuator_parameters=self.typed_parameters, + log=self.log, + space=space, + args=args, ) - # VV: Need HF_HOME set so the tokenize_text() method in finetune.py can access - # the same transformers cache that the fms-hf-tuning.sft_trainer.py script uses - # This is useful for handling models we download from huggingface - runtime_env["env_vars"] = runtime_env.get("env_vars", {}) - runtime_env["env_vars"]["HF_HOME"] = args.hf_home - number_cpus = max(space.number_gpus, 1) * 2 aim_metadata = { @@ -745,29 +805,6 @@ def prepare_finetune_context( extra["num_gpus"] = space.number_gpus extra["resources"] = {space.gpu_model: space.number_gpus} - # VV: Detect any extra wheels and propagate them to the job e.g. 
sfttrainer - import ray.runtime_context - - context = ray.get_runtime_context() - pip_config = context.runtime_env.pip_config() - uv_config = context.runtime_env.uv_config() - - packages = pip_config.get("packages", []) + uv_config.get("packages", []) - - additional_wheels = [ - x - for x in packages - # VV: Do not install the ado_core wheel. Its dependencies may conflict with those in fms-hf-tuning - if x.endswith(".whl") and not os.path.basename(x).startswith("ado_core-") - ] - - if additional_wheels: - self.log.info( - "Discovered custom wheels which will be propagated to dynamic virtual environment: " - f"{[os.path.basename(x) for x in additional_wheels]}" - ) - runtime_env["pip"]["packages"].extend(additional_wheels) - self.log.info(f"The environment is {json.dumps(runtime_env)}") log_level = None diff --git a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/experiments/common.py b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/experiments/common.py index 51486640..b273d2c4 100644 --- a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/experiments/common.py +++ b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/experiments/common.py @@ -174,6 +174,21 @@ def load_model_map() -> dict[str, dict[WeightsFormat, str]]: def get_default_measured_properties() -> list[str]: return [ + "is_valid", + "dataset_tokens_per_second_per_gpu", + "train_runtime", + "dataset_tokens_per_second", + # VV: the next 4 are inaccurate when terminating the job early + "train_samples_per_second", + "train_steps_per_second", + "train_tokens_per_second", + "train_tokens_per_gpu_per_second", + # VV: We no longer record the model_load_time + # "model_load_time", + # VV: CPU measurements + "cpu_compute_utilization", + "cpu_memory_utilization", + # VV: GPU measurements - these will be 0 when running on a machine without GPUs "gpu_compute_utilization_min", "gpu_compute_utilization_avg", "gpu_compute_utilization_max", @@ -187,19 +202,6 @@ def get_default_measured_properties() 
-> list[str]: "gpu_power_percent_min", "gpu_power_percent_avg", "gpu_power_percent_max", - "cpu_compute_utilization", - "cpu_memory_utilization", - "train_runtime", - # VV: the next 4 are inaccurate when terminating the job early - "train_samples_per_second", - "train_steps_per_second", - "train_tokens_per_second", - "train_tokens_per_gpu_per_second", - # VV: We no longer record the model_load_time - # "model_load_time", - "dataset_tokens_per_second", - "dataset_tokens_per_second_per_gpu", - "is_valid", ] diff --git a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/ray_env/utils.py b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/ray_env/utils.py index 093d4cf8..2e5282d7 100644 --- a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/ray_env/utils.py +++ b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/ray_env/utils.py @@ -91,17 +91,23 @@ def packages_requiring_nvidia_development_binaries(): "flash_attn", "mamba-ssm", "causal-conv1d", + # VV: mamba_ssm and causal_conv1d changed their package names + "mamba_ssm", + "causal_conv1d", "nvidia-cublas-cu12", "nvidia-cuda-cupti-cu12", "nvidia-cuda-nvrtc-cu12", "nvidia-cuda-runtime-cu12", "nvidia-cudnn-cu12", "nvidia-cufft-cu12", + "nvidia-cufile-cu12", "nvidia-curand-cu12", "nvidia-cusolver-cu12", "nvidia-cusparse-cu12", + "nvidia-cusparselt-cu12", "nvidia-nccl-cu12", "nvidia-nvjitlink-cu12", + "nvidia-nvshmem-cu12", "nvidia-nvtx-cu12", ] @@ -191,9 +197,31 @@ def find_matching_packages(package_name: str, packages: list[str]) -> list[str]: return packages +@functools.cache +def ray_version_supports_pip_install_options() -> bool | None: + import ray + + try: + # VV: Ray added support for pip_install_options in 2.50.0. Only parse major.minor so that suffixes like "dev0"/"rc1" in later components cannot break the check + version = [int(x) for x in ray.__version__.split(".")[:2]] + return version[0] > 2 or (version[0] >= 2 and version[1] >= 50) + except Exception as e: + print( + f"Unable to tell whether pip_install_options is available for Ray Runtime environments due to {e!s} - " + f"will assume that it is 
unavailable" + ) + return None + + +def packages_depending_on_torch() -> list[str]: + # VV: mamba_ssm and causal_conv1d changed their package names + return ["flash_attn", "mamba-ssm", "causal-conv1d", "mamba_ssm", "causal_conv1d"] + + def get_ray_environment( packages: list[str], packages_requiring_extra_phase: list[list[str]], + env_vars: dict[str, str], ) -> dict[str, typing.Any]: """Builds a ray-environment using a Ray RuntimeEnvPlugin. @@ -211,8 +239,10 @@ def get_ray_environment( A list of lists of packages. The list with index i expects that the packages in the list with index i-1 have already been installed in the virtual environment that ray will be building. This is only used when the ordered_pip RuntimeEnvPlugin is available. Otherwise, it is ignored. + env_vars: + Environment variables to inject into the RuntimeContext Returns: - A dictionary representing a Ray environment + A dictionary representing a RuntimeContext for Ray jobs """ if is_ordered_pip_available(): env_plugin_name = "ordered_pip" @@ -228,21 +258,40 @@ def get_ray_environment( # VV: Do not switch on pip_check. 
plugin = {} + env = {"AIM_UI_TELEMETRY_ENABLED": "0"} + env.update(env_vars) + ray_environment = { - "env_vars": {"AIM_UI_TELEMETRY_ENABLED": "0"}, + "env_vars": env, env_plugin_name: plugin, } + pip_install_options = [] + + if ray_version_supports_pip_install_options() or env_plugin_name == "uv": + # VV: Ray added support for pip_install_options in 2.50.0 + pip_install_options = ["--no-build-isolation"] + + if env.get("PIP_FIND_LINKS"): + # VV: I find that exporting PIP_FIND_LINKS does not behave the same way as using --find-links + pip_install_options.extend(("--find-links", env["PIP_FIND_LINKS"])) + if env_plugin_name == "pip": - ray_environment["env_vars"]["PIP_NO_BUILD_ISOLATION"] = "0" - plugin.update({"packages": packages}) + phase = {"packages": packages} + + if pip_install_options: + phase["pip_install_options"] = pip_install_options + else: + ray_environment["env_vars"]["PIP_NO_BUILD_ISOLATION"] = "0" + + plugin.update(phase) elif env_plugin_name == "uv": plugin.update( - {"uv_pip_install_options": ["--no-build-isolation"], "packages": packages} + {"uv_pip_install_options": pip_install_options, "packages": packages}  # VV: already starts with --no-build-isolation ) elif env_plugin_name == "ordered_pip": - # VV: TODO For ray 2.49+ we can also set "pip_install_options"= ["--no-build-isolation"] - ray_environment["env_vars"]["PIP_NO_BUILD_ISOLATION"] = "0" + if not pip_install_options:  # VV: this Ray has no pip_install_options, fall back to the env-var + ray_environment["env_vars"]["PIP_NO_BUILD_ISOLATION"] = "0" base_packages = [] phases = [{"packages": base_packages}] plugin["phases"] = phases @@ -252,7 +301,10 @@ def get_ray_environment( exclude_packages=p, packages=packages ) if this_phase: - phases.append({"packages": this_phase}) + phase = {"packages": this_phase} + if ray_version_supports_pip_install_options(): + phase["pip_install_options"] = list(pip_install_options) + phases.append(phase) # VV: At this point the packages var contains all the packages that must go into the very first phase base_packages.extend(packages) diff --git 
a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/tests/test_env_plugin.py b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/tests/test_env_plugin.py index 2e3bda24..7ec143ce 100644 --- a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/tests/test_env_plugin.py +++ b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/tests/test_env_plugin.py @@ -21,6 +21,17 @@ def set_plugin(): del os.environ["RAY_RUNTIME_ENV_PLUGINS"] +def test_detect_support_pip_install_options(): + import ray + + version = tuple(int(x) for x in ray.__version__.split(".")) + import ado_actuators.sfttrainer.wrapper_fms_hf_tuning.tuning_versions as tv + + supported = tv.semver_cmp(version, (2, 50, 0)) >= 0 + + assert supported == utils.ray_version_supports_pip_install_options() + + def test_ray_runtime_env_with_ordered_pip_plugin(set_plugin): if not utils.is_pip_available(): pytest.skip("pip is unavailable") @@ -36,22 +47,88 @@ def test_ray_runtime_env_with_ordered_pip_plugin(set_plugin): assert utils.is_ordered_pip_available() - packages = ["torch==2.6.0", "flash_attn==2.7.4.post1", "mamba-ssm==2.2.5"] + packages = [ + "torch==2.6.0", + "flash_attn==2.7.4.post1", + "mamba-ssm==2.2.5", + ] runtime_env = utils.get_ray_environment( packages=packages, - packages_requiring_extra_phase=[["flash_attn", "mamba-ssm"]], + packages_requiring_extra_phase=[utils.packages_depending_on_torch()], + env_vars={}, ) - assert runtime_env == { - "env_vars": {"AIM_UI_TELEMETRY_ENABLED": "0", "PIP_NO_BUILD_ISOLATION": "0"}, - "ordered_pip": { - "phases": [ - {"packages": ["torch==2.6.0"]}, - {"packages": ["flash_attn==2.7.4.post1", "mamba-ssm==2.2.5"]}, - ] - }, - } + if utils.ray_version_supports_pip_install_options(): + assert runtime_env == { + "env_vars": {"AIM_UI_TELEMETRY_ENABLED": "0"}, + "ordered_pip": { + "phases": [ + {"packages": ["torch==2.6.0"]}, + { + "packages": ["flash_attn==2.7.4.post1", "mamba-ssm==2.2.5"], + "pip_install_options": ["--no-build-isolation"], + }, + ] + }, + } + else: 
+ assert runtime_env == { + "env_vars": { + "AIM_UI_TELEMETRY_ENABLED": "0", + "PIP_NO_BUILD_ISOLATION": "0", + }, + "ordered_pip": { + "phases": [ + {"packages": ["torch==2.6.0"]}, + { + "packages": [ + "flash_attn==2.7.4.post1", + "mamba-ssm==2.2.5", + ] + }, + ] + }, + } + + +def test_pip_find_links_option(): + if not utils.is_pip_available(): + pytest.skip("pip is unavailable") + + packages = ["mamba-ssm==2.2.5"] + + wheelhouse = "file:///path/to/wheelhouse" + runtime_env = utils.get_ray_environment( + packages=packages, + packages_requiring_extra_phase=[utils.packages_depending_on_torch()], + env_vars={"PIP_FIND_LINKS": wheelhouse}, + ) + + if utils.ray_version_supports_pip_install_options(): + assert runtime_env == { + "env_vars": { + "AIM_UI_TELEMETRY_ENABLED": "0", + "PIP_FIND_LINKS": wheelhouse, + }, + "pip": { + "packages": packages, + "pip_install_options": [ + "--no-build-isolation", + "--find-links", + wheelhouse, + ], + }, + } + else: + assert runtime_env == { + "env_vars": { + "AIM_UI_TELEMETRY_ENABLED": "0", + "PIP_NO_BUILD_ISOLATION": "0", + "PIP_FIND_LINKS": wheelhouse, + }, + "pip": {"packages": packages}, + } def test_ray_runtime_env_with_vanilla_pip(): @@ -60,32 +137,57 @@ def test_ray_runtime_env_with_vanilla_pip(): assert utils.is_ordered_pip_available() is False - packages = ["torch==2.6.0", "flash_attn==2.7.4.post1", "mamba-ssm==2.2.5"] + packages = [ + "torch==2.6.0", + "flash_attn==2.7.4.post1", + "mamba-ssm==2.2.5", + ] runtime_env = utils.get_ray_environment( packages=packages, - packages_requiring_extra_phase=[["flash_attn", "mamba-ssm"]], + packages_requiring_extra_phase=[utils.packages_depending_on_torch()], + env_vars={}, ) - assert runtime_env == { - "env_vars": {"AIM_UI_TELEMETRY_ENABLED": "0", "PIP_NO_BUILD_ISOLATION": "0"}, - "pip": { - "packages": ["torch==2.6.0", "flash_attn==2.7.4.post1", "mamba-ssm==2.2.5"] - }, - } + if utils.ray_version_supports_pip_install_options(): + assert runtime_env == { + "env_vars": 
{"AIM_UI_TELEMETRY_ENABLED": "0"}, + "pip": { + "packages": [ + "torch==2.6.0", + "flash_attn==2.7.4.post1", + "mamba-ssm==2.2.5", + ], + "pip_install_options": ["--no-build-isolation"], + }, + } + else: + assert runtime_env == { + "env_vars": { + "AIM_UI_TELEMETRY_ENABLED": "0", + "PIP_NO_BUILD_ISOLATION": "0", + }, + "pip": { + "packages": [ + "torch==2.6.0", + "flash_attn==2.7.4.post1", + "mamba-ssm==2.2.5", + ] + }, + } def test_ordered_pip_plugin(set_plugin): - if not utils.is_nvidia_smi_available(): - pytest.skip("there's no NVIDIA gpu on this machine") + if not utils.is_pip_available(): + pytest.skip("pip is unavailable") @ray.remote( runtime_env={ "ordered_pip": { "phases": [ - ["torch==2.6.0"], + ["pyyaml"], { - "packages": ["mamba-ssm==2.2.5"], + "packages": ["filelock"], "pip_install_options": ["--no-build-isolation"], }, ] @@ -96,11 +198,23 @@ def test_ordered_pip_plugin(set_plugin): }, }, ) - def try_import_torch(): - import torch + def try_import_packages(): + import yaml + + _ = dir(yaml) + + import filelock + + _ = dir(filelock) - print(torch.__file__) - assert torch.__version__ == "2.6.0" return True - assert ray.get(try_import_torch.remote()) + import importlib.metadata + + installed_packages = importlib.metadata.distributions() + installed_packages = sorted([pkg.metadata["Name"] for pkg in installed_packages]) + + if ("pyyaml" in installed_packages) and ("filelock" in installed_packages): + pytest.skip("pyyaml and filelock are both already installed") + + assert ray.get(try_import_packages.remote()) diff --git a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/wrapper_fms_hf_tuning/tuning_versions/__init__.py b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/wrapper_fms_hf_tuning/tuning_versions/__init__.py index dad22470..2b3f8582 100644 --- a/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/wrapper_fms_hf_tuning/tuning_versions/__init__.py +++ 
b/plugins/actuators/sfttrainer/ado_actuators/sfttrainer/wrapper_fms_hf_tuning/tuning_versions/__init__.py @@ -28,8 +28,14 @@ def semver_cmp(v1: tuple[int, ...], v2: tuple[int, ...]) -> int: NotImplementedError: If the two semver versions do not consist of the same number of integers """ - if len(v1) != len(v2): - raise NotImplementedError("Cannot compare semvers of different length", v1, v2) + max_len = max(len(v1), len(v2)) + + if max_len != len(v1): + v1 = tuple(list(v1) + [0] * (max_len - len(v1))) + + if max_len != len(v2): + v2 = tuple(list(v2) + [0] * (max_len - len(v2))) + if v1 < v2: return -1 if v2 < v1: