diff --git a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/deployment_management.py b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/deployment_management.py
new file mode 100644
index 00000000..ebd2284b
--- /dev/null
+++ b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/deployment_management.py
@@ -0,0 +1,80 @@
+# Copyright (c) IBM Corporation
+# SPDX-License-Identifier: MIT
+
+import asyncio
+import logging
+
+import ray
+
+from orchestrator.modules.operators.console_output import RichConsoleSpinnerMessage
+
+logger = logging.getLogger(__name__)
+
+
+class DeploymentWaiter:
+    def __init__(self, k8s_name: str):
+        self.k8s_name = k8s_name
+        self.model_downloaded_event = asyncio.Event()
+
+
+class DeploymentConflictManager:
+    def __init__(self):
+        self.deployments_to_wait_for: dict[str, DeploymentWaiter] = {}
+        self.model_already_downloaded: set[str] = set()
+
+    def maybe_add_deployment(self, model: str, k8s_name: str) -> bool:
+        if (
+            model in self.model_already_downloaded
+            or model in self.deployments_to_wait_for
+        ):
+            return False
+
+        self.deployments_to_wait_for[model] = DeploymentWaiter(k8s_name=k8s_name)
+        return True
+
+    async def wait(self, request_id: str, k8s_name: str, model: str) -> None:
+        waiter = self.deployments_to_wait_for.get(model, None)
+        # making sure a deployment does not wait for itself to be READY
+        if waiter is not None and waiter.k8s_name != k8s_name:
+            console = ray.get_actor(name="RichConsoleQueue")
+            while True:
+                console.put.remote(
+                    message=RichConsoleSpinnerMessage(
+                        id=request_id,
+                        label=f"({request_id}) Waiting on deployment ({waiter.k8s_name}) to download the model required for this experiment",
+                        state="start",
+                    )
+                )
+                await waiter.model_downloaded_event.wait()
+                # If, after we are woken up, the model is still not among the downloaded models,
+                # something has gone wrong, e.g. the deployment we were waiting for failed.
+                # If I am the first to wake up, add myself as the deployment to be waited for and stop waiting.
+                if (
+                    model not in self.model_already_downloaded
+                    and not self.maybe_add_deployment(k8s_name=k8s_name, model=model)
+                ):
+                    # If I am not the first to wake up, I get the new waiter object and continue waiting
+                    waiter = self.deployments_to_wait_for.get(model)
+                    continue
+
+                console.put.remote(
+                    message=RichConsoleSpinnerMessage(
+                        id=request_id,
+                        label=f"({request_id}) Done waiting for conflicting K8s deployment",
+                        state="stop",
+                    )
+                )
+                break
+
+    def signal(self, k8s_name: str, model: str, error: bool = False) -> None:
+        if model not in self.deployments_to_wait_for:
+            return
+
+        waiter = self.deployments_to_wait_for.pop(model)
+        if waiter.k8s_name != k8s_name:
+            raise ValueError(
+                f"This environment deployment ({k8s_name}) shouldn't have been created because it is conflicting with deployment {waiter.k8s_name}"
+            )
+        if not error:
+            self.model_already_downloaded.add(model)
+        waiter.model_downloaded_event.set()
diff --git a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/env_manager.py b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/env_manager.py
index 002509a5..3fdb8e48 100644
--- a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/env_manager.py
+++ b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/env_manager.py
@@ -2,12 +2,14 @@
 # SPDX-License-Identifier: MIT
 
 import asyncio
-import copy
 import logging
-import time
 from enum import Enum
 
 import ray
+from ado_actuators.vllm_performance.deployment_management import (
+    DeploymentConflictManager,
+)
+from ado_actuators.vllm_performance.k8s import K8sEnvironmentCreationError
 from ado_actuators.vllm_performance.k8s.manage_components import (
     ComponentsManager,
 )
@@ -25,7 +27,6 @@ class EnvironmentState(Enum):
     """
 
     NONE = "None"
-    CREATING = "creating"
     READY = "ready"
 
 
@@ -34,32 +35,28 @@ class Environment:
     """
     environment class
     """
 
-    def __init__(self, model: str):
+    def __init__(self, model: str, configuration: str):
         """
         Defines an environment for a model
         :param model: LLM model name
+        :param configuration: The full deployment configuration
         """
         self.k8s_name = ComponentsYaml.get_k8s_name(model=model)
         self.state = EnvironmentState.NONE
-        self.in_use = 0
-
-    def update_creating(self):
-        val = copy.deepcopy(self)
-        val.state = EnvironmentState.CREATING
-        val.in_use = 1
-        return val
+        self.configuration = configuration
+        self.model = model
 
 
 class EnvironmentsQueue:
     def __init__(self):
         self.environments_queue = []
 
-    async def wait(self):
+    async def wait(self) -> None:
        wait_event = asyncio.Event()
         self.environments_queue.append(wait_event)
         await wait_event.wait()
 
-    def signal_next(self):
+    def signal_next(self) -> None:
         if len(self.environments_queue) > 0:
             event = self.environments_queue.pop(0)
             event.set()
@@ -89,8 +86,10 @@ def __init__(
         :param pvc_name: name of the PVC to be created / used
         :param pvc_template: template of the PVC to be created
         """
-        self.environments = {}
+        self.in_use_environments: dict[str, Environment] = {}
+        self.free_environments: list[Environment] = []
         self.environments_queue = EnvironmentsQueue()
+        self.deployment_conflict_manager = DeploymentConflictManager()
         self.namespace = namespace
         self.max_concurrent = max_concurrent
         self.in_cluster = in_cluster
@@ -106,16 +105,30 @@ def __init__(
             pvc_template=pvc_template,
         )
 
+    def _delete_environment_k8s_resources(self, k8s_name: str) -> None:
+        """
+        Deletes a deployment and its service.
+        Intended to be used for cleanup or error recovery.
+        :param k8s_name: the Kubernetes name of the deployment and service to delete
+        """
+        try:
+            self.manager.delete_service(k8s_name=k8s_name)
+        except ApiException as e:
+            if e.reason != "Not Found":
+                raise e
+        try:
+            self.manager.delete_deployment(k8s_name=k8s_name)
+        except ApiException as e:
+            if e.reason != "Not Found":
+                raise e
+
     def environment_usage(self) -> dict:
-        return {"max": self.max_concurrent, "in_use": len(self.environments)}
+        return {"max": self.max_concurrent, "in_use": self.active_environments}
 
     async def wait_for_env(self):
         await self.environments_queue.wait()
 
-    def get_environment(
-        self, model: str, definition: str, increment_usage: bool = False
-    ) -> Environment | None:
+    def get_environment(self, model: str, definition: str) -> Environment | None:
         """
         Get an environment for definition
         :param model: LLM model name
@@ -126,67 +139,143 @@
         :return: environment state
         """
-        env = self.environments.get(definition, None)
+        # check if there's an existing free environment satisfying the request
+        env = self.get_matching_free_environment(definition)
         if env is None:
-            if len(self.environments) >= self.max_concurrent:
+            if self.active_environments >= self.max_concurrent:
                 # can't create more environments now, need clean up
-                available = False
-                for key, env in self.environments.items():
-                    if env.in_use == 0:
-                        available = True
-                        start = time.time()
-                        try:
-                            self.manager.delete_service(k8s_name=env.k8s_name)
-                            self.manager.delete_deployment(k8s_name=env.k8s_name)
-                        except ApiException as e:
-                            logger.error(f"Error deleting deployment or service {e}")
-                        del self.environments[key]
-                        logging.info(
-                            f"deleted environment {env.k8s_name} in {time.time() - start} sec. "
-                            f"Environments length {len(self.environments)}"
+                if len(self.free_environments) == 0:
+                    # No room for creating a new environment
+                    logger.debug(
+                        f"There are already {self.max_concurrent} environments actively in use, cannot create a new one"
+                    )
+                    return None
+
+                # There are unused environments, let's try to evict one
+                environment_evicted = False
+                eviction_index = 0
+                # Continue looping until we find one environment that can be successfully evicted or we have gone through them all
+                while not environment_evicted and eviction_index < len(
+                    self.free_environments
+                ):
+                    environment_to_evict = self.free_environments[eviction_index]
+                    try:
+                        # _delete_environment_k8s_resources will not raise an error if, for whatever reason, the service
+                        # or the deployment we are trying to delete no longer exists; in that case we assume
+                        # the deployment was properly deleted.
+                        self._delete_environment_k8s_resources(
+                            k8s_name=environment_to_evict.k8s_name
                         )
-                        time.sleep(3)
-                        break
-                if not available:
+                    except ApiException as e:
+                        # If we can't delete this environment we try the next one, but we do not
+                        # remove the current env from the free list. This avoids spawning more pods than the configured
+                        # maximum in case the failing ones are still running.
+                        # Since the current eviction candidate stays in the free list, some other measurement might
+                        # try to evict it again and perhaps succeed (e.g., connection to the cluster restored).
+                        logger.critical(
+                            f"Error deleting deployment or service {environment_to_evict.k8s_name}: {e}"
+                        )
+                        eviction_index += 1
+                        continue
+
+                    logger.info(
+                        f"deleted environment {environment_to_evict.k8s_name}. "
+                        f"Active environments {self.active_environments}"
+                    )
+                    environment_evicted = True
+
+                if environment_evicted:
+                    # successfully deleted an environment
+                    self.free_environments.pop(eviction_index)
+                elif len(self.in_use_environments) > 0:
+                    # All the free environments failed to be deleted, but one or more are in use and
+                    # might make room for waiting measurements. In this case we just behave as if there
+                    # are no free environments available and we wait.
                     return None
-                # mark new one
-            env = Environment(model=model)
-            self.environments[definition] = env.update_creating()
-            return env
-        if increment_usage:
-            env = self.environments.get(definition)
-            env.in_use += 1
-            self.environments[definition] = env
+                else:
+                    # None of the free environments could be evicted due to errors and none are in use.
+                    # To avoid deadlocking the operation we fail the measurement.
+                    raise K8sEnvironmentCreationError(
+                        "All free environments failed deleting and none are currently in use."
+                    )
+
+            # We either made space or we had enough space already
+            env = Environment(model=model, configuration=definition)
+            logger.debug(f"New environment created for definition {definition}")
+
+            # If deployments target the same model and the model is not in the HF cache, they would all try to download it.
+            # This can lead to corruption of the HF cache data (shared PVC).
+            # To avoid this situation, we keep track of the models downloaded by the actuator during the current operation.
+            # If a deployment wants to download a model for the first time, we do not allow other deployments using the
+            # same model to start in parallel.
+            # Once the very first download of a model is done, we let any number of deployments using the same model start
+            # in parallel, as they only read the model from the cache.
+            self.deployment_conflict_manager.maybe_add_deployment(
+                k8s_name=env.k8s_name, model=model
+            )
+
+        self.in_use_environments[env.k8s_name] = env
+        return env
 
-    def get_experiment_pvc_name(self):
+    @property
+    def active_environments(self) -> int:
+        return len(self.in_use_environments) + len(self.free_environments)
+
+    def get_experiment_pvc_name(self) -> str:
         return self.manager.pvc_name
 
-    def done_creating(self, definition: str) -> None:
+    def done_creating(self, identifier: str) -> None:
         """
         Report creation
-        :param definition: environment definition
+        :param identifier: environment identifier
         :return: None
         """
-        env = self.environments.get(definition, None)
-        if env is None:
-            return
-        env.state = EnvironmentState.READY
-        self.environments[definition] = env
+        self.in_use_environments[identifier].state = EnvironmentState.READY
+        model = self.in_use_environments[identifier].model
+
+        self.deployment_conflict_manager.signal(k8s_name=identifier, model=model)
+
+    def cleanup_failed_deployment(self, identifier: str) -> None:
+        env = self.in_use_environments[identifier]
+        self._delete_environment_k8s_resources(k8s_name=identifier)
+        self.done_using(identifier=identifier, reclaim_on_completion=True)
+        self.deployment_conflict_manager.signal(
+            k8s_name=identifier, model=env.model, error=True
+        )
 
-    def done_using(self, definition: str) -> None:
+    def get_matching_free_environment(self, configuration: str) -> Environment | None:
+        """
+        Find a free environment matching a deployment configuration
+        :param configuration: The deployment configuration to match
+        :return: An already existing free environment or None
+        """
+        for id, env in enumerate(self.free_environments):
+            if env.configuration == configuration:
+                del self.free_environments[id]
+                return env
+        return None
+
+    async def wait_deployment_before_starting(
+        self, env: Environment, request_id: str
+    ) -> None:
+        await self.deployment_conflict_manager.wait(
+            request_id=request_id, k8s_name=env.k8s_name, model=env.model
+        )
+
+    def done_using(self, identifier: str, reclaim_on_completion: bool = False) -> None:
         """
         Report test completion
-        :param definition: environment definition
+        :param identifier: environment identifier
+        :param reclaim_on_completion: flag to indicate the environment is to be completely removed and not freed for later use
         :return: None
         """
-        env = self.environments.get(definition)
-        if env is None:
-            return
-        env.in_use -= 1
-        self.environments[definition] = env
+        env = self.in_use_environments.pop(identifier)
+        if not reclaim_on_completion:
+            self.free_environments.append(env)
 
-        # If another measurement is waiting in queue we wake it up
+        # Wake up any other deployment waiting in the queue for a
self.environments_queue.signal_next() def cleanup(self) -> None: @@ -195,17 +284,10 @@ def cleanup(self) -> None: :return: None """ logger.info("Cleaning environments") - for env in self.environments.values(): - try: - self.manager.delete_service(k8s_name=env.k8s_name) - except ApiException as e: - if e.reason != "Not Found": - raise e - try: - self.manager.delete_deployment(k8s_name=env.k8s_name) - except ApiException as e: - if e.reason != "Not Found": - raise e + all_envs = list(self.in_use_environments.values()) + self.free_environments + for env in all_envs: + self._delete_environment_k8s_resources(k8s_name=env.k8s_name) + # We only delete the PVC if it was created by this actuator if self.manager.pvc_created: logger.debug("Deleting PVC") diff --git a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/experiment_executor.py b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/experiment_executor.py index c7d46337..49efc294 100644 --- a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/experiment_executor.py +++ b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/experiment_executor.py @@ -3,7 +3,6 @@ import json import logging -import math import subprocess import time @@ -16,6 +15,10 @@ EnvironmentManager, EnvironmentState, ) +from ado_actuators.vllm_performance.k8s import ( + K8sConnectionError, + K8sEnvironmentCreationError, +) from ado_actuators.vllm_performance.k8s.create_environment import ( create_test_environment, ) @@ -41,14 +44,6 @@ logger = logging.getLogger(__name__) -class K8EnvironmentCreationError(Exception): - """Error raised when K8 environment cannot be created for some reason""" - - -class K8ConnectionError(Exception): - """Error raised when there is an issue connecting to K8s or a service its hosting""" - - def _build_entity_env(values: dict[str, str]) -> str: """ This is the list of entity parameters that define the environment: @@ -109,13 +104,12 @@ def _create_environment( :param timeout: timeout :return: kubernetes environment name - :raises K8EnvironmentCreationError if there was an issue + :raises K8sEnvironmentCreationError if there was an issue - If the creation step fails after three attempts - If after creation the environment was not in ready state after timeout seconds (1200 default) """ from orchestrator.modules.operators.console_output import ( - RichConsoleProgressMessage, RichConsoleSpinnerMessage, ) @@ -136,11 +130,12 @@ def _create_environment( ) while True: - env: Environment = ray.get( - env_manager.get_environment.remote( - model=model, definition=definition, increment_usage=True + try: + env: Environment = ray.get( + env_manager.get_environment.remote(model=model, definition=definition) ) - ) + except Exception as e: + raise e if env is not None: console.put.remote( message=RichConsoleSpinnerMessage( @@ -170,6 +165,14 @@ def _create_environment( # Environment does not exist, create it logger.debug(f"Environment {env.k8s_name} does not exist. 
             tmout = 1
+
+            # To avoid data corruption, we wait if another environment is concurrently downloading the same model for the first time
+            ray.get(
+                env_manager.wait_deployment_before_starting.remote(
+                    env=env, request_id=request_id
+                )
+            )
+
             for attempt in range(3):
                 console.put.remote(
                     message=RichConsoleSpinnerMessage(
@@ -205,9 +208,11 @@
                         reuse_deployment=False,
                         namespace=actuator.namespace,
                         pvc_name=pvc_name,
+                        check_interval=check_interval,
+                        timeout=timeout,
                     )
                     # Update manager
-                    env_manager.done_creating.remote(definition=definition)
+                    env_manager.done_creating.remote(identifier=env.k8s_name)
                     error = None
                     break
                 except Exception as e:
@@ -238,68 +243,20 @@
                         state="stop",
                     )
                 )
-                raise K8EnvironmentCreationError(
-                    f"Failed to create test environment {env.k8s_name}: {error}"
-                )
-
-        case EnvironmentState.CREATING:
-            # Someone is creating environment, wait till its ready
-            logger.info(
-                f"Environment {env.k8s_name} is being created. Waiting for it to be ready."
-            )
-            console.put.remote(
-                message=RichConsoleProgressMessage(
-                    id=request_id,
-                    label=f"({request_id}) vLLM deployment {env.k8s_name} is starting. Waiting for it to be ready ...",
-                    progress=0,
-                )
-            )
-            n_checks = math.ceil(timeout / check_interval)
-            for i in range(n_checks):
-                time.sleep(check_interval)
-                env = ray.get(
-                    env_manager.get_environment.remote(
-                        model=model, definition=definition
-                    )
-                )
-                if env.state == EnvironmentState.READY:
-                    break
-                console.put.remote(
-                    message=RichConsoleProgressMessage(
-                        id=request_id,
-                        label=f"({request_id}) vLLM deployment, {env.k8s_name} is starting. Waiting for it to be ready ...",
-                        progress=i * int(100 / n_checks),
+                # In case of failure creating the environment deployment, we must release any
+                # other request whose deployment conflicts with this request's deployment.
+                # We also need to release the slot for this environment.
+                ray.get(
+                    env_manager.cleanup_failed_deployment.remote(
+                        identifier=env.k8s_name
                     )
                 )
-            if env.state != EnvironmentState.READY:
-                # timed out waiting for environment creation
-                console.put.remote(
-                    message=RichConsoleProgressMessage(
-                        id=request_id,
-                        label=f"({request_id}) Timed out waiting for {env.k8s_name} to be ready. Aborting",
-                        progress=100,
-                    )
-                )
-                error = f"({request_id}) Timed out waiting for environment to get ready. Timeout {timeout}"
-                raise K8EnvironmentCreationError(
+                raise K8sEnvironmentCreationError(
                     f"Failed to create test environment {env.k8s_name}: {error}"
                 )
-            console.put.remote(
-                message=RichConsoleProgressMessage(
-                    id=request_id,
-                    label=f"vLLM deployment, {env.k8s_name} is ready",
-                    progress=100,
-                )
-            )
-
-            logger.debug("Environment is created, using it")
-
         case _:
             # environment exists, use it
             logger.debug(f"Environment {env.k8s_name} already exists. Reusing it")
 
     return env.k8s_name, definition
@@ -345,19 +302,30 @@ def _connect_to_vllm_server(
         pf = None
     else:
         # we are running locally. need to do port-forward and connect to the local one
-        pf_command = f"kubectl port-forward svc/{k8s_name} -n {actuator_parameters.namespace} {port}:80 2>&1 >/dev/null"
+        pf_command_args = [
+            "kubectl",
+            "port-forward",
+            f"svc/{k8s_name}",
+            "-n",
+            f"{actuator_parameters.namespace}",
+            f"{port}:80",
+        ]
         try:
-            pf = subprocess.Popen(pf_command, shell=True)
+            pf = subprocess.Popen(
+                pf_command_args,
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+            )
             # make sure that port forwarding is up
             time.sleep(5)
             # Check if there is a returncode- if there is it means port-forward exited
             if pf.returncode:
-                raise K8ConnectionError(
+                raise K8sConnectionError(
                     f"failed to start port forward to service {k8s_name} - port-forward command exited for unknown reason. Check logs."
                 )
         except Exception as e:
             logger.warning(f"failed to start port forward to service {k8s_name} - {e}")
-            raise K8ConnectionError(
+            raise K8sConnectionError(
                 f"failed to start port forward to service {k8s_name} - {e}"
             )
 
@@ -416,7 +384,7 @@ def run_resource_and_workload_experiment(
 
             logger.info(f"Creating K8s environment for {entity.identifier}")
 
-            # Will raise an K8EnvironmentCreationError if the environment could not be created
+            # Will raise a K8sEnvironmentCreationError if the environment could not be created
             k8s_name, definition = _create_environment(
                 values=values,
                 actuator=actuator_parameters,
@@ -425,7 +393,7 @@
                 request_id=request.requestid,
             )
 
-            # Will raise an K8ConnectionError if a port-forward was required
+            # Will raise a K8sConnectionError if a port-forward was required
             # but could not be created
             current_port += 1
             base_url, port_forward = _connect_to_vllm_server(
@@ -465,8 +433,8 @@
             )
 
         except (
-            K8EnvironmentCreationError,
-            K8ConnectionError,
+            K8sEnvironmentCreationError,
+            K8sConnectionError,
             VLLMBenchmarkError,
         ) as error:
             logger.error(f"Error running tests for entity {entity.identifier}: {error}")
@@ -512,7 +480,7 @@
             if port_forward is not None:
                 port_forward.kill()
             if definition is not None:
-                env_manager.done_using.remote(definition=definition)
+                env_manager.done_using.remote(identifier=k8s_name)
 
     # For multi entity experiments if ONE entity had ValidResults the status must be SUCCESS
     if len(measurements) > 0:
diff --git a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/k8s/create_environment.py b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/k8s/create_environment.py
index 2137d37c..4a1f5489 100644
--- a/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/k8s/create_environment.py
+++ b/plugins/actuators/vllm_performance/ado_actuators/vllm_performance/k8s/create_environment.py
@@ -38,6 +38,8 @@ def create_test_environment(
     reuse_service: bool = True,
     reuse_deployment: bool = True,
     namespace: str = "vllm-testing",
+    check_interval: int = 5,
+    timeout: int = 1200,
 ) -> None:
     """
     Create test deployment
@@ -64,6 +66,8 @@
     :param hf_token: huggingface token
     :param reuse_service: flag to reuse deployment
     :param reuse_deployment: flag to reuse deployment
+    :param check_interval: wait interval in seconds
+    :param timeout: timeout in seconds
     :return:
     """
     logger.info(f"Creating environment in ns {namespace} with the parameters: ")
@@ -112,7 +116,11 @@
         reuse=reuse_deployment,
     )
     logger.debug("deployment created")
-    c_manager.wait_deployment_ready(k8s_name=k8s_name)
+    c_manager.wait_deployment_ready(
+        k8s_name=k8s_name,
+        check_interval=check_interval,
+        timeout=timeout,
+    )
     logger.info("deployment ready")
     # service
     c_manager.create_service(