diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 9fe83f1995c2b..43fe168587ac1 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -296,7 +296,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri
     SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
     HoodieWriteConfig config = client.getConfig();
     HoodieEngineContext context = client.getEngineContext();
-    HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
+    HoodieSparkTable table = HoodieSparkTable.create(config, context);
     WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
         .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
     return 0;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5c485bed0581d..fdfbeadd49b2c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -296,11 +296,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     }
   }

-  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
-    return createTable(config, hadoopConf, false);
-  }
-
-  protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
+  protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf);

   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
     try {
@@ -365,7 +361,7 @@ public void bootstrap(Option> extraMetadata) {
    */
   protected void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable table = createTable(config, hadoopConf);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     Option instant = Option.fromJavaOptional(
         inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -542,9 +538,6 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
       return;
     }
     if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) {
-      if (config.isMetadataTableEnabled()) {
-        table.getHoodieView().sync();
-      }
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
@@ -634,7 +627,7 @@ protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArch
    * Run any pending compactions.
    */
   public void runAnyPendingCompactions() {
-    runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
+    runAnyPendingCompactions(createTable(config, hadoopConf));
   }

   /**
@@ -644,7 +637,7 @@ public void runAnyPendingCompactions() {
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String user, String comment) {
-    HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable table = createTable(config, hadoopConf);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
@@ -668,7 +661,7 @@ public void savepoint(String user, String comment) {
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String instantTime, String user, String comment) {
-    HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable table = createTable(config, hadoopConf);
     table.savepoint(context, instantTime, user, comment);
   }

@@ -680,7 +673,7 @@ public void savepoint(String instantTime, String user, String comment) {
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable table = createTable(config, hadoopConf);
     SavepointHelpers.deleteSavepoint(table, savepointTime);
   }

@@ -1012,7 +1005,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option
   public Option scheduleIndexing(List partitionTypes) {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    Option indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+    Option indexPlan = createTable(config, hadoopConf)
         .scheduleIndexing(context, instantTime, partitionTypes);
     return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
   }
@@ -1024,7 +1017,7 @@ public Option scheduleIndexing(List partitionType
    * @return {@link Option} after successful indexing.
    */
   public Option index(String indexInstantTime) {
-    return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
+    return createTable(config, hadoopConf).index(context, indexInstantTime);
   }

   /**
@@ -1339,17 +1332,17 @@ private Option scheduleTableServiceInternal(String instantTime, Option
       case CLUSTER:
         LOG.info("Scheduling clustering at instant time :" + instantTime);
-        Option clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option clusteringPlan = createTable(config, hadoopConf)
            .scheduleClustering(context, instantTime, extraMetadata);
         return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       case COMPACT:
         LOG.info("Scheduling compaction at instant time :" + instantTime);
-        Option compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option compactionPlan = createTable(config, hadoopConf)
            .scheduleCompaction(context, instantTime, extraMetadata);
         return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       case CLEAN:
         LOG.info("Scheduling cleaning at instant time :" + instantTime);
-        Option cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option cleanerPlan = createTable(config, hadoopConf)
            .scheduleCleaning(context, instantTime, extraMetadata);
         return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       default:
@@ -1702,6 +1695,6 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-    commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType);
+    commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 42208a0734aa7..1603965ea987f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -364,8 +364,8 @@ public class HoodieWriteConfig extends HoodieConfig {

   public static final ConfigProperty REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
       .key("hoodie.refresh.timeline.server.based.on.latest.commit")
-      .defaultValue(false)
-      .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), ");
+      .defaultValue(true)
+      .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true).");

   public static final ConfigProperty INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
       .key("hoodie.consistency.check.initial_interval_ms")
@@ -2499,6 +2499,11 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
       return this;
     }

+    public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) {
+      writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 30ed27b39b77a..fbd4b46227dcd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -261,9 +261,6 @@ public HoodieCleanMetadata execute() {
         }
       }
       table.getMetaClient().reloadActiveTimeline();
-      if (config.isMetadataTableEnabled()) {
-        table.getHoodieView().sync();
-      }
     });
   }

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index ddfbabaf36ae9..b68cf97e9aa35 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -117,8 +117,7 @@ public boolean commit(String instantTime, List writeStatuses, Optio
   }

   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
-                                    boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 6eae15e7e1aff..26149918c6549 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -62,13 +62,6 @@ public static HoodieFlinkTable create(HoodieW
   public static HoodieFlinkTable create(HoodieWriteConfig config,
                                         HoodieFlinkEngineContext context,
                                         HoodieTableMetaClient metaClient) {
-    return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled());
-  }
-
-  public static HoodieFlinkTable create(HoodieWriteConfig config,
-                                        HoodieFlinkEngineContext context,
-                                        HoodieTableMetaClient metaClient,
-                                        boolean refreshTimeline) {
     final HoodieFlinkTable hoodieFlinkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -80,9 +73,6 @@ public static HoodieFlinkTable create(HoodieW
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
-    if (refreshTimeline) {
-      hoodieFlinkTable.getHoodieView().sync();
-    }
     return hoodieFlinkTable;
   }

diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 50adabbd585ea..e23ee4ad58e6e 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -104,7 +104,7 @@ private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering
   public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
-    HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false);
+    HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
     HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);

     // Create some partitions, and put some files
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 7f5dc19baf274..fbfb85bab3b8f 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -89,9 +89,7 @@ public boolean commit(String instantTime,
   }

   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config,
-                                    Configuration hadoopConf,
-                                    boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieJavaTable.create(config, context);
   }

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7f9ec05e3c5eb..fe6ea975e3111 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -123,10 +123,8 @@ public boolean commit(String instantTime, JavaRDD writeStatuses, Op
   }

   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config,
-                                    Configuration hadoopConf,
-                                    boolean refreshTimeline) {
-    return HoodieSparkTable.create(config, context, refreshTimeline);
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieSparkTable.create(config, context);
   }

   @Override
@@ -333,7 +331,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,

   @Override
   protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
+    HoodieSparkTable table = HoodieSparkTable.create(config, context);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -352,7 +350,7 @@ protected HoodieWriteMetadata> compact(String compactionIns

   @Override
   public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) {
-    HoodieSparkTable table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
+    HoodieSparkTable table = HoodieSparkTable.create(config, context);
     preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
     HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
@@ -434,7 +432,7 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
   public static HoodieSparkTable create(HoodieWriteConfig config, HoodieEngineContext context) {
-    return create(config, context, false);
-  }
-
-  public static HoodieSparkTable create(HoodieWriteConfig config, HoodieEngineContext context,
-                                        boolean refreshTimeline) {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
         .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
         .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
         .setFileSystemRetryConfig(config.getFileSystemRetryConfig())
         .setProperties(config.getProps()).build();
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
+    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }

   public static HoodieSparkTable create(HoodieWriteConfig config, HoodieSparkEngineContext context, HoodieTableMetaClient metaClient) {
-    return create(config, context, metaClient, false);
-  }
-
-  public static HoodieSparkTable create(HoodieWriteConfig config,
-                                        HoodieSparkEngineContext context,
-                                        HoodieTableMetaClient metaClient,
-                                        boolean refreshTimeline) {
     HoodieSparkTable hoodieSparkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -89,9 +77,6 @@ public static HoodieSparkTable create(HoodieW
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
-    if (refreshTimeline) {
-      hoodieSparkTable.getHoodieView().sync();
-    }
     return hoodieSparkTable;
   }

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 1cb7bcbfc4fcb..98bcb11033c5b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -515,7 +515,13 @@ private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
     return getConfigBuilder(schema)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withAvroSchemaValidate(true);
+        .withAvroSchemaValidate(true)
+        // The test has rollback instants on the timeline,
+        // these rollback instants use real time as instant time, whose instant time is always greater than
+        // the normal commits instant time, this breaks the refresh rule introduced in HUDI-2761:
+        // The last client instant is always the rollback instant but not the normal commit.
+        // Always refresh the timeline when client and server have different timeline.
+        .withRefreshTimelineServerBasedOnLatestCommit(false);
   }

   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9a8fc55a20028..e19c8fc1a2ee5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -111,7 +111,7 @@ private void verifyBaseMetadataTable() throws IOException {
     assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

     // Files within each partition should match
-    HoodieTable table = HoodieSparkTable.create(writeConfig, context, true);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
     TableFileSystemView tableView = table.getHoodieView();
     List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
     Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index b9f025223b7df..0ce6ca0ee923b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -247,7 +247,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
       assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);

       // Verify that all data file has one log file
-      HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
+      HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 4504c552c95d6..d0365dced199e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -559,7 +559,7 @@ public void validateMetadata(HoodieTestTable testTable, List inflightCom

     // Files within each partition should match
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
     TableFileSystemView tableView = table.getHoodieView();
     List fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
     Map partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index af0dc130162aa..02a406e7e0763 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -199,6 +199,7 @@ protected void resetViewState() {
     LOG.info("Deleting all rocksdb data associated with table filesystem view");
     rocksDB.close();
     rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath());
+    schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily);
   }

   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cf941bb70cc3b..e8937b39dc7f1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -550,10 +550,7 @@ private List getRollbackedCommits(HoodieInstant instant, HoodieActiveTim

   @Override
   public void close() {
-    for (Pair partitionFileSlicePair : partitionReaders.keySet()) {
-      close(partitionFileSlicePair);
-    }
-    partitionReaders.clear();
+    closePartitionReaders();
   }

   /**
@@ -567,6 +564,16 @@ private synchronized void close(Pair partitionFileSlicePair) {
     closeReader(readers);
   }

+  /**
+   * Close and clear all the partitions readers.
+   */
+  private void closePartitionReaders() {
+    for (Pair partitionFileSlicePair : partitionReaders.keySet()) {
+      close(partitionFileSlicePair);
+    }
+    partitionReaders.clear();
+  }
+
   private void closeReader(Pair readers) {
     if (readers != null) {
       try {
@@ -624,5 +631,11 @@ public Option getLatestCompactionTime() {
   public void reset() {
     initIfNeeded();
     dataMetaClient.reloadActiveTimeline();
+    if (metadataMetaClient != null) {
+      metadataMetaClient.reloadActiveTimeline();
+    }
+    // the cached reader has max instant time restriction, they should be cleared
+    // because the metadata timeline may have changed.
+    closePartitionReaders();
   }
 }
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 40669f50e42d6..2ff21682213c2 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -150,7 +150,7 @@ public static class Builder {
     private int markerBatchNumThreads = 20;
     private long markerBatchIntervalMs = 50L;
     private int markerParallelism = 100;
-    private boolean refreshTimelineBasedOnLatestCommit = false;
+    private boolean refreshTimelineBasedOnLatestCommit = true;

     public Builder() {
     }
@@ -240,6 +240,7 @@ public Config build() {
       config.markerBatchNumThreads = this.markerBatchNumThreads;
       config.markerBatchIntervalMs = this.markerBatchIntervalMs;
       config.markerParallelism = this.markerParallelism;
+      config.refreshTimelineBasedOnLatestCommit = this.refreshTimelineBasedOnLatestCommit;
       return config;
     }
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index 13f5ad97cfc92..53a657106b7a5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -226,6 +227,8 @@ void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType)
     prepJobConfig.continuousMode = true;
     prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    // in multi writer mode, disable the latest commit refresh to reduce conflict
+    prepJobConfig.configs.add(String.format("%s=false", HoodieWriteConfig.REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT.key()));
     HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());

     // Prepare base dataset with some commits
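Usage note (illustrative, not part of the patch): this change flips hoodie.refresh.timeline.server.based.on.latest.commit to true by default. A writer that needs the previous behavior can opt out through the new builder method added to HoodieWriteConfig above, or by passing the config key directly, as the two tests in this patch do. A minimal sketch, assuming an existing table; the base path and table name below are placeholders:

    // Hypothetical opt-out example; uses only the builder method and config key
    // introduced in this patch.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")                        // placeholder base path
        .forTable("test_table")                               // placeholder table name
        .withRefreshTimelineServerBasedOnLatestCommit(false)  // restore the old default
        .build();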