Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41749: [GLib] Allow getting a RecordBatchReader from a Dataset or Scanner #41750

Merged
merged 9 commits into from
May 25, 2024
37 changes: 36 additions & 1 deletion c_glib/arrow-dataset-glib/dataset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <arrow-glib/error.hpp>
#include <arrow-glib/file-system.hpp>
#include <arrow-glib/reader.hpp>
#include <arrow-glib/table.hpp>

#include <arrow-dataset-glib/dataset-factory.hpp>
Expand Down Expand Up @@ -152,12 +153,46 @@ gadataset_dataset_to_table(GADatasetDataset *dataset, GError **error)
}
auto arrow_scanner = *arrow_scanner_result;
auto arrow_table_result = arrow_scanner->ToTable();
if (!garrow::check(error, arrow_scanner_result, "[dataset][to-table]")) {
if (!garrow::check(error, arrow_table_result, "[dataset][to-table]")) {
adamreeve marked this conversation as resolved.
Show resolved Hide resolved
return NULL;
}
return garrow_table_new_raw(&(*arrow_table_result));
}

/**
* gadataset_dataset_to_record_batch_reader:
* @dataset: A #GADatasetDataset.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (transfer full) (nullable):
* A #GArrowRecordBatchReader on success, %NULL on error.
*
* Since: 17.0.0
*/
GArrowRecordBatchReader *
gadataset_dataset_to_record_batch_reader(GADatasetDataset *dataset, GError **error)
{
auto arrow_dataset = gadataset_dataset_get_raw(dataset);
auto arrow_scanner_builder_result = arrow_dataset->NewScan();
if (!garrow::check(error,
arrow_scanner_builder_result,
"[dataset][to-record-batch-reader]")) {
return nullptr;
}
auto arrow_scanner_builder = *arrow_scanner_builder_result;
auto arrow_scanner_result = arrow_scanner_builder->Finish();
if (!garrow::check(error, arrow_scanner_result, "[dataset][to-record-batch-reader]")) {
return nullptr;
}
auto arrow_scanner = *arrow_scanner_result;
auto arrow_reader_result = arrow_scanner->ToRecordBatchReader();
if (!garrow::check(error, arrow_reader_result, "[dataset][to-record-batch-reader]")) {
return nullptr;
}
auto sources = g_list_prepend(nullptr, dataset);
return garrow_record_batch_reader_new_raw(&(*arrow_reader_result), sources);
}

/**
* gadataset_dataset_get_type_name:
* @dataset: A #GADatasetDataset.
Expand Down
3 changes: 3 additions & 0 deletions c_glib/arrow-dataset-glib/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ gadataset_dataset_to_table(GADatasetDataset *dataset, GError **error);
GADATASET_AVAILABLE_IN_5_0
gchar *
gadataset_dataset_get_type_name(GADatasetDataset *dataset);
GADATASET_AVAILABLE_IN_17_0
GArrowRecordBatchReader *
gadataset_dataset_to_record_batch_reader(GADatasetDataset *dataset, GError **error);

#define GADATASET_TYPE_FILE_SYSTEM_DATASET_WRITE_OPTIONS \
(gadataset_file_system_dataset_write_options_get_type())
Expand Down
22 changes: 22 additions & 0 deletions c_glib/arrow-dataset-glib/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ gadataset_scanner_to_table(GADatasetScanner *scanner, GError **error)
}
}

/**
* gadataset_scanner_to_record_batch_reader:
* @scanner: A #GADatasetScanner.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: (transfer full) (nullable):
* A #GArrowRecordBatchReader on success, %NULL on error.
*
* Since: 17.0.0
*/
GArrowRecordBatchReader *
gadataset_scanner_to_record_batch_reader(GADatasetScanner *scanner, GError **error)
{
auto arrow_scanner = gadataset_scanner_get_raw(scanner);
auto arrow_reader_result = arrow_scanner->ToRecordBatchReader();
if (!garrow::check(error, arrow_reader_result, "[scanner][to-record-batch-reader]")) {
return nullptr;
}
auto sources = g_list_prepend(nullptr, scanner);
return garrow_record_batch_reader_new_raw(&(*arrow_reader_result), sources);
}

typedef struct GADatasetScannerBuilderPrivate_
{
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder;
Expand Down
4 changes: 4 additions & 0 deletions c_glib/arrow-dataset-glib/scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ GADATASET_AVAILABLE_IN_5_0
GArrowTable *
gadataset_scanner_to_table(GADatasetScanner *scanner, GError **error);

GADATASET_AVAILABLE_IN_17_0
GArrowRecordBatchReader *
gadataset_scanner_to_record_batch_reader(GADatasetScanner *scanner, GError **error);

#define GADATASET_TYPE_SCANNER_BUILDER (gadataset_scanner_builder_get_type())
GADATASET_AVAILABLE_IN_5_0
G_DECLARE_DERIVABLE_TYPE(
Expand Down
24 changes: 21 additions & 3 deletions c_glib/test/dataset/test-file-system-dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ def test_partitioning
end

def test_read_write
dataset, expected_table = create_dataset
assert_equal(expected_table, dataset.to_table)
end

def test_to_record_batch_reader
dataset, expected_table = create_dataset
reader = dataset.to_record_batch_reader
begin
assert_equal(expected_table, reader.read_all)
ensure
# Unref to ensure the reader closes files and we can delete the temp directory
reader.unref
end
end

def create_dataset
table = build_table(label: build_string_array(["a", "a", "b", "c"]),
count: build_int32_array([1, 10, 2, 3]))
table_reader = Arrow::TableBatchReader.new(table)
Expand All @@ -73,7 +89,8 @@ def test_read_write
end
@factory.partition_base_dir = @dir
dataset = @factory.finish
assert_equal(build_table(count: [

expected_table = build_table(count: [
build_int32_array([1, 10]),
build_int32_array([2]),
build_int32_array([3]),
Expand All @@ -82,7 +99,8 @@ def test_read_write
build_string_array(["a", "a"]),
build_string_array(["b"]),
build_string_array(["c"]),
]),
dataset.to_table)
])

return dataset, expected_table
end
end
10 changes: 10 additions & 0 deletions c_glib/test/dataset/test-scanner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ def setup
def test_to_table
assert_equal(@table, @scanner.to_table)
end

def test_to_record_batch_reader
reader = @scanner.to_record_batch_reader
begin
assert_equal(@table, reader.read_all)
ensure
# Unref to ensure the reader closes files and we can delete the temp directory
reader.unref
end
end
end
Loading