Skip to content

Commit

Permalink
ARROW-8058: [Dataset] Relax DatasetFactory discovery validation
Browse files Browse the repository at this point in the history
This PR aims to improve the latency of the discovery process. Notably, it selects "fast" defaults over "safe" defaults.

- Add `InspectOptions` which limits the number of fragments inspected to infer the schema, it defaults to one fragment.
- Add `FinishOptions` which toggles if validation of the optional schema and also controls the number of fragments it validates with. It defaults to disabling validation.

This gives a noticeable speedup when the fragments have a uniform schema.

Closes #6687 from fsaintjacques/ARROW-8058-optional-discovery-validation

Authored-by: François Saint-Jacques <fsaintjacques@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
  • Loading branch information
fsaintjacques authored and bkietz committed Mar 25, 2020
1 parent 815531c commit 17b9980
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 106 deletions.
7 changes: 5 additions & 2 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Expand Up @@ -63,6 +63,9 @@ struct Configuration {
// Indicates the filter by which rows will be filtered. This optimization can
// make use of partition information and/or file metadata if possible.
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();

ds::InspectOptions inspect_options{};
ds::FinishOptions finish_options{};
} conf;

std::shared_ptr<fs::FileSystem> GetFileSystemFromUri(const std::string& uri,
Expand All @@ -83,10 +86,10 @@ std::shared_ptr<ds::Dataset> GetDatasetFromPath(std::shared_ptr<fs::FileSystem>
auto factory = ds::FileSystemDatasetFactory::Make(fs, s, format, options).ValueOrDie();

// Try to infer a common schema for all files.
auto schema = factory->Inspect().ValueOrDie();
auto schema = factory->Inspect(conf.inspect_options).ValueOrDie();
// Caller can optionally decide another schema as long as it is compatible
// with the previous one, e.g. `factory->Finish(compatible_schema)`.
auto child = factory->Finish().ValueOrDie();
auto child = factory->Finish(conf.finish_options).ValueOrDie();

ds::DatasetVector children{conf.repeat, child};
auto dataset = ds::UnionDataset::Make(std::move(schema), std::move(children));
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/dataset/dataset_test.cc
Expand Up @@ -343,7 +343,9 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) {
// schema evolved by adding/renaming columns. In this case, the schema is
// passed to the dataset constructor.
// The inspected_schema may optionally be modified before being finalized.
ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect());
InspectOptions inspect_options;
inspect_options.fragments = InspectOptions::kInspectAllFragments;
ASSERT_OK_AND_ASSIGN(auto inspected_schema, factory->Inspect(inspect_options));
EXPECT_EQ(*schema_, *inspected_schema);

// Build the Dataset where partitions are attached to fragments (files).
Expand Down
80 changes: 44 additions & 36 deletions cpp/src/arrow/dataset/discovery.cc
Expand Up @@ -36,16 +36,27 @@ namespace dataset {

DatasetFactory::DatasetFactory() : root_partition_(scalar(true)) {}

Result<std::shared_ptr<Schema>> DatasetFactory::Inspect() {
ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas());
Result<std::shared_ptr<Schema>> DatasetFactory::Inspect(InspectOptions options) {
ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas(std::move(options)));

if (schemas.empty()) {
schemas.push_back(arrow::schema({}));
return arrow::schema({});
}

return UnifySchemas(schemas);
}

Result<std::shared_ptr<Dataset>> DatasetFactory::Finish() {
FinishOptions options;
return Finish(options);
}

Result<std::shared_ptr<Dataset>> DatasetFactory::Finish(std::shared_ptr<Schema> schema) {
FinishOptions options;
options.schema = schema;
return Finish(std::move(options));
}

UnionDatasetFactory::UnionDatasetFactory(
std::vector<std::shared_ptr<DatasetFactory>> factories)
: factories_(std::move(factories)) {}
Expand All @@ -62,42 +73,33 @@ Result<std::shared_ptr<DatasetFactory>> UnionDatasetFactory::Make(
new UnionDatasetFactory(std::move(factories))};
}

Result<std::vector<std::shared_ptr<Schema>>> UnionDatasetFactory::InspectSchemas() {
Result<std::vector<std::shared_ptr<Schema>>> UnionDatasetFactory::InspectSchemas(
InspectOptions options) {
std::vector<std::shared_ptr<Schema>> schemas;

for (const auto& child_factory : factories_) {
ARROW_ASSIGN_OR_RAISE(auto schema, child_factory->Inspect());
schemas.emplace_back(schema);
ARROW_ASSIGN_OR_RAISE(auto child_schemas, child_factory->InspectSchemas(options));
ARROW_ASSIGN_OR_RAISE(auto child_schema, UnifySchemas(child_schemas));
schemas.emplace_back(child_schema);
}

return schemas;
}

Result<std::shared_ptr<Schema>> UnionDatasetFactory::Inspect() {
ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas());
Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(FinishOptions options) {
std::vector<std::shared_ptr<Dataset>> children;

if (schemas.empty()) {
return arrow::schema({});
if (options.schema == nullptr) {
// Set the schema in the option directly for use in `child_factory->Finish()`
ARROW_ASSIGN_OR_RAISE(options.schema, Inspect(options.inspect_options));
}

return UnifySchemas(schemas);
}

Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(
const std::shared_ptr<Schema>& schema) {
std::vector<std::shared_ptr<Dataset>> children;

for (const auto& child_factory : factories_) {
ARROW_ASSIGN_OR_RAISE(auto child, child_factory->Finish(schema));
ARROW_ASSIGN_OR_RAISE(auto child, child_factory->Finish(options));
children.emplace_back(child);
}

return std::shared_ptr<Dataset>(new UnionDataset(schema, std::move(children)));
}

Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish() {
ARROW_ASSIGN_OR_RAISE(auto schema, Inspect());
return Finish(schema);
return std::shared_ptr<Dataset>(new UnionDataset(options.schema, std::move(children)));
}

FileSystemDatasetFactory::FileSystemDatasetFactory(
Expand Down Expand Up @@ -220,11 +222,15 @@ Result<std::shared_ptr<Schema>> FileSystemDatasetFactory::PartitionSchema() {
return options_.partitioning.factory()->Inspect(paths);
}

Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSchemas() {
Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSchemas(
InspectOptions options) {
std::vector<std::shared_ptr<Schema>> schemas;

const bool has_fragments_limit = options.fragments >= 0;
int fragments = options.fragments;
for (const auto& f : forest_.infos()) {
if (!f.IsFile()) continue;
if (has_fragments_limit && fragments-- == 0) break;
FileSource src(f.path(), fs_.get());
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src));
schemas.push_back(schema);
Expand All @@ -236,17 +242,20 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSc
return schemas;
}

Result<std::shared_ptr<Dataset>> DatasetFactory::Finish() {
ARROW_ASSIGN_OR_RAISE(auto schema, Inspect());
return Finish(schema);
}
Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions options) {
std::shared_ptr<Schema> schema = options.schema;
bool schema_missing = schema == nullptr;
if (schema_missing) {
ARROW_ASSIGN_OR_RAISE(schema, Inspect(options.inspect_options));
}

Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(
const std::shared_ptr<Schema>& schema) {
// This validation can be costly, but better safe than sorry.
ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas());
for (const auto& s : schemas) {
RETURN_NOT_OK(SchemaBuilder::AreCompatible({schema, s}));
if (options.validate_fragments && !schema_missing) {
// If the schema was not explicitly provided we don't need to validate
// since Inspect has already succeeded in producing a valid unified schema.
ARROW_ASSIGN_OR_RAISE(auto schemas, InspectSchemas(options.inspect_options));
for (const auto& s : schemas) {
RETURN_NOT_OK(SchemaBuilder::AreCompatible({schema, s}));
}
}

ExpressionVector partitions(forest_.size(), scalar(true));
Expand All @@ -273,7 +282,6 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(
};

RETURN_NOT_OK(forest_.Visit(apply_partitioning));

return FileSystemDataset::Make(schema, root_partition_, format_, fs_, forest_,
std::move(partitions));
}
Expand Down
77 changes: 55 additions & 22 deletions cpp/src/arrow/dataset/discovery.h
Expand Up @@ -37,22 +37,55 @@
namespace arrow {
namespace dataset {

/// \brief SourceFactory provides a way to inspect/discover a Source's expected
/// schema before materializing said Source.
struct InspectOptions {
/// See `fragments` property.
static constexpr int kInspectAllFragments = -1;

/// Indicate how many fragments should be inspected to infer the unified dataset
/// schema. Limiting the number of fragments accessed improves the latency of
/// the discovery process when dealing with a high number of fragments and/or
/// high latency file systems.
///
/// The default value of `1` inspects the schema of the first (in no particular
/// order) fragment only. If the dataset has a uniform schema for all fragments,
/// this default is the optimal value. In order to inspect all fragments and
/// robustly unify their potentially varying schemas, set this option to
/// `kInspectAllFragments`. A value of `0` disables inspection of fragments
/// altogether so only the partitioning schema will be inspected.
int fragments = 1;
};

struct FinishOptions {
/// Finalize the dataset with this given schema. If the schema is not
/// provided, infer the schema via the Inspect, see the `inspect_options`
/// property.
std::shared_ptr<Schema> schema = NULLPTR;

/// If the schema is not provided, it will be discovered by passing the
/// following options to `DatasetDiscovery::Inspect`.
InspectOptions inspect_options{};

/// Indicate if the given Schema (when specified), should be validated against
/// the fragments' schemas. `inspect_options` will control how many fragments
/// are checked.
bool validate_fragments = false;
};

/// \brief DatasetFactory provides a way to inspect/discover a Dataset's expected
/// schema before materializing said Dataset.
class ARROW_DS_EXPORT DatasetFactory {
public:
/// \brief Get the schemas of the Fragments and Partitioning.
virtual Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas() = 0;
virtual Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) = 0;

/// \brief Get unified schema for the resulting Dataset.
virtual Result<std::shared_ptr<Schema>> Inspect();

/// \brief Create a Dataset with the given schema.
virtual Result<std::shared_ptr<Dataset>> Finish(
const std::shared_ptr<Schema>& schema) = 0;
Result<std::shared_ptr<Schema>> Inspect(InspectOptions options = {});

/// \brief Create a Dataset using the inspected schema.
virtual Result<std::shared_ptr<Dataset>> Finish();
/// \brief Create a Dataset
Result<std::shared_ptr<Dataset>> Finish();
Result<std::shared_ptr<Dataset>> Finish(std::shared_ptr<Schema> schema);
virtual Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) = 0;

/// \brief Optional root partition for the resulting Dataset.
const std::shared_ptr<Expression>& root_partition() const { return root_partition_; }
Expand Down Expand Up @@ -82,16 +115,15 @@ class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory {
}

/// \brief Get the schemas of the Datasets.
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas() override;

/// \brief Get unified schema for the resulting Dataset.
Result<std::shared_ptr<Schema>> Inspect() override;

/// \brief Create a Dataset with the given schema.
Result<std::shared_ptr<Dataset>> Finish(const std::shared_ptr<Schema>& schema) override;
///
/// Instead of applying options globally, it applies at each child factory.
/// This will not respect `options.fragments` exactly, but will respect the
/// spirit of peeking the first fragments or all of them.
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;

/// \brief Create a Dataset using the inspected schema.
Result<std::shared_ptr<Dataset>> Finish() override;
/// \brief Create a Dataset.
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
explicit UnionDatasetFactory(std::vector<std::shared_ptr<DatasetFactory>> factories);
Expand Down Expand Up @@ -131,7 +163,7 @@ struct FileSystemFactoryOptions {
// in a serial and single threaded fashion. Disabling this feature will skip the
// IO, but unsupported files may be present in the Dataset
// (resulting in an error at scan time).
bool exclude_invalid_files = true;
bool exclude_invalid_files = false;

// Files matching one of the following prefix will be ignored by the
// discovery process. This is matched to the basename of a path.
Expand Down Expand Up @@ -180,9 +212,10 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);

Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas() override;
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;

Result<std::shared_ptr<Dataset>> Finish(const std::shared_ptr<Schema>& schema) override;
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
FileSystemDatasetFactory(std::shared_ptr<fs::FileSystem> filesystem,
Expand Down

0 comments on commit 17b9980

Please sign in to comment.