[HUDI-4167] Remove the timeline refresh with initializing hoodie table
The timeline refresh on table initialization invokes the fs view #sync, which currently performs two actions:

1. reload the timeline of the fs view, so that the next fs view request is served based on this timeline metadata
2. if this is a local fs view, clear all of its local state; if this is a remote fs view, send a request to sync the remote fs view

However, consider the construction path (sketched just below): the meta client is instantiated freshly, so its timeline is already the latest, and the table is also freshly constructed, so the fs view holds no local state. That means the #sync call is entirely unnecessary.
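
For illustration, here is a condensed sketch of that construction path, based on the HoodieSparkTable factory as it appears in this diff (extra builder options and error handling elided; the wrapper class name is made up for the sketch):

```java
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;

// Hypothetical wrapper, only to make the construction path explicit.
final class TableConstructionSketch {
  static <T extends HoodieRecordPayload> HoodieSparkTable<T> createFresh(HoodieWriteConfig config,
                                                                         HoodieSparkEngineContext context) {
    // The meta client is instantiated freshly with the active timeline loaded,
    // so its timeline is already the latest -- nothing to reload.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(context.getHadoopConf().get())
        .setBasePath(config.getBasePath())
        .setLoadActiveTimelineOnLoad(true)
        .build();
    // The table, and therefore its fs view, is also freshly constructed, so there is
    // no stale local state to clear; the getHoodieView().sync() call removed by this
    // patch had no work left to do here.
    return HoodieSparkTable.create(config, context, metaClient);
  }
}
```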

In this patch, the metadata table lifecycle and the data set fs view are kept in sync: when the fs view is refreshed, the underlying metadata is refreshed synchronously as well. The freshness of the metadata follows the same rules as the data fs view:

1. if the fs view is local, visibility is based on the latest commit of the client's table meta client
2. if the fs view is remote, the timeline server #syncs the fs view and the metadata together, based on its lagging server-local timeline

From the client's perspective, there is no need to care about the refresh action anymore, whether or not the metadata table is enabled. That makes the client logic clearer and less error-prone.

Removing the timeline refresh has another benefit: it avoids unnecessary #refresh calls on the remote fs view. If every client sent a request to #sync the remote fs view, the server would run into conflicts and the clients would see response errors.
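
As a usage sketch of the knob touched in this diff: `hoodie.refresh.timeline.server.based.on.latest.commit` now defaults to true, and a `withRefreshTimelineServerBasedOnLatestCommit` builder method is added, so a client can still opt out when needed (the base path and table name below are placeholders):

```java
import org.apache.hudi.config.HoodieWriteConfig;

final class RefreshKnobSketch {
  // Sketch only: base path and table name are hypothetical placeholders.
  static HoodieWriteConfig buildConfig() {
    return HoodieWriteConfig.newBuilder()
        .withPath("file:///tmp/hoodie_table")
        .forTable("sketch_table")
        // New default is true: the timeline server refreshes its cached fs view (and the
        // metadata reader with it) based on the latest commit, on top of the existing
        // timeline-hash check. Pass false to keep only the hash-based refresh, e.g. for
        // the rollback-instant edge case noted in the test change below.
        .withRefreshTimelineServerBasedOnLatestCommit(false)
        .build();
  }
}
```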
danny0405 committed Jun 1, 2022
1 parent 795a99b commit 12dda8d
Showing 18 changed files with 62 additions and 73 deletions.
@@ -296,7 +296,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri
SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
HoodieWriteConfig config = client.getConfig();
HoodieEngineContext context = client.getEngineContext();
HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
HoodieSparkTable table = HoodieSparkTable.create(config, context);
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
return 0;
@@ -296,11 +296,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
}
}

protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}

protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -365,7 +361,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
*/
protected void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -542,9 +538,6 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me
return;
}
if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) {
if (config.isMetadataTableEnabled()) {
table.getHoodieView().sync();
}
// Do an inline compaction if enabled
if (config.inlineCompactionEnabled()) {
runAnyPendingCompactions(table);
@@ -634,7 +627,7 @@ protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArch
* Run any pending compactions.
*/
public void runAnyPendingCompactions() {
runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
runAnyPendingCompactions(createTable(config, hadoopConf));
}

/**
@@ -644,7 +637,7 @@ public void runAnyPendingCompactions() {
* @param comment - Comment for the savepoint
*/
public void savepoint(String user, String comment) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
@@ -668,7 +661,7 @@ public void savepoint(String user, String comment) {
* @param comment - Comment for the savepoint
*/
public void savepoint(String instantTime, String user, String comment) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
table.savepoint(context, instantTime, user, comment);
}

@@ -680,7 +673,7 @@ public void savepoint(String instantTime, String user, String comment) {
* @return true if the savepoint was deleted successfully
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
SavepointHelpers.deleteSavepoint(table, savepointTime);
}

@@ -1012,7 +1005,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
*/
public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf)
.scheduleIndexing(context, instantTime, partitionTypes);
return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
}
@@ -1024,7 +1017,7 @@ public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionType
* @return {@link Option<HoodieIndexCommitMetadata>} after successful indexing.
*/
public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
return createTable(config, hadoopConf).index(context, indexInstantTime);
}

/**
@@ -1339,17 +1332,17 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
return Option.empty();
case CLUSTER:
LOG.info("Scheduling clustering at instant time :" + instantTime);
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
.scheduleClustering(context, instantTime, extraMetadata);
return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case COMPACT:
LOG.info("Scheduling compaction at instant time :" + instantTime);
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
.scheduleCompaction(context, instantTime, extraMetadata);
return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case CLEAN:
LOG.info("Scheduling cleaning at instant time :" + instantTime);
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
.scheduleCleaning(context, instantTime, extraMetadata);
return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
default:
@@ -1702,6 +1695,6 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType);
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}
}
@@ -364,8 +364,8 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
.key("hoodie.refresh.timeline.server.based.on.latest.commit")
.defaultValue(false)
.withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), ");
.defaultValue(true)
.withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true).");

public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
@@ -2499,6 +2499,11 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
return this;
}

public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) {
writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit));
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
@@ -261,9 +261,6 @@ public HoodieCleanMetadata execute() {
}
}
table.getMetaClient().reloadActiveTimeline();
if (config.isMetadataTableEnabled()) {
table.getHoodieView().sync();
}
});
}

@@ -117,8 +117,7 @@ public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Optio
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
boolean refreshTimeline) {
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}

@@ -62,13 +62,6 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient) {
return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled());
}

public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient,
boolean refreshTimeline) {
final HoodieFlinkTable<T> hoodieFlinkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
@@ -80,9 +73,6 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
if (refreshTimeline) {
hoodieFlinkTable.getHoodieView().sync();
}
return hoodieFlinkTable;
}

@@ -104,7 +104,7 @@ private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false);
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);

// Create some partitions, and put some files
@@ -89,9 +89,7 @@ public boolean commit(String instantTime,
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config,
Configuration hadoopConf,
boolean refreshTimeline) {
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieJavaTable.create(config, context);
}

@@ -123,10 +123,8 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config,
Configuration hadoopConf,
boolean refreshTimeline) {
return HoodieSparkTable.create(config, context, refreshTimeline);
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieSparkTable.create(config, context);
}

@Override
@@ -333,7 +331,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,

@Override
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -352,7 +350,7 @@ protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionIns

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
@@ -434,7 +432,7 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<Strin
}

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}

/**
@@ -54,30 +54,18 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
return create(config, context, false);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context,
boolean refreshTimeline) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieTableMetaClient metaClient) {
return create(config, context, metaClient, false);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieTableMetaClient metaClient,
boolean refreshTimeline) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
@@ -89,9 +77,6 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
if (refreshTimeline) {
hoodieSparkTable.getHoodieView().sync();
}
return hoodieSparkTable;
}

@@ -515,7 +515,13 @@ private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
return getConfigBuilder(schema)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withAvroSchemaValidate(true);
.withAvroSchemaValidate(true)
// The test has rollback instants on the timeline,
// these rollback instants use real time as instant time, whose instant time is always greater than
// the normal commits instant time, this breaks the refresh rule introduced in HUDI-2761:
// The last client instant is always the rollback instant but not the normal commit.
// Always refresh the timeline when client and server have different timeline.
.withRefreshTimelineServerBasedOnLatestCommit(false);
}

@Override
@@ -111,7 +111,7 @@ private void verifyBaseMetadataTable() throws IOException {
assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

// Files within each partition should match
HoodieTable table = HoodieSparkTable.create(writeConfig, context, true);
HoodieTable table = HoodieSparkTable.create(writeConfig, context);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
@@ -247,7 +247,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);

// Verify that all data file has one log file
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -559,7 +559,7 @@ public void validateMetadata(HoodieTestTable testTable, List<String> inflightCom

// Files within each partition should match
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true);
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
@@ -199,6 +199,7 @@ protected void resetViewState() {
LOG.info("Deleting all rocksdb data associated with table filesystem view");
rocksDB.close();
rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath());
schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily);
}

@Override
