Skip to content

Commit

Permalink
Fail dataset creation if a dataset already exists at the location (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
coufon committed Jan 18, 2024
1 parent 70c7d38 commit b193bea
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 7 deletions.
14 changes: 11 additions & 3 deletions python/src/space/core/fs/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pyarrow import fs

from space.core.fs.base import BaseFileSystem, ProtoT
from space.core.utils import errors
from space.core.utils.protos import proto_to_text
from space.core.utils.uuids import random_id

Expand All @@ -38,9 +39,16 @@ def create_fs(self) -> fs.FileSystem:
def create_dir(self, dir_path: str) -> None:
self._fs.create_dir(dir_path)

def write_proto(self, file_path: str, msg: ProtoT) -> None:
# TODO: the current implement overwrite an existing file; to support an
# option to disallow overwrite.
def write_proto(self,
file_path: str,
msg: ProtoT,
fail_if_exists: bool = False) -> None:
# TODO: this check is not atomic; a more file system specific implement is
# needed.
if fail_if_exists and self._fs.get_file_info(
file_path).type != fs.FileType.NotFound:
raise errors.FileExistError(f"File {file_path} already exists")

tmp_file_path = f"{file_path}.{random_id()}.tmp"

with self._fs.open_output_stream(tmp_file_path) as f:
Expand Down
14 changes: 12 additions & 2 deletions python/src/space/core/fs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,18 @@ def create_dir(self, dir_path: str) -> None:
"""Create a new directory."""

@abstractmethod
def write_proto(self, file_path: str, msg: ProtoT) -> None:
"""Write a proto message in text format to a file."""
def write_proto(self,
file_path: str,
msg: ProtoT,
fail_if_exists: bool = False) -> None:
"""Write a proto message in text format to a file.
Args:
file_path: full path of the file to write to
msg: the proto message to write
fail_if_exists: if true, fail when the file already exists; otherwise
truncate the file
"""

@abstractmethod
def read_proto(self, file_path: str, empty_msg: ProtoT) -> ProtoT:
Expand Down
12 changes: 10 additions & 2 deletions python/src/space/core/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,13 @@ def _initialize_files(self, metadata_path: str) -> None:
self._fs.create_dir(self._data_dir)
self._fs.create_dir(self._metadata_dir)
self._fs.create_dir(self._change_data_dir)
self._write_metadata(metadata_path, self._metadata)

try:
self._write_metadata(metadata_path,
self._metadata,
fail_if_entry_point_exists=True)
except errors.FileExistError as e:
raise errors.StorageExistError(str(e)) from None

def _next_snapshot_id(self) -> int:
return self._metadata.current_snapshot_id + 1
Expand All @@ -407,12 +413,14 @@ def _write_metadata(
self,
metadata_path: str,
metadata: meta.StorageMetadata,
fail_if_entry_point_exists: bool = False,
) -> None:
"""Persist a StorageMetadata to files."""
self._fs.write_proto(metadata_path, metadata)
self._fs.write_proto(
self._entry_point_file,
meta.EntryPoint(metadata_file=self.short_path(metadata_path)))
meta.EntryPoint(metadata_file=self.short_path(metadata_path)),
fail_if_exists=fail_if_entry_point_exists)


def _patch_manifests(manifest_files: meta.ManifestFiles, patch: rt.Patch):
Expand Down
8 changes: 8 additions & 0 deletions python/src/space/core/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ class TransactionError(SpaceRuntimeError):

class LogicalPlanError(SpaceRuntimeError):
"""Errors from parsing logical plan."""


class FileExistError(UserInputError):
"""Errors caused by a file to create already exists."""


class StorageExistError(UserInputError):
"""Errors caused by a storage to create already exists."""
6 changes: 6 additions & 0 deletions python/tests/catalogs/test_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest

from space import DatasetInfo, DirCatalog
from space.core.utils import errors


class TestDirectoryCatalog:
Expand Down Expand Up @@ -51,3 +52,8 @@ def test_crud(self, tmp_path):
key_fn = lambda ds: ds.location # pylint: disable=unnecessary-lambda-assignment
assert sorted(cat.datasets(), key=key_fn) == sorted(
[ds1_info, DatasetInfo("ds2", ds2.storage.location)], key=key_fn)

with pytest.raises(errors.StorageExistError) as excinfo:
cat.create_dataset("ds2", schema, pks, records)

assert "already exists" in str(excinfo.value)
6 changes: 6 additions & 0 deletions python/tests/core/fs/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from space.core.fs.arrow import ArrowLocalFileSystem
import space.core.proto.metadata_pb2 as meta
from space.core.utils import errors


class TestArrowLocalFileSystem:
Expand Down Expand Up @@ -57,3 +58,8 @@ def test_overwrite_proto_file(self, tmp_path, fs):
write_msg = meta.StorageMetadata(current_snapshot_id=200)
fs.write_proto(file_path, write_msg)
assert self._read_proto(fs, file_path).current_snapshot_id == 200

with pytest.raises(errors.FileExistError) as excinfo:
fs.write_proto(file_path, write_msg, fail_if_exists=True)

assert "already exists" in str(excinfo.value)
1 change: 1 addition & 0 deletions python/tests/core/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def test_add_read_remove_tag(self, sample_dataset):
ds.remove_tag(tag="insert1")
with pytest.raises(errors.VersionNotFoundError) as excinfo:
local_runner.read_all(version="insert1")

assert "Version insert1 is not found" in str(excinfo.value)

def test_dataset_with_file_type(self, tmp_path):
Expand Down
8 changes: 8 additions & 0 deletions python/tests/core/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ def test_create_storage(self, tmp_path):
primary_keys=["int64"],
record_fields=["binary"])

with pytest.raises(errors.StorageExistError) as excinfo:
Storage.create(location=str(location),
schema=_SCHEMA,
primary_keys=["int64"],
record_fields=["binary"])

assert "already exists" in str(excinfo.value)

entry_point_file = location / "metadata" / _ENTRY_POINT_FILE
assert entry_point_file.exists()

Expand Down

0 comments on commit b193bea

Please sign in to comment.