Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detailed logs #133

Merged
merged 6 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
152 changes: 107 additions & 45 deletions grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.google.cloud.grpc.proto.ApiConfig;
import com.google.cloud.grpc.proto.MethodConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.TextFormat;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ConnectivityState;
Expand All @@ -46,7 +48,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
Expand Down Expand Up @@ -109,7 +110,6 @@ public class GcpManagedChannel extends ManagedChannel {
String.format("pool-%d", channelPoolIndex.incrementAndGet());
private final Map<String, Long> cumulativeMetricValues = new ConcurrentHashMap<>();
private ScheduledExecutorService logMetricService;
private String metricsOptionsToLog;

// Metrics counters.
private final AtomicInteger readyChannels = new AtomicInteger();
Expand All @@ -136,8 +136,8 @@ public class GcpManagedChannel extends ManagedChannel {
private final AtomicLong totalErrCalls = new AtomicLong();
private boolean minErrReported = false;
private boolean maxErrReported = false;
private int minAffinity = 0;
private int maxAffinity = 0;
private final AtomicInteger minAffinity = new AtomicInteger();
private final AtomicInteger maxAffinity = new AtomicInteger();
private final AtomicInteger totalAffinityCount = new AtomicInteger();
private final AtomicLong fallbacksSucceeded = new AtomicLong();
private final AtomicLong fallbacksFailed = new AtomicLong();
Expand All @@ -161,6 +161,11 @@ public GcpManagedChannel(
loadApiConfig(apiConfig);
this.delegateChannelBuilder = delegateChannelBuilder;
this.options = options;
logger.finer(log(
"Created with api config: %s, and options: %s",
apiConfig == null ? "null" : TextFormat.shortDebugString(apiConfig),
options
));
initOptions();
if (options.getResiliencyOptions() != null) {
fallbackEnabled = options.getResiliencyOptions().isNotReadyFallbackEnabled();
Expand Down Expand Up @@ -195,6 +200,7 @@ public GcpManagedChannel(
GcpManagedChannelOptions options) {
this(delegateChannelBuilder, apiConfig, options);
if (poolSize != 0) {
logger.finer(log("Pool size adjusted to %d", poolSize));
this.maxSize = poolSize;
}
}
Expand All @@ -207,6 +213,10 @@ private String log(String message) {
return String.format("%s: %s", metricPoolIndex, message);
}

/**
 * Formats {@code format} with {@code args} and prefixes the result with this pool's
 * {@code metricPoolIndex}.
 *
 * @param format a {@link String#format(String, Object...)} pattern
 * @param args the arguments referenced by the pattern
 * @return the message in the form {@code "<metricPoolIndex>: <formatted message>"}
 */
private String log(String format, Object... args) {
  final String message = String.format(format, args);
  return String.format("%s: %s", metricPoolIndex, message);
}

private void initOptions() {
GcpManagedChannelOptions.GcpChannelPoolOptions poolOptions = options.getChannelPoolOptions();
if (poolOptions != null) {
Expand All @@ -225,32 +235,22 @@ private synchronized void initLogMetrics() {
}

private void logMetricsOptions() {
if (metricsOptionsToLog != null) {
logger.fine(log(metricsOptionsToLog));
return;
}
final GcpMetricsOptions metricsOptions = options.getMetricsOptions();
if (metricsOptions == null) {
return;
if (options.getMetricsOptions() != null) {
logger.fine(log("Metrics options: %s", options.getMetricsOptions()));
}
}

Iterator<LabelKey> keyIterator = metricsOptions.getLabelKeys().iterator();
Iterator<LabelValue> valueIterator = metricsOptions.getLabelValues().iterator();

final List<String> tags = new ArrayList<>();
while (keyIterator.hasNext() && valueIterator.hasNext()) {
tags.add(
String.format("%s = %s", keyIterator.next().getKey(), valueIterator.next().getValue())
);
}

metricsOptionsToLog = String.format(
"Metrics name prefix = \"%s\", tags: %s",
metricsOptions.getNamePrefix(),
String.join(", ", tags)
);

logger.fine(log(metricsOptionsToLog));
/**
 * Logs two FINE-level lines listing, for every entry of {@code channelRefs} in list order,
 * its active streams count and its affinity count.
 *
 * <p>Nothing is computed when FINE is not loggable: building the lists walks every channel and
 * joins the values into a string, which would otherwise be thrown away by the logger.
 */
private void logChannelsStats() {
  // NOTE(review): assumes `logger` is a java.util.logging.Logger (it is used via
  // fine/finer/finest elsewhere in this class) -- confirm. The level is fully qualified
  // because the file's import list is not fully visible here.
  if (!logger.isLoggable(java.util.logging.Level.FINE)) {
    return;
  }

  final String activeStreamsCounts = Joiner.on(", ").join(
      channelRefs.stream().mapToInt(ChannelRef::getActiveStreamsCount).iterator());
  logger.fine(log("Active streams counts: [%s]", activeStreamsCounts));

  final String affinityCounts = Joiner.on(", ").join(
      channelRefs.stream().mapToInt(ChannelRef::getAffinityCount).iterator());
  logger.fine(log("Affinity counts: [%s]", affinityCounts));
}

private void initMetrics() {
Expand Down Expand Up @@ -467,7 +467,7 @@ private void initMetrics() {
}

private void logGauge(String key, long value) {
logger.fine(log(String.format("stat: %s = %d", key, value)));
logger.fine(log("stat: %s = %d", key, value));
}

private void logCumulative(String key, long value) {
Expand All @@ -478,8 +478,10 @@ private void logCumulative(String key, long value) {
}));
}

private void logMetrics() {
@VisibleForTesting
void logMetrics() {
logMetricsOptions();
logChannelsStats();
reportMinReadyChannels();
reportMaxReadyChannels();
reportMaxChannels();
Expand Down Expand Up @@ -668,15 +670,17 @@ private int reportMaxTotalActiveStreams() {
}

private int reportMinAffinity() {
int value = minAffinity;
minAffinity = channelRefs.stream().mapToInt(ChannelRef::getAffinityCount).min().orElse(0);
int value = minAffinity.getAndSet(
channelRefs.stream().mapToInt(ChannelRef::getAffinityCount).min().orElse(0)
);
logGauge(GcpMetricsConstants.METRIC_MIN_AFFINITY, value);
return value;
}

private int reportMaxAffinity() {
int value = maxAffinity;
maxAffinity = channelRefs.stream().mapToInt(ChannelRef::getAffinityCount).max().orElse(0);
int value = maxAffinity.getAndSet(
channelRefs.stream().mapToInt(ChannelRef::getAffinityCount).max().orElse(0)
);
logGauge(GcpMetricsConstants.METRIC_MAX_AFFINITY, value);
return value;
}
Expand Down Expand Up @@ -707,14 +711,31 @@ private long reportTotalOkCalls() {
return value;
}

/**
 * Applies {@code func} once to every entry of {@code channelRefs}, in list order, collecting
 * the returned values into summary statistics and logging them at FINE level as
 * {@code "<logLabel>: [v1, v2, ...]"}.
 *
 * <p>{@code func} is always invoked for every channel, whether or not the log line ends up
 * being emitted; callers in this class pass get-and-reset accessors (for example
 * {@code ChannelRef::getAndResetOkCalls}), so skipping the call would change the counters.
 *
 * @param logLabel text placed before the bracketed list of per-channel values
 * @param func extracts the value to aggregate from a channel
 * @return statistics over the extracted values; empty statistics when there are no channels
 */
private LongSummaryStatistics calcStatsAndLog(String logLabel, ToLongFunction<ChannelRef> func) {
  final LongSummaryStatistics stats = new LongSummaryStatistics();
  final StringBuilder message = new StringBuilder(logLabel + ": [");

  // Explicit loop instead of a stream whose mapper mutates outer state: the values are
  // both aggregated and rendered in a single pass.
  String separator = "";
  for (ChannelRef channelRef : channelRefs) {
    final long count = func.applyAsLong(channelRef);
    stats.accept(count);
    message.append(separator).append(count);
    separator = ", ";
  }

  message.append("]");
  logger.fine(log(message.toString()));
  return stats;
}

private void calcMinMaxOkCalls() {
if (minOkReported && maxOkReported) {
minOkReported = false;
maxOkReported = false;
return;
}
final LongSummaryStatistics stats =
channelRefs.stream().mapToLong(ChannelRef::getAndResetOkCalls).summaryStatistics();
calcStatsAndLog("Ok calls", ChannelRef::getAndResetOkCalls);
minOkCalls = stats.getMin();
maxOkCalls = stats.getMax();
}
Expand Down Expand Up @@ -746,7 +767,7 @@ private void calcMinMaxErrCalls() {
return;
}
final LongSummaryStatistics stats =
channelRefs.stream().mapToLong(ChannelRef::getAndResetErrCalls).summaryStatistics();
calcStatsAndLog("Failed calls", ChannelRef::getAndResetErrCalls);
minErrCalls = stats.getMin();
maxErrCalls = stats.getMax();
}
Expand Down Expand Up @@ -865,6 +886,9 @@ public void run() {
return;
}
ConnectivityState newState = channel.getState(false);
logger.finer(
log("Channel %d state change detected: %s -> %s", channelId, currentState, newState)
);
if (newState == ConnectivityState.READY && currentState != ConnectivityState.READY) {
incReadyChannels();
saveReadinessTime(System.nanoTime() - connectingStartNanos);
Expand Down Expand Up @@ -925,10 +949,14 @@ public int getMaxActiveStreams() {
* @return {@link ChannelRef} channel to use for a call.
*/
protected ChannelRef getChannelRefForBind() {
ChannelRef channelRef;
if (options.getChannelPoolOptions() != null && options.getChannelPoolOptions().isUseRoundRobinOnBind()) {
return getChannelRefRoundRobin();
channelRef = getChannelRefRoundRobin();
} else {
channelRef = getChannelRef(null);
}
return getChannelRef(null);
logger.finest(log("Channel %d picked for bind operation.", channelRef.getId()));
return channelRef;
}

/**
Expand Down Expand Up @@ -981,6 +1009,7 @@ protected ChannelRef getChannelRef(@Nullable String key) {
Integer channelId = tempMap.get(key);
if (channelId != null && !fallbackMap.containsKey(channelId)) {
// Fallback channel is ready.
logger.finest(log("Using fallback channel: %d -> %d", mappedChannel.getId(), channelId));
fallbacksSucceeded.incrementAndGet();
return channelRefs.get(channelId);
}
Expand All @@ -990,11 +1019,15 @@ protected ChannelRef getChannelRef(@Nullable String key) {
&& channelRef.getActiveStreamsCount() < DEFAULT_MAX_STREAM) {
// Got a ready and not an overloaded channel.
if (channelRef.getId() != mappedChannel.getId()) {
logger.finest(log(
"Setting fallback channel: %d -> %d", mappedChannel.getId(), channelRef.getId()
));
fallbacksSucceeded.incrementAndGet();
tempMap.put(key, channelRef.getId());
}
return channelRef;
}
logger.finest(log("Failed to find fallback for channel %d", mappedChannel.getId()));
fallbacksFailed.incrementAndGet();
if (channelId != null) {
// Stick with previous mapping if fallback has failed.
Expand All @@ -1009,6 +1042,7 @@ private synchronized ChannelRef createNewChannel() {
final int size = channelRefs.size();
ChannelRef channelRef = new ChannelRef(delegateChannelBuilder.build(), size);
channelRefs.add(channelRef);
logger.finer(log("Channel %d created.", channelRef.getId()));
return channelRef;
}

Expand Down Expand Up @@ -1052,19 +1086,26 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {
}

if (channelRefs.size() < maxSize && readyMinStreams >= maxConcurrentStreamsLowWatermark) {
if (!forFallback) {
if (!forFallback && readyCandidate == null) {
logger.finest(log("Fallback to newly created channel"));
fallbacksSucceeded.incrementAndGet();
}
return createNewChannel();
}

if (readyCandidate != null) {
if (!forFallback && readyCandidate.getId() != channelCandidate.getId()) {
logger.finest(log(
"Picking fallback channel: %d -> %d", channelCandidate.getId(), readyCandidate.getId()));
fallbacksSucceeded.incrementAndGet();
}
return readyCandidate;
}

if (!forFallback) {
logger.finest(log("Failed to find fallback for channel %d", channelCandidate.getId()));
fallbacksFailed.incrementAndGet();
}
return channelCandidate;
}

Expand Down Expand Up @@ -1099,6 +1140,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(

@Override
public ManagedChannel shutdownNow() {
logger.finer(log("Shutdown now started."));
for (ChannelRef channelRef : channelRefs) {
if (!channelRef.getChannel().isTerminated()) {
channelRef.getChannel().shutdownNow();
Expand All @@ -1112,6 +1154,7 @@ public ManagedChannel shutdownNow() {

@Override
public ManagedChannel shutdown() {
logger.finer(log("Shutdown started."));
for (ChannelRef channelRef : channelRefs) {
channelRef.getChannel().shutdown();
}
Expand Down Expand Up @@ -1222,6 +1265,12 @@ protected void bind(ChannelRef channelRef, List<String> affinityKeys) {
if (channelRef == null || affinityKeys == null) {
return;
}
logger.finest(log(
"Binding %d key(s) to channel %d: [%s]",
affinityKeys.size(),
channelRef.getId(),
String.join(", ", affinityKeys)
));
for (String affinityKey : affinityKeys) {
while (affinityKeyToChannelRef.putIfAbsent(affinityKey, channelRef) != null) {
unbind(Collections.singletonList(affinityKey));
Expand All @@ -1239,6 +1288,9 @@ protected void unbind(List<String> affinityKeys) {
ChannelRef channelRef = affinityKeyToChannelRef.remove(affinityKey);
if (channelRef != null) {
channelRef.affinityCountDecr();
logger.finest(log("Unbinding key %s from channel %d.", affinityKey, channelRef.getId()));
} else {
logger.finest(log("Unbinding key %s but it wasn't bound.", affinityKey));
}
}
}
Expand Down Expand Up @@ -1386,12 +1438,14 @@ protected int getId() {
}

protected void affinityCountIncr() {
affinityCount.incrementAndGet();
int count = affinityCount.incrementAndGet();
maxAffinity.getAndUpdate(currentMax -> Math.max(currentMax, count));
totalAffinityCount.incrementAndGet();
}

protected void affinityCountDecr() {
affinityCount.decrementAndGet();
int count = affinityCount.decrementAndGet();
minAffinity.getAndUpdate(currentMin -> Math.min(currentMin, count));
totalAffinityCount.decrementAndGet();
}

Expand Down Expand Up @@ -1456,7 +1510,7 @@ private void detectUnresponsiveConnection(
return;
}
if (deadlineExceededCount.incrementAndGet() >= unresponsiveDropCount
&& unresponsiveTimingConditionMet()) {
&& msSinceLastResponse() >= unresponsiveMs) {
maybeReconnectUnresponsive();
}
return;
Expand All @@ -1468,15 +1522,23 @@ && unresponsiveTimingConditionMet()) {
}
}

private boolean unresponsiveTimingConditionMet() {
return (System.nanoTime() - lastResponseNanos) / 1000000 >= unresponsiveMs;
private long msSinceLastResponse() {
return (System.nanoTime() - lastResponseNanos) / 1000000;
}

private synchronized void maybeReconnectUnresponsive() {
final long msSinceLastResponse = msSinceLastResponse();
if (deadlineExceededCount.get() >= unresponsiveDropCount
&& unresponsiveTimingConditionMet()) {
&& msSinceLastResponse >= unresponsiveMs) {
recordUnresponsiveDetection(
System.nanoTime() - lastResponseNanos, deadlineExceededCount.get());
logger.finer(log(
"Channel %d connection is unresponsive for %d ms and %d deadline exceeded calls. " +
"Forcing channel to idle state.",
channelId,
msSinceLastResponse,
deadlineExceededCount.get()
));
delegate.enterIdle();
lastResponseNanos = System.nanoTime();
deadlineExceededCount.set(0);
Expand Down