Skip to content

Commit

Permalink
address all comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xiarixiaoyao committed Dec 13, 2021
1 parent 3d18373 commit 893fe09
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,8 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.HashMap;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
Expand All @@ -53,32 +46,13 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, BloomFilter bloomFilter, HoodieWriteConfig writeConfig) {
super();
Configuration hadoopConf = new Configuration(conf);
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", findSmallPrecisionDecimalType(structType) ? "true" : writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeConfig.parquetWriteLegacyFormatEnabled());
hadoopConf.set("spark.sql.parquet.outputTimestampType", writeConfig.parquetOutputTimestampType());
this.hadoopConf = hadoopConf;
setSchema(structType, hadoopConf);
this.bloomFilter = bloomFilter;
}

// Now by default ParquetWriteSupport will write DecimalType to parquet as int32/int64 when the scale of decimalType < Decimal.MAX_LONG_DIGITS(),
// but AvroParquetReader which used by HoodieParquetReader cannot support read int32/int64 as DecimalType.
// try to find current sparkType whether contains that DecimalType.
private boolean findSmallPrecisionDecimalType(DataType sparkType) {
if (sparkType instanceof StructType) {
StructField[] fields = ((StructType) sparkType).fields();
return Arrays.stream(fields).anyMatch(f -> findSmallPrecisionDecimalType(f.dataType()));
} else if (sparkType instanceof MapType) {
MapType map = (MapType) sparkType;
return findSmallPrecisionDecimalType(map.keyType()) || findSmallPrecisionDecimalType(map.valueType());
} else if (sparkType instanceof ArrayType) {
return findSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
} else if (sparkType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) sparkType;
return decimalType.scale() < Decimal.MAX_LONG_DIGITS();
}
return false;
}

public Configuration getHadoopConf() {
return hadoopConf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

package org.apache.hudi.util;

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.ByteType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.FloatType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.ShortType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
Expand Down Expand Up @@ -119,4 +123,26 @@ private static boolean areCompatible(@Nonnull StructType left, @Nonnull StructTy
private static <T> HashSet<T> newHashSet(T... ts) {
return new HashSet<>(Arrays.asList(ts));
}

/**
* Try to find current sparktype whether contains that DecimalType which's scale < Decimal.MAX_LONG_DIGITS().
*
* @param sparkType spark schema.
* @return found result.
*/
public static boolean foundSmallPrecisionDecimalType(DataType sparkType) {
if (sparkType instanceof StructType) {
StructField[] fields = ((StructType) sparkType).fields();
return Arrays.stream(fields).anyMatch(f -> foundSmallPrecisionDecimalType(f.dataType()));
} else if (sparkType instanceof MapType) {
MapType map = (MapType) sparkType;
return foundSmallPrecisionDecimalType(map.keyType()) || foundSmallPrecisionDecimalType(map.valueType());
} else if (sparkType instanceof ArrayType) {
return foundSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
} else if (sparkType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) sparkType;
return decimalType.scale() < Decimal.MAX_LONG_DIGITS();
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -735,14 +735,17 @@ class TestCOWDataSource extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)

val records2 = recordsToStrings(dataGen.generateUpdates("002", 5)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
.withColumn("shortDecimal", lit(new java.math.BigDecimal(s"2090.0000"))) // create decimalType(8, 4)
// update the value of shortDecimal
val inputDF2 = inputDF1.withColumn("shortDecimal", lit(new java.math.BigDecimal(s"3090.0000")))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Append)
.save(basePath)
assert(spark.read.format("hudi").load(basePath).count() == 5)
val readResult = spark.read.format("hudi").load(basePath)
assert(readResult.count() == 5)
// compare the test result
assertEquals(inputDF2.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","),
readResult.sort("_row_key").select("shortDecimal").collect().map(_.getDecimal(0).toPlainString).mkString(","))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.hudi.util.DataTypeUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
Expand All @@ -33,6 +36,7 @@
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;

import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -41,6 +45,8 @@
public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
ReadSupport, WriteSupport, DataSourceRegister {

private static final Logger LOG = LogManager.getLogger(DefaultSource.class);

@Override
public String shortName() {
return "hudi_internal";
Expand All @@ -62,10 +68,19 @@ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType sche
String instantTime = options.get(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY).get();
String path = options.get("path").get();
String tblName = options.get(HoodieWriteConfig.TBL_NAME.key()).get();
Map<String, String> parameters = options.asMap();
boolean populateMetaFields = options.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS.key(),
Boolean.parseBoolean(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
// Now by default ParquetWriteSupport will write DecimalType to parquet as int32/int64 when the scale of decimalType < Decimal.MAX_LONG_DIGITS(),
// but AvroParquetReader which used by HoodieParquetReader cannot support read int32/int64 as DecimalType.
// try to find current schema whether contains that DecimalType, and auto set the value of "hoodie.parquet.writeLegacyFormat.enabled"
if (DataTypeUtils.foundSmallPrecisionDecimalType(schema)
&& !options.getBoolean("hoodie.parquet.writeLegacyFormat.enabled", false)) {
parameters.put("hoodie.parquet.writeLegacyFormat.enabled", "true");
LOG.warn("Small Decimal Type found in current schema, auto set the value of hoodie.parquet.writeLegacyFormat.enabled to true");
}
// 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()).get(), path, tblName, options.asMap());
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()).get(), path, tblName, parameters);
boolean arePartitionRecordsSorted = HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted(
options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).isPresent()
? options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).get() : null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.hudi.internal.BaseDefaultSource;
import org.apache.hudi.internal.DataSourceInternalWriterHelper;

import org.apache.hudi.util.DataTypeUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
Expand All @@ -39,6 +42,8 @@
*/
public class DefaultSource extends BaseDefaultSource implements TableProvider {

private static final Logger LOG = LogManager.getLogger(DefaultSource.class);

@Override
public StructType inferSchema(CaseInsensitiveStringMap options) {
return StructType.fromDDL(options.get(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key()));
Expand All @@ -53,6 +58,15 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
HoodieTableConfig.POPULATE_META_FIELDS.defaultValue()));
boolean arePartitionRecordsSorted = Boolean.parseBoolean(properties.getOrDefault(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED,
Boolean.toString(HoodieInternalConfig.DEFAULT_BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED)));

// Now by default ParquetWriteSupport will write DecimalType to parquet as int32/int64 when the scale of decimalType < Decimal.MAX_LONG_DIGITS(),
// but AvroParquetReader which used by HoodieParquetReader cannot support read int32/int64 as DecimalType.
// try to find current schema whether contains that DecimalType, and auto set the value of "hoodie.parquet.writeLegacyFormat.enabled"
if (DataTypeUtils.foundSmallPrecisionDecimalType(schema)
&& !Boolean.parseBoolean(properties.getOrDefault("hoodie.parquet.writeLegacyFormat.enabled", "false"))) {
properties.put("hoodie.parquet.writeLegacyFormat.enabled", "true");
LOG.warn("Small Decimal Type found in current schema, auto set the value of hoodie.parquet.writeLegacyFormat.enabled to true");
}
// 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, properties);
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
Expand Down

0 comments on commit 893fe09

Please sign in to comment.