-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-6313] Add metrics counters for compaction requested/completed events. #8759
Changes from 4 commits
953795e
fbdd1d2
f926796
eb78efa
c8b0280
155e496
eab8998
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,11 +35,15 @@ | |
import org.apache.hudi.config.HoodieWriteConfig; | ||
import org.apache.hudi.exception.HoodieCompactionException; | ||
import org.apache.hudi.internal.schema.utils.SerDeHelper; | ||
import org.apache.hudi.metrics.HoodieMetrics; | ||
import org.apache.hudi.table.HoodieCompactionHandler; | ||
import org.apache.hudi.table.HoodieTable; | ||
import org.apache.hudi.table.action.BaseActionExecutor; | ||
import org.apache.hudi.table.action.HoodieWriteMetadata; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.List; | ||
|
||
import static org.apache.hudi.common.util.ValidationUtils.checkArgument; | ||
|
@@ -48,10 +52,14 @@ | |
public class RunCompactionActionExecutor<T> extends | ||
BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(RunCompactionActionExecutor.class); | ||
|
||
private final HoodieCompactor compactor; | ||
private final HoodieCompactionHandler compactionHandler; | ||
private WriteOperationType operationType; | ||
|
||
private final HoodieMetrics metrics; | ||
|
||
public RunCompactionActionExecutor(HoodieEngineContext context, | ||
HoodieWriteConfig config, | ||
HoodieTable table, | ||
|
@@ -65,10 +73,14 @@ public RunCompactionActionExecutor(HoodieEngineContext context, | |
this.operationType = operationType; | ||
checkArgument(operationType == WriteOperationType.COMPACT || operationType == WriteOperationType.LOG_COMPACT, | ||
"Only COMPACT and LOG_COMPACT is supported"); | ||
metrics = new HoodieMetrics(config); | ||
} | ||
|
||
@Override | ||
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() { | ||
LOG.info("Compaction requested."); | ||
Review comment: Can we add the instant time for compaction? — Author reply: Done. |
||
metrics.emitCompactionRequested(); | ||
|
||
HoodieTimeline pendingMajorOrMinorCompactionTimeline = WriteOperationType.COMPACT.equals(operationType) | ||
? table.getActiveTimeline().filterPendingCompactionTimeline() | ||
: table.getActiveTimeline().filterPendingLogCompactionTimeline(); | ||
|
@@ -116,6 +128,8 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() { | |
throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e); | ||
} | ||
|
||
LOG.info("Compaction completed."); | ||
Review comment: Can we add the instant time for compaction? — Author reply: Done. |
||
metrics.emitCompactionCompleted(); | ||
return compactionMetadata; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,14 +40,17 @@ | |
import org.apache.hudi.config.HoodieIndexConfig; | ||
import org.apache.hudi.config.HoodieMemoryConfig; | ||
import org.apache.hudi.config.HoodieWriteConfig; | ||
import org.apache.hudi.config.metrics.HoodieMetricsConfig; | ||
import org.apache.hudi.exception.HoodieNotSupportedException; | ||
import org.apache.hudi.index.HoodieIndex; | ||
import org.apache.hudi.index.bloom.HoodieBloomIndex; | ||
import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper; | ||
import org.apache.hudi.metrics.HoodieMetrics; | ||
import org.apache.hudi.table.HoodieSparkTable; | ||
import org.apache.hudi.table.HoodieTable; | ||
import org.apache.hudi.testutils.HoodieClientTestHarness; | ||
|
||
import com.codahale.metrics.Counter; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.spark.api.java.JavaRDD; | ||
import org.junit.jupiter.api.AfterEach; | ||
|
@@ -56,6 +59,7 @@ | |
|
||
import java.io.IOException; | ||
import java.util.List; | ||
import java.util.SortedMap; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
@@ -89,9 +93,22 @@ public void tearDown() throws Exception { | |
private HoodieWriteConfig getConfig() { | ||
return getConfigBuilder() | ||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) | ||
.withMetricsConfig(getMetricsConfig()) | ||
.build(); | ||
} | ||
|
||
private static HoodieMetricsConfig getMetricsConfig() { | ||
return HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build(); | ||
} | ||
|
||
private long getCompactionMetricCount(String action, String metric) { | ||
HoodieMetrics metrics = writeClient.getMetrics(); | ||
String metricName = metrics.getMetricsName(action, metric); | ||
SortedMap<String, Counter> counters = metrics.getMetrics().getRegistry().getCounters(); | ||
|
||
return counters.containsKey(metricName) ? counters.get(metricName).getCount() : 0; | ||
} | ||
|
||
private HoodieWriteConfig.Builder getConfigBuilder() { | ||
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) | ||
.withParallelism(2, 2) | ||
|
@@ -106,12 +123,18 @@ private HoodieWriteConfig.Builder getConfigBuilder() { | |
@Test | ||
public void testCompactionOnCopyOnWriteFail() throws Exception { | ||
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE); | ||
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient); | ||
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); | ||
assertThrows(HoodieNotSupportedException.class, () -> { | ||
table.scheduleCompaction(context, compactionInstantTime, Option.empty()); | ||
table.compact(context, compactionInstantTime); | ||
}); | ||
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getConfig());) { | ||
HoodieTable table = HoodieSparkTable.create(getConfig(), context, metaClient); | ||
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); | ||
assertThrows(HoodieNotSupportedException.class, () -> { | ||
table.scheduleCompaction(context, compactionInstantTime, Option.empty()); | ||
table.compact(context, compactionInstantTime); | ||
}); | ||
|
||
// Verify compaction.requested, compaction.completed metrics counts. | ||
assertEquals(0, getCompactionMetricCount("counter", "compaction.requested")); | ||
assertEquals(0, getCompactionMetricCount("counter", "compaction.completed")); | ||
} | ||
} | ||
|
||
@Test | ||
|
@@ -129,6 +152,10 @@ public void testCompactionEmpty() { | |
String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime(); | ||
Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, compactionInstantTime, Option.empty()); | ||
assertFalse(plan.isPresent(), "If there is nothing to compact, result will be empty"); | ||
|
||
// Verify compaction.requested, compaction.completed metrics counts. | ||
assertEquals(0, getCompactionMetricCount("counter", "compaction.requested")); | ||
Review comment: Can we re-use the variables here instead of hard-coding the metric names? — Author reply: I can use const variables, but from what I could see, referring to metric names using inline literal strings appears to be the common pattern in the codebase, hence this usage. Let me know what you think. — Reviewer (@SteNicholas): @amrishlal, you could use a const variable. — Author reply: @SteNicholas Using |
||
assertEquals(0, getCompactionMetricCount("counter", "compaction.completed")); | ||
} | ||
} | ||
|
||
|
@@ -148,7 +175,7 @@ public void testScheduleCompactionWithInflightInstant() { | |
newCommitTime = "102"; | ||
writeClient.startCommitWithTime(newCommitTime); | ||
metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED, | ||
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty()); | ||
HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty()); | ||
|
||
// create one compaction instance before exist inflight instance. | ||
String compactionTime = "101"; | ||
|
@@ -161,6 +188,7 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { | |
// insert 100 records | ||
HoodieWriteConfig config = getConfigBuilder() | ||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) | ||
.withMetricsConfig(getMetricsConfig()) | ||
.build(); | ||
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { | ||
String newCommitTime = "100"; | ||
|
@@ -180,6 +208,10 @@ public void testWriteStatusContentsAfterCompaction() throws Exception { | |
HoodieData<WriteStatus> result = compact(writeClient, compactionInstantTime); | ||
|
||
verifyCompaction(result); | ||
|
||
// Verify compaction.requested, compaction.completed metrics counts. | ||
assertEquals(1, getCompactionMetricCount("counter", "compaction.requested")); | ||
Review comment: Same here (re-use variables instead of hard-coding the metric names). — Author reply: Responded above. |
||
assertEquals(1, getCompactionMetricCount("counter", "compaction.completed")); | ||
} | ||
} | ||
|
||
|
@@ -190,7 +222,9 @@ public void testSpillingWhenCompaction() throws Exception { | |
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) | ||
.withMemoryConfig(HoodieMemoryConfig.newBuilder() | ||
.withMaxMemoryMaxSize(1L, 1L).build()) // force spill | ||
.withMetricsConfig(getMetricsConfig()) | ||
.build(); | ||
|
||
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) { | ||
String newCommitTime = "100"; | ||
writeClient.startCommitWithTime(newCommitTime); | ||
|
@@ -210,6 +244,10 @@ public void testSpillingWhenCompaction() throws Exception { | |
HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1)); | ||
|
||
verifyCompaction(result); | ||
|
||
// Verify compaction.requested, compaction.completed metrics counts. | ||
assertEquals(i / 2 + 1, getCompactionMetricCount("counter", "compaction.requested")); | ||
assertEquals(i / 2 + 1, getCompactionMetricCount("counter", "compaction.completed")); | ||
} | ||
} | ||
} | ||
|
Review comment: Use RunCompactionActionExecutor.class — the logger is instantiated with the wrong class (LogCompactionExecutionHelper.class). — Author reply: Fixed.