Skip to content

Commit

Permalink
Merge pull request #9770 from Mytherin/parquetbigdecimal
Browse files Browse the repository at this point in the history
Support reading large decimals into doubles in the Parquet reader
  • Loading branch information
Mytherin committed Nov 23, 2023
2 parents 95fb6b3 + ee5e037 commit cafbcfb
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 13 deletions.
Binary file added data/parquet-testing/bigdecimal.parquet
Binary file not shown.
42 changes: 38 additions & 4 deletions extension/parquet/column_reader.cpp
Expand Up @@ -1137,8 +1137,8 @@ struct DecimalParquetValueConversion {
byte_len = plain_data.read<uint32_t>();
}
plain_data.available(byte_len);
auto res =
ParquetDecimalUtils::ReadDecimalValue<DUCKDB_PHYSICAL_TYPE>(const_data_ptr_cast(plain_data.ptr), byte_len);
auto res = ParquetDecimalUtils::ReadDecimalValue<DUCKDB_PHYSICAL_TYPE>(const_data_ptr_cast(plain_data.ptr),
byte_len, reader.Schema());

plain_data.inc(byte_len);
return res;
Expand Down Expand Up @@ -1192,11 +1192,39 @@ static unique_ptr<ColumnReader> CreateDecimalReaderInternal(ParquetReader &reade
case PhysicalType::INT128:
return make_uniq<DecimalColumnReader<hugeint_t, FIXED_LENGTH>>(reader, type_p, schema_p, file_idx_p, max_define,
max_repeat);
case PhysicalType::DOUBLE:
return make_uniq<DecimalColumnReader<double, FIXED_LENGTH>>(reader, type_p, schema_p, file_idx_p, max_define,
max_repeat);
default:
throw InternalException("Unrecognized type for Decimal");
}
}

template <>
double ParquetDecimalUtils::ReadDecimalValue(const_data_ptr_t pointer, idx_t size,
const duckdb_parquet::format::SchemaElement &schema_ele) {
double res = 0;
bool positive = (*pointer & 0x80) == 0;
for (idx_t i = 0; i < size; i += 8) {
auto byte_size = MinValue<idx_t>(sizeof(uint64_t), size - i);
uint64_t input = 0;
auto res_ptr = reinterpret_cast<uint8_t *>(&input);
for (idx_t k = 0; k < byte_size; k++) {
auto byte = pointer[i + k];
res_ptr[sizeof(uint64_t) - k - 1] = positive ? byte : byte ^ 0xFF;
}
res *= double(NumericLimits<uint64_t>::Maximum()) + 1;
res += input;
}
if (!positive) {
res += 1;
res /= pow(10, schema_ele.scale);
return -res;
}
res /= pow(10, schema_ele.scale);
return res;
}

unique_ptr<ColumnReader> ParquetDecimalUtils::CreateReader(ParquetReader &reader, const LogicalType &type_p,
const SchemaElement &schema_p, idx_t file_idx_p,
idx_t max_define, idx_t max_repeat) {
Expand Down Expand Up @@ -1372,8 +1400,14 @@ unique_ptr<ColumnReader> ColumnReader::CreateReader(ParquetReader &reader, const
return make_uniq<TemplatedColumnReader<float, TemplatedParquetValueConversion<float>>>(
reader, type_p, schema_p, file_idx_p, max_define, max_repeat);
case LogicalTypeId::DOUBLE:
return make_uniq<TemplatedColumnReader<double, TemplatedParquetValueConversion<double>>>(
reader, type_p, schema_p, file_idx_p, max_define, max_repeat);
switch (schema_p.type) {
case Type::BYTE_ARRAY:
case Type::FIXED_LEN_BYTE_ARRAY:
return ParquetDecimalUtils::CreateReader(reader, type_p, schema_p, file_idx_p, max_define, max_repeat);
default:
return make_uniq<TemplatedColumnReader<double, TemplatedParquetValueConversion<double>>>(
reader, type_p, schema_p, file_idx_p, max_define, max_repeat);
}
case LogicalTypeId::TIMESTAMP:
case LogicalTypeId::TIMESTAMP_TZ:
switch (schema_p.type) {
Expand Down
7 changes: 6 additions & 1 deletion extension/parquet/include/parquet_decimal_utils.hpp
Expand Up @@ -16,7 +16,8 @@ namespace duckdb {
class ParquetDecimalUtils {
public:
template <class PHYSICAL_TYPE>
static PHYSICAL_TYPE ReadDecimalValue(const_data_ptr_t pointer, idx_t size) {
static PHYSICAL_TYPE ReadDecimalValue(const_data_ptr_t pointer, idx_t size,
const duckdb_parquet::format::SchemaElement &schema_ele) {
D_ASSERT(size <= sizeof(PHYSICAL_TYPE));
PHYSICAL_TYPE res = 0;

Expand All @@ -40,4 +41,8 @@ class ParquetDecimalUtils {
idx_t max_repeat);
};

template <>
double ParquetDecimalUtils::ReadDecimalValue(const_data_ptr_t pointer, idx_t size,
const duckdb_parquet::format::SchemaElement &schema_ele);

} // namespace duckdb
3 changes: 3 additions & 0 deletions extension/parquet/parquet_reader.cpp
Expand Up @@ -203,6 +203,9 @@ LogicalType ParquetReader::DeriveLogicalType(const SchemaElement &s_ele, bool bi
if (!s_ele.__isset.precision || !s_ele.__isset.scale) {
throw IOException("DECIMAL requires a length and scale specifier!");
}
if (s_ele.precision > DecimalType::MaxWidth()) {
return LogicalType::DOUBLE;
}
switch (s_ele.type) {
case Type::BYTE_ARRAY:
case Type::FIXED_LEN_BYTE_ARRAY:
Expand Down
25 changes: 17 additions & 8 deletions extension/parquet/parquet_statistics.cpp
Expand Up @@ -92,6 +92,14 @@ Value ParquetStatisticsUtils::ConvertValue(const LogicalType &type,
return Value::FLOAT(val);
}
case LogicalTypeId::DOUBLE: {
switch (schema_ele.type) {
case Type::FIXED_LEN_BYTE_ARRAY:
case Type::BYTE_ARRAY:
// decimals cast to double
return Value::DOUBLE(ParquetDecimalUtils::ReadDecimalValue<double>(stats_data, stats.size(), schema_ele));
default:
break;
}
if (stats.size() != sizeof(double)) {
throw InternalException("Incorrect stats size for type DOUBLE");
}
Expand Down Expand Up @@ -124,17 +132,18 @@ Value ParquetStatisticsUtils::ConvertValue(const LogicalType &type,
}
switch (type.InternalType()) {
case PhysicalType::INT16:
return Value::DECIMAL(ParquetDecimalUtils::ReadDecimalValue<int16_t>(stats_data, stats.size()), width,
scale);
return Value::DECIMAL(
ParquetDecimalUtils::ReadDecimalValue<int16_t>(stats_data, stats.size(), schema_ele), width, scale);
case PhysicalType::INT32:
return Value::DECIMAL(ParquetDecimalUtils::ReadDecimalValue<int32_t>(stats_data, stats.size()), width,
scale);
return Value::DECIMAL(
ParquetDecimalUtils::ReadDecimalValue<int32_t>(stats_data, stats.size(), schema_ele), width, scale);
case PhysicalType::INT64:
return Value::DECIMAL(ParquetDecimalUtils::ReadDecimalValue<int64_t>(stats_data, stats.size()), width,
scale);
return Value::DECIMAL(
ParquetDecimalUtils::ReadDecimalValue<int64_t>(stats_data, stats.size(), schema_ele), width, scale);
case PhysicalType::INT128:
return Value::DECIMAL(ParquetDecimalUtils::ReadDecimalValue<hugeint_t>(stats_data, stats.size()), width,
scale);
return Value::DECIMAL(
ParquetDecimalUtils::ReadDecimalValue<hugeint_t>(stats_data, stats.size(), schema_ele), width,
scale);
default:
throw InternalException("Unsupported internal type for decimal");
}
Expand Down
18 changes: 18 additions & 0 deletions test/sql/copy/parquet/bigdecimal.test
@@ -0,0 +1,18 @@
# name: test/sql/copy/parquet/bigdecimal.test
# description: Read a file created by Google BigQuery with a BIGDECIMAL column (i.e. DECIMAL(77,38))
# group: [parquet]

require parquet

statement ok
PRAGMA enable_verification

query I
FROM 'data/parquet-testing/bigdecimal.parquet'
----
0.5
-0.5
1.2345678912345679e+26
-1.2345678912345679e+26
5.7896044618658096e+38
-5.7896044618658096e+38

0 comments on commit cafbcfb

Please sign in to comment.