Skip to content

Commit

Permalink
ARROW-8061: [C++][Dataset] Provide RowGroup fragments for ParquetFile…
Browse files Browse the repository at this point in the history
…Format

Provides ParquetFileFragment, which may view a subset of row groups within a parquet file. The indices of viewed row groups are available through the `row_groups()` property which is exposed to python. Construction of subset-viewing ParquetFileFragments is not yet exposed to python.

Closes #6670 from bkietz/8061-Ability-to-specify-granul

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
  • Loading branch information
bkietz authored and fsaintjacques committed Mar 27, 2020
1 parent 81d61a6 commit 2ca1706
Show file tree
Hide file tree
Showing 17 changed files with 683 additions and 141 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression) {
return std::make_shared<FileFragment>(std::move(source), shared_from_this(), options,
std::move(partition_expression));
return std::shared_ptr<FileFragment>(new FileFragment(
std::move(source), shared_from_this(), options, std::move(partition_expression)));
}

Result<std::shared_ptr<WriteTask>> FileFormat::WriteFragment(
Expand Down
11 changes: 2 additions & 9 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,20 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
public:
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

// XXX should this include format_->type_name?
std::string type_name() const override { return "file"; }
std::string type_name() const override { return format_->type_name(); }
bool splittable() const override { return format_->splittable(); }

const FileSource& source() const { return source_; }
const std::shared_ptr<FileFormat>& format() const { return format_; }

FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options)
: Fragment(std::move(scan_options)),
source_(std::move(source)),
format_(std::move(format)) {}

protected:
FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: Fragment(std::move(scan_options), std::move(partition_expression)),
source_(std::move(source)),
format_(std::move(format)) {}

protected:
FileSource source_;
std::shared_ptr<FileFormat> format_;

Expand Down
103 changes: 91 additions & 12 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,29 @@ class RowGroupSkipper {

RowGroupSkipper(std::shared_ptr<parquet::FileMetaData> metadata,
parquet::ArrowReaderProperties arrow_properties,
std::shared_ptr<Expression> filter)
std::shared_ptr<Expression> filter, std::vector<int> row_groups)
: metadata_(std::move(metadata)),
arrow_properties_(std::move(arrow_properties)),
filter_(std::move(filter)),
row_group_idx_(0) {
num_row_groups_ = metadata_->num_row_groups();
}
row_group_idx_(0),
row_groups_(std::move(row_groups)),
num_row_groups_(row_groups_.empty() ? metadata_->num_row_groups()
: static_cast<int>(row_groups_.size())) {}

int Next() {
while (row_group_idx_ < num_row_groups_) {
const auto row_group_idx = row_group_idx_++;
const auto row_group = metadata_->RowGroup(row_group_idx);
const int row_group =
row_groups_.empty() ? row_group_idx_++ : row_groups_[row_group_idx_++];

const auto row_group_metadata = metadata_->RowGroup(row_group);

const auto num_rows = row_group->num_rows();
if (CanSkip(*row_group)) {
const int64_t num_rows = row_group_metadata->num_rows();
if (CanSkip(*row_group_metadata)) {
rows_skipped_ += num_rows;
continue;
}

return row_group_idx;
return row_group;
}

return kIterationDone;
Expand All @@ -225,6 +228,7 @@ class RowGroupSkipper {
parquet::ArrowReaderProperties arrow_properties_;
std::shared_ptr<Expression> filter_;
int row_group_idx_;
std::vector<int> row_groups_;
int num_row_groups_;
int64_t rows_skipped_;
};
Expand All @@ -234,7 +238,8 @@ class ParquetScanTaskIterator {
static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context,
std::unique_ptr<parquet::ParquetFileReader> reader,
parquet::ArrowReaderProperties arrow_properties) {
parquet::ArrowReaderProperties arrow_properties,
const std::vector<int>& row_groups) {
auto metadata = reader->metadata();

auto column_projection = InferColumnProjection(*metadata, arrow_properties, options);
Expand All @@ -244,7 +249,7 @@ class ParquetScanTaskIterator {
arrow_properties, &arrow_reader));

RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
options->filter);
options->filter, row_groups);

return ScanTaskIterator(ParquetScanTaskIterator(
std::move(options), std::move(context), std::move(column_projection),
Expand Down Expand Up @@ -373,12 +378,86 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(
Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
const FileSource& source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) const {
return ScanFile(source, std::move(options), std::move(context), {});
}

Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
const FileSource& source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context, const std::vector<int>& row_groups) const {
auto properties = MakeReaderProperties(*this, context->pool);
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source, std::move(properties)));

for (int i : row_groups) {
if (i >= reader->metadata()->num_row_groups()) {
return Status::IndexError("trying to scan row group ", i, " but ", source.path(),
" only has ", reader->metadata()->num_row_groups(),
" row groups");
}
}

auto arrow_properties = MakeArrowReaderProperties(*this, options->batch_size, *reader);
return ParquetScanTaskIterator::Make(std::move(options), std::move(context),
std::move(reader), std::move(arrow_properties));
std::move(reader), std::move(arrow_properties),
row_groups);
}

Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups) {
return std::shared_ptr<FileFragment>(
new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options),
std::move(partition_expression), std::move(row_groups)));
}

Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression) {
return std::shared_ptr<FileFragment>(
new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options),
std::move(partition_expression), {}));
}

Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
const ParquetFileFragment& fragment, std::shared_ptr<Expression> extra_filter) {
auto properties = MakeReaderProperties(*this);
ARROW_ASSIGN_OR_RAISE(auto reader,
OpenReader(fragment.source(), std::move(properties)));

auto arrow_properties =
MakeArrowReaderProperties(*this, parquet::kArrowDefaultBatchSize, *reader);
auto metadata = reader->metadata();

auto row_groups = fragment.row_groups();
if (row_groups.empty()) {
row_groups = internal::Iota(metadata->num_row_groups());
}
FragmentVector fragments(row_groups.size());

auto new_options = std::make_shared<ScanOptions>(*fragment.scan_options());
if (!extra_filter->Equals(true)) {
new_options->filter = and_(std::move(extra_filter), std::move(new_options->filter));
}

RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
new_options->filter, std::move(row_groups));

for (int i = 0, row_group = skipper.Next();
row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
ARROW_ASSIGN_OR_RAISE(fragments[i++],
MakeFragment(fragment.source(), new_options,
fragment.partition_expression(), {row_group}));
}

return MakeVectorIterator(std::move(fragments));
}

Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanContext> context) {
return parquet_format().ScanFile(source_, scan_options_, std::move(context),
row_groups_);
}

const ParquetFileFormat& ParquetFileFragment::parquet_format() const {
return internal::checked_cast<const ParquetFileFormat&>(*format_);
}

} // namespace dataset
Expand Down
49 changes: 49 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
Expand Down Expand Up @@ -86,6 +87,54 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
Result<ScanTaskIterator> ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) const override;

/// \brief Open a file for scanning, restricted to the specified row groups.
Result<ScanTaskIterator> ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context,
const std::vector<int>& row_groups) const;

using FileFormat::MakeFragment;

Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression) override;

/// \brief Create a Fragment, restricted to the specified row groups.
Result<std::shared_ptr<FileFragment>> MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups);

/// \brief Split a ParquetFileFragment into a Fragment for each row group.
/// Row groups whose metadata contradicts the fragment's filter or the extra_filter
/// will be excluded.
Result<FragmentIterator> GetRowGroupFragments(
const ParquetFileFragment& fragment,
std::shared_ptr<Expression> extra_filter = scalar(true));
};

class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
public:
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

/// \brief The row groups viewed by this Fragment. This may be empty which signifies all
/// row groups are selected.
const std::vector<int>& row_groups() const { return row_groups_; }

private:
ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression,
std::vector<int> row_groups)
: FileFragment(std::move(source), std::move(format), std::move(scan_options),
std::move(partition_expression)),
row_groups_(std::move(row_groups)) {}

const ParquetFileFormat& parquet_format() const;

std::vector<int> row_groups_;

friend class ParquetFileFormat;
};

} // namespace dataset
Expand Down
Loading

0 comments on commit 2ca1706

Please sign in to comment.