Skip to content

Commit

Permalink
ARROW-7886: [C++][Dataset][Python][R] Consolidate Source and Dataset …
Browse files Browse the repository at this point in the history
…classes

Closes #6470 from bkietz/7886-Dataset-Consolidate-Sourc and squashes the following commits:

6f37651 <Krisztián Szűcs> fix pytest markers
11bf0c9 <Benjamin Kietzman> typo fixes, reintroduce test_open_dataset_from_source_additional_kwargs
e52e29d <Benjamin Kietzman> move doccomments, correct UnionDatasetFactory inheritance
4b2ab6e <Benjamin Kietzman> workaround TestSchemaUnification
866737e <Benjamin Kietzman> lint fix
0485790 <Benjamin Kietzman> rename TreeDataset to UnionDataset
ad1cc77 <Benjamin Kietzman> update parquet-scan-example
dc2d840 <Benjamin Kietzman> dataset(single_path) should return FileSystemDataset
9a4db4a <Benjamin Kietzman> reorder cython class declarations
fe0bf14 <Benjamin Kietzman> lint/rdoc fixes
731b553 <Benjamin Kietzman> construct InMemoryDataset with make_shared
fd9cc8b <Benjamin Kietzman> ARROW-7886:  Consolidate Source and Dataset classes

Lead-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
bkietz and kszucs committed Feb 27, 2020
1 parent b77cd0b commit 3d81e4e
Show file tree
Hide file tree
Showing 33 changed files with 1,339 additions and 1,627 deletions.
12 changes: 6 additions & 6 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ using ds::string_literals::operator"" _;
} while (0);

struct Configuration {
// Increase the ds::DataSet by repeating `repeat` times the ds::Source.
// Increase the ds::DataSet by repeating `repeat` times the ds::Dataset.
size_t repeat = 1;

// Indicates if the Scanner::ToTable should consume in parallel.
Expand Down Expand Up @@ -79,17 +79,17 @@ std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem>
s.recursive = true;

ds::FileSystemFactoryOptions options;
// The factory will try to build a source.
auto factory = ds::FileSystemSourceFactory::Make(fs, s, format, options).ValueOrDie();
// The factory will try to build a child dataset.
auto factory = ds::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();

// Try to infer a common schema for all files.
auto schema = factory->Inspect().ValueOrDie();
// Caller can optionally decide another schema as long as it is compatible
// with the previous one, e.g. `factory->Finish(compatible_schema)`.
auto source = factory->Finish().ValueOrDie();
auto child = factory->Finish().ValueOrDie();

ds::SourceVector sources{conf.repeat, source};
auto dataset = ds::Dataset::Make(std::move(sources), schema);
ds::DatasetVector children{conf.repeat, child};
auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children));

return dataset.ValueOrDie();
}
Expand Down
42 changes: 25 additions & 17 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanContext> con
return MakeMapIterator(fn, std::move(batches_it));
}

Result<std::shared_ptr<Dataset>> Dataset::Make(SourceVector sources,
std::shared_ptr<Schema> schema) {
return std::shared_ptr<Dataset>(new Dataset(std::move(sources), std::move(schema)));
}

Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan(
std::shared_ptr<ScanContext> context) {
return std::make_shared<ScannerBuilder>(this->shared_from_this(), context);
Expand All @@ -72,7 +67,7 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanContext>());
}

bool Source::AssumePartitionExpression(
bool Dataset::AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const {
if (partition_expression_ == nullptr) {
Expand All @@ -96,15 +91,15 @@ bool Source::AssumePartitionExpression(
return true;
}

FragmentIterator Source::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
FragmentIterator Dataset::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<std::shared_ptr<Fragment>>();
}
return GetFragmentsImpl(std::move(simplified_scan_options));
}

struct VectorRecordBatchGenerator : InMemorySource::RecordBatchGenerator {
struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(std::vector<std::shared_ptr<RecordBatch>> batches)
: batches_(std::move(batches)) {}

Expand All @@ -113,12 +108,12 @@ struct VectorRecordBatchGenerator : InMemorySource::RecordBatchGenerator {
std::vector<std::shared_ptr<RecordBatch>> batches_;
};

InMemorySource::InMemorySource(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches)
: Source(std::move(schema)),
InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches)
: Dataset(std::move(schema)),
get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}

struct TableRecordBatchGenerator : InMemorySource::RecordBatchGenerator {
struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
: table_(std::move(table)) {}

Expand All @@ -131,11 +126,11 @@ struct TableRecordBatchGenerator : InMemorySource::RecordBatchGenerator {
std::shared_ptr<Table> table_;
};

InMemorySource::InMemorySource(std::shared_ptr<Table> table)
: Source(table->schema()),
InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
: Dataset(table->schema()),
get_batches_(new TableRecordBatchGenerator(std::move(table))) {}

FragmentIterator InMemorySource::GetFragmentsImpl(
FragmentIterator InMemoryDataset::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
auto schema = this->schema();

Expand All @@ -162,8 +157,21 @@ FragmentIterator InMemorySource::GetFragmentsImpl(
return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
}

FragmentIterator TreeSource::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
return GetFragmentsFromSources(children_, options);
Result<std::shared_ptr<UnionDataset>> UnionDataset::Make(std::shared_ptr<Schema> schema,
DatasetVector children) {
for (const auto& child : children) {
if (!child->schema()->Equals(*schema)) {
return Status::TypeError("child Dataset had schema ", *child->schema(),
" but the union schema was ", *schema);
}
}

return std::shared_ptr<UnionDataset>(
new UnionDataset(std::move(schema), std::move(children)));
}

FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
return GetFragmentsFromDatasets(children_, options);
}

} // namespace dataset
Expand Down
87 changes: 36 additions & 51 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,39 +87,42 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
};

/// \brief A basic component of a Dataset which yields zero or more
/// Fragments. A Source acts as a discovery mechanism of Fragments
/// and partitions, e.g. files deeply nested in a directory.
class ARROW_DS_EXPORT Source {
/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism
/// of Fragments and partitions, e.g. files deeply nested in a directory.
class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
public:
/// \brief Begin to build a new Scan operation against this Dataset
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

/// \brief GetFragments returns an iterator of Fragments. The ScanOptions
/// controls filtering and schema inference.
FragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

const std::shared_ptr<Schema>& schema() const { return schema_; }

/// \brief An expression which evaluates to true for all data viewed by this Source.
/// \brief An expression which evaluates to true for all data viewed by this Dataset.
/// May be null, which indicates no information is available.
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

/// \brief The name identifying the kind of source
/// \brief The name identifying the kind of Dataset
virtual std::string type_name() const = 0;

virtual ~Source() = default;
virtual ~Dataset() = default;

protected:
explicit Source(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

Source(std::shared_ptr<Schema> schema, std::shared_ptr<Expression> e)
Dataset(std::shared_ptr<Schema> schema, std::shared_ptr<Expression> e)
: schema_(std::move(schema)), partition_expression_(std::move(e)) {}
Source() = default;
Dataset() = default;

virtual FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded
/// fragments. Returns false if the selector is not satisfiable in this Source.
/// fragments. Returns false if the selector is not satisfiable in this Dataset.
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;
Expand All @@ -131,23 +134,23 @@ class ARROW_DS_EXPORT Source {
/// \brief A Source which yields fragments wrapping a stream of record batches.
///
/// The record batches must match the schema provided to the source at construction.
class ARROW_DS_EXPORT InMemorySource : public Source {
class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
public:
class RecordBatchGenerator {
public:
virtual ~RecordBatchGenerator() = default;
virtual RecordBatchIterator Get() const = 0;
};

InMemorySource(std::shared_ptr<Schema> schema,
std::unique_ptr<RecordBatchGenerator> get_batches)
: Source(std::move(schema)), get_batches_(std::move(get_batches)) {}
InMemoryDataset(std::shared_ptr<Schema> schema,
std::unique_ptr<RecordBatchGenerator> get_batches)
: Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}

// Convenience constructor taking a fixed list of batches
InMemorySource(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches);
InMemoryDataset(std::shared_ptr<Schema> schema,
std::vector<std::shared_ptr<RecordBatch>> batches);

explicit InMemorySource(std::shared_ptr<Table> table);
explicit InMemoryDataset(std::shared_ptr<Table> table);

FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

Expand All @@ -157,48 +160,30 @@ class ARROW_DS_EXPORT InMemorySource : public Source {
std::unique_ptr<RecordBatchGenerator> get_batches_;
};

/// \brief A recursive Source with child Sources.
class ARROW_DS_EXPORT TreeSource : public Source {
/// \brief A Dataset wrapping child Datasets.
class ARROW_DS_EXPORT UnionDataset : public Dataset {
public:
explicit TreeSource(std::shared_ptr<Schema> schema, SourceVector children)
: Source(std::move(schema)), children_(std::move(children)) {}
/// \brief Construct a UnionDataset wrapping child Datasets.
///
/// \param[in] schema the schema of the resulting dataset.
/// \param[in] children one or more child Datasets. Their schemas must be identical to
/// schema.
static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
DatasetVector children);

FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "tree"; }
const DatasetVector& children() const { return children_; }

private:
SourceVector children_;
};

/// \brief Top-level interface for a Dataset with fragments coming
/// from possibly multiple sources.
class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
public:
/// \brief Build a Dataset from uniform sources.
//
/// \param[in] sources one or more input sources
/// \param[in] schema a known schema to conform to
static Result<std::shared_ptr<Dataset>> Make(SourceVector sources,
std::shared_ptr<Schema> schema);

/// \brief Begin to build a new Scan operation against this Dataset
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

const SourceVector& sources() const { return sources_; }

std::shared_ptr<Schema> schema() const { return schema_; }
std::string type_name() const override { return "union"; }

protected:
explicit Dataset(SourceVector sources, std::shared_ptr<Schema> schema)
: schema_(std::move(schema)), sources_(std::move(sources)) {}
explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
: Dataset(std::move(schema)), children_(std::move(children)) {}

// The sources must conform their output to this schema (with
// projections and filters taken into account)
std::shared_ptr<Schema> schema_;
DatasetVector children_;

SourceVector sources_;
friend class UnionDatasetFactory;
};

} // namespace dataset
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/arrow/dataset/dataset_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@
namespace arrow {
namespace dataset {

/// \brief GetFragmentsFromSources transforms a vector<Source> into a
/// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
/// flattened FragmentIterator.
static inline FragmentIterator GetFragmentsFromSources(
const SourceVector& sources, std::shared_ptr<ScanOptions> options) {
// Iterator<Source>
auto sources_it = MakeVectorIterator(sources);
static inline FragmentIterator GetFragmentsFromDatasets(
const DatasetVector& datasets, std::shared_ptr<ScanOptions> options) {
// Iterator<Dataset>
auto datasets_it = MakeVectorIterator(datasets);

// Source -> Iterator<Fragment>
auto fn = [options](std::shared_ptr<Source> source) -> FragmentIterator {
return source->GetFragments(options);
// Dataset -> Iterator<Fragment>
auto fn = [options](std::shared_ptr<Dataset> dataset) -> FragmentIterator {
return dataset->GetFragments(options);
};

// Iterator<Iterator<Fragment>>
auto fragments_it = MakeMapIterator(fn, std::move(sources_it));
auto fragments_it = MakeMapIterator(fn, std::move(datasets_it));

// Iterator<Fragment>
return MakeFlattenIterator(std::move(fragments_it));
Expand Down
Loading

0 comments on commit 3d81e4e

Please sign in to comment.