Skip to content

Commit

Permalink
ARROW-7788: [C++][Parquet] Enable Arrow Schema to Parquet Schema for …
Browse files Browse the repository at this point in the history
…missing types

*  Move common methods for list types to BaseListType and make FixedSizeListType extend
   the same base class so they can be mapped
*  Enable Large* types and FixedSizeListType by adding them to the appropriate enum cases
*  Add implementation for Maps
*  Expose constructor/static factory to MapType that can take a field as a value
*  For list types, expose a configuration parameter to make the Parquet schema
   conform to the required naming.

Closes #6379 from emkornfield/schema and squashes the following commits:

f7cf062 <Micah Kornfield> address comments
11b50f2 <Micah Kornfield> rename
a35a59f <Micah Kornfield> fix format/compilation
4d88caa <Micah Kornfield> rename is_base_list_type to is_var_length_list_type which I think is clearer
449b852 <emkornfield> Remove additional logging
2063c08 <Micah Kornfield> ARROW-7788:  Enable Arrow Schema to Parquet Schema for missing types.

Lead-authored-by: Micah Kornfield <emkornfield@gmail.com>
Co-authored-by: emkornfield <emkornfield@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
emkornfield authored and pitrou committed Feb 18, 2020
1 parent 01190ab commit f392dd1
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 60 deletions.
4 changes: 2 additions & 2 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ class NullArrayFactory {
}

template <typename T>
enable_if_base_list<T, Status> Visit(const T&) {
enable_if_var_size_list<T, Status> Visit(const T&) {
// values array may be empty, but there must be at least one offset of 0
return MaxOf(sizeof(typename T::offset_type) * (length_ + 1));
}
Expand Down Expand Up @@ -1439,7 +1439,7 @@ class NullArrayFactory {
}

template <typename T>
enable_if_base_list<T, Status> Visit(const T& type) {
enable_if_var_size_list<T, Status> Visit(const T& type) {
(*out_)->buffers.resize(2, buffer_);
return CreateChild(0, length_, &(*out_)->child_data[0]);
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ class ArrayWriter {
}

template <typename ArrayType>
enable_if_base_list<typename ArrayType::TypeClass, Status> Visit(
enable_if_var_size_list<typename ArrayType::TypeClass, Status> Visit(
const ArrayType& array) {
WriteValidityField(array);
WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
Expand Down Expand Up @@ -1281,7 +1281,7 @@ class ArrayReader {
}

template <typename T>
enable_if_base_list<T, Status> Visit(const T& type) {
enable_if_var_size_list<T, Status> Visit(const T& type) {
return CreateList<T>(type_, &result_);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class ArrayLoader {
}

template <typename T>
enable_if_base_list<T, Status> Visit(const T& type) {
enable_if_var_size_list<T, Status> Visit(const T& type) {
return LoadList(type);
}

Expand Down
20 changes: 14 additions & 6 deletions cpp/src/arrow/type.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,13 @@ std::string LargeListType::ToString() const {

MapType::MapType(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<DataType>& item_type, bool keys_sorted)
: ListType(std::make_shared<Field>(
: MapType(key_type, field("value", item_type), keys_sorted) {}

MapType::MapType(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<Field>& item_field, bool keys_sorted)
: ListType(field(
"entries",
struct_({std::make_shared<Field>("key", key_type, false),
std::make_shared<Field>("value", item_type)}),
false)),
struct_({std::make_shared<Field>("key", key_type, false), item_field}), false)),
keys_sorted_(keys_sorted) {
id_ = type_id;
}
Expand Down Expand Up @@ -1404,9 +1406,15 @@ std::shared_ptr<DataType> large_list(const std::shared_ptr<Field>& value_field)
}

std::shared_ptr<DataType> map(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<DataType>& value_type,
const std::shared_ptr<DataType>& item_type,
bool keys_sorted) {
return std::make_shared<MapType>(key_type, item_type, keys_sorted);
}

std::shared_ptr<DataType> map(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<Field>& item_field,
bool keys_sorted) {
return std::make_shared<MapType>(key_type, value_type, keys_sorted);
return std::make_shared<MapType>(key_type, item_field, keys_sorted);
}

std::shared_ptr<DataType> fixed_size_list(const std::shared_ptr<DataType>& value_type,
Expand Down
38 changes: 21 additions & 17 deletions cpp/src/arrow/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ class ARROW_EXPORT DoubleType
class ARROW_EXPORT BaseListType : public NestedType {
 public:
  using NestedType::NestedType;

  /// \brief The child field (stored at index 0 of children_) describing the
  /// list's values.
  std::shared_ptr<Field> value_field() const {
    const auto& values = children_[0];
    return values;
  }

  /// \brief The data type of the list's values, i.e. the type of the child
  /// field stored at index 0 of children_.
  std::shared_ptr<DataType> value_type() const {
    const auto& values = children_[0];
    return values->type();
  }
};

/// \brief Concrete type class for list data
Expand All @@ -662,10 +665,6 @@ class ARROW_EXPORT ListType : public BaseListType {
children_ = {value_field};
}

std::shared_ptr<Field> value_field() const { return children_[0]; }

std::shared_ptr<DataType> value_type() const { return children_[0]->type(); }

DataTypeLayout layout() const override {
return DataTypeLayout(
{DataTypeLayout::Bitmap(), DataTypeLayout::FixedWidth(sizeof(offset_type))});
Expand Down Expand Up @@ -698,10 +697,6 @@ class ARROW_EXPORT LargeListType : public BaseListType {
children_ = {value_field};
}

std::shared_ptr<Field> value_field() const { return children_[0]; }

std::shared_ptr<DataType> value_type() const { return children_[0]->type(); }

DataTypeLayout layout() const override {
return DataTypeLayout(
{DataTypeLayout::Bitmap(), DataTypeLayout::FixedWidth(sizeof(offset_type))});
Expand Down Expand Up @@ -729,9 +724,14 @@ class ARROW_EXPORT MapType : public ListType {
MapType(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<DataType>& item_type, bool keys_sorted = false);

std::shared_ptr<DataType> key_type() const { return value_type()->child(0)->type(); }
MapType(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<Field>& item_field, bool keys_sorted = false);

std::shared_ptr<Field> key_field() const { return value_type()->child(0); }
std::shared_ptr<DataType> key_type() const { return key_field()->type(); }

std::shared_ptr<DataType> item_type() const { return value_type()->child(1)->type(); }
std::shared_ptr<Field> item_field() const { return value_type()->child(1); }
std::shared_ptr<DataType> item_type() const { return item_field()->type(); }

std::string ToString() const override;

Expand All @@ -746,7 +746,7 @@ class ARROW_EXPORT MapType : public ListType {
};

/// \brief Concrete type class for fixed size list data
class ARROW_EXPORT FixedSizeListType : public NestedType {
class ARROW_EXPORT FixedSizeListType : public BaseListType {
public:
static constexpr Type::type type_id = Type::FIXED_SIZE_LIST;
using offset_type = int32_t;
Expand All @@ -758,14 +758,10 @@ class ARROW_EXPORT FixedSizeListType : public NestedType {
: FixedSizeListType(std::make_shared<Field>("item", value_type), list_size) {}

FixedSizeListType(const std::shared_ptr<Field>& value_field, int32_t list_size)
: NestedType(type_id), list_size_(list_size) {
: BaseListType(type_id), list_size_(list_size) {
children_ = {value_field};
}

std::shared_ptr<Field> value_field() const { return children_[0]; }

std::shared_ptr<DataType> value_type() const { return children_[0]->type(); }

DataTypeLayout layout() const override {
return DataTypeLayout({DataTypeLayout::Bitmap()});
}
Expand Down Expand Up @@ -1524,7 +1520,15 @@ std::shared_ptr<DataType> large_list(const std::shared_ptr<DataType>& value_type
/// \brief Create a MapType instance from its key and value DataTypes
ARROW_EXPORT
std::shared_ptr<DataType> map(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<DataType>& value_type,
const std::shared_ptr<DataType>& item_type,
bool keys_sorted = false);

/// \brief Create a MapType instance from its key DataType and item Field.
///
/// This overload takes the item as a Field (rather than a DataType) so that
/// the nullability of the value can be communicated.
ARROW_EXPORT
std::shared_ptr<DataType> map(const std::shared_ptr<DataType>& key_type,
const std::shared_ptr<Field>& item_field,
bool keys_sorted = false);

/// \brief Create a FixedSizeListType instance from its child Field type
Expand Down
14 changes: 12 additions & 2 deletions cpp/src/arrow/type_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,20 @@ template <typename T, typename R = void>
using enable_if_nested = enable_if_t<is_nested_type<T>::value, R>;

template <typename T>
using is_base_list_type = std::is_base_of<BaseListType, T>;
using is_var_length_list_type =
std::integral_constant<bool, std::is_base_of<LargeListType, T>::value ||
std::is_base_of<ListType, T>::value>;

template <typename T, typename R = void>
using enable_if_base_list = enable_if_t<is_base_list_type<T>::value, R>;
using enable_if_var_size_list = enable_if_t<is_var_length_list_type<T>::value, R>;

// DEPRECATED: use is_var_length_list_type instead.
// This alias forwards to is_var_length_list_type<T>, so despite its name it is
// a check for ListType/LargeListType derivation, not for BaseListType derivation.
template <typename T>
using is_base_list_type = is_var_length_list_type<T>;

// DEPRECATED: use enable_if_var_size_list instead.
// This alias forwards to enable_if_var_size_list<T, R>.
template <typename T, typename R = void>
using enable_if_base_list = enable_if_var_size_list<T, R>;

template <typename T>
using is_fixed_size_list_type = std::is_same<FixedSizeListType, T>;
Expand Down
166 changes: 164 additions & 2 deletions cpp/src/parquet/arrow/arrow_schema_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,15 @@ class TestConvertArrowSchema : public ::testing::Test {
}
}

::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
::arrow::Status ConvertSchema(
const std::vector<std::shared_ptr<Field>>& fields,
std::shared_ptr<::parquet::ArrowWriterProperties> arrow_properties =
::parquet::default_arrow_writer_properties()) {
arrow_schema_ = ::arrow::schema(fields);
std::shared_ptr<::parquet::WriterProperties> properties =
::parquet::default_writer_properties();
return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
return ToParquetSchema(arrow_schema_.get(), *properties.get(), *arrow_properties,
&result_schema_);
}

protected:
Expand Down Expand Up @@ -743,6 +747,8 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
std::vector<FieldConstructionArguments> cases = {
{"boolean", ::arrow::boolean(), LogicalType::None(), ParquetType::BOOLEAN, -1},
{"binary", ::arrow::binary(), LogicalType::None(), ParquetType::BYTE_ARRAY, -1},
{"large_binary", ::arrow::large_binary(), LogicalType::None(),
ParquetType::BYTE_ARRAY, -1},
{"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalType::None(),
ParquetType::FIXED_LEN_BYTE_ARRAY, 64},
{"uint8", ::arrow::uint8(), LogicalType::Int(8, false), ParquetType::INT32, -1},
Expand All @@ -757,6 +763,8 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
{"float32", ::arrow::float32(), LogicalType::None(), ParquetType::FLOAT, -1},
{"float64", ::arrow::float64(), LogicalType::None(), ParquetType::DOUBLE, -1},
{"utf8", ::arrow::utf8(), LogicalType::String(), ParquetType::BYTE_ARRAY, -1},
{"large_utf8", ::arrow::large_utf8(), LogicalType::String(),
ParquetType::BYTE_ARRAY, -1},
{"decimal(1, 0)", ::arrow::decimal(1, 0), LogicalType::Decimal(1, 0),
ParquetType::FIXED_LEN_BYTE_ARRAY, 1},
{"decimal(8, 2)", ::arrow::decimal(8, 2), LogicalType::Decimal(8, 2),
Expand Down Expand Up @@ -933,6 +941,160 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// Checks that Arrow map types convert to the Parquet MAP group layout. The
// Arrow key/item field names used below ("string"/"other_string") differ from
// the node names in the expected Parquet schema, which are "key_value", "key"
// and "value".
TEST_F(TestConvertArrowSchema, ParquetMaps) {
  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;

  // Nullable map with nullable values.
  //
  // optional group my_map (MAP) {
  //   repeated group key_value {
  //     required binary key (UTF8);
  //     optional binary value (UTF8);
  //   }
  // }
  {
    auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY,
                                   ConvertedType::UTF8);
    auto value = PrimitiveNode::Make("value", Repetition::OPTIONAL,
                                     ParquetType::BYTE_ARRAY, ConvertedType::UTF8);

    auto key_value = GroupNode::Make("key_value", Repetition::REPEATED, {key, value});
    parquet_fields.push_back(GroupNode::Make("my_map", Repetition::OPTIONAL, {key_value},
                                             ConvertedType::MAP));
    auto arrow_key = ::arrow::field("string", UTF8, /*nullable=*/false);
    auto arrow_value = ::arrow::field("other_string", UTF8, /*nullable=*/true);
    // The third argument of ::arrow::map() is keys_sorted; nullability of the
    // map itself is set on the enclosing field below.
    auto arrow_map =
        ::arrow::map(arrow_key->type(), arrow_value, /*keys_sorted=*/false);
    arrow_fields.push_back(::arrow::field("my_map", arrow_map, /*nullable=*/true));
  }

  // Non-nullable map with non-nullable values.
  //
  // required group my_map (MAP) {
  //   repeated group key_value {
  //     required binary key (UTF8);
  //     required binary value (UTF8);
  //   }
  // }
  {
    auto key = PrimitiveNode::Make("key", Repetition::REQUIRED, ParquetType::BYTE_ARRAY,
                                   ConvertedType::UTF8);
    auto value = PrimitiveNode::Make("value", Repetition::REQUIRED,
                                     ParquetType::BYTE_ARRAY, ConvertedType::UTF8);

    auto key_value = GroupNode::Make("key_value", Repetition::REPEATED, {key, value});
    parquet_fields.push_back(GroupNode::Make("my_map", Repetition::REQUIRED, {key_value},
                                             ConvertedType::MAP));
    auto arrow_key = ::arrow::field("string", UTF8, /*nullable=*/false);
    auto arrow_value = ::arrow::field("other_string", UTF8, /*nullable=*/false);
    auto arrow_map = ::arrow::map(arrow_key->type(), arrow_value);
    arrow_fields.push_back(::arrow::field("my_map", arrow_map, /*nullable=*/false));
  }

  ASSERT_OK(ConvertSchema(arrow_fields));

  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// Checks that large_list and fixed_size_list convert to the same 3-level
// Parquet LIST encoding when ConvertSchema is called with its default
// Arrow writer properties.
TEST_F(TestConvertArrowSchema, ParquetOtherLists) {
  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;

  // Expected Parquet schema for both cases (list non-null, elements nullable).
  // The element node carries the same name as the Arrow element field
  // ("string"):
  //
  // required group my_list (LIST) {
  //   repeated group list {
  //     optional binary string (UTF8);
  //   }
  // }

  // Case 1: large_list<string>.
  {
    auto element_node = PrimitiveNode::Make(
        "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
    auto repeated_node = GroupNode::Make("list", Repetition::REPEATED, {element_node});
    parquet_fields.push_back(GroupNode::Make("my_list", Repetition::REQUIRED,
                                             {repeated_node}, ConvertedType::LIST));

    auto arrow_element = ::arrow::field("string", UTF8, /*nullable=*/true);
    arrow_fields.push_back(::arrow::field("my_list", ::arrow::large_list(arrow_element),
                                          /*nullable=*/false));
  }

  // Case 2: fixed_size_list<string>[10].
  {
    auto element_node = PrimitiveNode::Make(
        "string", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
    auto repeated_node = GroupNode::Make("list", Repetition::REPEATED, {element_node});
    parquet_fields.push_back(GroupNode::Make("my_list", Repetition::REQUIRED,
                                             {repeated_node}, ConvertedType::LIST));

    auto arrow_element = ::arrow::field("string", UTF8, /*nullable=*/true);
    arrow_fields.push_back(::arrow::field(
        "my_list", ::arrow::fixed_size_list(arrow_element, 10), /*nullable=*/false));
  }

  ASSERT_OK(ConvertSchema(arrow_fields));

  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// With compliant nested types enabled, the expected element node is named
// "element" even though the Arrow element field is named "string".
TEST_F(TestConvertArrowSchema, ParquetNestedComplianceEnabledNullable) {
  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;

  // Expected 3-level LIST encoding (list non-null, elements nullable):
  //
  // required group my_list (LIST) {
  //   repeated group list {
  //     optional binary element (UTF8);
  //   }
  // }
  {
    auto element_node = PrimitiveNode::Make(
        "element", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
    auto repeated_node = GroupNode::Make("list", Repetition::REPEATED, {element_node});
    parquet_fields.push_back(GroupNode::Make("my_list", Repetition::REQUIRED,
                                             {repeated_node}, ConvertedType::LIST));

    auto arrow_element = ::arrow::field("string", UTF8, /*nullable=*/true);
    arrow_fields.push_back(::arrow::field("my_list", ::arrow::large_list(arrow_element),
                                          /*nullable=*/false));
  }

  ArrowWriterProperties::Builder props_builder;
  props_builder.enable_compliant_nested_types();
  auto compliant_properties = props_builder.build();

  ASSERT_OK(ConvertSchema(arrow_fields, compliant_properties));

  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

// With compliant nested types enabled and a non-nullable Arrow element field,
// the expected element node is named "element" and is REQUIRED.
TEST_F(TestConvertArrowSchema, ParquetNestedComplianceEnabledNotNullable) {
  std::vector<NodePtr> parquet_fields;
  std::vector<std::shared_ptr<Field>> arrow_fields;

  // Expected 3-level LIST encoding (list non-null, elements non-null):
  //
  // required group my_list (LIST) {
  //   repeated group list {
  //     required binary element (UTF8);
  //   }
  // }
  {
    auto element_node = PrimitiveNode::Make(
        "element", Repetition::REQUIRED, ParquetType::BYTE_ARRAY, ConvertedType::UTF8);
    auto repeated_node = GroupNode::Make("list", Repetition::REPEATED, {element_node});
    parquet_fields.push_back(GroupNode::Make("my_list", Repetition::REQUIRED,
                                             {repeated_node}, ConvertedType::LIST));

    auto arrow_element = ::arrow::field("string", UTF8, /*nullable=*/false);
    arrow_fields.push_back(::arrow::field("my_list", ::arrow::large_list(arrow_element),
                                          /*nullable=*/false));
  }

  ArrowWriterProperties::Builder props_builder;
  props_builder.enable_compliant_nested_types();
  auto compliant_properties = props_builder.build();

  ASSERT_OK(ConvertSchema(arrow_fields, compliant_properties));

  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}

TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
Expand Down
Loading

0 comments on commit f392dd1

Please sign in to comment.