Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-36845: [C++][Python] Allow type promotion on pa.concat_tables #36846

Merged
merged 93 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
dfcefbc
ARROW-14705: [C++] Implement more complete type unification
lidavidm Dec 17, 2021
0b223be
ARROW-14705: [C++] Add remaining options
lidavidm Dec 17, 2021
7abdc13
ARROW-14705: [C++] Add expected failures
lidavidm Dec 17, 2021
f993f90
ARROW-14705: [C++] Add tests for unimplemented flags
lidavidm Dec 17, 2021
2dc904a
ARROW-14705: [C++] Implement dictionary merging
lidavidm Dec 17, 2021
daf3392
ARROW-14705: [C++] Implement decimals
lidavidm Dec 17, 2021
a2547f8
ARROW-14705: [C++] Report better errors
lidavidm Dec 17, 2021
34020dc
ARROW-14705: [C++] Update TODOs
lidavidm Dec 17, 2021
651c4cc
ARROW-14705: [C++] Implement temporal types
lidavidm Dec 17, 2021
940cfe5
ARROW-14705: [C++] Implement list types
lidavidm Dec 17, 2021
682dc98
ARROW-14705: [C++] Merge fixed_size_binary together
lidavidm Dec 20, 2021
b4a18a0
ARROW-14705: [C++] Refactor
lidavidm Dec 20, 2021
057ded1
ARROW-14705: [C++] Implement map
lidavidm Dec 20, 2021
2de01ed
ARROW-14705: [C++] Handle nonstandard field names
lidavidm Dec 20, 2021
f09ab8d
ARROW-14705: [C++] Refactor
lidavidm Dec 20, 2021
fde4c52
ARROW-14705: [C++] Implement structs
lidavidm Dec 20, 2021
d14814f
ARROW-14705: [C++] Add options to discovery
lidavidm Dec 20, 2021
fffb846
ARROW-14705: [Python] Add basic bindings
lidavidm Dec 20, 2021
e4fc346
ARROW-14705: [C++] Add missing export
lidavidm Dec 20, 2021
ceac692
ARROW-14705: [C++] Organize and document options
lidavidm Dec 28, 2021
1bb6642
ARROW-14705: [C++] Add missing header
lidavidm Jan 6, 2022
6992934
ARROW-14705: [C++][Python] Add unification to ConcatenateTables
lidavidm Jan 6, 2022
9d5195d
Merge branch 'main' of https://github.com/apache/arrow into arrow-14705
Fokko Jul 24, 2023
16bdc92
Cleanup
Fokko Jul 24, 2023
cbedc8b
Merge branch 'main' of https://github.com/apache/arrow into arrow-14705
Fokko Jul 24, 2023
535efb9
Comments
Fokko Jul 26, 2023
ff9f14f
Fix build
Fokko Jul 26, 2023
2451387
WIP
Fokko Jul 28, 2023
f6a43ff
Lint
Fokko Jul 31, 2023
8808f28
Merge branch 'main' of https://github.com/apache/arrow into arrow-14705
Fokko Jul 31, 2023
c37f999
Fix the CI
Fokko Jul 31, 2023
1c5643c
Fix the CI
Fokko Jul 31, 2023
eb45145
Merge branch 'main' of https://github.com/apache/arrow into arrow-14705
Fokko Aug 16, 2023
45f15cc
Comments
Fokko Aug 16, 2023
b16e849
Cleanup
Fokko Aug 16, 2023
5290949
WIP
Fokko Aug 21, 2023
a17c4a5
Merge branch 'main' of https://github.com/apache/arrow into arrow-14705
Fokko Aug 23, 2023
8f996f9
Comments
Fokko Aug 23, 2023
f1521a0
WIP
Fokko Aug 23, 2023
be0db91
Working on the tests
Fokko Aug 23, 2023
b2b8b4c
Fix tests
Fokko Aug 23, 2023
cc00894
Cleanup
Fokko Aug 28, 2023
895f87a
WIP
Fokko Aug 28, 2023
cad5e1d
Whitespace
Fokko Aug 28, 2023
f1301bd
Pylint
Fokko Aug 28, 2023
a8802c5
WIP
Fokko Aug 28, 2023
3a25ac7
Update test
Fokko Aug 28, 2023
9722b74
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Aug 28, 2023
af62b99
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 11, 2023
78460ec
Fix tests
Fokko Sep 11, 2023
31ec8e6
Fix test
Fokko Sep 11, 2023
a9452c4
Hmm
Fokko Sep 12, 2023
ed58763
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 12, 2023
d749851
Oops
Fokko Sep 12, 2023
beacead
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 12, 2023
d35c446
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 15, 2023
c3ca1f6
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 16, 2023
94dc45f
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 18, 2023
efcee5a
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 18, 2023
81573a9
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 20, 2023
e47b41d
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 21, 2023
015b5a8
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 26, 2023
318d65e
Remove ARROW_COMPUTE
Fokko Sep 26, 2023
742156a
Fix bug
Fokko Sep 26, 2023
5f570a7
Add tests as well
Fokko Sep 26, 2023
0d225ac
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 26, 2023
8ade8fc
Make CI happy
Fokko Sep 26, 2023
c9062b8
Revert some stuff
Fokko Sep 26, 2023
a1cdba7
Fix the linting
Fokko Sep 26, 2023
6f93f14
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 27, 2023
4ecc909
Remove import of header
Fokko Sep 27, 2023
f7c6328
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 27, 2023
ea860d9
Remove cmakeh file
Fokko Sep 27, 2023
b12cc66
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 28, 2023
42faccc
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Sep 29, 2023
de76774
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 2, 2023
09b58d7
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 3, 2023
0fa2e1a
Remove promote_options
Fokko Oct 3, 2023
2a505b1
Fix failing tests
Fokko Oct 3, 2023
a76f7f3
Fix the tests
Fokko Oct 3, 2023
59a267f
missing space..
Fokko Oct 3, 2023
5eb6c29
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 3, 2023
977c703
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 4, 2023
b038491
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 5, 2023
8beb9a1
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 6, 2023
cfc739b
Comments, Thanks Joris
Fokko Oct 6, 2023
493c4e9
Fix the exception
Fokko Oct 6, 2023
e19f064
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 6, 2023
61ed125
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 6, 2023
bf2842e
Merge branch 'main' of github.com:apache/arrow into arrow-14705
Fokko Oct 8, 2023
5dfae58
catch warnings in the test
jorisvandenbossche Oct 9, 2023
e10aed9
Merge remote-tracking branch 'upstream/main' into arrow-14705
jorisvandenbossche Oct 9, 2023
a216d48
Merge remote-tracking branch 'upstream/main' into arrow-14705
jorisvandenbossche Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions c_glib/test/dataset/test-file-system-dataset-factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ def test_validate_fragments
point: Arrow::Int16DataType.new)
options.validate_fragments = true
message = "[file-system-dataset-factory][finish]: " +
"Invalid: Unable to merge: " +
"Type error: Unable to merge: " +
"Field point has incompatible types: int16 vs int32"
error = assert_raise(Arrow::Error::Invalid) do
error = assert_raise(Arrow::Error::Type) do
@factory.finish(options)
end
assert_equal(message, error.message.lines(chomp: true).first)
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ Result<std::shared_ptr<Schema>> DatasetFactory::Inspect(InspectOptions options)
return arrow::schema({});
}

return UnifySchemas(schemas);
return UnifySchemas(schemas, options.field_merge_options);
}

Result<std::shared_ptr<Dataset>> DatasetFactory::Finish() {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ struct InspectOptions {
/// `kInspectAllFragments`. A value of `0` disables inspection of fragments
/// altogether so only the partitioning schema will be inspected.
int fragments = 1;

/// Control how to unify types. By default, types are merged strictly (the
/// type must match exactly, except nulls can be merged with other types).
Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
};

struct FinishOptions {
Expand Down
20 changes: 16 additions & 4 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,15 @@ TEST_F(MockDatasetFactoryTest, UnifySchemas) {

MakeFactory({schema({i32, f64}), schema({f64, i32_fake})});
// Unification fails when fields with the same name have clashing types.
ASSERT_RAISES(Invalid, factory_->Inspect());
ASSERT_RAISES(TypeError, factory_->Inspect());
// Return the individual schema for closer inspection should not fail.
AssertInspectSchemas({schema({i32, f64}), schema({f64, i32_fake})});

MakeFactory({schema({field("num", int32())}), schema({field("num", float64())})});
ASSERT_RAISES(TypeError, factory_->Inspect());
InspectOptions permissive_options;
permissive_options.field_merge_options = Field::MergeOptions::Permissive();
AssertInspect(schema({field("num", float64())}), permissive_options);
}

class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
Expand Down Expand Up @@ -335,7 +341,7 @@ TEST_F(FileSystemDatasetFactoryTest, FinishWithIncompatibleSchemaShouldFail) {
ASSERT_OK_AND_ASSIGN(auto dataset, factory_->Finish(options));

MakeFactory({fs::File("test")});
ASSERT_RAISES(Invalid, factory_->Finish(options));
ASSERT_RAISES(TypeError, factory_->Finish(options));

// Disable validation
options.validate_fragments = false;
Expand Down Expand Up @@ -463,8 +469,8 @@ TEST(UnionDatasetFactoryTest, ConflictingSchemas) {
{dataset_factory_1, dataset_factory_2, dataset_factory_3}));

// schema_3 conflicts with other, Inspect/Finish should not work
ASSERT_RAISES(Invalid, factory->Inspect());
ASSERT_RAISES(Invalid, factory->Finish());
ASSERT_RAISES(TypeError, factory->Inspect());
ASSERT_RAISES(TypeError, factory->Finish());

// The user can inspect without error
ASSERT_OK_AND_ASSIGN(auto schemas, factory->InspectSchemas({}));
Expand All @@ -474,6 +480,12 @@ TEST(UnionDatasetFactoryTest, ConflictingSchemas) {
auto i32_schema = schema({i32});
ASSERT_OK_AND_ASSIGN(auto dataset, factory->Finish(i32_schema));
EXPECT_EQ(*dataset->schema(), *i32_schema);

// The user decided to allow merging the types.
FinishOptions options;
options.inspect_options.field_merge_options = Field::MergeOptions::Permissive();
ASSERT_OK_AND_ASSIGN(dataset, factory->Finish(options));
EXPECT_EQ(*dataset->schema(), *schema({f64, i32}));
}

} // namespace dataset
Expand Down
24 changes: 19 additions & 5 deletions cpp/src/arrow/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "arrow/array/concatenate.h"
#include "arrow/array/util.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/cast.h"
#include "arrow/pretty_print.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
Expand Down Expand Up @@ -450,6 +451,13 @@ Result<std::shared_ptr<Table>> ConcatenateTables(
Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
const std::shared_ptr<Schema>& schema,
MemoryPool* pool) {
return PromoteTableToSchema(table, schema, compute::CastOptions::Safe(), pool);
}

Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>& table,
const std::shared_ptr<Schema>& schema,
const compute::CastOptions& options,
MemoryPool* pool) {
const std::shared_ptr<Schema> current_schema = table->schema();
if (current_schema->Equals(*schema, /*check_metadata=*/false)) {
return table->ReplaceSchemaMetadata(schema->metadata());
Expand Down Expand Up @@ -487,8 +495,8 @@ Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>
const int field_index = field_indices[0];
const auto& current_field = current_schema->field(field_index);
if (!field->nullable() && current_field->nullable()) {
return Status::Invalid("Unable to promote field ", current_field->name(),
": it was nullable but the target schema was not.");
return Status::TypeError("Unable to promote field ", current_field->name(),
": it was nullable but the target schema was not.");
}

fields_seen[field_index] = true;
Expand All @@ -502,9 +510,15 @@ Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>
continue;
}

return Status::Invalid("Unable to promote field ", field->name(),
": incompatible types: ", field->type()->ToString(), " vs ",
current_field->type()->ToString());
if (!compute::CanCast(*current_field->type(), *field->type())) {
Fokko marked this conversation as resolved.
Show resolved Hide resolved
return Status::TypeError("Unable to promote field ", field->name(),
": incompatible types: ", field->type()->ToString(),
" vs ", current_field->type()->ToString());
}
compute::ExecContext ctx(pool);
ARROW_ASSIGN_OR_RAISE(auto casted, compute::Cast(table->column(field_index),
field->type(), options, &ctx));
columns.push_back(casted.chunked_array());
}

auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
Expand Down
39 changes: 35 additions & 4 deletions cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,23 @@ Result<std::shared_ptr<Table>> ConcatenateTables(
ConcatenateTablesOptions options = ConcatenateTablesOptions::Defaults(),
MemoryPool* memory_pool = default_memory_pool());

namespace compute {
class CastOptions;
}

/// \brief Promotes a table to conform to the given schema.
///
/// If a field in the schema does not have a corresponding column in the
/// table, a column of nulls will be added to the resulting table.
/// If the corresponding column is of type Null, it will be promoted to
/// the type specified by schema, with null values filled.
/// If a field in the schema does not have a corresponding column in
/// the table, a column of nulls will be added to the resulting table.
/// If the corresponding column is of type Null, it will be promoted
/// to the type specified by schema, with null values filled. The
/// column will be casted to the type specified by the schema.
///
/// Returns an error:
/// - if the corresponding column's type is not compatible with the
/// schema.
/// - if there is a column in the table that does not exist in the schema.
/// - if the cast fails or casting would be required but is not available.
Fokko marked this conversation as resolved.
Show resolved Hide resolved
///
/// \param[in] table the input Table
/// \param[in] schema the target schema to promote to
Expand All @@ -333,4 +340,28 @@ Result<std::shared_ptr<Table>> PromoteTableToSchema(
const std::shared_ptr<Table>& table, const std::shared_ptr<Schema>& schema,
MemoryPool* pool = default_memory_pool());

/// \brief Promotes a table to conform to the given schema.
///
/// If a field in the schema does not have a corresponding column in
/// the table, a column of nulls will be added to the resulting table.
/// If the corresponding column is of type Null, it will be promoted
/// to the type specified by schema, with null values filled. The column
/// will be casted to the type specified by the schema.
///
/// Returns an error:
/// - if the corresponding column's type is not compatible with the
/// schema.
/// - if there is a column in the table that does not exist in the schema.
/// - if the cast fails or casting would be required but is not available.
///
/// \param[in] table the input Table
/// \param[in] schema the target schema to promote to
/// \param[in] options The cast options to allow promotion of types
/// \param[in] pool The memory pool to be used if null-filled arrays need to
/// be created.
ARROW_EXPORT
Result<std::shared_ptr<Table>> PromoteTableToSchema(
const std::shared_ptr<Table>& table, const std::shared_ptr<Schema>& schema,
const compute::CastOptions& options, MemoryPool* pool = default_memory_pool());

} // namespace arrow
40 changes: 36 additions & 4 deletions cpp/src/arrow/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "arrow/array/data.h"
#include "arrow/array/util.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/cast.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -418,16 +419,17 @@ TEST_F(TestPromoteTableToSchema, IncompatibleTypes) {
auto table = MakeTableWithOneNullFilledColumn("field", int32(), length);

// Invalid promotion: int32 to null.
ASSERT_RAISES(Invalid, PromoteTableToSchema(table, schema({field("field", null())})));
ASSERT_RAISES(TypeError, PromoteTableToSchema(table, schema({field("field", null())})));

// Invalid promotion: int32 to uint32.
ASSERT_RAISES(Invalid, PromoteTableToSchema(table, schema({field("field", uint32())})));
// Invalid promotion: int32 to list.
ASSERT_RAISES(TypeError,
PromoteTableToSchema(table, schema({field("field", list(int32()))})));
}

TEST_F(TestPromoteTableToSchema, IncompatibleNullity) {
const int length = 10;
auto table = MakeTableWithOneNullFilledColumn("field", int32(), length);
ASSERT_RAISES(Invalid,
ASSERT_RAISES(TypeError,
PromoteTableToSchema(
table, schema({field("field", uint32())->WithNullable(false)})));
}
Expand Down Expand Up @@ -520,6 +522,36 @@ TEST_F(ConcatenateTablesWithPromotionTest, Simple) {
AssertTablesEqualUnorderedFields(*expected, *result);
}

TEST_F(ConcatenateTablesWithPromotionTest, Unify) {
auto t_i32 = TableFromJSON(schema({field("f0", int32())}), {"[[0], [1]]"});
auto t_i64 = TableFromJSON(schema({field("f0", int64())}), {"[[2], [3]]"});
auto t_null = TableFromJSON(schema({field("f0", null())}), {"[[null], [null]]"});

auto expected_int64 =
TableFromJSON(schema({field("f0", int64())}), {"[[0], [1], [2], [3]]"});
auto expected_null =
TableFromJSON(schema({field("f0", int32())}), {"[[0], [1], [null], [null]]"});

ConcatenateTablesOptions options;
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr("Schema at index 1 was different"),
ConcatenateTables({t_i32, t_i64}, options));
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr("Schema at index 1 was different"),
ConcatenateTables({t_i32, t_null}, options));

options.unify_schemas = true;
EXPECT_RAISES_WITH_MESSAGE_THAT(TypeError,
::testing::HasSubstr("Field f0 has incompatible types"),
ConcatenateTables({t_i64, t_i32}, options));
ASSERT_OK_AND_ASSIGN(auto actual, ConcatenateTables({t_i32, t_null}, options));
AssertTablesEqual(*expected_null, *actual, /*same_chunk_layout=*/false);

options.field_merge_options.promote_numeric_width = true;
ASSERT_OK_AND_ASSIGN(actual, ConcatenateTables({t_i32, t_i64}, options));
AssertTablesEqual(*expected_int64, *actual, /*same_chunk_layout=*/false);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you tests for when casting fails because of overflow or other issues?

It would also showcase why the distinction between TypeError and Invalid is useful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you haven't addressed this comment?

Copy link
Contributor Author

@Fokko Fokko Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a test for combining a decimal(76, 75) and a int64


TEST_F(TestTable, Slice) {
const int64_t length = 10;

Expand Down
Loading
Loading