Skip to content

Commit

Permalink
ARROW-12355: Provided ScanBatchesAsync override that uses async CSV
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Apr 19, 2021
1 parent 32acd37 commit 4f0b2e0
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
// formats should provide their own efficient implementation.
Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) {
const std::shared_ptr<FileFragment>& file) const {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
struct State {
State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma

virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file);
const std::shared_ptr<FileFragment>& file) const;

/// \brief Open a fragment
virtual Result<std::shared_ptr<FileFragment>> MakeFragment(
Expand Down
36 changes: 22 additions & 14 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ Result<std::unordered_set<std::string>> GetColumnNames(

static inline Result<csv::ConvertOptions> GetConvertOptions(
const CsvFileFormat& format, const std::shared_ptr<ScanOptions>& scan_options,
const util::string_view first_block, MemoryPool* pool) {
ARROW_ASSIGN_OR_RAISE(auto column_names,
GetColumnNames(format.parse_options, first_block, pool));
const util::string_view first_block) {
ARROW_ASSIGN_OR_RAISE(
auto column_names,
GetColumnNames(format.parse_options, first_block, scan_options->pool));

ARROW_ASSIGN_OR_RAISE(
auto csv_scan_options,
Expand Down Expand Up @@ -116,8 +117,7 @@ static inline Result<csv::ReadOptions> GetReadOptions(

static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options, internal::Executor* cpu_executor,
MemoryPool* pool) {
const std::shared_ptr<ScanOptions>& scan_options, internal::Executor* cpu_executor) {
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));

ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
Expand All @@ -133,8 +133,8 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
const auto& parse_options = format.parse_options;
auto convert_options = csv::ConvertOptions::Defaults();
if (scan_options != nullptr) {
ARROW_ASSIGN_OR_RAISE(convert_options, GetConvertOptions(format, scan_options,
first_block, pool));
ARROW_ASSIGN_OR_RAISE(convert_options,
GetConvertOptions(format, scan_options, first_block));
}

auto reader_fut = csv::StreamingReader::MakeAsync(
Expand All @@ -156,7 +156,7 @@ static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
auto open_reader_fut =
OpenReaderAsync(source, format, scan_options, internal::GetCpuThreadPool(), pool);
OpenReaderAsync(source, format, scan_options, internal::GetCpuThreadPool());
return open_reader_fut.result();
}

Expand All @@ -180,24 +180,22 @@ class CsvScanTask : public ScanTask {
source_(fragment->source()) {}

Result<RecordBatchIterator> Execute() override {
auto reader_fut = OpenReaderAsync(source_, *format_, options(),
internal::GetCpuThreadPool(), options()->pool);
auto reader_fut =
OpenReaderAsync(source_, *format_, options(), internal::GetCpuThreadPool());
auto reader_gen = GeneratorFromReader(std::move(reader_fut));
return MakeGeneratorIterator(std::move(reader_gen));
}

Future<RecordBatchVector> SafeExecute(internal::Executor* executor) override {
auto reader_fut =
OpenReaderAsync(source_, *format_, options(), executor, options()->pool);
auto reader_fut = OpenReaderAsync(source_, *format_, options(), executor);
auto reader_gen = GeneratorFromReader(std::move(reader_fut));
return CollectAsyncGenerator(reader_gen);
}

Future<> SafeVisit(
internal::Executor* executor,
std::function<Status(std::shared_ptr<RecordBatch>)> visitor) override {
auto reader_fut =
OpenReaderAsync(source_, *format_, options(), executor, options()->pool);
auto reader_fut = OpenReaderAsync(source_, *format_, options(), executor);
auto reader_gen = GeneratorFromReader(std::move(reader_fut));
return VisitAsyncGenerator(reader_gen, visitor);
}
Expand Down Expand Up @@ -242,5 +240,15 @@ Result<ScanTaskIterator> CsvFileFormat::ScanFile(
return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
}

Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) const {
auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
auto source = file->source();
auto reader_fut =
OpenReaderAsync(source, *this, scan_options, internal::GetCpuThreadPool());
return GeneratorFromReader(std::move(reader_fut));
}

} // namespace dataset
} // namespace arrow
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const override;

Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) const override;

Result<std::shared_ptr<FileWriter>> MakeWriter(
std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
std::shared_ptr<FileWriteOptions> options) const override {
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,8 +516,9 @@ Result<EnumeratedRecordBatchGenerator> AsyncScanner::ScanBatchesUnorderedAsync(
FragmentsToBatches(self, std::move(fragment_gen)));
auto batch_gen_gen_readahead = MakeSerialReadaheadGenerator(
std::move(batch_gen_gen), scan_options_->fragment_readahead);
return MakeMergedGenerator(std::move(batch_gen_gen_readahead),
scan_options_->fragment_readahead);
auto batch_gen = MakeMergedGenerator(std::move(batch_gen_gen_readahead),
scan_options_->fragment_readahead);
return MakeReadaheadGenerator(batch_gen, scan_options_->fragment_readahead);
}

Result<TaggedRecordBatchGenerator> AsyncScanner::ScanBatchesAsync() {
Expand Down

0 comments on commit 4f0b2e0

Please sign in to comment.