From 7f8630cc57fbb9d29e8dc7ca87b582264da073fd Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Thu, 2 Jun 2022 09:48:48 +0800 Subject: [PATCH] [HUDI-4167] Remove the timeline refresh with initializing hoodie table (#5716) The timeline refresh on table initialization invokes the fs view #sync, which currently has two actions: 1. reload the timeline of the fs view, so that the next fs view request is based on this timeline metadata 2. if this is a local fs view, clear all the local states; if this is a remote fs view, send a request to sync the remote fs view But looking at the construction, the meta client is instantiated freshly, so the timeline is already the latest; the table is also constructed freshly, so the fs view has no local states. That means the #sync is entirely unnecessary. In this patch, the metadata lifecycle and the data set fs view are kept in sync: when the fs view is refreshed, the underlying metadata is also refreshed synchronously. The freshness of the metadata follows the same rules as the data fs view: 1. if the fs view is local, the visibility is based on the latest commit of the client's table meta client 2. if the fs view is remote, the timeline server #syncs the fs view and the metadata together based on the lagging server-local timeline From the client's perspective, there is no need to care about the refresh action anymore, whether or not the metadata table is enabled. That makes the client logic clearer and less error-prone. Removing the timeline refresh has another benefit: it avoids unnecessary #refresh of the remote fs view; if all the clients send requests to #sync the remote fs view, the server would encounter conflicts and the clients would get response errors. --- .../apache/hudi/cli/commands/SparkMain.java | 2 +- .../hudi/client/BaseHoodieWriteClient.java | 28 ++++++++----------- .../apache/hudi/config/HoodieWriteConfig.java | 9 ++++-- .../hudi/client/HoodieFlinkWriteClient.java | 3 +- .../apache/hudi/table/HoodieFlinkTable.java | 10 ------- .../bloom/TestFlinkHoodieBloomIndex.java | 2 +- .../hudi/client/HoodieJavaWriteClient.java | 4 +-- .../hudi/client/SparkRDDWriteClient.java | 12 ++++---- .../apache/hudi/table/HoodieSparkTable.java | 17 +---------- .../hudi/client/TestTableSchemaEvolution.java | 8 +++++- .../TestHoodieBackedTableMetadata.java | 2 +- .../table/TestHoodieMergeOnReadTable.java | 2 +- .../testutils/HoodieClientTestHarness.java | 2 +- .../view/RocksDbBasedFileSystemView.java | 1 + .../metadata/HoodieBackedTableMetadata.java | 21 +++++++++++--- .../timeline/service/TimelineService.java | 3 +- 16 files changed, 59 insertions(+), 67 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java index 9fe83f1995c2..43fe168587ac 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java @@ -296,7 +296,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false); HoodieWriteConfig config = client.getConfig(); HoodieEngineContext context = client.getEngineContext(); - HoodieSparkTable table = HoodieSparkTable.create(config, context, true); + HoodieSparkTable table = HoodieSparkTable.create(config, context); WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); return 0; diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 5c485bed0581..455cb644c7d4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -296,11 +296,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom } } - protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { - return createTable(config, hadoopConf, false); - } - - protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline); + protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf); void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) { try { @@ -365,7 +361,7 @@ public void bootstrap(Option> extraMetadata) { */ protected void rollbackFailedBootstrap() { LOG.info("Rolling back pending bootstrap if present"); - HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); Option instant = Option.fromJavaOptional( inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst()); @@ -634,7 +630,7 @@ protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArch * Run any pending compactions. */ public void runAnyPendingCompactions() { - runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled())); + runAnyPendingCompactions(createTable(config, hadoopConf)); } /** @@ -644,7 +640,7 @@ public void runAnyPendingCompactions() { * @param comment - Comment for the savepoint */ public void savepoint(String user, String comment) { - HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); if (table.getCompletedCommitsTimeline().empty()) { throw new HoodieSavepointException("Could not savepoint. 
Commit timeline is empty"); } @@ -668,7 +664,7 @@ public void savepoint(String user, String comment) { * @param comment - Comment for the savepoint */ public void savepoint(String instantTime, String user, String comment) { - HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); table.savepoint(context, instantTime, user, comment); } @@ -680,7 +676,7 @@ public void savepoint(String instantTime, String user, String comment) { * @return true if the savepoint was deleted successfully */ public void deleteSavepoint(String savepointTime) { - HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = createTable(config, hadoopConf); SavepointHelpers.deleteSavepoint(table, savepointTime); } @@ -1012,7 +1008,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option scheduleIndexing(List partitionTypes) { String instantTime = HoodieActiveTimeline.createNewInstantTime(); - Option indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option indexPlan = createTable(config, hadoopConf) .scheduleIndexing(context, instantTime, partitionTypes); return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty(); } @@ -1024,7 +1020,7 @@ public Option scheduleIndexing(List partitionType * @return {@link Option} after successful indexing. */ public Option index(String indexInstantTime) { - return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime); + return createTable(config, hadoopConf).index(context, indexInstantTime); } /** @@ -1339,17 +1335,17 @@ private Option scheduleTableServiceInternal(String instantTime, Option clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option clusteringPlan = createTable(config, hadoopConf) .scheduleClustering(context, instantTime, extraMetadata); return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty(); case COMPACT: LOG.info("Scheduling compaction at instant time :" + instantTime); - Option compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option compactionPlan = createTable(config, hadoopConf) .scheduleCompaction(context, instantTime, extraMetadata); return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty(); case CLEAN: LOG.info("Scheduling cleaning at instant time :" + instantTime); - Option cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled()) + Option cleanerPlan = createTable(config, hadoopConf) .scheduleCleaning(context, instantTime, extraMetadata); return cleanerPlan.isPresent() ? 
Option.of(instantTime) : Option.empty(); default: @@ -1702,6 +1698,6 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m // try to save history schemas FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient); schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr)); - commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType); + commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 42208a0734aa..1603965ea987 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -364,8 +364,8 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty .key("hoodie.refresh.timeline.server.based.on.latest.commit") - .defaultValue(false) - .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), "); + .defaultValue(true) + .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true)."); public static final ConfigProperty INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty .key("hoodie.consistency.check.initial_interval_ms") @@ -2499,6 +2499,11 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) { return this; } + public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) { + writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit)); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index ddfbabaf36ae..b68cf97e9aa3 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -117,8 +117,7 @@ public boolean commit(String instantTime, List writeStatuses, Optio } @Override - protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, - boolean refreshTimeline) { + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java index 6eae15e7e1af..26149918c654 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java @@ -62,13 +62,6 @@ public static HoodieFlinkTable 
create(HoodieW public static HoodieFlinkTable create(HoodieWriteConfig config, HoodieFlinkEngineContext context, HoodieTableMetaClient metaClient) { - return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled()); - } - - public static HoodieFlinkTable create(HoodieWriteConfig config, - HoodieFlinkEngineContext context, - HoodieTableMetaClient metaClient, - boolean refreshTimeline) { final HoodieFlinkTable hoodieFlinkTable; switch (metaClient.getTableType()) { case COPY_ON_WRITE: @@ -80,9 +73,6 @@ public static HoodieFlinkTable create(HoodieW default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } - if (refreshTimeline) { - hoodieFlinkTable.getHoodieView().sync(); - } return hoodieFlinkTable; } diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java index 50adabbd585e..e23ee4ad58e6 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -104,7 +104,7 @@ private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance()); - HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false); + HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); // Create some partitions, and put some files diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index 7f5dc19baf27..fbfb85bab3b8 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -89,9 +89,7 @@ public boolean commit(String instantTime, } @Override - protected HoodieTable createTable(HoodieWriteConfig config, - Configuration hadoopConf, - boolean refreshTimeline) { + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { return HoodieJavaTable.create(config, context); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 7f9ec05e3c5e..fe6ea975e311 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -123,10 +123,8 @@ public boolean commit(String instantTime, JavaRDD writeStatuses, Op } @Override - protected HoodieTable createTable(HoodieWriteConfig config, - Configuration hadoopConf, - boolean refreshTimeline) { - return HoodieSparkTable.create(config, context, refreshTimeline); + protected HoodieTable createTable(HoodieWriteConfig config, Configuration 
hadoopConf) { + return HoodieSparkTable.create(config, context); } @Override @@ -333,7 +331,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, @Override protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { - HoodieSparkTable table = HoodieSparkTable.create(config, context, true); + HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); @@ -352,7 +350,7 @@ protected HoodieWriteMetadata> compact(String compactionIns @Override public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) { - HoodieSparkTable table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled()); + HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient()); HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); @@ -434,7 +432,7 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option HoodieSparkTable create(HoodieWriteConfig config, HoodieEngineContext context) { - return create(config, context, false); - } - - public static HoodieSparkTable create(HoodieWriteConfig config, HoodieEngineContext context, - boolean refreshTimeline) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath()) .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig()) .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))) .setFileSystemRetryConfig(config.getFileSystemRetryConfig()) .setProperties(config.getProps()).build(); - return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline); + return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient); } public static HoodieSparkTable create(HoodieWriteConfig config, HoodieSparkEngineContext context, HoodieTableMetaClient metaClient) { - return create(config, context, metaClient, false); - } - - public static HoodieSparkTable create(HoodieWriteConfig config, - HoodieSparkEngineContext context, - HoodieTableMetaClient metaClient, - boolean refreshTimeline) { HoodieSparkTable hoodieSparkTable; switch (metaClient.getTableType()) { case COPY_ON_WRITE: @@ -89,9 +77,6 @@ public static HoodieSparkTable create(HoodieW default: throw new HoodieException("Unsupported table type :" + metaClient.getTableType()); } - if (refreshTimeline) { - hoodieSparkTable.getHoodieView().sync(); - } return hoodieSparkTable; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index 1cb7bcbfc4fc..98bcb11033c5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -515,7 +515,13 @@ private HoodieWriteConfig.Builder getWriteConfigBuilder(String 
schema) { return getConfigBuilder(schema) .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build()) .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) - .withAvroSchemaValidate(true); + .withAvroSchemaValidate(true) + // The test has rollback instants on the timeline, + // these rollback instants use real time as instant time, whose instant time is always greater than + // the normal commits instant time, this breaks the refresh rule introduced in HUDI-2761: + // The last client instant is always the rollback instant but not the normal commit. + // Always refresh the timeline when client and server have different timeline. + .withRefreshTimelineServerBasedOnLatestCommit(false); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java index 9a8fc55a2002..e19c8fc1a2ee 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java @@ -111,7 +111,7 @@ private void verifyBaseMetadataTable() throws IOException { assertEquals(fsPartitions, metadataPartitions, "Partitions should match"); // Files within each partition should match - HoodieTable table = HoodieSparkTable.create(writeConfig, context, true); + HoodieTable table = HoodieSparkTable.create(writeConfig, context); TableFileSystemView tableView = table.getHoodieView(); List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index b9f025223b7d..0ce6ca0ee923 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -247,7 +247,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length); // Verify that all data file has one log file - HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true); + HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); for (String partitionPath : dataGen.getPartitionPaths()) { List groupedLogFiles = table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 4504c552c95d..d0365dced199 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -559,7 +559,7 @@ public void validateMetadata(HoodieTestTable testTable, List inflightCom // Files within each partition should match metaClient = 
HoodieTableMetaClient.reload(metaClient); - HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true); + HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext); TableFileSystemView tableView = table.getHoodieView(); List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList()); Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java index af0dc130162a..02a406e7e076 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java @@ -199,6 +199,7 @@ protected void resetViewState() { LOG.info("Deleting all rocksdb data associated with table filesystem view"); rocksDB.close(); rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath()); + schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index cf941bb70cc3..e8937b39dc7f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -550,10 +550,7 @@ private List getRollbackedCommits(HoodieInstant instant, HoodieActiveTim @Override public void close() { - for (Pair partitionFileSlicePair : partitionReaders.keySet()) { - close(partitionFileSlicePair); - } - partitionReaders.clear(); + closePartitionReaders(); } /** @@ -567,6 +564,16 @@ private synchronized void close(Pair partitionFileSlicePair) { closeReader(readers); } + /** + * Close and clear all the partitions readers. + */ + private void closePartitionReaders() { + for (Pair partitionFileSlicePair : partitionReaders.keySet()) { + close(partitionFileSlicePair); + } + partitionReaders.clear(); + } + private void closeReader(Pair readers) { if (readers != null) { try { @@ -624,5 +631,11 @@ public Option getLatestCompactionTime() { public void reset() { initIfNeeded(); dataMetaClient.reloadActiveTimeline(); + if (metadataMetaClient != null) { + metadataMetaClient.reloadActiveTimeline(); + } + // the cached reader has max instant time restriction, they should be cleared + // because the metadata timeline may have changed. 
+ closePartitionReaders(); } } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 40669f50e42d..2ff21682213c 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -150,7 +150,7 @@ public static class Builder { private int markerBatchNumThreads = 20; private long markerBatchIntervalMs = 50L; private int markerParallelism = 100; - private boolean refreshTimelineBasedOnLatestCommit = false; + private boolean refreshTimelineBasedOnLatestCommit = true; public Builder() { } @@ -240,6 +240,7 @@ public Config build() { config.markerBatchNumThreads = this.markerBatchNumThreads; config.markerBatchIntervalMs = this.markerBatchIntervalMs; config.markerParallelism = this.markerParallelism; + config.refreshTimelineBasedOnLatestCommit = this.refreshTimelineBasedOnLatestCommit; return config; } }
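For reference, a minimal sketch (not part of the patch) of how a writer could opt out of the latest-commit based timeline refresh through the HoodieWriteConfig builder option this patch introduces. The base path and table name are hypothetical placeholders; only the builder calls shown in the patch or already present in HoodieWriteConfig are assumed.

    import org.apache.hudi.config.HoodieWriteConfig;

    public class RefreshFlagExample {
      public static void main(String[] args) {
        // Build a write config that falls back to timeline-hash-only refresh
        // on the timeline server (the new flag defaults to true in this patch).
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie_table")   // hypothetical base path
            .forTable("example_table")       // hypothetical table name
            .withRefreshTimelineServerBasedOnLatestCommit(false)
            .build();
        System.out.println(writeConfig.getBasePath());
      }
    }

The TestTableSchemaEvolution change in this patch uses the same builder option for that purpose: its rollback instants use real-time instant times that are always greater than the normal commit times, which breaks the latest-commit refresh rule from HUDI-2761, so the test forces a refresh whenever client and server timelines differ.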