Skip to content

Commit

Permalink
[Datasets] Make data nightly tests still work with lazy execution (ray-project#31460)
Browse files Browse the repository at this point in the history

This is a follow-up from ray-project#31286 (comment). Here we audit all data nightly tests to make sure they still work with lazy execution enabled by default.

Signed-off-by: Cheng Su <scnju13@gmail.com>
Signed-off-by: Andrea Pisoni <andreapiso@gmail.com>
  • Loading branch information
c21 authored and andreapiso committed Jan 22, 2023
1 parent 7bf9d7c commit 608398a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 6 deletions.
6 changes: 3 additions & 3 deletions release/nightly_tests/dataset/inference.py
Expand Up @@ -86,17 +86,17 @@ def infer(batch):
ray_remote_args={"num_cpus": 0.5},
)
# Do a blocking map so that we can measure the download time.
ds = ds.map(lambda x: x)
ds = ds.map(lambda x: x).fully_executed()

end_download_time = time.time()
print("Preprocessing...")
ds = ds.map(preprocess)
ds = ds.map(preprocess).fully_executed()
end_preprocess_time = time.time()
print("Inferring...")
# NOTE: set a small batch size to avoid OOM on GRAM when doing inference.
ds = ds.map_batches(
infer, num_gpus=0.25, batch_size=128, batch_format="pandas", compute="actors"
)
).fully_executed()

end_time = time.time()

Expand Down
16 changes: 13 additions & 3 deletions release/nightly_tests/dataset/map_batches_benchmark.py
Expand Up @@ -19,16 +19,22 @@ def map_batches(
batch_format: Literal["default", "pandas", "pyarrow", "numpy"],
compute: Optional[Union[str, ComputeStrategy]] = None,
num_calls: Optional[int] = 1,
is_eager_executed: Optional[bool] = False,
) -> Dataset:

ds = input_ds
if is_eager_executed:
ds.fully_executed()

for _ in range(num_calls):
ds = ds.map_batches(
lambda x: x,
batch_format=batch_format,
batch_size=batch_size,
compute=compute,
)
if is_eager_executed:
ds.fully_executed()
return ds


Expand All @@ -37,6 +43,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):
"s3://air-example-data/ursa-labs-taxi-data/by_year/2018/01"
)
lazy_input_ds = input_ds.lazy()
input_ds.fully_executed()

batch_formats = ["pandas", "numpy"]
batch_sizes = [1024, 2048, 4096, None]
Expand All @@ -56,14 +63,15 @@ def run_map_batches_benchmark(benchmark: Benchmark):
continue

num_calls = 2
test_name = f"map-batches-{batch_format}-{batch_size}-{num_calls}-default"
test_name = f"map-batches-{batch_format}-{batch_size}-{num_calls}-eager"
benchmark.run(
test_name,
map_batches,
input_ds=input_ds,
batch_format=batch_format,
batch_size=batch_size,
num_calls=num_calls,
is_eager_executed=True,
)
test_name = f"map-batches-{batch_format}-{batch_size}-{num_calls}-lazy"
benchmark.run(
Expand All @@ -86,7 +94,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):

test_name = (
f"map-batches-{batch_format}-{batch_size}-{num_calls}-"
f"{compute_strategy}-default"
f"{compute_strategy}-eager"
)
benchmark.run(
test_name,
Expand All @@ -96,6 +104,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):
batch_size=batch_size,
compute=compute,
num_calls=num_calls,
is_eager_executed=True,
)
test_name = (
f"map-batches-{batch_format}-{batch_size}-{num_calls}-"
Expand Down Expand Up @@ -131,7 +140,8 @@ def run_map_batches_benchmark(benchmark: Benchmark):
# Test reading multiple files.
input_ds = ray.data.read_parquet(
"s3://air-example-data/ursa-labs-taxi-data/by_year/2018"
)
).fully_executed()

for batch_format in batch_formats:
for compute in ["tasks", "actors"]:
test_name = f"map-batches-{batch_format}-{compute}-multi-files"
Expand Down
1 change: 1 addition & 0 deletions release/nightly_tests/dataset/sort.py
Expand Up @@ -118,6 +118,7 @@ def make_block(count: int, num_columns: int) -> Block:
ds = ds.random_shuffle()
else:
ds = ds.sort(key="c_0")
ds.fully_executed()
except Exception as e:
exc = e
pass
Expand Down

0 comments on commit 608398a

Please sign in to comment.