Skip to content

Commit

Permalink
ARROW-8276: [C++][Dataset] Use Scanner for Fragment.to_table
Browse files Browse the repository at this point in the history
This way batches yielded by Fragment.Scan are projected and filtered.

Closes #6765 from bkietz/8276-Scanning-a-Fragment-does-

Lead-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
bkietz and jorisvandenbossche committed Apr 2, 2020
1 parent 8d243d0 commit 9ad2cc4
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 142 deletions.
21 changes: 2 additions & 19 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,25 +190,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
}

// if possible, extract a partition key and pass it to the projector
auto projector = &options[ref.i]->projector;
{
int index = -1;
std::shared_ptr<Scalar> value_to_materialize;

DCHECK_OK(KeyValuePartitioning::VisitKeys(
*partition, [&](const std::string& name, const std::shared_ptr<Scalar>& value) {
if (index != -1) return Status::OK();

index = projector->schema()->GetFieldIndex(name);
if (index != -1) value_to_materialize = value;

return Status::OK();
}));

if (index != -1) {
RETURN_NOT_OK(projector->SetDefaultValue(index, std::move(value_to_materialize)));
}
}
RETURN_NOT_OK(KeyValuePartitioning::SetDefaultValuesFromKeys(
*partition, &options[ref.i]->projector));

if (ref.info().IsFile()) {
// generate a fragment for this file
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/dataset/partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,19 @@ Status KeyValuePartitioning::VisitKeys(
checked_cast<const ScalarExpression*>(rhs)->value());
}

/// \brief Use the partition keys found in `expr` as default values on `projector`.
///
/// Each (name, value) key visited in `expr` is looked up by name in the
/// projector's schema. Keys without a matching field are ignored; matched keys
/// are forwarded to RecordBatchProjector::SetDefaultValue.
///
/// \param[in] expr expression whose keys are visited via VisitKeys
/// \param[in,out] projector projector receiving the default values
/// \return the first non-OK Status produced by the lookup, by SetDefaultValue,
///         or by VisitKeys itself; Status::OK() otherwise
Status KeyValuePartitioning::SetDefaultValuesFromKeys(const Expression& expr,
                                                      RecordBatchProjector* projector) {
  auto set_default = [projector](const std::string& name,
                                 const std::shared_ptr<Scalar>& value) -> Status {
    ARROW_ASSIGN_OR_RAISE(auto path, FieldRef(name).FindOneOrNone(*projector->schema()));
    if (!path.indices().empty()) {
      return projector->SetDefaultValue(path, value);
    }
    // The key does not name a field of the projector's schema: nothing to set.
    return Status::OK();
  };

  return KeyValuePartitioning::VisitKeys(expr, set_default);
}

Result<std::shared_ptr<Expression>> KeyValuePartitioning::ConvertKey(
const Key& key, const Schema& schema) {
ARROW_ASSIGN_OR_RAISE(auto field, FieldRef(key.name).GetOneOrNone(schema));
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
const std::function<Status(const std::string& name,
const std::shared_ptr<Scalar>& value)>& visitor);

/// \brief Set default values on a projector from the keys found in an expression.
///
/// Keys whose name does not match a field of the projector's schema are ignored.
///
/// \param[in] expr expression whose keys are visited
/// \param[in,out] projector projector on which SetDefaultValue is invoked
static Status SetDefaultValuesFromKeys(const Expression& expr,
RecordBatchProjector* projector);

/// Convert a Key to a full expression.
/// If the field referenced in key is absent from the schema, the key will be ignored.
static Result<std::shared_ptr<Expression>> ConvertKey(const Key& key,
Expand Down
64 changes: 31 additions & 33 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,43 +60,15 @@ std::vector<std::string> ScanOptions::MaterializedFields() const {
return fields;
}

/// \brief Execute every ScanTask of `scan_task_it` and assemble the produced
/// record batches into a single Table.
///
/// One callback per ScanTask is appended to the TaskGroup obtained from
/// `context`. Each callback executes its task and appends the resulting
/// batches to a shared vector under a mutex. The Table is built with the
/// schema of `options` once the group has finished.
///
/// \param[in] options supplies the schema of the resulting Table
/// \param[in] context supplies the TaskGroup the callbacks are appended to
/// \param[in] scan_task_it the ScanTasks to execute
/// \return the assembled Table, or the first error encountered
Result<std::shared_ptr<Table>> ScanTask::ToTable(
    const std::shared_ptr<ScanOptions>& options,
    const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_task_it) {
  // `collected` is appended to from the task group callbacks; `batches_mutex`
  // serializes those appends.
  std::mutex batches_mutex;
  RecordBatchVector collected;

  auto group = context->TaskGroup();

  for (auto maybe_task : scan_task_it) {
    ARROW_ASSIGN_OR_RAISE(auto task, std::move(maybe_task));

    auto collect_batches = [&collected, &batches_mutex, task]() -> Status {
      ARROW_ASSIGN_OR_RAISE(auto task_batches, task->Execute());

      for (auto maybe_batch : task_batches) {
        ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
        std::lock_guard<std::mutex> guard(batches_mutex);
        collected.push_back(std::move(batch));
      }

      return Status::OK();
    };
    group->Append(std::move(collect_batches));
  }

  // Block until every appended callback has completed, or the first error.
  RETURN_NOT_OK(group->Finish());

  std::shared_ptr<Table> table;
  RETURN_NOT_OK(Table::FromRecordBatches(options->schema(), collected, &table));
  return table;
}

// Return an iterator over the record batches stored in this task
// (record_batches_), built with MakeVectorIterator.
Result<RecordBatchIterator> InMemoryScanTask::Execute() {
return MakeVectorIterator(record_batches_);
}

FragmentIterator Scanner::GetFragments() {
if (fragment_ != nullptr) {
return MakeVectorIterator(FragmentVector{fragment_});
}

// Transform Datasets in a flat Iterator<Fragment>. This
// iterator is lazily constructed, i.e. Dataset::GetFragments is
// not invoked until a Fragment is requested.
Expand Down Expand Up @@ -186,7 +158,33 @@ std::shared_ptr<internal::TaskGroup> ScanContext::TaskGroup() const {

// Run Scan() and gather every record batch produced by the resulting ScanTasks
// into a single Table built with the schema of scan_options_.
//
// NOTE(review): this listing is taken from a diff view, so the earlier one-line
// body (the return that delegates to ScanTask::ToTable) and the inlined body
// that follows it both appear here. As written, everything after that return is
// unreachable -- confirm which side is intended before editing the logic.
Result<std::shared_ptr<Table>> Scanner::ToTable() {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
return ScanTask::ToTable(scan_options_, scan_context_, std::move(scan_task_it));
// `batches` is appended to from the task group callbacks below; `mutex`
// serializes those appends.
std::mutex mutex;
RecordBatchVector batches;

auto task_group = scan_context_->TaskGroup();

for (auto maybe_scan_task : scan_task_it) {
ARROW_ASSIGN_OR_RAISE(auto scan_task, std::move(maybe_scan_task));

// The callback captures scan_task by value and the shared state by reference.
task_group->Append([&batches, &mutex, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());

for (auto maybe_batch : batch_it) {
ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch));
std::lock_guard<std::mutex> lock(mutex);
batches.emplace_back(std::move(batch));
}

return Status::OK();
});
}

// Wait for all tasks to complete, or the first error.
RETURN_NOT_OK(task_group->Finish());

std::shared_ptr<Table> out;
RETURN_NOT_OK(Table::FromRecordBatches(scan_options_->schema(), batches, &out));
return out;
}

} // namespace dataset
Expand Down
15 changes: 7 additions & 8 deletions cpp/src/arrow/dataset/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,6 @@ class ARROW_DS_EXPORT ScanTask {
/// \brief The ScanOptions held by this task.
const std::shared_ptr<ScanOptions>& options() const { return options_; }
/// \brief The ScanContext held by this task.
const std::shared_ptr<ScanContext>& context() const { return context_; }

/// \brief Convert a sequence of ScanTasks into a Table.
///
/// Use this convenience utility with care. This will serially materialize the
/// Scan result in memory before creating the Table.
///
/// \param[in] options ScanOptions whose schema is used for the resulting Table
/// \param[in] context ScanContext providing the TaskGroup used to run the tasks
/// \param[in] scan_tasks the ScanTasks to execute
/// \return the resulting Table, or an error
static Result<std::shared_ptr<Table>> ToTable(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<ScanContext>& context, ScanTaskIterator scan_tasks);

protected:
/// \brief Store the options and context later returned by options() and context().
///
/// Both arguments are taken by value and moved into the members.
ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<ScanContext> context)
: options_(std::move(options)), context_(std::move(context)) {}
Expand Down Expand Up @@ -166,6 +158,11 @@ class ARROW_DS_EXPORT Scanner {
scan_options_(std::move(scan_options)),
scan_context_(std::move(scan_context)) {}

/// \brief Construct a Scanner over a single Fragment.
///
/// The scan options are taken from the fragment itself (fragment->scan_options()).
/// dataset_ is not initialized by this constructor.
///
/// \param[in] fragment the fragment to scan; it is dereferenced here
/// \param[in] scan_context the context stored in scan_context_
Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanContext> scan_context)
: fragment_(std::move(fragment)),
// Reads the member fragment_ (the parameter was moved from above); this relies
// on fragment_ being declared before scan_options_ in the class.
scan_options_(fragment_->scan_options()),
scan_context_(std::move(scan_context)) {}

/// \brief The Scan operator returns a stream of ScanTask. The caller is
/// responsible to dispatch/schedule said tasks. Tasks should be safe to run
/// in a concurrent fashion and outlive the iterator.
Expand All @@ -188,6 +185,8 @@ class ARROW_DS_EXPORT Scanner {

protected:
std::shared_ptr<Dataset> dataset_;
// TODO(ARROW-8065) remove fragment_ after a Dataset is constructible from fragments
std::shared_ptr<Fragment> fragment_;
std::shared_ptr<ScanOptions> scan_options_;
std::shared_ptr<ScanContext> scan_context_;
};
Expand Down
Loading

0 comments on commit 9ad2cc4

Please sign in to comment.