Merge branch 'apache:master' into HUDI-5072

slfan1989 authored Oct 25, 2022
2 parents 1ffc01a + 4f6f15c commit 8e22b49

Showing 41 changed files with 1,068 additions and 420 deletions.
@@ -18,8 +18,6 @@

package org.apache.hudi.cli.commands;

import org.apache.avro.AvroRuntimeException;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
@@ -38,14 +36,16 @@
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.AvroRuntimeException;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.spark.util.Utils;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
import scala.collection.JavaConverters;

import java.io.FileInputStream;
import java.io.IOException;
@@ -55,6 +55,8 @@
import java.util.TreeSet;
import java.util.stream.Collectors;

import scala.collection.JavaConverters;

import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;

/**
@@ -205,6 +207,13 @@ public void removeCorruptedPendingCleanAction() {
});
}

@ShellMethod(key = "repair show empty commit metadata", value = "show failed commits")
public void showFailedCommits() {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
activeTimeline.filterCompletedInstants().getInstants().filter(activeTimeline::isEmpty).forEach(hoodieInstant -> LOG.warn("Empty Commit: " + hoodieInstant.toString()));
}

@ShellMethod(key = "repair migrate-partition-meta", value = "Migrate all partition meta file currently stored in text format "
+ "to be stored in base file format. See HoodieTableConfig#PARTITION_METAFILE_USE_DATA_FORMAT.")
public String migratePartitionMeta(
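The new `repair show empty commit metadata` command is run from the Hudi CLI shell (typically after `connect --path <table path>`) and logs one WARN line per empty completed instant. As a rough standalone equivalent, here is a minimal sketch that mirrors the same timeline calls; it is not part of this commit, and the class name, `main` method, and argument handling are assumptions:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

// Illustrative only: mirrors the timeline calls used by RepairsCommand#showFailedCommits.
public class EmptyCommitScanner {
  public static void main(String[] args) {
    String basePath = args[0]; // base path of an existing Hudi table, supplied by the caller
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath(basePath)
        .build();
    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
    // Completed instants whose metadata file is empty are the "failed" commits the command warns about.
    activeTimeline.filterCompletedInstants().getInstants()
        .filter(activeTimeline::isEmpty)
        .forEach(instant -> System.out.println("Empty Commit: " + instant));
  }
}
```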
@@ -45,6 +45,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SQLContext;
import org.junit.jupiter.api.AfterEach;
@@ -54,6 +57,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.shell.Shell;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Logger;

import java.io.FileInputStream;
import java.io.IOException;
@@ -65,6 +70,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
@@ -259,6 +265,50 @@ public void testRemoveCorruptedPendingCleanAction() throws IOException {
assertEquals(0, metaClient.getActiveTimeline().filterInflightsAndRequested().getInstants().count());
}

/**
 * Test case for the "repair show empty commit metadata" command.
 */
@Test
public void testShowFailedCommits() {
HoodieCLI.conf = hadoopConf();

Configuration conf = HoodieCLI.conf;

HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();

for (int i = 1; i < 20; i++) {
String timestamp = String.valueOf(i);
// Create a completed commit file with metadata
HoodieTestCommitMetadataGenerator.createCommitFile(tablePath, timestamp, conf);
}

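// Turn every 4th completed commit into an empty instant file (delete it, then recreate it with no content) so the repair command has empty commits to report.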
metaClient.getActiveTimeline().getInstants().filter(hoodieInstant -> Integer.parseInt(hoodieInstant.getTimestamp()) % 4 == 0).forEach(hoodieInstant -> {
metaClient.getActiveTimeline().deleteInstantFileIfExists(hoodieInstant);
metaClient.getActiveTimeline().createNewInstant(hoodieInstant);
});

final TestLogAppender appender = new TestLogAppender();
final Logger logger = (Logger) LogManager.getLogger(RepairsCommand.class);
try {
appender.start();
logger.addAppender(appender);
Object result = shell.evaluate(() -> "repair show empty commit metadata");
assertTrue(ShellEvaluationResultUtil.isSuccess(result));
final List<LogEvent> log = appender.getLog();
assertEquals(log.size(),4);
log.forEach(LoggingEvent -> {
assertEquals(LoggingEvent.getLevel(), Level.WARN);
assertTrue(LoggingEvent.getMessage().getFormattedMessage().contains("Empty Commit: "));
assertTrue(LoggingEvent.getMessage().getFormattedMessage().contains("COMPLETED]"));
});
} finally {
logger.removeAppender(appender);
}


}

@Test
public void testRepairDeprecatedPartition() throws IOException {
tablePath = tablePath + "/repair_test/";
@@ -374,4 +424,23 @@ public void testRenamePartition() throws IOException {
assertEquals(totalRecs, totalRecsInOldPartition);
}
}

class TestLogAppender extends AbstractAppender {
private final List<LogEvent> log = new ArrayList<>();

protected TestLogAppender() {
super(UUID.randomUUID().toString(), null, null, false, null);
}

@Override
public void append(LogEvent event) {
log.add(event);
}

public List<LogEvent> getLog() {
return new ArrayList<LogEvent>(log);
}
}
}


@@ -492,7 +492,8 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan
public void preWrite(String instantTime, WriteOperationType writeOperationType,
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.lastCompletedTxnAndMetadata = txnManager.isOptimisticConcurrencyControlEnabled()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(metaClient);
this.pendingInflightAndRequestedInstants.remove(instantTime);
if (null == this.asyncCleanerService) {
@@ -97,4 +97,7 @@ public Option<HoodieInstant> getCurrentTransactionOwner() {
return currentTxnOwnerInstant;
}

public boolean isOptimisticConcurrencyControlEnabled() {
return isOptimisticConcurrencyControlEnabled;
}
}
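The lookup of the last completed transaction in `BaseHoodieWriteClient#preWrite` (above) and in `BaseCommitActionExecutor` (below) is now skipped unless optimistic concurrency control is enabled, which is what this new getter exposes. For context, a sketch of the writer properties that typically turn OCC on; the config keys are standard Hudi settings, but the chosen values and the helper class are illustrative and not taken from this commit:

```java
import java.util.Properties;

// Illustrative only: typical settings that enable optimistic concurrency control,
// i.e. the case where isOptimisticConcurrencyControlEnabled() returns true.
public class OccWriterProps {
  public static Properties occProps() {
    Properties props = new Properties();
    props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
    props.setProperty("hoodie.write.lock.provider",
        "org.apache.hudi.client.transaction.lock.InProcessLockProvider"); // example lock provider
    props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY"); // commonly paired with multi-writer setups
    return props;
  }
}
```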
@@ -84,6 +84,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
private static final Logger LOG = LogManager.getLogger(HoodieAppendHandle.class);
// This acts as the sequenceID for records written
private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;

protected final String fileId;
// Buffer for holding records in memory before they are flushed to disk
@@ -559,12 +560,16 @@ private void writeToBuffer(HoodieRecord<T> record) {
* Checks if the number of records have reached the set threshold and then flushes the records to disk.
*/
private void flushToDiskIfRequired(HoodieRecord record, boolean appendDeleteBlocks) {
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)
|| numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
averageRecordSize = (long) (averageRecordSize * 0.8 + sizeEstimator.sizeEstimate(record) * 0.2);
}

// Append if max number of records reached to achieve block size
if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update existing value with
// avg of new and old
LOG.info("AvgRecordSize => " + averageRecordSize);
averageRecordSize = (averageRecordSize + sizeEstimator.sizeEstimate(record)) / 2;
LOG.info("Flush log block to disk, the current avgRecordSize => " + averageRecordSize);
// Delete blocks will be appended after appending all the data blocks.
appendDataAndDeleteBlocks(header, appendDeleteBlocks);
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
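`flushToDiskIfRequired` now keeps `averageRecordSize` fresh with an exponential moving average (0.8 weight on the old estimate, 0.2 on a newly sampled record), refreshed every `NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE` records and whenever the block-size threshold is hit, rather than averaging old and new only at flush time. A tiny worked example of the update rule, with invented numbers:

```java
// Illustrative only: how the running estimate reacts to one sampled record size
// under the 0.8/0.2 weighting used above.
public class AvgRecordSizeEmaExample {
  public static void main(String[] args) {
    long averageRecordSize = 1024; // current estimate in bytes (invented)
    long sampledSize = 2048;       // what sizeEstimator.sizeEstimate(record) might return (invented)
    // 0.8 * 1024 + 0.2 * 2048 = 1228.8, truncated to 1228 by the cast.
    averageRecordSize = (long) (averageRecordSize * 0.8 + sampledSize * 0.2);
    System.out.println("new averageRecordSize = " + averageRecordSize); // prints 1228
  }
}
```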
@@ -210,7 +210,7 @@ public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, S
* @param partitions {@link List} of partition to be deleted
* @return HoodieWriteMetadata
*/
public abstract HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);
public abstract HoodieWriteMetadata<O> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions);

/**
* Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
@@ -91,7 +91,8 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
this.taskContextSupplier = context.getTaskContextSupplier();
// TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
this.lastCompletedTxn = TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient());
this.lastCompletedTxn = txnManager.isOptimisticConcurrencyControlEnabled()
? TransactionUtils.getLastCompletedTxnInstantAndMetadata(table.getMetaClient()) : Option.empty();
this.pendingInflightAndRequestedInstants = TransactionUtils.getInflightAndRequestedInstants(table.getMetaClient());
this.pendingInflightAndRequestedInstants.remove(instantTime);
if (!table.getStorageLayout().writeOperationSupported(operationType)) {
@@ -254,6 +254,14 @@ public List<WriteStatus> delete(List<HoodieKey> keys, String instantTime) {
return postWrite(result, instantTime, table);
}

public List<WriteStatus> deletePartitions(List<String> partitions, String instantTime) {
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime));
preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient());
HoodieWriteMetadata<List<WriteStatus>> result = table.deletePartitions(context, instantTime, partitions);
return postWrite(result, instantTime, table);
}

@Override
public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
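A hypothetical caller-side sketch of the `deletePartitions` API added above. The file name is not visible in this view, so the `HoodieFlinkWriteClient` type below is an assumption based on the Flink-side changes later in this commit; the partition paths and the way the instant time is obtained are illustrative only:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;

// Illustrative only: the client type, partition values and instant handling are assumptions.
public class DropPartitionsExample {
  public static List<WriteStatus> dropPartitions(HoodieFlinkWriteClient<?> writeClient,
                                                 String instantTime) {
    // instantTime is expected to come from the client's usual commit flow (details omitted here).
    List<String> partitionsToDrop = Arrays.asList("2022/10/24", "2022/10/25");
    return writeClient.deletePartitions(partitionsToDrop, instantTime);
  }
}
```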
@@ -57,6 +57,7 @@
import org.apache.hudi.table.action.clean.CleanPlanActionExecutor;
import org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeleteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkDeletePartitionCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkInsertOverwriteTableCommitActionExecutor;
@@ -243,8 +244,8 @@ public HoodieWriteMetadata<List<WriteStatus>> delete(HoodieEngineContext context
}

@Override
public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
throw new HoodieNotSupportedException("DeletePartitions is not supported yet");
public HoodieWriteMetadata<List<WriteStatus>> deletePartitions(HoodieEngineContext context, String instantTime, List<String> partitions) {
return new FlinkDeletePartitionCommitActionExecutor<>(context, config, this, instantTime, partitions).execute();
}

@Override
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.commit;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDeletePartitionException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;

public class FlinkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends FlinkInsertOverwriteCommitActionExecutor<T> {

private final List<String> partitions;

public FlinkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<?, ?, ?, ?> table,
String instantTime,
List<String> partitions) {
super(context, null, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
this.partitions = partitions;
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> execute() {
try {
HoodieTimer timer = new HoodieTimer().startTimer();
context.setJobStatus(this.getClass().getSimpleName(), "Gather all file ids from all deleting partitions.");
Map<String, List<String>> partitionToReplaceFileIds =
context.parallelize(partitions).distinct().collectAsList()
.stream().collect(Collectors.toMap(partitionPath -> partitionPath, this::getAllExistingFileIds));
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
result.setWriteStatuses(Collections.emptyList());

// create the requested replacecommit instant if it does not already exist
HoodieInstant dropPartitionsInstant = new HoodieInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
if (!table.getMetaClient().getFs().exists(new Path(table.getMetaClient().getMetaPath(),
dropPartitionsInstant.getFileName()))) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
.setOperationType(WriteOperationType.DELETE_PARTITION.name())
.setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
.build();
table.getMetaClient().getActiveTimeline().saveToPendingReplaceCommit(dropPartitionsInstant,
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
}

this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())),
instantTime);
this.commitOnAutoCommit(result);
return result;
} catch (Exception e) {
throw new HoodieDeletePartitionException("Failed to drop partitions for commit time " + instantTime, e);
}
}

private List<String> getAllExistingFileIds(String partitionPath) {
// Because the new commit is not yet complete, it is safe to mark all existing file ids as replaced.
return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
}
}
@@ -85,6 +85,15 @@

import scala.Tuple2;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_QUORUM;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_CLIENT_PORT;
import static org.apache.hadoop.hbase.security.SecurityConstants.MASTER_KRB_PRINCIPAL;
import static org.apache.hadoop.hbase.security.SecurityConstants.REGIONSERVER_KRB_PRINCIPAL;
import static org.apache.hadoop.hbase.security.User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY;
import static org.apache.hadoop.hbase.security.User.HBASE_SECURITY_CONF_KEY;

/**
* Hoodie Index implementation backed by HBase.
*/
@@ -145,22 +154,22 @@ public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConf
private Connection getHBaseConnection() {
Configuration hbaseConfig = HBaseConfiguration.create();
String quorum = config.getHbaseZkQuorum();
hbaseConfig.set("hbase.zookeeper.quorum", quorum);
hbaseConfig.set(ZOOKEEPER_QUORUM, quorum);
String zkZnodeParent = config.getHBaseZkZnodeParent();
if (zkZnodeParent != null) {
hbaseConfig.set("zookeeper.znode.parent", zkZnodeParent);
hbaseConfig.set(ZOOKEEPER_ZNODE_PARENT, zkZnodeParent);
}
String port = String.valueOf(config.getHbaseZkPort());
hbaseConfig.set("hbase.zookeeper.property.clientPort", port);
hbaseConfig.set(ZOOKEEPER_CLIENT_PORT, port);

try {
String authentication = config.getHBaseIndexSecurityAuthentication();
if (authentication.equals("kerberos")) {
hbaseConfig.set("hbase.security.authentication", "kerberos");
hbaseConfig.set("hadoop.security.authentication", "kerberos");
hbaseConfig.set("hbase.security.authorization", "true");
hbaseConfig.set("hbase.regionserver.kerberos.principal", config.getHBaseIndexRegionserverPrincipal());
hbaseConfig.set("hbase.master.kerberos.principal", config.getHBaseIndexMasterPrincipal());
hbaseConfig.set(HBASE_SECURITY_CONF_KEY, "kerberos");
hbaseConfig.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
hbaseConfig.set(HBASE_SECURITY_AUTHORIZATION_CONF_KEY, "true");
hbaseConfig.set(REGIONSERVER_KRB_PRINCIPAL, config.getHBaseIndexRegionserverPrincipal());
hbaseConfig.set(MASTER_KRB_PRINCIPAL, config.getHBaseIndexMasterPrincipal());

String principal = config.getHBaseIndexKerberosUserPrincipal();
String keytab = SparkFiles.get(config.getHBaseIndexKerberosUserKeytab());