Skip to content

Commit

Permalink
HIVE-25390: Metrics compaction_failed_initiator_ratio and compaction_failed_cleaner_ratio should be counters (Karen Coppage, reviewed by Laszlo Pinter)
Browse files Browse the repository at this point in the history

Closes #2534
  • Loading branch information
klcopp committed Jul 28, 2021
1 parent 69c97c2 commit ce62aeb
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 73 deletions.
12 changes: 4 additions & 8 deletions ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.TableName;
Expand Down Expand Up @@ -62,14 +61,15 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;

import com.codahale.metrics.Counter;

/**
* A class to clean directories after compactions. This will run in a separate thread.
*/
Expand Down Expand Up @@ -98,8 +98,7 @@ public void run() {
try {
boolean metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_CLEANER_RATIO);
Counter failuresCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
do {
TxnStore.MutexAPI.LockHandle handle = null;
long startedAt = -1;
Expand All @@ -112,9 +111,6 @@ public void run() {
// so wrap it in a big catch Throwable statement.
try {
handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
if (metricsEnabled) {
ratio.getRight().incrementAndGet();
}
startedAt = System.currentTimeMillis();
long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();

Expand All @@ -140,7 +136,7 @@ public void run() {
} catch (Throwable t) {
// the lock timeout on AUX lock, should be ignored.
if (metricsEnabled && handle != null) {
ratio.getLeft().incrementAndGet();
failuresCounter.inc();
}
LOG.error("Caught an exception in the main loop of compactor cleaner, " +
StringUtils.stringifyException(t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -64,15 +64,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NAME_FORMAT;
Expand Down Expand Up @@ -107,8 +105,7 @@ public void run() {
TimeUnit.MILLISECONDS);
boolean metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_INITIATOR_RATIO);
Counter failuresCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);

// Make sure we run through the loop once before checking to stop as this makes testing
// much easier. The stop value is only for testing anyway and not used when called from
Expand All @@ -123,7 +120,6 @@ public void run() {
try {
handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
if (metricsEnabled) {
ratio.getRight().incrementAndGet();
perfLogger.perfLogBegin(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE);
}
startedAt = System.currentTimeMillis();
Expand Down Expand Up @@ -182,7 +178,7 @@ public void run() {
} catch (Throwable t) {
// the lock timeout on AUX lock, should be ignored.
if (metricsEnabled && handle != null) {
ratio.getLeft().incrementAndGet();
failuresCounter.inc();
}
LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
StringUtils.stringifyException(t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import org.apache.commons.lang3.tuple.Pair;
import com.codahale.metrics.Counter;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
Expand Down Expand Up @@ -229,41 +229,33 @@ public void testOldestReadyForCleaningAge() throws Exception {
@Test
public void testInitiatorNoFailure() throws Exception {
startInitiator();
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_INITIATOR_RATIO);
Assert.assertEquals("numerator mismatch", 0, ratio.getLeft().get());
Assert.assertEquals("denominator mismatch", 1, ratio.getRight().get());
Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
Assert.assertEquals("Count incorrect", 0, counter.getCount());
}

@Test
public void testCleanerNoFailure() throws Exception {
startCleaner();
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_CLEANER_RATIO);
Assert.assertEquals("numerator mismatch", 0, ratio.getLeft().get());
Assert.assertEquals("denominator mismatch", 1, ratio.getRight().get());
Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
Assert.assertEquals("Count incorrect", 0, counter.getCount());
}

@Test
public void testInitiatorFailure() throws Exception {
ThrowingTxnHandler.doThrow = true;
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL, "org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler");
startInitiator();
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_INITIATOR_RATIO);
Assert.assertEquals("numerator mismatch", 1, ratio.getLeft().get());
Assert.assertEquals("denominator mismatch", 1, ratio.getRight().get());
Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
Assert.assertEquals("Count incorrect", 1, counter.getCount());
}

@Test
public void testCleanerFailure() throws Exception {
ThrowingTxnHandler.doThrow = true;
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.TXN_STORE_IMPL, "org.apache.hadoop.hive.metastore.txn.ThrowingTxnHandler");
startCleaner();
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_CLEANER_RATIO);
Assert.assertEquals("numerator mismatch", 1, ratio.getLeft().get());
Assert.assertEquals("denominator mismatch", 1, ratio.getRight().get());
Counter counter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
Assert.assertEquals("Count incorrect", 1, counter.getCount());
}

@Test
Expand All @@ -280,11 +272,8 @@ public void testInitiatorAuxFailure() throws Exception {
}
}
// the lock timeout on AUX lock, should be ignored.
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_INITIATOR_RATIO);
Assert.assertEquals(0, ratio.getLeft().get());
Assert.assertEquals("numerator mismatch", 0, ratio.getLeft().get());
Assert.assertEquals("denominator mismatch", 0, ratio.getRight().get());
Counter failureCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER);
Assert.assertEquals("count mismatch", 0, failureCounter.getCount());
}

@Test
Expand All @@ -301,10 +290,8 @@ public void testCleanerAuxFailure() throws Exception {
}
}
// the lock timeout on AUX lock, should be ignored.
Pair<AtomicInteger, AtomicInteger> ratio =
Metrics.getOrCreateRatio(MetricsConstants.COMPACTION_FAILED_CLEANER_RATIO);
Assert.assertEquals("numerator mismatch", 0, ratio.getLeft().get());
Assert.assertEquals("denominator mismatch", 0, ratio.getRight().get());
Counter failureCounter = Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER);
Assert.assertEquals("count mismatch", 0, failureCounter.getCount());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Reporter;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Slf4jReporter;
Expand Down Expand Up @@ -161,35 +160,6 @@ public Integer getValue() {
}
}

/**
* Get the pair of AtomicIntegers behind an existing ratio gauge, or create a new gauge if it does not already
* exist.
* @param name Name of gauge. This should come from MetricConstants
* @return Pair<AtomicInteger, AtomicInteger> as the numerator and denominator of the ratio.
*/
public static Pair<AtomicInteger, AtomicInteger> getOrCreateRatio(String name) {
// We return a garbage value if metrics haven't been initialized so that callers don't have
// to keep checking if the resulting value is null.
if (self == null) return dummyRatio;
Pair<AtomicInteger, AtomicInteger> ratio = self.gaugeRatio.get(name);
if (ratio != null) return ratio;
synchronized (Metrics.class) {
ratio = self.gaugeRatio.get(name);
if (ratio != null) return ratio;
ratio = Pair.of(new AtomicInteger(), new AtomicInteger());
final Pair<AtomicInteger, AtomicInteger> forGauge = ratio;
self.gaugeRatio.put(name, ratio);
self.registry.register(name, new RatioGauge() {
@Override
protected Ratio getRatio() {
return Ratio.of(forGauge.getLeft().get(), forGauge.getRight().get());
}
});
return ratio;
}
}


public static Counter getOpenConnectionsCounter() {
return getOrCreateCounter(MetricsConstants.OPEN_CONNECTIONS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ public class MetricsConstants {
public static final String COMPACTION_STATUS_PREFIX = "compaction_num_";
public static final String COMPACTION_OLDEST_ENQUEUE_AGE = "compaction_oldest_enqueue_age_in_sec";
public static final String COMPACTION_INITIATOR_CYCLE = "compaction_initiator_cycle";
public static final String COMPACTION_FAILED_INITIATOR_RATIO = "compaction_failed_initiator_ratio";
public static final String COMPACTION_INITIATOR_FAILURE_COUNTER = "compaction_initiator_failure_counter";
public static final String COMPACTION_CLEANER_CYCLE = "compaction_cleaner_cycle";
public static final String COMPACTION_FAILED_CLEANER_RATIO = "compaction_failed_cleaner_ratio";
public static final String COMPACTION_CLEANER_FAILURE_COUNTER = "compaction_cleaner_failure_counter";
public static final String COMPACTION_WORKER_CYCLE = "compaction_worker_cycle";

public static final String OLDEST_OPEN_REPL_TXN_ID = "oldest_open_repl_txn_id";
Expand Down

0 comments on commit ce62aeb

Please sign in to comment.