Skip to content
Merged
5 changes: 3 additions & 2 deletions sentry/api/sentry.api
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ public final class io/sentry/MemoryCollectionData {

public final class io/sentry/MetricsAggregator : io/sentry/IMetricsAggregator, java/io/Closeable, java/lang/Runnable {
public fun <init> (Lio/sentry/SentryOptions;Lio/sentry/metrics/IMetricsClient;)V
public fun <init> (Lio/sentry/metrics/IMetricsClient;Lio/sentry/ILogger;Lio/sentry/SentryDateProvider;Lio/sentry/ISentryExecutorService;)V
public fun <init> (Lio/sentry/metrics/IMetricsClient;Lio/sentry/ILogger;Lio/sentry/SentryDateProvider;ILio/sentry/ISentryExecutorService;)V
public fun close ()V
public fun distribution (Ljava/lang/String;DLio/sentry/MeasurementUnit;Ljava/util/Map;JI)V
public fun flush (Z)V
Expand Down Expand Up @@ -3495,7 +3495,8 @@ public abstract interface class io/sentry/metrics/MetricsApi$IMetricsInterface {
}

public final class io/sentry/metrics/MetricsHelper {
public static final field FLUSHER_SLEEP_TIME_MS I
public static final field FLUSHER_SLEEP_TIME_MS J
public static final field MAX_TOTAL_WEIGHT I
public fun <init> ()V
public static fun convertNanosTo (Lio/sentry/MeasurementUnit$Duration;J)D
public static fun encodeMetrics (JLjava/util/Collection;Ljava/lang/StringBuilder;)V
Expand Down
113 changes: 73 additions & 40 deletions sentry/src/main/java/io/sentry/MetricsAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
Expand All @@ -43,14 +44,19 @@ public final class MetricsAggregator implements IMetricsAggregator, Runnable, Cl
// aggregates all of the metrics data for a particular time period. The Value is a dictionary for
// the metrics,
// each of which has a key that uniquely identifies it within the time period
private final NavigableMap<Long, Map<String, Metric>> buckets = new ConcurrentSkipListMap<>();
private final @NotNull NavigableMap<Long, Map<String, Metric>> buckets =
new ConcurrentSkipListMap<>();
private final @NotNull AtomicInteger totalBucketsWeight = new AtomicInteger();

private final int maxWeight;

public MetricsAggregator(
final @NotNull SentryOptions options, final @NotNull IMetricsClient client) {
this(
client,
options.getLogger(),
options.getDateProvider(),
MetricsHelper.MAX_TOTAL_WEIGHT,
NoOpSentryExecutorService.getInstance());
}

Expand All @@ -59,10 +65,12 @@ public MetricsAggregator(
final @NotNull IMetricsClient client,
final @NotNull ILogger logger,
final @NotNull SentryDateProvider dateProvider,
final int maxWeight,
final @NotNull ISentryExecutorService executorService) {
this.client = client;
this.logger = logger;
this.dateProvider = dateProvider;
this.maxWeight = maxWeight;
this.executorService = executorService;
}

Expand Down Expand Up @@ -153,71 +161,78 @@ private void add(
final double value,
@Nullable MeasurementUnit unit,
final @Nullable Map<String, String> tags,
@Nullable Long timestampMs,
@NotNull Long timestampMs,
final int stackLevel) {

if (isClosed) {
return;
}

if (timestampMs == null) {
timestampMs = nowMillis();
}

final @NotNull Metric metric;
switch (type) {
case Counter:
metric = new CounterMetric(key, value, unit, tags, timestampMs);
break;
case Gauge:
metric = new GaugeMetric(key, value, unit, tags, timestampMs);
break;
case Distribution:
metric = new DistributionMetric(key, value, unit, tags, timestampMs);
break;
case Set:
metric = new SetMetric(key, unit, tags, timestampMs);
//noinspection unchecked
metric.add((int) value);
break;
default:
throw new IllegalArgumentException("Unknown MetricType: " + type.name());
}

final long timeBucketKey = MetricsHelper.getTimeBucketKey(timestampMs);
final @NotNull Map<String, Metric> timeBucket = getOrAddTimeBucket(timeBucketKey);

final @NotNull String metricKey = MetricsHelper.getMetricBucketKey(type, key, unit, tags);

// TODO check if we can synchronize only the metric itself
// TODO ideally we can synchronize only the metric itself
synchronized (timeBucket) {
@Nullable Metric existingMetric = timeBucket.get(metricKey);
if (existingMetric != null) {
final int oldWeight = existingMetric.getWeight();
existingMetric.add(value);
final int newWeight = existingMetric.getWeight();
totalBucketsWeight.addAndGet(newWeight - oldWeight);
} else {
final @NotNull Metric metric;
switch (type) {
case Counter:
metric = new CounterMetric(key, value, unit, tags, timestampMs);
break;
case Gauge:
metric = new GaugeMetric(key, value, unit, tags, timestampMs);
break;
case Distribution:
metric = new DistributionMetric(key, value, unit, tags, timestampMs);
break;
case Set:
metric = new SetMetric(key, unit, tags, timestampMs);
// sets API is either ints or strings cr32 encoded into ints
// noinspection unchecked
metric.add((int) value);
break;
default:
throw new IllegalArgumentException("Unknown MetricType: " + type.name());
}
timeBucket.put(metricKey, metric);
totalBucketsWeight.addAndGet(metric.getWeight());
}
}

Comment thread
stefanosiano marked this conversation as resolved.
// spin up real executor service the first time metrics are collected
if (!isClosed && !flushScheduled) {
final boolean isOverWeight = isOverWeight();
if (!isClosed && (isOverWeight || !flushScheduled)) {
synchronized (this) {
if (!isClosed && !flushScheduled) {
flushScheduled = true;
if (!isClosed) {
// TODO this is probably not a good idea after all
// as it will slow down the first metric emission
// maybe move to constructor?
if (executorService instanceof NoOpSentryExecutorService) {
executorService = new SentryExecutorService();
}
executorService.schedule(this, MetricsHelper.FLUSHER_SLEEP_TIME_MS);

flushScheduled = true;
final long delayMs = isOverWeight ? 0 : MetricsHelper.FLUSHER_SLEEP_TIME_MS;
executorService.schedule(this, delayMs);
}
}
}
}

@Override
public void flush(final boolean force) {
public void flush(boolean force) {
if (!force && isOverWeight()) {
logger.log(SentryLevel.INFO, "Metrics: total weight exceeded, flushing all buckets");
force = true;
}

final @NotNull Set<Long> flushableBuckets = getFlushableBuckets(force);
if (flushableBuckets.isEmpty()) {
logger.log(SentryLevel.DEBUG, "Metrics: nothing to flush");
Expand All @@ -226,16 +241,21 @@ public void flush(final boolean force) {
logger.log(SentryLevel.DEBUG, "Metrics: flushing " + flushableBuckets.size() + " buckets");

final Map<Long, Map<String, Metric>> snapshot = new HashMap<>();
int totalSize = 0;
int numMetrics = 0;
for (long bucketKey : flushableBuckets) {
final @Nullable Map<String, Metric> metrics = buckets.remove(bucketKey);
if (metrics != null) {
totalSize += metrics.size();
snapshot.put(bucketKey, metrics);
final @Nullable Map<String, Metric> bucket = buckets.remove(bucketKey);
if (bucket != null) {
synchronized (bucket) {
final int weight = getBucketWeight(bucket);
totalBucketsWeight.addAndGet(-weight);

numMetrics += bucket.size();
snapshot.put(bucketKey, bucket);
}
}
}

if (totalSize == 0) {
if (numMetrics == 0) {
logger.log(SentryLevel.DEBUG, "Metrics: only empty buckets found");
return;
}
Expand All @@ -244,6 +264,19 @@ public void flush(final boolean force) {
client.captureMetrics(new EncodedMetrics(snapshot));
}

private boolean isOverWeight() {
final int totalWeight = buckets.size() + totalBucketsWeight.get();
return totalWeight >= maxWeight;
}

private static int getBucketWeight(final @NotNull Map<String, Metric> bucket) {
int weight = 0;
for (final @NotNull Metric value : bucket.values()) {
weight += value.getWeight();
}
return weight;
}

@NotNull
private Set<Long> getFlushableBuckets(final boolean force) {
if (force) {
Expand All @@ -262,7 +295,7 @@ private Map<String, Metric> getOrAddTimeBucket(final long bucketKey) {
@Nullable Map<String, Metric> bucket = buckets.get(bucketKey);
if (bucket == null) {
// although buckets is thread safe, we still need to synchronize here to avoid creating
// the same bucket at the same time
// the same bucket at the same time, overwriting each other
synchronized (buckets) {
bucket = buckets.get(bucketKey);
if (bucket == null) {
Expand Down
3 changes: 2 additions & 1 deletion sentry/src/main/java/io/sentry/metrics/MetricsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

@ApiStatus.Internal
public final class MetricsHelper {
public static final int FLUSHER_SLEEP_TIME_MS = 5000;
public static final long FLUSHER_SLEEP_TIME_MS = 5000;
public static final int MAX_TOTAL_WEIGHT = 100000;
private static final int ROLLUP_IN_SECONDS = 10;

private static final Pattern INVALID_KEY_CHARACTERS_PATTERN =
Expand Down
78 changes: 77 additions & 1 deletion sentry/src/test/java/io/sentry/MetricsAggregatorTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.sentry.metrics.MetricsHelperTest
import io.sentry.test.DeferredExecutorService
import org.mockito.kotlin.any
import org.mockito.kotlin.check
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.never
import org.mockito.kotlin.verify
Expand All @@ -27,11 +28,12 @@ class MetricsAggregatorTest {
var currentTimeMillis: Long = 0
var executorService = DeferredExecutorService()

fun getSut(): MetricsAggregator {
fun getSut(maxWeight: Int = MetricsHelper.MAX_TOTAL_WEIGHT): MetricsAggregator {
return MetricsAggregator(
client,
logger,
dateProvider,
maxWeight,
executorService
)
}
Expand Down Expand Up @@ -308,4 +310,78 @@ class MetricsAggregatorTest {
// and flushing is scheduled again
assertTrue(fixture.executorService.hasScheduledRunnables())
}

@Test
fun `weight is considered for force flushing`() {
// weight is determined by number of buckets + weight of metrics
val aggregator = fixture.getSut(5)

// when 3 values are emitted
for (i in 0 until 3) {
aggregator.distribution(
"name",
i.toDouble(),
null,
null,
fixture.currentTimeMillis,
1
)
}
// no metrics are captured by the client
fixture.executorService.runAll()
verify(fixture.client, never()).captureMetrics(any())

// once we have 4 values and one bucket = weight of 5
aggregator.distribution(
"name",
10.0,
null,
null,
fixture.currentTimeMillis,
1
)
// then flush without force still captures all metrics
fixture.executorService.runAll()
verify(fixture.client).captureMetrics(any())
}

@Test
fun `flushing is immediately scheduled if add operations causes too much weight`() {
fixture.executorService = mock()
val aggregator = fixture.getSut(1)

verify(fixture.executorService, never()).schedule(any(), any())

// when 1 value is emitted
aggregator.distribution(
"name",
1.0,
null,
null,
fixture.currentTimeMillis,
1
)

// flush is immediately scheduled
verify(fixture.executorService).schedule(any(), eq(0))
}

@Test
fun `flushing is deferred scheduled if add operations does not cause too much weight`() {
fixture.executorService = mock()
val aggregator = fixture.getSut(10)

// when 1 value is emitted
aggregator.distribution(
"name",
1.0,
null,
null,
fixture.currentTimeMillis,
1
)

// flush is scheduled for later
verify(fixture.executorService).schedule(any(), eq(MetricsHelper.FLUSHER_SLEEP_TIME_MS))
}
}