Skip to content

Commit

Permalink
ARROW-8372: [C++] Migrate Table and RecordBatch APIs to Result<T>
Browse files Browse the repository at this point in the history
Closes #6876 from pitrou/ARROW-8372-result-batch-apis

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
pitrou authored and wesm committed Apr 8, 2020
1 parent b056e5e commit 2fc20fe
Show file tree
Hide file tree
Showing 28 changed files with 379 additions and 362 deletions.
14 changes: 6 additions & 8 deletions c_glib/arrow-glib/record-batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,9 @@ garrow_record_batch_add_column(GArrowRecordBatch *record_batch,
const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
const auto arrow_field = garrow_field_get_raw(field);
const auto arrow_column = garrow_array_get_raw(column);
std::shared_ptr<arrow::RecordBatch> arrow_new_record_batch;
auto status = arrow_record_batch->AddColumn(i, arrow_field, arrow_column, &arrow_new_record_batch);
if (garrow_error_check(error, status, "[record-batch][add-column]")) {
return garrow_record_batch_new_raw(&arrow_new_record_batch);
auto maybe_new_batch = arrow_record_batch->AddColumn(i, arrow_field, arrow_column);
if (garrow::check(error, maybe_new_batch, "[record-batch][add-column]")) {
return garrow_record_batch_new_raw(&(*maybe_new_batch));
} else {
return NULL;
}
Expand All @@ -399,10 +398,9 @@ garrow_record_batch_remove_column(GArrowRecordBatch *record_batch,
GError **error)
{
const auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
std::shared_ptr<arrow::RecordBatch> arrow_new_record_batch;
auto status = arrow_record_batch->RemoveColumn(i, &arrow_new_record_batch);
if (garrow_error_check(error, status, "[record-batch][remove-column]")) {
return garrow_record_batch_new_raw(&arrow_new_record_batch);
auto maybe_new_batch = arrow_record_batch->RemoveColumn(i);
if (garrow::check(error, maybe_new_batch, "[record-batch][remove-column]")) {
return garrow_record_batch_new_raw(&(*maybe_new_batch));
} else {
return NULL;
}
Expand Down
21 changes: 9 additions & 12 deletions c_glib/arrow-glib/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,9 @@ garrow_schema_add_field(GArrowSchema *schema,
{
const auto arrow_schema = garrow_schema_get_raw(schema);
const auto arrow_field = garrow_field_get_raw(field);
std::shared_ptr<arrow::Schema> arrow_new_schema;
auto status = arrow_schema->AddField(i, arrow_field, &arrow_new_schema);
if (garrow_error_check(error, status, "[schema][add-field]")) {
return garrow_schema_new_raw(&arrow_new_schema);
auto maybe_new_schema = arrow_schema->AddField(i, arrow_field);
if (garrow::check(error, maybe_new_schema, "[schema][add-field]")) {
return garrow_schema_new_raw(&(*maybe_new_schema));
} else {
return NULL;
}
Expand All @@ -324,10 +323,9 @@ garrow_schema_remove_field(GArrowSchema *schema,
GError **error)
{
const auto arrow_schema = garrow_schema_get_raw(schema);
std::shared_ptr<arrow::Schema> arrow_new_schema;
auto status = arrow_schema->RemoveField(i, &arrow_new_schema);
if (garrow_error_check(error, status, "[schema][remove-field]")) {
return garrow_schema_new_raw(&arrow_new_schema);
auto maybe_new_schema = arrow_schema->RemoveField(i);
if (garrow::check(error, maybe_new_schema, "[schema][remove-field]")) {
return garrow_schema_new_raw(&(*maybe_new_schema));
} else {
return NULL;
}
Expand All @@ -353,10 +351,9 @@ garrow_schema_replace_field(GArrowSchema *schema,
{
const auto arrow_schema = garrow_schema_get_raw(schema);
const auto arrow_field = garrow_field_get_raw(field);
std::shared_ptr<arrow::Schema> arrow_new_schema;
auto status = arrow_schema->SetField(i, arrow_field, &arrow_new_schema);
if (garrow_error_check(error, status, "[schema][replace-field]")) {
return garrow_schema_new_raw(&arrow_new_schema);
auto maybe_new_schema = arrow_schema->SetField(i, arrow_field);
if (garrow::check(error, maybe_new_schema, "[schema][replace-field]")) {
return garrow_schema_new_raw(&(*maybe_new_schema));
} else {
return NULL;
}
Expand Down
64 changes: 27 additions & 37 deletions c_glib/arrow-glib/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,28 +203,27 @@ garrow_table_new_values(GArrowSchema *schema,
}

if (!arrow_chunked_arrays.empty()) {
auto arrow_table = arrow::Table::Make(arrow_schema, arrow_chunked_arrays);
auto arrow_table = arrow::Table::Make(arrow_schema,
std::move(arrow_chunked_arrays));
auto status = arrow_table->Validate();
if (garrow_error_check(error, status, context)) {
return garrow_table_new_raw(&arrow_table);
} else {
return NULL;
}
} else if (!arrow_arrays.empty()) {
auto arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays);
auto arrow_table = arrow::Table::Make(arrow_schema, std::move(arrow_arrays));
auto status = arrow_table->Validate();
if (garrow_error_check(error, status, context)) {
return garrow_table_new_raw(&arrow_table);
} else {
return NULL;
}
} else {
std::shared_ptr<arrow::Table> arrow_table;
auto status = arrow::Table::FromRecordBatches(arrow_schema,
arrow_record_batches,
&arrow_table);
if (garrow_error_check(error, status, context)) {
return garrow_table_new_raw(&arrow_table);
auto maybe_table = arrow::Table::FromRecordBatches(
arrow_schema, std::move(arrow_record_batches));
if (garrow::check(error, maybe_table, context)) {
return garrow_table_new_raw(&(*maybe_table));
} else {
return NULL;
}
Expand Down Expand Up @@ -322,12 +321,10 @@ garrow_table_new_record_batches(GArrowSchema *schema,
arrow_record_batches.push_back(arrow_record_batch);
}

std::shared_ptr<arrow::Table> arrow_table;
auto status = arrow::Table::FromRecordBatches(arrow_schema,
arrow_record_batches,
&arrow_table);
if (garrow_error_check(error, status, "[table][new][record-batches]")) {
return garrow_table_new_raw(&arrow_table);
auto maybe_table = arrow::Table::FromRecordBatches(arrow_schema,
arrow_record_batches);
if (garrow::check(error, maybe_table, "[table][new][record-batches]")) {
return garrow_table_new_raw(&(*maybe_table));
} else {
return NULL;
}
Expand Down Expand Up @@ -458,13 +455,11 @@ garrow_table_add_column(GArrowTable *table,
const auto arrow_table = garrow_table_get_raw(table);
const auto arrow_field = garrow_field_get_raw(field);
const auto arrow_chunked_array = garrow_chunked_array_get_raw(chunked_array);
std::shared_ptr<arrow::Table> arrow_new_table;
auto status = arrow_table->AddColumn(i,
arrow_field,
arrow_chunked_array,
&arrow_new_table);
if (garrow_error_check(error, status, "[table][add-column]")) {
return garrow_table_new_raw(&arrow_new_table);
auto maybe_new_table = arrow_table->AddColumn(i,
arrow_field,
arrow_chunked_array);
if (garrow::check(error, maybe_new_table, "[table][add-column]")) {
return garrow_table_new_raw(&(*maybe_new_table));
} else {
return NULL;
}
Expand All @@ -487,10 +482,9 @@ garrow_table_remove_column(GArrowTable *table,
GError **error)
{
const auto arrow_table = garrow_table_get_raw(table);
std::shared_ptr<arrow::Table> arrow_new_table;
auto status = arrow_table->RemoveColumn(i, &arrow_new_table);
if (garrow_error_check(error, status, "[table][remove-column]")) {
return garrow_table_new_raw(&arrow_new_table);
auto maybe_new_table = arrow_table->RemoveColumn(i);
if (garrow::check(error, maybe_new_table, "[table][remove-column]")) {
return garrow_table_new_raw(&(*maybe_new_table));
} else {
return NULL;
}
Expand Down Expand Up @@ -520,13 +514,11 @@ garrow_table_replace_column(GArrowTable *table,
const auto arrow_table = garrow_table_get_raw(table);
const auto arrow_field = garrow_field_get_raw(field);
const auto arrow_chunked_array = garrow_chunked_array_get_raw(chunked_array);
std::shared_ptr<arrow::Table> arrow_new_table;
auto status = arrow_table->SetColumn(i,
arrow_field,
arrow_chunked_array,
&arrow_new_table);
if (garrow_error_check(error, status, "[table][replace-column]")) {
return garrow_table_new_raw(&arrow_new_table);
auto maybe_new_table = arrow_table->SetColumn(i,
arrow_field,
arrow_chunked_array);
if (garrow::check(error, maybe_new_table, "[table][replace-column]")) {
return garrow_table_new_raw(&(*maybe_new_table));
} else {
return NULL;
}
Expand Down Expand Up @@ -630,11 +622,9 @@ garrow_table_combine_chunks(GArrowTable *table,
{
const auto arrow_table = garrow_table_get_raw(table);

std::shared_ptr<arrow::Table> arrow_combined_table;
auto status = arrow_table->CombineChunks(arrow::default_memory_pool(),
&arrow_combined_table);
if (garrow_error_check(error, status, "[table][combine-chunks]")) {
return garrow_table_new_raw(&arrow_combined_table);
auto maybe_new_table = arrow_table->CombineChunks();
if (garrow::check(error, maybe_new_table, "[table][combine-chunks]")) {
return garrow_table_new_raw(&(*maybe_new_table));
} else {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/adapters/orc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class ORCFileReader::Impl {
opts.range(stripes_[stripe].offset, stripes_[stripe].length);
RETURN_NOT_OK(ReadBatch(opts, schema, stripes_[stripe].num_rows, &batches[stripe]));
}
return Table::FromRecordBatches(schema, batches, out);
return Table::FromRecordBatches(schema, std::move(batches)).Value(out);
}

Status ReadBatch(const liborc::RowReaderOptions& opts,
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,8 @@ Status ExportArray(const Array& array, struct ArrowArray* out,

Status ExportRecordBatch(const RecordBatch& batch, struct ArrowArray* out,
struct ArrowSchema* out_schema) {
std::shared_ptr<Array> array;
// XXX perhaps bypass ToStructArray() for speed?
RETURN_NOT_OK(batch.ToStructArray(&array));
ARROW_ASSIGN_OR_RAISE(auto array, batch.ToStructArray());

SchemaExportGuard guard(out_schema);
if (out_schema != nullptr) {
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,7 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
// Wait for all tasks to complete, or the first error.
RETURN_NOT_OK(task_group->Finish());

std::shared_ptr<Table> out;
RETURN_NOT_OK(Table::FromRecordBatches(scan_options_->schema(), batches, &out));
return out;
return Table::FromRecordBatches(scan_options_->schema(), std::move(batches));
}

} // namespace dataset
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/dataset/scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ TEST_F(TestScanner, ToTable) {
std::vector<std::shared_ptr<RecordBatch>> batches{kNumberBatches * kNumberChildDatasets,
batch};

std::shared_ptr<Table> expected;
ASSERT_OK(Table::FromRecordBatches(batches, &expected));
ASSERT_OK_AND_ASSIGN(auto expected, Table::FromRecordBatches(batches));

auto scanner = MakeScanner(batch);
std::shared_ptr<Table> actual;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ Status MetadataRecordBatchReader::ReadAll(
Status MetadataRecordBatchReader::ReadAll(std::shared_ptr<Table>* table) {
std::vector<std::shared_ptr<RecordBatch>> batches;
RETURN_NOT_OK(ReadAll(&batches));
return Table::FromRecordBatches(schema(), batches, table);
return Table::FromRecordBatches(schema(), std::move(batches)).Value(table);
}

SimpleFlightListing::SimpleFlightListing(const std::vector<FlightInfo>& flights)
Expand Down
8 changes: 3 additions & 5 deletions cpp/src/arrow/ipc/feather.cc
Original file line number Diff line number Diff line change
Expand Up @@ -716,11 +716,9 @@ class ReaderV2 : public Reader {

Status Read(const IpcReadOptions& options, std::shared_ptr<Table>* out) {
ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(source_, options));
std::vector<std::shared_ptr<RecordBatch>> batches;
std::vector<std::shared_ptr<RecordBatch>> batches(reader->num_record_batches());
for (int i = 0; i < reader->num_record_batches(); ++i) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader->ReadRecordBatch(i, &batch));
batches.emplace_back(batch);
RETURN_NOT_OK(reader->ReadRecordBatch(i, &batches[i]));
}

// XXX: Handle included_fields in RecordBatchFileReader::schema
Expand All @@ -735,7 +733,7 @@ class ReaderV2 : public Reader {
}
out_schema = ::arrow::schema(fields, out_schema->metadata());
}
return Table::FromRecordBatches(out_schema, batches, out);
return Table::FromRecordBatches(std::move(out_schema), std::move(batches)).Value(out);
}

Status Read(std::shared_ptr<Table>* out) override {
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/ipc/feather_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ class TestFeather : public ::testing::TestWithParam<TestParam> {

void CheckSlice(std::shared_ptr<RecordBatch> batch, int start, int size) {
batch = batch->Slice(start, size);
std::shared_ptr<Table> table;
ASSERT_OK(Table::FromRecordBatches({batch}, &table));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({batch}));

DoWrite(*table);
std::shared_ptr<Table> result;
Expand All @@ -108,9 +107,8 @@ class TestFeather : public ::testing::TestWithParam<TestParam> {
}

void CheckRoundtrip(std::shared_ptr<RecordBatch> batch) {
std::shared_ptr<Table> table;
std::vector<std::shared_ptr<RecordBatch>> batches = {batch};
ASSERT_OK(Table::FromRecordBatches(batches, &table));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches(batches));

DoWrite(*table);

Expand Down Expand Up @@ -139,8 +137,7 @@ TEST_P(TestFeather, ReadIndicesOrNames) {
std::shared_ptr<RecordBatch> batch1;
ASSERT_OK(ipc::test::MakeIntRecordBatch(&batch1));

std::shared_ptr<Table> table;
ASSERT_OK(Table::FromRecordBatches({batch1}, &table));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({batch1}));

DoWrite(*table);

Expand Down Expand Up @@ -181,8 +178,7 @@ TEST_P(TestFeather, PrimitiveRoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(ipc::test::MakeIntRecordBatch(&batch));

std::shared_ptr<Table> table;
ASSERT_OK(Table::FromRecordBatches({batch}, &table));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({batch}));

DoWrite(*table);

Expand Down Expand Up @@ -253,8 +249,7 @@ TEST_P(TestFeather, PrimitiveNullRoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK(ipc::test::MakeNullRecordBatch(&batch));

std::shared_ptr<Table> table;
ASSERT_OK(Table::FromRecordBatches({batch}, &table));
ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({batch}));

DoWrite(*table);

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/json/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TableReaderImpl : public TableReader,

std::shared_ptr<ChunkedArray> array;
RETURN_NOT_OK(builder_->Finish(&array));
return Table::FromChunkedStructArray(array, out);
return Table::FromChunkedStructArray(array).Value(out);
}

private:
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/json/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,7 @@ TEST(ReaderTest, ListArrayWithFewValues) {
{"a": [1], "b": {"c": true, "d": "1991-02-03"}},
{"a": [], "b": {"c": false, "d": "2019-04-01"}}
])");
std::shared_ptr<Table> expected_table;
ASSERT_OK(Table::FromRecordBatches({expected_batch}, &expected_table));
ASSERT_OK_AND_ASSIGN(auto expected_table, Table::FromRecordBatches({expected_batch}));

std::string json = R"({"a": [1], "b": {"c": true, "d": "1991-02-03"}}
{"a": [], "b": {"c": false, "d": "2019-04-01"}}
Expand Down
Loading

0 comments on commit 2fc20fe

Please sign in to comment.