Skip to content

Commit

Permalink
Fix dependency and refactor Ray options
Browse files Browse the repository at this point in the history
  • Loading branch information
coufon committed Jan 21, 2024
1 parent 1a7c810 commit 9c6598b
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 38 deletions.
45 changes: 34 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,25 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen

<img src="docs/pics/overview.png" width="800" />

## Space 101

- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
- Data operations in Space can run locally or distributedly in [Ray](https://github.com/ray-project/ray) clusters.
- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
- Please read [the design](docs/design.md) for more details.

## Onboarding Examples

- [Manage Tensorflow COCO dataset](notebooks/tfds_coco_tutorial.ipynb)
- [Ground truth database of LabelStudio](notebooks/label_studio_tutorial.ipynb)
- [Transforms and materialized views: Segment Anything as example](notebooks/segment_anything_tutorial.ipynb)
- [Incrementally build embedding vector indexes](notebooks/incremental_embedding_index.ipynb)

## Space 101

- Please find more information in [the design page](docs/design.md).
- Space uses [Arrow](https://arrow.apache.org/docs/python/index.html) in the API surface, e.g., schema, filter, data IO.
- All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving.
- Data operations in Space can run locally or distributedly in [Ray](https://github.com/ray-project/ray) clusters.

## Quick Start

- [Install](#install)
- [Cloud Storage](#cloud-storage)
- [Cluster setup](#cluster-setup)
- [Create and Load Datasets](#create-and-load-datasets)
- [Write and Read](#write-and-read)
- [Transform and Materialized Views](#transform-and-materialized-views)
Expand Down Expand Up @@ -71,6 +72,24 @@ gcsfuse <mybucket> "/path/to/<mybucket>"

Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach.

### Cluster Setup

Optionally, setup a cluster to run Space operations distributedly. We support Ray clusters, on the Ray cluster head/worker nodes:
```bash
# Start a Ray head node (IP 123.45.67.89, for example).
# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details.
ray start --head --port=6379
```

Using [Cloud Storage + FUSE]((#cloud-storage)) is required in the distributed mode, because the Ray cluster and the client machine should operate in the same directory of files. Run `gcsfuse` on all machines and the mapped local directory paths **must be the same**.

Run the following code on the client machine to offload computation to the Ray cluster:
```py
import ray
# Connect to the Ray cluster.
ray.init(address="ray://123.45.67.89:10001")
```

### Create and Load Datasets

Create a Space dataset with two index fields (`id`, `image_name`) (store in Parquet) and a record field (`feature`) (store in ArrayRecord).
Expand Down Expand Up @@ -120,9 +139,13 @@ print(catalog.datasets())
Append, delete some data. Each mutation generates a new version of data, represented by an increasing integer ID. We expect to support the [Iceberg](https://iceberg.apache.org/docs/latest/branching/) style tags and branches for better version management.
```py
import pyarrow.compute as pc
from space import RayOptions

# Create a local runner:
runner = ds.local()

# Create a local or Ray runner.
runner = ds.local() # or ds.ray()
# Or create a Ray runner:
runner = ds.ray(ray_options=RayOptions(max_parallelism=8))

# Appending data generates a new dataset version `snapshot_id=1`
# Write methods:
Expand Down Expand Up @@ -205,8 +228,8 @@ mv = view.materialize("/path/to/<mybucket>/example_mv")
# mv = catalog.materialize("example_mv", view)

mv_runner = mv.ray()
# Refresh the MV up to version `1`.
mv_runner.refresh(1) # mv_runner.refresh() refresh to the latest version
# Refresh the MV up to version tag `after_add` of the source.
mv_runner.refresh("after_add") # mv_runner.refresh() refresh to the latest version

# Use the MV runner instead of view runner to directly read from materialized
# view files, no data processing any more.
Expand Down
4 changes: 2 additions & 2 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ dependencies = [

[project.optional-dependencies]
dev = [
"pandas",
"pandas == 2.1.4",
"pyarrow-stubs",
"ray",
"ray == 2.9.1",
"tensorflow",
"types-protobuf",
]
Expand Down
1 change: 1 addition & 0 deletions python/src/space/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@
from space.core.random_access import RandomAccessDataSource
from space.core.schema.types import File, TfFeatures
from space.core.views import MaterializedView
from space.ray.options import RayOptions
3 changes: 2 additions & 1 deletion python/src/space/core/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
from space.core.transform.plans import LogicalPlanBuilder
from space.core.utils.lazy_imports_utils import ray
from space.core.views import View
from space.ray.runners import RayOptions, RayReadWriterRunner
from space.ray.options import RayOptions
from space.ray.runners import RayReadWriterRunner


class Dataset(View):
Expand Down
2 changes: 1 addition & 1 deletion python/src/space/core/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""APIs of Space core lib."""
"""Options of Space core lib."""

from dataclasses import dataclass
from typing import Any, Callable, List, Optional
Expand Down
13 changes: 8 additions & 5 deletions python/src/space/ray/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as rt
from space.core.utils.lazy_imports_utils import ray
from space.ray.options import RayOptions


class RayAppendOp(BaseAppendOp):
Expand All @@ -35,31 +36,33 @@ class RayAppendOp(BaseAppendOp):
def __init__(self,
location: str,
metadata: meta.StorageMetadata,
parallelism: int,
ray_options: RayOptions,
file_options: FileOptions,
record_address_input: bool = False):
"""
Args:
record_address_input: if true, input record fields are addresses.
"""
self._parallelism = parallelism
self._ray_options = ray_options
self._actors = [
_AppendActor.remote( # type: ignore[attr-defined] # pylint: disable=no-member
location, metadata, file_options, record_address_input)
for _ in range(parallelism)
for _ in range(self._ray_options.max_parallelism)
]

def write(self, data: InputData) -> None:
if not isinstance(data, pa.Table):
data = pa.Table.from_pydict(data)

shard_size = data.num_rows // self._parallelism
num_shards = self._ray_options.max_parallelism

shard_size = data.num_rows // num_shards
if shard_size == 0:
shard_size = 1

responses = []
offset = 0
for i in range(self._parallelism):
for i in range(num_shards):
shard = data.slice(offset=offset, length=shard_size)
responses.append(self._actors[i].write.remote(shard))

Expand Down
13 changes: 6 additions & 7 deletions python/src/space/ray/ops/insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@
from space.core.storage import Storage
from space.core.utils import errors
from space.core.utils.lazy_imports_utils import ray
from space.ray.options import RayOptions


class RayInsertOp(LocalInsertOp):
"""Insert data to a dataset with distributed duplication check."""

def __init__(self, storage: Storage, options: InsertOptions,
parallelism: int, file_options: FileOptions):
ray_options: RayOptions, file_options: FileOptions):
LocalInsertOp.__init__(self, storage, options, file_options)
self._parallelism = parallelism
self._ray_options = ray_options

def _check_duplication(self, data_files: rt.FileSet, filter_: pc.Expression):
remote_duplicated_values = []
Expand All @@ -50,11 +51,10 @@ def _check_duplication(self, data_files: rt.FileSet, filter_: pc.Expression):

for duplicated in ray.get(remote_duplicated_values):
if duplicated:
raise errors.PrimaryKeyExistError(
"Primary key to insert already exist")
raise errors.PrimaryKeyExistError("Primary key to insert already exist")

def _append(self, data: pa.Table, patches: List[Optional[rt.Patch]]) -> None:
append_op = RayAppendOp(self._location, self._metadata, self._parallelism,
append_op = RayAppendOp(self._location, self._metadata, self._ray_options,
self._file_options)
append_op.write(data)
patches.append(append_op.finish())
Expand All @@ -64,5 +64,4 @@ def _append(self, data: pa.Table, patches: List[Optional[rt.Patch]]) -> None:
def _remote_filter_matched(location: str, metadata: meta.StorageMetadata,
data_files: rt.FileSet, pk_filter: pc.Expression,
primary_keys: List[str]) -> bool:
return filter_matched(location, metadata, data_files, pk_filter,
primary_keys)
return filter_matched(location, metadata, data_files, pk_filter, primary_keys)
24 changes: 24 additions & 0 deletions python/src/space/ray/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Options of Space Ray lib."""

from dataclasses import dataclass


@dataclass
class RayOptions:
"""Options of Ray runners."""
# The max parallelism of computing resources to use in a Ray cluster.
max_parallelism: int = 8
21 changes: 10 additions & 11 deletions python/src/space/ray/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""Ray runner implementations."""

from __future__ import annotations
from dataclasses import dataclass
import copy
from typing import TYPE_CHECKING
from typing import Iterator, List, Optional, Union

Expand All @@ -41,19 +41,14 @@
from space.ray.ops.delete import RayDeleteOp
from space.ray.ops.insert import RayInsertOp
from space.ray.ops.utils import singleton_storage
from space.ray.options import RayOptions

if TYPE_CHECKING:
from space.core.datasets import Dataset
from space.core.storage import Storage, Transaction, Version
from space.core.views import MaterializedView, View


@dataclass
class RayOptions:
"""Options of Ray runners."""
parallelism: int = 2


class RayReadOnlyRunner(BaseReadOnlyRunner):
"""A read-only Ray runner."""

Expand Down Expand Up @@ -237,7 +232,7 @@ def __init__(self,
@StorageMixin.transactional
def append(self, data: InputData) -> Optional[rt.Patch]:
op = RayAppendOp(self._storage.location, self._storage.metadata,
self._ray_options.parallelism, self._file_options)
self._ray_options, self._file_options)
op.write(data)
return op.finish()

Expand All @@ -248,8 +243,12 @@ def append_from(
if not isinstance(source_fns, list):
source_fns = [source_fns]

ray_options = copy.deepcopy(self._ray_options)
ray_options.max_parallelism = min(len(source_fns),
ray_options.max_parallelism)

op = RayAppendOp(self._storage.location, self._storage.metadata,
self._ray_options.parallelism, self._file_options)
ray_options, self._file_options)
op.write_from(source_fns)

return op.finish()
Expand All @@ -267,8 +266,8 @@ def append_parquet(self, pattern: str) -> Optional[rt.Patch]:
@StorageMixin.transactional
def _insert(self, data: InputData,
mode: InsertOptions.Mode) -> Optional[rt.Patch]:
op = RayInsertOp(self._storage, InsertOptions(mode=mode),
self._ray_options.parallelism, self._file_options)
op = RayInsertOp(self._storage, InsertOptions(mode=mode), self._ray_options,
self._file_options)
return op.write(data)

@StorageMixin.transactional
Expand Down

0 comments on commit 9c6598b

Please sign in to comment.