Commit

📖 ♻️ ✅ refactor, docs, cleanup job tests (#6)
* 📖 ♻️ refactor and docs
* add auto-cleanup job
* add tests for cleanup job
danielgafni committed Apr 19, 2024
1 parent a419479 commit 3ba26bf
Showing 16 changed files with 707 additions and 278 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
@@ -1,4 +1,4 @@
name: QA & Tests
name: CI

on:
workflow_dispatch:
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -31,6 +31,12 @@ repos:
entry: ruff format
language: system
pass_filenames: false
- id: format-docs
name: format docs
entry: blacken-docs
language: system
pass_filenames: true
files: "\\.(md)$"
- id: pyright
name: pyright
entry: pyright .
202 changes: 188 additions & 14 deletions README.md
@@ -4,40 +4,214 @@
[![image](https://img.shields.io/pypi/v/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![image](https://img.shields.io/pypi/l/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![image](https://img.shields.io/pypi/pyversions/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![CI](https://github.com/danielgafni/dagster-ray/actions/workflows/ci.yml/badge.svg)](https://github.com/danielgafni/dagster-ray/actions/workflows/CI.yml)
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
[![Checked with pyright](https://microsoft.github.io/pyright/img/pyright_badge.svg)](https://microsoft.github.io/pyright/)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

[Ray](https://github.com/ray-project/ray) integration library for [Dagster](https://github.com/dagster-io/dagster).

`dagster-ray` allows running Ray computations in Dagster pipelines. It provides various Dagster abstractions, the most important being `Resource`s, along with helper `@op`s and `@schedule`s, for multiple backends.

The following backends are implemented:
- local
- `KubeRay` (kubernetes)

`dagster-ray` is tested across multiple version combinations of components such as `ray`, `dagster`, `KubeRay Operator`, and `Python`.

`dagster-ray` integrates with [Dagster+](https://dagster.io/plus) out of the box.

Documentation can be found below.

> [!NOTE]
> This project is in early development. Contributions are very welcome! See the [Development](#development) section below.

# Backends

`dagster-ray` provides a `RayResource` class, which does not implement any specific backend.
It defines the common interface for all `Ray` resources.
It can be used for type annotations in your `@op` and `@asset` definitions.

Examples:

```python
from dagster import asset
from dagster_ray import RayResource
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))
```

The other resources below are the actual backends that implement the `RayResource` interface.

## Local

These resources can be used for development and testing purposes.
They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.

The public objects can be imported from the `dagster_ray.local` module.

### Resources

#### `LocalRay`

A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other `*Ray` resources.

Examples:

Using the `LocalRay` resource:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```

Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


IN_K8s = ...


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
    assets=[my_asset],
)
```

## KubeRay

This backend requires a Kubernetes cluster with the `KubeRay Operator` installed.

It integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and `RayCluster` labels.

The public objects can be imported from the `dagster_ray.kuberay` module.

### Resources

#### `KubeRayCluster`

`KubeRayCluster` can be used for running Ray computations on Kubernetes.

When added as a resource dependency to an `@op` or `@asset`, the `KubeRayCluster`:
- Starts a dedicated `RayCluster` for it
- Connects to the cluster in client mode with `ray.init()` (unless `skip_init` is set to `True`)
- Tears down the cluster after the step is executed (unless `skip_cleanup` is set to `True`)

`RayCluster` comes with minimal default configuration, matching `KubeRay` defaults.

Examples:

Basic usage (will create a single-node, non-scaling `RayCluster`):

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()}, assets=[my_asset]
)
```

Larger cluster with auto-scaling enabled:

```python
from dagster_ray.kuberay import KubeRayCluster, RayClusterConfig

ray_cluster = KubeRayCluster(
    ray_cluster=RayClusterConfig(
        enable_in_tree_autoscaling=True,
        worker_group_specs=[
            {
                "groupName": "workers",
                "replicas": 2,
                "minReplicas": 1,
                "maxReplicas": 10,
                # ...
            }
        ],
    )
)
```

#### `KubeRayAPI`

This resource can be used to interact with the Kubernetes API Server.

Examples:

Listing currently running `RayClusters`:

```python
from dagster import op, Definitions
from dagster_ray.kuberay import KubeRayAPI


@op
def list_ray_clusters(
    kube_ray_api: KubeRayAPI,
):
    return kube_ray_api.kuberay.list_ray_clusters(k8s_namespace="kuberay")
```

### Jobs

#### `delete_kuberay_clusters`

This `job` can be used to delete `RayClusters` from a given list of names.
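
As a minimal sketch, the list of cluster names could be supplied via run config when launching the job. The op name (`delete_kuberay_clusters_op`) and the `cluster_names` config field below are assumptions for illustration, not the documented schema.

```python
# Hypothetical run config for the delete_kuberay_clusters job.
# Both the op name and the "cluster_names" field are assumptions.
run_config = {
    "ops": {
        "delete_kuberay_clusters_op": {
            "config": {"cluster_names": ["ray-cluster-1", "ray-cluster-2"]}
        }
    }
}
```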

#### `cleanup_kuberay_clusters`

This `job` can be used to delete old `RayClusters` which no longer correspond to any active Dagster Runs.
They may be left behind if the automatic cluster cleanup was disabled or failed.
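
A minimal sketch of registering this job in a code location follows, assuming it requires a `KubeRayAPI` resource; both that requirement and the `kuberay_api` resource key are assumptions.

```python
from dagster import Definitions
from dagster_ray.kuberay import KubeRayAPI, cleanup_kuberay_clusters

# Register the cleanup job so it can be launched manually or by a schedule.
# The "kuberay_api" resource key is an assumption.
definitions = Definitions(
    jobs=[cleanup_kuberay_clusters],
    resources={"kuberay_api": KubeRayAPI()},
)
```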

### Schedules

Cleanup schedules can be trivially created using the `cleanup_kuberay_clusters` or `delete_kuberay_clusters` jobs.

#### `cleanup_kuberay_clusters_daily`

`dagster-ray` provides an example daily cleanup schedule.
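
For example, a custom cleanup schedule can be built from the cleanup job with Dagster's `ScheduleDefinition`, alongside the ready-made daily schedule. This is a sketch; the `kuberay_api` resource key is an assumption.

```python
from dagster import Definitions, ScheduleDefinition
from dagster_ray.kuberay import (
    KubeRayAPI,
    cleanup_kuberay_clusters,
    cleanup_kuberay_clusters_daily,
)

# A custom hourly cleanup schedule built from the cleanup job.
hourly_cleanup = ScheduleDefinition(
    job=cleanup_kuberay_clusters,
    cron_schedule="0 * * * *",
)

definitions = Definitions(
    schedules=[cleanup_kuberay_clusters_daily, hourly_cleanup],
    resources={"kuberay_api": KubeRayAPI()},  # resource key is an assumption
)
```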

## Executor
WIP
@@ -62,13 +236,13 @@ Required tools:
- `minikube`

Running `pytest` will **automatically**:
- build an image with the local `dagster-ray` code
- start a `minikube` Kubernetes cluster
- load the built `dagster-ray` and `kuberay-operator` images into the cluster
- install the `KubeRay Operator` in the cluster with `helm`
- run the tests

Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!

> [!NOTE]
> Specifying a comma-separated list of `KubeRay Operator` versions in the `KUBE_RAY_OPERATOR_VERSIONS` environment variable will spawn a new test for each version.
6 changes: 6 additions & 0 deletions dagster_ray/__init__.py
@@ -0,0 +1,6 @@
from dagster_ray._base.resources import BaseRayResource

RayResource = BaseRayResource


__all__ = ["RayResource"]
28 changes: 21 additions & 7 deletions dagster_ray/_base/resources.py
@@ -1,9 +1,9 @@
import sys
import uuid
from abc import ABC, abstractmethod
from typing import Optional, cast
from typing import TYPE_CHECKING, Optional, Union, cast

from dagster import ConfigurableResource, InitResourceContext
from dagster import ConfigurableResource, InitResourceContext, OpExecutionContext
from pydantic import Field, PrivateAttr

# yes, `python-client` is actually the KubeRay package name
@@ -18,8 +18,9 @@
else:
pass

import ray
from ray._private.worker import BaseContext as RayBaseContext # noqa

if TYPE_CHECKING:
from ray._private.worker import BaseContext as RayBaseContext # noqa


class BaseRayResource(ConfigurableResource, ABC):
@@ -36,7 +37,13 @@ class BaseRayResource(ConfigurableResource, ABC):
default=8265, description="Dashboard port for connection. Make sure to match with the actual available port."
)

_context: Optional[RayBaseContext] = PrivateAttr()
_context: Optional["RayBaseContext"] = PrivateAttr()

def setup_for_execution(self, context: InitResourceContext) -> None:
raise NotImplementedError(
"This is an abstract resource, it's not meant to be provided directly. "
"Use a backend-specific resource instead."
)

@property
def context(self) -> "RayBaseContext":
@@ -62,15 +69,22 @@ def runtime_job_id(self) -> str:
Returns the Ray Job ID for the current job which was created with `ray.init()`.
:return:
"""
import ray

return ray.get_runtime_context().get_job_id()

@retry(stop=stop_after_delay(120), retry=retry_if_exception_type(ConnectionError), reraise=True)
def init_ray(self) -> "RayBaseContext":
def init_ray(self, context: Union[OpExecutionContext, InitResourceContext]) -> "RayBaseContext":
assert context.log is not None

import ray

self.data_execution_options.apply()
self._context = ray.init(address=self.ray_address, ignore_reinit_error=True)
self.data_execution_options.apply()
self.data_execution_options.apply_remote()
return cast(RayBaseContext, self._context)
context.log.info("Initialized Ray!")
return cast("RayBaseContext", self._context)

def _get_step_key(self, context: InitResourceContext) -> str:
# just return a random string
17 changes: 15 additions & 2 deletions dagster_ray/kuberay/__init__.py
@@ -1,3 +1,16 @@
from dagster_ray.kuberay.resources import KubeRayCluster
from dagster_ray.kuberay.configs import RayClusterConfig
from dagster_ray.kuberay.jobs import cleanup_kuberay_clusters, delete_kuberay_clusters
from dagster_ray.kuberay.ops import cleanup_kuberay_clusters_op, delete_kuberay_clusters_op
from dagster_ray.kuberay.resources import KubeRayAPI, KubeRayCluster
from dagster_ray.kuberay.schedules import cleanup_kuberay_clusters_daily

__all__ = ["KubeRayCluster"]
__all__ = [
"KubeRayCluster",
"RayClusterConfig",
"KubeRayAPI",
"cleanup_kuberay_clusters",
"delete_kuberay_clusters",
"cleanup_kuberay_clusters_op",
"delete_kuberay_clusters_op",
"cleanup_kuberay_clusters_daily",
]
