Add local delete operation implementation.
coufon committed Dec 24, 2023
1 parent b1f2201 commit e7590ec
Showing 9 changed files with 306 additions and 48 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/python-ci.yml
@@ -47,7 +47,8 @@ jobs:
- name: Analysing test code with pylint
working-directory: ./python
run: |
pylint tests/**/* --disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code"
pylint tests/**/* \
--disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code"
- name: Checking type with mypy
working-directory: ./python/src
run: |
25 changes: 12 additions & 13 deletions python/src/space/core/manifests/index.py
@@ -30,10 +30,6 @@
from space.core.schema.arrow import field_id, field_id_to_column_id_dict
from space.core.utils import paths

# Manifest file fields.
_INDEX_COMPRESSED_BYTES_FIELD = '_INDEX_COMPRESSED_BYTES'
_INDEX_UNCOMPRESSED_BYTES_FIELD = '_INDEX_UNCOMPRESSED_BYTES'


def _stats_subfields(type_: pa.DataType) -> List[pa.Field]:
"""Column stats struct field sub-fields."""
@@ -51,8 +47,8 @@ def _manifest_schema(

fields = [(constants.FILE_PATH_FIELD, pa.utf8()),
(constants.NUM_ROWS_FIELD, pa.int64()),
(_INDEX_COMPRESSED_BYTES_FIELD, pa.int64()),
(_INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())]
(constants.INDEX_COMPRESSED_BYTES_FIELD, pa.int64()),
(constants.INDEX_UNCOMPRESSED_BYTES_FIELD, pa.int64())]

# Fields to collect Parquet column statistics: [(field_id, type), ...].
stats_fields: List[Tuple[int, pa.DataType]] = []
@@ -139,6 +135,8 @@ def __init__(self, metadata_dir: str, schema: pa.Schema,
self._stats_column_ids.append(column_id)
self._field_stats_dict[column_id] = _FieldStats(type_)

self._cached_manifest_data: List[pa.Table] = []

def write(self, file_path: str,
parquet_metadata: pq.FileMetaData) -> meta.StorageStatistics:
"""Write a new manifest row.
@@ -175,11 +173,12 @@ def write(self, file_path: str,
index_compressed_bytes=index_compressed_bytes,
index_uncompressed_bytes=index_uncompressed_bytes)

def write_arrow(self, manifest_data: pa.Table) -> None:
"""Cache manifest rows that are already in Arrow format; written in finish()."""
self._cached_manifest_data.append(manifest_data)

def finish(self) -> Optional[str]:
"""Materialize the manifest file and return the file path."""
# Convert cached manifest data to Arrow.
all_manifest_data: List[pa.Table] = []

if self._file_paths:
arrays = [
self._file_paths, self._num_rows, self._index_compressed_bytes,
@@ -189,15 +188,15 @@ def finish(self) -> Optional[str]:
for column_id in self._stats_column_ids:
arrays.append(self._field_stats_dict[column_id].to_arrow())

all_manifest_data.append(
self._cached_manifest_data.append(
pa.Table.from_arrays(
arrays=arrays,
schema=self._manifest_schema)) # type: ignore[call-arg]

if not all_manifest_data:
if not self._cached_manifest_data:
return None

manifest_data = pa.concat_tables(all_manifest_data)
manifest_data = pa.concat_tables(self._cached_manifest_data)
if manifest_data.num_rows == 0:
return None

@@ -254,6 +253,6 @@ def _index_manifests(table: pa.Table) -> _IndexManifests:
file_path=table.column(constants.FILE_PATH_FIELD).combine_chunks(),
num_rows=table.column(constants.NUM_ROWS_FIELD).combine_chunks(),
index_compressed_bytes=table.column(
_INDEX_COMPRESSED_BYTES_FIELD).combine_chunks(),
constants.INDEX_COMPRESSED_BYTES_FIELD).combine_chunks(),
index_uncompressed_bytes=table.column(
_INDEX_UNCOMPRESSED_BYTES_FIELD).combine_chunks())
constants.INDEX_UNCOMPRESSED_BYTES_FIELD).combine_chunks())
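
The index.py change above replaces the module-local byte-count constants with shared ones from `constants`, and adds a `write_arrow` path that caches externally produced manifest tables until `finish` concatenates everything. Below is a minimal standalone sketch of that cache-then-concatenate pattern, using a hypothetical two-column manifest schema rather than the real Space schema.

import pyarrow as pa

# Hypothetical two-column manifest schema, for illustration only.
schema = pa.schema([("_FILE", pa.utf8()), ("_NUM_ROWS", pa.int64())])

cached = []

# Analogous to write_arrow(): cache an externally produced manifest table.
cached.append(pa.Table.from_pydict(
    {"_FILE": ["data/file0.parquet"], "_NUM_ROWS": [100]}, schema=schema))

# Analogous to finish(): add locally collected rows, then concatenate all
# cached tables into the manifest table to materialize.
cached.append(pa.Table.from_arrays(
    [pa.array(["data/file1.parquet"]), pa.array([250])], schema=schema))

manifest_data = pa.concat_tables(cached)
print(manifest_data.num_rows)  # 2
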
35 changes: 26 additions & 9 deletions python/src/space/core/ops/append.py
@@ -80,7 +80,14 @@ class LocalAppendOp(BaseAppendOp, StoragePaths):
Not thread safe.
"""

def __init__(self, location: str, metadata: meta.StorageMetadata):
def __init__(self,
location: str,
metadata: meta.StorageMetadata,
record_address_input: bool = True):
"""
Args:
record_address_input: if true, input record fields already contain addresses instead of raw record values.
"""
StoragePaths.__init__(self, location)

self._metadata = metadata
@@ -91,6 +98,8 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):
self._index_fields, self._record_fields = arrow.classify_fields(
self._physical_schema, record_fields, selected_fields=None)

self._record_address_input = record_address_input

# Data file writers.
self._index_writer_info: Optional[_IndexWriterInfo] = None

@@ -151,6 +160,10 @@ def finish(self) -> Optional[runtime.Patch]:

return self._patch

def append_index_manifest(self, index_manifest_table: pa.Table) -> None:
"""Append external index manifest data."""
return self._index_manifest_writer.write_arrow(index_manifest_table)

def _append_arrow(self, data: pa.Table) -> None:
# TODO: to verify the schema of input data.
if data.num_rows == 0:
@@ -165,15 +178,19 @@ def _append_arrow(self, data: pa.Table) -> None:

# Write record fields into files.
# TODO: to parallelize it.
record_addresses = [
self._write_record_column(f, data.column(f.name))
for f in self._record_fields
]

# TODO: to preserve the field order in schema.
for field_name, address_column in record_addresses:
# TODO: the field/column added must have field ID.
index_data = index_data.append_column(field_name, address_column)
if self._record_address_input:
for f in self._record_fields:
index_data = index_data.append_column(f.name, data.column(f.name))
else:
record_addresses = [
self._write_record_column(f, data.column(f.name))
for f in self._record_fields
]

for field_name, address_column in record_addresses:
# TODO: the field/column added must have field ID.
index_data = index_data.append_column(field_name, address_column)

# Write index fields into files.
self._cached_index_file_bytes += index_data.nbytes
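
The append.py change branches on the new `record_address_input` flag: when true, record columns are assumed to already hold addresses and are copied through; when false, record values are written out and the returned address columns are appended. A simplified sketch of that branching follows; the address column here is a placeholder array, not the real `_write_record_column` output.

from typing import List

import pyarrow as pa


def append_record_columns(index_data: pa.Table, data: pa.Table,
                          record_field_names: List[str],
                          record_address_input: bool) -> pa.Table:
  """Simplified stand-in for the branching added to _append_arrow()."""
  if record_address_input:
    # Input record columns already hold addresses; copy them through as-is.
    for name in record_field_names:
      index_data = index_data.append_column(name, data.column(name))
  else:
    # Otherwise the real op writes record values to files and appends the
    # returned addresses; the address column below is a placeholder array.
    for name in record_field_names:
      address_column = pa.array([f"addr-of-{name}"] * data.num_rows)
      index_data = index_data.append_column(name, address_column)
  return index_data


index = pa.table({"id": [1, 2]})
records = pa.table({"id": [1, 2], "payload": ["x", "y"]})
print(append_record_columns(index, records, ["payload"], True).column_names)
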
190 changes: 190 additions & 0 deletions python/src/space/core/ops/delete.py
@@ -0,0 +1,190 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Local delete operation implementation."""

from abc import abstractmethod
from typing import List, Optional, Set

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc

from space.core.ops import utils
from space.core.ops.append import LocalAppendOp
from space.core.ops.base import BaseOp
from space.core.proto import metadata_pb2 as meta
from space.core.proto import runtime_pb2 as runtime
from space.core.schema import arrow
from space.core.utils.paths import StoragePaths
from space.core.schema import constants


class BaseDeleteOp(BaseOp):
"""Abstract base delete operation class.
The deletion only applies to index files. The rows in record files are
cleaned up by a separate garbage collection operation.
"""

@abstractmethod
def delete(self) -> Optional[runtime.Patch]:
"""Delete data matching the filter from the storage.
TODO: a class is not needed for the current single thread implementation.
To revisit the interface.
"""


class FileSetDeleteOp(BaseDeleteOp, StoragePaths):
"""Delete operation of a given file set running locally.
It can be used as a component of more complex operations and of a
distributed delete operation.
Not thread safe.
"""

def __init__(self, location: str, metadata: meta.StorageMetadata,
file_set: runtime.FileSet, filter_: pc.Expression):
StoragePaths.__init__(self, location)

if not _validate_files(file_set):
raise RuntimeError(f"Invalid input file set for delete op:\n{file_set}")

self._file_set = file_set
# Rows not matching the filter will be reinserted.
self._reinsert_filter = ~filter_

self._append_op = LocalAppendOp(location,
metadata,
record_address_input=True)

def delete(self) -> Optional[runtime.Patch]:
# Index files and manifests that are deleted; they will be removed from the
# index manifests.
deleted_files: List[str] = []
deleted_manifest_ids: Set[int] = set()

deleted_rows = 0
stats_before_delete = meta.StorageStatistics()
for file in self._file_set.index_files:
utils.update_index_storage_stats(stats_before_delete,
file.storage_statistics)

index_data = pq.read_table(self.full_path(file.path),
filters=self._reinsert_filter)

# No row is deleted. No need to re-insert rows.
if index_data.num_rows == file.storage_statistics.num_rows:
continue

# Collect statistics.
deleted_rows += (file.storage_statistics.num_rows - index_data.num_rows)
all_deleted = (index_data.num_rows == 0)

# Record deleted files and manifests information.
deleted_files.append(file.path)
deleted_manifest_ids.add(file.manifest_file_id)

# Write a reinsert file for surviving rows.
if all_deleted:
continue

# Re-insert surviving rows.
self._append_op.write(index_data)

if deleted_rows == 0:
return None

# Carry over unmodified index files in reinserted manifests.
deleted_manifest_files: List[str] = []
for manifest_id in deleted_manifest_ids:
if manifest_id not in self._file_set.index_manifest_files:
raise RuntimeError(
f"Index manifest ID {manifest_id} not found in file set")

deleted_manifest_files.append(
self._file_set.index_manifest_files[manifest_id])

# Carry over survivor files to the new manifest data.
# Survivor stands for unmodified files.
survivor_files_filter = ~_build_file_path_filter(constants.FILE_PATH_FIELD,
deleted_files)
survivor_index_manifests = pq.ParquetDataset(
[self.full_path(f) for f in deleted_manifest_files],
filters=survivor_files_filter).read()
if survivor_index_manifests.num_rows > 0:
self._append_op.append_index_manifest(survivor_index_manifests)

reinsert_patch = self._append_op.finish()

# Populate the patch for the delete.
patch = runtime.Patch()
if reinsert_patch is not None:
patch.addition.index_manifest_files.extend(
reinsert_patch.addition.index_manifest_files)

for f in deleted_manifest_files:
patch.deletion.index_manifest_files.append(f)

# Compute storage statistics update.
survivor_stats = _read_index_statistics(survivor_index_manifests)
reinsert_stats = (reinsert_patch.storage_statistics_update if
reinsert_patch is not None else meta.StorageStatistics())

deleted_compressed_bytes = (reinsert_stats.index_compressed_bytes +
survivor_stats.index_compressed_bytes
) - stats_before_delete.index_compressed_bytes
deleted_uncompressed_bytes = (
reinsert_stats.index_uncompressed_bytes +
survivor_stats.index_uncompressed_bytes
) - stats_before_delete.index_uncompressed_bytes

patch.storage_statistics_update.CopyFrom(
meta.StorageStatistics(
num_rows=-deleted_rows,
index_compressed_bytes=-deleted_compressed_bytes,
index_uncompressed_bytes=-deleted_uncompressed_bytes))
return patch


def _build_file_path_filter(field_name: str, file_paths: List[str]):
filter_ = pc.scalar(False)
for f in file_paths:
filter_ |= (pc.field(field_name) == pc.scalar(f))
return filter_


def _read_index_statistics(manifest_data: pa.Table) -> meta.StorageStatistics:
"""Read storage statistics of unmodified index files from manifests."""

def sum_bytes(field_name: str) -> int:
return sum(manifest_data.column(field_name).combine_chunks().to_pylist())

return meta.StorageStatistics(index_compressed_bytes=sum_bytes(
constants.INDEX_COMPRESSED_BYTES_FIELD),
index_uncompressed_bytes=sum_bytes(
constants.INDEX_UNCOMPRESSED_BYTES_FIELD))


def _validate_files(file_set: runtime.FileSet) -> bool:
"""Return false if the file set does not contain sufficient information for
deletion.
"""
for f in file_set.index_files:
if not f.path or f.storage_statistics.num_rows == 0 or f.manifest_file_id == 0:
return False

return len(file_set.index_manifest_files) > 0
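
The delete flow above is driven by two pyarrow compute expressions: the negated user filter selects rows to reinsert, and `_build_file_path_filter` ORs together equality checks so surviving manifest rows can be selected. A small sketch of how those expressions are built, assuming a hypothetical `_FILE` path column name in place of `constants.FILE_PATH_FIELD`:

import pyarrow.compute as pc

# Deletion predicate supplied by the caller, e.g. delete all rows with id < 10.
delete_filter = pc.field("id") < 10

# FileSetDeleteOp keeps the complement: rows that survive and are reinserted.
reinsert_filter = ~delete_filter

# Equivalent of _build_file_path_filter(): OR together equality checks, then
# negate to select manifest rows of files that were not rewritten.
deleted_files = ["data/file0.parquet", "data/file1.parquet"]
file_path_filter = pc.scalar(False)
for f in deleted_files:
  file_path_filter |= (pc.field("_FILE") == pc.scalar(f))
survivor_filter = ~file_path_filter

# Either expression can be pushed down via pq.read_table(..., filters=...).
print(reinsert_filter)
print(survivor_filter)
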
26 changes: 16 additions & 10 deletions python/src/space/core/ops/read.py
@@ -44,7 +44,13 @@ def __iter__(self) -> Iterator[pa.Table]:


class FileSetReadOp(BaseReadOp, StoragePaths):
"""Read data from a dataset."""
"""Read operation of a given file set running locally.
It can be used as a component of more complex operations and of a
distributed read operation.
Not thread safe.
"""

def __init__(self,
location: str,
@@ -78,12 +84,12 @@ def __iter__(self) -> Iterator[pa.Table]:
yield self._read_index_and_record(file.path)

def _read_index_and_record(self, index_path: str) -> pa.Table:
index_table = pq.read_table(self.full_path(index_path),
filters=self._filter) # type: ignore[arg-type]
index_data = pq.read_table(self.full_path(index_path),
filters=self._filter) # type: ignore[arg-type]

index_column_ids: List[int] = []
record_columns: List[Tuple[int, pa.Field]] = []
for column_id, field in enumerate(index_table.schema):
for column_id, field in enumerate(index_data.schema):
field_id = arrow.field_id(field)
if field_id in self._index_field_ids:
index_column_ids.append(column_id)
@@ -92,18 +98,18 @@ def _read_index_and_record(self, index_path: str) -> pa.Table:
(column_id,
arrow.binary_field(self._record_fields_dict[field_id])))

result_table = index_table.select(
index_column_ids) # type: ignore[arg-type]
result_data = index_data.select(index_column_ids) # type: ignore[arg-type]

# Record record fields from addresses.
# Read record fields from addresses.
for column_id, field in record_columns:
result_table = result_table.append_column(
result_data = result_data.append_column(
field,
self._read_record_column(
index_table.select([column_id]), # type: ignore[list-item]
index_data.select([column_id]), # type: ignore[list-item]
field.name))

return result_table
# TODO: to keep field order the same as schema.
return result_data

def _read_record_column(self, record_address: pa.Table,
field: str) -> pa.BinaryArray:
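
For context, `_read_index_and_record` first selects the index columns, then appends each record field after resolving it from its addresses in the record files. A toy sketch of that select-then-append shape, with address resolution stubbed out:

import pyarrow as pa

# Toy index table standing in for the data read from one Parquet index file.
index_data = pa.table({
    "id": [1, 2, 3],                # index field
    "payload": ["a0", "a1", "a2"],  # stand-in record-address column
})

# Keep only the index columns first...
result_data = index_data.select(["id"])

# ...then append each record field after resolving it from its addresses
# (resolution stubbed here; FileSetReadOp reads the record files instead).
resolved = pa.array([b"blob0", b"blob1", b"blob2"], type=pa.binary())
result_data = result_data.append_column("payload", resolved)
print(result_data.column_names)  # ['id', 'payload']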