Commit

add tests for cleanup job
danielgafni committed Apr 19, 2024
1 parent 362be78 commit 2dc0236
Showing 2 changed files with 87 additions and 6 deletions.
5 changes: 3 additions & 2 deletions README.md
@@ -29,7 +29,8 @@ Documentation can be found below.
 # Backends

-`dagster-ray` provides a `RayResource` class, which does not implement any specific backend, but defines the common interface for all `Ray` resources.
+`dagster-ray` provides a `RayResource` class, which does not implement any specific backend.
+It defines the common interface for all `Ray` resources.
 It can be used for type annotations in your `@op` and `@asset` definitions.

 Examples:
@@ -43,7 +44,7 @@ import ray
 @asset
 def my_asset(
     ray_cluster: RayResource, # RayResource is only used as a type annotation
-): # this type annotation only defines the interface
+):
     return ray.get(ray.put(42))
 ```

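The README snippet above covers `@asset`; a matching `@op` usage is not part of this diff, but a minimal sketch under the same interface might look like the following (the `my_op`/`my_job` names and the job wiring are illustrative assumptions, not part of this commit):

```python
import ray
from dagster import job, op

from dagster_ray import RayResource


@op
def my_op(
    ray_cluster: RayResource,  # again, RayResource is used only as a type annotation
) -> int:
    # the concrete backend (e.g. KubeRayCluster) is supplied as the "ray_cluster"
    # resource when the job is added to Definitions
    return ray.get(ray.put(42))


@job
def my_job():
    my_op()
```
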
88 changes: 84 additions & 4 deletions tests/test_kuberay.py
@@ -9,11 +9,12 @@
 import pytest
 import pytest_cases
 import ray
-from dagster import AssetExecutionContext, asset, materialize_to_memory
+from dagster import AssetExecutionContext, RunConfig, asset, materialize_to_memory
 from pytest_kubernetes.options import ClusterOptions
 from pytest_kubernetes.providers import AClusterManager, select_provider_manager

-from dagster_ray.kuberay import KubeRayAPI, KubeRayCluster
+from dagster_ray import RayResource
+from dagster_ray.kuberay import KubeRayAPI, KubeRayCluster, cleanup_kuberay_clusters
 from dagster_ray.kuberay.configs import DEFAULT_HEAD_GROUP_SPEC, DEFAULT_WORKER_GROUP_SPECS, RayClusterConfig
 from tests import ROOT_DIR

@@ -159,6 +160,30 @@ def ray_cluster_resource(
     )


+@pytest.fixture(scope="session")
+def ray_cluster_resource_skip_cleanup(
+    k8s_with_raycluster: AClusterManager,
+    dagster_ray_image: str,
+    head_group_spec: Dict[str, Any],
+    worker_group_specs: List[Dict[str, Any]],
+) -> KubeRayCluster:
+    redis_port = get_random_free_port()
+
+    return KubeRayCluster(
+        # we have to first run port-forwarding with minikube
+        # we can only init ray after that
+        skip_init=True,
+        skip_cleanup=True,
+        api=KubeRayAPI(kubeconfig_file=str(k8s_with_raycluster.kubeconfig)),
+        ray_cluster=RayClusterConfig(
+            image=dagster_ray_image,
+            head_group_spec=head_group_spec,
+            worker_group_specs=worker_group_specs,
+        ),
+        redis_port=redis_port,
+    )
+
+
 @ray.remote
 def get_hostname():
     return socket.gethostname()
@@ -169,10 +194,13 @@ def test_kuberay_cluster_resource(
     k8s_with_raycluster: AClusterManager,
 ):
     @asset
-    def my_asset(context: AssetExecutionContext, ray_cluster: KubeRayCluster) -> None:
+    # testing RayResource type annotation too!
+    def my_asset(context: AssetExecutionContext, ray_cluster: RayResource) -> None:
         # port-forward to the head node
         # because it's not possible to access it otherwise

+        assert isinstance(ray_cluster, KubeRayCluster)
+
         with k8s_with_raycluster.port_forwarding(
             target=f"svc/{ray_cluster.cluster_name}-head-svc",
             source_port=cast(int, ray_cluster.redis_port),
@@ -189,7 +217,59 @@ def my_asset(context: AssetExecutionContext, ray_cluster: KubeRayCluster) -> None:
             # not in localhost
             assert ray_cluster.cluster_name in ray.get(get_hostname.remote())

-    materialize_to_memory(
+            ray_cluster_description = ray_cluster.api.kuberay.get_ray_cluster(
+                ray_cluster.cluster_name, k8s_namespace=ray_cluster.namespace
+            )
+            assert ray_cluster_description["metadata"]["labels"]["dagster.io/run_id"] == context.run_id
+            assert ray_cluster_description["metadata"]["labels"]["dagster.io/cluster"] == ray_cluster.cluster_name
+
+    result = materialize_to_memory(
         [my_asset],
         resources={"ray_cluster": ray_cluster_resource},
     )
+
+    # make sure the RayCluster is cleaned up
+
+    assert not ray_cluster_resource.api.kuberay.list_ray_clusters(
+        k8s_namespace=ray_cluster_resource.namespace, label_selector=f"dagster.io/run_id={result.run_id}"
+    )["items"]
+
+
+def test_kuberay_cleanup_job(
+    ray_cluster_resource_skip_cleanup: KubeRayCluster,
+    k8s_with_raycluster: AClusterManager,
+):
+    @asset
+    def my_asset(ray_cluster: RayResource) -> None:
+        assert isinstance(ray_cluster, KubeRayCluster)
+
+    result = materialize_to_memory(
+        [my_asset],
+        resources={"ray_cluster": ray_cluster_resource_skip_cleanup},
+    )
+
+    assert (
+        len(
+            ray_cluster_resource_skip_cleanup.api.kuberay.list_ray_clusters(
+                k8s_namespace=ray_cluster_resource_skip_cleanup.namespace,
+                label_selector=f"dagster.io/run_id={result.run_id}",
+            )["items"]
+        )
+        > 0
+    )
+
+    cleanup_kuberay_clusters.execute_in_process(
+        run_config=RunConfig(
+            ops={
+                "cleanup_kuberay_clusters": {
+                    "config": {
+                        "namespace": ray_cluster_resource_skip_cleanup.namespace,
+                    }
+                }
+            }
+        )
+    )
+
+    assert not ray_cluster_resource_skip_cleanup.api.kuberay.list_ray_clusters(
+        k8s_namespace=ray_cluster_resource_skip_cleanup.namespace, label_selector=f"dagster.io/run_id={result.run_id}"
+    )["items"]
