Skip to content

Commit

Permalink
ARROW-7963: [C++][Dataset][Python] Expose Dataset Fragments to Python
Browse files Browse the repository at this point in the history
In C++, adds `Scanner::GetFragments`, which is currently only used from Python.

In Python, adds class `Fragment` and its subclass `FileFragment`. The former has only a `partition_expression` property; the latter adds a `path` property.

Also in C++: deletes unused FileFragment subclasses.

Closes #6570 from bkietz/7963-Expose-listing-fragments and squashes the following commits:

e0fb05c <Benjamin Kietzman> add root_partition to expected fragment partitions
f915d7b <Benjamin Kietzman> add test for fragment partition expressions
275fbb3 <Benjamin Kietzman> fix merge error: spurious assertion
9eae9f3 <Benjamin Kietzman> GetFragments -> get_fragments
bdb1b9e <Benjamin Kietzman> Update python/pyarrow/_dataset.pyx
b8c2ba6 <Benjamin Kietzman> Update python/pyarrow/_dataset.pyx
68517d8 <Benjamin Kietzman> ARROW-7963:  Expose Dataset Fragments to Python

Authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Signed-off-by: Neal Richardson <neal.p.richardson@gmail.com>
  • Loading branch information
bkietz authored and nealrichardson committed Mar 10, 2020
1 parent 5ca8217 commit 018dd80
Show file tree
Hide file tree
Showing 19 changed files with 343 additions and 194 deletions.
7 changes: 7 additions & 0 deletions cpp/src/arrow/dataset/dataset.cc
Expand Up @@ -42,6 +42,13 @@ InMemoryFragment::InMemoryFragment(
std::shared_ptr<ScanOptions> scan_options)
: Fragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {}

// Construct an in-memory fragment that additionally carries a partition
// expression. `scan_options` and `partition_expression` are forwarded to the
// Fragment base class; `record_batches` is moved into record_batches_.
InMemoryFragment::InMemoryFragment(
std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: Fragment(std::move(scan_options), std::move(partition_expression)),
record_batches_(std::move(record_batches)) {}

Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanContext> context) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/dataset/dataset.h
Expand Up @@ -51,6 +51,8 @@ class ARROW_DS_EXPORT Fragment {
/// \brief Return true if the fragment can benefit from parallel scanning.
virtual bool splittable() const = 0;

/// \brief The name identifying the kind of fragment
virtual std::string type_name() const = 0;

/// \brief Filtering, schema reconciliation, and partition options to use when
/// scanning this fragment.
const std::shared_ptr<ScanOptions>& scan_options() const { return scan_options_; }
Expand Down Expand Up @@ -84,10 +86,16 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
InMemoryFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options);

/// \brief Construct an in-memory fragment from record batches, scan options
/// and an explicit partition expression.
InMemoryFragment(std::vector<std::shared_ptr<RecordBatch>> record_batches,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression);

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

bool splittable() const override { return false; }

/// \brief The name identifying this kind of fragment: always "in-memory".
std::string type_name() const override { return "in-memory"; }

protected:
std::vector<std::shared_ptr<RecordBatch>> record_batches_;
};
Expand All @@ -100,8 +108,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Result<std::shared_ptr<ScannerBuilder>> NewScan(std::shared_ptr<ScanContext> context);
Result<std::shared_ptr<ScannerBuilder>> NewScan();

/// \brief GetFragments returns an iterator of Fragments. The ScanOptions
/// controls filtering and schema inference.
/// \brief GetFragments returns an iterator of Fragments given ScanOptions.
FragmentIterator GetFragments(std::shared_ptr<ScanOptions> options);

const std::shared_ptr<Schema>& schema() const { return schema_; }
Expand Down
24 changes: 12 additions & 12 deletions cpp/src/arrow/dataset/discovery_test.cc
Expand Up @@ -145,8 +145,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
ASSERT_OK_AND_ASSIGN(schema, factory_->Inspect());
}
options_ = ScanOptions::Make(schema);
ASSERT_OK_AND_ASSIGN(source_, factory_->Finish(schema));
AssertFragmentsAreFromPath(source_->GetFragments(options_), paths);
ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
AssertFragmentsAreFromPath(dataset_->GetFragments(options_), paths);
}

protected:
Expand Down Expand Up @@ -279,12 +279,12 @@ TEST(UnionDatasetFactoryTest, Basic) {
auto schema_2 = schema({f64, i32});
auto schema_3 = schema({str, i32});

auto source_1 = DatasetFactoryFromSchemas({schema_1, schema_2});
auto source_2 = DatasetFactoryFromSchemas({schema_2});
auto source_3 = DatasetFactoryFromSchemas({schema_3});
auto dataset_1 = DatasetFactoryFromSchemas({schema_1, schema_2});
auto dataset_2 = DatasetFactoryFromSchemas({schema_2});
auto dataset_3 = DatasetFactoryFromSchemas({schema_3});

ASSERT_OK_AND_ASSIGN(auto factory,
UnionDatasetFactory::Make({source_1, source_2, source_3}));
UnionDatasetFactory::Make({dataset_1, dataset_2, dataset_3}));

ASSERT_OK_AND_ASSIGN(auto schemas, factory->InspectSchemas());
AssertSchemasAre(schemas, {schema_2, schema_2, schema_3});
Expand Down Expand Up @@ -312,13 +312,13 @@ TEST(UnionDatasetFactoryTest, ConflictingSchemas) {
// Incompatible with schema_1
auto schema_3 = schema({bad_f64, i32});

auto source_factory_1 = DatasetFactoryFromSchemas({schema_1, schema_2});
auto source_factory_2 = DatasetFactoryFromSchemas({schema_2});
auto source_factory_3 = DatasetFactoryFromSchemas({schema_3});
auto dataset_factory_1 = DatasetFactoryFromSchemas({schema_1, schema_2});
auto dataset_factory_2 = DatasetFactoryFromSchemas({schema_2});
auto dataset_factory_3 = DatasetFactoryFromSchemas({schema_3});

ASSERT_OK_AND_ASSIGN(
auto factory,
UnionDatasetFactory::Make({source_factory_1, source_factory_2, source_factory_3}));
ASSERT_OK_AND_ASSIGN(auto factory,
UnionDatasetFactory::Make(
{dataset_factory_1, dataset_factory_2, dataset_factory_3}));

// schema_3 conflicts with other, Inspect/Finish should not work
ASSERT_RAISES(Invalid, factory->Inspect());
Expand Down
28 changes: 27 additions & 1 deletion cpp/src/arrow/dataset/file_base.cc
Expand Up @@ -39,6 +39,18 @@ Result<std::shared_ptr<arrow::io::RandomAccessFile>> FileSource::Open() const {
return std::make_shared<::arrow::io::BufferReader>(buffer());
}

// Convenience overload: open a fragment without an explicit partition
// expression. The three-argument overload is invoked with scalar(true) as the
// partition expression.
Result<std::shared_ptr<Fragment>> FileFormat::MakeFragment(
    FileSource source, std::shared_ptr<ScanOptions> options) {
  auto default_partition = scalar(true);
  return MakeFragment(std::move(source), std::move(options),
                      std::move(default_partition));
}

/// \brief Open a fragment for `source` carrying `partition_expression`.
///
/// The returned FileFragment keeps a shared reference to this format obtained
/// through shared_from_this(), so this FileFormat must itself be owned by a
/// std::shared_ptr when MakeFragment is called.
///
/// \param source the file the fragment reads from
/// \param options scan options handed to the fragment
/// \param partition_expression expression stored on the fragment
/// \return the newly created FileFragment, as a Fragment
Result<std::shared_ptr<Fragment>> FileFormat::MakeFragment(
    FileSource source, std::shared_ptr<ScanOptions> options,
    std::shared_ptr<Expression> partition_expression) {
  // `options` is a by-value sink parameter like its siblings: move it rather
  // than copy it, which avoids a needless atomic refcount increment/decrement.
  return std::make_shared<FileFragment>(std::move(source), shared_from_this(),
                                        std::move(options),
                                        std::move(partition_expression));
}

// Scan this fragment by delegating to the file format: the format's ScanFile
// is given this fragment's source, its scan options and the supplied context.
Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanContext> context) {
  auto scan_tasks = format_->ScanFile(source_, scan_options_, std::move(context));
  return scan_tasks;
}
Expand Down Expand Up @@ -143,20 +155,32 @@ util::optional<std::pair<std::string, std::shared_ptr<Scalar>>> GetKey(
internal::checked_cast<const ScalarExpression&>(*cmp.right_operand()).value());
}

// Conjunction of two expressions that skips trivial operands: when either
// operand compares equal to `true`, the other operand is returned unchanged;
// otherwise the result is and_(l, r). `l` is tested first, so `r` is not
// inspected when `l` is already `true`.
std::shared_ptr<Expression> FoldingAnd(const std::shared_ptr<Expression>& l,
                                       const std::shared_ptr<Expression>& r) {
  const bool left_is_true = l->Equals(true);
  if (!left_is_true) {
    if (r->Equals(true)) {
      return l;
    }
    return and_(l, r);
  }
  return r;
}

FragmentIterator FileSystemDataset::GetFragmentsImpl(
std::shared_ptr<ScanOptions> root_options) {
FragmentVector fragments;
std::vector<std::shared_ptr<ScanOptions>> options(forest_.size());

ExpressionVector fragment_partitions(forest_.size());

auto collect_fragments = [&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune {
auto partition = partitions_[ref.i];

// if available, copy parent's filter and projector
// (which are appropriately simplified and loaded with default values)
if (auto parent = ref.parent()) {
options[ref.i].reset(new ScanOptions(*options[parent.i]));
fragment_partitions[ref.i] =
FoldingAnd(fragment_partitions[parent.i], partitions_[ref.i]);
} else {
options[ref.i].reset(new ScanOptions(*root_options));
fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partitions_[ref.i]);
}

// simplify filter by partition information
Expand All @@ -180,7 +204,9 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
if (ref.info().IsFile()) {
// generate a fragment for this file
FileSource src(ref.info().path(), filesystem_.get());
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment(src, options[ref.i]));
ARROW_ASSIGN_OR_RAISE(
auto fragment,
format_->MakeFragment(src, options[ref.i], fragment_partitions[ref.i]));
fragments.push_back(std::move(fragment));
}

Expand Down
24 changes: 21 additions & 3 deletions cpp/src/arrow/dataset/file_base.h
Expand Up @@ -110,13 +110,16 @@ class ARROW_DS_EXPORT FileSource {
};

/// \brief Base class for file format implementation
class ARROW_DS_EXPORT FileFormat {
class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> {
public:
virtual ~FileFormat() = default;

/// \brief The name identifying the kind of file format
virtual std::string type_name() const = 0;

/// \brief Return true if fragments of this format can benefit from parallel scanning.
virtual bool splittable() const { return false; }

/// \brief Indicate if the FileSource is supported/readable by this format.
virtual Result<bool> IsSupported(const FileSource& source) const = 0;

Expand All @@ -129,8 +132,12 @@ class ARROW_DS_EXPORT FileFormat {
std::shared_ptr<ScanContext> context) const = 0;

/// \brief Open a fragment
virtual Result<std::shared_ptr<Fragment>> MakeFragment(
FileSource location, std::shared_ptr<ScanOptions> options) = 0;
Result<std::shared_ptr<Fragment>> MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options,
std::shared_ptr<Expression> partition_expression);

/// \brief Open a fragment without an explicit partition expression;
/// scalar(true) is used as the partition expression.
Result<std::shared_ptr<Fragment>> MakeFragment(FileSource source,
std::shared_ptr<ScanOptions> options);
};

/// \brief A Fragment that is stored in a file with a known format
Expand All @@ -142,11 +149,22 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
source_(std::move(source)),
format_(std::move(format)) {}

/// \brief Construct a file fragment that also carries a partition expression.
///
/// `scan_options` and `partition_expression` are forwarded to the Fragment
/// base class; `source` and `format` are stored on this fragment.
FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
std::shared_ptr<ScanOptions> scan_options,
std::shared_ptr<Expression> partition_expression)
: Fragment(std::move(scan_options), std::move(partition_expression)),
source_(std::move(source)),
format_(std::move(format)) {}

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;

const FileSource& source() const { return source_; }
const std::shared_ptr<FileFormat>& format() const { return format_; }

/// \brief The name identifying this kind of fragment: always "file",
/// regardless of the underlying file format.
// XXX should this include format_->type_name?
std::string type_name() const override { return "file"; }
/// \brief Return true if the fragment can benefit from parallel scanning.
/// The answer is delegated to the fragment's file format.
bool splittable() const override { return format_->splittable(); }

protected:
FileSource source_;
std::shared_ptr<FileFormat> format_;
Expand Down
5 changes: 0 additions & 5 deletions cpp/src/arrow/dataset/file_ipc.cc
Expand Up @@ -148,10 +148,5 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
return IpcScanTaskIterator::Make(options, context, source);
}

Result<std::shared_ptr<Fragment>> IpcFileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options) {
return std::make_shared<IpcFragment>(std::move(source), std::move(options));
}

} // namespace dataset
} // namespace arrow
13 changes: 2 additions & 11 deletions cpp/src/arrow/dataset/file_ipc.h
Expand Up @@ -33,6 +33,8 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
public:
std::string type_name() const override { return "ipc"; }

bool splittable() const override { return true; }

Result<bool> IsSupported(const FileSource& source) const override;

/// \brief Return the schema of the file if possible.
Expand All @@ -42,17 +44,6 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
Result<ScanTaskIterator> ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) const override;

Result<std::shared_ptr<Fragment>> MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options) override;
};

class ARROW_DS_EXPORT IpcFragment : public FileFragment {
public:
IpcFragment(FileSource source, std::shared_ptr<ScanOptions> options)
: FileFragment(std::move(source), std::make_shared<IpcFileFormat>(), options) {}

bool splittable() const override { return true; }
};

} // namespace dataset
Expand Down
23 changes: 10 additions & 13 deletions cpp/src/arrow/dataset/file_ipc_test.cc
Expand Up @@ -97,6 +97,7 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin {
}

protected:
std::shared_ptr<FileFormat> format_ = std::make_shared<IpcFileFormat>();
std::shared_ptr<ScanOptions> opts_;
std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
std::shared_ptr<Schema> schema_ = schema({field("f64", float64())});
Expand All @@ -107,7 +108,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
auto source = GetFileSource(reader.get());

opts_ = ScanOptions::Make(reader->schema());
auto fragment = std::make_shared<IpcFragment>(*source, opts_);
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));

int64_t row_count = 0;

Expand All @@ -120,17 +121,15 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
}

TEST_F(TestIpcFileFormat, OpenFailureWithRelevantError) {
auto format = IpcFileFormat();

std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
auto result = format.Inspect(FileSource(buf));
auto result = format_->Inspect(FileSource(buf));
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("<Buffer>"),
result.status());

constexpr auto file_name = "herp/derp";
ASSERT_OK_AND_ASSIGN(
auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
result = format.Inspect({file_name, fs.get()});
result = format_->Inspect({file_name, fs.get()});
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr(file_name),
result.status());
}
Expand All @@ -149,7 +148,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) {

auto reader = GetRecordBatchReader();
auto source = GetFileSource(reader.get());
auto fragment = std::make_shared<IpcFragment>(*source, opts_);
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));

int64_t row_count = 0;

Expand Down Expand Up @@ -182,7 +181,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()};
for (auto reader : readers) {
auto source = GetFileSource(reader);
auto fragment = std::make_shared<IpcFragment>(*source, opts_);
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));

// NB: projector is applied by the scanner; Fragment does not evaluate it.
// We will not drop "i32" even though it is not in the projector's schema.
Expand Down Expand Up @@ -214,28 +213,26 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
TEST_F(TestIpcFileFormat, Inspect) {
auto reader = GetRecordBatchReader();
auto source = GetFileSource(reader.get());
auto format = IpcFileFormat();

ASSERT_OK_AND_ASSIGN(auto actual, format.Inspect(*source.get()));
ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
EXPECT_EQ(*actual, *schema_);
}

TEST_F(TestIpcFileFormat, IsSupported) {
auto reader = GetRecordBatchReader();
auto source = GetFileSource(reader.get());
auto format = IpcFileFormat();

bool supported = false;

std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf)));
ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf)));
ASSERT_EQ(supported, false);

buf = std::make_shared<Buffer>(util::string_view("corrupted"));
ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(FileSource(buf)));
ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf)));
ASSERT_EQ(supported, false);

ASSERT_OK_AND_ASSIGN(supported, format.IsSupported(*source));
ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source));
EXPECT_EQ(supported, true);
}

Expand Down
6 changes: 0 additions & 6 deletions cpp/src/arrow/dataset/file_parquet.cc
Expand Up @@ -383,11 +383,5 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
std::move(arrow_properties));
}

Result<std::shared_ptr<Fragment>> ParquetFileFormat::MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options) {
return std::make_shared<ParquetFragment>(std::move(source), shared_from_this(),
std::move(options));
}

} // namespace dataset
} // namespace arrow
18 changes: 3 additions & 15 deletions cpp/src/arrow/dataset/file_parquet.h
Expand Up @@ -39,9 +39,7 @@ namespace arrow {
namespace dataset {

/// \brief A FileFormat implementation that reads from Parquet files
class ARROW_DS_EXPORT ParquetFileFormat
: public FileFormat,
public std::enable_shared_from_this<ParquetFileFormat> {
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
public:
ParquetFileFormat() = default;

Expand Down Expand Up @@ -77,6 +75,8 @@ class ARROW_DS_EXPORT ParquetFileFormat

std::string type_name() const override { return "parquet"; }

bool splittable() const override { return true; }

Result<bool> IsSupported(const FileSource& source) const override;

/// \brief Return the schema of the file if possible.
Expand All @@ -86,18 +86,6 @@ class ARROW_DS_EXPORT ParquetFileFormat
Result<ScanTaskIterator> ScanFile(const FileSource& source,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context) const override;

Result<std::shared_ptr<Fragment>> MakeFragment(
FileSource source, std::shared_ptr<ScanOptions> options) override;
};

class ARROW_DS_EXPORT ParquetFragment : public FileFragment {
public:
ParquetFragment(FileSource source, std::shared_ptr<ParquetFileFormat> format,
std::shared_ptr<ScanOptions> options)
: FileFragment(std::move(source), std::move(format), std::move(options)) {}

bool splittable() const override { return true; }
};

} // namespace dataset
Expand Down

0 comments on commit 018dd80

Please sign in to comment.