HIVE-25897: Move delta metric collection into AcidMetricsService (#2973) (Laszlo Pinter, reviewed by Karen Coppage)
lcspinter committed Jan 31, 2022
1 parent 38c3079 commit 9ff9e3d6d50eb577d479b03dba0c739207ffa935
Showing 42 changed files with 784 additions and 817 deletions.
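In outline: each compactor thread (Cleaner, Initiator, Worker) previously kept its own metricsEnabled flag and a private updateDeltaFilesMetrics wrapper that delegated to DeltaFilesMetricReporter. This commit deletes those wrappers and has the threads call static AcidMetricService.updateMetricsFrom* methods directly. AcidMetricService itself is not shown in the hunks below, so the following sketch of such an entry point, with the enablement check folded inside, is an assumption about its shape; only the method name and the argument list are taken from the diff.

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

public class AcidMetricServiceSketch {
  // Hypothetical shape of the consolidated entry point: callers no longer
  // guard the call with their own metricsEnabled flag, so the cheap no-op
  // check plausibly lives here instead.
  public static void updateMetricsFromCleaner(String dbName, String tableName, String partName,
      List<Path> obsoleteDirs, Configuration conf, Object txnHandler /* stand-in type */) {
    if (!MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) ||
        !MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
      return;
    }
    // ... update delta-file metrics for the cleaned partition (omitted) ...
  }
}
```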
@@ -56,7 +56,6 @@
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -38,6 +38,7 @@
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
@@ -48,7 +49,6 @@
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
-import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -109,8 +109,7 @@ public void init(AtomicBoolean stop) throws Exception {
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
-        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
-        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
}

@Override
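The gate above loses its third conjunct: with collection moved into the metastore-side service, the Cleaner's delta metrics no longer require COMPACTOR_INITIATOR_ON on this host. A minimal self-contained restatement of the new two-flag gate (the helper name is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

final class DeltaMetricsGate {
  // Illustrative helper: the gate used by the Cleaner (and, in a later
  // hunk, the Initiator) after this commit.
  static boolean deltaMetricsEnabled(Configuration conf) {
    return MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
  }
}
```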
@@ -193,12 +192,6 @@ public void run() {
}
}

-  private void updateDeltaFilesMetrics(String dbName, String tableName, String partName, List<Path> obsoleteDirs) {
-    if (metricsEnabled) {
-      DeltaFilesMetricReporter.updateMetricsFromCleaner(dbName, tableName, partName, obsoleteDirs, conf, txnHandler);
-    }
-  }

private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
LOG.info("Starting cleaning for " + ci);
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
@@ -429,7 +422,8 @@ private boolean removeFiles(String location, ValidWriteIdList writeIdList, Compa
.map(Path::getName).collect(Collectors.joining(",")));
boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
if (dir.getObsolete().size() > 0) {
-      updateDeltaFilesMetrics(ci.dbname, ci.tableName, ci.partName, dir.getObsolete());
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+          txnHandler);
}
return success;
}
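The extraDebugInfo line in this hunk compresses the obsolete directories into a comma-separated list of their final path components. A self-contained illustration of that stream idiom (the paths are made up, and plain strings stand in for org.apache.hadoop.fs.Path):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class JoinObsoleteNames {
  public static void main(String[] args) {
    List<String> obsoleteDirs = Arrays.asList(         // illustrative paths
        "/warehouse/t/delta_0000001_0000001_0000",
        "/warehouse/t/delta_0000002_0000002_0000");
    String extraDebugInfo = obsoleteDirs.stream()
        .map(p -> p.substring(p.lastIndexOf('/') + 1)) // stand-in for Path::getName
        .collect(Collectors.joining(","));
    // prints: delta_0000001_0000001_0000,delta_0000002_0000002_0000
    System.out.println(extraDebugInfo);
  }
}
```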
@@ -29,7 +29,6 @@
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
@@ -45,6 +44,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
@@ -55,7 +55,6 @@
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDirectory;
-import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -301,12 +300,7 @@ public void init(AtomicBoolean stop) throws Exception {
this.tableCache = Optional.of(CacheBuilder.newBuilder().softValues().build());
}
    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
-        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON) &&
-        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);
-    if (metricsEnabled) {
-      MetricsFactory.init(conf);
-      DeltaFilesMetricReporter.init(conf, txnHandler);
-    }
+        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
}

private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
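Beyond narrowing the gate, the Initiator also stops bootstrapping metrics itself: the MetricsFactory.init(conf) / DeltaFilesMetricReporter.init(conf, txnHandler) pair disappears. For contrast, the retired pattern as it appears in the removed lines; where the equivalent initialization now happens is not visible in this diff and presumably belongs to AcidMetricService on the metastore side.

```java
// Retired pattern (removed above): every Initiator that passed the gate
// initialized the metrics system and the delta-file reporter on its own.
if (metricsEnabled) {
  MetricsFactory.init(conf);                        // hive-common metrics bootstrap
  DeltaFilesMetricReporter.init(conf, txnHandler);  // delta-file metric state
}
```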
@@ -363,14 +357,6 @@ private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions,
}
return false;
}

-  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
-      long baseSize, Map<Path, Long> deltaSizes) {
-    if (metricsEnabled) {
-      DeltaFilesMetricReporter.updateMetricsFromInitiator(directory, dbName, tableName, partName, conf, txnHandler,
-          baseSize, deltaSizes);
-    }
-  }

private CompactionType checkForCompaction(final CompactionInfo ci,
final ValidWriteIdList writeIds,
@@ -402,7 +388,8 @@ private CompactionType checkForCompaction(final CompactionInfo ci,
deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
}
long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
-      updateDeltaFilesMetrics(acidDirectory, ci.dbname, ci.tableName, ci.partName, baseSize, deltaSizes);
+      AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler,
+          baseSize, deltaSizes, acidDirectory.getObsolete());

if (runJobAsSelf(runAs)) {
return determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize);
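The hunk sums per-delta directory sizes with a stream reduce before reporting. A self-contained illustration of that fold (directory names and sizes are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class DeltaSizeSum {
  public static void main(String[] args) {
    Map<String, Long> deltaSizes = new HashMap<>();      // Path -> size in the real code
    deltaSizes.put("delta_0000001_0000001_0000", 1024L);
    deltaSizes.put("delta_0000002_0000002_0000", 2048L);
    long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
    System.out.println("total delta size = " + deltaSize); // 3072
  }
}
```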
@@ -40,13 +40,13 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStatus;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.apache.hive.common.util.Ref;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.thrift.TException;
@@ -65,7 +65,6 @@
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@@ -87,7 +86,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
static final private long SLEEP_TIME = 10000;

private String workerName;
-  private boolean metricsEnabled;

// TODO: this doesn't check if compaction is already running (even though Initiator does but we
// don't go through Initiator for user initiated compactions)
@@ -146,9 +144,6 @@ public void init(AtomicBoolean stop) throws Exception {
super.init(stop);
this.workerName = getWorkerId();
setName(workerName);
-    // To enable delta metrics collection, initiator must be enabled on HMS side
-    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
-        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
}

@VisibleForTesting
@@ -499,7 +494,8 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
compactionTxn.wasSuccessful();

-      updateDeltaFilesMetrics(dir, ci.dbname, ci.tableName, ci.partName, ci.type);
+      AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, ci.partName, ci.type,
+          dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), conf, msc);

} catch (Throwable e) {
LOG.error("Caught exception while trying to compact " + ci +
@@ -658,13 +654,6 @@ private String getWorkerId() {
return name.toString();
}

-  private void updateDeltaFilesMetrics(AcidDirectory directory, String dbName, String tableName, String partName,
-      CompactionType type) {
-    if (metricsEnabled) {
-      DeltaFilesMetricReporter.updateMetricsFromWorker(directory, dbName, tableName, partName, type, conf, msc);
-    }
-  }

/**
* Keep track of the compaction's transaction and its operations.
*/
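Taken together, the three new call sites in this commit are, side by side (argument lists copied from the hunks above, with the variable names as they appear at each site):

```java
// Cleaner (after removeFiles):
AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName,
    dir.getObsolete(), conf, txnHandler);
// Initiator (in checkForCompaction):
AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName, ci.partName, conf, txnHandler,
    baseSize, deltaSizes, acidDirectory.getObsolete());
// Worker (after a successful compaction):
AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, ci.partName, ci.type,
    dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), conf, msc);
```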
