Skip to content
Permalink
Browse files

Improve concurrency for MetricsStore

Cherry-pick of existing commit.
orig-pr: #10493
orig-commit: 40d07a2
orig-commit-author: LuQQiu <luqiujob@gmail.com>

pr-link: #10560
change-id: cid-3c92e89a816d57dceeb257c69f506a51793fd078
  • Loading branch information
alluxio-bot committed Dec 3, 2019
1 parent f80c06e commit c2d580b1288859239ddbc6c8844bf28edb0d0683
@@ -13,9 +13,12 @@

import alluxio.grpc.MetricType;

import alluxio.util.CommonUtils;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AtomicDouble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -24,6 +27,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* A metric of a given instance. The instance can be master, worker, or client.
@@ -40,13 +44,22 @@
private final MetricsSystem.InstanceType mInstanceType;
private final String mHostname;
private final String mName;
private final Double mValue;
private final MetricType mMetricType;
private String mInstanceId;
// TODO(yupeng): consider a dedicated data structure for tag, when more functionality are added to
// tags in the future
private final Map<String, String> mTags;

/**
* The unique identifier to represent this metric.
* The pattern is instance.[hostname-id:instanceId.]name[.tagName:tagValue]*.
* Fetched once and assumed to be immutable.
*/
private final Supplier<String> mFullMetricNameSupplier =
CommonUtils.memoize(this::constructFullMetricName);

private AtomicDouble mValue;

/**
* Constructs a {@link Metric} instance.
*
@@ -79,10 +92,37 @@ public Metric(MetricsSystem.InstanceType instanceType, String hostname, String i
mInstanceId = id;
mMetricType = metricType;
mName = name;
mValue = value;
mValue = new AtomicDouble(value);
mTags = new LinkedHashMap<>();
}

/**
* Add metric value delta to the existing value.
* This method should only be used by {@link alluxio.master.metrics.MetricsStore}
*
* @param delta value to add
*/
public void addValue(double delta) {
mValue.addAndGet(delta);
}

/**
* Set the metric value.
* This method should only be used by {@link alluxio.master.metrics.MetricsStore}
*
* @param value value to set
*/
public void setValue(double value) {
mValue.set(value);
}

/**
* @return the metric value
*/
public double getValue() {
return mValue.get();
}

/**
* @return the instance type
*/
@@ -121,13 +161,6 @@ public String getName() {
return mName;
}

/**
* @return the metric value
*/
public double getValue() {
return mValue;
}

/**
* @return the instance id
*/
@@ -161,12 +194,12 @@ public boolean equals(Object other) {
}
Metric metric = (Metric) other;
return Objects.equal(getFullMetricName(), metric.getFullMetricName())
&& Objects.equal(mValue, metric.mValue);
&& Objects.equal(mValue.get(), metric.mValue.get());
}

@Override
public int hashCode() {
return Objects.hashCode(getFullMetricName(), mValue);
return Objects.hashCode(getFullMetricName(), mValue.get());
}

/**
@@ -175,6 +208,15 @@ public int hashCode() {
* at the end
*/
public String getFullMetricName() {
return mFullMetricNameSupplier.get();
}

/**
* @return the fully qualified metric name, which is of pattern
* instance.[hostname-id:instanceId.]name[.tagName:tagValue]*, where the tags are appended
* at the end
*/
private String constructFullMetricName() {
StringBuilder sb = new StringBuilder();
sb.append(mInstanceType).append('.');
if (mHostname != null) {
@@ -198,7 +240,7 @@ public String getFullMetricName() {
public alluxio.grpc.Metric toProto() {
alluxio.grpc.Metric.Builder metric = alluxio.grpc.Metric.newBuilder();
metric.setInstance(mInstanceType.toString()).setHostname(mHostname).setMetricType(mMetricType)
.setName(mName).setValue(mValue).putAllTags(mTags);
.setName(mName).setValue(mValue.get()).putAllTags(mTags);

if (mInstanceId != null && !mInstanceId.isEmpty()) {
metric.setInstanceId(mInstanceId);
@@ -320,7 +362,7 @@ public String toString() {
.add("metricType", mMetricType)
.add("name", mName)
.add("tags", mTags)
.add("value", mValue)
.add("value", mValue.get())
.toString();
}

@@ -26,7 +26,6 @@
import alluxio.underfs.options.MkdirsOptions;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.CommonUtils;
import alluxio.util.UnderFileSystemUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.util.io.PathUtils;

@@ -79,7 +78,7 @@

/** The root key of an object fs. */
protected final Supplier<String> mRootKeySupplier =
UnderFileSystemUtils.memoize(this::getRootKey);
CommonUtils.memoize(this::getRootKey);

/**
* Constructs an {@link ObjectUnderFileSystem}.
@@ -691,5 +691,31 @@ public static String convertMsToClockTime(long millis) {
return output;
}

/**
* Memoize implementation for java.util.function.supplier.
*
* @param original the original supplier
* @param <T> the object type
* @return the supplier with memorization
*/
public static <T> Supplier<T> memoize(Supplier<T> original) {
return new Supplier<T>() {
Supplier<T> mDelegate = this::firstTime;
boolean mInitialized;
public T get() {
return mDelegate.get();
}

private synchronized T firstTime() {
if (!mInitialized) {
T value = original.get();
mDelegate = () -> value;
mInitialized = true;
}
return mDelegate.get();
}
};
}

private CommonUtils() {} // prevent instantiation
}
@@ -17,7 +17,6 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

import javax.annotation.concurrent.ThreadSafe;

@@ -146,32 +145,6 @@ public static String getBucketName(AlluxioURI uri) {
return uri.getAuthority().toString();
}

/**
* Memoize implementation for java.util.function.supplier.
*
* @param original the original supplier
* @param <T> the object type
* @return the supplier with memorization
*/
public static <T> Supplier<T> memoize(Supplier<T> original) {
return new Supplier<T>() {
Supplier<T> mDelegate = this::firstTime;
boolean mInitialized;
public T get() {
return mDelegate.get();
}

private synchronized T firstTime() {
if (!mInitialized) {
T value = original.get();
mDelegate = () -> value;
mInitialized = true;
}
return mDelegate.get();
}
};
}

/**
* Returns an approximate content hash, using the length and modification time.
*
@@ -188,6 +188,7 @@ public String getName() {
public void start(Boolean isLeader) throws IOException {
super.start(isLeader);
if (isLeader) {
mMetricsStore.clear();
getExecutorService().submit(mClusterMetricsUpdater);
}
}
@@ -21,10 +21,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.annotation.concurrent.GuardedBy;
@@ -95,9 +93,7 @@ public void putWorkerMetrics(String hostname, List<Metric> metrics) {
if (metrics.isEmpty()) {
return;
}
synchronized (mWorkerMetrics) {
putReportedMetrics(mWorkerMetrics, getFullInstanceId(hostname, null), metrics);
}
putReportedMetrics(mWorkerMetrics, getFullInstanceId(hostname, null), metrics);
}

/**
@@ -112,10 +108,7 @@ public void putClientMetrics(String hostname, String clientId, List<Metric> metr
if (metrics.isEmpty()) {
return;
}
LOG.debug("Removing metrics for id {} to replace with {}", clientId, metrics);
synchronized (mClientMetrics) {
putReportedMetrics(mClientMetrics, getFullInstanceId(hostname, clientId), metrics);
}
putReportedMetrics(mClientMetrics, getFullInstanceId(hostname, clientId), metrics);
}

/**
@@ -132,7 +125,6 @@ public void putClientMetrics(String hostname, String clientId, List<Metric> metr
*/
private static void putReportedMetrics(IndexedSet<Metric> metricSet, String instanceId,
List<Metric> reportedMetrics) {
List<Metric> newMetrics = new ArrayList<>(reportedMetrics.size());
for (Metric metric : reportedMetrics) {
if (metric.getHostname() == null) {
continue; // ignore metrics whose hostname is null
@@ -141,25 +133,17 @@ private static void putReportedMetrics(IndexedSet<Metric> metricSet, String inst
// If a metric is COUNTER, the value sent via RPC should be the incremental value; i.e.
// the amount the value has changed since the last RPC. The master should equivalently
// increment its value based on the received metric rather than replacing it.
if (metric.getMetricType() == MetricType.COUNTER) {
// FULL_NAME_INDEX is a unique index, so getFirstByField will return the same results as
// getByField
if (!metricSet.add(metric)) {
Metric oldMetric = metricSet.getFirstByField(FULL_NAME_INDEX, metric.getFullMetricName());
double oldVal = oldMetric == null ? 0.0 : oldMetric.getValue();
Metric newMetric = new Metric(metric.getInstanceType(), metric.getHostname(),
metric.getMetricType(), metric.getName(), oldVal + metric.getValue());
for (Map.Entry<String, String> tag : metric.getTags().entrySet()) {
newMetric.addTag(tag.getKey(), tag.getValue());
if (metric.getMetricType() == MetricType.COUNTER) {
if (metric.getValue() != 0L) {
oldMetric.addValue(metric.getValue());
}
} else {
oldMetric.setValue(metric.getValue());
}
metricSet.removeByField(FULL_NAME_INDEX, metric.getFullMetricName());
newMetrics.add(newMetric);
} else {
metricSet.removeByField(FULL_NAME_INDEX, metric.getFullMetricName());
newMetrics.add(metric);
}
}
metricSet.removeByField(ID_INDEX, instanceId);
metricSet.addAll(newMetrics);
}

/**
@@ -177,13 +161,9 @@ private static void putReportedMetrics(IndexedSet<Metric> metricSet, String inst
}

if (instanceType == InstanceType.WORKER) {
synchronized (mWorkerMetrics) {
return mWorkerMetrics.getByField(NAME_INDEX, name);
}
return mWorkerMetrics.getByField(NAME_INDEX, name);
} else if (instanceType == InstanceType.CLIENT) {
synchronized (mClientMetrics) {
return mClientMetrics.getByField(NAME_INDEX, name);
}
return mClientMetrics.getByField(NAME_INDEX, name);
} else {
throw new IllegalArgumentException("Unsupported instance type " + instanceType);
}
@@ -201,13 +181,14 @@ private static void putReportedMetrics(IndexedSet<Metric> metricSet, String inst

/**
* Clears all the metrics.
*
* This method should only be called when starting the {@link DefaultMetricsMaster}
* and before starting the metrics updater to avoid conflicts with
* other methods in this class which updates or accesses
* the metrics inside metrics sets.
*/
public void clear() {
synchronized (mWorkerMetrics) {
mWorkerMetrics.clear();
}
synchronized (mClientMetrics) {
mClientMetrics.clear();
}
mWorkerMetrics.clear();
mClientMetrics.clear();
}
}
@@ -94,7 +94,7 @@ public void testAggregator() {
Metric.from("worker.192_1_1_2.metricA", 3, MetricType.GAUGE));
mMetricsMaster.workerHeartbeat("192_1_1_2", metrics3);
assertEquals(13L, getGauge("metricA"));
assertEquals(20L, getGauge("metricB"));
assertEquals(22L, getGauge("metricB"));
}

@Test
@@ -69,7 +69,7 @@

/** The permissions associated with the bucket. Fetched once and assumed to be immutable. */
private final Supplier<ObjectPermissions> mPermissions
= UnderFileSystemUtils.memoize(this::getPermissionsInternal);
= CommonUtils.memoize(this::getPermissionsInternal);

static {
try {
@@ -105,7 +105,7 @@

/** The permissions associated with the bucket. Fetched once and assumed to be immutable. */
private final Supplier<ObjectPermissions> mPermissions
= UnderFileSystemUtils.memoize(this::getPermissionsInternal);
= CommonUtils.memoize(this::getPermissionsInternal);

static {
byte[] dirByteHash = DigestUtils.md5(new byte[0]);

0 comments on commit c2d580b

Please sign in to comment.
You can’t perform that action at this time.