Skip to content

Commit

Permalink
Make the addition of client metrics heartbeats asynchronous
Browse files Browse the repository at this point in the history
When client metrics are enabled, master startup takes four extra
minutes. The extra time is spent in
`PollingMasterInquireClient.getPrimaryRpcAddress`, called from
`FileSystemContext.initContext`. `getPrimaryRpcAddress()` retries 44
times and takes 2 minutes per call, which heavily delays master
startup. The primary RPC address is used to load configuration from
the leading master and to initialize the metrics heartbeat.

This PR moves the configuration-loading logic into the metrics heartbeat
thread, so that the client metrics logic in FileSystemContext no longer
blocks actual operations.

pr-link: #10936
change-id: cid-116bac6350e928d788669c307a4f8bff578c835f
  • Loading branch information
LuQQiu committed Feb 21, 2020
1 parent f6ae689 commit 7c31190
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 27 deletions.
Expand Up @@ -231,15 +231,6 @@ private synchronized void initContext(ClientContext ctx,
.setMasterInquireClient(masterInquireClient).build();
mMetricsEnabled = getClusterConf().getBoolean(PropertyKey.USER_METRICS_COLLECTION_ENABLED);
if (mMetricsEnabled) {
try {
InetSocketAddress masterAddr = masterInquireClient.getPrimaryRpcAddress();
mMasterClientContext.loadConf(masterAddr, true, true);
} catch (UnavailableException e) {
LOG.error("Failed to get master address during initialization", e);
} catch (AlluxioStatusException ae) {
LOG.error("Failed to load configuration from "
+ "meta master during initialization", ae);
}
MetricsSystem.startSinks(getClusterConf().get(PropertyKey.METRICS_CONF_FILE));
MetricsHeartbeatContext.addHeartbeat(getClientContext(), masterInquireClient);
}
Expand Down
Expand Up @@ -11,9 +11,13 @@

package alluxio.client.metrics;

import alluxio.ClientContext;
import alluxio.Constants;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.ClientMetrics;
import alluxio.master.MasterClientContext;
import alluxio.master.MasterInquireClient;
import alluxio.metrics.MetricsSystem;
import alluxio.util.logging.SamplingLogger;

Expand All @@ -22,6 +26,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -39,31 +44,42 @@ public final class ClientMasterSync {
private static final Logger SAMPLING_LOG =
new SamplingLogger(LoggerFactory.getLogger(ClientMasterSync.class), 30 * Constants.SECOND_MS);

private final String mApplicationId;
private final MasterInquireClient mInquireClient;
private final ClientContext mContext;

/**
* Client for communicating to metrics master.
*/
private final MetricsMasterClient mMasterClient;
private final String mApplicationId;
private final AlluxioConfiguration mConf;
private RetryHandlingMetricsMasterClient mMasterClient;

/**
* Constructs a new {@link ClientMasterSync}.
*
* @param appId the application id to send with metrics
* @param masterClient the master client
* @param conf Alluxio configuration
* @param ctx client context
* @param inquireClient the master inquire client
*/
public ClientMasterSync(String appId, MetricsMasterClient masterClient,
AlluxioConfiguration conf) {
mMasterClient = Preconditions.checkNotNull(masterClient, "masterClient");
public ClientMasterSync(String appId, ClientContext ctx, MasterInquireClient inquireClient) {
mApplicationId = Preconditions.checkNotNull(appId);
mConf = conf;
mInquireClient = inquireClient;
mContext = ctx;
}

/**
* Sends metrics to the master keyed with appId and client hostname.
*/
public synchronized void heartbeat() {
if (mMasterClient == null) {
if (loadConf()) {
mMasterClient = new RetryHandlingMetricsMasterClient(MasterClientContext
.newBuilder(mContext)
.setMasterInquireClient(mInquireClient)
.build());
} else {
return; // not heartbeat when failed to load conf
}
}
// TODO(zac): Support per FileSystem instance metrics
// Currently we only support JVM-level metrics. A list is used here because in the near
// future we will support sending per filesystem client-level metrics.
Expand All @@ -85,4 +101,33 @@ public synchronized void heartbeat() {
SAMPLING_LOG.warn("Failed to send metrics to master: {}", e.toString());
}
}

/**
 * Closes the metrics master client if one has been created.
 *
 * <p>The client is only created by {@code heartbeat()} after the configuration has been
 * loaded, so it may still be {@code null} here; in that case this method does nothing.
 */
public synchronized void close() {
  if (mMasterClient == null) {
    // No client was ever created, so there is nothing to release.
    return;
  }
  mMasterClient.close();
}

/**
 * Resolves the primary master RPC address through the inquire client and asks the client
 * context to load configuration from it.
 *
 * <p>Failures are reported through the sampling logger rather than propagated, so the caller
 * can simply try again later.
 *
 * @return {@code true} if the configuration was loaded, {@code false} if either the address
 *         lookup or the configuration load failed
 */
private boolean loadConf() {
  boolean loaded = false;
  try {
    // Both calls stay in one try block so each exception type maps to the same message as
    // before, regardless of which call raised it.
    mContext.loadConf(mInquireClient.getPrimaryRpcAddress(), true, false);
    loaded = true;
  } catch (UnavailableException unavailable) {
    SAMPLING_LOG.error("Failed to get master address during initialization: {}",
        unavailable.toString());
  } catch (AlluxioStatusException statusError) {
    SAMPLING_LOG.error("Failed to load configuration from "
        + "meta master during initialization: {}", statusError.toString());
  }
  return loaded;
}
}
Expand Up @@ -14,7 +14,6 @@
import alluxio.ClientContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.master.MasterClientContext;
import alluxio.master.MasterInquireClient;
import alluxio.util.IdUtils;
import alluxio.util.ThreadFactoryUtils;
Expand Down Expand Up @@ -67,7 +66,6 @@ public class MetricsHeartbeatContext {
private static ScheduledExecutorService sExecutorService;

private final MasterInquireClient.ConnectDetails mConnectDetails;
private final RetryHandlingMetricsMasterClient mMetricsMasterClient;
private final ClientMasterSync mClientMasterSync;
private final AlluxioConfiguration mConf;

Expand All @@ -79,11 +77,7 @@ private MetricsHeartbeatContext(ClientContext ctx, MasterInquireClient inquireCl
mCtxCount = 0;
mConnectDetails = inquireClient.getConnectDetails();
mConf = ctx.getClusterConf();
mMetricsMasterClient = new RetryHandlingMetricsMasterClient(MasterClientContext
.newBuilder(ctx)
.setMasterInquireClient(inquireClient)
.build());
mClientMasterSync = new ClientMasterSync(sAppId, mMetricsMasterClient, mConf);
mClientMasterSync = new ClientMasterSync(sAppId, ctx, inquireClient);
}

private synchronized void addContext() {
Expand Down Expand Up @@ -129,7 +123,7 @@ private synchronized void close() {
MASTER_METRICS_HEARTBEAT.remove(mConnectDetails);
// Trigger the last heartbeat to preserve the client side metrics changes
heartbeat();
mMetricsMasterClient.close();
mClientMasterSync.close();
}

/**
Expand Down

0 comments on commit 7c31190

Please sign in to comment.