
Commit

refactoring
bkietz committed Feb 4, 2020
1 parent de58eb7 commit 6d021d9
Showing 9 changed files with 68 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.h
@@ -130,7 +130,7 @@ class ARROW_DS_EXPORT FileFormat {
 
   /// \brief Open a fragment
   virtual Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& location, std::shared_ptr<ScanOptions> options) = 0;
+      FileSource location, std::shared_ptr<ScanOptions> options) = 0;
 };
 
 /// \brief A Fragment that is stored in a file with a known format
4 changes: 2 additions & 2 deletions cpp/src/arrow/dataset/file_ipc.cc
@@ -130,8 +130,8 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(
 }
 
 Result<std::shared_ptr<Fragment>> IpcFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
-  return std::make_shared<IpcFragment>(source, options);
+    FileSource source, std::shared_ptr<ScanOptions> options) {
+  return std::make_shared<IpcFragment>(std::move(source), std::move(options));
 }
 
 }  // namespace dataset
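The MakeFragment changes in this commit swap `const FileSource&` for a by-value parameter that is then `std::move`d into the fragment, so an rvalue argument is transferred rather than copied, while lvalue callers pay a single copy. A minimal, self-contained sketch of the pass-by-value-then-move idiom follows; the `Sink` class is hypothetical and not part of this commit:

```cpp
#include <string>
#include <utility>

// Hypothetical class (not from this commit) showing the pass-by-value-then-move
// idiom adopted by the MakeFragment signatures in this change.
class Sink {
 public:
  // Taking `name` by value accepts an lvalue (one copy) or an rvalue (a move);
  // the member is then move-constructed, so no second copy is made.
  explicit Sink(std::string name) : name_(std::move(name)) {}

  const std::string& name() const { return name_; }

 private:
  std::string name_;
};

int main() {
  std::string keep = "copied once";
  Sink a(keep);             // lvalue: copied into the parameter, `keep` stays valid
  Sink b("temporary");      // rvalue: the temporary string is moved, no copy
  Sink c(std::move(keep));  // caller explicitly hands `keep` over
  return a.name().empty() || b.name().empty() || c.name().empty();  // 0 on success
}
```

Passing by value costs nothing extra when the callee stores the argument, which is what the Fragment constructors do here with `FileSource` and `ScanOptions`.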
6 changes: 3 additions & 3 deletions cpp/src/arrow/dataset/file_ipc.h
@@ -43,13 +43,13 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
       std::shared_ptr<ScanContext> context) const override;
 
   Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& source, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
 };
 
 class ARROW_DS_EXPORT IpcFragment : public FileFragment {
  public:
-  IpcFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
-      : FileFragment(source, std::make_shared<IpcFileFormat>(), options) {}
+  IpcFragment(FileSource source, std::shared_ptr<ScanOptions> options)
+      : FileFragment(std::move(source), std::make_shared<IpcFileFormat>(), options) {}
 
   bool splittable() const override { return true; }
 };
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -373,8 +373,9 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
 }
 
 Result<std::shared_ptr<Fragment>> ParquetFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
-  return std::make_shared<ParquetFragment>(source, options);
+    FileSource source, std::shared_ptr<ScanOptions> options) {
+  auto format = weak_this_.lock();
+  return std::make_shared<ParquetFragment>(std::move(source), format, std::move(options));
 }
 
 }  // namespace dataset
19 changes: 16 additions & 3 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -19,6 +19,7 @@
 
 #include <memory>
 #include <string>
+#include <unordered_set>
 
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/type_fwd.h"
@@ -37,6 +38,12 @@ namespace dataset {
 /// \brief A FileFormat implementation that reads from Parquet files
 class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
  public:
+  static std::shared_ptr<ParquetFileFormat> Make() {
+    std::shared_ptr<ParquetFileFormat> out{new ParquetFileFormat};
+    out->weak_this_ = out;
+    return out;
+  }
+
   /// \defgroup parquet-file-format-reader-properties properties which correspond to
   /// members of parquet::ReaderProperties.
   ///
@@ -67,13 +74,19 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
       std::shared_ptr<ScanContext> context) const override;
 
   Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& source, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
+
+ private:
+  ParquetFileFormat() = default;
+
+  std::weak_ptr<ParquetFileFormat> weak_this_;
 };
 
 class ARROW_DS_EXPORT ParquetFragment : public FileFragment {
  public:
-  ParquetFragment(const FileSource& source, std::shared_ptr<ScanOptions> options)
-      : FileFragment(source, std::make_shared<ParquetFileFormat>(), options) {}
+  ParquetFragment(FileSource source, std::shared_ptr<ParquetFileFormat> format,
+                  std::shared_ptr<ScanOptions> options)
+      : FileFragment(std::move(source), std::move(format), std::move(options)) {}
 
   bool splittable() const override { return true; }
 };
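ParquetFileFormat now hides its constructor behind a static `Make()` factory that seeds `weak_this_`, so `MakeFragment` can recover an owning `shared_ptr` to the format via `weak_this_.lock()` and hand it to each `ParquetFragment`. This is a hand-rolled cousin of `std::enable_shared_from_this`. A minimal sketch of the pattern, using hypothetical `Widget`/`Child` classes that are not part of this commit:

```cpp
#include <cassert>
#include <memory>

class Widget;

// Hypothetical child object that keeps its parent alive via shared ownership.
class Child {
 public:
  explicit Child(std::shared_ptr<Widget> parent) : parent_(std::move(parent)) {}
  const std::shared_ptr<Widget>& parent() const { return parent_; }

 private:
  std::shared_ptr<Widget> parent_;
};

// Hypothetical parent sketching the weak-self factory used by ParquetFileFormat:
// the factory is the only way to construct a Widget, so weak_this_ is always set.
class Widget {
 public:
  static std::shared_ptr<Widget> Make() {
    std::shared_ptr<Widget> out{new Widget};  // private ctor, so no make_shared here
    out->weak_this_ = out;                    // non-owning handle to ourselves
    return out;
  }

  // Any member function can now mint an owning reference to *this on demand.
  std::shared_ptr<Child> MakeChild() {
    return std::make_shared<Child>(weak_this_.lock());
  }

 private:
  Widget() = default;
  std::weak_ptr<Widget> weak_this_;
};

int main() {
  auto widget = Widget::Make();
  auto child = widget->MakeChild();
  assert(child->parent() == widget);  // the child shares ownership of its parent
  return 0;
}
```

The precondition is that every instance is created through `Make()`; a stack-allocated or directly new'd instance would leave `weak_this_` empty and `lock()` would return a null pointer.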
54 changes: 27 additions & 27 deletions cpp/src/arrow/dataset/file_parquet_test.cc
@@ -159,7 +159,7 @@ class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin {
 
 class TestParquetFileFormat : public ParquetBufferFixtureMixin {
  protected:
-  ParquetFileFormat format_;
+  std::shared_ptr<ParquetFileFormat> format_ = ParquetFileFormat::Make();
   std::shared_ptr<ScanOptions> opts_;
   std::shared_ptr<ScanContext> ctx_ = std::make_shared<ScanContext>();
 };
@@ -169,7 +169,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) {
   auto source = GetFileSource(reader.get());
 
   opts_ = ScanOptions::Make(reader->schema());
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -192,8 +192,8 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) {
 
   opts_ = ScanOptions::Make(reader->schema());
 
-  format_.read_dict_indices.insert(0);
-  ASSERT_OK_AND_ASSIGN(auto fragment, format_.MakeFragment(*source, opts_));
+  format_->read_dict_indices.insert(0);
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_));
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -215,14 +215,14 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) {
 
 TEST_F(TestParquetFileFormat, OpenFailureWithRelevantError) {
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
-  auto result = format_.Inspect(FileSource(buf));
+  auto result = format_->Inspect(FileSource(buf));
   EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr("<Buffer>"),
                                   result.status());
 
   constexpr auto file_name = "herp/derp";
   ASSERT_OK_AND_ASSIGN(
       auto fs, fs::internal::MockFileSystem::Make(fs::kNoTime, {fs::File(file_name)}));
-  result = format_.Inspect({file_name, fs.get()});
+  result = format_->Inspect({file_name, fs.get()});
   EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, testing::HasSubstr(file_name),
                                   result.status());
 }
@@ -241,7 +241,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) {
 
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -282,7 +282,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
 
   auto source =
       GetFileSource({reader.get(), reader_without_i32.get(), reader_without_f64.get()});
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_));
   int64_t row_count = 0;
@@ -303,16 +303,16 @@ TEST_F(TestParquetFileFormat, Inspect) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
 
-  ASSERT_OK_AND_ASSIGN(auto actual, format_.Inspect(*source.get()));
+  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
   EXPECT_EQ(*actual, *schema_);
 }
 
 TEST_F(TestParquetFileFormat, InspectDictEncoded) {
   auto reader = GetRecordBatchReader();
   auto source = GetFileSource(reader.get());
 
-  format_.read_dict_indices.insert(0);
-  ASSERT_OK_AND_ASSIGN(auto actual, format_.Inspect(*source.get()));
+  format_->read_dict_indices.insert(0);
+  ASSERT_OK_AND_ASSIGN(auto actual, format_->Inspect(*source.get()));
 
   Schema expected_schema({field("f64", dictionary(int32(), float64()))});
   EXPECT_EQ(*actual, expected_schema);
@@ -325,14 +325,14 @@ TEST_F(TestParquetFileFormat, IsSupported) {
   bool supported = false;
 
   std::shared_ptr<Buffer> buf = std::make_shared<Buffer>(util::string_view(""));
-  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(FileSource(buf)));
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf)));
   ASSERT_EQ(supported, false);
 
   buf = std::make_shared<Buffer>(util::string_view("corrupted"));
-  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(FileSource(buf)));
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(FileSource(buf)));
   ASSERT_EQ(supported, false);
 
-  ASSERT_OK_AND_ASSIGN(supported, format_.IsSupported(*source));
+  ASSERT_OK_AND_ASSIGN(supported, format_->IsSupported(*source));
   EXPECT_EQ(supported, true);
 }
 
@@ -357,12 +357,12 @@ void CountRowsInScan(ScanTaskIterator& it, int64_t expected_rows,
 
 class TestParquetFileFormatPushDown : public TestParquetFileFormat {
  public:
-  void CountRowsAndBatchesInScan(Fragment& fragment, int64_t expected_rows,
-                                 int64_t expected_batches) {
+  void CountRowsAndBatchesInScan(const std::shared_ptr<Fragment>& fragment,
+                                 int64_t expected_rows, int64_t expected_batches) {
     int64_t actual_rows = 0;
     int64_t actual_batches = 0;
 
-    ASSERT_OK_AND_ASSIGN(auto it, fragment.Scan(ctx_));
+    ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(ctx_));
     for (auto maybe_scan_task : it) {
       ASSERT_OK_AND_ASSIGN(auto scan_task, std::move(maybe_scan_task));
       ASSERT_OK_AND_ASSIGN(auto rb_it, scan_task->Execute());
@@ -398,35 +398,35 @@ TEST_F(TestParquetFileFormatPushDown, Basic) {
   auto source = GetFileSource(reader.get());
 
   opts_ = ScanOptions::Make(reader->schema());
-  auto fragment = std::make_shared<ParquetFragment>(*source, opts_);
+  auto fragment = std::make_shared<ParquetFragment>(*source, format_, opts_);
 
   opts_->filter = scalar(true);
-  CountRowsAndBatchesInScan(*fragment, kTotalNumRows, kNumRowGroups);
+  CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups);
 
   for (int64_t i = 1; i <= kNumRowGroups; i++) {
     opts_->filter = ("i64"_ == int64_t(i)).Copy();
-    CountRowsAndBatchesInScan(*fragment, i, 1);
+    CountRowsAndBatchesInScan(fragment, i, 1);
   }
 
   /* Out of bound filters should skip all RowGroups. */
   opts_->filter = scalar(false);
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   opts_->filter = ("i64"_ == int64_t(kNumRowGroups + 1)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   opts_->filter = ("i64"_ == int64_t(-1)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
   // No rows match 1 and 2.
   opts_->filter = ("i64"_ == int64_t(1) and "u8"_ == uint8_t(2)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 0, 0);
+  CountRowsAndBatchesInScan(fragment, 0, 0);
 
   opts_->filter = ("i64"_ == int64_t(2) or "i64"_ == int64_t(4)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 2 + 4, 2);
+  CountRowsAndBatchesInScan(fragment, 2 + 4, 2);
 
   opts_->filter = ("i64"_ < int64_t(6)).Copy();
-  CountRowsAndBatchesInScan(*fragment, 5 * (5 + 1) / 2, 5);
+  CountRowsAndBatchesInScan(fragment, 5 * (5 + 1) / 2, 5);
 
   opts_->filter = ("i64"_ >= int64_t(6)).Copy();
-  CountRowsAndBatchesInScan(*fragment, kTotalNumRows - (5 * (5 + 1) / 2),
+  CountRowsAndBatchesInScan(fragment, kTotalNumRows - (5 * (5 + 1) / 2),
                             kNumRowGroups - 5);
 }
 
8 changes: 4 additions & 4 deletions cpp/src/arrow/dataset/test_util.h
@@ -196,7 +196,7 @@ class DummyFileFormat : public FileFormat {
   }
 
   inline Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& location, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
 
  protected:
   std::shared_ptr<Schema> schema_;
@@ -211,7 +211,7 @@ class DummyFragment : public FileFragment {
 };
 
 Result<std::shared_ptr<Fragment>> DummyFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
+    FileSource source, std::shared_ptr<ScanOptions> options) {
   return std::make_shared<DummyFragment>(source, options);
 }
 
@@ -251,7 +251,7 @@ class JSONRecordBatchFileFormat : public FileFormat {
   }
 
   inline Result<std::shared_ptr<Fragment>> MakeFragment(
-      const FileSource& location, std::shared_ptr<ScanOptions> options) override;
+      FileSource source, std::shared_ptr<ScanOptions> options) override;
 
  protected:
   SchemaResolver resolver_;
@@ -268,7 +268,7 @@ class JSONRecordBatchFragment : public FileFragment {
 };
 
 Result<std::shared_ptr<Fragment>> JSONRecordBatchFileFormat::MakeFragment(
-    const FileSource& source, std::shared_ptr<ScanOptions> options) {
+    FileSource source, std::shared_ptr<ScanOptions> options) {
   return std::make_shared<JSONRecordBatchFragment>(source, resolver_(source), options);
 }
 
8 changes: 8 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -76,9 +76,17 @@ cdef class FileFormat:
 
 cdef class ParquetFileFormat(FileFormat):
 
+    cdef:
+        CParquetFileFormat* parquet_format
+
     def __init__(self):
         self.init(shared_ptr[CFileFormat](new CParquetFileFormat()))
 
+    @property
+    def use_buffered_stream(self):
+        """The arrow Schema describing the partition scheme."""
+        return self.wrapped.
+
 
 cdef class Partitioning:
 
5 changes: 4 additions & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
@@ -289,7 +289,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CParquetFileFormat "arrow::dataset::ParquetFileFormat"(
             CFileFormat):
-        pass
+        c_bool use_buffered_stream
+        int64_t buffer_size
+        unordered_set[c_int] read_dict_indices
+        int64_t batch_size
 
     cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"(
             CFileFragment):
