Skip to content

Commit

Permalink
Cassandra storage impl for central (Part II)
Browse files Browse the repository at this point in the history
  • Loading branch information
trask committed Dec 1, 2015
1 parent a504b99 commit fc249c2
Show file tree
Hide file tree
Showing 73 changed files with 3,870 additions and 4,168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private static void valueOfArray(Object array, StringBuilder sb) {
sb.append('[');
int len = Array.getLength(array);
for (int i = 0; i < len; i++) {
if (i != 0) {
if (i > 0) {
sb.append(", ");
}
valueOf(Array.get(array, i), sb);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@
import org.glowroot.agent.model.TimerImpl;
import org.glowroot.agent.model.Transaction;
import org.glowroot.common.config.AdvancedConfig;
import org.glowroot.common.live.ImmutableErrorPoint;
import org.glowroot.common.live.ImmutableOverallErrorSummary;
import org.glowroot.common.live.ImmutableOverallSummary;
import org.glowroot.common.live.ImmutableOverviewAggregate;
import org.glowroot.common.live.ImmutablePercentileAggregate;
import org.glowroot.common.live.ImmutableThroughputAggregate;
import org.glowroot.common.live.ImmutableTransactionErrorSummary;
import org.glowroot.common.live.ImmutableTransactionSummary;
import org.glowroot.common.live.LiveAggregateRepository.ErrorPoint;
import org.glowroot.common.live.LiveAggregateRepository.OverallErrorSummary;
import org.glowroot.common.live.LiveAggregateRepository.OverallSummary;
import org.glowroot.common.live.LiveAggregateRepository.OverviewAggregate;
Expand Down Expand Up @@ -178,10 +176,11 @@ ThroughputAggregate buildLiveThroughputAggregate(long captureTime) throws IOExce
.build();
}

OverallSummary getLiveOverallSummary() {
OverallSummary getLiveOverallSummary(long captureTime) {
return ImmutableOverallSummary.builder()
.totalNanos(totalNanos)
.transactionCount(transactionCount)
.lastCaptureTime(captureTime)
.build();
}

Expand All @@ -195,10 +194,11 @@ TransactionSummary getLiveTransactionSummary() {
.build();
}

OverallErrorSummary getLiveOverallErrorSummary() {
OverallErrorSummary getLiveOverallErrorSummary(long captureTime) {
return ImmutableOverallErrorSummary.builder()
.errorCount(errorCount)
.transactionCount(transactionCount)
.lastCaptureTime(captureTime)
.build();
}

Expand All @@ -222,18 +222,6 @@ List<Aggregate.QueriesByType> getLiveQueries() {
return queries.toProtobuf(false);
}

@Nullable
ErrorPoint buildErrorPoint(long captureTime) {
if (errorCount == 0) {
return null;
}
return ImmutableErrorPoint.builder()
.captureTime(captureTime)
.errorCount(errorCount)
.transactionCount(transactionCount)
.build();
}

private List<Aggregate.Timer> getRootTimersProtobuf() {
List<Aggregate.Timer> rootTimers = Lists.newArrayListWithCapacity(this.rootTimers.size());
for (MutableTimer rootTimer : this.rootTimers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

import org.glowroot.agent.model.Profile;
import org.glowroot.agent.model.Transaction;
import org.glowroot.common.live.LiveAggregateRepository.ErrorPoint;
import org.glowroot.common.live.LiveAggregateRepository.OverallErrorSummary;
import org.glowroot.common.live.LiveAggregateRepository.OverallSummary;
import org.glowroot.common.live.LiveAggregateRepository.OverviewAggregate;
Expand All @@ -43,7 +42,7 @@
import org.glowroot.common.model.LazyHistogram.ScratchBuffer;
import org.glowroot.wire.api.Collector;
import org.glowroot.wire.api.model.AggregateOuterClass.Aggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.OverallAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.AggregatesByType;
import org.glowroot.wire.api.model.AggregateOuterClass.TransactionAggregate;

public class AggregateIntervalCollector {
Expand Down Expand Up @@ -76,20 +75,22 @@ public void add(Transaction transaction) {
}

void flush(Collector collector) throws Exception {
List<OverallAggregate> overallAggregates = Lists.newArrayList();
List<TransactionAggregate> transactionAggregates = Lists.newArrayList();
List<AggregatesByType> aggregatesByTypeList = Lists.newArrayList();
ScratchBuffer scratchBuffer = new ScratchBuffer();
for (Entry<String, IntervalTypeCollector> e : typeCollectors.entrySet()) {
IntervalTypeCollector intervalTypeCollector = e.getValue();
overallAggregates.add(buildOverallAggregate(e.getKey(),
intervalTypeCollector.overallAggregateCollector, scratchBuffer));
AggregatesByType.Builder aggregatesByType = AggregatesByType.newBuilder()
.setTransactionType(e.getKey())
.setOverallAggregate(buildOverallAggregate(
intervalTypeCollector.overallAggregateCollector, scratchBuffer));
for (Entry<String, AggregateCollector> f : intervalTypeCollector.transactionAggregateCollectors
.entrySet()) {
transactionAggregates.add(buildTransactionAggregate(e.getKey(), f.getKey(),
f.getValue(), scratchBuffer));
aggregatesByType.addTransactionAggregate(
buildTransactionAggregate(f.getKey(), f.getValue(), scratchBuffer));
}
aggregatesByTypeList.add(aggregatesByType.build());
}
collector.collectAggregates(captureTime, overallAggregates, transactionAggregates);
collector.collectAggregates(captureTime, aggregatesByTypeList);
}

public @Nullable OverallSummary getLiveOverallSummary(String transactionType) {
Expand All @@ -99,7 +100,7 @@ void flush(Collector collector) throws Exception {
}
AggregateCollector aggregateCollector = intervalTypeCollector.overallAggregateCollector;
synchronized (aggregateCollector) {
return aggregateCollector.getLiveOverallSummary();
return aggregateCollector.getLiveOverallSummary(captureTime);
}
}

Expand All @@ -126,7 +127,7 @@ public List<TransactionSummary> getLiveTransactionSummaries(String transactionTy
}
AggregateCollector aggregateCollector = intervalTypeCollector.overallAggregateCollector;
synchronized (aggregateCollector) {
return aggregateCollector.getLiveOverallErrorSummary();
return aggregateCollector.getLiveOverallErrorSummary(captureTime);
}
}

Expand All @@ -139,8 +140,12 @@ public List<TransactionErrorSummary> getLiveTransactionErrorSummaries(String tra
for (Entry<String, AggregateCollector> entry : intervalTypeCollector.transactionAggregateCollectors
.entrySet()) {
AggregateCollector aggregateCollector = entry.getValue();
TransactionErrorSummary transactionErrorSummary;
synchronized (aggregateCollector) {
errorSummaries.add(aggregateCollector.getLiveTransactionErrorSummary());
transactionErrorSummary = aggregateCollector.getLiveTransactionErrorSummary();
}
if (transactionErrorSummary.errorCount() > 0) {
errorSummaries.add(transactionErrorSummary);
}
}
return errorSummaries;
Expand Down Expand Up @@ -185,19 +190,6 @@ public List<TransactionErrorSummary> getLiveTransactionErrorSummaries(String tra
}
}

public @Nullable ErrorPoint getLiveErrorPoint(String transactionType,
@Nullable String transactionName, long liveCaptureTime) throws IOException {
AggregateCollector aggregateCollector =
getAggregateCollector(transactionType, transactionName);
if (aggregateCollector == null) {
return null;
}
synchronized (aggregateCollector) {
long capturedAt = Math.min(liveCaptureTime, captureTime);
return aggregateCollector.buildErrorPoint(capturedAt);
}
}

public List<Aggregate.QueriesByType> getLiveQueries(String transactionType,
@Nullable String transactionName) throws IOException {
AggregateCollector aggregateCollector =
Expand Down Expand Up @@ -243,22 +235,17 @@ private IntervalTypeCollector getTypeCollector(String transactionType) {
return typeCollector;
}

private OverallAggregate buildOverallAggregate(String transactionType,
AggregateCollector aggregateCollector, ScratchBuffer scratchBuffer) throws IOException {
private Aggregate buildOverallAggregate(AggregateCollector aggregateCollector,
ScratchBuffer scratchBuffer) throws IOException {
synchronized (aggregateCollector) {
return OverallAggregate.newBuilder()
.setTransactionType(transactionType)
.setAggregate(aggregateCollector.build(scratchBuffer))
.build();
return aggregateCollector.build(scratchBuffer);
}
}

private TransactionAggregate buildTransactionAggregate(String transactionType,
String transactionName, AggregateCollector aggregateCollector,
ScratchBuffer scratchBuffer) throws IOException {
private TransactionAggregate buildTransactionAggregate(String transactionName,
AggregateCollector aggregateCollector, ScratchBuffer scratchBuffer) throws IOException {
synchronized (aggregateCollector) {
return TransactionAggregate.newBuilder()
.setTransactionType(transactionType)
.setTransactionName(transactionName)
.setAggregate(aggregateCollector.build(scratchBuffer))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import org.glowroot.wire.api.Collector;
import org.glowroot.wire.api.model.AggregateOuterClass.OverallAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.TransactionAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.AggregatesByType;
import org.glowroot.wire.api.model.GaugeValueOuterClass.GaugeValue;
import org.glowroot.wire.api.model.JvmInfoOuterClass.JvmInfo;
import org.glowroot.wire.api.model.LogEventOuterClass.LogEvent;
Expand All @@ -41,10 +40,10 @@ public void collectJvmInfo(JvmInfo jvmInfo) throws Exception {
}

@Override
public void collectAggregates(long captureTime, List<OverallAggregate> overallAggregates,
List<TransactionAggregate> transactionAggregates) throws Exception {
public void collectAggregates(long captureTime, List<AggregatesByType> aggregatesByType)
throws Exception {
if (instance != null) {
instance.collectAggregates(captureTime, overallAggregates, transactionAggregates);
instance.collectAggregates(captureTime, aggregatesByType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import org.slf4j.LoggerFactory;

import org.glowroot.wire.api.Collector;
import org.glowroot.wire.api.model.AggregateOuterClass.OverallAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.TransactionAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.AggregatesByType;
import org.glowroot.wire.api.model.CollectorServiceGrpc;
import org.glowroot.wire.api.model.CollectorServiceGrpc.CollectorServiceStub;
import org.glowroot.wire.api.model.CollectorServiceOuterClass.AggregateMessage;
Expand Down Expand Up @@ -90,13 +89,12 @@ public void collectJvmInfo(JvmInfo jvmInfo) throws Exception {
}

@Override
public void collectAggregates(long captureTime, List<OverallAggregate> overallAggregates,
List<TransactionAggregate> transactionAggregates) throws Exception {
public void collectAggregates(long captureTime, List<AggregatesByType> aggregatesByType)
throws Exception {
AggregateMessage aggregateMessage = AggregateMessage.newBuilder()
.setServerId(serverId)
.setCaptureTime(captureTime)
.addAllOverallAggregate(overallAggregates)
.addAllTransactionAggregate(transactionAggregates)
.addAllAggregatesByType(aggregatesByType)
.build();
client.collectAggregates(aggregateMessage, loggingStreamObserver);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ public void collectAggregates(AggregateMessage request,
StreamObserver<EmptyMessage> responseObserver) {
try {
collector.collectAggregates(request.getCaptureTime(),
request.getOverallAggregateList(),
request.getTransactionAggregateList());
request.getAggregatesByTypeList());
} catch (Throwable t) {
responseObserver.onError(t);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.glowroot.storage.repo.TraceRepository;
import org.glowroot.storage.repo.helper.AlertingService;
import org.glowroot.wire.api.Collector;
import org.glowroot.wire.api.model.AggregateOuterClass.OverallAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.TransactionAggregate;
import org.glowroot.wire.api.model.AggregateOuterClass.AggregatesByType;
import org.glowroot.wire.api.model.GaugeValueOuterClass.GaugeValue;
import org.glowroot.wire.api.model.JvmInfoOuterClass.JvmInfo;
import org.glowroot.wire.api.model.LogEventOuterClass.LogEvent;
Expand Down Expand Up @@ -56,10 +55,9 @@ public void collectJvmInfo(JvmInfo jvmInfo) throws Exception {
}

@Override
public void collectAggregates(long captureTime, List<OverallAggregate> overallAggregates,
List<TransactionAggregate> transactionAggregates) throws Exception {
aggregateRepository.store(SERVER_ID, captureTime, overallAggregates,
transactionAggregates);
public void collectAggregates(long captureTime, List<AggregatesByType> aggregatesByType)
throws Exception {
aggregateRepository.store(SERVER_ID, captureTime, aggregatesByType);
alertingService.checkAlerts(captureTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ class FatAgentModule {
DataSource dataSource;
if (h2MemDb) {
// mem db is only used for testing (by glowroot-test-container)
dataSource = DataSource.createH2InMemory();
dataSource = new DataSource();
} else {
dataSource = DataSource.createH2(new File(dataDir, "data.h2.db"));
dataSource = new DataSource(new File(dataDir, "data.h2.db"));
}

if (viewerMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,15 @@ static List<String> usedTypes() {
types.addAll(getGlowrootUsedTypes());
types.addAll(getH2UsedTypes());
return types;

}

private static List<String> getGlowrootUsedTypes() {
List<String> types = Lists.newArrayList();
types.add("org.glowroot.storage.simplerepo.util.CappedDatabase");
types.add("org.glowroot.storage.simplerepo.util.CappedDatabase$ShutdownHookThread");
types.add("org.glowroot.storage.simplerepo.util.CappedDatabaseOutputStream");
types.add("org.glowroot.storage.simplerepo.util.ConnectionPool");
types.add("org.glowroot.storage.simplerepo.util.ConnectionPool$ShutdownHookThread");
types.add("org.glowroot.storage.simplerepo.util.DataSource");
types.add("org.glowroot.storage.simplerepo.util.DataSource$ShutdownHookThread");
return types;
}

Expand Down

0 comments on commit fc249c2

Please sign in to comment.