Skip to content

Commit

Permalink
ARROW-6244: [C++][Dataset] Add partition key to DataSource interface
Browse files Browse the repository at this point in the history
The condition is an expression guaranteed to evaluate to true for all records in a DataSource. This provides some predicate push down functionality: DataSources whose condition precludes a filter expression will not yield any fragments (since those fragments would be filtered out anyway).

This patch does not implement evaluation of filter expressions against an in-memory RecordBatch. It makes a half-hearted attempt at API compatibility with #5157, which does implement this.

Closes #5221 from bkietz/6244-Implement-Partition-DataS and squashes the following commits:

142cc7b <Benjamin Kietzman> explicit move for Result returning functions
13b5948 <Benjamin Kietzman> add comment on motivation for type erasure approach
42e2ad3 <Benjamin Kietzman> clang-format
a9e5d7a <Benjamin Kietzman> bludgeon MSVC linker error with __forceinline
e8c8cd6 <Benjamin Kietzman> AssumePartitionExpression's inout argument is confusing
48b349f <Benjamin Kietzman> move overridable GetFragments to protected GetFragmentsImpl
19f26a0 <Benjamin Kietzman> DataSource::assume -> bool, remove partition_expr mutator
a651c65 <Benjamin Kietzman> rename DataSource::condition to partition_expression
949fa7a <Benjamin Kietzman> provide basic predicate pushdown to datasources
955cb56 <Benjamin Kietzman> flesh out shim Expression class
b1a6c54 <Benjamin Kietzman> remove unused FileSystemBasedDataSource::options_
4f5a8bc <Benjamin Kietzman> rename partitionner_
d66f159 <Benjamin Kietzman> add an Expression stub

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
  • Loading branch information
bkietz authored and fsaintjacques committed Sep 19, 2019
1 parent 211e240 commit 19d1d0a
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 88 deletions.
48 changes: 44 additions & 4 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <memory>
#include <utility>

#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/util/iterator.h"
#include "arrow/util/stl.h"

namespace arrow {
Expand All @@ -46,11 +48,10 @@ Status SimpleDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,
return Status::OK();
}

Status Dataset::Make(const std::vector<std::shared_ptr<DataSource>>& sources,
const std::shared_ptr<Schema>& schema,
std::shared_ptr<Dataset>* out) {
Status Dataset::Make(std::vector<std::shared_ptr<DataSource>> sources,
std::shared_ptr<Schema> schema, std::shared_ptr<Dataset>* out) {
// TODO: Ensure schema and sources align.
*out = std::make_shared<Dataset>(sources, schema);
*out = std::make_shared<Dataset>(std::move(sources), std::move(schema));

return Status::OK();
}
Expand All @@ -61,5 +62,44 @@ Status Dataset::NewScan(std::unique_ptr<ScannerBuilder>* out) {
return Status::OK();
}

/// \brief Simplify scan_options under the assumption that partition_expression_ holds.
///
/// \param[in] scan_options the options to simplify; may be null
/// \param[out] simplified_scan_options receives either scan_options itself (when it is
/// null) or a copy whose selector has been replaced by the simplified expression.
/// Not written when false is returned.
/// \return false if the simplified selector is null or trivially false, true otherwise
bool DataSource::AssumePartitionExpression(
    const std::shared_ptr<ScanOptions>& scan_options,
    std::shared_ptr<ScanOptions>* simplified_scan_options) const {
  DCHECK_NE(simplified_scan_options, nullptr);

  // Without scan options there is no selector to simplify: pass them through.
  if (!scan_options) {
    *simplified_scan_options = scan_options;
    return true;
  }

  auto maybe_simplified = SelectorAssume(scan_options->selector, partition_expression_);
  DCHECK_OK(maybe_simplified.status());
  auto simplified = std::move(maybe_simplified).ValueOrDie();

  // A null expression means the selector cannot be satisfied by this DataSource.
  if (simplified->IsNull()) {
    return false;
  }

  // Likewise for a condition which is trivially false.
  bool always_true = true;
  if (simplified->IsTrivialCondition(&always_true) && !always_true) {
    return false;
  }

  // Leave the caller's options untouched; hand back a copy carrying the new selector.
  auto narrowed = std::make_shared<ScanOptions>(*scan_options);
  narrowed->selector = ExpressionSelector(std::move(simplified));
  *simplified_scan_options = std::move(narrowed);
  return true;
}

/// \brief Yield the fragments of this DataSource for the given scan options.
///
/// The options are first simplified against the partition expression; if that
/// reports the selector as unsatisfiable an empty iterator is returned and
/// GetFragmentsImpl is never invoked.
DataFragmentIterator DataSource::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
  std::shared_ptr<ScanOptions> pruned_options;
  const bool satisfiable = AssumePartitionExpression(scan_options, &pruned_options);

  if (satisfiable) {
    return GetFragmentsImpl(std::move(pruned_options));
  }

  return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}

/// \brief Iterate over the stored fragments.
///
/// scan_options is not consulted here; every fragment in fragments_ is yielded.
DataFragmentIterator SimpleDataSource::GetFragmentsImpl(
    std::shared_ptr<ScanOptions> scan_options) {
  auto all_fragments = MakeVectorIterator(fragments_);
  return all_fragments;
}

} // namespace dataset
} // namespace arrow
40 changes: 29 additions & 11 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace dataset {
Expand Down Expand Up @@ -77,11 +77,32 @@ class ARROW_DS_EXPORT DataSource {
public:
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
virtual DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) = 0;
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

/// \brief An expression which evaluates to true for all data viewed by this DataSource.
/// May be null, which indicates no information is available.
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

virtual std::string type() const = 0;

virtual ~DataSource() = default;

protected:
DataSource() = default;
explicit DataSource(std::shared_ptr<Expression> c)
: partition_expression_(std::move(c)) {}

virtual DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Produces a simplified copy of scan_options by assuming partition_expression_ holds
/// for all yielded fragments; the input scan_options is not modified. Returns false if
/// the selector is not satisfiable in this DataSource.
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;

std::shared_ptr<Expression> partition_expression_;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments
Expand All @@ -90,9 +111,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
explicit SimpleDataSource(DataFragmentVector fragments)
: fragments_(std::move(fragments)) {}

DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) override {
return MakeVectorIterator(fragments_);
}
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type() const override { return "simple_data_source"; }

Expand All @@ -107,13 +126,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
/// WARNING, this constructor is not recommended, use Dataset::Make instead.
/// \param[in] sources one or more input data sources
/// \param[in] schema a known schema to conform to, may be nullptr
explicit Dataset(const std::vector<std::shared_ptr<DataSource>>& sources,
const std::shared_ptr<Schema>& schema)
: schema_(schema), sources_(sources) {}
explicit Dataset(std::vector<std::shared_ptr<DataSource>> sources,
std::shared_ptr<Schema> schema)
: schema_(std::move(schema)), sources_(std::move(sources)) {}

static Status Make(const std::vector<std::shared_ptr<DataSource>>& sources,
const std::shared_ptr<Schema>& schema,
std::shared_ptr<Dataset>* out);
static Status Make(std::vector<std::shared_ptr<DataSource>> sourcs,
std::shared_ptr<Schema> schema, std::shared_ptr<Dataset>* out);

/// \brief Begin to build a new Scan operation against this Dataset
Status NewScan(std::unique_ptr<ScannerBuilder>* out);
Expand Down
31 changes: 23 additions & 8 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include <algorithm>
#include <vector>

#include "arrow/dataset/filter.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/util/iterator.h"
#include "arrow/util/stl.h"

namespace arrow {
Expand All @@ -47,18 +49,18 @@ Status FileBasedDataFragment::Scan(std::shared_ptr<ScanContext> scan_context,

FileSystemBasedDataSource::FileSystemBasedDataSource(
fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format, std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<FileFormat> format, std::shared_ptr<Expression> partition_expression,
std::vector<fs::FileStats> stats)
: filesystem_(filesystem),
: DataSource(std::move(partition_expression)),
filesystem_(filesystem),
selector_(std::move(selector)),
format_(std::move(format)),
scan_options_(std::move(scan_options)),
stats_(std::move(stats)) {}

Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression,
std::unique_ptr<FileSystemBasedDataSource>* out) {
std::vector<fs::FileStats> stats;
RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats));
Expand All @@ -71,12 +73,25 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
stats.resize(new_end - stats.begin());

out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format),
std::move(scan_options), std::move(stats)));
std::move(partition_expression),
std::move(stats)));
return Status::OK();
}

DataFragmentIterator FileSystemBasedDataSource::GetFragments(
std::shared_ptr<ScanOptions> options) {
/// \brief Convenience overload of Make for callers with no partition expression.
///
/// Forwards to the five-argument overload, passing a null partition expression.
Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem,
                                       const fs::Selector& selector,
                                       std::shared_ptr<FileFormat> format,
                                       std::unique_ptr<FileSystemBasedDataSource>* out) {
  Status status = Make(filesystem, selector, std::move(format),
                       /*partition_expression=*/nullptr, out);
  return status;
}

DataFragmentIterator FileSystemBasedDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}

struct Impl : DataFragmentIterator {
Impl(fs::FileSystem* filesystem, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options, std::vector<fs::FileStats> stats)
Expand Down Expand Up @@ -105,7 +120,7 @@ DataFragmentIterator FileSystemBasedDataSource::GetFragments(
std::vector<fs::FileStats> stats_;
};

return DataFragmentIterator(Impl(filesystem_, format_, options, stats_));
return DataFragmentIterator(Impl(filesystem_, format_, scan_options, stats_));
}

} // namespace dataset
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,23 +170,26 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource {
public:
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::unique_ptr<FileSystemBasedDataSource>* out);

std::string type() const override { return "directory"; }
static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<Expression> partition_expression,
std::unique_ptr<FileSystemBasedDataSource>* out);

DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options) override;
std::string type() const override { return "directory"; }

protected:
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector,
std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression,
std::vector<fs::FileStats> stats);

fs::FileSystem* filesystem_ = NULLPTR;
fs::Selector selector_;
std::shared_ptr<FileFormat> format_;
std::shared_ptr<ScanOptions> scan_options_;
std::vector<fs::FileStats> stats_;
};

Expand Down
10 changes: 6 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ using RecordBatchReaderPtr = std::unique_ptr<RecordBatchReader>;
// A set of RowGroup identifiers
using RowGroupSet = std::vector<int>;

// TODO(bkietz) refactor this to use ProjectedRecordBatchReader
class ParquetScanTask : public ScanTask {
public:
static Status Make(RowGroupSet row_groups, const std::vector<int>& columns_projection,
Expand Down Expand Up @@ -128,7 +129,7 @@ class ParquetScanTaskIterator {
}

Status Next(ScanTaskPtr* task) {
auto partition = partitionner_.Next();
auto partition = partitioner_.Next();

// Iteration is done.
if (partition.size() == 0) {
Expand All @@ -145,7 +146,8 @@ class ParquetScanTaskIterator {
static Status InferColumnProjection(const parquet::FileMetaData& metadata,
const std::shared_ptr<ScanOptions>& options,
std::vector<int>* out) {
// TODO(fsaintjacques): Compute intersection _and_ validity
// TODO(fsaintjacques): Compute intersection _and_ validity, could probably reuse
// RecordBatchProjector here
*out = internal::Iota(metadata.num_columns());

return Status::OK();
Expand All @@ -155,11 +157,11 @@ class ParquetScanTaskIterator {
std::shared_ptr<parquet::FileMetaData> metadata,
std::unique_ptr<parquet::arrow::FileReader> reader)
: columns_projection_(columns_projection),
partitionner_(std::move(metadata)),
partitioner_(std::move(metadata)),
reader_(std::move(reader)) {}

std::vector<int> columns_projection_;
ParquetRowGroupPartitioner partitionner_;
ParquetRowGroupPartitioner partitioner_;
std::shared_ptr<parquet::arrow::FileReader> reader_;
};

Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,5 +185,9 @@ TEST_F(TestParquetFileSystemBasedDataSource, Recursive) { this->Recursive(); }

TEST_F(TestParquetFileSystemBasedDataSource, DeletedFile) { this->DeletedFile(); }

// Runs the fixture's PredicatePushDown() scenario against the parquet-backed
// FileSystemBasedDataSource fixture.
TEST_F(TestParquetFileSystemBasedDataSource, PredicatePushDown) {
this->PredicatePushDown();
}

} // namespace dataset
} // namespace arrow
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,9 @@ TEST_F(TestDummyFileSystemBasedDataSource, Recursive) { this->Recursive(); }

TEST_F(TestDummyFileSystemBasedDataSource, DeletedFile) { this->DeletedFile(); }

// Runs the fixture's PredicatePushDown() scenario against the dummy-format
// FileSystemBasedDataSource fixture.
TEST_F(TestDummyFileSystemBasedDataSource, PredicatePushDown) {
this->PredicatePushDown();
}

} // namespace dataset
} // namespace arrow
29 changes: 29 additions & 0 deletions cpp/src/arrow/dataset/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/compute/context.h"
#include "arrow/compute/kernels/boolean.h"
#include "arrow/compute/kernels/compare.h"
#include "arrow/dataset/dataset.h"
#include "arrow/record_batch.h"
#include "arrow/util/logging.h"
#include "arrow/visitor_inline.h"
Expand Down Expand Up @@ -938,5 +939,33 @@ Result<std::shared_ptr<DataType>> FieldExpression::Validate(const Schema& schema
return null();
}

/// \brief Combine a selector's filters into one expression and simplify it.
///
/// \param[in] selector selector whose filters are AND-ed together; may be null
/// \param[in] given expression assumed to hold; may be null
/// \return a scalar `true` expression when selector is null or has no filters;
/// the plain conjunction of the filters when given is null; otherwise the
/// result of calling Assume(*given) on that conjunction.
///
/// Every filter is expected to be an ExpressionFilter (checked in debug builds).
Result<std::shared_ptr<Expression>> SelectorAssume(
    const std::shared_ptr<DataSelector>& selector,
    const std::shared_ptr<Expression>& given) {
  // Nothing to filter on: everything is selected.
  if (!selector || selector->filters.empty()) {
    return ScalarExpression::Make(true);
  }

  auto expression_of = [](const std::shared_ptr<Filter>& filter) {
    DCHECK_EQ(filter->type(), FilterType::EXPRESSION);
    return checked_cast<const ExpressionFilter&>(*filter).expression();
  };

  // Fold the filters left to right into a single conjunction.
  const auto& filters = selector->filters;
  auto it = filters.begin();
  auto conjunction = expression_of(*it);
  for (++it; it != filters.end(); ++it) {
    conjunction = and_(std::move(conjunction), expression_of(*it));
  }

  if (given != nullptr) {
    return conjunction->Assume(*given);
  }
  // No partition information: return the conjunction unsimplified.
  return std::move(conjunction);
}

std::shared_ptr<DataSelector> ExpressionSelector(std::shared_ptr<Expression> e) {
return std::make_shared<DataSelector>(
DataSelector{FilterVector{std::make_shared<ExpressionFilter>(std::move(e))}});
}

} // namespace dataset
} // namespace arrow
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>
#include <utility>

#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/compare.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
Expand Down Expand Up @@ -402,5 +403,12 @@ inline FieldExpression operator"" _(const char* name, size_t name_length) {
}
} // namespace string_literals

ARROW_DS_EXPORT Result<std::shared_ptr<Expression>> SelectorAssume(
const std::shared_ptr<DataSelector>& selector,
const std::shared_ptr<Expression>& given);

ARROW_DS_EXPORT std::shared_ptr<DataSelector> ExpressionSelector(
std::shared_ptr<Expression> e);

} // namespace dataset
} // namespace arrow
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>

#include "arrow/dataset/dataset.h"
#include "arrow/util/iterator.h"

namespace arrow {
namespace dataset {
Expand Down
Loading

0 comments on commit 19d1d0a

Please sign in to comment.