Skip to content

Commit

Permalink
Increment cluster counter and delete orphaned meter in MetricsStore
Browse files Browse the repository at this point in the history
Change to store the cluster counter value directly instead of
remembering each counter reported by client or worker.
This is helpful: (1) to reduce the space needed to store reported
metrics (2) because counter value is historical summarized value. There
is no need to delete the orphaned counter value.

For all other metrics, we keep the original way: store all the original
reported metrics and update the cluster metric value periodically.
But to make it more accurate, a delete orphaned metrics logic is added.
Metrics reported by worker/client which hasn't reported for a long
period of time (configurable) will be deleted. This can help us to get
more accurate throughput value. worker/client report throughput as the
last 1 minute rate which does not make sense if it's orphaned.

pr-link: #10850
change-id: cid-18696a6295023a783e2785ba845a6a58d4d6c862
  • Loading branch information
LuQQiu committed Feb 7, 2020
1 parent 1120a53 commit 32efede
Show file tree
Hide file tree
Showing 13 changed files with 408 additions and 179 deletions.
24 changes: 23 additions & 1 deletion core/common/src/main/java/alluxio/conf/PropertyKey.java
Expand Up @@ -1283,7 +1283,7 @@ public String toString() {
.build();
public static final PropertyKey MASTER_CLUSTER_METRICS_UPDATE_INTERVAL =
new Builder(Name.MASTER_CLUSTER_METRICS_UPDATE_INTERVAL)
.setDefaultValue("1m")
.setDefaultValue("1min")
.setDescription("The interval for periodically updating the cluster level metrics.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
Expand Down Expand Up @@ -1437,6 +1437,24 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_REPORTED_METRICS_CLEANUP_INTERVAL =
new Builder(Name.MASTER_REPORTED_METRICS_CLEANUP_INTERVAL)
.setDefaultValue("5min")
.setDescription("The interval for periodically cleanup the orphaned metrics "
+ "which are reported by lost workers/clients and stored "
+ "in the leading master metrics store.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_REPORTED_METRICS_CLEANUP_AGE =
new Builder(Name.MASTER_REPORTED_METRICS_CLEANUP_AGE)
.setDefaultValue("5min")
.setDescription("All the metrics which are reported by workers or clients "
+ "which haven't reported for period longer than this cleanup age "
+ "will be removed from the leading master.")
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.MASTER)
.build();
public static final PropertyKey MASTER_STANDBY_HEARTBEAT_INTERVAL =
new Builder(Name.MASTER_STANDBY_HEARTBEAT_INTERVAL)
.setDefaultValue("2min")
Expand Down Expand Up @@ -4142,6 +4160,10 @@ public static final class Name {
public static final String MASTER_FILE_ACCESS_TIME_UPDATER_SHUTDOWN_TIMEOUT =
"alluxio.master.file.access.time.updater.shutdown.timeout";
public static final String MASTER_FORMAT_FILE_PREFIX = "alluxio.master.format.file.prefix";
public static final String MASTER_REPORTED_METRICS_CLEANUP_INTERVAL =
"alluxio.master.reported.metrics.cleanup.interval";
public static final String MASTER_REPORTED_METRICS_CLEANUP_AGE =
"alluxio.master.reported.metrics.cleanup.age";
public static final String MASTER_STANDBY_HEARTBEAT_INTERVAL =
"alluxio.master.standby.heartbeat.interval";
public static final String MASTER_LOST_WORKER_FILE_DETECTION_INTERVAL =
Expand Down
Expand Up @@ -45,6 +45,7 @@ public final class HeartbeatContext {
public static final String MASTER_LOST_FILES_DETECTION = "Master Lost Files Detection";
public static final String MASTER_LOST_MASTER_DETECTION = "Master Lost Master Detection";
public static final String MASTER_LOST_WORKER_DETECTION = "Master Lost Worker Detection";
public static final String MASTER_ORPHANED_METRICS_CLEANER = "Master Orphaned Metrics Cleaner";
public static final String MASTER_METRICS_SYNC = "Master Metrics Sync";
public static final String MASTER_METRICS_TIME_SERIES = "Master Metrics Time Series";
public static final String MASTER_UFS_CLEANUP = "Master Ufs Cleanup";
Expand Down Expand Up @@ -92,6 +93,7 @@ public final class HeartbeatContext {
sTimerClasses.put(WORKER_SPACE_RESERVER, SLEEPING_TIMER_CLASS);
sTimerClasses.put(WORKER_STORAGE_HEALTH, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_METRICS_SYNC, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_ORPHANED_METRICS_CLEANER, SLEEPING_TIMER_CLASS);
sTimerClasses.put(MASTER_METRICS_TIME_SERIES, SLEEPING_TIMER_CLASS);
}

Expand Down
50 changes: 34 additions & 16 deletions core/common/src/main/java/alluxio/metrics/Metric.java
Expand Up @@ -36,7 +36,7 @@ public final class Metric implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
private static final long serialVersionUID = -2236393414222298333L;

private static final String ID_SEPARATOR = "-id:";
public static final String ID_SEPARATOR = "-id:";
public static final String TAG_SEPARATOR = ":";
private static final ConcurrentHashMap<UserMetricKey, String> CACHED_METRICS =
new ConcurrentHashMap<>();
Expand All @@ -52,7 +52,7 @@ public final class Metric implements Serializable {

/**
* The unique identifier to represent this metric.
* The pattern is instance.[hostname-id:instanceId.]name[.tagName:tagValue]*.
* The pattern is instance.name[.tagName:tagValue]*[hostname-id:instanceId.].
* Fetched once and assumed to be immutable.
*/
private final Supplier<String> mFullMetricNameSupplier =
Expand Down Expand Up @@ -204,7 +204,7 @@ public int hashCode() {

/**
* @return the fully qualified metric name, which is of pattern
* instance.name[.hostname-id:instanceId][.tagName:tagValue]*, where the tags are appended
* instance.name[.tagName:tagValue]*[.hostname-id:instanceId], where the tags are appended
* at the end
*/
public String getFullMetricName() {
Expand All @@ -213,24 +213,23 @@ public String getFullMetricName() {

/**
* @return the fully qualified metric name, which is of pattern
* instance.[hostname-id:instanceId.]name[.tagName:tagValue]*, where the tags are appended
* instance.name[.tagName:tagValue]*[.hostname-id:instanceId], where the tags are appended
* at the end
*/
private String constructFullMetricName() {
StringBuilder sb = new StringBuilder();
sb.append(mInstanceType).append('.');
sb.append(mName);
for (Entry<String, String> entry : mTags.entrySet()) {
sb.append('.').append(entry.getKey()).append(TAG_SEPARATOR).append(entry.getValue());
}
if (mHostname != null) {
sb.append('.');
sb.append(mHostname);
if (mInstanceId != null) {
sb.append(ID_SEPARATOR).append(mInstanceId);
}
}

for (Entry<String, String> entry : mTags.entrySet()) {
sb.append('.').append(entry.getKey()).append(TAG_SEPARATOR).append(entry.getValue());
}
return sb.toString();
}

Expand Down Expand Up @@ -289,6 +288,26 @@ public static String getMetricNameWithUserTag(String metricName, String userName
+ TAG_SEPARATOR + userName);
}

/**
* Gets value of ufs tag from the full metric name.
*
* @param fullName the full metric name
* @return value of ufs tag
*/
public static String getTagUfsValueFromFullName(String fullName) {
String[] pieces = fullName.split("\\.");
if (pieces.length < 3) {
return null;
}
for (int i = 2; i < pieces.length; i++) {
String current = pieces[i];
if (current.contains(TAG_SEPARATOR) && current.contains(MetricInfo.TAG_UFS)) {
return current.substring(current.indexOf(TAG_SEPARATOR) + 1);
}
}
return null;
}

/**
* Creates the metric from the full name and the value.
*
Expand All @@ -300,32 +319,31 @@ public static String getMetricNameWithUserTag(String metricName, String userName
public static Metric from(String fullName, double value, MetricType metricType) {
String[] pieces = fullName.split("\\.");
Preconditions.checkArgument(pieces.length > 1, "Incorrect metrics name: %s.", fullName);

int len = pieces.length;
String hostname = null;
String id = null;
String name = null;
int tagStartIdx = 0;
int tagEndIndex = len;
// Master or cluster metrics don't have hostname included.
if (pieces[0].equals(MetricsSystem.InstanceType.MASTER.toString())
|| pieces[0].equals(MetricsSystem.CLUSTER)) {
name = pieces[1];
tagStartIdx = 2;
} else {
if (pieces[2].contains(ID_SEPARATOR)) {
String[] ids = pieces[2].split(ID_SEPARATOR);
if (pieces[len - 1].contains(ID_SEPARATOR)) {
String[] ids = pieces[len - 1].split(ID_SEPARATOR);
hostname = ids[0];
id = ids[1];
} else {
hostname = pieces[2];
hostname = pieces[len - 1];
}
name = pieces[1];
tagStartIdx = 3;
tagEndIndex = len - 1;
}
MetricsSystem.InstanceType instance = MetricsSystem.InstanceType.fromString(pieces[0]);
Metric metric = new Metric(instance, hostname, id, metricType, name, value);

// parse tags
for (int i = tagStartIdx; i < pieces.length; i++) {
for (int i = 2; i < tagEndIndex; i++) {
String tagStr = pieces[i];
if (!tagStr.contains(TAG_SEPARATOR)) {
// Unknown tag
Expand Down
31 changes: 21 additions & 10 deletions core/common/src/main/java/alluxio/metrics/MetricKey.java
Expand Up @@ -110,6 +110,17 @@ public String getName() {
return mName;
}

/**
* @return the name of the Metric without instance prefix
*/
public String getMetricName() {
String[] pieces = mName.split("\\.");
if (pieces.length <= 1) {
return mName;
}
return pieces[1];
}

/**
* @return the description of a Metric
*/
Expand Down Expand Up @@ -402,7 +413,7 @@ public MetricKey build() {
new Builder(Name.CLUSTER_BYTES_READ_ALLUXIO)
.setDescription("Total number of bytes read from Alluxio storage reported "
+ "by all workers. This does not include UFS reads.")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_READ_ALLUXIO_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_READ_ALLUXIO_THROUGHPUT)
Expand All @@ -413,7 +424,7 @@ public MetricKey build() {
new Builder(Name.CLUSTER_BYTES_READ_DOMAIN)
.setDescription("Total number of bytes read from Alluxio storage "
+ "via domain socket reported by all workers")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_READ_DOMAIN_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_READ_DOMAIN_THROUGHPUT)
Expand All @@ -425,7 +436,7 @@ public MetricKey build() {
new Builder(Name.CLUSTER_BYTES_READ_LOCAL)
.setDescription("Total number of bytes short-circuit read from local storage "
+ "by all clients")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_READ_LOCAL_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_READ_LOCAL_THROUGHPUT)
Expand All @@ -435,12 +446,12 @@ public MetricKey build() {
public static final MetricKey CLUSTER_BYTES_READ_UFS =
new Builder(Name.CLUSTER_BYTES_READ_UFS)
.setDescription("Total number of bytes read from a specific UFS by all workers")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_READ_UFS_ALL =
new Builder(Name.CLUSTER_BYTES_READ_UFS_ALL)
.setDescription("Total number of bytes read from a all Alluxio UFSes by all workers")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_READ_UFS_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_READ_UFS_THROUGHPUT)
Expand All @@ -451,7 +462,7 @@ public MetricKey build() {
new Builder(Name.CLUSTER_BYTES_WRITTEN_ALLUXIO)
.setDescription("Total number of bytes written to Alluxio storage in all workers. "
+ "This does not include UFS writes")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_WRITTEN_ALLUXIO_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_WRITTEN_ALLUXIO_THROUGHPUT)
Expand All @@ -462,7 +473,7 @@ public MetricKey build() {
new Builder(Name.CLUSTER_BYTES_WRITTEN_DOMAIN)
.setDescription("Total number of bytes written to Alluxio storage "
+ "via domain socket by all workers")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_WRITTEN_DOMAIN_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_WRITTEN_DOMAIN_THROUGHPUT)
Expand All @@ -474,7 +485,7 @@ public MetricKey build() {
new Builder(Name.CLUSTER_BYTES_WRITTEN_LOCAL)
.setDescription("Total number of bytes short-circuit written to local storage "
+ "by all clients")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_WRITTEN_LOCAL_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_WRITTEN_LOCAL_THROUGHPUT)
Expand All @@ -484,12 +495,12 @@ public MetricKey build() {
public static final MetricKey CLUSTER_BYTES_WRITTEN_UFS =
new Builder(Name.CLUSTER_BYTES_WRITTEN_UFS)
.setDescription("Total number of bytes written to a specific Alluxio UFS by all workers")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_WRITTEN_UFS_ALL =
new Builder(Name.CLUSTER_BYTES_WRITTEN_UFS_ALL)
.setDescription("Total number of bytes written to all Alluxio UFSes by all workers")
.setMetricType(MetricType.GAUGE)
.setMetricType(MetricType.COUNTER)
.build();
public static final MetricKey CLUSTER_BYTES_WRITTEN_UFS_THROUGHPUT =
new Builder(Name.CLUSTER_BYTES_WRITTEN_UFS_THROUGHPUT)
Expand Down
5 changes: 4 additions & 1 deletion core/common/src/main/java/alluxio/metrics/MetricsSystem.java
Expand Up @@ -209,6 +209,9 @@ public static synchronized int getNumSinks() {
* @return the metric with instance and id tags
*/
public static String getMetricName(String name) {
if (name.startsWith(CLUSTER)) {
return name;
}
switch (CommonUtils.PROCESS_TYPE.get()) {
case CLIENT:
return getClientMetricName(name);
Expand Down Expand Up @@ -451,7 +454,7 @@ public static Timer timer(String name) {
* @param <T> the type
*/
public static synchronized <T> void registerGaugeIfAbsent(String name, Gauge<T> metric) {
if (!METRIC_REGISTRY.getGauges().containsKey(name)) {
if (!METRIC_REGISTRY.getMetrics().containsKey(name)) {
METRIC_REGISTRY.register(name, metric);
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/common/src/test/java/alluxio/metrics/MetricTest.java
Expand Up @@ -34,7 +34,7 @@ public void proto() {

@Test
public void testFullNameParsing() {
String fullName = "Client.metric.192_1_1_1|A.tag1:A::/.tag2:B:/";
String fullName = "Client.metric.tag1:A::/.tag2:B:/.192_1_1_1|A";
Metric metric = Metric.from(fullName, 1, MetricType.COUNTER);
assertEquals(fullName, metric.getFullMetricName());
}
Expand Down

0 comments on commit 32efede

Please sign in to comment.