diff --git a/dagster_ray/kuberay/constants.py b/dagster_ray/kuberay/constants.py new file mode 100644 index 0000000..ddf26d9 --- /dev/null +++ b/dagster_ray/kuberay/constants.py @@ -0,0 +1,10 @@ +import logging + +# Group, Version, Plural +GROUP = "ray.io" +VERSION = "v1alpha1" +PLURAL = "rayclusters" +KIND = "RayCluster" + +# log level +LOGLEVEL = logging.INFO diff --git a/dagster_ray/kuberay/ray_cluster_api.py b/dagster_ray/kuberay/ray_cluster_api.py index 6eb5643..3ac74f6 100644 --- a/dagster_ray/kuberay/ray_cluster_api.py +++ b/dagster_ray/kuberay/ray_cluster_api.py @@ -1,17 +1,36 @@ +import logging import time from typing import Any, Optional from kubernetes import client, config from kubernetes.client import ApiException -from python_client import constants -from python_client.kuberay_cluster_api import RayClusterApi, log +import dagster_ray.kuberay.constants as constants -class PatchedRayClusterApi(RayClusterApi): +log = logging.getLogger(__name__) + + +if logging.getLevelName(log.level) == "NOTSET": + logging.basicConfig(format="%(asctime)s %(message)s", level=constants.LOGLEVEL) + + +class RayClusterApi: """ + Taken from https://github.com/ray-project/kuberay/blob/master/clients/python-client/python_client/kuberay_cluster_api.py + This project is not published to PyPI, so it's impossible to install it as a dependency. + List of modifications to the original RayClusterApi: - allow passing config_file and context to __init__\ - fixed get_ray_cluster_status hard-querying 'status' field which is not always present + + RayClusterApi provides APIs to list, get, create, build, update, delete rayclusters. + + Methods: + - list_ray_clusters(k8s_namespace: str = "default", async_req: bool = False) -> Any: + - get_ray_cluster(name: str, k8s_namespace: str = "default") -> Any: + - create_ray_cluster(body: Any, k8s_namespace: str = "default") -> Any: + - delete_ray_cluster(name: str, k8s_namespace: str = "default") -> bool: + - patch_ray_cluster(name: str, ray_patch: Any, k8s_namespace: str = "default") -> Any: """ def __init__(self, config_file: Optional[str], context: Optional[str] = None): @@ -48,3 +67,185 @@ def get_ray_cluster_status( log.info("raycluster {} status not set yet, timing out...".format(name)) return None + + def __del__(self): + self.api = None + self.kube_config = None + + def list_ray_clusters( + self, k8s_namespace: str = "default", label_selector: str = "", async_req: bool = False + ) -> Any: + """List Ray clusters in a given namespace. + + Parameters: + - k8s_namespace (str, optional): The namespace in which to list the Ray clusters. Defaults to "default". + - async_req (bool, optional): Whether to make the request asynchronously. Defaults to False. + + Returns: + Any: The custom resource for Ray clusters in the specified namespace, or None if not found. + + Raises: + ApiException: If there was an error fetching the custom resource. + """ + try: + resource: Any = self.api.list_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + namespace=k8s_namespace, + label_selector=label_selector, + async_req=async_req, + ) + if "items" in resource: + return resource + return None + except ApiException as e: + if e.status == 404: + log.error("raycluster resource is not found. error = {}".format(e)) + return None + else: + log.error("error fetching custom resource: {}".format(e)) + return None + + def get_ray_cluster(self, name: str, k8s_namespace: str = "default") -> Any: + """Get a specific Ray cluster in a given namespace. + + Parameters: + - name (str): The name of the Ray cluster custom resource. Defaults to "". + - k8s_namespace (str, optional): The namespace in which to retrieve the Ray cluster. Defaults to "default". + + Returns: + Any: The custom resource for the specified Ray cluster, or None if not found. + + Raises: + ApiException: If there was an error fetching the custom resource. + """ + try: + resource: Any = self.api.get_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + name=name, + namespace=k8s_namespace, + ) + return resource + except ApiException as e: + if e.status == 404: + log.error("raycluster resource is not found. error = {}".format(e)) + return None + else: + log.error("error fetching custom resource: {}".format(e)) + return None + + def wait_until_ray_cluster_running( + self, name: str, k8s_namespace: str = "default", timeout: int = 60, delay_between_attempts: int = 5 + ) -> bool: + """Get a specific Ray cluster in a given namespace. + + Parameters: + - name (str): The name of the Ray cluster custom resource. Defaults to "". + - k8s_namespace (str, optional): The namespace in which to retrieve the Ray cluster. Defaults to "default". + - timeout (int, optional): The duration in seconds after which we stop trying to get status. Defaults to 60 seconds. + - delay_between_attempts (int, optional): The duration in seconds to wait between attempts to get status if not set. Defaults to 5 seconds. + + + + Returns: + Bool: True if the raycluster status is Running, False otherwise. + + """ + status = self.get_ray_cluster_status(name, k8s_namespace, timeout, delay_between_attempts) + + # TODO: once we add State to Status, we should check for that as well + if status and status["head"] and status["head"]["serviceIP"]: + return True + + log.info( + "raycluster {} status is not running yet, current status is {}".format( + name, status["state"] if status else "unknown" + ) + ) + return False + + def create_ray_cluster(self, body: Any, k8s_namespace: str = "default") -> Any: + """Create a new Ray cluster custom resource. + + Parameters: + - body (Any): The data of the custom resource to create. + - k8s_namespace (str, optional): The namespace in which to create the custom resource. Defaults to "default". + + Returns: + Any: The created custom resource, or None if it already exists or there was an error. + """ + try: + resource: Any = self.api.create_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + body=body, + namespace=k8s_namespace, + ) + return resource + except ApiException as e: + if e.status == 409: + log.error("raycluster resource already exists. error = {}".format(e.reason)) + return None + else: + log.error("error creating custom resource: {}".format(e)) + return None + + def delete_ray_cluster(self, name: str, k8s_namespace: str = "default") -> bool: + """Delete a Ray cluster custom resource. + + Parameters: + - name (str): The name of the Ray cluster custom resource to delete. + - k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default". + + Returns: + Any: The deleted custom resource, or None if already deleted or there was an error. + """ + try: + resource: Any = self.api.delete_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + name=name, + namespace=k8s_namespace, + ) + return resource + except ApiException as e: + if e.status == 404: + log.error("raycluster custom resource already deleted. error = {}".format(e.reason)) + return None + else: + log.error("error deleting the raycluster custom resource: {}".format(e.reason)) + return None + + def patch_ray_cluster(self, name: str, ray_patch: Any, k8s_namespace: str = "default") -> Any: + """Patch an existing Ray cluster custom resource. + + Parameters: + - name (str): The name of the Ray cluster custom resource to be patched. + - ray_patch (Any): The patch data for the Ray cluster. + - k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default". + + Returns: + bool: True if the patch was successful, False otherwise. + """ + try: + # we patch the existing raycluster with the new config + self.api.patch_namespaced_custom_object( + group=constants.GROUP, + version=constants.VERSION, + plural=constants.PLURAL, + name=name, + body=ray_patch, + namespace=k8s_namespace, + ) + except ApiException as e: + log.error("raycluster `{}` failed to patch, with error: {}".format(name, e)) + return False + else: + log.info("raycluster `%s` is patched successfully", name) + + return True diff --git a/dagster_ray/kuberay/resources.py b/dagster_ray/kuberay/resources.py index 65e9863..d08e624 100644 --- a/dagster_ray/kuberay/resources.py +++ b/dagster_ray/kuberay/resources.py @@ -14,10 +14,8 @@ # yes, `python-client` is actually the KubeRay package name # https://github.com/ray-project/kuberay/issues/2078 -from python_client import kuberay_cluster_api - from dagster_ray.kuberay.configs import DEFAULT_DEPLOYMENT_NAME, RayClusterConfig -from dagster_ray.kuberay.ray_cluster_api import PatchedRayClusterApi +from dagster_ray.kuberay.ray_cluster_api import RayClusterApi if sys.version_info >= (3, 11): from typing import Self @@ -33,12 +31,12 @@ class KubeRayAPI(ConfigurableResource): kubeconfig_file: Optional[str] = None - _kuberay_api: PatchedRayClusterApi = PrivateAttr() + _kuberay_api: RayClusterApi = PrivateAttr() _k8s_api: client.CustomObjectsApi = PrivateAttr() _k8s_core_api: client.CoreV1Api = PrivateAttr() @property - def kuberay(self) -> kuberay_cluster_api.RayClusterApi: + def kuberay(self) -> RayClusterApi: if self._kuberay_api is None: raise ValueError("KubeRayAPI not initialized") return self._kuberay_api @@ -58,7 +56,7 @@ def k8s_core(self) -> client.CoreV1Api: def setup_for_execution(self, context: InitResourceContext) -> None: self._load_kubeconfig(self.kubeconfig_file) - self._kuberay_api = PatchedRayClusterApi(config_file=self.kubeconfig_file) + self._kuberay_api = RayClusterApi(config_file=self.kubeconfig_file) self._k8s_api = client.CustomObjectsApi() self._k8s_core_api = client.CoreV1Api() diff --git a/poetry.lock b/poetry.lock index 65fc9e2..baaf13f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2892,25 +2892,6 @@ files = [ pytest = ">=7.2.1,<8.0.0" pyyaml = ">=6.0,<7.0" -[[package]] -name = "python_client" -version = "1.1.0" -description = "A Kuberay python client library to create/delete/update clusters" -optional = true -python-versions = ">=3.6.5" -files = [] -develop = false - -[package.dependencies] -kubernetes = "*" - -[package.source] -type = "git" -url = "https://github.com/ray-project/kuberay.git" -reference = "HEAD" -resolved_reference = "17809bcbe38bda06aba9194b2a3212bdc05a309e" -subdirectory = "clients/python-client" - [[package]] name = "python-dateutil" version = "2.8.2" @@ -4336,9 +4317,9 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.link testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"] [extras] -kuberay = ["kubernetes", "python-client", "pyyaml"] +kuberay = ["kubernetes", "pyyaml"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.13" -content-hash = "7d063237e1ebef30fd2ebdd01cbe16c73ddafb006d879e7b75ba4d6b5e95dc6d" +content-hash = "9ae768ec0503b1419022a71e97a548552d70df683128d580151853c03071c730" diff --git a/pyproject.toml b/pyproject.toml index 625fc14..2e82941 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,10 +29,6 @@ license = "Apache-2.0" python = ">=3.8.1,<3.13" pyyaml = ">=4.0.0" kubernetes = ">=20.0.0" # no idea what's a good lower bound - -# FIXME: switch to PyPI once the package is published -# https://github.com/ray-project/kuberay/issues/2078 -python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", optional = true } tenacity = ">=8.0.0" ray = {extras = ["all"], version = ">=2.7.0"} dagster = ">=1.6.0" @@ -140,4 +136,5 @@ exclude = [ "dist", "node_modules", "venv", + "dagster_ray/kuberay/ray_cluster_api.py" # taken from https://github.com/ray-project/kuberay/tree/master/clients/python-client/python_client ] diff --git a/tests/test_kuberay.py b/tests/test_kuberay.py index 74c188c..834359d 100644 --- a/tests/test_kuberay.py +++ b/tests/test_kuberay.py @@ -17,7 +17,7 @@ from dagster_ray.kuberay import KubeRayAPI, KubeRayCluster, RayClusterConfig, cleanup_kuberay_clusters from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS from dagster_ray.kuberay.ops import CleanupKuberayClustersConfig -from dagster_ray.kuberay.ray_cluster_api import PatchedRayClusterApi +from dagster_ray.kuberay.ray_cluster_api import RayClusterApi from tests import ROOT_DIR @@ -230,7 +230,7 @@ def my_asset(context: AssetExecutionContext, ray_cluster: RayResource) -> None: resources={"ray_cluster": ray_cluster_resource}, ) - kuberay_api = PatchedRayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig)) + kuberay_api = RayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig)) # make sure the RayCluster is cleaned up @@ -257,7 +257,7 @@ def my_asset(ray_cluster: RayResource) -> None: resources={"ray_cluster": ray_cluster_resource_skip_cleanup}, ) - kuberay_api = PatchedRayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig)) + kuberay_api = RayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig)) assert ( len(