[HUDI-4167] Remove the timeline refresh with initializing hoodie table #5716

Merged
1 commit merged on Jun 2, 2022
@@ -296,7 +296,7 @@ protected static int deleteMarker(JavaSparkContext jsc, String instantTime, Stri
SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
HoodieWriteConfig config = client.getConfig();
HoodieEngineContext context = client.getEngineContext();
HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
HoodieSparkTable table = HoodieSparkTable.create(config, context);
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
return 0;
@@ -296,11 +296,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
}
}

protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return createTable(config, hadoopConf, false);
}

protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {
@@ -365,7 +361,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
*/
protected void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -634,7 +630,7 @@ protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArch
* Run any pending compactions.
*/
public void runAnyPendingCompactions() {
runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
runAnyPendingCompactions(createTable(config, hadoopConf));
}

/**
@@ -644,7 +640,7 @@ public void runAnyPendingCompactions() {
* @param comment - Comment for the savepoint
*/
public void savepoint(String user, String comment) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
if (table.getCompletedCommitsTimeline().empty()) {
throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
}
@@ -668,7 +664,7 @@ public void savepoint(String user, String comment) {
* @param comment - Comment for the savepoint
*/
public void savepoint(String instantTime, String user, String comment) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
table.savepoint(context, instantTime, user, comment);
}

@@ -680,7 +676,7 @@ public void savepoint(String instantTime, String user, String comment) {
* @return true if the savepoint was deleted successfully
*/
public void deleteSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
SavepointHelpers.deleteSavepoint(table, savepointTime);
}

@@ -1012,7 +1008,7 @@ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String
*/
public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
String instantTime = HoodieActiveTimeline.createNewInstantTime();
Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf)
.scheduleIndexing(context, instantTime, partitionTypes);
return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
}
@@ -1024,7 +1020,7 @@ public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionType
* @return {@link Option<HoodieIndexCommitMetadata>} after successful indexing.
*/
public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
return createTable(config, hadoopConf).index(context, indexInstantTime);
}

/**
@@ -1339,17 +1335,17 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
return Option.empty();
case CLUSTER:
LOG.info("Scheduling clustering at instant time :" + instantTime);
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
.scheduleClustering(context, instantTime, extraMetadata);
return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case COMPACT:
LOG.info("Scheduling compaction at instant time :" + instantTime);
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
.scheduleCompaction(context, instantTime, extraMetadata);
return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
case CLEAN:
LOG.info("Scheduling cleaning at instant time :" + instantTime);
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
.scheduleCleaning(context, instantTime, extraMetadata);
return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
default:
@@ -1702,6 +1698,6 @@ private void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient m
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType);
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}
}
@@ -364,8 +364,8 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<Boolean> REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
.key("hoodie.refresh.timeline.server.based.on.latest.commit")
.defaultValue(false)
.withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), ");
.defaultValue(true)
Member:

I don't know if we are ready for this flip yet; there are probably some open JIRAs.

Member:

This is removed in #6179, but @danny0405, why can't we keep it as false? Removing it completely is a breaking change, and normally we would at least deprecate it first.

@danny0405 (Contributor, Author), Jul 28, 2022:

> why can't we keep it as false

It was actually hard-coded to true before this patch, and this config option only takes effect after this PR.

Take a look at TimelineService#refreshTimelineBasedOnLatestCommit: it is weird that the member field was hard-coded to true and there was no way to configure it before this patch; the config option REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT was totally ignored.

.withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true).");

public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
@@ -2499,6 +2499,11 @@ public Builder withAutoAdjustLockConfigs(boolean autoAdjustLockConfigs) {
return this;
}

public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) {
writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit));
return this;
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
@@ -117,8 +117,7 @@ public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Optio
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
boolean refreshTimeline) {
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}

@@ -62,13 +62,6 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient) {
return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled());
}

public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
HoodieFlinkEngineContext context,
HoodieTableMetaClient metaClient,
boolean refreshTimeline) {
final HoodieFlinkTable<T> hoodieFlinkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
@@ -80,9 +73,6 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
if (refreshTimeline) {
hoodieFlinkTable.getHoodieView().sync();
}
return hoodieFlinkTable;
}

@@ -104,7 +104,7 @@ private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering
public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false);
HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);

// Create some partitions, and put some files
@@ -89,9 +89,7 @@ public boolean commit(String instantTime,
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config,
Configuration hadoopConf,
boolean refreshTimeline) {
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieJavaTable.create(config, context);
}

@@ -123,10 +123,8 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config,
Configuration hadoopConf,
boolean refreshTimeline) {
return HoodieSparkTable.create(config, context, refreshTimeline);
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
return HoodieSparkTable.create(config, context);
}

@Override
@@ -333,7 +331,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata,

@Override
protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -352,7 +350,7 @@ protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionIns

@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
@@ -434,7 +432,7 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<Strin
}

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}

/**
@@ -54,30 +54,18 @@ protected HoodieSparkTable(HoodieWriteConfig config, HoodieEngineContext context
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
return create(config, context, false);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context,
boolean refreshTimeline) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieTableMetaClient metaClient) {
return create(config, context, metaClient, false);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieTableMetaClient metaClient,
boolean refreshTimeline) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
@@ -89,9 +77,6 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
default:
throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
}
if (refreshTimeline) {
hoodieSparkTable.getHoodieView().sync();
}
return hoodieSparkTable;
}
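Not part of this PR, but a rough equivalent for callers that relied on the removed refreshTimeline=true flag: the deleted branch above only synced the table's file-system view, so the same effect can be obtained by syncing explicitly after creating the table (the helper class is invented; the individual calls are taken from this diff).

```java
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;

public class EagerViewSyncSketch {
  // Create the table, then force its file-system view to catch up with the timeline,
  // mirroring what the removed refreshTimeline branch used to do.
  public static HoodieTable createWithEagerViewSync(HoodieWriteConfig config, HoodieEngineContext context) {
    HoodieTable table = HoodieSparkTable.create(config, context);
    table.getHoodieView().sync();
    return table;
  }
}
```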

@@ -515,7 +515,13 @@ private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) {
return getConfigBuilder(schema)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withAvroSchemaValidate(true);
.withAvroSchemaValidate(true)
// The test has rollback instants on the timeline; these rollback instants use real time as
// the instant time, which is always greater than the instant time of the normal commits.
// This breaks the refresh rule introduced in HUDI-2761 (the last client instant is always the
// rollback instant rather than the normal commit), so always refresh the timeline whenever
// the client and server timelines differ.
.withRefreshTimelineServerBasedOnLatestCommit(false);
}

@Override
@@ -111,7 +111,7 @@ private void verifyBaseMetadataTable() throws IOException {
assertEquals(fsPartitions, metadataPartitions, "Partitions should match");

// Files within each partition should match
HoodieTable table = HoodieSparkTable.create(writeConfig, context, true);
HoodieTable table = HoodieSparkTable.create(writeConfig, context);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
@@ -247,7 +247,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);

// Verify that every data file has one log file
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -559,7 +559,7 @@ public void validateMetadata(HoodieTestTable testTable, List<String> inflightCom

// Files within each partition should match
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true);
HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
TableFileSystemView tableView = table.getHoodieView();
List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
@@ -199,6 +199,7 @@ protected void resetViewState() {
LOG.info("Deleting all rocksdb data associated with table filesystem view");
rocksDB.close();
rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath());
schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily);
Member:

Please avoid unrelated changes in the same PR.

Contributor (Author):

It is actually a necessary fix, needed to make the refresh of the fs view work.

}
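A hedged annotation of the method in this hunk (presumably the RocksDB-backed file-system view); the inline comments are an interpretation of why the added line is needed, not the author's wording.

```java
protected void resetViewState() {
  LOG.info("Deleting all rocksdb data associated with table filesystem view");
  rocksDB.close();
  // The freshly created RocksDBDAO knows nothing about the view's column families yet ...
  rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath());
  // ... so they are re-registered here; without this, rebuilding the file-system view after a
  // reset would touch column families that no longer exist, which is the "refresh of the fs view"
  // problem referenced in the reply above.
  schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily);
}
```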

@Override