Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-6313] Add metrics counters for compaction requested/completed events. #8759

Merged
merged 7 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

Expand Down Expand Up @@ -50,6 +51,8 @@ public class HoodieMetrics {
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
private String compactionRequestedCounterName = null;
private String compactionCompletedCounterName = null;
private HoodieWriteConfig config;
private String tableName;
private Timer rollbackTimer = null;
Expand All @@ -64,6 +67,8 @@ public class HoodieMetrics {
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
private Counter compactionRequestedCounter = null;
private Counter compactionCompletedCounter = null;

public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
Expand All @@ -82,6 +87,8 @@ public HoodieMetrics(HoodieWriteConfig config) {
this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure");
this.compactionRequestedCounterName = getMetricsName("counter", "compaction.requested");
this.compactionCompletedCounterName = getMetricsName("counter", "compaction.completed");
}
}

Expand Down Expand Up @@ -270,7 +277,8 @@ public void updateIndexMetrics(final String action, final long durationInMs) {
}
}

String getMetricsName(String action, String metric) {
@VisibleForTesting
public String getMetricsName(String action, String metric) {
return config == null ? null : String.format("%s.%s.%s", config.getMetricReporterMetricsNamePrefix(), action, metric);
}

Expand Down Expand Up @@ -308,6 +316,20 @@ public void emitConflictResolutionFailed() {
}
}

public void emitCompactionRequested() {
if (config.isMetricsOn()) {
compactionRequestedCounter = getCounter(compactionRequestedCounter, compactionRequestedCounterName);
compactionRequestedCounter.inc();
}
}

public void emitCompactionCompleted() {
if (config.isMetricsOn()) {
compactionCompletedCounter = getCounter(compactionCompletedCounter, compactionCompletedCounterName);
compactionCompletedCounter.inc();
}
}

private Counter getCounter(Counter counter, String name) {
if (counter == null) {
return metrics.getRegistry().counter(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
Expand All @@ -48,10 +52,14 @@
public class RunCompactionActionExecutor<T> extends
BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {

private static final Logger LOG = LoggerFactory.getLogger(RunCompactionActionExecutor.class);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunCompactionActionExecutor.class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

private final HoodieCompactor compactor;
private final HoodieCompactionHandler compactionHandler;
private WriteOperationType operationType;

private final HoodieMetrics metrics;

public RunCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
Expand All @@ -65,10 +73,14 @@ public RunCompactionActionExecutor(HoodieEngineContext context,
this.operationType = operationType;
checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT,
"Only COMPACT and LOG_COMPACT is supported");
metrics = new HoodieMetrics(config);
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
LOG.info("Compaction requested. Instant time: {}.", instantTime);
metrics.emitCompactionRequested();

HoodieTimeline pendingMajorOrMinorCompactionTimeline = WriteOperationType.COMPACT.equals(operationType)
? table.getActiveTimeline().filterPendingCompactionTimeline()
: table.getActiveTimeline().filterPendingLogCompactionTimeline();
Expand Down Expand Up @@ -117,6 +129,8 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
}

LOG.info("Compaction completed. Instant time: {}.", instantTime);
metrics.emitCompactionCompleted();
return compactionMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;

import com.codahale.metrics.Counter;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -56,6 +59,7 @@

import java.io.IOException;
import java.util.List;
import java.util.SortedMap;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -89,9 +93,22 @@ public void tearDown() throws Exception {
private HoodieWriteConfig getConfig() {
return getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetricsConfig(getMetricsConfig())
.build();
}

private static HoodieMetricsConfig getMetricsConfig() {
return HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build();
}

private long getCompactionMetricCount(String metric) {
HoodieMetrics metrics = writeClient.getMetrics();
String metricName = metrics.getMetricsName("counter", metric);
SortedMap<String, Counter> counters = metrics.getMetrics().getRegistry().getCounters();

return counters.containsKey(metricName) ? counters.get(metricName).getCount() : 0;
}

private HoodieWriteConfig.Builder getConfigBuilder() {
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
Expand All @@ -106,12 +123,18 @@ private HoodieWriteConfig.Builder getConfigBuilder() {
@Test
public void testCompactionOnCopyOnWriteFail() throws Exception {
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
assertThrows(HoodieNotSupportedException.class, () -> {
table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.compact(context, compactionInstantTime);
});
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getConfig());) {
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient);
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
assertThrows(HoodieNotSupportedException.class, () -> {
table.scheduleCompaction(context, compactionInstantTime, Option.empty());
table.compact(context, compactionInstantTime);
});

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(0, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(0, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}

@Test
Expand All @@ -129,6 +152,10 @@ public void testCompactionEmpty() {
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, compactionInstantTime, Option.empty());
assertFalse(plan.isPresent(), "If there is nothing to compact, result will be empty");

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(0, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(0, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}

Expand All @@ -148,7 +175,7 @@ public void testScheduleCompactionWithInflightInstant() {
newCommitTime = "102";
writeClient.startCommitWithTime(newCommitTime);
metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());

// create one compaction instance before exist inflight instance.
String compactionTime = "101";
Expand All @@ -161,6 +188,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception {
// insert 100 records
HoodieWriteConfig config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMetricsConfig(getMetricsConfig())
.build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
Expand All @@ -180,6 +208,10 @@ public void testWriteStatusContentsAfterCompaction() throws Exception {
HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime);

verifyCompaction(result);

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(1, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}

Expand All @@ -190,7 +222,9 @@ public void testSpillingWhenCompaction() throws Exception {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(1L, 1L).build()) // force spill
.withMetricsConfig(getMetricsConfig())
.build();

try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
Expand All @@ -210,6 +244,10 @@ public void testSpillingWhenCompaction() throws Exception {
HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));

verifyCompaction(result);

// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(i / 2 + 1, getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
assertEquals(i / 2 + 1, getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ public interface HoodieTimeline extends Serializable {
String COMPACTION_ACTION = "compaction";
String LOG_COMPACTION_ACTION = "logcompaction";
String REQUESTED_EXTENSION = ".requested";
String COMPLETED_EXTENSION = ".completed";
String RESTORE_ACTION = "restore";
String INDEXING_ACTION = "indexing";
// only for schema save
String SCHEMA_COMMIT_ACTION = "schemacommit";

String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION};
Expand All @@ -81,6 +81,7 @@ public interface HoodieTimeline extends Serializable {
String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + REQUESTED_EXTENSION;
String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + INFLIGHT_EXTENSION;
String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, REQUESTED_EXTENSION);
String COMPLETED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, COMPLETED_EXTENSION);
String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", REQUESTED_COMPACTION_SUFFIX);
String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", COMPACTION_ACTION, INFLIGHT_EXTENSION);
String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + REQUESTED_EXTENSION;
Expand Down