In [None]:
!pip install ray["default"]=="2.9.3" pyarrow==15.0.1

In [None]:
!mkdir -p data/raw data/code data/transformed data/partitioned
!wget https://huggingface.co/datasets/scikit-learn/iris/raw/main/Iris.csv -P data/raw/

In [None]:
from pyarrow import csv

# Parse the downloaded file into an Arrow table and show it as the cell output.
raw_iris = csv.read_csv("data/raw/Iris.csv")
raw_iris

In [None]:
%%writefile data/code/transform.py
from typing import Dict

import numpy as np
import ray

# Read the raw Iris CSV into a Ray Dataset; the rest of the script builds on `ds`.
ds = ray.data.read_csv("local:///data/raw/Iris.csv")


def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Add a "petal area" column to a batch.

    The new ``PetalAreaCM^2`` entry is the element-wise product of the
    ``PetalLengthCm`` and ``PetalWidthCm`` entries. The batch is modified in
    place and the same dict is returned.
    """
    petal_length, petal_width = batch["PetalLengthCm"], batch["PetalWidthCm"]
    batch["PetalAreaCM^2"] = petal_length * petal_width
    return batch

# Apply transform_batch to the batches of `ds`, producing the dataset with the
# extra petal-area column that the tasks below consume and write out.
transformed_ds = ds.map_batches(transform_batch)


@ray.remote(num_cpus=1)
def consume(ds: ray.data.Dataset) -> int:
    """Iterate over ``ds`` in batches of 8 and return how many batches were seen."""
    return sum(1 for _ in ds.iter_batches(batch_size=8))

# Run the batch counter as a remote task and wait for its result.
consume_task = consume.options(scheduling_strategy="DEFAULT")
ray.get(consume_task.remote(transformed_ds))

@ray.remote
def write_file():
    """Write the module-level ``transformed_ds`` as CSV to ``local:///data/partitioned/``."""
    output_uri = "local:///data/partitioned/"
    transformed_ds.write_csv(output_uri)

# Launch the writer as a remote task and wait for it to finish.
write_task = write_file.options(scheduling_strategy="DEFAULT")
ray.get(write_task.remote())

In [None]:
import os

from ray.job_submission import JobSubmissionClient

# Packages requested for the job's runtime environment via pip.
train_deps = [
    "pandas==2.1.4",
    "pyarrow==15.0.1",
]

# The job server address is built from the RAY_CLUSTER environment variable and port 8265.
cluster_host = os.environ["RAY_CLUSTER"]
client = JobSubmissionClient(f"http://{cluster_host}:8265")

# Submit the transform script; `client` and `job_id` are reused by later cells
# to poll the job status and fetch its logs.
job_id = client.submit_job(
    entrypoint="python /data/code/transform.py",
    runtime_env={"pip": train_deps},
)
print(job_id)

In [None]:
from ray.job_submission import JobStatus
import time

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5,
                      poll_interval_seconds=1, job_client=None):
    """Poll a job until it reaches one of the wanted statuses or the timeout expires.

    Args:
        job_id: Identifier passed to ``get_job_status``.
        status_to_wait_for: Container of statuses that end the wait.
        timeout_seconds: Maximum time to keep polling.
        poll_interval_seconds: Pause between two polls.
        job_client: Object exposing ``get_job_status(job_id)``. Defaults to the
            notebook-level ``client``.

    Returns:
        The last status observed. It is a member of ``status_to_wait_for`` when
        the wait succeeded; on timeout it is whatever the job last reported
        (``None`` if no poll happened), and a timeout message is printed so the
        two outcomes are distinguishable.
    """
    if job_client is None:
        job_client = client  # fall back to the client created in the submission cell
    # A monotonic clock keeps the timeout immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    status = None
    while time.monotonic() <= deadline:
        status = job_client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            return status
        time.sleep(poll_interval_seconds)
    print(f"Timed out after {timeout_seconds}s waiting for job {job_id}; last status: {status}")
    return status
# The job has to set up its pip runtime environment before running the script, so
# the helper's default 5-second timeout is likely to expire while it is still in progress.
JOB_TIMEOUT_SECONDS = 300
wait_until_status(
    job_id,
    {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED},
    timeout_seconds=JOB_TIMEOUT_SECONDS,
)
logs = client.get_job_logs(job_id)
print(logs)

# Stop here unless the job succeeded, so later cells do not read missing or partial output.
final_status = client.get_job_status(job_id)
if final_status != JobStatus.SUCCEEDED:
    raise RuntimeError(f"Ray job {job_id} did not succeed (status: {final_status})")

In [None]:
from pathlib import Path

# Merge the partition files into one CSV. A plain `cat` would repeat the header
# row of every partition inside the data, so only the first header is kept.
# Assumes each partition file starts with its own header row - TODO confirm
# against the writer's CSV options.
part_paths = sorted(Path("data/partitioned").glob("*.csv"))
with open("data/transformed/Iris.csv", "w", newline="") as merged:
    for part_index, part_path in enumerate(part_paths):
        with open(part_path, newline="") as part:
            header = part.readline()
            if part_index == 0:
                merged.write(header)
            merged.writelines(part)

In [None]:
# Load the merged output into an Arrow table and show it as the cell output.
transformed_iris = csv.read_csv("data/transformed/Iris.csv")
transformed_iris