Skip to content

Commit

Permalink
Handle connection failures to central
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed Dec 17, 2015
1 parent 90dc1e1 commit 67aecdb
Show file tree
Hide file tree
Showing 21 changed files with 564 additions and 349 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.glowroot.agent.central;

import java.util.List;

import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glowroot.agent.central.CentralConnection.GrpcOneWayCall;
import org.glowroot.wire.api.Collector;
import org.glowroot.wire.api.model.AggregateOuterClass.AggregatesByType;
import org.glowroot.wire.api.model.CollectorServiceGrpc;
import org.glowroot.wire.api.model.CollectorServiceGrpc.CollectorServiceStub;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.AggregateMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.EmptyMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.GaugeValueMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.JvmInfoMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.LogMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.TraceMessage;
import org.glowroot.wire.api.model.GaugeValueOuterClass.GaugeValue;
import org.glowroot.wire.api.model.JvmInfoOuterClass.JvmInfo;
import org.glowroot.wire.api.model.LogEventOuterClass.LogEvent;
import org.glowroot.wire.api.model.TraceOuterClass.Trace;

class CentralCollectorImpl implements Collector {

static final Logger logger = LoggerFactory.getLogger(CentralCollectorImpl.class);

private final CentralConnection centralConnection;
private final CollectorServiceStub collectorServiceStub;
private final String serverId;

CentralCollectorImpl(CentralConnection centralConnection, String serverId) {
this.centralConnection = centralConnection;
collectorServiceStub = CollectorServiceGrpc.newStub(centralConnection.getChannel());
this.serverId = serverId;
}

@Override
public void collectJvmInfo(JvmInfo jvmInfo) {
final JvmInfoMessage jvmInfoMessage = JvmInfoMessage.newBuilder()
.setServerId(serverId)
.setJvmInfo(jvmInfo)
.build();
centralConnection.callUntilSuccessful(new GrpcOneWayCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectJvmInfo(jvmInfoMessage, responseObserver);
}
});
}

@Override
public void collectAggregates(long captureTime, List<AggregatesByType> aggregatesByType) {
final AggregateMessage aggregateMessage = AggregateMessage.newBuilder()
.setServerId(serverId)
.setCaptureTime(captureTime)
.addAllAggregatesByType(aggregatesByType)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectAggregates(aggregateMessage, responseObserver);
}
});
}

@Override
public void collectGaugeValues(List<GaugeValue> gaugeValues) {
final GaugeValueMessage gaugeValueMessage = GaugeValueMessage.newBuilder()
.setServerId(serverId)
.addAllGaugeValues(gaugeValues)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectGaugeValues(gaugeValueMessage, responseObserver);
}
});
}

@Override
public void collectTrace(Trace trace) {
final TraceMessage traceMessage = TraceMessage.newBuilder()
.setServerId(serverId)
.setTrace(trace)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectTrace(traceMessage, responseObserver);
}
});
}

@Override
public void log(LogEvent logEvent) {
if (centralConnection.suppressLogCollector()) {
return;
}
final LogMessage logMessage = LogMessage.newBuilder()
.setServerId(serverId)
.setLogEvent(logEvent)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.log(logMessage, responseObserver);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.glowroot.agent.central;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ManagedChannel;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.EventLoopGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glowroot.common.util.OnlyUsedByTests;

import static java.util.concurrent.TimeUnit.SECONDS;

class CentralConnection {

private static final Logger logger = LoggerFactory.getLogger(CentralConnection.class);

@SuppressWarnings("nullness:type.argument.type.incompatible")
private final ThreadLocal<Boolean> suppressLogCollector = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};

private final EventLoopGroup eventLoopGroup;
private final ExecutorService executor;
private final ManagedChannel channel;

private volatile boolean closed;

CentralConnection(String collectorHost, int collectorPort) {
eventLoopGroup = EventLoopGroups.create("Glowroot-grpc-worker-ELG");
executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Glowroot-grpc-executor-%d")
.build());
channel = NettyChannelBuilder
.forAddress(collectorHost, collectorPort)
.eventLoopGroup(eventLoopGroup)
.executor(executor)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
}

boolean suppressLogCollector() {
return suppressLogCollector.get();
}

ManagedChannel getChannel() {
return channel;
}

// important that these calls are idempotent (at least in central implementation)
<T> void callWithAFewRetries(GrpcOneWayCall<T> call) {
if (closed) {
return;
}
// TODO revisit retry/backoff after next grpc version

// 30 retries currently at 1 second apart covers 30 seconds which should be enough time to
// restart single central instance without losing data (though better to use central
// cluster)
//
// this cannot retry over too long a period since it retains memory of rpc message for that
// duration
call.call(new RetryingStreamObserver<T>(call, 30));
}

// important that these calls are idempotent (at least in central implementation)
<T> void callUntilSuccessful(GrpcOneWayCall<T> call) {
if (closed) {
return;
}
call.call(new RetryingStreamObserver<T>(call, -1));
}

void suppressLogCollector(Runnable runnable) {
boolean priorValue = suppressLogCollector.get();
suppressLogCollector.set(true);
try {
runnable.run();
} finally {
suppressLogCollector.set(priorValue);
}
}

@OnlyUsedByTests
void close() {
closed = true;
channel.shutdown();
}

@OnlyUsedByTests
void awaitClose() throws InterruptedException {
if (!channel.awaitTermination(10, SECONDS)) {
throw new IllegalStateException("Could not terminate gRPC channel");
}
executor.shutdown();
if (!executor.awaitTermination(10, SECONDS)) {
throw new IllegalStateException("Could not terminate gRPC executor");
}
if (!eventLoopGroup.shutdownGracefully(0, 0, SECONDS).await(10, SECONDS)) {
throw new IllegalStateException("Could not terminate gRPC event loop group");
}
}

interface GrpcOneWayCall<T extends /*@NonNull*/ Object> {
void call(StreamObserver<T> responseObserver);
}

private class RetryingStreamObserver<T extends /*@NonNull*/ Object>
implements StreamObserver<T> {

private final GrpcOneWayCall<T> grpcOneWayCall;
private final int maxRetries;

private volatile int retryCounter;

private RetryingStreamObserver(GrpcOneWayCall<T> grpcOneWayCall, int maxRetries) {
this.grpcOneWayCall = grpcOneWayCall;
this.maxRetries = maxRetries;
}

@Override
public void onNext(T value) {}

@Override
public void onError(final Throwable t) {
suppressLogCollector(new Runnable() {
@Override
public void run() {
logger.debug(t.getMessage(), t);
}
});
if (maxRetries != -1 && retryCounter++ > maxRetries) {
// no logging since DownstreamServiceObserver handles logging central connectivity
return;
}
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
// TODO revisit retry/backoff after next grpc version
Thread.sleep(1000);
grpcOneWayCall.call(RetryingStreamObserver.this);
} catch (final Throwable t) {
// intentionally capturing InterruptedException here as well to ensure
// reconnect is attempted no matter what
suppressLogCollector(new Runnable() {
@Override
public void run() {
logger.error(t.getMessage(), t);
}
});
}
}
});
}

@Override
public void onCompleted() {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.glowroot.agent.central;

import java.net.InetAddress;
import java.util.Map;

import javax.annotation.Nullable;

import com.google.common.base.Strings;

import org.glowroot.agent.config.ConfigService;
import org.glowroot.agent.init.JvmInfoCreator;
import org.glowroot.common.live.LiveJvmService;
import org.glowroot.common.live.LiveWeavingService;
import org.glowroot.common.util.OnlyUsedByTests;

import static com.google.common.base.Preconditions.checkNotNull;

public class CentralModule {

private final CentralConnection centralConnection;
private final CentralCollectorImpl grpcCollector;
private final DownstreamServiceObserver downstreamServiceObserver;

public CentralModule(Map<String, String> properties, @Nullable String collectorHost,
ConfigService configService, LiveWeavingService liveWeavingService,
LiveJvmService liveJvmService) throws Exception {

String serverId = properties.get("glowroot.server.id");
if (Strings.isNullOrEmpty(serverId)) {
serverId = InetAddress.getLocalHost().getHostName();
}
String collectorPortStr = properties.get("glowroot.collector.port");
if (Strings.isNullOrEmpty(collectorPortStr)) {
collectorPortStr = System.getProperty("glowroot.collector.port");
}
int collectorPort;
if (Strings.isNullOrEmpty(collectorPortStr)) {
collectorPort = 80;
} else {
collectorPort = Integer.parseInt(collectorPortStr);
}
checkNotNull(collectorHost);

centralConnection = new CentralConnection(collectorHost, collectorPort);
ConfigUpdateService configUpdateService =
new ConfigUpdateService(configService, liveWeavingService);
grpcCollector = new CentralCollectorImpl(centralConnection, serverId);
downstreamServiceObserver = new DownstreamServiceObserver(centralConnection,
configUpdateService, liveJvmService, serverId);

grpcCollector.collectJvmInfo(JvmInfoCreator.create());
}

public CentralCollectorImpl getGrpcCollector() {
return grpcCollector;
}

@OnlyUsedByTests
public void close() throws InterruptedException {
downstreamServiceObserver.close();
centralConnection.close();
}

@OnlyUsedByTests
public void awaitClose() throws InterruptedException {
centralConnection.awaitClose();
}
}

0 comments on commit 67aecdb

Please sign in to comment.