Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 73 additions & 8 deletions src/iceberg/manifest/manifest_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

#include "iceberg/manifest/manifest_group.h"

#include <algorithm>
#include <string>
#include <unordered_set>
#include <utility>

#include "iceberg/expression/binder.h"
#include "iceberg/expression/evaluator.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/manifest_evaluator.h"
Expand All @@ -29,6 +33,7 @@
#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/manifest_wrapper.h"
#include "iceberg/schema.h"
#include "iceberg/table_scan.h"
#include "iceberg/util/checked_cast.h"
Expand Down Expand Up @@ -265,10 +270,45 @@ Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_, specs_by_id_));

auto columns = columns_;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
!columns.empty() &&
std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
auto spec_iter = specs_by_id_.find(manifest.partition_spec_id);
ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
"Cannot find partition spec for ID {}", manifest.partition_spec_id);

ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
spec_iter->second->PartitionType(*schema_));
auto data_file_schema = DataFile::Type(std::move(partition_type))->ToSchema();
ICEBERG_ASSIGN_OR_RAISE(
auto bound_file_filter,
Binder::Bind(*data_file_schema, file_filter_, case_sensitive_));
ICEBERG_ASSIGN_OR_RAISE(auto referenced_field_ids,
ReferenceVisitor::GetReferencedFieldIds(bound_file_filter));

std::unordered_set<std::string> selected_columns(columns.cbegin(), columns.cend());
for (const auto field_id : referenced_field_ids) {
ICEBERG_ASSIGN_OR_RAISE(auto column_name,
data_file_schema->FindColumnNameById(field_id));
if (column_name.has_value()) {
std::string column_name_str(column_name.value());
if (column_name_str.starts_with(DataFile::kPartitionField + ".")) {
column_name_str = DataFile::kPartitionField;
}
if (selected_columns.contains(column_name_str)) {
continue;
}
columns.push_back(std::move(column_name_str));
selected_columns.insert(columns.back());
}
}
}

reader->FilterRows(data_filter_)
.FilterPartitions(partition_filter_)
.CaseSensitive(case_sensitive_)
.Select(columns_);
.Select(std::move(columns));

return reader;
}
Expand Down Expand Up @@ -299,11 +339,31 @@ ManifestGroup::ReadEntries() {
return eval_cache[spec_id].get();
};

std::unique_ptr<Evaluator> data_file_evaluator;
if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
// TODO(gangwu): create an Evaluator on the DataFile schema with empty
// partition type
}
const bool has_file_filter =
file_filter_ && file_filter_->op() != Expression::Operation::kTrue;
std::unordered_map<int32_t, std::unique_ptr<Evaluator>> data_file_eval_cache;
// Lazily builds, and memoizes per partition spec ID, the Evaluator that applies
// file_filter_ to DataFile metadata. Yields nullptr when there is no effective
// file filter; returns an error when the spec ID is unknown or when building
// the partition type or the evaluator fails.
auto get_data_file_evaluator = [&](int32_t spec_id) -> Result<Evaluator*> {
  if (!has_file_filter) {
    return nullptr;
  }

  // Reuse the evaluator already built for this spec, if any.
  if (const auto cached = data_file_eval_cache.find(spec_id);
      cached != data_file_eval_cache.end()) {
    return cached->second.get();
  }

  const auto spec_entry = specs_by_id_.find(spec_id);
  ICEBERG_CHECK(spec_entry != specs_by_id_.cend(),
                "Cannot find partition spec for ID {}", spec_id);

  // The DataFile schema is derived from the spec's partition type, which is why
  // evaluators are cached per spec ID.
  ICEBERG_ASSIGN_OR_RAISE(auto spec_partition_type,
                          spec_entry->second->PartitionType(*schema_));
  auto file_schema = DataFile::Type(std::move(spec_partition_type))->ToSchema();
  ICEBERG_ASSIGN_OR_RAISE(
      auto new_evaluator,
      Evaluator::Make(*file_schema, file_filter_, case_sensitive_));

  auto& cache_slot = data_file_eval_cache[spec_id];
  cache_slot = std::move(new_evaluator);
  return cache_slot.get();
};

std::unordered_map<int32_t, std::vector<ManifestEntry>> result;

Expand Down Expand Up @@ -336,15 +396,20 @@ ManifestGroup::ReadEntries() {
ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto entries,
ignore_deleted_ ? reader->LiveEntries() : reader->Entries());
ICEBERG_ASSIGN_OR_RAISE(auto data_file_evaluator, get_data_file_evaluator(spec_id));

for (auto& entry : entries) {
if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
continue;
}

if (data_file_evaluator != nullptr) {
// TODO(gangwu): implement data_file_evaluator to evaluate StructLike on
// top of entry.data_file
DataFileStructLike data_file(*entry.data_file);
ICEBERG_ASSIGN_OR_RAISE(bool should_match,
data_file_evaluator->Evaluate(data_file));
if (!should_match) {
continue;
}
}

if (!manifest_entry_predicate_(entry)) {
Expand Down
146 changes: 146 additions & 0 deletions src/iceberg/row/manifest_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,55 @@

#include "iceberg/row/manifest_wrapper.h"

#include <iterator>
#include <map>
#include <memory>
#include <type_traits>
#include <vector>

#include "iceberg/manifest/manifest_reader_internal.h"
#include "iceberg/util/macros.h"

namespace iceberg {

namespace {

/// \brief Positional index of each DataFile field, as dispatched on by
/// DataFileStructLike::GetField.
///
/// NOTE(review): the ordering presumably mirrors the field order of the schema
/// built from DataFile::Type -- confirm against that definition.
enum class DataFileFieldPosition : size_t {
kContent = 0,
kFilePath = 1,
kFileFormat = 2,
kPartition = 3,
kRecordCount = 4,
kFileSize = 5,
kColumnSizes = 6,
kValueCounts = 7,
kNullValueCounts = 8,
kNanValueCounts = 9,
kLowerBounds = 10,
kUpperBounds = 11,
kKeyMetadata = 12,
kSplitOffsets = 13,
kEqualityIds = 14,
kSortOrderId = 15,
kFirstRowId = 16,
kReferencedDataFile = 17,
kContentOffset = 18,
kContentSize = 19,
// Sentinel, not a real field: DataFileStructLike::num_fields reports this
// value as the field count, and GetField rejects it as an invalid index.
kNextUnusedId = 20,
};

/// \brief Reinterpret the contiguous storage of a byte vector or a string as a
/// std::string_view without copying.
///
/// The returned view aliases `value`'s buffer, so it is only valid while
/// `value` is alive and unmodified.
template <typename T>
  requires std::is_same_v<T, std::vector<uint8_t>> || std::is_same_v<T, std::string>
std::string_view ToView(const T& value) {
  const auto* chars = reinterpret_cast<const char*>(value.data());  // NOLINT
  return std::string_view(chars, value.size());
}

// Converts a 32-bit integer into a Scalar.
Scalar ToScalar(const int32_t value) { return value; }

// Converts a 64-bit integer into a Scalar.
Scalar ToScalar(const int64_t value) { return value; }

// Converts a byte vector into a Scalar built from a std::string_view over the
// vector's storage (see ToView).
// NOTE(review): presumably the Scalar keeps that view rather than a copy, so
// `value` must outlive the result -- confirm against Scalar's definition.
Scalar ToScalar(const std::vector<uint8_t>& value) { return ToView(value); }

template <typename T>
Result<Scalar> FromOptional(const std::optional<T>& value) {
if (value.has_value()) {
Expand All @@ -39,6 +76,56 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
return std::monostate{};
}

/// \brief Convert an optional string into a Scalar.
///
/// An unset optional becomes std::monostate; a set one becomes a
/// std::string_view over the contained string (no copy is made by ToView).
Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
  if (!value.has_value()) {
    return std::monostate{};
  }
  return ToView(*value);
}

/// \brief Non-owning ArrayLike adapter over a std::vector<T>.
///
/// Only a reference to the vector is stored, so the vector must outlive this
/// adapter. Elements are converted with the matching ToScalar overload.
template <typename T>
class VectorArrayLike : public ArrayLike {
 public:
  explicit VectorArrayLike(const std::vector<T>& values) : values_(values) {}

  size_t size() const override { return values_.get().size(); }

  /// \brief Return the element at `pos`, or InvalidArgument when `pos` is out
  /// of range.
  Result<Scalar> GetElement(size_t pos) const override {
    if (pos < size()) {
      const std::vector<T>& items = values_.get();
      return ToScalar(items[pos]);
    }
    return InvalidArgument("Invalid array index: {}", pos);
  }

 private:
  std::reference_wrapper<const std::vector<T>> values_;
};

/// \brief Non-owning MapLike adapter over a std::map keyed by int32_t.
///
/// Entries are addressed by their position in the map's iteration order. Only
/// a reference to the map is stored, so the map must outlive this adapter.
template <typename V>
class IntMapLike : public MapLike {
 public:
  explicit IntMapLike(const std::map<int32_t, V>& values) : values_(values) {}

  size_t size() const override { return values_.get().size(); }

  /// \brief Return the key of the entry at `pos`, or InvalidArgument when
  /// `pos` is out of range.
  Result<Scalar> GetKey(size_t pos) const override {
    if (pos < size()) {
      return EntryAt(pos).first;
    }
    return InvalidArgument("Invalid map index: {}", pos);
  }

  /// \brief Return the value of the entry at `pos` (converted with ToScalar),
  /// or InvalidArgument when `pos` is out of range.
  Result<Scalar> GetValue(size_t pos) const override {
    if (pos < size()) {
      return ToScalar(EntryAt(pos).second);
    }
    return InvalidArgument("Invalid map index: {}", pos);
  }

 private:
  using Entry = typename std::map<int32_t, V>::value_type;

  // Steps `pos` entries forward from the first one. std::map iterators are not
  // random-access, so this walk is linear in `pos`. Callers must have checked
  // that `pos < size()`.
  const Entry& EntryAt(size_t pos) const {
    return *std::next(values_.get().cbegin(), pos);
  }

  std::reference_wrapper<const std::map<int32_t, V>> values_;
};

} // namespace

Result<Scalar> PartitionFieldSummaryStructLike::GetField(size_t pos) const {
Expand Down Expand Up @@ -134,4 +221,63 @@ std::unique_ptr<StructLike> FromManifestFile(const ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}

/// \brief Return the DataFile field stored at position `pos`.
///
/// Plain members are returned by value, string/binary members as views over
/// the wrapped DataFile's storage, optional members as std::monostate when
/// unset, and map/list members as adapters that reference (not copy) the
/// wrapped DataFile's containers. The partition is returned as a newly created
/// PartitionValues built from the DataFile's partition and also stored in
/// `partition_`.
///
/// \return InvalidArgument when `pos` is not a valid field position.
Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
  using Pos = DataFileFieldPosition;
  using Int64Map = IntMapLike<int64_t>;
  using BinaryMap = IntMapLike<std::vector<uint8_t>>;

  if (pos < num_fields()) {
    const DataFile& file = data_file_.get();
    switch (static_cast<Pos>(pos)) {
      case Pos::kContent:
        return static_cast<int32_t>(file.content);
      case Pos::kFilePath:
        return ToView(file.file_path);
      case Pos::kFileFormat:
        return ToString(file.file_format);
      case Pos::kPartition:
        // Rebuilt on every access and kept in the mutable member.
        partition_ = std::make_shared<PartitionValues>(file.partition);
        return partition_;
      case Pos::kRecordCount:
        return file.record_count;
      case Pos::kFileSize:
        return file.file_size_in_bytes;
      case Pos::kColumnSizes:
        return std::make_shared<Int64Map>(file.column_sizes);
      case Pos::kValueCounts:
        return std::make_shared<Int64Map>(file.value_counts);
      case Pos::kNullValueCounts:
        return std::make_shared<Int64Map>(file.null_value_counts);
      case Pos::kNanValueCounts:
        return std::make_shared<Int64Map>(file.nan_value_counts);
      case Pos::kLowerBounds:
        return std::make_shared<BinaryMap>(file.lower_bounds);
      case Pos::kUpperBounds:
        return std::make_shared<BinaryMap>(file.upper_bounds);
      case Pos::kKeyMetadata:
        return ToView(file.key_metadata);
      case Pos::kSplitOffsets:
        return std::make_shared<VectorArrayLike<int64_t>>(file.split_offsets);
      case Pos::kEqualityIds:
        return std::make_shared<VectorArrayLike<int32_t>>(file.equality_ids);
      case Pos::kSortOrderId:
        return FromOptional(file.sort_order_id);
      case Pos::kFirstRowId:
        return FromOptional(file.first_row_id);
      case Pos::kReferencedDataFile:
        return FromOptionalString(file.referenced_data_file);
      case Pos::kContentOffset:
        return FromOptional(file.content_offset);
      case Pos::kContentSize:
        return FromOptional(file.content_size_in_bytes);
      case Pos::kNextUnusedId:
        // Sentinel only; handled by the common error return below.
        break;
    }
  }
  return InvalidArgument("Invalid data file field index: {}", pos);
}

/// \brief Number of DataFile fields addressable through GetField.
size_t DataFileStructLike::num_fields() const {
  // kNextUnusedId is the sentinel that follows the last real field position,
  // so its numeric value is the field count.
  constexpr auto kFieldCount =
      static_cast<size_t>(DataFileFieldPosition::kNextUnusedId);
  return kFieldCount;
}

} // namespace iceberg
21 changes: 21 additions & 0 deletions src/iceberg/row/manifest_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <functional>

#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/row/struct_like.h"

Expand Down Expand Up @@ -97,4 +98,24 @@ class ICEBERG_EXPORT ManifestFileStructLike : public StructLike {
mutable std::shared_ptr<PartitionFieldSummaryArrayLike> summaries_;
};

/// \brief StructLike wrapper for DataFile metadata.
///
/// The wrapper does not own the DataFile: it stores only a reference to it, so
/// the referenced DataFile must stay alive for as long as the wrapper is used.
class ICEBERG_EXPORT DataFileStructLike : public StructLike {
public:
/// \brief Wrap `file` by reference; no copy of the DataFile is made.
explicit DataFileStructLike(const DataFile& file) : data_file_(file) {}
~DataFileStructLike() override = default;

// Copying is disabled.
DataFileStructLike(const DataFileStructLike&) = delete;
DataFileStructLike& operator=(const DataFileStructLike&) = delete;

/// \brief Return the DataFile field at position `pos`.
Result<Scalar> GetField(size_t pos) const override;

/// \brief Number of fields exposed by this wrapper.
size_t num_fields() const override;

/// \brief Re-point the wrapper at another DataFile so one wrapper instance can
/// be reused. `partition_` is left untouched here.
void Reset(const DataFile& file) { data_file_ = std::cref(file); }

private:
// Non-owning reference to the wrapped DataFile.
std::reference_wrapper<const DataFile> data_file_;
// Holds the partition struct handed out by GetField; mutable because GetField
// is const.
mutable std::shared_ptr<StructLike> partition_;
};

} // namespace iceberg
Loading
Loading