Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-7439: [C++][Dataset] Remove pointer aliases #6069

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/examples/arrow/dataset-parquet-scan-example.cc
Expand Up @@ -65,7 +65,7 @@ struct Configuration {
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
} conf;

std::shared_ptr<ds::Dataset> GetDatasetFromPath(fs::FileSystemPtr fs,
std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<ds::FileFormat> format,
std::string path) {
// Find all files under `path`
Expand Down
35 changes: 20 additions & 15 deletions cpp/src/arrow/dataset/dataset.cc
Expand Up @@ -29,21 +29,22 @@
namespace arrow {
namespace dataset {

DataFragment::DataFragment(ScanOptionsPtr scan_options)
DataFragment::DataFragment(std::shared_ptr<ScanOptions> scan_options)
: scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {}

SimpleDataFragment::SimpleDataFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches, ScanOptionsPtr scan_options)
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options)
: DataFragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}

Result<ScanTaskIterator> SimpleDataFragment::Scan(ScanContextPtr context) {
Result<ScanTaskIterator> SimpleDataFragment::Scan(std::shared_ptr<ScanContext> context) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);

// RecordBatch -> ScanTask
auto scan_options = scan_options_;
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> ScanTaskPtr {
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
std::vector<std::shared_ptr<RecordBatch>> batches{batch};
return ::arrow::internal::make_unique<SimpleScanTask>(
std::move(batches), std::move(scan_options), std::move(context));
Expand All @@ -52,21 +53,23 @@ Result<ScanTaskIterator> SimpleDataFragment::Scan(ScanContextPtr context) {
return MakeMapIterator(fn, std::move(batches_it));
}

Result<DatasetPtr> Dataset::Make(DataSourceVector sources,
std::shared_ptr<Schema> schema) {
return DatasetPtr(new Dataset(std::move(sources), std::move(schema)));
Result<std::shared_ptr<Dataset>> Dataset::Make(DataSourceVector sources,
std::shared_ptr<Schema> schema) {
return std::shared_ptr<Dataset>(new Dataset(std::move(sources), std::move(schema)));
}

Result<ScannerBuilderPtr> Dataset::NewScan(ScanContextPtr context) {
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan(
std::shared_ptr<ScanContext> context) {
return std::make_shared<ScannerBuilder>(this->shared_from_this(), context);
}

Result<ScannerBuilderPtr> Dataset::NewScan() {
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanContext>());
}

bool DataSource::AssumePartitionExpression(
const ScanOptionsPtr& scan_options, ScanOptionsPtr* simplified_scan_options) const {
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const {
if (partition_expression_ == nullptr) {
if (simplified_scan_options != nullptr) {
*simplified_scan_options = scan_options;
Expand All @@ -88,19 +91,21 @@ bool DataSource::AssumePartitionExpression(
return true;
}

DataFragmentIterator DataSource::GetFragments(ScanOptionsPtr scan_options) {
ScanOptionsPtr simplified_scan_options;
DataFragmentIterator DataSource::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<DataFragmentPtr>();
return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}
return GetFragmentsImpl(std::move(simplified_scan_options));
}

DataFragmentIterator SimpleDataSource::GetFragmentsImpl(ScanOptionsPtr scan_options) {
DataFragmentIterator SimpleDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
return MakeVectorIterator(fragments_);
}

DataFragmentIterator TreeDataSource::GetFragmentsImpl(ScanOptionsPtr options) {
DataFragmentIterator TreeDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> options) {
return GetFragmentsFromSources(children_, options);
}

Expand Down
47 changes: 26 additions & 21 deletions cpp/src/arrow/dataset/dataset.h
Expand Up @@ -38,7 +38,7 @@ class ARROW_DS_EXPORT DataFragment {
public:
/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this DataFragment.
virtual Result<ScanTaskIterator> Scan(ScanContextPtr context) = 0;
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) = 0;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
Expand All @@ -48,7 +48,7 @@ class ARROW_DS_EXPORT DataFragment {
/// scanning this fragment. May be nullptr, which indicates that no filtering
/// or schema reconciliation will be performed and all partitions will be
/// scanned.
ScanOptionsPtr scan_options() const { return scan_options_; }
std::shared_ptr<ScanOptions> scan_options() const { return scan_options_; }

virtual ~DataFragment() = default;

Expand All @@ -59,24 +59,25 @@ class ARROW_DS_EXPORT DataFragment {
}

protected:
explicit DataFragment(ScanOptionsPtr scan_options);
explicit DataFragment(std::shared_ptr<ScanOptions> scan_options);

DataFragment(ScanOptionsPtr scan_options, ExpressionPtr partition_expression)
DataFragment(std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: scan_options_(std::move(scan_options)),
partition_expression_(std::move(partition_expression)) {}

ScanOptionsPtr scan_options_;
ExpressionPtr partition_expression_;
std::shared_ptr<ScanOptions> scan_options_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A trivial DataFragment that yields ScanTask out of a fixed set of
/// RecordBatch.
class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment {
public:
SimpleDataFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
ScanOptionsPtr scan_options);
std::shared_ptr<ScanOptions> scan_options);

Result<ScanTaskIterator> Scan(ScanContextPtr context) override;
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

bool splittable() const override { return false; }

Expand All @@ -91,11 +92,13 @@ class ARROW_DS_EXPORT DataSource {
public:
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
DataFragmentIterator GetFragments(ScanOptionsPtr options);
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

/// \brief An expression which evaluates to true for all data viewed by this DataSource.
/// May be null, which indicates no information is available.
const ExpressionPtr& partition_expression() const { return partition_expression_; }
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

/// \brief The name identifying the kind of data source
virtual std::string type_name() const = 0;
Expand All @@ -104,16 +107,18 @@ class ARROW_DS_EXPORT DataSource {

protected:
DataSource() = default;
explicit DataSource(ExpressionPtr c) : partition_expression_(std::move(c)) {}
explicit DataSource(std::shared_ptr<Expression> c)
: partition_expression_(std::move(c)) {}

virtual DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) = 0;
virtual DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded
/// fragments. Returns false if the selector is not satisfiable in this DataSource.
virtual bool AssumePartitionExpression(const ScanOptionsPtr& scan_options,
ScanOptionsPtr* simplified_scan_options) const;
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;

ExpressionPtr partition_expression_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments
Expand All @@ -122,7 +127,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
explicit SimpleDataSource(DataFragmentVector fragments)
: fragments_(std::move(fragments)) {}

DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override;
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "simple"; }

Expand All @@ -135,7 +140,7 @@ class ARROW_DS_EXPORT TreeDataSource : public DataSource {
public:
explicit TreeDataSource(DataSourceVector children) : children_(std::move(children)) {}

DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override;
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "tree"; }

Expand All @@ -151,12 +156,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
//
/// \param[in] sources one or more input data sources
/// \param[in] schema a known schema to conform to
static Result<DatasetPtr> Make(DataSourceVector sources,
std::shared_ptr<Schema> schema);
static Result<std::shared_ptr<Dataset>> Make(DataSourceVector sources,
std::shared_ptr<Schema> schema);

/// \brief Begin to build a new Scan operation against this Dataset
Result<ScannerBuilderPtr> NewScan(ScanContextPtr context);
Result<ScannerBuilderPtr> NewScan();
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

const DataSourceVector& sources() const { return sources_; }

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/dataset_internal.h
Expand Up @@ -35,12 +35,12 @@ namespace dataset {
/// \brief GetFragmentsFromSources transforms a vector<DataSource> into a
/// flattened DataFragmentIterator.
static inline DataFragmentIterator GetFragmentsFromSources(
const DataSourceVector& sources, ScanOptionsPtr options) {
const DataSourceVector& sources, std::shared_ptr<ScanOptions> options) {
// Iterator<DataSource>
auto sources_it = MakeVectorIterator(sources);

// DataSource -> Iterator<DataFragment>
auto fn = [options](DataSourcePtr source) -> DataFragmentIterator {
auto fn = [options](std::shared_ptr<DataSource> source) -> DataFragmentIterator {
return source->GetFragments(options);
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_test.cc
Expand Up @@ -298,7 +298,7 @@ TEST_F(TestEndToEnd, EndToEndSingleSource) {
// A DataSource is composed of DataFragments. Each DataFragment can yield
// multiple RecordBatches. DataSources can be created manually or "discovered"
// via the DataSourceDiscovery interface.
DataSourceDiscoveryPtr discovery;
std::shared_ptr<DataSourceDiscovery> discovery;

// The user must specify which FileFormat is used to create FileFragments.
// This option is specific to FileSystemDataSource (and the builder).
Expand Down
27 changes: 14 additions & 13 deletions cpp/src/arrow/dataset/discovery.cc
Expand Up @@ -55,8 +55,8 @@ Result<std::shared_ptr<Schema>> DataSourceDiscovery::Inspect() {
}

FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystemPtr filesystem, fs::PathForest forest, FileFormatPtr format,
FileSystemDiscoveryOptions options)
std::shared_ptr<fs::FileSystem> filesystem, fs::PathForest forest,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options)
: fs_(std::move(filesystem)),
forest_(std::move(forest)),
format_(std::move(format)),
Expand All @@ -78,8 +78,9 @@ bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string
}

Result<fs::PathForest> FileSystemDataSourceDiscovery::Filter(
const fs::FileSystemPtr& filesystem, const FileFormatPtr& format,
const FileSystemDiscoveryOptions& options, fs::PathForest forest) {
const std::shared_ptr<fs::FileSystem>& filesystem,
const std::shared_ptr<FileFormat>& format, const FileSystemDiscoveryOptions& options,
fs::PathForest forest) {
fs::FileStatsVector out;

auto& stats = forest.stats();
Expand All @@ -105,9 +106,9 @@ Result<fs::PathForest> FileSystemDataSourceDiscovery::Filter(
return fs::PathForest::MakeFromPreSorted(std::move(out));
}

Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
fs::FileSystemPtr filesystem, const std::vector<std::string>& paths,
FileFormatPtr format, FileSystemDiscoveryOptions options) {
Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetTargetStats(paths));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

Expand All @@ -133,13 +134,13 @@ Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(

ARROW_ASSIGN_OR_RAISE(forest, Filter(filesystem, format, options, std::move(forest)));

return DataSourceDiscoveryPtr(new FileSystemDataSourceDiscovery(
return std::shared_ptr<DataSourceDiscovery>(new FileSystemDataSourceDiscovery(
filesystem, std::move(forest), std::move(format), std::move(options)));
}

Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
fs::FileSystemPtr filesystem, fs::FileSelector selector, FileFormatPtr format,
FileSystemDiscoveryOptions options) {
Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetTargetStats(selector));

ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));
Expand All @@ -153,7 +154,7 @@ Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
options.partition_base_dir = selector.base_dir;
}

return DataSourceDiscoveryPtr(new FileSystemDataSourceDiscovery(
return std::shared_ptr<DataSourceDiscovery>(new FileSystemDataSourceDiscovery(
filesystem, std::move(forest), std::move(format), std::move(options)));
}

Expand All @@ -171,7 +172,7 @@ FileSystemDataSourceDiscovery::InspectSchemas() {
return schemas;
}

Result<DataSourcePtr> FileSystemDataSourceDiscovery::Finish() {
Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish() {
ExpressionVector partitions(forest_.size(), scalar(true));

// apply partition_scheme to forest to derive partitions
Expand Down