Skip to content
Permalink
Browse files
HIVE-25345: Add logging based on new compaction metrics (#2493) (Laszlo Pinter, reviewed by Karen Coppage)
  • Loading branch information
lcspinter committed Jul 23, 2021
1 parent ba96607 commit f0e93b18c70017dbe0548eb02488d1d1f64ae2a9
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 28 deletions.
@@ -3194,6 +3194,24 @@ public static enum ConfVars {
"Age of table/partition's oldest aborted transaction when compaction will be triggered. " +
"Default time unit is: hours. Set to a negative number to disable."),

HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD("hive.compactor.active.delta.dir.threshold", 200,
"If the number of active delta directories under a table/partition passes this threshold, a warning" +
" message will be logged."),

HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD("hive.compactor.obsolete.delta.dir.threshold", 200,
"If the number of obsolete delta directories under a table/partition passes this threshold, a " +
"warning message will be logged."),

HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD("hive.compactor.small.delta.dir.threshold", 200,
"If the number of small delta directories under a table/partition passes this threshold, a " +
"warning message will be logged."),

HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY(
"hive.compactor.acid.metrics.logger.frequency",
"360m", new TimeValidator(TimeUnit.MINUTES),
"Logging frequency of ACID related metrics. Set this value to 0 to completely turn off logging. " +
"Default time unit: minutes"),

HIVE_COMPACTOR_WAIT_TIMEOUT("hive.compactor.wait.timeout", 300000L, "Time out in "
+ "milliseconds for blocking compaction. It's value has to be higher than 2000 milliseconds. "),

@@ -1817,7 +1817,7 @@ static List<OrcSplit> generateSplitsInfo(Configuration conf, Context context)

if (metricsEnabled && directory instanceof AcidDirectory) {
DeltaFilesMetricReporter.mergeDeltaFilesStats((AcidDirectory) directory, checkThresholdInSec,
deltaPctThreshold, deltaFilesStats);
deltaPctThreshold, deltaFilesStats, conf);
}
// We have received a new directory information, make split strategies.
--resultsLeft;
@@ -84,6 +84,8 @@ public class DeltaFilesMetricReporter {

public static final String OBJECT_NAME_PREFIX = "metrics:type=compaction,name=";

private static long lastSuccessfulLoggingTime = 0;

public enum DeltaFilesMetricType {
NUM_OBSOLETE_DELTAS("HIVE_ACID_NUM_OBSOLETE_DELTAS"),
NUM_DELTAS("HIVE_ACID_NUM_DELTAS"),
@@ -193,7 +195,8 @@ private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> c
}

public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdInSec,
float deltaPctThreshold, EnumMap<DeltaFilesMetricType, Map<String, Integer>> deltaFilesStats) throws IOException {
float deltaPctThreshold, EnumMap<DeltaFilesMetricType, Map<String, Integer>> deltaFilesStats,
Configuration conf) throws IOException {
long baseSize = getBaseSize(dir);
int numObsoleteDeltas = getNumObsoleteDeltas(dir, checkThresholdInSec);

@@ -212,11 +215,41 @@ public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdIn
}
}
}

logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);

String path = getRelPath(dir);
newDeltaFilesStats(numObsoleteDeltas, numDeltas, numSmallDeltas)
.forEach((type, cnt) -> deltaFilesStats.computeIfAbsent(type, v -> new HashMap<>()).put(path, cnt));
}

/**
 * Emits a WARN log line for each delta-directory count (active, obsolete, small) that meets or
 * exceeds its configured threshold for the given table/partition directory.
 * <p>
 * Logging is throttled: at most one logging pass per
 * {@code HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY} interval. A frequency of zero or a
 * negative value disables this logging entirely.
 *
 * @param dir               ACID directory whose delta counts are being reported
 * @param conf              configuration holding the thresholds and logging frequency
 * @param numObsoleteDeltas number of obsolete delta directories found under {@code dir}
 * @param numDeltas         number of active delta directories found under {@code dir}
 * @param numSmallDeltas    number of small delta directories found under {@code dir}
 */
private static void logDeltaDirMetrics(AcidDirectory dir, Configuration conf, int numObsoleteDeltas, int numDeltas,
    int numSmallDeltas) {
  long loggerFrequency = HiveConf
      .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, TimeUnit.MILLISECONDS);
  if (loggerFrequency <= 0) {
    // Zero/negative frequency means logging is turned off completely.
    return;
  }
  long currentTime = System.currentTimeMillis();
  if (lastSuccessfulLoggingTime == 0 || currentTime >= lastSuccessfulLoggingTime + loggerFrequency) {
    // Record this pass so subsequent calls within the frequency window are skipped.
    // (Previously this field was never updated, so the throttle had no effect.)
    lastSuccessfulLoggingTime = currentTime;
    if (numDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD)) {
      LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " active delta directories. This can " +
          "cause performance degradation.");
    }

    if (numObsoleteDeltas >=
        HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD)) {
      // Fixed copy-paste bug: log the obsolete count, not the active delta count.
      LOG.warn("Directory " + dir.getPath() + " contains " + numObsoleteDeltas + " obsolete delta directories. " +
          "This can indicate compaction cleaner issues.");
    }

    if (numSmallDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD)) {
      // Fixed copy-paste bug: log the small-delta count, not the active delta count.
      LOG.warn("Directory " + dir.getPath() + " contains " + numSmallDeltas + " small delta directories. This " +
          "can indicate performance degradation and there might be a problem with your streaming setup.");
    }
  }
}

private static int getNumObsoleteDeltas(AcidDirectory dir, long checkThresholdInSec) throws IOException {
int numObsoleteDeltas = 0;
for (Path obsolete : dir.getObsolete()) {
@@ -478,7 +478,7 @@ public void testUpdateCompactionMetrics() {
System.currentTimeMillis(),true, "4.0.0", "4.0.0"));

scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);

Assert.assertEquals(1,
Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_STATUS_PREFIX +
@@ -516,7 +516,7 @@ public void testAgeMetricsNotSet() {
elements.add(generateElement(14, "db3", "tb4", null, CompactionType.MINOR, TxnStore.CLEANING_RESPONSE, 5L));

scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
// Check that it is not set
Assert.assertEquals(0, Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue());
}
@@ -529,7 +529,7 @@ public void testAgeMetricsAge() {
elements.add(generateElement(15,"db3", "tb5", null, CompactionType.MINOR, TxnStore.INITIATED_RESPONSE, start));

scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
long diff = (System.currentTimeMillis() - start)/1000;
// Check that we have at least 1s old compaction age, but not more than expected
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() <= diff);
@@ -547,7 +547,7 @@ public void testAgeMetricsOrder() {
start - 100000L));

scr.setCompacts(elements);
AcidMetricService.updateMetricsFromShowCompact(scr);
AcidMetricService.updateMetricsFromShowCompact(scr, conf);
// Check that the age is older than 10s
Assert.assertTrue(Metrics.getOrCreateGauge(MetricsConstants.COMPACTION_OLDEST_ENQUEUE_AGE).intValue() > 10);

@@ -432,6 +432,100 @@ public enum ConfVars {
COMPACTOR_RUN_AS_USER("metastore.compactor.run.as.user", "hive.compactor.run.as.user", "",
"Specify the user to run compactor Initiator and Worker as. If empty string, defaults to table/partition " +
"directory owner."),
COMPACTOR_OLDEST_REPLICATION_OPENTXN_THRESHOLD_WARNING(
"metastore.compactor.oldest.replication.open.txn.threshold.warning",
"hive.compactor.oldest.replication.open.txn.threshold.warning",
14, TimeUnit.DAYS,
"Age of open replication transaction after which a warning will be logged. Default time unit: days"),
COMPACTOR_OLDEST_REPLICATION_OPENTXN_THRESHOLD_ERROR(
"metastore.compactor.oldest.replication.open.txn.threshold.error",
"hive.compactor.oldest.replication.open.txn.threshold.error",
21, TimeUnit.DAYS,
"Age of open replication transaction after which an error will be logged. Default time unit: days"),
COMPACTOR_OLDEST_OPENTXN_THRESHOLD_WARNING(
"metastore.compactor.oldest.open.txn.threshold.warning",
"hive.compactor.oldest.open.txn.threshold.warning",
24, TimeUnit.HOURS,
"Age of oldest open non-replication transaction after which a warning will be logged. " +
"Default time unit: hours"),
COMPACTOR_OLDEST_OPENTXN_THRESHOLD_ERROR(
"metastore.compactor.oldest.open.txn.threshold.error",
"hive.compactor.oldest.open.txn.threshold.error",
72, TimeUnit.HOURS,
"Age of oldest open non-replication transaction after which an error will be logged. "
+ "Default time unit: hours"),
COMPACTOR_OLDEST_UNCLEANED_ABORTEDTXN_TIME_THRESHOLD_WARNING(
"metastore.compactor.oldest.uncleaned.aborted.txn.time.threshold.warning",
"hive.compactor.oldest.uncleaned.aborted.txn.time.threshold.warning",
24, TimeUnit.HOURS,
"Age of oldest aborted transaction after which a warning will be logged. Default time unit: hours"),
COMPACTOR_OLDEST_UNCLEANED_ABORTEDTXN_TIME_THRESHOLD_ERROR(
"metastore.compactor.oldest.uncleaned.aborted.txn.time.threshold.error",
"hive.compactor.oldest.uncleaned.aborted.txn.time.threshold.error",
48, TimeUnit.HOURS,
"Age of oldest aborted transaction after which an error will be logged. Default time unit: hours"),
COMPACTOR_TABLES_WITH_ABORTEDTXN_THRESHOLD(
"metastore.compactor.tables.with.aborted.txn.threshold",
"hive.compactor.tables.with.aborted.txn.threshold", 1,
"Number of tables has not been compacted and have more than " +
"hive.metastore.acidmetrics.table.aborted.txns.threshold (default 1500) aborted transactions. If this " +
"threshold is passed, a warning will be logged."),
COMPACTOR_OLDEST_UNCLEANED_COMPACTION_TIME_THRESHOLD(
"metastore.compactor.oldest.uncleaned.compaction.time.threshold",
"hive.compactor.oldest.uncleaned.compaction.time.threshold",
24, TimeUnit.HOURS,
"Age of oldest ready for cleaning compaction in the compaction queue. If this threshold is passed, " +
"a warning will be logged. Default time unit is: hours"),
COMPACTOR_FAILED_COMPACTION_RATIO_THRESHOLD(
"metastore.compactor.failed.compaction.ratio.threshold",
"hive.compactor.failed.compaction.ratio.threshold", .01,
"Ratio between the number of failed compactions + not initiated compactions and number of failed " +
"compactions + not initiated compactions + succeeded compactions. If this threshold is passed, a warning " +
"will be logged."),
COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_WARNING(
"metastore.compactor.oldest.initiated.compaction.time.threshold.warning",
"hive.compactor.oldest.initiated.compaction.time.threshold.warning",
1, TimeUnit.HOURS,
"Age of oldest initiated compaction in the compaction queue after which a warning will be logged. " +
"Default time unit is: hours"),
COMPACTOR_OLDEST_INITIATED_COMPACTION_TIME_THRESHOLD_ERROR(
"metastore.compactor.oldest.initiated.compaction.time.threshold.error",
"hive.compactor.oldest.initiated.compaction.time.threshold.error",
12, TimeUnit.HOURS,
"Age of oldest initiated compaction in the compaction queue after which an error will be logged. " +
"Default time unit is: hours"),
COMPACTOR_COMPLETED_TXN_COMPONENTS_RECORD_THRESHOLD_WARNING(
"metastore.compactor.completed.txn.components.record.threshold.warning",
"hive.compactor.completed.txn.components.record.threshold.warning",
500000,
"Number of records in COMPLETED_TXN_COMPONENTS table, after which a warning will be logged."),
COMPACTOR_COMPLETED_TXN_COMPONENTS_RECORD_THRESHOLD_ERROR(
"metastore.compactor.completed.txn.components.record.threshold.error",
"hive.compactor.completed.txn.components.record.threshold.error",
1000000,
"Number of records in COMPLETED_TXN_COMPONENTS table, after which an error will be logged."),
COMPACTOR_TXN_TO_WRITEID_RECORD_THRESHOLD_WARNING(
"metastore.compactor.txn.to.writeid.record.threshold.warning",
"hive.compactor.txn.to.writeid.record.threshold.warning",
500000,
"Number of records in TXN_TO_WRITEID table, after which a warning will be logged."),
COMPACTOR_TXN_TO_WRITEID_RECORD_THRESHOLD_ERROR(
"metastore.compactor.txn.to.writeid.record.threshold.error",
"hive.compactor.txn.to.writeid.record.threshold.error",
1000000,
"Number of records in TXN_TO_WRITEID table, after which an error will be logged."),
COMPACTOR_NUMBER_OF_DISABLED_COMPACTION_TABLES_THRESHOLD(
"metastore.compactor.number.of.disabled.compaction.tables.threshold",
"hive.compactor.number.of.disabled.compaction.tables.threshold",
1,
"If the number of writes to tables where auto-compaction is disabled reaches this threshold, a " +
"warning will be logged after every subsequent write to any table where auto-compaction is disabled."),
COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY(
"metastore.compactor.acid.metrics.logger.frequency",
"hive.compactor.acid.metrics.logger.frequency",
360, TimeUnit.MINUTES,
"Logging frequency of ACID related metrics. Set this value to 0 to completely turn off logging. " +
"Default time unit: minutes"),
METASTORE_HOUSEKEEPING_LEADER_HOSTNAME("metastore.housekeeping.leader.hostname",
"hive.metastore.housekeeping.leader.hostname", "",
"If there are multiple Thrift metastore services running, the hostname of Thrift metastore " +
@@ -99,10 +99,16 @@ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaExceptio
/**
 * On write-id allocation, tracks writes to tables that have auto-compaction disabled.
 * <p>
 * When extended ACID metrics are enabled ({@code METASTORE_ACIDMETRICS_EXT_ON}), increments the
 * {@code WRITES_TO_DISABLED_COMPACTION_TABLE} gauge for tables marked
 * {@code "no_auto_compact"="true"} and logs a warning once the gauge reaches
 * {@code COMPACTOR_NUMBER_OF_DISABLED_COMPACTION_TABLES_THRESHOLD}.
 *
 * @param allocWriteIdEvent event describing the write-id allocation
 * @param dbConn            metastore DB connection (unused here; part of the listener contract)
 * @param sqlGenerator      SQL generator (unused here; part of the listener contract)
 * @throws MetaException on metastore access failure
 */
public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
  if (MetastoreConf.getBoolVar(getConf(), MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
    Table table = getTable(allocWriteIdEvent);

    // In the case of CTAS, the table is created after write ids are allocated, so we'll skip metrics collection.
    if (table != null && MetaStoreUtils.isNoAutoCompactSet(table.getParameters())) {
      // Single increment; a stray duplicate of this call (leftover from the previous version)
      // would have double-counted every write.
      int noAutoCompactSet =
          Metrics.getOrCreateGauge(MetricsConstants.WRITES_TO_DISABLED_COMPACTION_TABLE).incrementAndGet();
      if (noAutoCompactSet >=
          MetastoreConf.getIntVar(getConf(),
              MetastoreConf.ConfVars.COMPACTOR_NUMBER_OF_DISABLED_COMPACTION_TABLES_THRESHOLD)) {
        LOGGER.warn("There has been a write to table " + table.getDbName() + "." + table.getTableName() +
            " where auto-compaction is disabled (tblproperties (\"no_auto_compact\"=\"true\")).");
      }
    }
  }
}

0 comments on commit f0e93b1

Please sign in to comment.