Skip to content

Commit

Permalink
➖ delete python-client from dependencies and include as module
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Apr 19, 2024
1 parent 3ba26bf commit c855217
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 37 deletions.
10 changes: 10 additions & 0 deletions dagster_ray/kuberay/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import logging

# Group, Version, Plural
GROUP = "ray.io"
VERSION = "v1alpha1"
PLURAL = "rayclusters"
KIND = "RayCluster"

# log level
LOGLEVEL = logging.INFO
207 changes: 204 additions & 3 deletions dagster_ray/kuberay/ray_cluster_api.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,36 @@
import logging
import time
from typing import Any, Optional

from kubernetes import client, config
from kubernetes.client import ApiException
from python_client import constants
from python_client.kuberay_cluster_api import RayClusterApi, log

import dagster_ray.kuberay.constants as constants

class PatchedRayClusterApi(RayClusterApi):
log = logging.getLogger(__name__)


if logging.getLevelName(log.level) == "NOTSET":
logging.basicConfig(format="%(asctime)s %(message)s", level=constants.LOGLEVEL)


class RayClusterApi:
"""
Taken from https://github.com/ray-project/kuberay/blob/master/clients/python-client/python_client/kuberay_cluster_api.py
This project is not published to PyPI, so it's impossible to install it as a dependency.
List of modifications to the original RayClusterApi:
- allow passing config_file and context to __init__\
- fixed get_ray_cluster_status hard-querying 'status' field which is not always present
RayClusterApi provides APIs to list, get, create, build, update, delete rayclusters.
Methods:
- list_ray_clusters(k8s_namespace: str = "default", async_req: bool = False) -> Any:
- get_ray_cluster(name: str, k8s_namespace: str = "default") -> Any:
- create_ray_cluster(body: Any, k8s_namespace: str = "default") -> Any:
- delete_ray_cluster(name: str, k8s_namespace: str = "default") -> bool:
- patch_ray_cluster(name: str, ray_patch: Any, k8s_namespace: str = "default") -> Any:
"""

def __init__(self, config_file: Optional[str], context: Optional[str] = None):
Expand Down Expand Up @@ -48,3 +67,185 @@ def get_ray_cluster_status(

log.info("raycluster {} status not set yet, timing out...".format(name))
return None

def __del__(self):
self.api = None
self.kube_config = None

def list_ray_clusters(
self, k8s_namespace: str = "default", label_selector: str = "", async_req: bool = False
) -> Any:
"""List Ray clusters in a given namespace.
Parameters:
- k8s_namespace (str, optional): The namespace in which to list the Ray clusters. Defaults to "default".
- async_req (bool, optional): Whether to make the request asynchronously. Defaults to False.
Returns:
Any: The custom resource for Ray clusters in the specified namespace, or None if not found.
Raises:
ApiException: If there was an error fetching the custom resource.
"""
try:
resource: Any = self.api.list_namespaced_custom_object(
group=constants.GROUP,
version=constants.VERSION,
plural=constants.PLURAL,
namespace=k8s_namespace,
label_selector=label_selector,
async_req=async_req,
)
if "items" in resource:
return resource
return None
except ApiException as e:
if e.status == 404:
log.error("raycluster resource is not found. error = {}".format(e))
return None
else:
log.error("error fetching custom resource: {}".format(e))
return None

def get_ray_cluster(self, name: str, k8s_namespace: str = "default") -> Any:
"""Get a specific Ray cluster in a given namespace.
Parameters:
- name (str): The name of the Ray cluster custom resource. Defaults to "".
- k8s_namespace (str, optional): The namespace in which to retrieve the Ray cluster. Defaults to "default".
Returns:
Any: The custom resource for the specified Ray cluster, or None if not found.
Raises:
ApiException: If there was an error fetching the custom resource.
"""
try:
resource: Any = self.api.get_namespaced_custom_object(
group=constants.GROUP,
version=constants.VERSION,
plural=constants.PLURAL,
name=name,
namespace=k8s_namespace,
)
return resource
except ApiException as e:
if e.status == 404:
log.error("raycluster resource is not found. error = {}".format(e))
return None
else:
log.error("error fetching custom resource: {}".format(e))
return None

def wait_until_ray_cluster_running(
self, name: str, k8s_namespace: str = "default", timeout: int = 60, delay_between_attempts: int = 5
) -> bool:
"""Get a specific Ray cluster in a given namespace.
Parameters:
- name (str): The name of the Ray cluster custom resource. Defaults to "".
- k8s_namespace (str, optional): The namespace in which to retrieve the Ray cluster. Defaults to "default".
- timeout (int, optional): The duration in seconds after which we stop trying to get status. Defaults to 60 seconds.
- delay_between_attempts (int, optional): The duration in seconds to wait between attempts to get status if not set. Defaults to 5 seconds.
Returns:
Bool: True if the raycluster status is Running, False otherwise.
"""
status = self.get_ray_cluster_status(name, k8s_namespace, timeout, delay_between_attempts)

# TODO: once we add State to Status, we should check for that as well <if status and status["state"] == "Running":>
if status and status["head"] and status["head"]["serviceIP"]:
return True

log.info(
"raycluster {} status is not running yet, current status is {}".format(
name, status["state"] if status else "unknown"
)
)
return False

def create_ray_cluster(self, body: Any, k8s_namespace: str = "default") -> Any:
"""Create a new Ray cluster custom resource.
Parameters:
- body (Any): The data of the custom resource to create.
- k8s_namespace (str, optional): The namespace in which to create the custom resource. Defaults to "default".
Returns:
Any: The created custom resource, or None if it already exists or there was an error.
"""
try:
resource: Any = self.api.create_namespaced_custom_object(
group=constants.GROUP,
version=constants.VERSION,
plural=constants.PLURAL,
body=body,
namespace=k8s_namespace,
)
return resource
except ApiException as e:
if e.status == 409:
log.error("raycluster resource already exists. error = {}".format(e.reason))
return None
else:
log.error("error creating custom resource: {}".format(e))
return None

def delete_ray_cluster(self, name: str, k8s_namespace: str = "default") -> bool:
"""Delete a Ray cluster custom resource.
Parameters:
- name (str): The name of the Ray cluster custom resource to delete.
- k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default".
Returns:
Any: The deleted custom resource, or None if already deleted or there was an error.
"""
try:
resource: Any = self.api.delete_namespaced_custom_object(
group=constants.GROUP,
version=constants.VERSION,
plural=constants.PLURAL,
name=name,
namespace=k8s_namespace,
)
return resource
except ApiException as e:
if e.status == 404:
log.error("raycluster custom resource already deleted. error = {}".format(e.reason))
return None
else:
log.error("error deleting the raycluster custom resource: {}".format(e.reason))
return None

def patch_ray_cluster(self, name: str, ray_patch: Any, k8s_namespace: str = "default") -> Any:
"""Patch an existing Ray cluster custom resource.
Parameters:
- name (str): The name of the Ray cluster custom resource to be patched.
- ray_patch (Any): The patch data for the Ray cluster.
- k8s_namespace (str, optional): The namespace in which the Ray cluster exists. Defaults to "default".
Returns:
bool: True if the patch was successful, False otherwise.
"""
try:
# we patch the existing raycluster with the new config
self.api.patch_namespaced_custom_object(
group=constants.GROUP,
version=constants.VERSION,
plural=constants.PLURAL,
name=name,
body=ray_patch,
namespace=k8s_namespace,
)
except ApiException as e:
log.error("raycluster `{}` failed to patch, with error: {}".format(name, e))
return False
else:
log.info("raycluster `%s` is patched successfully", name)

return True
10 changes: 4 additions & 6 deletions dagster_ray/kuberay/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@

# yes, `python-client` is actually the KubeRay package name
# https://github.com/ray-project/kuberay/issues/2078
from python_client import kuberay_cluster_api

from dagster_ray.kuberay.configs import DEFAULT_DEPLOYMENT_NAME, RayClusterConfig
from dagster_ray.kuberay.ray_cluster_api import PatchedRayClusterApi
from dagster_ray.kuberay.ray_cluster_api import RayClusterApi

if sys.version_info >= (3, 11):
from typing import Self
Expand All @@ -33,12 +31,12 @@
class KubeRayAPI(ConfigurableResource):
kubeconfig_file: Optional[str] = None

_kuberay_api: PatchedRayClusterApi = PrivateAttr()
_kuberay_api: RayClusterApi = PrivateAttr()
_k8s_api: client.CustomObjectsApi = PrivateAttr()
_k8s_core_api: client.CoreV1Api = PrivateAttr()

@property
def kuberay(self) -> kuberay_cluster_api.RayClusterApi:
def kuberay(self) -> RayClusterApi:
if self._kuberay_api is None:
raise ValueError("KubeRayAPI not initialized")
return self._kuberay_api
Expand All @@ -58,7 +56,7 @@ def k8s_core(self) -> client.CoreV1Api:
def setup_for_execution(self, context: InitResourceContext) -> None:
self._load_kubeconfig(self.kubeconfig_file)

self._kuberay_api = PatchedRayClusterApi(config_file=self.kubeconfig_file)
self._kuberay_api = RayClusterApi(config_file=self.kubeconfig_file)
self._k8s_api = client.CustomObjectsApi()
self._k8s_core_api = client.CoreV1Api()

Expand Down
23 changes: 2 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ license = "Apache-2.0"
python = ">=3.8.1,<3.13"
pyyaml = ">=4.0.0"
kubernetes = ">=20.0.0" # no idea what's a good lower bound

# FIXME: switch to PyPI once the package is published
# https://github.com/ray-project/kuberay/issues/2078
python-client = { git = "https://github.com/ray-project/kuberay.git", subdirectory = "clients/python-client", optional = true }
tenacity = ">=8.0.0"
ray = {extras = ["all"], version = ">=2.7.0"}
dagster = ">=1.6.0"
Expand Down Expand Up @@ -140,4 +136,5 @@ exclude = [
"dist",
"node_modules",
"venv",
"dagster_ray/kuberay/ray_cluster_api.py" # taken from https://github.com/ray-project/kuberay/tree/master/clients/python-client/python_client
]
6 changes: 3 additions & 3 deletions tests/test_kuberay.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from dagster_ray.kuberay import KubeRayAPI, KubeRayCluster, RayClusterConfig, cleanup_kuberay_clusters
from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS
from dagster_ray.kuberay.ops import CleanupKuberayClustersConfig
from dagster_ray.kuberay.ray_cluster_api import PatchedRayClusterApi
from dagster_ray.kuberay.ray_cluster_api import RayClusterApi
from tests import ROOT_DIR


Expand Down Expand Up @@ -230,7 +230,7 @@ def my_asset(context: AssetExecutionContext, ray_cluster: RayResource) -> None:
resources={"ray_cluster": ray_cluster_resource},
)

kuberay_api = PatchedRayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig))
kuberay_api = RayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig))

# make sure the RayCluster is cleaned up

Expand All @@ -257,7 +257,7 @@ def my_asset(ray_cluster: RayResource) -> None:
resources={"ray_cluster": ray_cluster_resource_skip_cleanup},
)

kuberay_api = PatchedRayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig))
kuberay_api = RayClusterApi(config_file=str(k8s_with_raycluster.kubeconfig))

assert (
len(
Expand Down

0 comments on commit c855217

Please sign in to comment.