diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
index c13d21ec201a..8352ada1126e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/TwoToThreeUpgradeHandler.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -34,6 +35,8 @@
  * UpgradeHandler to assist in upgrading {@link org.apache.hudi.table.HoodieTable} from version 2 to 3.
  */
 public class TwoToThreeUpgradeHandler implements UpgradeHandler {
+  public static final String SPARK_SIMPLE_KEY_GENERATOR = "org.apache.hudi.keygen.SimpleKeyGenerator";
+
   @Override
   public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     if (config.isMetadataTableEnabled()) {
@@ -47,8 +50,13 @@ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngin
     tablePropsToAdd.put(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE, config.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
     String keyGenClassName = Option.ofNullable(config.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME))
         .orElse(config.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME));
+    if (keyGenClassName == null && config.getEngineType() == EngineType.SPARK) {
+      // For Spark, if the key generator class is not configured by the user,
+      // set it to SimpleKeyGenerator as the default
+      keyGenClassName = SPARK_SIMPLE_KEY_GENERATOR;
+    }
     ValidationUtils.checkState(keyGenClassName != null, String.format("Missing config: %s or %s",
-        HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, HoodieWriteConfig.KEYGENERATOR_CLASS_NAME));
+        HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, HoodieWriteConfig.KEYGENERATOR_CLASS_NAME));
     tablePropsToAdd.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME, keyGenClassName);
     return tablePropsToAdd;
   }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestTwoToThreeUpgradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestTwoToThreeUpgradeHandler.java
index 35928dc7cf31..d6339a9782e1 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestTwoToThreeUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestTwoToThreeUpgradeHandler.java
@@ -21,13 +21,14 @@
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.KeyGenerator;
 
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Map;
@@ -58,11 +59,24 @@ void upgradeHandlerShouldRetrieveKeyGeneratorConfig(String keyGenConfigKey) {
     assertEquals(KeyGenerator.class.getName(), kv.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
   }
 
-  @Test
-  void upgradeHandlerShouldThrowWhenKeyGeneratorNotSet() {
+  @ParameterizedTest
+  @EnumSource(EngineType.class)
+  void upgradeHandlerWhenKeyGeneratorNotSet(EngineType engineType) {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(engineType)
+        .forTable("foo")
+        .withPath("/foo")
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
     TwoToThreeUpgradeHandler handler = new TwoToThreeUpgradeHandler();
-    Throwable t = assertThrows(IllegalStateException.class, () -> handler
-        .upgrade(config, null, null, null));
-    assertTrue(t.getMessage().startsWith("Missing config:"));
+    if (engineType == EngineType.SPARK) {
+      Map<ConfigProperty, String> kv = handler.upgrade(writeConfig, null, null, null);
+      assertEquals(TwoToThreeUpgradeHandler.SPARK_SIMPLE_KEY_GENERATOR,
+          kv.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
+    } else {
+      Throwable t = assertThrows(IllegalStateException.class, () -> handler
+          .upgrade(writeConfig, null, null, null));
+      assertTrue(t.getMessage().startsWith("Missing config:"));
+    }
   }
 }
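For context, the new Spark fallback can be exercised in isolation. A minimal sketch, not part of the patch (the class name, table name, and path are made up), mirroring the test above with the metadata table disabled so the handler skips that branch:

import java.util.Map;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.upgrade.TwoToThreeUpgradeHandler;

public class UpgradeDefaultKeyGenSketch {
  public static void main(String[] args) {
    // No key generator configured anywhere, as in the test above.
    HoodieWriteConfig sparkConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.SPARK)
        .forTable("demo")
        .withPath("/tmp/demo")
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
        .build();

    Map<ConfigProperty, String> props =
        new TwoToThreeUpgradeHandler().upgrade(sparkConfig, null, null, null);

    // With EngineType.SPARK the handler now falls back to SimpleKeyGenerator
    // instead of failing the upgrade with "Missing config: ...".
    System.out.println(props.get(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME));
    // -> org.apache.hudi.keygen.SimpleKeyGenerator
  }
}

With any other engine type, the same call still fails fast with the IllegalStateException asserted in the test.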
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
index a33af7ccd021..7751833fc04b 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
@@ -19,8 +19,8 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +30,8 @@
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
+
+import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -63,7 +65,8 @@ public List<WriteStatus> performClusteringWithRecordList(
     // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
     props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());
     props.put(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
-    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
+    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
+        .withEngineType(EngineType.JAVA).withProps(props).build();
     return (List<WriteStatus>) JavaBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
         false, getPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory<>(preserveHoodieMetadata));
   }
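The withEngineType(EngineType.JAVA) fix above matters because HoodieWriteConfig.newBuilder() otherwise assumes the Spark engine, so a config rebuilt from props inside the Java clustering path would inherit Spark-specific defaults, including the SimpleKeyGenerator fallback introduced in this patch. A minimal sketch of the pattern, assuming a hypothetical stand-alone class with made-up table and path; props stands in for the strategy's accumulated properties:

import java.util.Properties;

import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.config.HoodieWriteConfig;

public class EngineAwareConfigSketch {
  public static void main(String[] args) {
    // Stand-in for the properties the clustering strategy accumulates.
    Properties props = new Properties();
    props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), Boolean.FALSE.toString());

    // Pin the engine type explicitly; the builder would otherwise default to SPARK.
    HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
        .forTable("demo")
        .withPath("/tmp/demo")
        .withEngineType(EngineType.JAVA)
        .withProps(props)
        .build();

    System.out.println(newConfig.getEngineType()); // JAVA
  }
}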
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 403b67e554d7..79f20b9f85c7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -38,6 +38,8 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.marker.WriteMarkers;
@@ -117,6 +119,16 @@ public static Stream<Arguments> downGradeConfigParams() {
     return Stream.of(data).map(Arguments::of);
   }
 
+  public static Stream<Arguments> twoToThreeUpgradeConfigParams() {
+    Object[][] data = new Object[][] {
+        {HoodieTableType.COPY_ON_WRITE, Option.empty()},
+        {HoodieTableType.COPY_ON_WRITE, Option.of(TimestampBasedKeyGenerator.class.getName())},
+        {HoodieTableType.MERGE_ON_READ, Option.empty()},
+        {HoodieTableType.MERGE_ON_READ, Option.of(TimestampBasedKeyGenerator.class.getName())}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts();
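As a reminder of how JUnit 5 consumes the provider added above: @MethodSource looks up the static factory by name and runs the test once per Arguments row. A minimal stand-alone analogue, with a hypothetical class and made-up values:

import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class MethodSourceSketch {
  // Each Object[] row becomes one invocation of the test below.
  static Stream<Arguments> params() {
    Object[][] data = new Object[][] {
        {"COPY_ON_WRITE", true},
        {"MERGE_ON_READ", false}
    };
    return Stream.of(data).map(Arguments::of);
  }

  @ParameterizedTest
  @MethodSource("params")
  void runsOncePerRow(String tableType, boolean flag) {
    // tableType and flag take the values of one row per invocation.
  }
}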
@@ -232,6 +244,51 @@ public void testUpgradeOneToTwo(HoodieTableType tableType) throws IOException {
     assertTableProps(cfg);
   }
 
+  @ParameterizedTest
+  @MethodSource("twoToThreeUpgradeConfigParams")
+  public void testUpgradeTwoToThree(
+      HoodieTableType tableType, Option<String> keyGeneratorClass) throws IOException {
+    // init config, table and client.
+    Map<String, String> params = new HashMap<>();
+    addNewTableParamsToProps(params);
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      params.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+      metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+    }
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder()
+        .withAutoCommit(false).withRollbackUsingMarkers(false).withProps(params);
+    if (keyGeneratorClass.isPresent()) {
+      cfgBuilder.withKeyGenerator(keyGeneratorClass.get());
+    }
+    HoodieWriteConfig cfg = cfgBuilder.build();
+    SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+    // Write inserts
+    doInsert(client);
+
+    // downgrade table props
+    downgradeTableConfigsFromThreeToTwo(cfg);
+
+    // perform upgrade
+    new UpgradeDowngrade(metaClient, cfg, context, SparkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.THREE, null);
+
+    // verify hoodie.table.version got upgraded
+    metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(cfg.getBasePath())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(cfg.getTimelineLayoutVersion()))).build();
+    assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());
+    assertTableVersionFromPropertyFile(HoodieTableVersion.THREE);
+
+    // verify table props
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    Properties originalProps = cfg.getProps();
+    assertEquals(tableConfig.getUrlEncodePartitioning(),
+        cfg.getStringOrDefault(HoodieTableConfig.URL_ENCODE_PARTITIONING));
+    assertEquals(tableConfig.getHiveStylePartitioningEnable(),
+        cfg.getStringOrDefault(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
+    assertEquals(tableConfig.getKeyGeneratorClassName(), originalProps.getOrDefault(
+        HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName()));
+  }
+
   @Test
   public void testUpgradeDowngradeBetweenThreeAndCurrentVersion() throws IOException {
     // init config, table and client.
@@ -298,6 +355,19 @@ private void downgradeTableConfigsFromTwoToOne(HoodieWriteConfig cfg) throws IOE
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE);
   }
 
+  private void downgradeTableConfigsFromThreeToTwo(HoodieWriteConfig cfg) throws IOException {
+    Properties properties = new Properties(cfg.getProps());
+    properties.remove(HoodieTableConfig.URL_ENCODE_PARTITIONING.key());
+    properties.remove(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key());
+    properties.remove(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key());
+    properties.remove(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key());
+    properties.setProperty(HoodieTableConfig.VERSION.key(), "2");
+
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType(), properties);
+    // set hoodie.table.version to 2 in hoodie.properties file
+    metaClient.getTableConfig().setTableVersion(HoodieTableVersion.TWO);
+  }
+
   private void assertTableProps(HoodieWriteConfig cfg) {
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
     Properties originalProps = cfg.getProps();
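One subtlety in downgradeTableConfigsFromThreeToTwo above: the java.util.Properties(Properties defaults) constructor wraps the argument as a defaults table rather than copying its entries, so remove() only affects keys set directly on the new object, while getProperty() still falls through to the defaults. Whether that matters here depends on how HoodieTestUtils.init reads the properties. A stand-alone illustration, with an illustrative key and value:

import java.util.Properties;

public class PropertiesDefaultsSketch {
  public static void main(String[] args) {
    Properties defaults = new Properties();
    defaults.setProperty("hoodie.table.keygenerator.class", "SomeKeyGen");

    Properties wrapped = new Properties(defaults); // wraps, does not copy
    wrapped.remove("hoodie.table.keygenerator.class"); // no-op: key lives in defaults
    System.out.println(wrapped.getProperty("hoodie.table.keygenerator.class")); // SomeKeyGen

    // Copying entries first makes removal effective:
    Properties copied = new Properties();
    copied.putAll(defaults);
    copied.remove("hoodie.table.keygenerator.class");
    System.out.println(copied.getProperty("hoodie.table.keygenerator.class")); // null
  }
}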