Skip to content

Commit

Permalink
ARROW-7439: [C++][Dataset] Remove pointer aliases
Browse files (browse the repository at this point in the history)
  • Loading branch information
fsaintjacques committed Dec 19, 2019
1 parent 344ed4b commit abcfdbe
Show file tree
Hide file tree
Showing 30 changed files with 915 additions and 880 deletions.
2 changes: 1 addition & 1 deletion cpp/examples/arrow/dataset-parquet-scan-example.cc
Expand Up @@ -65,7 +65,7 @@ struct Configuration {
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
} conf;

std::shared_ptr<ds::Dataset> GetDatasetFromPath(fs::FileSystemPtr fs,
std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<ds::FileFormat> format,
std::string path) {
// Find all files under `path`
Expand Down
35 changes: 20 additions & 15 deletions cpp/src/arrow/dataset/dataset.cc
Expand Up @@ -29,21 +29,22 @@
namespace arrow {
namespace dataset {

DataFragment::DataFragment(ScanOptionsPtr scan_options)
DataFragment::DataFragment(std::shared_ptr<ScanOptions> scan_options)
: scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {}

SimpleDataFragment::SimpleDataFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches, ScanOptionsPtr scan_options)
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options)
: DataFragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}

Result<ScanTaskIterator> SimpleDataFragment::Scan(ScanContextPtr context) {
Result<ScanTaskIterator> SimpleDataFragment::Scan(std::shared_ptr<ScanContext> context) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);

// RecordBatch -> ScanTask
auto scan_options = scan_options_;
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> ScanTaskPtr {
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
std::vector<std::shared_ptr<RecordBatch>> batches{batch};
return ::arrow::internal::make_unique<SimpleScanTask>(
std::move(batches), std::move(scan_options), std::move(context));
Expand All @@ -52,21 +53,23 @@ Result<ScanTaskIterator> SimpleDataFragment::Scan(ScanContextPtr context) {
return MakeMapIterator(fn, std::move(batches_it));
}

Result<DatasetPtr> Dataset::Make(DataSourceVector sources,
std::shared_ptr<Schema> schema) {
return DatasetPtr(new Dataset(std::move(sources), std::move(schema)));
Result<std::shared_ptr<Dataset>> Dataset::Make(DataSourceVector sources,
std::shared_ptr<Schema> schema) {
return std::shared_ptr<Dataset>(new Dataset(std::move(sources), std::move(schema)));
}

Result<ScannerBuilderPtr> Dataset::NewScan(ScanContextPtr context) {
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan(
std::shared_ptr<ScanContext> context) {
return std::make_shared<ScannerBuilder>(this->shared_from_this(), context);
}

Result<ScannerBuilderPtr> Dataset::NewScan() {
Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
return NewScan(std::make_shared<ScanContext>());
}

bool DataSource::AssumePartitionExpression(
const ScanOptionsPtr& scan_options, ScanOptionsPtr* simplified_scan_options) const {
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const {
if (partition_expression_ == nullptr) {
if (simplified_scan_options != nullptr) {
*simplified_scan_options = scan_options;
Expand All @@ -88,19 +91,21 @@ bool DataSource::AssumePartitionExpression(
return true;
}

DataFragmentIterator DataSource::GetFragments(ScanOptionsPtr scan_options) {
ScanOptionsPtr simplified_scan_options;
DataFragmentIterator DataSource::GetFragments(std::shared_ptr<ScanOptions> scan_options) {
std::shared_ptr<ScanOptions> simplified_scan_options;
if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) {
return MakeEmptyIterator<DataFragmentPtr>();
return MakeEmptyIterator<std::shared_ptr<DataFragment>>();
}
return GetFragmentsImpl(std::move(simplified_scan_options));
}

DataFragmentIterator SimpleDataSource::GetFragmentsImpl(ScanOptionsPtr scan_options) {
DataFragmentIterator SimpleDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> scan_options) {
return MakeVectorIterator(fragments_);
}

DataFragmentIterator TreeDataSource::GetFragmentsImpl(ScanOptionsPtr options) {
DataFragmentIterator TreeDataSource::GetFragmentsImpl(
std::shared_ptr<ScanOptions> options) {
return GetFragmentsFromSources(children_, options);
}

Expand Down
47 changes: 26 additions & 21 deletions cpp/src/arrow/dataset/dataset.h
Expand Up @@ -38,7 +38,7 @@ class ARROW_DS_EXPORT DataFragment {
public:
/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this DataFragment.
virtual Result<ScanTaskIterator> Scan(ScanContextPtr context) = 0;
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) = 0;

/// \brief Return true if the fragment can benefit from parallel
/// scanning
Expand All @@ -48,7 +48,7 @@ class ARROW_DS_EXPORT DataFragment {
/// scanning this fragment. May be nullptr, which indicates that no filtering
/// or schema reconciliation will be performed and all partitions will be
/// scanned.
ScanOptionsPtr scan_options() const { return scan_options_; }
std::shared_ptr<ScanOptions> scan_options() const { return scan_options_; }

virtual ~DataFragment() = default;

Expand All @@ -59,24 +59,25 @@ class ARROW_DS_EXPORT DataFragment {
}

protected:
explicit DataFragment(ScanOptionsPtr scan_options);
explicit DataFragment(std::shared_ptr<ScanOptions> scan_options);

DataFragment(ScanOptionsPtr scan_options, ExpressionPtr partition_expression)
DataFragment(std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: scan_options_(std::move(scan_options)),
partition_expression_(std::move(partition_expression)) {}

ScanOptionsPtr scan_options_;
ExpressionPtr partition_expression_;
std::shared_ptr<ScanOptions> scan_options_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A trivial DataFragment that yields ScanTask out of a fixed set of
/// RecordBatch.
class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment {
public:
SimpleDataFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
ScanOptionsPtr scan_options);
std::shared_ptr<ScanOptions> scan_options);

Result<ScanTaskIterator> Scan(ScanContextPtr context) override;
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

bool splittable() const override { return false; }

Expand All @@ -91,11 +92,13 @@ class ARROW_DS_EXPORT DataSource {
public:
/// \brief GetFragments returns an iterator of DataFragments. The ScanOptions
/// controls filtering and schema inference.
DataFragmentIterator GetFragments(ScanOptionsPtr options);
DataFragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

/// \brief An expression which evaluates to true for all data viewed by this DataSource.
/// May be null, which indicates no information is available.
const ExpressionPtr& partition_expression() const { return partition_expression_; }
const std::shared_ptr<Expression>& partition_expression() const {
return partition_expression_;
}

/// \brief The name identifying the kind of data source
virtual std::string type_name() const = 0;
Expand All @@ -104,16 +107,18 @@ class ARROW_DS_EXPORT DataSource {

protected:
DataSource() = default;
explicit DataSource(ExpressionPtr c) : partition_expression_(std::move(c)) {}
explicit DataSource(std::shared_ptr<Expression> c)
: partition_expression_(std::move(c)) {}

virtual DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) = 0;
virtual DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) = 0;

/// Simplifies scan_options by assuming partition_expression_ holds for all yielded
/// fragments; the input is left untouched and the result is written to
/// simplified_scan_options. Returns false if the selector is not satisfiable in this
/// DataSource.
virtual bool AssumePartitionExpression(const ScanOptionsPtr& scan_options,
ScanOptionsPtr* simplified_scan_options) const;
virtual bool AssumePartitionExpression(
const std::shared_ptr<ScanOptions>& scan_options,
std::shared_ptr<ScanOptions>* simplified_scan_options) const;

ExpressionPtr partition_expression_;
std::shared_ptr<Expression> partition_expression_;
};

/// \brief A DataSource consisting of a flat sequence of DataFragments
Expand All @@ -122,7 +127,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource {
explicit SimpleDataSource(DataFragmentVector fragments)
: fragments_(std::move(fragments)) {}

DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override;
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "simple"; }

Expand All @@ -135,7 +140,7 @@ class ARROW_DS_EXPORT TreeDataSource : public DataSource {
public:
explicit TreeDataSource(DataSourceVector children) : children_(std::move(children)) {}

DataFragmentIterator GetFragmentsImpl(ScanOptionsPtr options) override;
DataFragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

std::string type_name() const override { return "tree"; }

Expand All @@ -151,12 +156,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
///
/// \param[in] sources one or more input data sources
/// \param[in] schema a known schema to conform to
static Result<DatasetPtr> Make(DataSourceVector sources,
std::shared_ptr<Schema> schema);
static Result<std::shared_ptr<Dataset>> Make(DataSourceVector sources,
std::shared_ptr<Schema> schema);

/// \brief Begin to build a new Scan operation against this Dataset
Result<ScannerBuilderPtr> NewScan(ScanContextPtr context);
Result<ScannerBuilderPtr> NewScan();
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

const DataSourceVector& sources() const { return sources_; }

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/dataset_internal.h
Expand Up @@ -35,12 +35,12 @@ namespace dataset {
/// \brief GetFragmentsFromSources transforms a vector<DataSource> into a
/// flattened DataFragmentIterator.
static inline DataFragmentIterator GetFragmentsFromSources(
const DataSourceVector& sources, ScanOptionsPtr options) {
const DataSourceVector& sources, std::shared_ptr<ScanOptions> options) {
// Iterator<DataSource>
auto sources_it = MakeVectorIterator(sources);

// DataSource -> Iterator<DataFragment>
auto fn = [options](DataSourcePtr source) -> DataFragmentIterator {
auto fn = [options](std::shared_ptr<DataSource> source) -> DataFragmentIterator {
return source->GetFragments(options);
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_test.cc
Expand Up @@ -298,7 +298,7 @@ TEST_F(TestEndToEnd, EndToEndSingleSource) {
// A DataSource is composed of DataFragments. Each DataFragment can yield
// multiple RecordBatches. DataSources can be created manually or "discovered"
// via the DataSourceDiscovery interface.
DataSourceDiscoveryPtr discovery;
std::shared_ptr<DataSourceDiscovery> discovery;

// The user must specify which FileFormat is used to create FileFragments.
// This option is specific to FileSystemDataSource (and the builder).
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/arrow/dataset/discovery.cc
Expand Up @@ -54,8 +54,8 @@ Result<std::shared_ptr<Schema>> DataSourceDiscovery::Inspect() {
}

FileSystemDataSourceDiscovery::FileSystemDataSourceDiscovery(
fs::FileSystemPtr filesystem, fs::FileStatsVector files, FileFormatPtr format,
FileSystemDiscoveryOptions options)
std::shared_ptr<fs::FileSystem> filesystem, fs::FileStatsVector files,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options)
: fs_(std::move(filesystem)),
files_(std::move(files)),
format_(std::move(format)),
Expand All @@ -72,9 +72,9 @@ bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string
return std::any_of(prefixes.cbegin(), prefixes.cend(), matches_prefix);
}

Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
fs::FileSystemPtr fs, fs::FileStatsVector files, FileFormatPtr format,
FileSystemDiscoveryOptions options) {
Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
std::shared_ptr<fs::FileSystem> fs, fs::FileStatsVector files,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options) {
DCHECK_NE(format, nullptr);

bool has_prefixes = !options.ignore_prefixes.empty();
Expand All @@ -99,13 +99,13 @@ Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
filtered.push_back(stat);
}

return DataSourceDiscoveryPtr(new FileSystemDataSourceDiscovery(
return std::shared_ptr<DataSourceDiscovery>(new FileSystemDataSourceDiscovery(
fs, std::move(filtered), std::move(format), std::move(options)));
}

Result<DataSourceDiscoveryPtr> FileSystemDataSourceDiscovery::Make(
fs::FileSystemPtr filesystem, fs::FileSelector selector, FileFormatPtr format,
FileSystemDiscoveryOptions options) {
Result<std::shared_ptr<DataSourceDiscovery>> FileSystemDataSourceDiscovery::Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetTargetStats(selector));

// By automatically setting the options base_dir to the selector's base_dir,
Expand Down Expand Up @@ -133,7 +133,7 @@ FileSystemDataSourceDiscovery::InspectSchemas() {
return schemas;
}

Result<DataSourcePtr> FileSystemDataSourceDiscovery::Finish() {
Result<std::shared_ptr<DataSource>> FileSystemDataSourceDiscovery::Finish() {
ExpressionVector partitions(files_.size(), scalar(true));

ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(files_));
Expand Down
41 changes: 21 additions & 20 deletions cpp/src/arrow/dataset/discovery.h
Expand Up @@ -60,22 +60,24 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
virtual Result<std::shared_ptr<Schema>> Inspect();

/// \brief Create a DataSource with a given partition.
virtual Result<DataSourcePtr> Finish() = 0;
virtual Result<std::shared_ptr<DataSource>> Finish() = 0;

std::shared_ptr<Schema> schema() const { return schema_; }
Status SetSchema(std::shared_ptr<Schema> schema) {
schema_ = schema;
return Status::OK();
}

const PartitionSchemePtr& partition_scheme() const { return partition_scheme_; }
Status SetPartitionScheme(PartitionSchemePtr partition_scheme) {
const std::shared_ptr<PartitionScheme>& partition_scheme() const {
return partition_scheme_;
}
Status SetPartitionScheme(std::shared_ptr<PartitionScheme> partition_scheme) {
partition_scheme_ = partition_scheme;
return Status::OK();
}

const ExpressionPtr& root_partition() const { return root_partition_; }
Status SetRootPartition(ExpressionPtr partition) {
const std::shared_ptr<Expression>& root_partition() const { return root_partition_; }
Status SetRootPartition(std::shared_ptr<Expression> partition) {
root_partition_ = partition;
return Status::OK();
}
Expand All @@ -86,8 +88,8 @@ class ARROW_DS_EXPORT DataSourceDiscovery {
DataSourceDiscovery();

std::shared_ptr<Schema> schema_;
PartitionSchemePtr partition_scheme_;
ExpressionPtr root_partition_;
std::shared_ptr<PartitionScheme> partition_scheme_;
std::shared_ptr<Expression> root_partition_;
};

struct FileSystemDiscoveryOptions {
Expand Down Expand Up @@ -141,10 +143,9 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
/// \param[in] paths passed to FileSystemDataSource
/// \param[in] format passed to FileSystemDataSource
/// \param[in] options see FileSystemDiscoveryOptions for more information.
static Result<DataSourceDiscoveryPtr> Make(fs::FileSystemPtr filesystem,
fs::FileStatsVector paths,
FileFormatPtr format,
FileSystemDiscoveryOptions options);
static Result<std::shared_ptr<DataSourceDiscovery>> Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileStatsVector paths,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options);

/// \brief Build a FileSystemDataSourceDiscovery from a fs::FileSelector.
///
Expand All @@ -159,23 +160,23 @@ class ARROW_DS_EXPORT FileSystemDataSourceDiscovery : public DataSourceDiscovery
/// \param[in] selector used to crawl and search files
/// \param[in] format passed to FileSystemDataSource
/// \param[in] options see FileSystemDiscoveryOptions for more information.
static Result<DataSourceDiscoveryPtr> Make(fs::FileSystemPtr filesystem,
fs::FileSelector selector,
FileFormatPtr format,
FileSystemDiscoveryOptions options);
static Result<std::shared_ptr<DataSourceDiscovery>> Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemDiscoveryOptions options);

Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas() override;

Result<DataSourcePtr> Finish() override;
Result<std::shared_ptr<DataSource>> Finish() override;

protected:
FileSystemDataSourceDiscovery(fs::FileSystemPtr filesystem,
std::vector<fs::FileStats> files, FileFormatPtr format,
FileSystemDataSourceDiscovery(std::shared_ptr<fs::FileSystem> filesystem,
std::vector<fs::FileStats> files,
std::shared_ptr<FileFormat> format,
FileSystemDiscoveryOptions options);

fs::FileSystemPtr fs_;
std::shared_ptr<fs::FileSystem> fs_;
std::vector<fs::FileStats> files_;
FileFormatPtr format_;
std::shared_ptr<FileFormat> format_;
FileSystemDiscoveryOptions options_;
};

Expand Down

0 comments on commit abcfdbe

Please sign in to comment.