Skip to content

Commit

Permalink
ARROW-7439: [C++][Dataset] Remove pointer aliases
Browse files Browse the repository at this point in the history
  • Loading branch information
fsaintjacques committed Dec 19, 2019
1 parent 0afacd1 commit 5982a1d
Show file tree
Hide file tree
Showing 30 changed files with 921 additions and 885 deletions.
2 changes: 1 addition & 1 deletion cpp/examples/arrow/dataset-parquet-scan-example.cc
Expand Up @@ -65,7 +65,7 @@ struct Configuration {
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
} conf;

std::shared_ptr<ds::Dataset> GetDatasetFromPath(fs::FileSystemPtr fs,
std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<ds::FileFormat> format,
std::string path) {
// Find all files under `path`
Expand Down
35 changes: 20 additions & 15 deletions cpp/src/arrow/dataset/dataset.cc
Expand Up @@ -29,21 +29,22 @@
namespace arrow {
namespace dataset {

DataFragment::DataFragment(ScanOptionsPtr scan_options)
DataFragment::DataFragment(std::shared_ptr<ScanOptions> scan_options)
: scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {}

SimpleDataFragment::SimpleDataFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches, ScanOptionsPtr scan_options)
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options)
: DataFragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}

Result<ScanTaskIterator> SimpleDataFragment::Scan(ScanContextPtr context) {
Result<ScanTaskIterator> SimpleDataFragment::Scan(std::shared_ptr<ScanContext> context) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);

// RecordBatch -> ScanTask
auto scan_options = scan_options_;
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> ScanTaskPtr {
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
std::vector<std::shared_ptr<RecordBatch>> batches{batch};
return ::arrow::internal::make_unique<SimpleScanTask>(
std::move(batches), std::move(scan_options), std::move(context));
Expand All @@ -52,21 +53,23 @@ Result<ScanTaskIterator> SimpleDataFragment::Scan(ScanContextPtr context) {
return MakeMapIterator(fn, std::move(batches_it));
}

Result<DatasetPtr> Dataset::Make(DataSourceVector sources,
std::shared_ptr<Schema> schema) {
return DatasetPtr(new Dataset(std::move(sources), std::move(schema)));
Result<std::shared_ptr<Dataset>> Dataset::Make(DataSourceVector sources,
std::shared_ptr<Schema> schema) {
return std::shared_ptr<Dataset>(new Dataset(std::move(sources), std::move(schema)));
}

Result<ScannerBuilderPtr> Dataset::NewScan(ScanContextPtr context) {
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan(
std::shared_ptr<ScanContext> context) {
return std::make_shared<ScannerBuilder>(this->shared_from_this(), context);
}

Result<ScannerBuilderPtr> Dataset::NewScan() {
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanContext>());
}

bool DataSource::AssumePartitionExpression(
const ScanOptionsPtr& scan_options, ScanOptionsPtr* simplified_scan_options) const {
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const {
if (partition_expression_ == nullptr) {
if (simplified_scan_options != nullptr) {
*simplified_scan_options = scan_options;
Expand All @@ -88,19 +91,21 @@ bool DataSource::AssumePartitionExpression(
return true;
}

DataFragmentIterator DataSource::GetFragments(ScanOptionsPtr scan_options) {
ScanOptionsPtr simplified_scan_options;
DataFragmentIterator DataSource::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<DataFragmentPtr>();
return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}
return GetFragmentsImpl(std::move(simplified_scan_options));
}

DataFragmentIterator SimpleDataSource::GetFragmentsImpl(ScanOptionsPtr scan_options) {
DataFragmentIterator SimpleDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
return MakeVectorIterator(fragments_);
}

DataFragmentIterator TreeDataSource::GetFragmentsImpl(ScanOptionsPtr options) {
DataFragmentIterator TreeDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> options) {
return GetFragmentsFromSources(children_, options);
}

Expand Down
47 changes: 26 additions & 21 deletions cpp/src/arrow/dataset/dataset.h
Expand Up @@ -38,7 +38,7 @@ class ARROW_DS_EXPORT DataFragment {
public:
/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this DataFragment.
virtual Result<ScanTaskIterator> Scan(ScanContextPtr context) = 0;
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) = 0;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
Expand All @@ -48,7 +48,7 @@ class ARROW_DS_EXPORT DataFragment {
/// scanning this fragment. May be nullptr, which indicates that no filtering
/// or schema reconciliation will be performed and all partitions will be
/// scanned.
ScanOptionsPtr scan_options() const { return scan_options_; }
std::shared_ptr<ScanOptions> scan_options() const { return scan_options_; }

virtual ~DataFragment() = default;

Expand All @@ -59,24 +59,25 @@ class ARROW_DS_EXPORT DataFragment {
}

protected:
explicit DataFragment(ScanOptionsPtr scan_options);
explicit DataFragment(std::shared_ptr<ScanOptions> scan_options);

DataFragment(ScanOptionsPtr scan_options, ExpressionPtr partition_expression)
DataFragment(std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: scan_options_(std::move(scan_options)),
partition_expression_(std::move(partition_expression)) {}

ScanOptionsPtr scan_options_;
ExpressionPtr partition_expression_;
std::shared_ptr<ScanOptions> scan_options_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A trivial DataFragment that yields ScanTask out of a fixed set of
/// RecordBatch.
class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment {
public:
SimpleDataFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
ScanOptionsPtr scan_options);
std::shared_ptr<ScanOptions> scan_options);

Result<ScanTaskIterator> Scan(ScanContextPtr context) override;
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

bool splittable() const override { return false; }

Expand All @@ -91,11 +92,13 @@ class ARROW_DS_EXPORT DataSource {
public:
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
DataFragmentIterator GetFragments(ScanOptionsPtr options);
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

/// \brief An expression which evaluates to true for all data viewed by this DataSource.
/// May be null, which indicates no information is available.
const ExpressionPtr& partition_expression() const { return partition_expression_; }
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

/// \brief The name identifying the kind of data source
virtual std::string type_name() const = 0;
Expand All @@ -104,16 +107,18 @@ class ARROW_DS_EXPORT DataSource {

protected:
DataSource() = default;
explicit DataSource(ExpressionPtr c) : partition_expression_(std::move(c)) {}
explicit DataSource(std::shared_ptr<Expression> c)
: partition_expression_(std::move(c)) {}

virtual DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) = 0;
virtual DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded
/// fragments. Returns false if the selector is not satisfiable in this DataSource.
virtual bool AssumePartitionExpression(const ScanOptionsPtr& scan_options,
ScanOptionsPtr* simplified_scan_options) const;
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;

ExpressionPtr partition_expression_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments
Expand All @@ -122,7 +127,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
explicit SimpleDataSource(DataFragmentVector fragments)
: fragments_(std::move(fragments)) {}

DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override;
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "simple"; }

Expand All @@ -135,7 +140,7 @@ class ARROW_DS_EXPORT TreeDataSource : public DataSource {
public:
explicit TreeDataSource(DataSourceVector children) : children_(std::move(children)) {}

DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override;
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "tree"; }

Expand All @@ -151,12 +156,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
//
/// \param[in] sources one or more input data sources
/// \param[in] schema a known schema to conform to
static Result<DatasetPtr> Make(DataSourceVector sources,
std::shared_ptr<Schema> schema);
static Result<std::shared_ptr<Dataset>> Make(DataSourceVector sources,
std::shared_ptr<Schema> schema);

/// \brief Begin to build a new Scan operation against this Dataset
Result<ScannerBuilderPtr> NewScan(ScanContextPtr context);
Result<ScannerBuilderPtr> NewScan();
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

const DataSourceVector& sources() const { return sources_; }

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/dataset_internal.h
Expand Up @@ -35,12 +35,12 @@ namespace dataset {
/// \brief GetFragmentsFromSources transforms a vector<DataSource> into a
/// flattened DataFragmentIterator.
static inline DataFragmentIterator GetFragmentsFromSources(
const DataSourceVector& sources, ScanOptionsPtr options) {
const DataSourceVector& sources, std::shared_ptr<ScanOptions> options) {
// Iterator<DataSource>
auto sources_it = MakeVectorIterator(sources);

// DataSource -> Iterator<DataFragment>
auto fn = [options](DataSourcePtr source) -> DataFragmentIterator {
auto fn = [options](std::shared_ptr<DataSource> source) -> DataFragmentIterator {
return source->GetFragments(options);
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_test.cc
Expand Up @@ -298,7 +298,7 @@ TEST_F(TestEndToEnd, EndToEndSingleSource) {
// A DataSource is composed of DataFragments. Each DataFragment can yield
// multiple RecordBatches. DataSources can be created manually or "discovered"
// via the DataSourceDiscovery interface.
DataSourceDiscoveryPtr discovery;
std::shared_ptr<DataSourceDiscovery> discovery;

// The user must specify which FileFormat is used to create FileFragments.
// This option is specific to FileSystemDataSource (and the builder).
Expand Down
27 changes: 14 additions & 13 deletions cpp/src/arrow/dataset/discovery.cc
Expand Up @@ -55,8 +55,8 @@ Result<std::shared_ptr<Schema>> DataSourceDiscovery::Inspect() {
}

FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystemPtr filesystem, fs::PathForest forest, FileFormatPtr format,
FileSystemDiscoveryOptions options)
std::shared_ptr<fs::FileSystem> filesystem, fs::PathForest forest,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options)
: fs_(std::move(filesystem)),
forest_(std::move(forest)),
format_(std::move(format)),
Expand All @@ -78,8 +78,9 @@ bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string
}

Result<fs::PathForest> FileSystemDataSourceDiscovery::Filter(
const fs::FileSystemPtr& filesystem, const FileFormatPtr& format,
const FileSystemDiscoveryOptions& options, fs::PathForest forest) {
const std::shared_ptr<fs::FileSystem>& filesystem,
const std::shared_ptr<FileFormat>& format, const FileSystemDiscoveryOptions& options,
fs::PathForest forest) {
fs::FileStatsVector out;

auto& stats = forest.stats();
Expand All @@ -105,9 +106,9 @@ Result<fs::PathForest> FileSystemDataSourceDiscovery::Filter(
return fs::PathForest::MakeFromPreSorted(std::move(out));
}

Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
fs::FileSystemPtr filesystem, const std::vector<std::string>& paths,
FileFormatPtr format, FileSystemDiscoveryOptions options) {
Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetTargetStats(paths));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

Expand All @@ -133,13 +134,13 @@ Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(

ARROW_ASSIGN_OR_RAISE(forest, Filter(filesystem, format, options, std::move(forest)));

return DataSourceDiscoveryPtr(new FileSystemDataSourceDiscovery(
return std::shared_ptr<DataSourceDiscovery>(new FileSystemDataSourceDiscovery(
filesystem, std::move(forest), std::move(format), std::move(options)));
}

Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
fs::FileSystemPtr filesystem, fs::FileSelector selector, FileFormatPtr format,
FileSystemDiscoveryOptions options) {
Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetTargetStats(selector));

ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));
Expand All @@ -153,7 +154,7 @@ Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
options.partition_base_dir = selector.base_dir;
}

return DataSourceDiscoveryPtr(new FileSystemDataSourceDiscovery(
return std::shared_ptr<DataSourceDiscovery>(new FileSystemDataSourceDiscovery(
filesystem, std::move(forest), std::move(format), std::move(options)));
}

Expand All @@ -171,7 +172,7 @@ FileSystemDataSourceDiscovery::InspectSchemas() {
return schemas;
}

Result<DataSourcePtr> FileSystemDataSourceDiscovery::Finish() {
Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish() {
ExpressionVector partitions(forest_.size(), scalar(true));

// apply partition_scheme to forest to derive partitions
Expand Down

0 comments on commit 5982a1d

Please sign in to comment.