Skip to content

Commit

Permalink
Fix client side leak caused by Gauge
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #15354
orig-commit: 2f2894b
orig-commit-author: Jiacheng Liu <jiacheliu3@gmail.com>

pr-link: #15419
change-id: cid-ea02704715059d8b28a5d1613a48e3cfece986d2
  • Loading branch information
alluxio-bot committed Apr 21, 2022
1 parent 02bfb65 commit 1f54873
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 39 deletions.
Expand Up @@ -13,10 +13,14 @@

import alluxio.conf.PropertyKey;
import alluxio.master.MasterClientContext;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.DynamicResourcePool;
import alluxio.resource.ResourcePool;
import alluxio.util.ThreadFactoryUtils;

import com.codahale.metrics.Counter;

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -36,6 +40,8 @@ public final class BlockMasterClientPool extends DynamicResourcePool<BlockMaster
private static final ScheduledExecutorService GC_EXECUTOR =
new ScheduledThreadPoolExecutor(BLOCK_MASTER_CLIENT_POOL_GC_THREADPOOL_SIZE,
ThreadFactoryUtils.build("BlockMasterClientPoolGcThreads-%d", true));
private static final Counter COUNTER = MetricsSystem.counter(
MetricKey.CLIENT_BLOCK_MASTER_CLIENT_COUNT.getName());

/**
* Creates a new block master client pool.
Expand Down Expand Up @@ -75,6 +81,11 @@ protected boolean isHealthy(BlockMasterClient client) {
return client.isConnected();
}

@Override
protected Counter getMetricCounter() {
return COUNTER;
}

@Override
protected boolean shouldGc(ResourceInternal<BlockMasterClient> clientResourceInternal) {
return System.currentTimeMillis()
Expand Down
Expand Up @@ -14,10 +14,13 @@
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.GrpcServerAddress;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.DynamicResourcePool;
import alluxio.security.user.UserState;
import alluxio.util.ThreadFactoryUtils;

import com.codahale.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +43,8 @@ public final class BlockWorkerClientPool extends DynamicResourcePool<BlockWorker
private static final ScheduledExecutorService GC_EXECUTOR =
new ScheduledThreadPoolExecutor(WORKER_CLIENT_POOL_GC_THREADPOOL_SIZE,
ThreadFactoryUtils.build("BlockWorkerClientPoolGcThreads-%d", true));
private static final Counter COUNTER = MetricsSystem.counter(
MetricKey.CLIENT_BLOCK_WORKER_CLIENT_COUNT.getName());
private final AlluxioConfiguration mConf;

/**
Expand Down Expand Up @@ -82,6 +87,11 @@ protected boolean isHealthy(BlockWorkerClient client) {
return client.isHealthy();
}

@Override
protected Counter getMetricCounter() {
return COUNTER;
}

@Override
protected boolean shouldGc(ResourceInternal<BlockWorkerClient> clientResourceInternal) {
return System.currentTimeMillis() - clientResourceInternal.getLastAccessTimeMs()
Expand Down
Expand Up @@ -32,7 +32,6 @@
import alluxio.grpc.GrpcServerAddress;
import alluxio.master.MasterClientContext;
import alluxio.master.MasterInquireClient;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.refresh.RefreshPolicy;
import alluxio.refresh.TimeoutRefresh;
Expand All @@ -46,7 +45,6 @@
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.BlockWorker;

import com.codahale.metrics.CachedGauge;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
Expand All @@ -61,7 +59,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -94,8 +91,6 @@
@ThreadSafe
public class FileSystemContext implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileSystemContext.class);
private static final String TOTAL_RPC_CLIENTS_METRICS_NAME
= MetricsSystem.getMetricName(MetricKey.CLIENT_TOTAL_RPC_CLIENTS.getName());

/**
* Unique ID for each FileSystemContext.
Expand Down Expand Up @@ -278,17 +273,6 @@ private synchronized void initContext(ClientContext ctx,
mBlockMasterClientPool = new BlockMasterClientPool(mMasterClientContext);
mBlockWorkerClientPoolMap = new ConcurrentHashMap<>();
mUriValidationEnabled = ctx.getUriValidationEnabled();

MetricsSystem.registerGaugeIfAbsent(TOTAL_RPC_CLIENTS_METRICS_NAME,
new CachedGauge<Integer>(1, TimeUnit.MINUTES) {
@Override
protected Integer loadValue() {
int totalClients = mFileSystemMasterClientPool.size() + mBlockMasterClientPool.size();
totalClients += mBlockWorkerClientPoolMap.values()
.stream().mapToInt(DynamicResourcePool::size).sum();
return totalClients;
}
});
}

/**
Expand All @@ -313,7 +297,6 @@ private synchronized void closeContext() throws IOException {
// developers should first mark their resources as closed prior to any exceptions being
// thrown.
mClosed.set(true);
MetricsSystem.removeMetrics(TOTAL_RPC_CLIENTS_METRICS_NAME);
LOG.debug("Closing fs master client pool with current size: {} for id: {}",
mFileSystemMasterClientPool.size(), mId);
mFileSystemMasterClientPool.close();
Expand Down
Expand Up @@ -13,9 +13,13 @@

import alluxio.conf.PropertyKey;
import alluxio.master.MasterClientContext;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.DynamicResourcePool;
import alluxio.util.ThreadFactoryUtils;

import com.codahale.metrics.Counter;

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -33,6 +37,8 @@ public final class FileSystemMasterClientPool extends DynamicResourcePool<FileSy
private static final ScheduledExecutorService GC_EXECUTOR =
new ScheduledThreadPoolExecutor(FS_MASTER_CLIENT_POOL_GC_THREADPOOL_SIZE,
ThreadFactoryUtils.build("FileSystemMasterClientPoolGcThreads-%d", true));
private static final Counter COUNTER = MetricsSystem.counter(
MetricKey.CLIENT_FILE_SYSTEM_MASTER_CLIENT_COUNT.getName());

/**
* Creates a new file system master client pool.
Expand Down Expand Up @@ -72,6 +78,11 @@ protected boolean isHealthy(FileSystemMasterClient client) {
return client.isConnected();
}

@Override
protected Counter getMetricCounter() {
return COUNTER;
}

@Override
protected boolean shouldGc(ResourceInternal<FileSystemMasterClient> clientResourceInternal) {
return System.currentTimeMillis() - clientResourceInternal
Expand Down
33 changes: 24 additions & 9 deletions core/common/src/main/java/alluxio/metrics/MetricKey.java
Expand Up @@ -1904,15 +1904,6 @@ public MetricKey build() {
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(false)
.build();
public static final MetricKey CLIENT_TOTAL_RPC_CLIENTS =
new Builder("Client.TotalRPCClients")
.setDescription("The total number of RPC clients exist that is using to "
+ "or can be used to connect to master or worker for operations. "
+ "The sum of the sizes of FileSystemMasterClientPool, "
+ "BlockMasterClientPool, and BlockWorkerClientPool.")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(false)
.build();
public static final MetricKey CLIENT_META_DATA_CACHE_SIZE =
new Builder("Client.MetadataCacheSize")
.setDescription("The total number of files and directories whose metadata is cached "
Expand All @@ -1921,6 +1912,30 @@ public MetricKey build() {
.setMetricType(MetricType.GAUGE)
.setIsClusterAggregated(false)
.build();
public static final MetricKey CLIENT_FILE_SYSTEM_MASTER_CLIENT_COUNT =
new Builder("Client.FileSystemMasterClientCount")
.setDescription("Number of instances in the FileSystemMasterClientPool.")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(false)
.build();
public static final MetricKey CLIENT_BLOCK_MASTER_CLIENT_COUNT =
new Builder("Client.BlockMasterClientCount")
.setDescription("Number of instances in the BlockMasterClientPool.")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(false)
.build();
public static final MetricKey CLIENT_BLOCK_WORKER_CLIENT_COUNT =
new Builder("Client.BlockWorkerClientCount")
.setDescription("Number of instances in the BlockWorkerClientPool.")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(false)
.build();
public static final MetricKey CLIENT_DEFAULT_HIVE_CLIENT_COUNT =
new Builder("Client.DefaultHiveClientCount")
.setDescription("Number of instances in the DefaultHiveClientPool.")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(false)
.build();

// Fuse operation timer and failure counter metrics are added dynamically.
// Other Fuse related metrics are added here
Expand Down
3 changes: 1 addition & 2 deletions core/common/src/main/java/alluxio/metrics/MetricsSystem.java
Expand Up @@ -288,8 +288,7 @@ public static synchronized int getNumSinks() {
* @return metrics string
*/
public static String getResourcePoolMetricName(Object obj) {
return MetricsSystem.getMetricName("ResourcePool." + obj.getClass().getName() + "."
+ Integer.toHexString(System.identityHashCode(obj)));
return MetricsSystem.getMetricName("ResourcePool." + obj.getClass().getSimpleName());
}

/**
Expand Down
Expand Up @@ -13,8 +13,8 @@

import alluxio.Constants;
import alluxio.clock.SystemClock;
import alluxio.metrics.MetricsSystem;

import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -221,6 +221,7 @@ public static Options defaultOptions() {
// any performance overhead.
private final ConcurrentHashMap<T, ResourceInternal<T>> mResources =
new ConcurrentHashMap<>(32);
private final Counter mCounter;

// Thread to scan mAvailableResources to close those resources that are old.
private ScheduledExecutorService mExecutor;
Expand All @@ -235,11 +236,11 @@ public static Options defaultOptions() {
*/
public DynamicResourcePool(Options options) {
mExecutor = Preconditions.checkNotNull(options.getGcExecutor(), "executor");

mCounter = Preconditions.checkNotNull(getMetricCounter(),
"cannot find resource count metric for %s", getClass().getName());
mMaxCapacity = options.getMaxCapacity();
mMinCapacity = options.getMinCapacity();
mAvailableResources = new ArrayDeque<>(Math.min(mMaxCapacity, 32));

mGcFuture = mExecutor.scheduleAtFixedRate(() -> {
List<T> resourcesToGc = new ArrayList<>();

Expand All @@ -256,6 +257,7 @@ public DynamicResourcePool(Options options) {
resourcesToGc.add(next.mResource);
iterator.remove();
mResources.remove(next.mResource);
mCounter.dec();
currentSize--;
if (currentSize <= mMinCapacity) {
break;
Expand All @@ -275,15 +277,9 @@ public DynamicResourcePool(Options options) {
}
}
}, options.getInitialDelayMs(), options.getGcIntervalMs(), TimeUnit.MILLISECONDS);

registerGauges();
}

private void registerGauges() {
MetricsSystem.registerGaugeIfAbsent(
MetricsSystem.getResourcePoolMetricName(this),
() -> size());
}
protected abstract Counter getMetricCounter();

/**
* Acquires a resource of type {code T} from the pool.
Expand Down Expand Up @@ -436,6 +432,7 @@ private boolean add(ResourceInternal<T> resource) {
return false;
} else {
mResources.put(resource.mResource, resource);
mCounter.inc();
return true;
}
} finally {
Expand All @@ -452,6 +449,7 @@ private void remove(T resource) {
try {
mLock.lock();
mResources.remove(resource);
mCounter.dec();
} finally {
mLock.unlock();
}
Expand Down
Expand Up @@ -17,8 +17,10 @@

import alluxio.Constants;
import alluxio.clock.ManualClock;
import alluxio.metrics.MetricsSystem;
import alluxio.util.ThreadFactoryUtils;

import com.codahale.metrics.Counter;
import org.junit.Test;

import java.util.ArrayList;
Expand Down Expand Up @@ -54,7 +56,7 @@ public Resource(Integer i) {
}

/**
* Sets the the number representing current capacity of Resource and returns Resource Object.
* Sets the number representing current capacity of Resource and returns Resource Object.
*
* @param i the value of member variable represents the current capacity of Resource
* @return the Resource object
Expand Down Expand Up @@ -95,6 +97,11 @@ public TestPool(Options options) {
super(options.setGcExecutor(GC_EXECUTOR));
}

@Override
protected Counter getMetricCounter() {
return MetricsSystem.counter("Test.DynamicResourcePoolResourceCount");
}

@Override
protected boolean shouldGc(ResourceInternal<Resource> resourceInternal) {
return mClock.millis() - resourceInternal.getLastAccessTimeMs()
Expand Down
Expand Up @@ -16,9 +16,12 @@

import alluxio.Constants;
import alluxio.conf.ServerConfiguration;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.util.ThreadFactoryUtils;

import com.codahale.metrics.Counter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
Expand All @@ -39,6 +42,8 @@ public final class DefaultHiveClientPool extends AbstractHiveClientPool {
private static final ScheduledExecutorService GC_EXECUTOR =
new ScheduledThreadPoolExecutor(1, ThreadFactoryUtils.build("HiveClientPool-GC-%d", true));
private static final HiveMetaHookLoader NOOP_HOOK = table -> null;
private static final Counter COUNTER = MetricsSystem.counter(
MetricKey.CLIENT_DEFAULT_HIVE_CLIENT_COUNT.getName());

private final long mGcThresholdMs;
private final String mConnectionUri;
Expand Down Expand Up @@ -93,6 +98,11 @@ protected boolean isHealthy(IMetaStoreClient client) {
return true;
}

@Override
protected Counter getMetricCounter() {
return COUNTER;
}

@Override
protected boolean shouldGc(ResourceInternal<IMetaStoreClient> clientResourceInternal) {
return System.currentTimeMillis() - clientResourceInternal
Expand Down

0 comments on commit 1f54873

Please sign in to comment.