[HUDI-2207] Support independent flink hudi clustering function
yuzhaojing committed May 24, 2022 · 1 parent 8ec625d · commit f1bc2f0
Showing 23 changed files with 1,708 additions and 17 deletions.
@@ -1379,7 +1379,7 @@ protected Option<String> inlineScheduleClustering(Option<Map<String, String>> ex
return scheduleClustering(extraMetadata);
}

protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
public void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
Option<HoodiePendingRollbackInfo> pendingRollbackInstantInfo = getPendingRollbackInfo(table.getMetaClient(), inflightInstant.getTimestamp(), false);
String commitTime = pendingRollbackInstantInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime());
table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
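The visibility change above (protected to public) is what lets an engine-specific caller drive clustering rollback itself. A minimal sketch, assuming a HoodieFlinkWriteClient named writeClient and a clustering instant time supplied by the caller (both hypothetical), and assuming getHoodieTable() is available on the Flink write client:

// Hedged sketch: clean up a failed inflight clustering before rescheduling it.
// `writeClient` and `clusteringInstantTime` are assumed to come from the caller.
HoodieInstant inflightInstant =
    HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstantTime);
writeClient.rollbackInflightClustering(inflightInstant, writeClient.getHoodieTable());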
@@ -51,6 +51,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
public static final String SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy";
public static final String SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY =
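With the new constant, a write config can point directly at the Flink size-based plan strategy. A minimal sketch, assuming the standard HoodieWriteConfig/HoodieClusteringConfig builder methods; the base path and size thresholds are made up:

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")                                // hypothetical path
    .withClusteringConfig(HoodieClusteringConfig.newBuilder()
        .withClusteringPlanStrategyClass(
            HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY)
        .withClusteringPlanSmallFileLimit(600 * 1024 * 1024L)   // hypothetical threshold
        .withClusteringTargetFileMaxBytes(1024 * 1024 * 1024L)  // hypothetical threshold
        .build())
    .build();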
@@ -25,20 +25,20 @@
import java.io.Serializable;

/**
* Repartition input records into at least expected number of output spark partitions. It should give below guarantees -
* Output spark partition will have records from only one hoodie partition. - Average records per output spark
* partitions should be almost equal to (#inputRecords / #outputSparkPartitions) to avoid possible skews.
* Repartition input records into at least expected number of output partitions. It should give below guarantees -
* Output partition will have records from only one hoodie partition. - Average records per output
* partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews.
*/
public interface BulkInsertPartitioner<I> extends Serializable {

/**
* Repartitions the input records into at least expected number of output spark partitions.
* Repartitions the input records into at least expected number of output partitions.
*
* @param records Input Hoodie records
* @param outputSparkPartitions Expected number of output partitions
* @param records Input Hoodie records
* @param outputPartitions Expected number of output partitions
* @return
*/
I repartitionRecords(I records, int outputSparkPartitions);
I repartitionRecords(I records, int outputPartitions);

/**
* @return {@code true} if the records within a partition are sorted; {@code false} otherwise.
@@ -48,6 +48,7 @@ public interface BulkInsertPartitioner<I> extends Serializable {
/**
* Return file group id prefix for the given data partition.
* By default, return a new file group id prefix, so that incoming records will route to a fresh new file group
*
* @param partitionId data partition
* @return
*/
@@ -57,6 +58,7 @@ default String getFileIdPfx(int partitionId) {

/**
* Return write handle factory for the given partition.
*
* @param partitionId data partition
* @return
*/
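The engine-agnostic wording above is what allows non-Spark runtimes (here, the Flink client's List-based pipeline) to supply their own partitioners. A minimal, hypothetical implementation sketch: the class name is invented, the sorted-check method is assumed to be arePartitionRecordsSorted() as in upstream Hudi, and only the two abstract methods are overridden because getFileIdPfx and getWriteHandleFactory keep their defaults:

import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.table.BulkInsertPartitioner;

/**
 * Hypothetical identity partitioner over the Flink client's List abstraction:
 * records are passed through as a single bucket and reported as unsorted.
 * A real partitioner would split records by hoodie partition path to honor
 * the guarantees documented above.
 */
public class IdentityListBulkInsertPartitioner<T extends HoodieRecordPayload<T>>
    implements BulkInsertPartitioner<List<HoodieRecord<T>>> {

  @Override
  public List<HoodieRecord<T>> repartitionRecords(List<HoodieRecord<T>> records, int outputPartitions) {
    return records; // no re-bucketing in this sketch
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    return false;
  }
}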
@@ -70,6 +70,9 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
String sparkSizeBasedClassName = HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
String sparkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
String sparkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
String flinkSizeBasedClassName = HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
String flinkSelectedPartitionsClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy";
String flinkRecentDaysClassName = "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy";
String javaSelectedPartitionClassName = "org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
String javaSizeBasedClassName = HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;

@@ -82,6 +85,14 @@ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig config)
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
return sparkSizeBasedClassName;
} else if (flinkRecentDaysClassName.equals(className)) {
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
LOG.warn(String.format(logStr, className, flinkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
return flinkSizeBasedClassName;
} else if (flinkSelectedPartitionsClassName.equals(className)) {
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
LOG.warn(String.format(logStr, className, flinkSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
return flinkSizeBasedClassName;
} else if (javaSelectedPartitionClassName.equals(className)) {
config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
LOG.warn(String.format(logStr, className, javaSizeBasedClassName, HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
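To make the rewrite above concrete: when a config still names one of the deprecated Flink strategies, the helper swaps in the size-based strategy and records the intent as a partition filter mode. A hedged sketch, assuming the surrounding class is ClusteringPlanStrategy (as in upstream Hudi) and that writeConfig is an existing HoodieWriteConfig:

// Deprecated day-based Flink strategy configured by the user.
writeConfig.getProps().setProperty("hoodie.clustering.plan.strategy.class",
    "org.apache.hudi.client.clustering.plan.strategy.FlinkRecentDaysClusteringPlanStrategy");

String resolved = ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(writeConfig);
// resolved is now HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
// and writeConfig carries PLAN_PARTITION_FILTER_MODE_NAME = RECENT_DAYS.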
@@ -29,8 +29,10 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -39,6 +41,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
@@ -53,7 +56,6 @@
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
@@ -69,6 +71,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Iterator;
@@ -365,8 +369,7 @@ public void completeCompaction(
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(compactionInstant.getTimestamp()).ifPresent(
w -> ((HoodieTableMetadataWriter) w).update(metadata, compactionInstant.getTimestamp(), table.isTableServiceAction(compactionInstant.getAction())));
updateTableMetadata(table, metadata, compactionInstant);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
@@ -401,6 +404,59 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns
throw new HoodieNotSupportedException("Clustering is not supported yet");
}

private void updateTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
table.getMetadataWriter(hoodieInstant.getTimestamp()).ifPresent(
w -> w.update(commitMetadata, hoodieInstant.getTimestamp(), table.isTableServiceAction(hoodieInstant.getAction())));
}

private void completeClustering(
HoodieReplaceCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String clusteringCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime);
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());
if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
}

try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
finalizeWrite(table, clusteringCommitTime, writeStats);
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
updateTableMetadata(table, metadata, clusteringInstant);
LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
table.getActiveTimeline().transitionReplaceInflightToComplete(
HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
throw new HoodieClusteringException(
"Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e);
} finally {
this.txnManager.endTransaction(Option.of(clusteringInstant));
}

WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit clustering "
+ config.getBasePath() + " at time " + clusteringCommitTime, e);
}
}
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
@@ -414,6 +470,23 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> insta
// no need to execute the upgrade/downgrade on each write in streaming.
}

public void completeTableService(
TableServiceType tableServiceType,
HoodieCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
String commitInstant) {
switch (tableServiceType) {
case CLUSTER:
completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
break;
case COMPACT:
completeCompaction(metadata, table, commitInstant);
break;
default:
throw new IllegalArgumentException("This table service is not valid " + tableServiceType);
}
}

/**
* Upgrade downgrade the Hoodie table.
*
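The new completeTableService entry point lets a Flink job commit compactions and clusterings through a single code path. A minimal sketch of the clustering case; the helper method, its arguments, and the use of getHoodieTable() are assumptions, while completeTableService and TableServiceType.CLUSTER come from this change:

// Hedged sketch: commit a finished clustering from a (hypothetical) Flink commit handler.
static <T extends HoodieRecordPayload<T>> void commitClustering(
    HoodieFlinkWriteClient<T> writeClient,
    HoodieReplaceCommitMetadata metadata,
    String clusteringInstantTime) {
  writeClient.completeTableService(
      TableServiceType.CLUSTER, metadata, writeClient.getHoodieTable(), clusteringInstantTime);
}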
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.clustering.plan.strategy;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
* Clustering Strategy based on following.
* 1) Only looks at latest 'daybased.lookback.partitions' partitions.
* 2) Excludes files that are greater than 'small.file.limit' from clustering plan.
*/
public class FlinkRecentDaysClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends FlinkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(FlinkRecentDaysClusteringPlanStrategy.class);

public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

public FlinkRecentDaysClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
int targetPartitionsForClustering = getWriteConfig().getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = getWriteConfig().getSkipPartitionsFromLatestForClustering();
return partitionPaths.stream()
.sorted(Comparator.reverseOrder())
.skip(Math.max(skipPartitionsFromLatestForClustering, 0))
.limit(targetPartitionsForClustering > 0 ? targetPartitionsForClustering : partitionPaths.size())
.collect(Collectors.toList());
}
}
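A worked example of the filterPartitionPaths logic above, using only the java.util and stream classes already imported by this file (plus java.util.Arrays); the partition values and limits are made up:

List<String> partitions = Arrays.asList("2022/05/20", "2022/05/21", "2022/05/22", "2022/05/23");
int skipFromLatest = 1;                     // getSkipPartitionsFromLatestForClustering()
int targetPartitions = 2;                   // getTargetPartitionsForClustering()

List<String> selected = partitions.stream()
    .sorted(Comparator.reverseOrder())      // 2022/05/23, 2022/05/22, 2022/05/21, 2022/05/20
    .skip(Math.max(skipFromLatest, 0))      // drops 2022/05/23
    .limit(targetPartitions > 0 ? targetPartitions : partitions.size())
    .collect(Collectors.toList());          // [2022/05/22, 2022/05/21]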
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.client.clustering.plan.strategy;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import org.apache.hudi.table.HoodieFlinkMergeOnReadTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;

/**
* Clustering Strategy to filter just specified partitions from [begin, end]. Note both begin and end are inclusive.
*/
public class FlinkSelectedPartitionsClusteringPlanStrategy<T extends HoodieRecordPayload<T>>
extends FlinkSizeBasedClusteringPlanStrategy<T> {
private static final Logger LOG = LogManager.getLogger(FlinkSelectedPartitionsClusteringPlanStrategy.class);

public static final String CONF_BEGIN_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
public static final String CONF_END_PARTITION = CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";

public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkCopyOnWriteTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

public FlinkSelectedPartitionsClusteringPlanStrategy(HoodieFlinkMergeOnReadTable<T> table,
HoodieFlinkEngineContext engineContext,
HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
}

@Override
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
String beginPartition = getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
String endPartition = getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
List<String> filteredPartitions = partitionPaths.stream()
.filter(path -> path.compareTo(beginPartition) >= 0 && path.compareTo(endPartition) <= 0)
.collect(Collectors.toList());
LOG.info("Filtered to the following partitions: " + filteredPartitions);
return filteredPartitions;
}
}
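A hedged configuration sketch for the strategy above: the two boundary keys are exactly the CONF_BEGIN_PARTITION / CONF_END_PARTITION constants it defines, while the partition values and the way the properties are handed to the writer are assumptions:

java.util.Properties props = new java.util.Properties();
props.setProperty("hoodie.clustering.plan.strategy.class",
    "org.apache.hudi.client.clustering.plan.strategy.FlinkSelectedPartitionsClusteringPlanStrategy");
props.setProperty(FlinkSelectedPartitionsClusteringPlanStrategy.CONF_BEGIN_PARTITION, "2022/05/01");
props.setProperty(FlinkSelectedPartitionsClusteringPlanStrategy.CONF_END_PARTITION, "2022/05/23");
// Only partitions p with "2022/05/01" <= p <= "2022/05/23" are considered for clustering (inclusive).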