Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](parquet)Fix the be core issue when reading parquet unsigned types. #39926

Merged
merged 4 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions be/src/vec/exec/format/parquet/parquet_column_convert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type s
src_physical_type = tparquet::Type::INT32;
src_logical_type = TypeDescriptor(PrimitiveType::TYPE_INT);
}
if (is_consistent() && _logical_converter->is_consistent()) {

if (!_convert_params->is_type_compatibility && is_consistent() &&
_logical_converter->is_consistent()) {
if (_cached_src_physical_type == nullptr) {
_cached_src_physical_type = DataTypeFactory::instance().create_data_type(
src_logical_type, dst_logical_type->is_nullable());
Expand Down Expand Up @@ -246,7 +248,19 @@ std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_conv
}
PrimitiveType src_logical_primitive = src_logical_type.type;

if (is_parquet_native_type(src_logical_primitive)) {
if (field_schema->is_type_compatibility) {
if (src_logical_type == TYPE_SMALLINT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_SMALLINT>());
} else if (src_logical_type == TYPE_INT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_INT>());
} else if (src_logical_type == TYPE_BIGINT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_BIGINT>());
} else if (src_logical_type == TYPE_LARGEINT) {
physical_converter.reset(new UnsignedIntegerConverter<TYPE_LARGEINT>());
} else {
physical_converter.reset(new UnsupportedConverter(src_physical_type, src_logical_type));
}
} else if (is_parquet_native_type(src_logical_primitive)) {
if (is_string_type(src_logical_primitive) &&
src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
// for FixedSizeBinary
Expand Down
65 changes: 65 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ struct ConvertParams {
DecimalScaleParams decimal_scale;
FieldSchema* field_schema = nullptr;

//For UInt8 -> Int16,UInt16 -> Int32,UInt32 -> Int64,UInt64 -> Int128.
bool is_type_compatibility = false;

/**
* Some frameworks like paimon maybe writes non-standard parquet files. Timestamp field doesn't have
* logicalType or converted_type to indicates its precision. We have to reset the time mask.
Expand Down Expand Up @@ -108,6 +111,7 @@ struct ConvertParams {
t.from_unixtime(0, *ctz);
offset_days = t.day() == 31 ? -1 : 0;
}
is_type_compatibility = field_schema_->is_type_compatibility;
}

template <typename DecimalPrimitiveType>
Expand Down Expand Up @@ -273,6 +277,67 @@ class LittleIntPhysicalConverter : public PhysicalToLogicalConverter {
}
};

template <PrimitiveType type>
struct UnsignedTypeTraits;

template <>
struct UnsignedTypeTraits<TYPE_SMALLINT> {
using UnsignedCppType = UInt8;
//https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unsigned-integers
//INT(8, false), INT(16, false), and INT(32, false) must annotate an int32 primitive type and INT(64, false)
//must annotate an int64 primitive type.
using StorageCppType = Int32;
using StorageColumnType = vectorized::ColumnInt32;
};

template <>
struct UnsignedTypeTraits<TYPE_INT> {
using UnsignedCppType = UInt16;
using StorageCppType = Int32;
using StorageColumnType = vectorized::ColumnInt32;
};

template <>
struct UnsignedTypeTraits<TYPE_BIGINT> {
using UnsignedCppType = UInt32;
using StorageCppType = Int32;
using StorageColumnType = vectorized::ColumnInt32;
};

template <>
struct UnsignedTypeTraits<TYPE_LARGEINT> {
using UnsignedCppType = UInt64;
using StorageCppType = Int64;
using StorageColumnType = vectorized::ColumnInt64;
};

template <PrimitiveType IntPrimitiveType>
class UnsignedIntegerConverter : public PhysicalToLogicalConverter {
Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
using UnsignedCppType = typename UnsignedTypeTraits<IntPrimitiveType>::UnsignedCppType;
using StorageCppType = typename UnsignedTypeTraits<IntPrimitiveType>::StorageCppType;
using StorageColumnType = typename UnsignedTypeTraits<IntPrimitiveType>::StorageColumnType;
using DstColumnType = typename PrimitiveTypeTraits<IntPrimitiveType>::ColumnType;

ColumnPtr from_col = remove_nullable(src_physical_col);
MutableColumnPtr to_col = remove_nullable(src_logical_column)->assume_mutable();
auto& src_data = static_cast<const StorageColumnType*>(from_col.get())->get_data();

size_t rows = src_data.size();
size_t start_idx = to_col->size();
to_col->resize(start_idx + rows);
auto& data = static_cast<DstColumnType&>(*to_col.get()).get_data();

for (int i = 0; i < rows; i++) {
StorageCppType src_value = src_data[i];
auto unsigned_value = static_cast<UnsignedCppType>(src_value);
data[start_idx + i] = unsigned_value;
}

return Status::OK();
}
};

class FixedSizeBinaryConverter : public PhysicalToLogicalConverter {
private:
int _type_length;
Expand Down
58 changes: 40 additions & 18 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,19 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
physical_field->physical_type = physical_schema.type;
_physical_fields.push_back(physical_field);
physical_field->physical_column_index = _physical_fields.size() - 1;
physical_field->type = get_doris_type(physical_schema);
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
}

TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& physical_schema) {
TypeDescriptor type;
type.type = INVALID_TYPE;
std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
const tparquet::SchemaElement& physical_schema) {
std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
TypeDescriptor& type = ans.first;
if (physical_schema.__isset.logicalType) {
type = convert_to_doris_type(physical_schema.logicalType);
ans = convert_to_doris_type(physical_schema.logicalType);
} else if (physical_schema.__isset.converted_type) {
type = convert_to_doris_type(physical_schema);
ans = convert_to_doris_type(physical_schema);
}
// use physical type instead
if (type.type == INVALID_TYPE) {
Expand Down Expand Up @@ -233,7 +236,7 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
break;
}
}
return type;
return ans;
}

// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
Expand Down Expand Up @@ -302,8 +305,11 @@ void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_colu
}
}

TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
TypeDescriptor type;
std::pair<TypeDescriptor, bool> FieldDescriptor::convert_to_doris_type(
tparquet::LogicalType logicalType) {
std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
TypeDescriptor& type = ans.first;
bool& is_type_compatibility = ans.second;
if (logicalType.__isset.STRING) {
type = TypeDescriptor(TYPE_STRING);
} else if (logicalType.__isset.DECIMAL) {
Expand All @@ -313,16 +319,25 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logi
type = TypeDescriptor(TYPE_DATEV2);
} else if (logicalType.__isset.INTEGER) {
if (logicalType.INTEGER.isSigned) {
if (logicalType.INTEGER.bitWidth <= 32) {
if (logicalType.INTEGER.bitWidth <= 8) {
type = TypeDescriptor(TYPE_TINYINT);
} else if (logicalType.INTEGER.bitWidth <= 16) {
type = TypeDescriptor(TYPE_SMALLINT);
} else if (logicalType.INTEGER.bitWidth <= 32) {
type = TypeDescriptor(TYPE_INT);
} else {
type = TypeDescriptor(TYPE_BIGINT);
}
} else {
if (logicalType.INTEGER.bitWidth <= 16) {
is_type_compatibility = true;
if (logicalType.INTEGER.bitWidth <= 8) {
type = TypeDescriptor(TYPE_SMALLINT);
} else if (logicalType.INTEGER.bitWidth <= 16) {
type = TypeDescriptor(TYPE_INT);
} else {
} else if (logicalType.INTEGER.bitWidth <= 32) {
type = TypeDescriptor(TYPE_BIGINT);
} else {
type = TypeDescriptor(TYPE_LARGEINT);
}
}
} else if (logicalType.__isset.TIME) {
Expand All @@ -344,12 +359,14 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logi
} else {
type = TypeDescriptor(INVALID_TYPE);
}
return type;
return ans;
}

TypeDescriptor FieldDescriptor::convert_to_doris_type(
std::pair<TypeDescriptor, bool> FieldDescriptor::convert_to_doris_type(
const tparquet::SchemaElement& physical_schema) {
TypeDescriptor type;
std::pair<TypeDescriptor, bool> ans = {INVALID_TYPE, false};
TypeDescriptor& type = ans.first;
bool& is_type_compatibility = ans.second;
switch (physical_schema.converted_type) {
case tparquet::ConvertedType::type::UTF8:
type = TypeDescriptor(TYPE_STRING);
Expand Down Expand Up @@ -378,28 +395,33 @@ TypeDescriptor FieldDescriptor::convert_to_doris_type(
type = TypeDescriptor(TYPE_TINYINT);
break;
case tparquet::ConvertedType::type::UINT_8:
is_type_compatibility = true;
[[fallthrough]];
case tparquet::ConvertedType::type::INT_16:
type = TypeDescriptor(TYPE_SMALLINT);
break;
case tparquet::ConvertedType::type::UINT_16:
is_type_compatibility = true;
[[fallthrough]];
case tparquet::ConvertedType::type::INT_32:
type = TypeDescriptor(TYPE_INT);
break;
case tparquet::ConvertedType::type::UINT_32:
[[fallthrough]];
case tparquet::ConvertedType::type::UINT_64:
is_type_compatibility = true;
[[fallthrough]];
case tparquet::ConvertedType::type::INT_64:
type = TypeDescriptor(TYPE_BIGINT);
break;
case tparquet::ConvertedType::type::UINT_64:
is_type_compatibility = true;
type = TypeDescriptor(TYPE_LARGEINT);
break;
default:
LOG(WARNING) << "Not supported parquet ConvertedType: " << physical_schema.converted_type;
type = TypeDescriptor(INVALID_TYPE);
break;
}
return type;
return ans;
}

Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ struct FieldSchema {
int16_t repeated_parent_def_level = 0;
std::vector<FieldSchema> children;

//For UInt8 -> Int16,UInt16 -> Int32,UInt32 -> Int64,UInt64 -> Int128.
bool is_type_compatibility = false;

FieldSchema() = default;
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
Expand Down Expand Up @@ -84,12 +87,13 @@ class FieldDescriptor {
Status parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos,
FieldSchema* node_field);

TypeDescriptor convert_to_doris_type(tparquet::LogicalType logicalType);
std::pair<TypeDescriptor, bool> convert_to_doris_type(tparquet::LogicalType logicalType);

TypeDescriptor convert_to_doris_type(const tparquet::SchemaElement& physical_schema);
std::pair<TypeDescriptor, bool> convert_to_doris_type(
const tparquet::SchemaElement& physical_schema);

public:
TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);
std::pair<TypeDescriptor, bool> get_doris_type(const tparquet::SchemaElement& physical_schema);

// org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special characters,
// we have to decode these characters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ id int Yes false \N NONE
9 1 string 27 false 5 true 1

-- !desc_s3 --
__add_5 int Yes false \N NONE
__bit_or_7 int Yes false \N NONE
__add_5 smallint Yes false \N NONE
__bit_or_7 tinyint Yes false \N NONE
__cast_3 bigint Yes false \N NONE
__greater_than_4 boolean Yes false \N NONE
__in_predicate_6 boolean Yes false \N NONE
__literal_1 int Yes false \N NONE
__literal_1 tinyint Yes false \N NONE
__literal_2 text Yes false \N NONE
id int Yes false \N NONE

Expand Down
Loading
Loading