Skip to content

Commit

Permalink
GH-41749: [GLib] Allow getting a RecordBatchReader from a Dataset or …
Browse files Browse the repository at this point in the history
…Scanner (#41750)

### Rationale for this change

See #41749

### What changes are included in this PR?

Adds `to_reader` methods to `GADatasetDataset` and `GADatasetScanner`.

### Are these changes tested?

Yes I've added new unit tests.

### Are there any user-facing changes?

Yes this is a new feature.
* GitHub Issue: #41749

Lead-authored-by: Adam Reeve <adreeve@gmail.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
  • Loading branch information
adamreeve and kou committed May 25, 2024
1 parent 283f66f commit 1c9e393
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 4 deletions.
37 changes: 36 additions & 1 deletion c_glib/arrow-dataset-glib/dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <arrow-glib/error.hpp>
#include <arrow-glib/file-system.hpp>
#include <arrow-glib/reader.hpp>
#include <arrow-glib/table.hpp>

#include <arrow-dataset-glib/dataset-factory.hpp>
Expand Down Expand Up @@ -152,12 +153,46 @@ gadataset_dataset_to_table(GADatasetDataset *dataset, GError **error)
}
auto arrow_scanner = *arrow_scanner_result;
auto arrow_table_result = arrow_scanner->ToTable();
if (!garrow::check(error, arrow_scanner_result, "[dataset][to-table]")) {
if (!garrow::check(error, arrow_table_result, "[dataset][to-table]")) {
return NULL;
}
return garrow_table_new_raw(&(*arrow_table_result));
}

/**
* gadataset_dataset_to_record_batch_reader:
* @dataset: A #GADatasetDataset.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (transfer full) (nullable):
* A #GArrowRecordBatchReader on success, %NULL on error.
*
* Since: 17.0.0
*/
GArrowRecordBatchReader *
gadataset_dataset_to_record_batch_reader(GADatasetDataset *dataset, GError **error)
{
auto arrow_dataset = gadataset_dataset_get_raw(dataset);
auto arrow_scanner_builder_result = arrow_dataset->NewScan();
if (!garrow::check(error,
arrow_scanner_builder_result,
"[dataset][to-record-batch-reader]")) {
return nullptr;
}
auto arrow_scanner_builder = *arrow_scanner_builder_result;
auto arrow_scanner_result = arrow_scanner_builder->Finish();
if (!garrow::check(error, arrow_scanner_result, "[dataset][to-record-batch-reader]")) {
return nullptr;
}
auto arrow_scanner = *arrow_scanner_result;
auto arrow_reader_result = arrow_scanner->ToRecordBatchReader();
if (!garrow::check(error, arrow_reader_result, "[dataset][to-record-batch-reader]")) {
return nullptr;
}
auto sources = g_list_prepend(nullptr, dataset);
return garrow_record_batch_reader_new_raw(&(*arrow_reader_result), sources);
}

/**
* gadataset_dataset_get_type_name:
* @dataset: A #GADatasetDataset.
Expand Down
3 changes: 3 additions & 0 deletions c_glib/arrow-dataset-glib/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ gadataset_dataset_to_table(GADatasetDataset *dataset, GError **error);
GADATASET_AVAILABLE_IN_5_0
gchar *
gadataset_dataset_get_type_name(GADatasetDataset *dataset);
GADATASET_AVAILABLE_IN_17_0
GArrowRecordBatchReader *
gadataset_dataset_to_record_batch_reader(GADatasetDataset *dataset, GError **error);

#define GADATASET_TYPE_FILE_SYSTEM_DATASET_WRITE_OPTIONS \
(gadataset_file_system_dataset_write_options_get_type())
Expand Down
22 changes: 22 additions & 0 deletions c_glib/arrow-dataset-glib/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ gadataset_scanner_to_table(GADatasetScanner *scanner, GError **error)
}
}

/**
* gadataset_scanner_to_record_batch_reader:
* @scanner: A #GADatasetScanner.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (transfer full) (nullable):
* A #GArrowRecordBatchReader on success, %NULL on error.
*
* Since: 17.0.0
*/
GArrowRecordBatchReader *
gadataset_scanner_to_record_batch_reader(GADatasetScanner *scanner, GError **error)
{
auto arrow_scanner = gadataset_scanner_get_raw(scanner);
auto arrow_reader_result = arrow_scanner->ToRecordBatchReader();
if (!garrow::check(error, arrow_reader_result, "[scanner][to-record-batch-reader]")) {
return nullptr;
}
auto sources = g_list_prepend(nullptr, scanner);
return garrow_record_batch_reader_new_raw(&(*arrow_reader_result), sources);
}

typedef struct GADatasetScannerBuilderPrivate_
{
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder;
Expand Down
4 changes: 4 additions & 0 deletions c_glib/arrow-dataset-glib/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ GADATASET_AVAILABLE_IN_5_0
GArrowTable *
gadataset_scanner_to_table(GADatasetScanner *scanner, GError **error);

GADATASET_AVAILABLE_IN_17_0
GArrowRecordBatchReader *
gadataset_scanner_to_record_batch_reader(GADatasetScanner *scanner, GError **error);

#define GADATASET_TYPE_SCANNER_BUILDER (gadataset_scanner_builder_get_type())
GADATASET_AVAILABLE_IN_5_0
G_DECLARE_DERIVABLE_TYPE(
Expand Down
24 changes: 21 additions & 3 deletions c_glib/test/dataset/test-file-system-dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ def test_partitioning
end

def test_read_write
dataset, expected_table = create_dataset
assert_equal(expected_table, dataset.to_table)
end

def test_to_record_batch_reader
dataset, expected_table = create_dataset
reader = dataset.to_record_batch_reader
begin
assert_equal(expected_table, reader.read_all)
ensure
# Unref to ensure the reader closes files and we can delete the temp directory
reader.unref
end
end

def create_dataset
table = build_table(label: build_string_array(["a", "a", "b", "c"]),
count: build_int32_array([1, 10, 2, 3]))
table_reader = Arrow::TableBatchReader.new(table)
Expand All @@ -73,7 +89,8 @@ def test_read_write
end
@factory.partition_base_dir = @dir
dataset = @factory.finish
assert_equal(build_table(count: [

expected_table = build_table(count: [
build_int32_array([1, 10]),
build_int32_array([2]),
build_int32_array([3]),
Expand All @@ -82,7 +99,8 @@ def test_read_write
build_string_array(["a", "a"]),
build_string_array(["b"]),
build_string_array(["c"]),
]),
dataset.to_table)
])

return dataset, expected_table
end
end
10 changes: 10 additions & 0 deletions c_glib/test/dataset/test-scanner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ def setup
def test_to_table
assert_equal(@table, @scanner.to_table)
end

def test_to_record_batch_reader
reader = @scanner.to_record_batch_reader
begin
assert_equal(@table, reader.read_all)
ensure
# Unref to ensure the reader closes files and we can delete the temp directory
reader.unref
end
end
end

0 comments on commit 1c9e393

Please sign in to comment.