Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics Part 1 - Add Counter Metric API #1940

Merged
merged 16 commits into from Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
48 changes: 48 additions & 0 deletions dart/lib/src/hub.dart
Expand Up @@ -2,6 +2,9 @@ import 'dart:async';
import 'dart:collection';

import 'package:meta/meta.dart';
import 'metrics/metric.dart';
import 'metrics/metrics_aggregator.dart';
import 'metrics/metrics_api.dart';
import 'profiling.dart';
import 'propagation_context.dart';
import 'transport/data_category.dart';
Expand Down Expand Up @@ -38,6 +41,14 @@ class Hub {

late final _WeakMap _throwableToSpan;

late final MetricsApi _metricsApi;

@internal
MetricsApi get metricsApi => _metricsApi;

@internal
MetricsAggregator? get metricsAggregator => _peek().client.metricsAggregator;

factory Hub(SentryOptions options) {
_validateOptions(options);

Expand All @@ -49,6 +60,7 @@ class Hub {
_stack.add(_StackItem(_getClient(_options), Scope(_options)));
_isEnabled = true;
_throwableToSpan = _WeakMap(_options);
_metricsApi = MetricsApi(hub: this);
}

static void _validateOptions(SentryOptions options) {
Expand Down Expand Up @@ -554,6 +566,42 @@ class Hub {
return sentryId;
}

@internal
Future<SentryId> captureMetrics(
Map<int, Iterable<Metric>> metricsBuckets) async {
var sentryId = SentryId.empty();

if (!_isEnabled) {
_options.logger(
SentryLevel.warning,
"Instance is disabled and this 'captureMetrics' call is a no-op.",
);
} else if (!_options.enableMetrics) {
_options.logger(
SentryLevel.info,
"Metrics are disabled and this 'captureMetrics' call is a no-op.",
);
} else if (metricsBuckets.isEmpty) {
_options.logger(
SentryLevel.info,
"Metrics are empty and this 'captureMetrics' call is a no-op.",
);
} else {
final item = _peek();
try {
sentryId = await item.client.captureMetrics(metricsBuckets);
} catch (exception, stackTrace) {
_options.logger(
SentryLevel.error,
'Error while capturing metrics.',
exception: exception,
stackTrace: stackTrace,
);
}
}
return sentryId;
}

@internal
void setSpanContext(
dynamic throwable,
Expand Down
15 changes: 15 additions & 0 deletions dart/lib/src/hub_adapter.dart
Expand Up @@ -4,6 +4,9 @@ import 'package:meta/meta.dart';
import 'hint.dart';

import 'hub.dart';
import 'metrics/metric.dart';
import 'metrics/metrics_aggregator.dart';
import 'metrics/metrics_api.dart';
import 'profiling.dart';
import 'protocol.dart';
import 'scope.dart';
Expand All @@ -23,6 +26,10 @@ class HubAdapter implements Hub {
@internal
SentryOptions get options => Sentry.currentHub.options;

@override
@internal
MetricsApi get metricsApi => Sentry.currentHub.metricsApi;

factory HubAdapter() {
return _instance;
}
Expand Down Expand Up @@ -181,4 +188,12 @@ class HubAdapter implements Hub {

@override
Scope get scope => Sentry.currentHub.scope;

@override
Future<SentryId> captureMetrics(Map<int, Iterable<Metric>> metricsBuckets) =>
Sentry.currentHub.captureMetrics(metricsBuckets);

@override
MetricsAggregator? get metricsAggregator =>
Sentry.currentHub.metricsAggregator;
}
132 changes: 132 additions & 0 deletions dart/lib/src/metrics/metric.dart
@@ -0,0 +1,132 @@
import 'package:meta/meta.dart';

import '../../sentry.dart';

final RegExp forbiddenKeyCharsRegex = RegExp('[^a-zA-Z0-9_/.-]+');
final RegExp forbiddenValueCharsRegex =
RegExp('[^\\w\\d\\s_:/@\\.\\{\\}\\[\\]\$-]+');
final RegExp forbiddenUnitCharsRegex = RegExp('[^a-zA-Z0-9_/.]+');

/// Base class for metrics.
/// Each metric is identified by a [key]. Its [type] describes its behaviour.
/// A [unit] (defaults to [SentryMeasurementUnit.none]) describes the values
/// being tracked. Optional [tags] can be added. The [timestamp] is the time
/// when the metric was emitted.
@internal
abstract class Metric {
final MetricType type;
final String key;
final SentryMeasurementUnit unit;
final Map<String, String> tags;

Metric({
required this.type,
required this.key,
required this.unit,
required this.tags,
});

/// Add a value to the metric.
add(double value);

/// Serialize the value into a list of Objects to be converted into a String.
Iterable<Object> _serializeValue();

/// Encodes the metric in the statsd format
/// See <a href="https://github.com/statsd/statsd#usage">github.com/statsd/statsd#usage</a> and
/// <a href="https://getsentry.github.io/relay/relay_metrics/index.html">getsentry.github.io/relay/relay_metrics/index.html</a>
/// for more details about the format.
///
/// Example format: key@none:1|c|#myTag:myValue|T1710844170
/// key@unit:value1:value2|type|#tagKey1:tagValue1,tagKey2:tagValue2,|TbucketKey
///
/// [bucketKey] is the key of the metric bucket that will be sent to Sentry,
/// and it's appended at the end of the encoded metric.
String encodeToStatsd(int bucketKey) {
final StringBuffer buffer = StringBuffer();
stefanosiano marked this conversation as resolved.
Show resolved Hide resolved
buffer.write(_normalizeKey(key));
buffer.write("@");

final String sanitizeUnitName = _sanitizeUnit(unit.name);
stefanosiano marked this conversation as resolved.
Show resolved Hide resolved
buffer.write(sanitizeUnitName);

for (Object value in _serializeValue()) {
buffer.write(":");
buffer.write(value.toString());
}

buffer.write("|");
buffer.write(type.statsdType);

if (tags.isNotEmpty) {
buffer.write("|#");
String serializedTags = tags.entries
.map((tag) =>
'${_normalizeKey(tag.key)}:${_normalizeTagValue(tag.value)}')
.join(',');
buffer.write(serializedTags);
}

buffer.write("|T");
buffer.write(bucketKey);

return buffer.toString();
}

/// Return a key created by [key], [type], [unit] and [tags].
/// This key should be used to retrieve the metric to update in aggregation.
String getCompositeKey() {
final String serializedTags = tags.entries.map((e) {
// We escape the ',' from the key and the value, as we will join the tags
// with a ',' to create the composite key.
String escapedKey = e.key.replaceAll(',', '\\,');
String escapedValue = e.value.replaceAll(',', '\\,');
return '$escapedKey=$escapedValue';
}).join(',');

return ('${type.statsdType}_${key}_${unit.name}_$serializedTags');
}

/// Remove forbidden characters from the metric key and tag key.
String _normalizeKey(String input) =>
input.replaceAll(forbiddenKeyCharsRegex, '_');

/// Remove forbidden characters from the tag value.
String _normalizeTagValue(String input) =>
input.replaceAll(forbiddenValueCharsRegex, '');

/// Remove forbidden characters from the metric unit.
String _sanitizeUnit(String input) =>
input.replaceAll(forbiddenUnitCharsRegex, '_');
}

@internal

/// Metric [MetricType.counter] that track a value that can only be incremented.
stefanosiano marked this conversation as resolved.
Show resolved Hide resolved
class CounterMetric extends Metric {
double value;

CounterMetric({
required this.value,
required super.key,
required super.unit,
required super.tags,
}) : super(type: MetricType.counter);

@override
add(double value) => this.value += value;

@override
Iterable<Object> _serializeValue() => [value];
}

@internal

/// The metric type and its associated statsd encoded value.
enum MetricType {
counter('c');
denrase marked this conversation as resolved.
Show resolved Hide resolved

final String statsdType;

const MetricType(this.statsdType);
}
147 changes: 147 additions & 0 deletions dart/lib/src/metrics/metrics_aggregator.dart
@@ -0,0 +1,147 @@
import 'dart:async';
import 'dart:collection';
import 'dart:math';

import 'package:meta/meta.dart';

import '../../sentry.dart';
import 'metric.dart';

/// Class that aggregates all metrics into time buckets and sends them.
@internal
class MetricsAggregator {
static const int _rollupInSeconds = 10;
final Duration _flushInterval;
final int _flushShiftMs;
final SentryOptions _options;
final Hub _hub;
bool _isClosed = false;
@visibleForTesting
Completer<void>? flushCompleter;

/// The key for this map is the timestamp of the bucket, rounded down to the
/// nearest RollupInSeconds. So it aggregates all the metrics over a certain
/// time period. The Value is a map of the metrics, each of which has a key
/// that uniquely identifies it within the time period.
/// The [SplayTreeMap] is used so that bucket keys are ordered.
final SplayTreeMap<int, Map<String, Metric>> _buckets = SplayTreeMap();

MetricsAggregator({
required SentryOptions options,
Hub? hub,
@visibleForTesting Duration flushInterval = const Duration(seconds: 5),
@visibleForTesting int? flushShiftMs,
}) : _options = options,
_hub = hub ?? HubAdapter(),
_flushInterval = flushInterval,
_flushShiftMs = flushShiftMs ??
(Random().nextDouble() * (_rollupInSeconds * 1000)).toInt();

/// Creates or update an existing Counter metric with [value].
/// The metric to update is identified using [key], [unit] and [tags].
/// The [timestamp] represents when the metric was emitted.
void increment(
String key,
double value,
SentryMeasurementUnit unit,
Map<String, String> tags,
) {
if (_isClosed) {
return;
}

final int bucketKey = _getBucketKey(_options.clock());
stefanosiano marked this conversation as resolved.
Show resolved Hide resolved
final Map<String, Metric> bucket =
_buckets.putIfAbsent(bucketKey, () => {});
final Metric metric =
CounterMetric(value: value, key: key, unit: unit, tags: tags);

// Update the existing metric in the bucket.
// If absent, add the newly created metric to the bucket.
bucket.update(
metric.getCompositeKey(),
(m) => m..add(value),
ifAbsent: () => metric,
);

// Schedule the metrics flushing.
_scheduleFlush();
}

Future<void> _scheduleFlush() async {
if (!_isClosed &&
_buckets.isNotEmpty &&
flushCompleter?.isCompleted != false) {
flushCompleter = Completer();

await flushCompleter?.future
.timeout(_flushInterval, onTimeout: _flushMetrics);
}
}

/// Flush the metrics, then schedule next flush again.
void _flushMetrics() async {
await _flush();

flushCompleter?.complete(null);
flushCompleter = null;
await _scheduleFlush();
}

/// Flush and sends metrics.
Future<void> _flush() async {
final Iterable<int> flushableBucketKeys = _getFlushableBucketKeys();
if (flushableBucketKeys.isEmpty) {
_options.logger(SentryLevel.debug, 'Metrics: nothing to flush');
return;
}

final Map<int, Iterable<Metric>> bucketsToFlush = {};
int numMetrics = 0;

for (int flushableBucketKey in flushableBucketKeys) {
final Map<String, Metric>? bucket = _buckets.remove(flushableBucketKey);
if (bucket != null) {
numMetrics += bucket.length;
bucketsToFlush[flushableBucketKey] = bucket.values;
}
}

if (numMetrics == 0) {
_options.logger(SentryLevel.debug, 'Metrics: only empty buckets found');
return;
}

_options.logger(SentryLevel.debug, 'Metrics: capture $numMetrics metrics');
await _hub.captureMetrics(bucketsToFlush);
}

/// Return a list of bucket keys to flush.
List<int> _getFlushableBucketKeys() {
// Flushable buckets are all buckets with timestamp lower than the current
// one (so now - rollupInSeconds), minus a random duration (flushShiftMs).
final DateTime maxTimestampToFlush = _options.clock().subtract(Duration(
seconds: _rollupInSeconds,
milliseconds: _flushShiftMs,
denrase marked this conversation as resolved.
Show resolved Hide resolved
));
final int maxKeyToFlush = _getBucketKey(maxTimestampToFlush);

// takeWhile works because we use a SplayTreeMap and keys are ordered.
// toList() is needed because takeWhile is lazy and we want to remove items
// from the buckets with these keys.
return _buckets.keys.takeWhile((value) => value <= maxKeyToFlush).toList();
}

/// The timestamp of the bucket, rounded down to the nearest RollupInSeconds.
int _getBucketKey(DateTime timestamp) {
final int seconds = timestamp.millisecondsSinceEpoch ~/ 1000;
return (seconds ~/ _rollupInSeconds) * _rollupInSeconds;
}

@visibleForTesting
SplayTreeMap<int, Map<String, Metric>> get buckets => _buckets;

void close() {
_isClosed = true;
}
}