Skip to content

Commit

Permalink
Add data_files() method to read alive data files at a given snapshot (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhou Fang committed Dec 23, 2023
1 parent fbb7d9b commit a558715
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 303 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: Analysing test code with pylint
working-directory: ./python
run: |
pylint tests/**/* --disable="missing-module-docstring,missing-function-docstring,missing-class-docstring"
pylint tests/**/* --disable="missing-module-docstring,missing-function-docstring,missing-class-docstring,duplicate-code"
- name: Checking type with mypy
working-directory: ./python/src
run: |
Expand Down
88 changes: 46 additions & 42 deletions python/src/space/core/manifests/falsifiable_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,66 +36,76 @@
from space.core.schema import constants


def substrait_expr(schema: pa.Schema,
arrow_expr: pc.Expression) -> ExtendedExpression:
def build_manifest_filter(schema: pa.Schema, field_name_ids: Dict[str, int],
                          filter_: pc.Expression) -> Optional[pc.Expression]:
  """Derive a filter on index manifest column statistics from a data filter.

  The data filter is converted to Substrait form, turned into a falsifiable
  filter, and the negation of that falsifiable filter is returned.

  Args:
    schema: the storage schema.
    field_name_ids: a dict of field names to IDs mapping.
    filter_: a filter on data fields.

  Returns:
    The negated falsifiable filter, or None if the expression is not
    supported (an info message is logged in that case).
  """
  substrait_filter = _substrait_expr(schema, filter_)

  try:
    manifest_filter = ~_falsifiable_filter(  # type: ignore[operator]
        substrait_filter, field_name_ids)
  except _ExpressionException as error:
    # Unsupported expressions are not fatal: the caller simply reads without
    # push-down.
    logging.info(
        "Index manifest filter push-down is not used, query may be slower; "
        f"error: {error}")
    return None

  return manifest_filter


def _substrait_expr(schema: pa.Schema,
arrow_expr: pc.Expression) -> ExtendedExpression:
"""Convert an expression from Arrow to Substrait format.
PyArrow does not expose enough methods for processing expressions, thus we
convert it to Substrait format for processing.
"""
buf = ps.serialize_expressions( # type: ignore[attr-defined]
[arrow_expr], ['expr'], schema)
[arrow_expr], ["expr"], schema)

expr = ExtendedExpression()
expr.ParseFromString(buf.to_pybytes())
return expr


class ExpressionException(Exception):
"""Raise for exceptions in expressions."""
class _ExpressionException(Exception):
"""Raise for exceptions in Substrait expressions."""


def falsifiable_filter(
filter_: ExtendedExpression,
field_name_to_id_dict: Dict[str, int]) -> Optional[pc.Expression]:
"""Build a falsifiable filter.
Args:
filter_: a filter on data fields.
field_name_to_id_dict: a dict of field names to IDs mapping.
Returns:
Falsifiable filter, or None if not convertable.
"""
def _falsifiable_filter(filter_: ExtendedExpression,
field_name_ids: Dict[str, int]) -> pc.Expression:
if len(filter_.referred_expr) != 1:
logging.warning(
f"Expect 1 referred expr, found: {len(filter_.referred_expr)}; "
"Falsifiable filter is not used.")
return None
raise _ExpressionException(
f"Expect 1 referred expr, found: {len(filter_.referred_expr)}")

return _falsifiable_filter(
return _falsifiable_filter_internal(
filter_.extensions, # type: ignore[arg-type]
filter_.base_schema,
field_name_to_id_dict,
field_name_ids,
filter_.referred_expr[0].expression.scalar_function)


# pylint: disable=too-many-locals,too-many-return-statements
def _falsifiable_filter(
def _falsifiable_filter_internal(
extensions: List[SimpleExtensionDeclaration], base_schema: NamedStruct,
field_name_to_id_dict: Dict[str, int],
root: Expression.ScalarFunction) -> Optional[pc.Expression]:
field_name_ids: Dict[str, int],
root: Expression.ScalarFunction) -> pc.Expression:
if len(root.arguments) != 2:
logging.warning(f"Invalid number of arguments: {root.arguments}; "
"Falsifiable filter is not used.")
return None
raise _ExpressionException(
f"Invalid number of arguments: {root.arguments}")

fn = extensions[root.function_reference].extension_function.name
lhs = root.arguments[0].value
rhs = root.arguments[1].value

falsifiable_filter_fn = partial(_falsifiable_filter, extensions, base_schema,
field_name_to_id_dict)
falsifiable_filter_fn = partial(_falsifiable_filter_internal, extensions,
base_schema, field_name_ids)

if _has_scalar_function(lhs) and _has_scalar_function(rhs):
lhs_fn = lhs.scalar_function
Expand All @@ -109,18 +119,13 @@ def _falsifiable_filter(
return falsifiable_filter_fn(lhs_fn) & falsifiable_filter_fn(
rhs_fn) # type: ignore[operator]
else:
logging.warning(f"Unsupported fn: {fn}; Falsifiable filter is not used.")
return None
raise _ExpressionException(f"Unsupported fn: {fn}")

if _has_selection(lhs) and _has_selection(rhs):
logging.warning(f"Both args are fields: {root.arguments}; "
"Falsifiable filter is not used.")
return None
raise _ExpressionException(f"Both args are fields: {root.arguments}")

if _has_literal(lhs) and _has_literal(rhs):
logging.warning(f"Both args are constants: {root.arguments}; "
"Falsifiable filter is not used.")
return None
raise _ExpressionException(f"Both args are constants: {root.arguments}")

# Move literal to rhs.
if _has_selection(rhs):
Expand All @@ -129,7 +134,7 @@ def _falsifiable_filter(

field_index = lhs.selection.direct_reference.struct_field.field
field_name = base_schema.names[field_index]
field_id = field_name_to_id_dict[field_name]
field_id = field_name_ids[field_name]
field_min, field_max = _stats_field_min(field_id), _stats_field_max(field_id)
value = pc.scalar(
getattr(
Expand All @@ -144,8 +149,7 @@ def _falsifiable_filter(
elif fn == "equal":
return (field_min > value) | (field_max < value)

logging.warning(f"Unsupported fn: {fn}; Falsifiable filter is not used.")
return None
raise _ExpressionException(f"Unsupported fn: {fn}")


def _stats_field_min(field_id: int) -> pc.Expression:
Expand Down
64 changes: 62 additions & 2 deletions python/src/space/core/manifests/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
#
"""Index manifest files writer and reader implementation."""

import typing
from typing import Any, Dict, List, Optional, Tuple

from dataclasses import dataclass
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

from space.core.manifests.utils import write_parquet_file
from space.core.utils.parquet import write_parquet_file
import space.core.proto.metadata_pb2 as meta
import space.core.proto.runtime_pb2 as runtime
from space.core.schema import constants
from space.core.schema import utils as schema_utils
from space.core.schema.arrow import field_id, field_id_to_column_id_dict
Expand Down Expand Up @@ -199,5 +203,61 @@ def finish(self) -> Optional[str]:
return None

file_path = paths.new_index_manifest_path(self._metadata_dir)
write_parquet_file(file_path, self._manifest_schema, manifest_data)
write_parquet_file(file_path, self._manifest_schema, [manifest_data])
return file_path


@dataclass
class _IndexManifests:
  """Columns of index manifests read from a Parquet file.

  Attributes:
    file_path: file path column.
    num_rows: number of rows column.
    index_compressed_bytes: index compressed bytes column.
    index_uncompressed_bytes: index uncompressed bytes column.
  """
  file_path: pa.StringArray
  num_rows: pa.Int64Array
  index_compressed_bytes: pa.Int64Array
  index_uncompressed_bytes: pa.Int64Array


def read_index_manifests(
    manifest_path: str,
    filter_: Optional[pc.Expression] = None) -> runtime.FileSet:
  """Load the data files listed in one index manifest file.

  Args:
    manifest_path: full file path of the manifest file.
    filter_: optional filter on the index manifest rows; every row is read
      when it is None.

  Returns:
    A file set whose index files are the rows read from the manifest file,
    each carrying its path and storage statistics.
  """
  # `filters=None` is the default of `read_table`, i.e., no row filtering.
  table = pq.read_table(manifest_path,
                        filters=filter_)  # type: ignore[arg-type]
  columns = _index_manifests(table)

  result = runtime.FileSet()
  for row in range(table.num_rows):
    data_file = runtime.DataFile()
    data_file.path = columns.file_path[row].as_py()

    statistics = data_file.storage_statistics
    statistics.num_rows = columns.num_rows[row].as_py()
    statistics.index_compressed_bytes = (
        columns.index_compressed_bytes[row].as_py())
    statistics.index_uncompressed_bytes = (
        columns.index_uncompressed_bytes[row].as_py())

    result.index_files.append(data_file)

  return result


@typing.no_type_check
def _index_manifests(table: pa.Table) -> _IndexManifests:
  """Collect the manifest columns of `table` into an `_IndexManifests`.

  Each column is looked up by its field name and its chunks are combined.
  """

  def combined(column_name):
    return table.column(column_name).combine_chunks()

  return _IndexManifests(
      file_path=combined(constants.FILE_PATH_FIELD),
      num_rows=combined(constants.NUM_ROWS_FIELD),
      index_compressed_bytes=combined(_INDEX_COMPRESSED_BYTES_FIELD),
      index_uncompressed_bytes=combined(_INDEX_UNCOMPRESSED_BYTES_FIELD))
4 changes: 2 additions & 2 deletions python/src/space/core/manifests/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import pyarrow as pa

from space.core.manifests.utils import write_parquet_file
from space.core.utils.parquet import write_parquet_file
import space.core.proto.metadata_pb2 as meta
from space.core.utils import paths
from space.core.schema import constants
Expand Down Expand Up @@ -73,5 +73,5 @@ def finish(self) -> Optional[str]:
schema=self._manifest_schema) # type: ignore[call-arg]

file_path = paths.new_record_manifest_path(self._metadata_dir)
write_parquet_file(file_path, self._manifest_schema, manifest_data)
write_parquet_file(file_path, self._manifest_schema, [manifest_data])
return file_path
12 changes: 6 additions & 6 deletions python/src/space/core/ops/append.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):

self._metadata = metadata
record_fields = set(self._metadata.schema.record_fields)
self._schema = arrow.arrow_schema(self._metadata.schema.fields,
record_fields,
physical=True)
self._physical_schema = arrow.arrow_schema(self._metadata.schema.fields,
record_fields,
physical=True)
self._index_fields, self._record_fields = arrow.classify_fields(
self._schema, record_fields, selected_fields=None)
self._physical_schema, record_fields, selected_fields=None)

# Data file writers.
self._index_writer_info: Optional[_IndexWriterInfo] = None
Expand All @@ -104,7 +104,7 @@ def __init__(self, location: str, metadata: meta.StorageMetadata):

# Manifest file writers.
self._index_manifest_writer = IndexManifestWriter(
self._metadata_dir, self._schema,
self._metadata_dir, self._physical_schema,
self._metadata.schema.primary_keys) # type: ignore[arg-type]
self._record_manifest_writer = RecordManifestWriter(self._metadata_dir)

Expand Down Expand Up @@ -199,7 +199,7 @@ def _maybe_create_index_writer(self) -> None:
"""Create a new index file writer if needed."""
if self._index_writer_info is None:
full_file_path = paths.new_index_file_path(self._data_dir)
writer = pq.ParquetWriter(full_file_path, self._schema)
writer = pq.ParquetWriter(full_file_path, self._physical_schema)
self._index_writer_info = _IndexWriterInfo(
writer, self.short_path(full_file_path))

Expand Down
15 changes: 15 additions & 0 deletions python/src/space/core/proto/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@ import "space/core/proto/metadata.proto";

package space.proto;

// Information of a data file: its path plus the storage statistics of the
// data it contains.
message DataFile {
  // Data file path.
  string path = 1;

  // Storage statistics of data in the file.
  // NOTE(review): StorageStatistics is not declared in the visible part of
  // this file; presumably it comes from the imported metadata.proto.
  // NOTE(review): field numbers 2 and 3 are not used by this message;
  // presumably left free on purpose -- confirm before assigning them.
  StorageStatistics storage_statistics = 4;
}

// A set of associated data and manifest files.
// Currently the only field is the list of index data files.
message FileSet {
  // Index data files.
  repeated DataFile index_files = 1;
}

// A patch describing metadata changes to the storage.
// NEXT_ID: 4
message Patch {
Expand Down
18 changes: 11 additions & 7 deletions python/src/space/core/proto/runtime_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a558715

Please sign in to comment.