diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index e94c38bd16af8..f60f6f2d46c15 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -515,8 +515,10 @@ protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePa .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build(); + HoodieWriteConfig updatedConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps()) + .forTable(metaClient.getTableConfig().getTableName()).build(); try { - new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance()) + new UpgradeDowngrade(metaClient, updatedConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.valueOf(toVersion), null); LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion)); return 0; diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java index b3650fa027626..c9bfacd9aa57a 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestUpgradeDowngradeCommand.java @@ -36,11 +36,16 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS; @@ -83,10 +88,32 @@ public void init() throws Exception { .withMarkerFile(DEFAULT_THIRD_PARTITION_PATH, "file-3", IOType.MERGE); } - @Test - public void testDowngradeCommand() throws Exception { - // update hoodie.table.version to 1 - metaClient.getTableConfig().setTableVersion(HoodieTableVersion.ONE); + @AfterEach + public void cleanup() { + if (timelineService != null) { + timelineService.close(); + } + } + + private static Stream testArgsForUpgradeDowngradeCommand() { + return Arrays.stream(new HoodieTableVersion[][] { + {HoodieTableVersion.FIVE, HoodieTableVersion.ZERO}, + {HoodieTableVersion.ZERO, HoodieTableVersion.ONE}, + // Table upgrade from version ONE to TWO requires key generator related configs + // such as "hoodie.datasource.write.recordkey.field" which is only available + // when user configures the write job. So the table upgrade from version ONE to TWO + // through CLI is not supported, and user should rely on the automatic upgrade + // in the write client instead. + // {HoodieTableVersion.ONE, HoodieTableVersion.TWO}, + {HoodieTableVersion.TWO, HoodieTableVersion.FIVE} + }).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("testArgsForUpgradeDowngradeCommand") + public void testUpgradeDowngradeCommand(HoodieTableVersion fromVersion, HoodieTableVersion toVersion) throws Exception { + // Start with hoodie.table.version to 5 + metaClient.getTableConfig().setTableVersion(HoodieTableVersion.FIVE); try (FSDataOutputStream os = metaClient.getFs().create(new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE), true)) { metaClient.getTableConfig().getProps().store(os, ""); } @@ -97,28 +124,35 @@ public void testDowngradeCommand() throws Exception { assertEquals(1, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE)); } - SparkMain.upgradeOrDowngradeTable(jsc(), tablePath, HoodieTableVersion.ZERO.name()); - - // verify hoodie.table.version got downgraded - metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + if (fromVersion != HoodieTableVersion.FIVE) { + SparkMain.upgradeOrDowngradeTable(jsc(), tablePath, fromVersion.name()); + } + verifyTableVersion(fromVersion); - // verify hoodie.table.version - assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.ZERO.versionCode()); - assertTableVersionFromPropertyFile(); + SparkMain.upgradeOrDowngradeTable(jsc(), tablePath, toVersion.name()); + verifyTableVersion(toVersion); - // verify marker files are non existent - for (String partitionPath : DEFAULT_PARTITION_PATHS) { - assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE)); + if (toVersion == HoodieTableVersion.ZERO) { + // verify marker files are non existent + for (String partitionPath : DEFAULT_PARTITION_PATHS) { + assertEquals(0, FileCreateUtils.getTotalMarkerFileCount(tablePath, partitionPath, "101", IOType.MERGE)); + } } } - private void assertTableVersionFromPropertyFile() throws IOException { + private void verifyTableVersion(HoodieTableVersion expectedVersion) throws IOException { + metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); + assertEquals(expectedVersion.versionCode(), metaClient.getTableConfig().getTableVersion().versionCode()); + assertTableVersionFromPropertyFile(expectedVersion); + } + + private void assertTableVersionFromPropertyFile(HoodieTableVersion expectedVersion) throws IOException { Path propertyFile = new Path(metaClient.getMetaPath() + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE); // Load the properties and verify FSDataInputStream fsDataInputStream = metaClient.getFs().open(propertyFile); HoodieConfig hoodieConfig = HoodieConfig.create(fsDataInputStream); fsDataInputStream.close(); - assertEquals(Integer.toString(HoodieTableVersion.ZERO.versionCode()), hoodieConfig + assertEquals(Integer.toString(expectedVersion.versionCode()), hoodieConfig .getString(HoodieTableConfig.VERSION)); } }