Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dfcefbc
ARROW-14705: [C++] Implement more complete type unification
lidavidm Dec 17, 2021
0b223be
ARROW-14705: [C++] Add remaining options
lidavidm Dec 17, 2021
7abdc13
ARROW-14705: [C++] Add expected failures
lidavidm Dec 17, 2021
f993f90
ARROW-14705: [C++] Add tests for unimplemented flags
lidavidm Dec 17, 2021
2dc904a
ARROW-14705: [C++] Implement dictionary merging
lidavidm Dec 17, 2021
daf3392
ARROW-14705: [C++] Implement decimals
lidavidm Dec 17, 2021
a2547f8
ARROW-14705: [C++] Report better errors
lidavidm Dec 17, 2021
34020dc
ARROW-14705: [C++] Update TODOs
lidavidm Dec 17, 2021
651c4cc
ARROW-14705: [C++] Implement temporal types
lidavidm Dec 17, 2021
940cfe5
ARROW-14705: [C++] Implement list types
lidavidm Dec 17, 2021
682dc98
ARROW-14705: [C++] Merge fixed_size_binary together
lidavidm Dec 20, 2021
b4a18a0
ARROW-14705: [C++] Refactor
lidavidm Dec 20, 2021
057ded1
ARROW-14705: [C++] Implement map
lidavidm Dec 20, 2021
2de01ed
ARROW-14705: [C++] Handle nonstandard field names
lidavidm Dec 20, 2021
f09ab8d
ARROW-14705: [C++] Refactor
lidavidm Dec 20, 2021
fde4c52
ARROW-14705: [C++] Implement structs
lidavidm Dec 20, 2021
d14814f
ARROW-14705: [C++] Add options to discovery
lidavidm Dec 20, 2021
fffb846
ARROW-14705: [Python] Add basic bindings
lidavidm Dec 20, 2021
e4fc346
ARROW-14705: [C++] Add missing export
lidavidm Dec 20, 2021
ceac692
ARROW-14705: [C++] Organize and document options
lidavidm Dec 28, 2021
1bb6642
ARROW-14705: [C++] Add missing header
lidavidm Jan 6, 2022
6992934
ARROW-14705: [C++][Python] Add unification to ConcatenateTables
lidavidm Jan 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Result<std::shared_ptr<Schema>> DatasetFactory::Inspect(InspectOptions options)
return arrow::schema({});
}

return UnifySchemas(schemas);
return UnifySchemas(schemas, options.field_merge_options);
}

Result<std::shared_ptr<Dataset>> DatasetFactory::Finish() {
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ struct InspectOptions {
/// `kInspectAllFragments`. A value of `0` disables inspection of fragments
/// altogether so only the partitioning schema will be inspected.
int fragments = 1;

/// Control how to unify types. By default, types are merged strictly (the
/// type must match exactly, except nulls can be merged with other types).
Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
};

struct FinishOptions {
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ TEST_F(MockDatasetFactoryTest, UnifySchemas) {
ASSERT_RAISES(Invalid, factory_->Inspect());
// Return the individual schema for closer inspection should not fail.
AssertInspectSchemas({schema({i32, f64}), schema({f64, i32_fake})});

MakeFactory({schema({field("num", int32())}), schema({field("num", float64())})});
ASSERT_RAISES(Invalid, factory_->Inspect());
InspectOptions permissive_options;
permissive_options.field_merge_options = Field::MergeOptions::Permissive();
AssertInspect(schema({field("num", float64())}), permissive_options);
}

class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
Expand Down Expand Up @@ -473,6 +479,12 @@ TEST(UnionDatasetFactoryTest, ConflictingSchemas) {
auto i32_schema = schema({i32});
ASSERT_OK_AND_ASSIGN(auto dataset, factory->Finish(i32_schema));
EXPECT_EQ(*dataset->schema(), *i32_schema);

// The user decided to allow merging the types.
FinishOptions options;
options.inspect_options.field_merge_options = Field::MergeOptions::Permissive();
ASSERT_OK_AND_ASSIGN(dataset, factory->Finish(options));
EXPECT_EQ(*dataset->schema(), *schema({f64, i32}));
}

} // namespace dataset
Expand Down
23 changes: 22 additions & 1 deletion cpp/src/arrow/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
// Get ARROW_COMPUTE definition
#include "arrow/util/config.h"
#include "arrow/util/logging.h"
#include "arrow/util/vector.h"

#ifdef ARROW_COMPUTE
#include "arrow/compute/cast.h"
#endif

namespace arrow {

using internal::checked_cast;
Expand Down Expand Up @@ -504,9 +510,24 @@ Result<std::shared_ptr<Table>> PromoteTableToSchema(const std::shared_ptr<Table>
continue;
}

#ifdef ARROW_COMPUTE
if (!compute::CanCast(*current_field->type(), *field->type())) {
return Status::Invalid("Unable to promote field ", field->name(),
": incompatible types: ", field->type()->ToString(), " vs ",
current_field->type()->ToString());
}
compute::ExecContext ctx(pool);
auto options = compute::CastOptions::Safe();
ARROW_ASSIGN_OR_RAISE(auto casted, compute::Cast(table->column(field_index),
field->type(), options, &ctx));
columns.push_back(casted.chunked_array());
#else
return Status::Invalid("Unable to promote field ", field->name(),
": incompatible types: ", field->type()->ToString(), " vs ",
current_field->type()->ToString());
current_field->type()->ToString(),
" (Arrow must be built with ARROW_COMPUTE "
"in order to cast incompatible types)");
#endif
}

auto unseen_field_iter = std::find(fields_seen.begin(), fields_seen.end(), false);
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,18 @@ Result<std::shared_ptr<Table>> ConcatenateTables(

/// \brief Promotes a table to conform to the given schema.
///
/// If a field in the schema does not have a corresponding column in the
/// table, a column of nulls will be added to the resulting table.
/// If the corresponding column is of type Null, it will be promoted to
/// the type specified by schema, with null values filled.
/// If a field in the schema does not have a corresponding column in
/// the table, a column of nulls will be added to the resulting table.
/// If the corresponding column is of type Null, it will be promoted
/// to the type specified by schema, with null values filled. If Arrow
was built with ARROW_COMPUTE, then the column will be cast to
the type specified by the schema.
///
/// Returns an error:
/// - if the corresponding column's type is not compatible with the
/// schema.
/// - if there is a column in the table that does not exist in the schema.
/// - if the cast fails or casting would be required but is not available.
///
/// \param[in] table the input Table
/// \param[in] schema the target schema to promote to
Expand Down
42 changes: 40 additions & 2 deletions cpp/src/arrow/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type.h"
#include "arrow/util/config.h"
#include "arrow/util/key_value_metadata.h"

namespace arrow {
Expand Down Expand Up @@ -417,8 +418,9 @@ TEST_F(TestPromoteTableToSchema, IncompatibleTypes) {
// Invalid promotion: int32 to null.
ASSERT_RAISES(Invalid, PromoteTableToSchema(table, schema({field("field", null())})));

// Invalid promotion: int32 to uint32.
ASSERT_RAISES(Invalid, PromoteTableToSchema(table, schema({field("field", uint32())})));
// Invalid promotion: int32 to list.
ASSERT_RAISES(Invalid,
PromoteTableToSchema(table, schema({field("field", list(int32()))})));
}

TEST_F(TestPromoteTableToSchema, IncompatibleNullity) {
Expand Down Expand Up @@ -517,6 +519,42 @@ TEST_F(ConcatenateTablesWithPromotionTest, Simple) {
AssertTablesEqualUnorderedFields(*expected, *result);
}

TEST_F(ConcatenateTablesWithPromotionTest, Unify) {
// Three single-column tables sharing the field name "f0" but with
// different types: int32, int64, and null.
auto t1 = TableFromJSON(schema({field("f0", int32())}), {"[[0], [1]]"});
auto t2 = TableFromJSON(schema({field("f0", int64())}), {"[[2], [3]]"});
auto t3 = TableFromJSON(schema({field("f0", null())}), {"[[null], [null]]"});

// Expected results: int32 widened to int64 when numeric promotion is
// allowed, and a null column filled in as nulls against the int32 table.
auto expected_int64 =
TableFromJSON(schema({field("f0", int64())}), {"[[0], [1], [2], [3]]"});
auto expected_null =
TableFromJSON(schema({field("f0", int32())}), {"[[0], [1], [null], [null]]"});

// Without unify_schemas, any schema mismatch (even null vs int32) errors.
ConcatenateTablesOptions options;
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr("Schema at index 1 was different"),
ConcatenateTables({t1, t2}, options));
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr("Schema at index 1 was different"),
ConcatenateTables({t1, t3}, options));

// With unification but default (strict) merge options: null merges with
// int32, but int32 vs int64 is still rejected.
options.unify_schemas = true;
EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid,
::testing::HasSubstr("Field f0 has incompatible types"),
ConcatenateTables({t1, t2}, options));
ASSERT_OK_AND_ASSIGN(auto actual, ConcatenateTables({t1, t3}, options));
AssertTablesEqual(*expected_null, *actual, /*same_chunk_layout=*/false);

// Allow numeric widening; the actual int32->int64 cast needs the compute
// module, so behavior depends on the ARROW_COMPUTE build flag.
options.field_merge_options.promote_numeric_width = true;
#ifdef ARROW_COMPUTE
ASSERT_OK_AND_ASSIGN(actual, ConcatenateTables({t1, t2}, options));
AssertTablesEqual(*expected_int64, *actual, /*same_chunk_layout=*/false);
#else
// Without ARROW_COMPUTE the promotion cannot cast and must error out.
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr("must be built with ARROW_COMPUTE"),
ConcatenateTables({t1, t2}, options));
#endif
}

TEST_F(TestTable, Slice) {
const int64_t length = 10;

Expand Down
Loading