Skip to content

Commit

Permalink
And more central work
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed Jan 28, 2016
1 parent 123e566 commit dc76f9d
Show file tree
Hide file tree
Showing 179 changed files with 3,318 additions and 2,079 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glowroot.agent.fat.init.GlowrootFatAgentInit;
import org.glowroot.agent.fat.init.DataDirLocking.BaseDirLockedException;
import org.glowroot.agent.init.GlowrootAgentInit;
import org.glowroot.agent.init.GlowrootThinAgentInit;
import org.glowroot.agent.init.fat.DataDirLocking.BaseDirLockedException;
import org.glowroot.agent.init.fat.GlowrootFatAgentInit;
import org.glowroot.agent.util.AppServerDetection;
import org.glowroot.common.util.OnlyUsedByTests;
import org.glowroot.common.util.Version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glowroot.agent.config.InstrumentationConfig;
import org.glowroot.agent.weaving.Advice;
import org.glowroot.agent.weaving.AdviceBuilder;
import org.glowroot.agent.weaving.ClassLoaders.LazyDefinedClass;
import org.glowroot.agent.weaving.ImmutableLazyDefinedClass;
import org.glowroot.common.config.InstrumentationConfig;
import org.glowroot.common.config.InstrumentationConfig.CaptureKind;
import org.glowroot.wire.api.model.AgentConfigOuterClass.AgentConfig.CaptureKind;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.objectweb.asm.Opcodes.ACC_FINAL;
Expand Down Expand Up @@ -179,10 +179,7 @@ private void addClassAnnotation(ClassWriter cw) {
} else if (!config.traceEntryCaptureSelfNested()) {
annotationVisitor.visit("nestingGroup", "__GeneratedAdvice" + uniqueNum);
}
Integer priority = config.priority();
if (priority != null) {
annotationVisitor.visit("priority", priority);
}
annotationVisitor.visit("priority", config.priority());
annotationVisitor.visitEnd();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glowroot.agent.central.CentralConnection.GrpcOneWayCall;
import org.glowroot.agent.central.CentralConnection.GrpcCall;
import org.glowroot.wire.api.Collector;
import org.glowroot.wire.api.model.AgentConfigOuterClass.AgentConfig;
import org.glowroot.wire.api.model.AggregateOuterClass.AggregatesByType;
Expand All @@ -32,6 +32,7 @@
import org.glowroot.wire.api.model.CollectorServiceOuterClass.GaugeValue;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.GaugeValueMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.InitMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.InitResponse;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.LogEvent;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.LogMessage;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.ProcessInfo;
Expand All @@ -53,17 +54,24 @@ class CentralCollectorImpl implements Collector {
}

@Override
public void collectInit(ProcessInfo jvmInfo, AgentConfig agentConfig) {
public void collectInit(ProcessInfo jvmInfo, AgentConfig agentConfig,
final AgentConfigUpdater agentConfigUpdater) {
final InitMessage initMessage = InitMessage.newBuilder()
.setServerId(serverId)
.setProcessInfo(jvmInfo)
.setAgentConfig(agentConfig)
.build();
centralConnection.callUntilSuccessful(new GrpcOneWayCall<EmptyMessage>() {
centralConnection.callUntilSuccessful(new GrpcCall<InitResponse>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
public void call(StreamObserver<InitResponse> responseObserver) {
collectorServiceStub.collectInit(initMessage, responseObserver);
}
@Override
void doWithResponse(InitResponse response) {
if (response.hasAgentConfig()) {
agentConfigUpdater.update(response.getAgentConfig());
}
}
});
}

Expand All @@ -74,7 +82,7 @@ public void collectAggregates(long captureTime, List<AggregatesByType> aggregate
.setCaptureTime(captureTime)
.addAllAggregatesByType(aggregatesByType)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
centralConnection.callWithAFewRetries(new GrpcCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectAggregates(aggregateMessage, responseObserver);
Expand All @@ -88,7 +96,7 @@ public void collectGaugeValues(List<GaugeValue> gaugeValues) {
.setServerId(serverId)
.addAllGaugeValues(gaugeValues)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
centralConnection.callWithAFewRetries(new GrpcCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectGaugeValues(gaugeValueMessage, responseObserver);
Expand All @@ -102,7 +110,7 @@ public void collectTrace(Trace trace) {
.setServerId(serverId)
.setTrace(trace)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
centralConnection.callWithAFewRetries(new GrpcCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.collectTrace(traceMessage, responseObserver);
Expand All @@ -119,7 +127,7 @@ public void log(LogEvent logEvent) {
.setServerId(serverId)
.setLogEvent(logEvent)
.build();
centralConnection.callWithAFewRetries(new GrpcOneWayCall<EmptyMessage>() {
centralConnection.callWithAFewRetries(new GrpcCall<EmptyMessage>() {
@Override
public void call(StreamObserver<EmptyMessage> responseObserver) {
collectorServiceStub.log(logMessage, responseObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ ManagedChannel getChannel() {
}

// important that these calls are idempotent (at least in central implementation)
<T extends /*@NonNull*/ Object> void callWithAFewRetries(GrpcOneWayCall<T> call) {
<T extends /*@NonNull*/ Object> void callWithAFewRetries(GrpcCall<T> call) {
if (closed) {
return;
}
Expand All @@ -89,15 +89,15 @@ ManagedChannel getChannel() {
//
// this cannot retry over too long a period since it retains memory of rpc message for that
// duration
call.call(new RetryingStreamObserver<T>(call, 30));
call.call(new RetryingStreamObserver<T>(call, 30, 60));
}

// important that these calls are idempotent (at least in central implementation)
<T extends /*@NonNull*/ Object> void callUntilSuccessful(GrpcOneWayCall<T> call) {
<T extends /*@NonNull*/ Object> void callUntilSuccessful(GrpcCall<T> call) {
if (closed) {
return;
}
call.call(new RetryingStreamObserver<T>(call, -1));
call.call(new RetryingStreamObserver<T>(call, -1, 15));
}

void suppressLogCollector(Runnable runnable) {
Expand Down Expand Up @@ -130,25 +130,32 @@ void awaitClose() throws InterruptedException {
}
}

interface GrpcOneWayCall<T extends /*@NonNull*/ Object> {
void call(StreamObserver<T> responseObserver);
static abstract class GrpcCall<T extends /*@NonNull*/ Object> {
abstract void call(StreamObserver<T> responseObserver);
void doWithResponse(@SuppressWarnings("unused") T response) {}
}

private class RetryingStreamObserver<T extends /*@NonNull*/ Object>
implements StreamObserver<T> {

private final GrpcOneWayCall<T> grpcOneWayCall;
private final GrpcCall<T> grpcCall;
private final int maxRetries;
private final int maxDelayInSeconds;

private volatile int retryCounter;
private volatile long nextDelayInSeconds = 1;

private RetryingStreamObserver(GrpcOneWayCall<T> grpcOneWayCall, int maxRetries) {
this.grpcOneWayCall = grpcOneWayCall;
private RetryingStreamObserver(GrpcCall<T> grpcCall, int maxRetries,
int maxDelayInSeconds) {
this.grpcCall = grpcCall;
this.maxRetries = maxRetries;
this.maxDelayInSeconds = maxDelayInSeconds;
}

@Override
public void onNext(T value) {}
public void onNext(T value) {
grpcCall.doWithResponse(value);
}

@Override
public void onError(final Throwable t) {
Expand All @@ -167,7 +174,7 @@ public void run() {
@Override
public void run() {
try {
grpcOneWayCall.call(RetryingStreamObserver.this);
grpcCall.call(RetryingStreamObserver.this);
} catch (final Throwable t) {
// intentionally capturing InterruptedException here as well to ensure
// reconnect is attempted no matter what
Expand All @@ -179,7 +186,8 @@ public void run() {
});
}
}
}, 1, SECONDS);
}, nextDelayInSeconds, SECONDS);
nextDelayInSeconds = Math.min(nextDelayInSeconds * 2, maxDelayInSeconds);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,40 @@
*/
package org.glowroot.agent.central;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

import javax.annotation.Nullable;

import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glowroot.agent.config.ConfigService;
import org.glowroot.agent.config.PluginCache;
import org.glowroot.agent.init.ProcessInfoCreator;
import org.glowroot.common.live.LiveJvmService;
import org.glowroot.common.live.LiveWeavingService;
import org.glowroot.common.util.OnlyUsedByTests;
import org.glowroot.wire.api.Collector.AgentConfigUpdater;
import org.glowroot.wire.api.model.AgentConfigOuterClass.AgentConfig;

import static com.google.common.base.Preconditions.checkNotNull;

public class CentralModule {

private static final Logger logger = LoggerFactory.getLogger(CentralModule.class);

private final CentralConnection centralConnection;
private final CentralCollectorImpl grpcCollector;
private final DownstreamServiceObserver downstreamServiceObserver;

public CentralModule(Map<String, String> properties, @Nullable String collectorHost,
ConfigService configService, LiveWeavingService liveWeavingService,
LiveJvmService liveJvmService, ScheduledExecutorService scheduledExecutor,
String glowrootVersion) throws Exception {
ConfigService configService, PluginCache pluginCache,
LiveWeavingService liveWeavingService, LiveJvmService liveJvmService,
ScheduledExecutorService scheduledExecutor, String glowrootVersion) throws Exception {

String serverId = properties.get("glowroot.server.id");
if (Strings.isNullOrEmpty(serverId)) {
Expand All @@ -60,16 +67,23 @@ public CentralModule(Map<String, String> properties, @Nullable String collectorH
checkNotNull(collectorHost);

centralConnection = new CentralConnection(collectorHost, collectorPort, scheduledExecutor);
ConfigUpdateService configUpdateService =
new ConfigUpdateService(configService, liveWeavingService);
final ConfigUpdateService configUpdateService =
new ConfigUpdateService(configService, pluginCache);
grpcCollector = new CentralCollectorImpl(centralConnection, serverId);
downstreamServiceObserver = new DownstreamServiceObserver(centralConnection,
configUpdateService, liveJvmService, serverId);
configUpdateService, liveJvmService, liveWeavingService, serverId);
downstreamServiceObserver.connectAsync();

// FIXME build agent config
grpcCollector.collectInit(ProcessInfoCreator.create(glowrootVersion),
AgentConfig.getDefaultInstance());
configService.getAgentConfig(), new AgentConfigUpdater() {
@Override
public void update(AgentConfig agentConfig) {
try {
configUpdateService.updateAgentConfig(agentConfig);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
});
}

public CentralCollectorImpl getGrpcCollector() {
Expand Down

0 comments on commit dc76f9d

Please sign in to comment.