diff --git a/.asf.yaml b/.asf.yaml index 2eb171432c30d..eb3098284e8d5 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -38,6 +38,10 @@ github: squash: true merge: false rebase: false + protected_branches: + master: + required_pull_request_reviews: + required_approving_review_count: 1 notifications: commits: commits@hudi.apache.org issues: commits@hudi.apache.org diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 9787387a1eab7..5f4cf358d5b44 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -1327,7 +1327,7 @@ public void testSmallInsertHandlingForInserts(boolean mergeAllowDuplicateInserts JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); List statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); - assertPartitionMetadata(new String[] {testPartitionPath}, fs); + assertPartitionMetadata(basePath, new String[] {testPartitionPath}, fs); assertEquals(1, statuses.size(), "Just 1 file needs to be added."); String file1 = statuses.get(0).getFileId(); assertEquals(100, diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 0f40c28508e7f..c1dae9afa49c0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -31,6 +31,7 @@ import org.apache.hudi.avro.model.HoodieSliceInfo; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.bootstrap.TestBootstrapIndex; import org.apache.hudi.common.config.HoodieMetadataConfig; @@ -84,8 +85,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -99,10 +98,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -130,21 +127,24 @@ public class TestCleaner extends HoodieClientTestBase { private static final int BIG_BATCH_INSERT_SIZE = 500; - private static final Logger LOG = LogManager.getLogger(TestCleaner.class); + private static final int PARALLELISM = 10; /** * Helper method to do first batch of insert for clean by versions/commits tests. 
* - * @param cfg Hoodie Write Config + * @param context Spark engine context + * @param metaClient Hoodie table meta client * @param client Hoodie Client * @param recordGenFunction Function to generate records for insertion * @param insertFn Insertion API for testing * @throws Exception in case of error */ - private Pair> insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client, + public static Pair> insertFirstBigBatchForClientCleanerTest( + HoodieSparkEngineContext context, + HoodieTableMetaClient metaClient, + SparkRDDWriteClient client, Function2, String, Integer> recordGenFunction, - Function3, SparkRDDWriteClient, JavaRDD, String> insertFn, - HoodieCleaningPolicy cleaningPolicy) throws Exception { + Function3, SparkRDDWriteClient, JavaRDD, String> insertFn) throws Exception { /* * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages @@ -153,7 +153,7 @@ private Pair> insertFirstBigBatchForClientCleanerTe String newCommitTime = client.startCommit(); List records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE); - JavaRDD writeRecords = jsc.parallelize(records, 5); + JavaRDD writeRecords = context.getJavaSparkContext().parallelize(records, PARALLELISM); JavaRDD statuses = insertFn.apply(client, writeRecords, newCommitTime); // Verify there are no errors @@ -172,8 +172,8 @@ private Pair> insertFirstBigBatchForClientCleanerTe assertTrue(table.getCompletedCleanTimeline().empty()); if (client.getConfig().shouldAutoCommit()) { - HoodieIndex index = SparkHoodieIndexFactory.createIndex(cfg); - List taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table).collect(); + HoodieIndex index = SparkHoodieIndexFactory.createIndex(client.getConfig()); + List taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, PARALLELISM), table).collect(); checkTaggedRecords(taggedRecords, newCommitTime); } return Pair.of(newCommitTime, statuses); @@ -182,16 +182,17 @@ private Pair> insertFirstBigBatchForClientCleanerTe /** * Helper method to do first batch of insert for clean by versions/commits tests. 
* - * @param cfg Hoodie Write Config + * @param context Spark engine context * @param client Hoodie Client * @param recordGenFunction Function to generate records for insertion * @param insertFn Insertion API for testing * @throws Exception in case of error */ - private Pair> insertFirstFailedBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client, - Function2, String, Integer> recordGenFunction, - Function3, SparkRDDWriteClient, JavaRDD, String> insertFn, - HoodieCleaningPolicy cleaningPolicy) throws Exception { + public static Pair> insertFirstFailedBigBatchForClientCleanerTest( + HoodieSparkEngineContext context, + SparkRDDWriteClient client, + Function2, String, Integer> recordGenFunction, + Function3, SparkRDDWriteClient, JavaRDD, String> insertFn) throws Exception { /* * do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages @@ -200,7 +201,7 @@ private Pair> insertFirstFailedBigBatchForClientCle String newCommitTime = client.startCommit(); List records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE); - JavaRDD writeRecords = jsc.parallelize(records, 5); + JavaRDD writeRecords = context.getJavaSparkContext().parallelize(records, 5); JavaRDD statuses = insertFn.apply(client, writeRecords, newCommitTime); // Verify there are no errors @@ -357,8 +358,7 @@ private void testInsertAndCleanByVersions( final Function2, String, Integer> recordUpsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); + insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); Map compactionFileIdToLatestFileSlice = new HashMap<>(); metaClient = HoodieTableMetaClient.reload(metaClient); @@ -456,15 +456,6 @@ private void testInsertAndCleanByVersions( } } - /** - * Test Clean-By-Commits using insert/upsert API. - */ - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testInsertAndCleanByCommits(boolean isAsync) throws Exception { - testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync); - } - /** * Test Clean-By-Commits using insert/upsert API. */ @@ -473,116 +464,6 @@ public void testFailedInsertAndCleanByCommits() throws Exception { testFailedInsertAndCleanByCommits(SparkRDDWriteClient::insert, false); } - /** - * Test Clean-By-Commits using prepped version of insert/upsert API. - */ - @Test - public void testInsertPreppedAndCleanByCommits() throws Exception { - testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, - true, false); - } - - /** - * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API. - */ - @Test - public void testBulkInsertPreppedAndCleanByCommits() throws Exception { - testInsertAndCleanByCommits( - (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()), - SparkRDDWriteClient::upsertPreppedRecords, true, false); - } - - /** - * Test Clean-By-Commits using bulk-insert/upsert API. - */ - @Test - public void testBulkInsertAndCleanByCommits() throws Exception { - testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false); - } - - /** - * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective. 
- * - * @param insertFn Insert API to be tested - * @param upsertFn Upsert API to be tested - * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during - * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) - * @throws Exception in case of errors - */ - private void testInsertAndCleanByCommits( - Function3, SparkRDDWriteClient, JavaRDD, String> insertFn, - Function3, SparkRDDWriteClient, JavaRDD, String> upsertFn, boolean isPreppedAPI, boolean isAsync) - throws Exception { - int maxCommits = 3; // keep upto 3 commits from the past - HoodieWriteConfig cfg = getConfigBuilder() - .withCleanConfig(HoodieCleanConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build()) - .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) - .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) - .build(); - SparkRDDWriteClient client = getHoodieWriteClient(cfg); - - final Function2, String, Integer> recordInsertGenWrappedFunction = - generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); - - final Function2, String, Integer> recordUpsertGenWrappedFunction = - generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates); - - insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_COMMITS); - - // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. - for (int i = 0; i < 8; i++) { - String newCommitTime = makeNewCommitTime(); - try { - client.startCommitWithTime(newCommitTime); - List records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100); - - List statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect(); - // Verify there are no errors - assertNoWriteErrors(statuses); - - metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient); - HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline(); - HoodieInstant lastInstant = activeTimeline.lastInstant().get(); - if (cfg.isAsyncClean()) { - activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp()); - } - // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. 
We explicitly keep one commit before earliest - // commit - Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits); - Set acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet()); - if (earliestRetainedCommit.isPresent()) { - acceptableCommits - .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp()) - .getInstants().collect(Collectors.toSet())); - acceptableCommits.add(earliestRetainedCommit.get()); - } - - TableFileSystemView fsView = table1.getFileSystemView(); - // Need to ensure the following - for (String partitionPath : dataGen.getPartitionPaths()) { - List fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList()); - for (HoodieFileGroup fileGroup : fileGroups) { - Set commitTimes = new HashSet<>(); - fileGroup.getAllBaseFiles().forEach(value -> { - LOG.debug("Data File - " + value); - commitTimes.add(value.getCommitTime()); - }); - if (cfg.isAsyncClean()) { - commitTimes.remove(lastInstant.getTimestamp()); - } - assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes, - "Only contain acceptable versions of file should be present"); - } - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - } - /** * Test Helper for Cleaning failed commits by commits logic from HoodieWriteClient API perspective. * @@ -609,22 +490,18 @@ private void testFailedInsertAndCleanByCommits( final Function2, String, Integer> recordInsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); - Pair> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_COMMITS); + Pair> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn); client.commit(result.getLeft(), result.getRight()); HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); assertTrue(table.getCompletedCleanTimeline().empty()); - insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_COMMITS); + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); - insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_COMMITS); + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); Pair> ret = - insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_COMMITS); + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); // Await till enough time passes such that the last failed commits heartbeats are expired await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient() .isHeartbeatExpired(ret.getLeft())); @@ -1349,8 +1226,7 @@ private void testInsertAndCleanFailedWritesByVersions( final Function2, String, Integer> recordInsertGenWrappedFunction = generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts); - Pair> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); + Pair> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, 
recordInsertGenWrappedFunction, insertFn); client.commit(result.getLeft(), result.getRight()); @@ -1358,15 +1234,12 @@ private void testInsertAndCleanFailedWritesByVersions( assertTrue(table.getCompletedCleanTimeline().empty()); - insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); - insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); Pair> ret = - insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn, - HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS); + insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn); // Await till enough time passes such that the last failed commits heartbeats are expired await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient() diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java new file mode 100644 index 0000000000000..7f5cd5cd99ecc --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.clean; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.transaction.lock.InProcessLockProvider; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileGroup; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.TableFileSystemView; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieLockConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime; +import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.apache.hudi.testutils.HoodieClientTestBase.Function2; +import static org.apache.hudi.testutils.HoodieClientTestBase.Function3; +import static org.apache.hudi.testutils.HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTestHarness { + + private static final Logger LOG = LogManager.getLogger(TestCleanerInsertAndCleanByCommits.class); + private static final int BATCH_SIZE = 100; + private static final int PARALLELISM = 2; + + /** + * Test Clean-By-Commits using insert/upsert API. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testInsertAndCleanByCommits(boolean isAsync) throws Exception { + testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync); + } + + /** + * Test Clean-By-Commits using prepped version of insert/upsert API. + */ + @Test + public void testInsertPreppedAndCleanByCommits() throws Exception { + testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, + true, false); + } + + /** + * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API. + */ + @Test + public void testBulkInsertPreppedAndCleanByCommits() throws Exception { + testInsertAndCleanByCommits( + (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()), + SparkRDDWriteClient::upsertPreppedRecords, true, false); + } + + /** + * Test Clean-By-Commits using bulk-insert/upsert API. 
+ */ + @Test + public void testBulkInsertAndCleanByCommits() throws Exception { + testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false); + } + + /** + * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective. + * + * @param insertFn Insert API to be tested + * @param upsertFn Upsert API to be tested + * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during + * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs) + * @throws Exception in case of errors + */ + private void testInsertAndCleanByCommits( + Function3, SparkRDDWriteClient, JavaRDD, String> insertFn, + Function3, SparkRDDWriteClient, JavaRDD, String> upsertFn, boolean isPreppedAPI, boolean isAsync) + throws Exception { + int maxCommits = 3; // keep upto 3 commits from the past + HoodieWriteConfig cfg = getConfigBuilder(true) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS) + .withAsyncClean(isAsync).retainCommits(maxCommits).build()) + .withParallelism(PARALLELISM, PARALLELISM) + .withBulkInsertParallelism(PARALLELISM) + .withFinalizeWriteParallelism(PARALLELISM) + .withDeleteParallelism(PARALLELISM) + .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()) + .build(); + final SparkRDDWriteClient client = getHoodieWriteClient(cfg); + + final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime()); + final Function2, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI + ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts) + : dataGen::generateInserts; + final Function2, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI + ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates) + : dataGen::generateUniqueUpdates; + + HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); + insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn); + + // Keep doing some writes and clean inline. Make sure we have expected number of files remaining. + for (int i = 0; i < 8; i++) { + String newCommitTime = makeNewCommitTime(); + try { + client.startCommitWithTime(newCommitTime); + List records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE); + + List statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect(); + // Verify there are no errors + assertNoWriteErrors(statuses); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient); + HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline(); + HoodieInstant lastInstant = activeTimeline.lastInstant().get(); + if (cfg.isAsyncClean()) { + activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp()); + } + // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. 
We explicitly keep one commit before earliest + // commit + Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits); + Set acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet()); + if (earliestRetainedCommit.isPresent()) { + acceptableCommits + .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp()) + .getInstants().collect(Collectors.toSet())); + acceptableCommits.add(earliestRetainedCommit.get()); + } + + TableFileSystemView fsView = table1.getFileSystemView(); + // Need to ensure the following + for (String partitionPath : dataGen.getPartitionPaths()) { + List fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + for (HoodieFileGroup fileGroup : fileGroups) { + Set commitTimes = new HashSet<>(); + fileGroup.getAllBaseFiles().forEach(value -> { + LOG.debug("Data File - " + value); + commitTimes.add(value.getCommitTime()); + }); + if (cfg.isAsyncClean()) { + commitTimes.remove(lastInstant.getTimestamp()); + } + assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes, + "Only contain acceptable versions of file should be present"); + } + } + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 424bb6c53e37e..d2286decfe6b6 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -20,6 +20,7 @@ import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.HoodieCleanStat; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.EmptyHoodieRecordPayload; @@ -39,15 +40,16 @@ import org.apache.hudi.common.testutils.RawTripTestPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCleanConfig; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; @@ -165,18 +167,18 @@ public HoodieSparkTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieW return table; } - public void assertPartitionMetadataForRecords(List inputRecords, FileSystem fs) throws IOException { + public void assertPartitionMetadataForRecords(String basePath, List inputRecords, FileSystem fs) throws IOException { Set partitionPathSet = inputRecords.stream() .map(HoodieRecord::getPartitionPath) .collect(Collectors.toSet()); - assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); + assertPartitionMetadata(basePath, 
partitionPathSet.stream().toArray(String[]::new), fs); } - public void assertPartitionMetadataForKeys(List inputKeys, FileSystem fs) throws IOException { + public void assertPartitionMetadataForKeys(String basePath, List inputKeys, FileSystem fs) throws IOException { Set partitionPathSet = inputKeys.stream() .map(HoodieKey::getPartitionPath) .collect(Collectors.toSet()); - assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs); + assertPartitionMetadata(basePath, partitionPathSet.stream().toArray(String[]::new), fs); } /** @@ -186,7 +188,7 @@ public void assertPartitionMetadataForKeys(List inputKeys, FileSystem * @param fs File System * @throws IOException in case of error */ - public void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException { + public static void assertPartitionMetadata(String basePath, String[] partitionPaths, FileSystem fs) throws IOException { for (String partitionPath : partitionPaths) { assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath))); HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath)); @@ -201,7 +203,7 @@ public void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) thro * @param taggedRecords Tagged Records * @param instantTime Commit Timestamp */ - public void checkTaggedRecords(List taggedRecords, String instantTime) { + public static void checkTaggedRecords(List taggedRecords, String instantTime) { for (HoodieRecord rec : taggedRecords) { assertTrue(rec.isCurrentLocationKnown(), "Record " + rec + " found with no location."); assertEquals(rec.getCurrentLocation().getInstantTime(), instantTime, @@ -214,7 +216,7 @@ public void checkTaggedRecords(List taggedRecords, String instantT * * @param records List of Hoodie records */ - public void assertNodupesWithinPartition(List> records) { + public static void assertNodupesWithinPartition(List> records) { Map> partitionToKeys = new HashMap<>(); for (HoodieRecord r : records) { String key = r.getRecordKey(); @@ -233,29 +235,46 @@ public void assertNodupesWithinPartition(List> * guaranteed by record-generation function itself. 
* * @param writeConfig Hoodie Write Config - * @param recordGenFunction Records Generation function + * @param recordsGenFunction Records Generation function * @return Wrapped function */ - private Function2, String, Integer> wrapRecordsGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function2, String, Integer> recordGenFunction) { + public static Function2, String, Integer> wrapRecordsGenFunctionForPreppedCalls( + final String basePath, + final Configuration hadoopConf, + final HoodieSparkEngineContext context, + final HoodieWriteConfig writeConfig, + final Function2, String, Integer> recordsGenFunction) { return (commit, numRecords) -> { final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig); - List records = recordGenFunction.apply(commit, numRecords); + List records = recordsGenFunction.apply(commit, numRecords); final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - JavaRDD taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table); + JavaRDD taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, 1), table); return taggedRecords.collect(); }; } - private Function3, String, Integer, String> wrapRecordsGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function3, String, Integer, String> recordGenFunction) { + /** + * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records + * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is + * guaranteed by record-generation function itself. 
+ * + * @param writeConfig Hoodie Write Config + * @param recordsGenFunction Records Generation function (for partition) + * @return Wrapped function + */ + public static Function3, String, Integer, String> wrapPartitionRecordsGenFunctionForPreppedCalls( + final String basePath, + final Configuration hadoopConf, + final HoodieSparkEngineContext context, + final HoodieWriteConfig writeConfig, + final Function3, String, Integer, String> recordsGenFunction) { return (commit, numRecords, partition) -> { final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig); - List records = recordGenFunction.apply(commit, numRecords, partition); + List records = recordsGenFunction.apply(commit, numRecords, partition); final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - JavaRDD taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table); + JavaRDD taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, 1), table); return taggedRecords.collect(); }; } @@ -269,16 +288,20 @@ private Function3, String, Integer, String> wrapRecordsGenFun * @param keyGenFunction Keys Generation function * @return Wrapped function */ - private Function> wrapDeleteKeysGenFunctionForPreppedCalls( - final HoodieWriteConfig writeConfig, final Function> keyGenFunction) { + public static Function> wrapDeleteKeysGenFunctionForPreppedCalls( + final String basePath, + final Configuration hadoopConf, + final HoodieSparkEngineContext context, + final HoodieWriteConfig writeConfig, + final Function> keyGenFunction) { return (numRecords) -> { final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig); List records = keyGenFunction.apply(numRecords); final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build(); HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient); - JavaRDD recordsToDelete = jsc.parallelize(records, 1) + JavaRDD recordsToDelete = context.getJavaSparkContext().parallelize(records, 1) .map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload())); - JavaRDD taggedRecords = tagLocation(index, recordsToDelete, table); + JavaRDD taggedRecords = tagLocation(index, context, recordsToDelete, table); return taggedRecords.map(record -> record.getKey()).collect(); }; } @@ -295,16 +318,24 @@ public Function2, String, Integer> generateWrapRecordsFn(bool HoodieWriteConfig writeConfig, Function2, String, Integer> wrapped) { if (isPreppedAPI) { - return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); + return wrapRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped); } else { return wrapped; } } + /** + * Generate wrapper for record generation function for testing Prepped APIs. 
+ * + * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function (for partition) + * @return Wrapped Function + */ public Function3, String, Integer, String> generateWrapRecordsForPartitionFn(boolean isPreppedAPI, HoodieWriteConfig writeConfig, Function3, String, Integer, String> wrapped) { if (isPreppedAPI) { - return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped); + return wrapPartitionRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped); } else { return wrapped; } @@ -321,7 +352,7 @@ public Function3, String, Integer, String> generateWrapRecord public Function> generateWrapDeleteKeysFn(boolean isPreppedAPI, HoodieWriteConfig writeConfig, Function> wrapped) { if (isPreppedAPI) { - return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); + return wrapDeleteKeysGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped); } else { return wrapped; } @@ -562,7 +593,7 @@ private JavaRDD writeBatchHelper(SparkRDDWriteClient client, String client.commit(newCommitTime, result); } // check the partition metadata is written out - assertPartitionMetadataForRecords(records, fs); + assertPartitionMetadataForRecords(basePath, records, fs); // verify that there is a commit HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); @@ -634,7 +665,7 @@ public JavaRDD deleteBatch(SparkRDDWriteClient client, String newCo assertNoWriteErrors(statuses); // check the partition metadata is written out - assertPartitionMetadataForKeys(keysToDelete, fs); + assertPartitionMetadataForKeys(basePath, keysToDelete, fs); // verify that there is a commit HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index e6a4d63e8c9e2..a0c093be16b8a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -17,13 +17,6 @@ package org.apache.hudi.testutils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hudi.HoodieConversionUtils; import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanMetadata; @@ -75,6 +68,14 @@ import org.apache.hudi.table.WorkloadStat; import org.apache.hudi.timeline.service.TimelineService; import org.apache.hudi.util.JFunction; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; @@ -86,7 +87,6 @@ import 
org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; -import scala.Tuple2; import java.io.IOException; import java.io.Serializable; @@ -105,6 +105,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import scala.Tuple2; + import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -510,12 +512,21 @@ public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaCli return tableView; } + /** + * @deprecated Use {@link #tagLocation(HoodieIndex, HoodieEngineContext, JavaRDD, HoodieTable)} instead. + */ + @Deprecated public JavaRDD tagLocation( HoodieIndex index, JavaRDD records, HoodieTable table) { return HoodieJavaRDD.getJavaRDD( index.tagLocation(HoodieJavaRDD.of(records), context, table)); } + public static JavaRDD tagLocation( + HoodieIndex index, HoodieEngineContext context, JavaRDD records, HoodieTable table) { + return HoodieJavaRDD.getJavaRDD(index.tagLocation(HoodieJavaRDD.of(records), context, table)); + } + public static Pair, WorkloadStat> buildProfile(JavaRDD inputRecordsRDD) { HashMap partitionPathStatMap = new HashMap<>(); WorkloadStat globalStat = new WorkloadStat(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java index 91637102f0404..6c1efc1561210 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java @@ -18,8 +18,6 @@ package org.apache.hudi.common.model; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.hadoop.fs.Path; import org.apache.hudi.common.util.JsonUtils; @@ -31,7 +29,6 @@ /** * Statistics about a single Hoodie write operation. 
*/ -@JsonIgnoreProperties(ignoreUnknown = true) public class HoodieWriteStat implements Serializable { public static final String NULL_COMMIT = "null"; @@ -163,7 +160,6 @@ public class HoodieWriteStat implements Serializable { private Long maxEventTime; @Nullable - @JsonIgnore private RuntimeStats runtimeStats; public HoodieWriteStat() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index ca59c301c887e..c31184244390f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -54,6 +54,7 @@ import org.apache.orc.TypeDescription; import static org.apache.avro.JsonProperties.NULL_VALUE; +import static org.apache.hudi.common.util.BinaryUtil.toBytes; /** * Methods including addToVector, addUnionValue, createOrcSchema are originally from @@ -221,8 +222,7 @@ public static void addToVector(TypeDescription type, ColumnVector colVector, Sch binaryBytes = ((GenericData.Fixed)value).bytes(); } else if (value instanceof ByteBuffer) { final ByteBuffer byteBuffer = (ByteBuffer) value; - binaryBytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(binaryBytes); + binaryBytes = toBytes(byteBuffer); } else if (value instanceof byte[]) { binaryBytes = (byte[]) value; } else { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java index 9fec2c8cf5924..9d8f6c8e90cf3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.zip.CRC32; @@ -117,6 +118,15 @@ public static byte updatePos(byte a, int apos, byte b, int bpos) { return (byte) (a ^ (1 << (7 - apos))); } + /** + * Copies {@link ByteBuffer} into allocated {@code byte[]} array + */ + public static byte[] toBytes(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + public static byte[] toBytes(int val) { byte[] b = new byte[4]; for (int i = 3; i > 0; i--) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java index 0cc40591972a0..5afe354d0e755 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -52,6 +52,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.BinaryUtil.toBytes; + /** * Utility functions for ORC files. 
*/ @@ -238,8 +240,7 @@ public Schema readAvroSchema(Configuration conf, Path orcFilePath) { try (Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))) { if (reader.hasMetadataValue("orc.avro.schema")) { ByteBuffer metadataValue = reader.getMetadataValue("orc.avro.schema"); - byte[] bytes = new byte[metadataValue.remaining()]; - metadataValue.get(bytes); + byte[] bytes = toBytes(metadataValue); return new Schema.Parser().parse(new String(bytes)); } else { TypeDescription orcSchema = reader.getSchema(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java index b0b59f8cbcb8b..e8c159540a3d3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java @@ -77,8 +77,9 @@ public void testPerfStatPresenceInHoodieMetadata() throws Exception { String serializedCommitMetadata = commitMetadata.toJsonString(); HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(serializedCommitMetadata, HoodieCommitMetadata.class); - // Make sure timing metrics are not written to instant file - assertEquals(0, (long) metadata.getTotalScanTime()); + assertTrue(commitMetadata.getTotalCreateTime() > 0); + assertTrue(commitMetadata.getTotalUpsertTime() > 0); + assertTrue(commitMetadata.getTotalScanTime() > 0); assertTrue(metadata.getTotalLogFilesCompacted() > 0); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java index 43d842241abf5..55abeaaa56c2c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java @@ -722,9 +722,10 @@ private FlinkOptions() { // ------------------------------------------------------------------------ public static final ConfigOption HIVE_SYNC_ENABLED = ConfigOptions - .key("hive_sync.enable") + .key("hive_sync.enabled") .booleanType() .defaultValue(false) + .withFallbackKeys("hive_sync.enable") .withDescription("Asynchronously sync Hive meta to HMS, default false"); public static final ConfigOption HIVE_SYNC_DB = ConfigOptions diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java index d2f56d9a3e6ca..b1c8457c1ac1d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java @@ -26,6 +26,7 @@ import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.table.format.FilePathUtils; +import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; @@ -93,7 +94,7 @@ public static HiveSyncContext create(Configuration conf, SerializableConfigurati @VisibleForTesting public static Properties buildSyncConfig(Configuration conf) { - TypedProperties props = new TypedProperties(); + TypedProperties props = StreamerUtil.flinkConf2TypedProperties(conf); 
props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), conf.getString(FlinkOptions.PATH)); props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT)); props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false"); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java index 381b292c25f9f..ae30b39906e0b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java @@ -19,25 +19,26 @@ package org.apache.hudi.sink.utils; import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.hive.HiveSyncConfig; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; -import java.lang.reflect.Method; import java.util.Properties; import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test cases for {@link HiveSyncContext}. */ public class TestHiveSyncContext { /** - * Test that the file ids generated by the task can finally shuffled to itself. + * Test partition path fields sync. */ @Test - public void testBuildSyncConfig() throws Exception { + void testSyncedPartitions() { Configuration configuration1 = new Configuration(); Configuration configuration2 = new Configuration(); String hiveSyncPartitionField = "hiveSyncPartitionField"; @@ -48,15 +49,21 @@ public void testBuildSyncConfig() throws Exception { configuration2.setString(FlinkOptions.PARTITION_PATH_FIELD, partitionPathField); - Class threadClazz = Class.forName("org.apache.hudi.sink.utils.HiveSyncContext"); - Method buildSyncConfigMethod = threadClazz.getDeclaredMethod("buildSyncConfig", Configuration.class); - buildSyncConfigMethod.setAccessible(true); - Properties props1 = HiveSyncContext.buildSyncConfig(configuration1); Properties props2 = HiveSyncContext.buildSyncConfig(configuration2); assertEquals(hiveSyncPartitionField, props1.getProperty(META_SYNC_PARTITION_FIELDS.key())); assertEquals(partitionPathField, props2.getProperty(META_SYNC_PARTITION_FIELDS.key())); + } + /** + * Test an option that has no shortcut key. 
+ */ + @Test + void testOptionWithoutShortcutKey() { + Configuration configuration3 = new Configuration(); + configuration3.setBoolean(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), true); + Properties props3 = HiveSyncContext.buildSyncConfig(configuration3); + assertTrue(Boolean.parseBoolean(props3.getProperty(HiveSyncConfig.HIVE_CREATE_MANAGED_TABLE.key(), "false"))); } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index 58511f791ed78..5cf7a5ec035ab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -29,6 +29,7 @@ import org.apache.hudi.common.data.HoodieData import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.FileSystemViewStorageConfig +import org.apache.hudi.common.util.BinaryUtil.toBytes import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.common.util.collection import org.apache.hudi.common.util.hash.ColumnIndexID @@ -469,10 +470,7 @@ object ColumnStatsIndexSupport { } case BinaryType => value match { - case b: ByteBuffer => - val bytes = new Array[Byte](b.remaining) - b.get(bytes) - bytes + case b: ByteBuffer => toBytes(b) case other => other } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala index 32d1960ee13ee..631644121c133 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala @@ -17,39 +17,28 @@ package org.apache.spark.sql.hudi -import java.io.ByteArrayOutputStream - -import com.esotericsoftware.kryo.Kryo -import com.esotericsoftware.kryo.io.{Input, Output} +import org.apache.hudi.common.util.BinaryUtil import org.apache.spark.SparkConf -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoSerializer, SerializerInstance} + +import java.nio.ByteBuffer object SerDeUtils { - private val kryoLocal = new ThreadLocal[Kryo] { + private val SERIALIZER_THREAD_LOCAL = new ThreadLocal[SerializerInstance] { - override protected def initialValue: Kryo = { - val serializer = new KryoSerializer(new SparkConf(true)) - serializer.newKryo() + override protected def initialValue: SerializerInstance = { + new KryoSerializer(new SparkConf(true)).newInstance() } } def toBytes(o: Any): Array[Byte] = { - val outputStream = new ByteArrayOutputStream(4096 * 5) - val output = new Output(outputStream) - try { - kryoLocal.get.writeClassAndObject(output, o) - output.flush() - } finally { - output.clear() - output.close() - } - outputStream.toByteArray + val buf = SERIALIZER_THREAD_LOCAL.get.serialize(o) + BinaryUtil.toBytes(buf) } def toObject(bytes: Array[Byte]): Any = { - val input = new Input(bytes) - kryoLocal.get.readClassAndObject(input) + SERIALIZER_THREAD_LOCAL.get.deserialize(ByteBuffer.wrap(bytes)) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index f5435e09adfc2..6237e223646f3 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -588,15 +588,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { .save(basePath) val instant2 = metaClient.reloadActiveTimeline.lastInstant().get() - val cdcLogFiles2 = getCDCLogFile(instant2) - // with a small value for 'hoodie.logfile.data.max.size', - // it will write out >1 cdc log files due to rollover. - assert(cdcLogFiles2.size > 1) - // with a small value for 'hoodie.logfile.data.block.max.size', - // it will write out >1 cdc data blocks in one single cdc log file. - assert(getCDCBlocks(cdcLogFiles2.head, cdcSchema).size > 1) - // check cdc data + val cdcLogFiles2 = getCDCLogFile(instant2) val cdcDataFromCDCLogFile2 = cdcLogFiles2.flatMap(readCDCLogFile(_, cdcSchema)) // check the num of cdc data assertEquals(cdcDataFromCDCLogFile2.size, 50) diff --git a/hudi-timeline-service/pom.xml b/hudi-timeline-service/pom.xml index a1658ddf519ff..ae429f82aa107 100644 --- a/hudi-timeline-service/pom.xml +++ b/hudi-timeline-service/pom.xml @@ -104,7 +104,7 @@ io.javalin javalin - 2.8.0 + 4.6.0 diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 14539c45b050f..b53c2534bc887 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -43,10 +43,10 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import io.javalin.BadRequestResponse; -import io.javalin.Context; -import io.javalin.Handler; import io.javalin.Javalin; +import io.javalin.http.BadRequestResponse; +import io.javalin.http.Context; +import io.javalin.http.Handler; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; @@ -63,7 +63,7 @@ import java.util.stream.Collectors; /** - * Main REST Handler class that handles and delegates calls to timeline relevant handlers. + * Main REST Handler class that handles and delegates calls to timeline relevant handlers. */ public class RequestHandler { @@ -77,7 +77,7 @@ public class RequestHandler { private final FileSliceHandler sliceHandler; private final BaseFileHandler dataFileHandler; private final MarkerHandler markerHandler; - private Registry metricsRegistry = Registry.getRegistry("TimelineService"); + private final Registry metricsRegistry = Registry.getRegistry("TimelineService"); private ScheduledExecutorService asyncResultService = Executors.newSingleThreadScheduledExecutor(); public RequestHandler(Javalin app, Configuration conf, TimelineService.Config timelineServiceConfig, @@ -100,6 +100,38 @@ public RequestHandler(Javalin app, Configuration conf, TimelineService.Config ti } } + /** + * Serializes the result into JSON String. 
+ * + * @param ctx Javalin context + * @param obj object to serialize + * @param metricsRegistry {@code Registry} instance for storing metrics + * @param objectMapper JSON object mapper + * @param logger {@code Logger} instance + * @return JSON String from the input object + * @throws JsonProcessingException + */ + public static String jsonifyResult( + Context ctx, Object obj, Registry metricsRegistry, ObjectMapper objectMapper, Logger logger) + throws JsonProcessingException { + HoodieTimer timer = new HoodieTimer().startTimer(); + boolean prettyPrint = ctx.queryParam("pretty") != null; + String result = + prettyPrint ? objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj) + : objectMapper.writeValueAsString(obj); + final long jsonifyTime = timer.endTimer(); + metricsRegistry.add("WRITE_VALUE_CNT", 1); + metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime); + if (logger.isDebugEnabled()) { + logger.debug("Jsonify TimeTaken=" + jsonifyTime); + } + return result; + } + + private static boolean isRefreshCheckDisabledInQuery(Context ctxt) { + return Boolean.parseBoolean(ctxt.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF)); + } + public void register() { registerDataFilesAPI(); registerFileSlicesAPI(); @@ -121,8 +153,8 @@ public void stop() { private boolean isLocalViewBehind(Context ctx) { String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM); String lastKnownInstantFromClient = - ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS); - String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, ""); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); + String timelineHashFromClient = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault(""); HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants(); if (LOG.isDebugEnabled()) { @@ -151,8 +183,7 @@ private boolean isLocalViewBehind(Context ctx) { private boolean syncIfLocalViewBehind(Context ctx) { if (isLocalViewBehind(ctx)) { String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM); - String lastKnownInstantFromClient = - ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS); + String lastKnownInstantFromClient = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); SyncableFileSystemView view = viewManager.getFileSystemView(basePath); synchronized (view) { if (isLocalViewBehind(ctx)) { @@ -176,41 +207,13 @@ private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingEx } } - /** - * Serializes the result into JSON String. - * - * @param ctx Javalin context - * @param obj object to serialize - * @param metricsRegistry {@code Registry} instance for storing metrics - * @param objectMapper JSON object mapper - * @param logger {@code Logger} instance - * @return JSON String from the input object - * @throws JsonProcessingException - */ - public static String jsonifyResult( - Context ctx, Object obj, Registry metricsRegistry, ObjectMapper objectMapper, Logger logger) - throws JsonProcessingException { - HoodieTimer timer = new HoodieTimer().startTimer(); - boolean prettyPrint = ctx.queryParam("pretty") != null; - String result = - prettyPrint ? 
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(obj) - : objectMapper.writeValueAsString(obj); - final long jsonifyTime = timer.endTimer(); - metricsRegistry.add("WRITE_VALUE_CNT", 1); - metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime); - if (logger.isDebugEnabled()) { - logger.debug("Jsonify TimeTaken=" + jsonifyTime); - } - return result; - } - private void writeValueAsStringSync(Context ctx, Object obj) throws JsonProcessingException { String result = jsonifyResult(ctx, obj, metricsRegistry, OBJECT_MAPPER, LOG); ctx.result(result); } private void writeValueAsStringAsync(Context ctx, Object obj) { - ctx.result(CompletableFuture.supplyAsync(() -> { + ctx.future(CompletableFuture.supplyAsync(() -> { try { return jsonifyResult(ctx, obj, metricsRegistry, OBJECT_MAPPER, LOG); } catch (JsonProcessingException e) { @@ -225,15 +228,13 @@ private void writeValueAsStringAsync(Context ctx, Object obj) { private void registerTimelineAPI() { app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx -> { metricsRegistry.add("LAST_INSTANT", 1); - List dtos = instantHandler - .getLastInstant(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue()); + List dtos = instantHandler.getLastInstant(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).get()); writeValueAsString(ctx, dtos); }, false)); app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> { metricsRegistry.add("TIMELINE", 1); - TimelineDTO dto = instantHandler - .getTimeline(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue()); + TimelineDTO dto = instantHandler.getTimeline(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).get()); writeValueAsString(ctx, dto); }, false)); } @@ -245,59 +246,60 @@ private void registerDataFilesAPI() { app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1); List dtos = dataFileHandler.getLatestDataFiles( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1); List dtos = dataFileHandler.getLatestDataFile( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.FILEID_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new ViewHandler(ctx -> { 
metricsRegistry.add("LATEST_ALL_DATA_FILES", 1); - List dtos = dataFileHandler - .getLatestDataFiles(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); + List dtos = dataFileHandler.getLatestDataFiles( + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1); List dtos = dataFileHandler.getLatestDataFilesBeforeOrOn( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1); List dtos = dataFileHandler.getLatestDataFileOn( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), - ctx.queryParam(RemoteHoodieTableFileSystemView.INSTANT_PARAM), - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.FILEID_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_PARAM, String.class).get(), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new ViewHandler(ctx -> { metricsRegistry.add("ALL_DATA_FILES", 1); List dtos = dataFileHandler.getAllDataFiles( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1); List dtos = dataFileHandler.getLatestDataFilesInRange( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), Arrays - .asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(","))); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new 
HoodieException("Basepath is invalid")), + Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")).split(","))); writeValueAsString(ctx, dtos); }, true)); } @@ -309,124 +311,124 @@ private void registerFileSlicesAPI() { app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_SLICES", 1); List dtos = sliceHandler.getLatestFileSlices( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_SLICE", 1); List dtos = sliceHandler.getLatestFileSlice( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.FILEID_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1); List dtos = sliceHandler.getLatestUnCompactedFileSlices( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_SLICES", 1); List dtos = sliceHandler.getAllFileSlices( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1); List dtos = sliceHandler.getLatestFileSliceInRange( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), Arrays - .asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(","))); + 
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")).split(","))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1); List dtos = sliceHandler.getLatestMergedFileSlicesBeforeOrOn( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1); List dtos = sliceHandler.getLatestFileSlicesBeforeOrOn( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM).getOrThrow(), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")), Boolean.parseBoolean( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM) - .getOrThrow())); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM, String.class) + .getOrThrow(e -> new HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid")))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new ViewHandler(ctx -> { metricsRegistry.add("PEDING_COMPACTION_OPS", 1); List dtos = sliceHandler.getPendingCompactionOperations( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS, new ViewHandler(ctx -> { metricsRegistry.add("PEDING_LOG_COMPACTION_OPS", 1); List dtos = sliceHandler.getPendingLogCompactionOperations( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); writeValueAsString(ctx, dtos); }, true)); 
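For reference, the query-parameter pattern applied throughout the RequestHandler handlers in this patch is Javalin 4's Validator API: validatedQueryParam(...).getOrThrow() becomes queryParamAsClass(..., String.class).getOrThrow(...), and queryParam(key, default) becomes queryParamAsClass(..., String.class).getOrDefault(default). A minimal standalone sketch of the same pattern, not part of the patch, assuming Javalin 4.x on the classpath; the endpoint path, parameter names, and exception type are illustrative only:

import io.javalin.Javalin;

public class QueryParamMigrationSketch {
  public static void main(String[] args) {
    Javalin app = Javalin.create().start(7070);
    app.get("/example", ctx -> {
      // Javalin 2: ctx.validatedQueryParam("basepath").getOrThrow()
      // Javalin 4: an explicit Validator obtained via queryParamAsClass
      String basePath = ctx.queryParamAsClass("basepath", String.class)
          .getOrThrow(errors -> new IllegalArgumentException("basepath is required"));
      // Javalin 2: ctx.queryParam("partition", "")
      // Javalin 4: the default value is supplied through the Validator
      String partition = ctx.queryParamAsClass("partition", String.class).getOrDefault("");
      ctx.result(basePath + "/" + partition);
    });
  }
}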
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1); List dtos = sliceHandler.getAllFileGroups( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> { metricsRegistry.add("REFRESH_TABLE", 1); boolean success = sliceHandler - .refreshTable(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); + .refreshTable(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); writeValueAsString(ctx, success); }, false)); app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1); List dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1); List dtos = sliceHandler.getReplacedFileGroupsBefore( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1); List dtos = sliceHandler.getAllReplacedFileGroups( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), - ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, 
new ViewHandler(ctx -> { metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1); List dtos = sliceHandler.getFileGroupsInPendingClustering( - ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); writeValueAsString(ctx, dtos); }, true)); } @@ -435,42 +437,49 @@ private void registerMarkerAPI() { app.get(MarkerOperation.ALL_MARKERS_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_MARKERS", 1); Set markers = markerHandler.getAllMarkers( - ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, "")); + ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, markers); }, false)); app.get(MarkerOperation.CREATE_AND_MERGE_MARKERS_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1); Set markers = markerHandler.getCreateAndMergeMarkers( - ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, "")); + ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, markers); }, false)); app.get(MarkerOperation.MARKERS_DIR_EXISTS_URL, new ViewHandler(ctx -> { metricsRegistry.add("MARKERS_DIR_EXISTS", 1); boolean exist = markerHandler.doesMarkerDirExist( - ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, "")); + ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, exist); }, false)); app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_MARKER", 1); - ctx.result(markerHandler.createMarker( + ctx.future(markerHandler.createMarker( ctx, - ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, ""), - ctx.queryParam(MarkerOperation.MARKER_NAME_PARAM, ""))); + ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""), + ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""))); }, false)); app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> { metricsRegistry.add("DELETE_MARKER_DIR", 1); boolean success = markerHandler.deleteMarkers( - ctx.queryParam(MarkerOperation.MARKER_DIR_PATH_PARAM, "")); + ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); writeValueAsString(ctx, success); }, false)); } - private static boolean isRefreshCheckDisabledInQuery(Context ctxt) { - return Boolean.parseBoolean(ctxt.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF)); + /** + * Determine whether to throw an exception when local view of table's timeline is behind that of client's view. + */ + private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String timelineHashFromClient) { + Option lastInstant = localTimeline.lastInstant(); + // When performing async clean, we may have one more .clean.completed after lastInstantTs. + // In this case, we do not need to throw an exception. 
+ return !lastInstant.isPresent() || !lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION) + || !localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient); } /** @@ -511,17 +520,16 @@ public void handle(@NotNull Context context) throws Exception { if (refreshCheck) { long beginFinalCheck = System.currentTimeMillis(); if (isLocalViewBehind(context)) { - String lastKnownInstantFromClient = context.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, - HoodieTimeline.INVALID_INSTANT_TS); - String timelineHashFromClient = context.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, ""); + String lastKnownInstantFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); + String timelineHashFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault(""); HoodieTimeline localTimeline = viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline(); if (shouldThrowExceptionIfLocalViewBehind(localTimeline, timelineHashFromClient)) { String errMsg = - "Last known instant from client was " - + lastKnownInstantFromClient - + " but server has the following timeline " - + localTimeline.getInstants().collect(Collectors.toList()); + "Last known instant from client was " + + lastKnownInstantFromClient + + " but server has the following timeline " + + localTimeline.getInstants().collect(Collectors.toList()); throw new BadRequestResponse(errMsg); } } @@ -546,26 +554,11 @@ public void handle(@NotNull Context context) throws Exception { metricsRegistry.add("TOTAL_API_CALLS", 1); LOG.debug(String.format( - "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " - + "Success=%s, Query=%s, Host=%s, synced=%s", - timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, - context.queryString(), context.host(), synced)); + "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " + + "Success=%s, Query=%s, Host=%s, synced=%s", + timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, + context.queryString(), context.host(), synced)); } } } - - /** - * Determine whether to throw an exception when local view of table's timeline is behind that of client's view. - */ - private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String timelineHashFromClient) { - Option lastInstant = localTimeline.lastInstant(); - // When performing async clean, we may have one more .clean.completed after lastInstantTs. - // In this case, we do not need to throw an exception. 
- if (lastInstant.isPresent() && lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION) - && localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient)) { - return false; - } else { - return true; - } - } } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 7b8257705146e..4b884ee25ef62 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -31,7 +31,6 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import io.javalin.Javalin; -import io.javalin.core.util.JettyServerUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; @@ -264,13 +263,12 @@ private int startServiceOnPort(int port) throws IOException { } public int startService() throws IOException { - final Server server = timelineServerConf.numThreads == DEFAULT_NUM_THREADS ? JettyServerUtil.defaultServer() - : new Server(new QueuedThreadPool(timelineServerConf.numThreads)); + final Server server = timelineServerConf.numThreads == DEFAULT_NUM_THREADS ? new Server() : new Server(new QueuedThreadPool(timelineServerConf.numThreads)); - app = Javalin.create().server(() -> server); - if (!timelineServerConf.compress) { - app.disableDynamicGzip(); - } + app = Javalin.create(c -> { + c.compressionStrategy(io.javalin.core.compression.CompressionStrategy.NONE); + c.server(() -> server); + }); requestHandler = new RequestHandler( app, conf, timelineServerConf, context, fs, fsViewsManager); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index 32b85bdfc35b6..43c7d93bd6e8b 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -27,7 +27,7 @@ import org.apache.hudi.timeline.service.handlers.marker.MarkerCreationFuture; import org.apache.hudi.timeline.service.handlers.marker.MarkerDirState; -import io.javalin.Context; +import io.javalin.http.Context; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java index 5ff8baa90da1f..d965e56a01cb9 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java @@ -20,7 +20,7 @@ import org.apache.hudi.common.util.HoodieTimer; -import io.javalin.Context; +import io.javalin.http.Context; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java index 87afd56d83c06..f5a0fadc87fdd 100644 --- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java @@ -21,32 +21,25 @@ import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; import org.apache.hudi.avro.model.HoodieIndexPartitionInfo; -import org.apache.hudi.client.SparkRDDReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.hudi.testutils.providers.SparkProvider; -import org.apache.hadoop.fs.Path; -import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -75,46 +68,29 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkProvider { +public class TestHoodieIndexer extends SparkClientFunctionalTestHarness implements SparkProvider { - private static transient SparkSession spark; - private static transient SQLContext sqlContext; - private static transient JavaSparkContext jsc; - private static transient HoodieSparkEngineContext context; - private static int colStatsFileGroupCount; + private static final HoodieTestDataGenerator DATA_GENERATOR = new HoodieTestDataGenerator(0L); + private static int colStatsFileGroupCount = HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue(); + private HoodieTableMetaClient metaClient; @BeforeEach public void init() throws IOException { - boolean initialized = spark != null; - if (!initialized) { - SparkConf sparkConf = conf(); - SparkRDDWriteClient.registerClasses(sparkConf); - SparkRDDReadClient.addHoodieSupport(sparkConf); - spark = SparkSession.builder().config(sparkConf).getOrCreate(); - sqlContext = spark.sqlContext(); - jsc = new JavaSparkContext(spark.sparkContext()); - context = new HoodieSparkEngineContext(jsc); - colStatsFileGroupCount = HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue(); - } - initPath(); - initMetaClient(); + this.metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE); } - protected void initMetaClient() throws IOException { - String rootPathStr = "file://" + 
tempDir.toAbsolutePath().toString(); - Path rootPath = new Path(rootPathStr); - rootPath.getFileSystem(jsc.hadoopConfiguration()).mkdirs(rootPath); - metaClient = HoodieTestUtils.init(rootPathStr, getTableType()); - basePath = metaClient.getBasePath(); + @AfterAll + public static void cleanup() { + DATA_GENERATOR.close(); } @Test public void testGetRequestedPartitionTypes() { HoodieIndexer.Config config = new HoodieIndexer.Config(); - config.basePath = basePath; + config.basePath = basePath(); config.tableName = "indexer_test"; config.indexTypes = "FILES,BLOOM_FILTERS,COLUMN_STATS"; - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); List partitionTypes = indexer.getRequestedPartitionTypes(config.indexTypes, Option.empty()); assertTrue(partitionTypes.contains(FILES)); assertTrue(partitionTypes.contains(BLOOM_FILTERS)); @@ -124,10 +100,10 @@ public void testGetRequestedPartitionTypes() { @Test public void testIsIndexBuiltForAllRequestedTypes() { HoodieIndexer.Config config = new HoodieIndexer.Config(); - config.basePath = basePath; + config.basePath = basePath(); config.tableName = "indexer_test"; config.indexTypes = "BLOOM_FILTERS,COLUMN_STATS"; - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); HoodieIndexCommitMetadata commitMetadata = HoodieIndexCommitMetadata.newBuilder() .setIndexPartitionInfos(Arrays.asList(new HoodieIndexPartitionInfo( 1, @@ -137,39 +113,37 @@ public void testIsIndexBuiltForAllRequestedTypes() { assertFalse(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos())); config.indexTypes = "COLUMN_STATS"; - indexer = new HoodieIndexer(jsc, config); + indexer = new HoodieIndexer(jsc(), config); assertTrue(indexer.isIndexBuiltForAllRequestedTypes(commitMetadata.getIndexPartitionInfos())); } @Test public void testIndexerWithNotAllIndexesEnabled() { - initTestDataGenerator(); - tableName = "indexer_test"; + String tableName = "indexer_test"; // enable files and bloom_filters on the regular write client HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true); - initializeWriteClient(metadataConfigBuilder.build()); + initializeWriteClient(metadataConfigBuilder.build(), tableName); // validate table config assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); // build indexer config which has only column_stats enabled (files and bloom filter is already enabled) - indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[] {FILES, BLOOM_FILTERS}), Collections.emptyList()); + indexMetadataPartitionsAndAssert(COLUMN_STATS, Arrays.asList(new MetadataPartitionType[] {FILES, BLOOM_FILTERS}), Collections.emptyList(), tableName); } @Test public void testIndexerWithFilesPartition() { - initTestDataGenerator(); - tableName = "indexer_test"; + String tableName = "indexer_test"; // enable files and bloom_filters on the regular write client HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true); - initializeWriteClient(metadataConfigBuilder.build()); + initializeWriteClient(metadataConfigBuilder.build(), tableName); // validate table config 
assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); // build indexer config which has only files enabled - indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName); } private static Stream colStatsFileGroupCountParams() { @@ -185,20 +159,19 @@ private static Stream colStatsFileGroupCountParams() { @MethodSource("colStatsFileGroupCountParams") public void testColStatsFileGroupCount(int colStatsFileGroupCount) { TestHoodieIndexer.colStatsFileGroupCount = colStatsFileGroupCount; - initTestDataGenerator(); - tableName = "indexer_test"; + String tableName = "indexer_test"; // enable files and bloom_filters on the regular write client HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false).withMetadataIndexBloomFilter(true); - initializeWriteClient(metadataConfigBuilder.build()); + initializeWriteClient(metadataConfigBuilder.build(), tableName); // validate table config assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); // build indexer config which has only files enabled - indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName); // build indexer config which has only col stats enabled - indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}), tableName); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build(); List partitionFileSlices = @@ -212,24 +185,23 @@ public void testColStatsFileGroupCount(int colStatsFileGroupCount) { */ @Test public void testIndexerForExceptionWithNonFilesPartition() { - initTestDataGenerator(); - tableName = "indexer_test"; + String tableName = "indexer_test"; // enable files and bloom_filters on the regular write client HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(false, false); - initializeWriteClient(metadataConfigBuilder.build()); + initializeWriteClient(metadataConfigBuilder.build(), tableName); // validate table config assertFalse(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); // build indexer config which has only column stats enabled. expected to throw exception. 
HoodieIndexer.Config config = new HoodieIndexer.Config(); String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath(); - config.basePath = basePath; + config.basePath = basePath(); config.tableName = tableName; config.indexTypes = COLUMN_STATS.name(); config.runningMode = SCHEDULE_AND_EXECUTE; config.propsFilePath = propsPath; // start the indexer and validate index building fails - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); assertEquals(-1, indexer.start(0)); // validate table config @@ -238,13 +210,13 @@ public void testIndexerForExceptionWithNonFilesPartition() { assertFalse(metaClient.getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())); assertFalse(metaClient.getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); // validate metadata partitions actually exist - assertFalse(metadataPartitionExists(basePath, context, FILES)); + assertFalse(metadataPartitionExists(basePath(), context(), FILES)); // trigger FILES partition and indexing should succeed. - indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(FILES, Collections.emptyList(), Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, BLOOM_FILTERS}), tableName); // build indexer config which has only col stats enabled - indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS})); + indexMetadataPartitionsAndAssert(COLUMN_STATS, Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] {BLOOM_FILTERS}), tableName); HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getMetaPath() + "/metadata").build(); List partitionFileSlices = @@ -252,23 +224,24 @@ public void testIndexerForExceptionWithNonFilesPartition() { assertEquals(partitionFileSlices.size(), HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.defaultValue()); } - private void initializeWriteClient(HoodieMetadataConfig metadataConfig) { - HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName); + private void initializeWriteClient(HoodieMetadataConfig metadataConfig, String tableName) { + HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName); HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfig).build(); // do one upsert with synchronous metadata update - SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig); + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig); String instant = "0001"; writeClient.startCommitWithTime(instant); - List records = dataGen.generateInserts(instant, 100); - JavaRDD result = writeClient.upsert(jsc.parallelize(records, 1), instant); + List records = DATA_GENERATOR.generateInserts(instant, 100); + JavaRDD result = writeClient.upsert(jsc().parallelize(records, 1), instant); List statuses = result.collect(); assertNoWriteErrors(statuses); } - private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTypeToIndex, List alreadyCompletedPartitions, List nonExistantPartitions) { + private void indexMetadataPartitionsAndAssert(MetadataPartitionType 
partitionTypeToIndex, List alreadyCompletedPartitions, List nonExistantPartitions, + String tableName) { HoodieIndexer.Config config = new HoodieIndexer.Config(); String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath(); - config.basePath = basePath; + config.basePath = basePath(); config.tableName = tableName; config.indexTypes = partitionTypeToIndex.name(); config.runningMode = SCHEDULE_AND_EXECUTE; @@ -277,7 +250,7 @@ private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTyp config.configs.add(HoodieMetadataConfig.METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT.key() + "=" + colStatsFileGroupCount); } // start the indexer and validate files index is completely built out - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); // validate table config @@ -288,140 +261,138 @@ private void indexMetadataPartitionsAndAssert(MetadataPartitionType partitionTyp nonExistantPartitions.forEach(entry -> assertFalse(completedPartitions.contains(entry.getPartitionPath()))); // validate metadata partitions actually exist - assertTrue(metadataPartitionExists(basePath, context, partitionTypeToIndex)); - alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath, context, entry))); + assertTrue(metadataPartitionExists(basePath(), context(), partitionTypeToIndex)); + alreadyCompletedPartitions.forEach(entry -> assertTrue(metadataPartitionExists(basePath(), context(), entry))); } @Test public void testIndexerDropPartitionDeletesInstantFromTimeline() { - initTestDataGenerator(); String tableName = "indexer_test"; - HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName); + HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName); // enable files on the regular write client HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false).withMetadataIndexBloomFilter(true); HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build(); // do one upsert with synchronous metadata update - SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig); + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig); String instant = "0001"; writeClient.startCommitWithTime(instant); - List records = dataGen.generateInserts(instant, 100); - JavaRDD result = writeClient.upsert(jsc.parallelize(records, 1), instant); + List records = DATA_GENERATOR.generateInserts(instant, 100); + JavaRDD result = writeClient.upsert(jsc().parallelize(records, 1), instant); List statuses = result.collect(); assertNoWriteErrors(statuses); // validate partitions built successfully assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES)); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); // build indexer config which has only column_stats enabled (files is enabled by default) HoodieIndexer.Config config = new HoodieIndexer.Config(); String propsPath 
= Objects.requireNonNull(getClass().getClassLoader().getResource("delta-streamer-config/indexer.properties")).getPath(); - config.basePath = basePath; + config.basePath = basePath(); config.tableName = tableName; config.indexTypes = COLUMN_STATS.name(); config.runningMode = SCHEDULE; config.propsFilePath = propsPath; // schedule indexing and validate column_stats index is also initialized - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); Option indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant(); assertTrue(indexInstantInTimeline.isPresent()); assertEquals(REQUESTED, indexInstantInTimeline.get().getState()); - assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS)); + assertTrue(metadataPartitionExists(basePath(), context(), COLUMN_STATS)); // drop column_stats and validate indexing.requested is also removed from the timeline config.runningMode = DROP_INDEX; - indexer = new HoodieIndexer(jsc, config); + indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); indexInstantInTimeline = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant(); assertFalse(indexInstantInTimeline.isPresent()); - assertFalse(metadataPartitionExists(basePath, context, COLUMN_STATS)); + assertFalse(metadataPartitionExists(basePath(), context(), COLUMN_STATS)); // check other partitions are intact assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES)); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); } @Test public void testTwoIndexersOneCreateOneDropPartition() { - initTestDataGenerator(); String tableName = "indexer_test"; - HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath, tableName); + HoodieWriteConfig.Builder writeConfigBuilder = getWriteConfigBuilder(basePath(), tableName); // enable files on the regular write client HoodieMetadataConfig.Builder metadataConfigBuilder = getMetadataConfigBuilder(true, false); HoodieWriteConfig writeConfig = writeConfigBuilder.withMetadataConfig(metadataConfigBuilder.build()).build(); // do one upsert with synchronous metadata update - SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig); + SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig); String instant = "0001"; writeClient.startCommitWithTime(instant); - List records = dataGen.generateInserts(instant, 100); - JavaRDD result = writeClient.upsert(jsc.parallelize(records, 1), instant); + List records = DATA_GENERATOR.generateInserts(instant, 100); + JavaRDD result = writeClient.upsert(jsc().parallelize(records, 1), instant); List statuses = result.collect(); assertNoWriteErrors(statuses); // validate files partition built successfully assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES)); // build indexer config which has only bloom_filters enabled - 
HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties"); + HoodieIndexer.Config config = getHoodieIndexConfig(BLOOM_FILTERS.name(), SCHEDULE_AND_EXECUTE, "delta-streamer-config/indexer-only-bloom.properties", tableName); // start the indexer and validate bloom_filters index is also complete - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); // completed index timeline for later validation Option bloomIndexInstant = metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant(); assertTrue(bloomIndexInstant.isPresent()); // build indexer config which has only column_stats enabled - config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties"); + config = getHoodieIndexConfig(COLUMN_STATS.name(), SCHEDULE, "delta-streamer-config/indexer.properties", tableName); // schedule indexing and validate column_stats index is also initialized // and indexing.requested instant is present - indexer = new HoodieIndexer(jsc, config); + indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); Option columnStatsIndexInstant = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant(); assertTrue(columnStatsIndexInstant.isPresent()); assertEquals(REQUESTED, columnStatsIndexInstant.get().getState()); - assertTrue(metadataPartitionExists(basePath, context, COLUMN_STATS)); + assertTrue(metadataPartitionExists(basePath(), context(), COLUMN_STATS)); // drop column_stats and validate indexing.requested is also removed from the timeline // and completed indexing instant corresponding to bloom_filters index is still present - dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty()); + dropIndexAndAssert(COLUMN_STATS, "delta-streamer-config/indexer.properties", Option.empty(), tableName); // check other partitions are intact assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, FILES)); + assertTrue(metadataPartitionExists(basePath(), context(), FILES)); assertTrue(reload(metaClient).getTableConfig().getMetadataPartitions().contains(BLOOM_FILTERS.getPartitionPath())); - assertTrue(metadataPartitionExists(basePath, context, BLOOM_FILTERS)); + assertTrue(metadataPartitionExists(basePath(), context(), BLOOM_FILTERS)); // drop bloom filter partition. timeline files should not be deleted since the index building is complete. 
- dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant); + dropIndexAndAssert(BLOOM_FILTERS, "delta-streamer-config/indexer-only-bloom.properties", bloomIndexInstant, tableName); } - private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option completedIndexInstant) { - HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath); - HoodieIndexer indexer = new HoodieIndexer(jsc, config); + private void dropIndexAndAssert(MetadataPartitionType indexType, String resourceFilePath, Option completedIndexInstant, String tableName) { + HoodieIndexer.Config config = getHoodieIndexConfig(indexType.name(), DROP_INDEX, resourceFilePath, tableName); + HoodieIndexer indexer = new HoodieIndexer(jsc(), config); assertEquals(0, indexer.start(0)); Option pendingFlights = metaClient.reloadActiveTimeline().filterPendingIndexTimeline().lastInstant(); assertFalse(pendingFlights.isPresent()); - assertFalse(metadataPartitionExists(basePath, context, indexType)); + assertFalse(metadataPartitionExists(basePath(), context(), indexType)); if (completedIndexInstant.isPresent()) { assertEquals(completedIndexInstant, metaClient.reloadActiveTimeline().filterCompletedIndexTimeline().lastInstant()); } } - private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath) { + private HoodieIndexer.Config getHoodieIndexConfig(String indexType, String runMode, String resourceFilePath, String tableName) { HoodieIndexer.Config config = new HoodieIndexer.Config(); String propsPath = Objects.requireNonNull(getClass().getClassLoader().getResource(resourceFilePath)).getPath(); - config.basePath = basePath; + config.basePath = basePath(); config.tableName = tableName; config.indexTypes = indexType; config.runningMode = runMode; @@ -446,24 +417,4 @@ private static HoodieMetadataConfig.Builder getMetadataConfigBuilder(boolean ena .enable(enable) .withAsyncIndex(asyncIndex); } - - @Override - public HoodieEngineContext context() { - return context; - } - - @Override - public SparkSession spark() { - return spark; - } - - @Override - public SQLContext sqlContext() { - return sqlContext; - } - - @Override - public JavaSparkContext jsc() { - return jsc; - } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java index 018dd0f67d777..16d190ac45d15 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java @@ -21,7 +21,6 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.utilities.sources.AvroKafkaSource; import org.apache.hudi.utilities.sources.helpers.SchemaTestProvider; -import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -45,7 +44,7 @@ /** * Tests {@link KafkaAvroSchemaDeserializer}. 
*/ -public class TestKafkaAvroSchemaDeserializer extends UtilitiesTestBase { +public class TestKafkaAvroSchemaDeserializer { private final SchemaRegistryClient schemaRegistry; private final KafkaAvroSerializer avroSerializer; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java index 2106afe11f755..1ac3f91fa8c7e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java @@ -29,8 +29,6 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -63,14 +61,8 @@ public static void beforeAll() throws Exception { UtilitiesTestBase.initTestServices(false, false); } - @AfterAll - public static void afterAll() { - UtilitiesTestBase.cleanupClass(); - } - @BeforeEach public void beforeEach() throws Exception { - super.setup(); schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc); MockitoAnnotations.initMocks(this); @@ -79,11 +71,6 @@ public void beforeEach() throws Exception { props.put(PUBSUB_SUBSCRIPTION_ID, "dummy-subscription"); } - @AfterEach - public void afterEach() throws Exception { - super.teardown(); - } - @Test public void shouldReturnEmptyOnNoMessages() { when(pubsubMessagesFetcher.fetchMessages()).thenReturn(Collections.emptyList()); diff --git a/packaging/hudi-datahub-sync-bundle/pom.xml b/packaging/hudi-datahub-sync-bundle/pom.xml index 2a1ba95b4772a..19bff7716be76 100644 --- a/packaging/hudi-datahub-sync-bundle/pom.xml +++ b/packaging/hudi-datahub-sync-bundle/pom.xml @@ -88,9 +88,6 @@ org.apache.hbase.thirdparty:hbase-shaded-netty org.apache.hbase.thirdparty:hbase-shaded-protobuf org.apache.htrace:htrace-core4 - org.apache.httpcomponents:fluent-hc - org.apache.httpcomponents:httpcore - org.apache.httpcomponents:httpclient org.apache.httpcomponents:httpasyncclient org.apache.httpcomponents:httpcore-nio org.openjdk.jol:jol-core diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index e5726bca948ee..df590a78a7ca7 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -91,9 +91,6 @@ io.javalin:javalin org.jetbrains.kotlin:* org.rocksdb:rocksdbjni - org.apache.httpcomponents:httpclient - org.apache.httpcomponents:httpcore - org.apache.httpcomponents:fluent-hc org.antlr:stringtemplate org.apache.parquet:parquet-avro diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index c9deb10712630..479a5711e0de2 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -161,10 +161,6 @@ org.apache.thrift:libfb303 org.apache.thrift:libthrift - org.apache.httpcomponents:httpclient - org.apache.httpcomponents:httpcore - org.apache.httpcomponents:fluent-hc - com.fasterxml.jackson.core:jackson-annotations com.fasterxml.jackson.core:jackson-core com.fasterxml.jackson.core:jackson-databind @@ -607,11 +603,6 @@ 1.4 - - org.apache.httpcomponents - httpcore - - org.apache.hadoop hadoop-mapreduce-client-common diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml 
index 86a52496c24c5..9222937ba75f7 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -101,9 +101,6 @@
                   org.eclipse.jetty:*
                   org.eclipse.jetty.websocket:*
                   org.rocksdb:rocksdbjni
-                  org.apache.httpcomponents:httpclient
-                  org.apache.httpcomponents:httpcore
-                  org.apache.httpcomponents:fluent-hc
                   io.dropwizard.metrics:metrics-core
                   io.dropwizard.metrics:metrics-graphite
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 7f50eac81936f..6e0dfe32d86e0 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -90,9 +90,6 @@
                   org.eclipse.jetty.websocket:*
                   org.jetbrains.kotlin:*
                   org.rocksdb:rocksdbjni
-                  org.apache.httpcomponents:httpclient
-                  org.apache.httpcomponents:httpcore
-                  org.apache.httpcomponents:fluent-hc
                   org.antlr:stringtemplate
                   org.apache.parquet:parquet-avro
@@ -150,10 +147,6 @@
                   javax.servlet.
                   org.apache.hudi.javax.servlet.
-
-                  org.apache.http.
-                  org.apache.hudi.org.apache.http.
-
                   com.yammer.metrics.
                   org.apache.hudi.com.yammer.metrics.
diff --git a/packaging/hudi-timeline-server-bundle/pom.xml b/packaging/hudi-timeline-server-bundle/pom.xml
index dbb333e23929a..409acd41be79a 100644
--- a/packaging/hudi-timeline-server-bundle/pom.xml
+++ b/packaging/hudi-timeline-server-bundle/pom.xml
@@ -56,12 +56,6 @@
       jackson-databind
-
-
-      org.apache.httpcomponents
-      fluent-hc
-
-
       io.javalin
       javalin
@@ -162,11 +156,8 @@ -->
                   org.apache.hudi:hudi-common
                   org.apache.hudi:hudi-timeline-service
-                  org.apache.httpcomponents:httpclient
-                  org.apache.httpcomponents:httpcore
                   org.mortbay.jetty:jetty
                   org.mortbay.jetty:jetty-util
-                  org.apache.httpcomponents:fluent-hc
                   io.javalin:javalin
                   org.jetbrains.kotlin:kotlin-stdlib-jdk8
                   org.jetbrains.kotlin:kotlin-stdlib
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index ecfcf76878303..bbc28bf0f5964 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -113,9 +113,6 @@
                   org.eclipse.jetty.websocket:*
                   org.jetbrains.kotlin:*
                   org.rocksdb:rocksdbjni
-                  org.apache.httpcomponents:httpclient
-                  org.apache.httpcomponents:httpcore
-                  org.apache.httpcomponents:fluent-hc
                   org.antlr:stringtemplate
                   org.apache.parquet:parquet-avro
diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml
index cc155769ad572..74de7f863985c 100644
--- a/packaging/hudi-utilities-slim-bundle/pom.xml
+++ b/packaging/hudi-utilities-slim-bundle/pom.xml
@@ -105,9 +105,6 @@
                   org.eclipse.jetty.websocket:*
                   org.jetbrains.kotlin:*
                   org.rocksdb:rocksdbjni
-                  org.apache.httpcomponents:httpclient
-                  org.apache.httpcomponents:httpcore
-                  org.apache.httpcomponents:fluent-hc
                   org.antlr:stringtemplate
                   com.github.davidmoten:guava-mini
diff --git a/pom.xml b/pom.xml
index 3965fc9ee0efb..6efb8bf936d43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,7 +168,7 @@
     3.1.0
     log4j2-surefire.properties
     0.12.0
-    9.4.15.v20190215
+    9.4.33.v20201020
     3.1.0-incubating
     2.4.9
     1.4.199
@@ -434,6 +434,10 @@
                   com.esotericsoftware:kryo-shaded
                   com.esotericsoftware:minlog
                   org.objenesis:objenesis
+
+                  org.apache.httpcomponents:httpclient
+                  org.apache.httpcomponents:httpcore
+                  org.apache.httpcomponents:fluent-hc
@@ -454,6 +458,11 @@
                   org.objenesis.
                   org.apache.hudi.org.objenesis.
+
+
+                  org.apache.http.
+                  org.apache.hudi.org.apache.http.
+
diff --git a/scripts/pr_compliance.py b/scripts/pr_compliance.py
index 5946a355872e5..af7d9454f70f7 100644
--- a/scripts/pr_compliance.py
+++ b/scripts/pr_compliance.py
@@ -396,7 +396,7 @@ def make_default_validator(body, debug=False):
                              "### Impact",
                              {"_Describe any public API or user-facing feature change or any performance impact._"})
     risklevel = RiskLevelData("RISKLEVEL",
-                              "### Risk level ",
+                              "### Risk level",
                              {"_If medium or high, explain what verification was done to mitigate the risks._"})
     checklist = ParseSectionData("CHECKLIST",
                                  "### Contributor's checklist",