Skip to content

Commit

Permalink
[HUDI-3096] fixed the bug that the cow table(contains decimalType) wr…
Browse files Browse the repository at this point in the history
…ite by flink cannot be read by spark. (#4421)
  • Loading branch information
xiarixiaoyao committed Apr 7, 2022
1 parent e33149b commit 531381f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,13 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
return nullable ? nullableSchema(time) : time;
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;
// store BigDecimal as byte[]
// store BigDecimal as Fixed
// for spark compatibility.
Schema decimal =
LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
.addToSchema(SchemaBuilder.builder().bytesType());
.addToSchema(SchemaBuilder
.fixed(String.format("%s.fixed", rowName))
.size(computeMinBytesForDecimlPrecision(decimalType.getPrecision())));
return nullable ? nullableSchema(decimal) : decimal;
case ROW:
RowType rowType = (RowType) logicalType;
Expand Down Expand Up @@ -324,5 +327,13 @@ private static Schema nullableSchema(Schema schema) {
? schema
: Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
}

private static int computeMinBytesForDecimlPrecision(int precision) {
int numBytes = 1;
while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
numBytes += 1;
}
return numBytes;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.util;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -34,6 +35,7 @@
import org.apache.flink.table.types.logical.TimestampType;

import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand All @@ -50,6 +52,8 @@
@Internal
public class RowDataToAvroConverters {

private static Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();

// --------------------------------------------------------------------------------
// Runtime Converters
// --------------------------------------------------------------------------------
Expand Down Expand Up @@ -186,7 +190,8 @@ public Object convert(Schema schema, Object object) {

@Override
public Object convert(Schema schema, Object object) {
return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal();
return decimalConversion.toFixed(javaDecimal, schema, schema.getLogicalType());
}
};
break;
Expand Down

0 comments on commit 531381f

Please sign in to comment.