Support materialized views in directory catalog (#67)
Zhou Fang committed Jan 19, 2024
1 parent b193bea commit 349b5fa
Showing 5 changed files with 103 additions and 26 deletions.
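In brief: the change adds a materialize() hook to BaseCatalog, implements it in DirCatalog, teaches DirCatalog.dataset() to dispatch on the stored metadata type (plain dataset vs. materialized view), adds a StorageNotFoundError and groups the user-input error classes together in errors.py, extracts a reusable load_materialized_view() helper in views.py, and extends the directory catalog tests to cover the materialized-view path.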
10 changes: 9 additions & 1 deletion python/src/space/catalogs/base.py
@@ -22,7 +22,7 @@
 import pyarrow as pa

 from space.core.datasets import Dataset
-from space.core.views import MaterializedView
+from space.core.views import MaterializedView, View


 class BaseCatalog(ABC):
@@ -44,6 +44,14 @@ def create_dataset(self, name: str, schema: pa.Schema,
       record_fields: fields stored in row format (ArrayRecord).
     """

+  def materialize(self, name: str, view: View):
+    """Create a new materialized view.
+
+    Args:
+      name: the materialized view name.
+      view: the view to be materialized.
+    """
+
   @abstractmethod
   def delete_dataset(self, name: str) -> None:
     """Delete an existing dataset or materialized view.
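For orientation, a minimal sketch of how a caller might exercise the new materialize() contract, assembled from the DirCatalog implementation and the test below. The catalog path and the identity UDF are illustrative assumptions, not part of this commit:

    import pyarrow as pa

    from space import DirCatalog

    # Hypothetical local path for a directory-backed catalog.
    cat = DirCatalog("/tmp/space_catalog")

    # Create a dataset, derive a view from it with a row-level UDF, then
    # materialize the view under a catalog-managed name.
    schema = pa.schema([("f", pa.int64()), ("float64", pa.float64())])
    ds = cat.create_dataset("ds", schema, primary_keys=["f"], record_fields=[])
    view = ds.map_batches(fn=lambda batch: batch,  # identity UDF, for illustration
                          input_fields=["f", "float64"],
                          output_schema=schema,
                          output_record_fields=[])
    mv = cat.materialize("mv", view)  # delegates to view.materialize(location)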
26 changes: 19 additions & 7 deletions python/src/space/catalogs/directory.py
@@ -21,8 +21,10 @@

 from space.catalogs.base import BaseCatalog, DatasetInfo
 from space.core.datasets import Dataset
-from space.core.utils import paths
-from space.core.views import MaterializedView
+import space.core.proto.metadata_pb2 as meta
+from space.core.storage import Storage
+from space.core.utils import errors, paths
+from space.core.views import MaterializedView, View, load_materialized_view


 class DirCatalog(BaseCatalog):
@@ -39,18 +41,28 @@ def __init__(self, location):
   def create_dataset(self, name: str, schema: pa.Schema,
                      primary_keys: List[str],
                      record_fields: List[str]) -> Dataset:
+    # TODO: should disallow overwriting an entry point file, to avoid creating
+    # two datasets at the same location.
     return Dataset.create(self._dataset_location(name), schema, primary_keys,
                           record_fields)

+  def materialize(self, name: str, view: View):
+    return view.materialize(self._dataset_location(name))
+
   def delete_dataset(self, name: str) -> None:
     raise NotImplementedError("delete_dataset has not been implemented")

   def dataset(self, name: str) -> Union[Dataset, MaterializedView]:
-    # TODO: to catch file not found and re-throw a DatasetNotFoundError.
-    # TODO: to support loading a materialized view.
-    return Dataset.load(self._dataset_location(name))
+    try:
+      storage = Storage.load(self._dataset_location(name))
+    except FileNotFoundError as e:
+      raise errors.StorageNotFoundError(str(e)) from None
+
+    if storage.metadata.type == meta.StorageMetadata.DATASET:
+      return Dataset(storage)
+    elif storage.metadata.type == meta.StorageMetadata.MATERIALIZED_VIEW:
+      return load_materialized_view(storage)
+
+    raise errors.SpaceRuntimeError(
+        f"Storage type {storage.metadata.type} is not supported")

   def datasets(self) -> List[DatasetInfo]:
     results = []
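Continuing the sketch above, the reworked dataset() lookup now resolves a name to whichever storage type lives there, and surfaces a typed error for unknown names:

    from space.core.utils import errors

    ds_again = cat.dataset("ds")  # resolves to a Dataset
    mv_again = cat.dataset("mv")  # resolves to a MaterializedView

    try:
        cat.dataset("no_such_name")  # hypothetical missing entry
    except errors.StorageNotFoundError as e:
        print(f"storage not found: {e}")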
20 changes: 12 additions & 8 deletions python/src/space/core/utils/errors.py
@@ -31,6 +31,18 @@ class PrimaryKeyExistError(UserInputError):
   """Errors caused by duplicated primary keys."""


+class FileExistError(UserInputError):
+  """Errors caused by a file to create already exists."""
+
+
+class StorageExistError(UserInputError):
+  """Errors caused by a storage to create already exists."""
+
+
+class StorageNotFoundError(UserInputError):
+  """The storage to load is not found."""
+
+
 class SpaceRuntimeError(RuntimeError):
   """Basic class of errors thrown from Space runtime."""

@@ -41,11 +53,3 @@ class TransactionError(SpaceRuntimeError):

 class LogicalPlanError(SpaceRuntimeError):
   """Errors from parsing logical plan."""
-
-
-class FileExistError(UserInputError):
-  """Errors caused by a file to create already exists."""
-
-
-class StorageExistError(UserInputError):
-  """Errors caused by a storage to create already exists."""
21 changes: 12 additions & 9 deletions python/src/space/core/views.py
@@ -205,8 +205,7 @@ def _sanitize_fields(field_names: List[str]) -> None:
   from space.core.transform.join import JoinTransform
   from space.ray.ops.join import JoinInput
   return JoinTransform(join_keys=keys,
-                       left=JoinInput(left, left_fields,
-                                      left_reference_read),
+                       left=JoinInput(left, left_fields, left_reference_read),
                        right=JoinInput(right, right_fields,
                                        right_reference_read))

@@ -293,11 +292,15 @@ def create(cls, location: str, view: View, logical_plan: meta.LogicalPlan,
   @classmethod
   def load(cls, location: str) -> MaterializedView:
     """Load a materialized view from files."""
-    storage = Storage.load(location)
-    metadata = storage.metadata
-    plan = metadata.logical_plan.logical_plan
+    return load_materialized_view(Storage.load(location))

-    # pylint: disable=cyclic-import,import-outside-toplevel
-    from space.core.transform.udfs import load_view
-    view = load_view(storage.location, metadata, plan)
-    return MaterializedView(storage, view)
+
+def load_materialized_view(storage: Storage) -> MaterializedView:
+  """Load a materialized view from a storage."""
+  metadata = storage.metadata
+  plan = metadata.logical_plan.logical_plan
+
+  # pylint: disable=cyclic-import,import-outside-toplevel
+  from space.core.transform.udfs import load_view
+  view = load_view(storage.location, metadata, plan)
+  return MaterializedView(storage, view)
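The refactor leaves MaterializedView.load() as a thin wrapper over the new module-level helper, so a caller that already holds a Storage (as DirCatalog.dataset() now does) can skip the extra Storage.load(). Under that reading, the two entry points below should be equivalent; the location value is hypothetical:

    from space.core.storage import Storage
    from space.core.views import MaterializedView, load_materialized_view

    location = "/tmp/space_catalog/mv"  # hypothetical storage location

    mv_a = MaterializedView.load(location)
    mv_b = load_materialized_view(Storage.load(location))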
52 changes: 51 additions & 1 deletion python/tests/catalogs/test_directory.py
@@ -13,17 +13,25 @@
 # limitations under the License.

 import os
+from typing import Dict

+import numpy as np
 import pyarrow as pa
 import pytest

 from space import DatasetInfo, DirCatalog
+from space.core.utils import errors


+# A sample UDF for testing.
+def _sample_map_udf(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
+  batch["float64"] = batch["float64"] + 1
+  return batch
+
+
 class TestDirectoryCatalog:

-  def test_crud(self, tmp_path):
+  def test_dataset_crud(self, tmp_path):
     schema = pa.schema([("f", pa.int64())])
     pks = ["f"]
     records = []
@@ -57,3 +65,45 @@ def test_crud(self, tmp_path):
       cat.create_dataset("ds2", schema, pks, records)

     assert "already exists" in str(excinfo.value)
+
+    with pytest.raises(errors.StorageNotFoundError) as excinfo:
+      cat.dataset("ds_not_exist")
+
+    assert "Failed to open local file" in str(excinfo.value)
+
+  def test_materialized_view_crud(self, tmp_path):
+    schema = pa.schema([("f", pa.int64()), ("float64", pa.float64())])
+    pks = ["f"]
+    records = []
+
+    location = str(tmp_path / "cat")
+    cat = DirCatalog(location)
+
+    ds = cat.create_dataset("ds", schema, pks, records)
+    view = ds.map_batches(fn=_sample_map_udf,
+                          input_fields=["f", "float64"],
+                          output_schema=schema,
+                          output_record_fields=[])
+
+    mv1 = cat.materialize("mv1", view)
+
+    ds.local().append({"f": [1, 2, 3], "float64": [0.1, 0.2, 0.3]})
+    mv1.ray().refresh()
+    expected_data = {"f": [1, 2, 3], "float64": [1.1, 1.2, 1.3]}
+    assert mv1.local().read_all().to_pydict() == expected_data
+
+    mv1_loaded = cat.dataset("mv1")
+    assert mv1_loaded.local().read_all().to_pydict() == expected_data
+
+    with pytest.raises(errors.StorageExistError):
+      cat.materialize("mv1", view)
+
+    with pytest.raises(errors.StorageExistError):
+      cat.materialize("ds", view)
+
+    key_fn = lambda ds: ds.location  # pylint: disable=unnecessary-lambda-assignment
+    assert sorted(cat.datasets(), key=key_fn) == sorted([
+        DatasetInfo("ds", ds.storage.location),
+        DatasetInfo("mv1", mv1.storage.location)
+    ], key=key_fn)