From ff36abb775928a03a0e060e887316fe518e4261d Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 3 Mar 2023 10:24:07 -0800 Subject: [PATCH] =?UTF-8?q?Revert=20"[serve]=20Replace=20Ray=20Client=20wi?= =?UTF-8?q?th=20Ray=20Job=20Submission=20for=20`serve=E2=80=A6=20(#32976)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Previously, if connecting to a local cluster, `serve run` will directly call `ray.init()` and `serve.run` - With the change in #30913, even when connecting to a local cluster, `serve run` would submit a Ray Job to the local cluster and then call `ray.init()` and `serve.run()` inside that job. This caused the issue https://github.com/ray-project/ray/issues/32881, where if a user ran `serve run` inside a Job (or any other process with a runtime environment), they should expect to have access to the dependencies installed in that runtime environment. However since `serve run` goes through another layer of Job submission, they don't. Signed-off-by: Jack He --- doc/source/serve/dev-workflow.md | 12 +- python/ray/serve/run_script.py | 124 --------------------- python/ray/serve/scripts.py | 169 +++++++++++++++++------------ python/ray/serve/tests/test_cli.py | 25 ++--- 4 files changed, 116 insertions(+), 214 deletions(-) delete mode 100644 python/ray/serve/run_script.py diff --git a/doc/source/serve/dev-workflow.md b/doc/source/serve/dev-workflow.md index 7c93a7c7c5e1a..e75caa56b7219 100644 --- a/doc/source/serve/dev-workflow.md +++ b/doc/source/serve/dev-workflow.md @@ -61,10 +61,6 @@ serve run local_dev:graph The `serve run` command blocks the terminal and can be canceled with Ctrl-C. Typically, `serve run` should not be run simultaneously from multiple terminals, unless each `serve run` is targeting a separate running Ray cluster. -:::{note} -If you already have a local Ray Cluster running before executing `serve run`, make sure that the path to your Serve app is accessible from the working directory in which you started the Ray Cluster using `ray start --head`. Otherwise, you can pass in `app-dir` or `working-dir` when executing `serve run`. See the documentation for [serve run](serve_cli.html#serve-run) for more details. -::: - Now that Serve is running, we can send HTTP requests to the application. For simplicity, we'll just use the `curl` command to send requests from another terminal. @@ -85,17 +81,17 @@ Note that rerunning `serve run` will redeploy all deployments. To prevent redepl ## Testing on a remote cluster -To test on a remote cluster, you'll use `serve run` again, but this time you'll pass in an `--address` argument to specify the address of the Ray cluster to connect to. For remote clusters, this address has the form `http://:8265` and will be passed to Ray Job Submission; see [Ray Jobs](jobs-overview) for more information. +To test on a remote cluster, you'll use `serve run` again, but this time you'll pass in an `--address` argument to specify the address of the Ray cluster to connect to. For remote clusters, this address has the form `ray://:10001`; see [Ray Client](ray-client-ref) for more information. When making the transition from your local machine to a remote cluster, you'll need to make sure your cluster has a similar environment to your local machine--files, environment variables, and Python packages, for example. Let's see a simple example that just packages the code. Run the following command on your local machine, with your remote cluster head node IP address substituted for `` in the command: ```bash -serve run --address=http://:8265 --working_dir="./project/src" local_dev:graph +serve run --address=ray://:10001 --working_dir="./project/src" local_dev:graph ``` -This will upload the `working_dir` directory to the remote cluster and run your Serve application as a Ray Job on the remote cluster. Here, the local directory specified by `working_dir` must contain `local_dev.py` so that it can be uploaded to the cluster and imported by Ray Serve. +This will connect to the remote cluster via Ray Client, upload the `working_dir` directory, and run your serve application. Here, the local directory specified by `working_dir` must contain `local_dev.py` so that it can be uploaded to the cluster and imported by Ray Serve. Once this is up and running, we can send requests to the application: @@ -107,7 +103,7 @@ curl -X PUT http://:8000/?name=Ray For more complex dependencies, including files outside the working directory, environment variables, and Python packages, you can use {ref}`Runtime Environments`. Here is an example using the --runtime-env-json argument: ```bash -serve run --address=http://:8265 --runtime-env-json='{"env_vars": {"MY_ENV_VAR": "my-value"}, "working_dir": "./project/src", "pip": ["requests", "chess"]}' local_dev:graph +serve run --address=ray://:10001 --runtime-env-json='{"env_vars": {"MY_ENV_VAR": "my-value"}, "working_dir": "./project/src", "pip": ["requests", "chess"]}' local_dev:graph ``` You can also specify the `runtime_env` via a YAML file; see [serve run](serve_cli.html#serve-run) for details. diff --git a/python/ray/serve/run_script.py b/python/ray/serve/run_script.py deleted file mode 100644 index 47c44bd6366b8..0000000000000 --- a/python/ray/serve/run_script.py +++ /dev/null @@ -1,124 +0,0 @@ -import argparse -import pathlib -import sys -import time -import traceback -import yaml - -from ray import serve -from ray._private.utils import import_attr -from ray.autoscaler._private.cli_logger import cli_logger -from ray.serve._private import api as _private_api -from ray.serve._private.constants import ( - DEFAULT_HTTP_HOST, - DEFAULT_HTTP_PORT, -) -from ray.serve.schema import ServeApplicationSchema - - -def main(): - """ - This is the Job that gets submitted to the Ray Cluster when `serve run` is executed. - - Loads the Serve app (either from a YAML config file or a direct import path), starts - Serve and runs the app. By default, the code blocks until a SIGINT signal is - received, at which point Serve is shutdown and the process exits. - """ - parser = argparse.ArgumentParser() - parser.add_argument("--config-or-import-path") - parser.add_argument("--app-dir") - parser.add_argument("--host") - parser.add_argument("--port", type=int) - parser.add_argument("--blocking", action="store_true") - parser.add_argument("--gradio", action="store_true") - args = parser.parse_args() - host, port = args.host, args.port - - sys.path.insert(0, args.app_dir) - if pathlib.Path(args.config_or_import_path).is_file(): - config_path = args.config_or_import_path - cli_logger.print(f"Deploying from config file: '{config_path}'.") - - with open(config_path, "r") as config_file: - config_dict = yaml.safe_load(config_file) - # If host or port is specified as a CLI argument, they should take priority - # over config values. - config_dict.setdefault("host", DEFAULT_HTTP_HOST) - if host is not None: - config_dict["host"] = host - - config_dict.setdefault("port", DEFAULT_HTTP_PORT) - if port is not None: - config_dict["port"] = port - - config = ServeApplicationSchema.parse_obj(config_dict) - is_config = True - else: - if host is None: - host = DEFAULT_HTTP_HOST - if port is None: - port = DEFAULT_HTTP_PORT - import_path = args.config_or_import_path - cli_logger.print(f"Deploying from import path: '{import_path}'.") - node = import_attr(import_path) - is_config = False - - if is_config: - client = _private_api.serve_start( - detached=True, - http_options={ - "host": config.host, - "port": config.port, - "location": "EveryNode", - }, - ) - else: - client = _private_api.serve_start( - detached=True, - http_options={ - "host": host, - "port": port, - "location": "EveryNode", - }, - ) - - try: - if is_config: - client.deploy_apps(config, _blocking=args.gradio) - cli_logger.success("Submitted deploy config successfully.") - if args.gradio: - handle = serve.get_deployment("DAGDriver").get_handle() - else: - handle = serve.run(node, host=host, port=port) - cli_logger.success("Deployed Serve app successfully.") - - if args.gradio: - from ray.serve.experimental.gradio_visualize_graph import ( - GraphVisualizer, - ) - - visualizer = GraphVisualizer() - visualizer.visualize_with_gradio(handle) - else: - if args.blocking: - while True: - # Block, letting Ray print logs to the terminal. - time.sleep(10) - - except KeyboardInterrupt: - cli_logger.info("Got KeyboardInterrupt, shutting down...") - serve.shutdown() - sys.exit() - - except Exception: - traceback.print_exc() - cli_logger.error( - "Received unexpected error, see console logs for more details. Shutting " - "down..." - ) - serve.shutdown() - sys.exit() - - -if __name__ == "__main__": - main() diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index ec5cfee81a8d2..3ff840ac5bfcd 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -1,12 +1,13 @@ #!/usr/bin/env python -import asyncio import os -import signal +import pathlib import sys +import time from typing import Optional, Union import click import yaml +import traceback import re import ray @@ -15,8 +16,6 @@ from ray.autoscaler._private.cli_logger import cli_logger from ray.dashboard.modules.dashboard_sdk import parse_runtime_env_args from ray.dashboard.modules.serve.sdk import ServeSubmissionClient -from ray.job_submission import JobSubmissionClient -from ray.serve import run_script from ray.serve.api import build as build_app from ray.serve.config import DeploymentMode from ray.serve._private.constants import ( @@ -27,11 +26,17 @@ from ray.serve.deployment import deployment_to_schema from ray.serve.deployment_graph import ClassNode, FunctionNode from ray.serve.schema import ServeApplicationSchema +from ray.serve._private import api as _private_api +APP_DIR_HELP_STR = ( + "Local directory to look for the IMPORT_PATH (will be inserted into " + "PYTHONPATH). Defaults to '.', meaning that an object in ./main.py " + "can be imported as 'main.object'. Not relevant if you're importing " + "from an installed module." +) RAY_INIT_ADDRESS_HELP_STR = ( - "Address of the Ray Cluster to run the Serve app on. If no address is specified, " - "a local Ray Cluster will be started. Can also be specified using the RAY_ADDRESS " - "environment variable." + "Address to use for ray.init(). Can also be specified " + "using the RAY_ADDRESS environment variable." ) RAY_DASHBOARD_ADDRESS_HELP_STR = ( "Address to use to query the Ray dashboard agent (defaults to " @@ -185,15 +190,12 @@ def deploy(config_file_name: str, address: str): @cli.command( short_help="Run a Serve app.", help=( - "Runs a Serve app (specified in config_or_import_path) on a cluster as a Ray " - "Job. config_or_import_path is either a filepath to a YAML config file on the " - "Ray Cluster, or an import path on the Ray Cluster for a deployment node of " - "the pattern containing_module:deployment_node.\n\n" + "Runs the Serve app from the specified import path (e.g. " + "my_script:my_bound_deployment) or YAML config.\n\n" "If using a YAML config, existing deployments with no code changes " "will not be redeployed.\n\n" - "Any import path, whether directly specified as the command argument or " - "inside a config file, must lead to a FunctionNode or ClassNode object.\n\n" - "By default, this command will block and periodically log status. If you " + "Any import path must lead to a FunctionNode or ClassNode object. " + "By default, this will block and periodically log status. If you " "Ctrl-C the command, it will tear down the app." ), ) @@ -204,7 +206,7 @@ def deploy(config_file_name: str, address: str): default=None, required=False, help="Path to a local YAML file containing a runtime_env definition. " - "This will be passed to Ray Jobs as the default for deployments.", + "This will be passed to ray.init() as the default for deployments.", ) @click.option( "--runtime-env-json", @@ -212,7 +214,7 @@ def deploy(config_file_name: str, address: str): default=None, required=False, help="JSON-serialized runtime_env dictionary. This will be passed to " - "Ray Jobs as the default for deployments.", + "ray.init() as the default for deployments.", ) @click.option( "--working-dir", @@ -223,7 +225,7 @@ def deploy(config_file_name: str, address: str): "Directory containing files that your job will run in. Can be a " "local directory or a remote URI to a .zip file (S3, GS, HTTP). " "This overrides the working_dir in --runtime-env if both are " - "specified. This will be passed to Ray Jobs as the default for " + "specified. This will be passed to ray.init() as the default for " "deployments." ), ) @@ -232,12 +234,7 @@ def deploy(config_file_name: str, address: str): "-d", default=".", type=str, - help=( - "Directory on the Ray Cluster in which to look for the IMPORT_PATH (will be " - "inserted into PYTHONPATH). Defaults to '.', i.e. a deployment node `app_node` " - "in working_directory/main.py on the Ray Cluster can be run using " - "`main:app_node`. Not relevant if you're importing from an installed module." - ), + help=APP_DIR_HELP_STR, ) @click.option( "--address", @@ -290,55 +287,94 @@ def run( blocking: bool, gradio: bool, ): - # If no address is given and no local ray instance is running, we want to start one. - if address is None: - ray.init(namespace=SERVE_NAMESPACE) + sys.path.insert(0, app_dir) final_runtime_env = parse_runtime_env_args( runtime_env=runtime_env, runtime_env_json=runtime_env_json, working_dir=working_dir, ) - if "env_vars" not in final_runtime_env: - final_runtime_env["env_vars"] = {} - # Send interrupt signal to run_script, which triggers shutdown of Serve. - final_runtime_env["env_vars"]["RAY_JOB_STOP_SIGNAL"] = "SIGINT" - # Make sure Serve is shutdown correctly before the job is forcefully killed. - final_runtime_env["env_vars"]["RAY_JOB_STOP_WAIT_TIME_S"] = "30" - - # The job to run on the cluster, which imports and runs the serve app. - with open(run_script.__file__, "r") as f: - script = f.read() - - # Use Ray Job Submission to run serve. - client = JobSubmissionClient(address) - submission_id = client.submit_job( - entrypoint=( - f"python -c '{script}' " - f"--config-or-import-path={config_or_import_path} " - f"--app-dir={app_dir} " - + (f"--host={host} " if host is not None else "") - + (f"--port={port} " if port is not None else "") - + ("--blocking " if blocking else "") - + ("--gradio " if gradio else "") - ), - # Setting the runtime_env will set defaults for the deployments. - runtime_env=final_runtime_env, - ) - async def print_logs(): - async for lines in client.tail_job_logs(submission_id): - print(lines, end="") + if pathlib.Path(config_or_import_path).is_file(): + config_path = config_or_import_path + cli_logger.print(f'Deploying from config file: "{config_path}".') - def interrupt_handler(): - # Upon keyboard interrupt, stop job (which sends an interrupt signal to the job - # and shuts down serve). Then continue to stream logs until the job finishes. - client.stop_job(submission_id) + with open(config_path, "r") as config_file: + config_dict = yaml.safe_load(config_file) + # If host or port is specified as a CLI argument, they should take priority + # over config values. + config_dict.setdefault("host", DEFAULT_HTTP_HOST) + if host is not None: + config_dict["host"] = host - loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, interrupt_handler) - loop.run_until_complete(print_logs()) - loop.close() + config_dict.setdefault("port", DEFAULT_HTTP_PORT) + if port is not None: + config_dict["port"] = port + + config = ServeApplicationSchema.parse_obj(config_dict) + is_config = True + else: + if host is None: + host = DEFAULT_HTTP_HOST + if port is None: + port = DEFAULT_HTTP_PORT + import_path = config_or_import_path + cli_logger.print(f'Deploying from import path: "{import_path}".') + node = import_attr(import_path) + is_config = False + + # Setting the runtime_env here will set defaults for the deployments. + ray.init(address=address, namespace=SERVE_NAMESPACE, runtime_env=final_runtime_env) + + if is_config: + client = _private_api.serve_start( + detached=True, + http_options={ + "host": config.host, + "port": config.port, + "location": "EveryNode", + }, + ) + else: + client = _private_api.serve_start( + detached=True, + http_options={"host": host, "port": port, "location": "EveryNode"}, + ) + + try: + if is_config: + client.deploy_apps(config, _blocking=gradio) + cli_logger.success("Submitted deploy config successfully.") + if gradio: + handle = serve.get_deployment("DAGDriver").get_handle() + else: + handle = serve.run(node, host=host, port=port) + cli_logger.success("Deployed Serve app successfully.") + + if gradio: + from ray.serve.experimental.gradio_visualize_graph import GraphVisualizer + + visualizer = GraphVisualizer() + visualizer.visualize_with_gradio(handle) + else: + if blocking: + while True: + # Block, letting Ray print logs to the terminal. + time.sleep(10) + + except KeyboardInterrupt: + cli_logger.info("Got KeyboardInterrupt, shutting down...") + serve.shutdown() + sys.exit() + + except Exception: + traceback.print_exc() + cli_logger.error( + "Received unexpected error, see console logs for more details. Shutting " + "down..." + ) + serve.shutdown() + sys.exit() @cli.command(help="Get the current config of the running Serve app.") @@ -434,12 +470,7 @@ def shutdown(address: str, yes: bool): "-d", default=".", type=str, - help=( - "Local directory to look for the IMPORT_PATH (will be inserted into " - "PYTHONPATH). Defaults to '.', meaning that an object in ./main.py " - "can be imported as 'main.object'. Not relevant if you're importing " - "from an installed module." - ), + help=APP_DIR_HELP_STR, ) @click.option( "--kubernetes_format", diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py index 81a571cd62c41..7f94de77f2e25 100644 --- a/python/ray/serve/tests/test_cli.py +++ b/python/ray/serve/tests/test_cli.py @@ -357,8 +357,7 @@ def parrot(request): @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") -@pytest.mark.parametrize("address", ["auto", "http://127.0.0.1:8265"]) -def test_run_application(ray_start_stop, address): +def test_run_application(ray_start_stop): """Deploys valid config file and import path via `serve run`.""" # Deploy via config file @@ -367,7 +366,7 @@ def test_run_application(ray_start_stop, address): ) print('Running config file "arithmetic.yaml".') - p = subprocess.Popen(["serve", "run", f"--address={address}", config_file_name]) + p = subprocess.Popen(["serve", "run", "--address=auto", config_file_name]) wait_for_condition( lambda: requests.post("http://localhost:8000/", json=["ADD", 0]).json() == 1, timeout=15, @@ -387,7 +386,7 @@ def test_run_application(ray_start_stop, address): print('Running node at import path "ray.serve.tests.test_cli.parrot_node".') # Deploy via import path p = subprocess.Popen( - ["serve", "run", f"--address={address}", "ray.serve.tests.test_cli.parrot_node"] + ["serve", "run", "--address=auto", "ray.serve.tests.test_cli.parrot_node"] ) wait_for_condition( lambda: ping_endpoint("parrot", params="?sound=squawk") == "squawk" @@ -418,8 +417,7 @@ def __call__(self): @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") -@pytest.mark.parametrize("address", ["auto", "http://127.0.0.1:8265"]) -def test_run_deployment_node(ray_start_stop, address): +def test_run_deployment_node(ray_start_stop): """Test `serve run` with bound args and kwargs.""" # Deploy via import path @@ -427,11 +425,11 @@ def test_run_deployment_node(ray_start_stop, address): [ "serve", "run", - f"--address={address}", + "--address=auto", "ray.serve.tests.test_cli.molly_macaw", ] ) - wait_for_condition(lambda: ping_endpoint("Macaw") == "Molly is green!", timeout=15) + wait_for_condition(lambda: ping_endpoint("Macaw") == "Molly is green!", timeout=10) p.send_signal(signal.SIGINT) p.wait() assert ping_endpoint("Macaw") == CONNECTION_ERROR_MSG @@ -447,8 +445,7 @@ def __call__(self, *args): @pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.") -@pytest.mark.parametrize("address", ["auto", "http://127.0.0.1:8265"]) -def test_run_runtime_env(ray_start_stop, address): +def test_run_runtime_env(ray_start_stop): """Test `serve run` with runtime_env passed in.""" # With import path @@ -456,14 +453,14 @@ def test_run_runtime_env(ray_start_stop, address): [ "serve", "run", - f"--address={address}", + "--address=auto", "ray.serve.tests.test_cli.metal_detector_node", "--runtime-env-json", ('{"env_vars": {"buried_item": "lucky coin"} }'), ] ) wait_for_condition( - lambda: ping_endpoint("MetalDetector") == "lucky coin", timeout=15 + lambda: ping_endpoint("MetalDetector") == "lucky coin", timeout=10 ) p.send_signal(signal.SIGINT) p.wait() @@ -473,7 +470,7 @@ def test_run_runtime_env(ray_start_stop, address): [ "serve", "run", - f"--address={address}", + "--address=auto", os.path.join( os.path.dirname(__file__), "test_config_files", @@ -550,12 +547,14 @@ def test_run_teardown(ray_start_stop): """Consecutive serve runs should tear down controller so logs can always be seen.""" logs = subprocess.check_output( ["serve", "run", "ray.serve.tests.test_cli.constructor_failure_node"], + stderr=subprocess.STDOUT, timeout=30, ).decode() assert "Intentionally failing." in logs logs = subprocess.check_output( ["serve", "run", "ray.serve.tests.test_cli.constructor_failure_node"], + stderr=subprocess.STDOUT, timeout=30, ).decode() assert "Intentionally failing." in logs