[HUDI-2958] Automatically set spark.sql.parquet.writeLegacyFormat when using bulk insert to write data that contains DecimalType
xiarixiaoyao committed Dec 8, 2021
1 parent c56d93e commit 34dd491
Showing 2 changed files with 49 additions and 1 deletion.
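Context for the change: when Spark writes a small-precision DecimalType with spark.sql.parquet.writeLegacyFormat left at false, the value is stored as a Parquet int32/int64, which the Avro-based reader on Hudi's read path cannot decode back into a decimal. Before this commit, users had to enable the legacy format by hand. A minimal sketch of that pre-fix workaround, assuming a Spark shell session with `spark` in scope and the hoodie.parquet.writelegacyformat.enabled key from HoodieStorageConfig; the table name and path are hypothetical:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit

// A five-row frame with a decimal(8, 4) column, small enough that the
// non-legacy Parquet format would encode it as int32/int64.
val df = spark.range(5).toDF("id")
  .withColumn("price", lit(new java.math.BigDecimal("2090.0000")))

df.write.format("hudi")
  .option("hoodie.table.name", "decimal_demo")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "id")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  .option("hoodie.parquet.writelegacyformat.enabled", "true") // manual workaround this commit automates
  .mode(SaveMode.Overwrite)
  .save("/tmp/hudi_decimal_demo")

After this commit, the bulk-insert write path flips spark.sql.parquet.writeLegacyFormat to true on its own whenever the schema contains such a decimal, so the explicit option is no longer needed.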
HoodieRowParquetWriteSupport.java
@@ -24,8 +24,15 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;

+import java.util.Arrays;
 import java.util.HashMap;

 import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
@@ -46,13 +53,32 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
   public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
     super();
     Configuration hadoopConf = new Configuration(conf);
-    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", findSmallPrecisionDecimalType(structType) ? "true" : writeConfig.parquetWriteLegacyFormatEnabled());
     hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
     this.hadoopConf = hadoopConf;
     setSchema(structType, hadoopConf);
     this.bloomFilter = bloomFilter;
   }

+  // By default, ParquetWriteSupport writes a DecimalType to Parquet as int32/int64 when its
+  // precision is below Decimal.MAX_LONG_DIGITS(), but the AvroParquetReader used by
+  // HoodieParquetReader cannot read int32/int64 back as a DecimalType.
+  // Check recursively whether the given sparkType contains such a DecimalType.
+  private boolean findSmallPrecisionDecimalType(DataType sparkType) {
+    if (sparkType instanceof StructType) {
+      StructField[] fields = ((StructType) sparkType).fields();
+      return Arrays.stream(fields).anyMatch(f -> findSmallPrecisionDecimalType(f.dataType()));
+    } else if (sparkType instanceof MapType) {
+      MapType map = (MapType) sparkType;
+      return findSmallPrecisionDecimalType(map.keyType()) || findSmallPrecisionDecimalType(map.valueType());
+    } else if (sparkType instanceof ArrayType) {
+      return findSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
+    } else if (sparkType instanceof DecimalType) {
+      DecimalType decimalType = (DecimalType) sparkType;
+      return decimalType.precision() < Decimal.MAX_LONG_DIGITS();
+    }
+    return false;
+  }

   public Configuration getHadoopConf() {
     return hadoopConf;
   }
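For illustration, a rough Scala equivalent of the new check, using Spark's Decimal.MAX_LONG_DIGITS constant (18); the function name here is illustrative, not part of the commit:

import org.apache.spark.sql.types._

// Recursively look for a decimal column that Spark would encode as int32/int64.
def hasSmallPrecisionDecimal(dt: DataType): Boolean = dt match {
  case s: StructType  => s.fields.exists(f => hasSmallPrecisionDecimal(f.dataType))
  case m: MapType     => hasSmallPrecisionDecimal(m.keyType) || hasSmallPrecisionDecimal(m.valueType)
  case a: ArrayType   => hasSmallPrecisionDecimal(a.elementType)
  case d: DecimalType => d.precision < Decimal.MAX_LONG_DIGITS
  case _              => false
}

hasSmallPrecisionDecimal(StructType(Seq(StructField("price", DecimalType(8, 4))))) // true: int32/int64 encoding
hasSmallPrecisionDecimal(StructType(Seq(StructField("big", DecimalType(20, 4))))) // false: fixed_len_byte_array either way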
TestCOWDataSource.scala
@@ -723,4 +723,26 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val result = spark.sql("select * from tmptable limit 1").collect()(0)
     result.schema.contains(new StructField("partition", StringType, true))
   }

+  @Test
+  def testWriteSmallPrecisionDecimalTable(): Unit = {
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+      .withColumn("shortDecimal", lit(new java.math.BigDecimal("2090.0000"))) // creates a DecimalType(8, 4) column
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateUpdates("002", 5)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+      .withColumn("shortDecimal", lit(new java.math.BigDecimal("2090.0000"))) // creates a DecimalType(8, 4) column
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    // The read back after bulk insert + upsert fails without the fix, because the
+    // decimal column was written as int32/int64 but read through the Avro path.
+    assert(spark.read.format("hudi").load(basePath).count() == 5)
+  }
 }
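To see the effect on disk, one can dump the Parquet schema of a written data file; a hedged sketch using parquet-hadoop's ParquetFileReader, where the file path is hypothetical (pick any parquet file the write produced):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Print the file's physical schema: with the fix (legacy format), the decimal
// column appears as fixed_len_byte_array (DECIMAL(8,4)) instead of int32/int64.
val input = HadoopInputFile.fromPath(new Path("/tmp/hudi_decimal_demo/some-data-file.parquet"), new Configuration())
val reader = ParquetFileReader.open(input)
try println(reader.getFooter.getFileMetaData.getSchema) finally reader.close()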
