diff --git a/python/pyproject.toml b/python/pyproject.toml index a5b89f8..59162b6 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "space-datasets" -version = "0.0.3" +version = "0.0.4" authors = [{ name = "Space team", email = "no-reply@google.com" }] description = "Unified storage framework for machine learning datasets" readme = "README.md" diff --git a/python/src/space/__init__.py b/python/src/space/__init__.py index 639b3de..9f19cce 100644 --- a/python/src/space/__init__.py +++ b/python/src/space/__init__.py @@ -14,6 +14,8 @@ # """Space is a storage framework for ML datasets.""" +from space.catalogs.base import DatasetInfo +from space.catalogs.directory import DirCatalog from space.core.datasets import Dataset from space.core.options import JoinOptions, Range, ReadOptions from space.core.runners import LocalRunner diff --git a/python/src/space/catalogs/__init__.py b/python/src/space/catalogs/__init__.py new file mode 100644 index 0000000..5678014 --- /dev/null +++ b/python/src/space/catalogs/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/python/src/space/catalogs/base.py b/python/src/space/catalogs/base.py
new file mode 100644
index 0000000..58c7341
--- /dev/null
+++ b/python/src/space/catalogs/base.py
@@ -0,0 +1,83 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Catalogs of Space datasets."""
+
+from __future__ import annotations
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import List, Union
+
+import pyarrow as pa
+
+from space.core.datasets import Dataset
+from space.core.views import MaterializedView
+
+
+class BaseCatalog(ABC):
+  """A catalog is a container of datasets.
+
+  Datasets in a catalog scope can be referenced by a dataset name uniquely.
+  """
+
+  @abstractmethod
+  def create_dataset(self, name: str, schema: pa.Schema,
+                     primary_keys: List[str],
+                     record_fields: List[str]) -> Dataset:
+    """Create a new empty dataset.
+
+    Args:
+      name: the dataset name.
+      schema: the schema of the storage.
+      primary_keys: un-enforced primary keys.
+      record_fields: fields stored in row format (ArrayRecord).
+
+    Returns:
+      The newly created dataset.
+    """
+
+  @abstractmethod
+  def delete_dataset(self, name: str) -> None:
+    """Delete an existing dataset or materialized view.
+
+    Args:
+      name: the dataset name.
+    """
+
+  @abstractmethod
+  def dataset(self, name: str) -> Union[Dataset, MaterializedView]:
+    """Get an existing dataset or materialized view.
+
+    Args:
+      name: the dataset name.
+
+    Returns:
+      The dataset or materialized view registered under the name.
+    """
+
+  @abstractmethod
+  def datasets(self) -> List[DatasetInfo]:
+    """List all datasets and materialized views in the catalog."""
+
+
+@dataclass
+class DatasetInfo:
+  """Basic information of a dataset or materialized view."""
+
+  # Dataset name.
+  name: str
+  # Dataset storage location.
+  location: str
+  # TODO: to include create time, type; it requires us to store these fields in
+  # entry point file to avoid opening metadata file.
diff --git a/python/src/space/catalogs/directory.py b/python/src/space/catalogs/directory.py
new file mode 100644
index 0000000..41be0df
--- /dev/null
+++ b/python/src/space/catalogs/directory.py
@@ -0,0 +1,81 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Directory catalog implementation."""
+
+import os
+
+from typing import List, Union
+import pyarrow as pa
+
+from space.catalogs.base import BaseCatalog, DatasetInfo
+from space.core.datasets import Dataset
+from space.core.utils import paths
+from space.core.views import MaterializedView
+
+
+class DirCatalog(BaseCatalog):
+  """A directory catalog consists of datasets with location under the same
+  directory.
+
+  TODO: to build file system abstraction instead of directly using `os.path`,
+  for extension to more file system types.
+  """
+
+  def __init__(self, location: str):
+    """Create a catalog rooted at a directory.
+
+    Args:
+      location: the directory under which dataset locations are resolved.
+    """
+    self._location = location
+
+  def create_dataset(self, name: str, schema: pa.Schema,
+                     primary_keys: List[str],
+                     record_fields: List[str]) -> Dataset:
+    # TODO: should disallow overwriting an entry point file, to avoid creating
+    # two datasets at the same location.
+    return Dataset.create(self._dataset_name(name), schema, primary_keys,
+                          record_fields)
+
+  def delete_dataset(self, name: str) -> None:
+    raise NotImplementedError("delete_dataset has not been implemented")
+
+  def dataset(self, name: str) -> Union[Dataset, MaterializedView]:
+    # TODO: to catch file not found and re-throw a DatasetNotFoundError.
+    # TODO: to support loading a materialized view.
+    return Dataset.load(self._dataset_name(name))
+
+  def datasets(self) -> List[DatasetInfo]:
+    """List datasets under the catalog directory, sorted by dataset name.
+
+    A sub-directory is listed when its entry point path is an existing file.
+
+    Raises:
+      FileNotFoundError: if the catalog directory does not exist.
+    """
+    results = []
+    # `os.listdir` returns entries in arbitrary order; sort by name so the
+    # listing is deterministic.
+    for ds_name in sorted(os.listdir(self._location)):
+      ds_location = self._dataset_name(ds_name)
+      if os.path.isdir(ds_location) and os.path.isfile(
+          paths.entry_point_path(ds_location)):
+        results.append(DatasetInfo(ds_name, ds_location))
+
+    return results
+
+  def _dataset_name(self, name: str) -> str:
+    """Return the storage location of the dataset with the given name."""
+    return os.path.join(self._location, name)
diff --git a/python/src/space/core/fs/arrow.py b/python/src/space/core/fs/arrow.py
index 34090b2..0dd66d8 100644
--- a/python/src/space/core/fs/arrow.py
+++ b/python/src/space/core/fs/arrow.py
@@ -40,7 +40,7 @@ def create_dir(self, dir_path: str) -> None:
 
   def write_proto(self, file_path: str, msg: ProtoT) -> None:
     # TODO: the current implement overwrite an existing file; to support an
-    # to disallow overwrite.
+    # option to disallow overwrite.
     tmp_file_path = f"{file_path}.{random_id()}.tmp"
 
     with self._fs.open_output_stream(tmp_file_path) as f:
diff --git a/python/tests/catalogs/test_directory.py b/python/tests/catalogs/test_directory.py
new file mode 100644
index 0000000..96bcec3
--- /dev/null
+++ b/python/tests/catalogs/test_directory.py
@@ -0,0 +1,54 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import pyarrow as pa
+import pytest
+
+from space import DatasetInfo, DirCatalog
+
+
+class TestDirectoryCatalog:
+
+  def test_crud(self, tmp_path):
+    schema = pa.schema([("f", pa.int64())])
+    pks = ["f"]
+    records = []
+
+    location = str(tmp_path / "cat")
+    cat = DirCatalog(location)
+
+    with pytest.raises(FileNotFoundError):
+      cat.datasets()
+
+    os.mkdir(location)
+    assert not cat.datasets()
+
+    ds1 = cat.create_dataset("ds1", schema, pks, records)
+    ds1_data = {"f": [1, 2, 3]}
+    ds1.local().append(ds1_data)
+
+    ds1_loaded = cat.dataset("ds1")
+    assert ds1_loaded.local().read_all().to_pydict() == ds1_data
+
+    ds1_info = DatasetInfo("ds1", ds1_loaded.storage.location)
+    assert cat.datasets() == [ds1_info]
+
+    ds2 = cat.create_dataset("ds2", schema, pks, records)
+
+    # The catalog lists datasets sorted by name, so no re-sorting is needed.
+    assert cat.datasets() == [
+        ds1_info, DatasetInfo("ds2", ds2.storage.location)
+    ]