Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement telemetry for CI Visibility #6664

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package datadog.trace.civisibility.telemetry;

import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric;
import datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric;
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricCollector;
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricData;
import datadog.trace.api.civisibility.telemetry.TagValue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CiVisibilityMetricCollector} implementation backed by two bounded queues and an array of
 * atomic counters.
 *
 * <p>Distribution points are offered to their queue one by one as they are submitted. Count
 * metrics are accumulated in {@link #counters} and only converted to {@link
 * CiVisibilityMetricData} and offered to the metrics queue when {@link #prepareMetrics()} is
 * called.
 */
public class CiVisibilityMetricCollectorImpl implements CiVisibilityMetricCollector {

  private static final Logger log = LoggerFactory.getLogger(CiVisibilityMetricCollectorImpl.class);

  /** How many consecutive elements of the counters array are covered by a single dirty card. */
  private static final int COUNTER_CARD_SIZE = 64;

  private final BlockingQueue<CiVisibilityMetricData> rawMetricsQueue;
  private final BlockingQueue<DistributionSeriesPoint> rawDistributionPointsQueue;
  private final AtomicLongArray counters;

  /**
   * Dirty cards let {@link #prepareMetrics()} skip untouched regions of {@link #counters} instead
   * of scanning the whole array on every call.
   *
   * <p>Card {@code i} covers counters {@code [i * COUNTER_CARD_SIZE, (i + 1) * COUNTER_CARD_SIZE)}.
   * A card set to {@code true} means at least one of its counters was updated since the card was
   * last cleared; a card set to {@code false} means that slice of the array can be skipped.
   */
  private final AtomicBoolean[] counterDirtyCards;

  /**
   * Creates a collector whose queues are bounded by {@code RAW_QUEUE_SIZE} and which has one
   * counter slot per index reported by {@link CiVisibilityCountMetric#count()}.
   */
  public CiVisibilityMetricCollectorImpl() {
    this(
        new ArrayBlockingQueue<>(RAW_QUEUE_SIZE),
        new ArrayBlockingQueue<>(RAW_QUEUE_SIZE),
        CiVisibilityCountMetric.count());
  }

  /**
   * Creates a collector on top of the supplied queues.
   *
   * @param rawMetricsQueue queue that receives prepared count metrics
   * @param rawDistributionPointsQueue queue that receives distribution points
   * @param countersTotal number of counter slots to allocate
   */
  CiVisibilityMetricCollectorImpl(
      final BlockingQueue<CiVisibilityMetricData> rawMetricsQueue,
      final BlockingQueue<DistributionSeriesPoint> rawDistributionPointsQueue,
      final int countersTotal) {
    this.rawMetricsQueue = rawMetricsQueue;
    this.rawDistributionPointsQueue = rawDistributionPointsQueue;
    this.counters = new AtomicLongArray(countersTotal);

    // one card per COUNTER_CARD_SIZE counters, rounding up
    final int cardsTotal = (countersTotal - 1) / COUNTER_CARD_SIZE + 1;
    final AtomicBoolean[] cards = new AtomicBoolean[cardsTotal];
    for (int cardIdx = 0; cardIdx < cardsTotal; cardIdx++) {
      cards[cardIdx] = new AtomicBoolean(false);
    }
    this.counterDirtyCards = cards;
  }

  /**
   * Queues a single distribution point. If the queue has no room left, the point is dropped and a
   * debug message is logged.
   */
  @Override
  public void add(CiVisibilityDistributionMetric metric, int value, TagValue... tags) {
    final DistributionSeriesPoint dataPoint = metric.createDataPoint(value, tags);
    final boolean accepted = rawDistributionPointsQueue.offer(dataPoint);
    if (accepted) {
      return;
    }
    log.debug(
        "Discarding metric {}:{}:{} because the queue is full",
        metric,
        value,
        Arrays.toString(tags));
  }

  /** Removes and returns every distribution point that is currently queued. */
  @Override
  public Collection<DistributionSeriesPoint> drainDistributionSeries() {
    return drainQueue(this.rawDistributionPointsQueue);
  }

  /**
   * Adds {@code value} to the counter slot that corresponds to the given metric and tags, then
   * flags the card covering that slot as dirty.
   */
  @Override
  public void add(CiVisibilityCountMetric metric, long value, TagValue... tags) {
    final int slot = metric.getIndex(tags);
    // the counter is updated before its card is flagged
    counters.getAndAdd(slot, value);
    counterDirtyCards[slot / COUNTER_CARD_SIZE].set(true);
  }

  /**
   * Moves accumulated non-zero counters to the metrics queue, resetting each visited counter to
   * zero. Only counters covered by dirty cards are visited.
   *
   * <p>If the queue rejects a metric, the value is added back to its counter and the method
   * returns immediately, leaving the remaining counters for a later call.
   */
  @Override
  public void prepareMetrics() {
    final CiVisibilityCountMetric[] allMetrics = CiVisibilityCountMetric.values();
    final int countersTotal = counters.length();
    // position of the metric owning the counter being visited; only ever moves forward
    int metricCursor = 0;

    for (int cardIdx = 0; cardIdx < counterDirtyCards.length; cardIdx++) {
      final boolean touched = counterDirtyCards[cardIdx].getAndSet(false);
      if (touched) {
        final int from = cardIdx * COUNTER_CARD_SIZE;
        final int to = Math.min(from + COUNTER_CARD_SIZE, countersTotal);

        for (int slot = from; slot < to; slot++) {
          // catch up to the metric whose index range contains this slot
          while (allMetrics[metricCursor].getEndIndex() <= slot) {
            metricCursor++;
          }

          final long pending = counters.getAndSet(slot, 0);
          if (pending != 0) {
            final CiVisibilityCountMetric owner = allMetrics[metricCursor];
            final TagValue[] ownerTags = owner.getTagValues(slot);
            final CiVisibilityMetricData data = owner.createData(pending, ownerTags);
            if (!rawMetricsQueue.offer(data)) {
              // put the value back so that it is not lost; this also re-flags the card
              add(owner, pending, ownerTags);
              return;
            }
          }
        }
      }
      // a clean card means none of its counters was touched, so there is nothing to scan
    }
  }

  /** Removes and returns every prepared count metric that is currently queued. */
  @Override
  public Collection<CiVisibilityMetricData> drain() {
    return drainQueue(this.rawMetricsQueue);
  }

  /**
   * Empties {@code queue} into a new list, or returns an immutable empty list when the queue has
   * nothing in it.
   */
  private static <T> Collection<T> drainQueue(final BlockingQueue<T> queue) {
    if (queue.isEmpty()) {
      return Collections.emptyList();
    }
    final List<T> drained = new ArrayList<>(queue.size());
    queue.drainTo(drained);
    return drained;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package datadog.trace.civisibility.telemetry

import datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric
import datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric
import datadog.trace.api.civisibility.telemetry.CiVisibilityMetricData
import datadog.trace.api.civisibility.telemetry.TagValue
import datadog.trace.api.civisibility.telemetry.tag.Endpoint
import datadog.trace.api.civisibility.telemetry.tag.EventType
import datadog.trace.api.civisibility.telemetry.tag.Library
import datadog.trace.api.telemetry.MetricCollector
import spock.lang.Specification

/**
 * Spock specification for {@link CiVisibilityMetricCollectorImpl}: covers submission and draining
 * of distribution points and count metrics, aggregation of count metrics within a single
 * {@code prepareMetrics()} cycle, and rejection of tags that a metric does not allow.
 */
class CiVisibilityMetricCollectorTest extends Specification {

  def "test distribution metric submission"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES, 123, Endpoint.CODE_COVERAGE)
    collector.prepareMetrics()
    def metrics = collector.drainDistributionSeries()

    then:
    metrics == [
      new MetricCollector.DistributionSeriesPoint(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES.getName(), true, "civisibility", 123, [Endpoint.CODE_COVERAGE.asString()])
    ]
  }

  def "test distribution metrics are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES, 123, Endpoint.CODE_COVERAGE)
    collector.add(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES, 456, Endpoint.CODE_COVERAGE)
    collector.prepareMetrics()
    def metrics = collector.drainDistributionSeries()

    then:
    metrics == [
      new MetricCollector.DistributionSeriesPoint(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES.getName(), true, "civisibility", 123, [Endpoint.CODE_COVERAGE.asString()]),
      new MetricCollector.DistributionSeriesPoint(CiVisibilityDistributionMetric.ENDPOINT_PAYLOAD_BYTES.getName(), true, "civisibility", 456, [Endpoint.CODE_COVERAGE.asString()])
    ]
  }

  def "test count metric submission"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()
    def metrics = collector.drain()

    then:
    metrics == [new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 123)]
  }

  def "test count metric aggregation"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    // both submissions happen within the same cycle, so they are summed
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()
    def metrics = collector.drain()

    then:
    metrics == [new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 246)]
  }

  def "test count metrics submitted in different cycles are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()

    collector.add(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS, 123)
    collector.prepareMetrics()

    def metrics = collector.drain()

    then:
    // prepareMetrics() resets the counter on every cycle, so each cycle
    // produces its own data point carrying only that cycle's value
    metrics == [
      new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 123),
      new CiVisibilityMetricData(CiVisibilityCountMetric.CODE_COVERAGE_ERRORS.getName(), 123)
    ]
  }

  def "test count metrics with different tags are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 123, EventType.MODULE)
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 456, EventType.SESSION)
    collector.prepareMetrics()

    def metrics = collector.drain()

    then:
    metrics == [
      new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_FORCED_RUN.getName(), 123, EventType.MODULE),
      new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_FORCED_RUN.getName(), 456, EventType.SESSION)
    ]
  }

  def "test different count metrics are not aggregated"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 123, EventType.MODULE)
    collector.add(CiVisibilityCountMetric.ITR_SKIPPED, 456, EventType.MODULE)
    collector.prepareMetrics()

    def metrics = collector.drain()

    then:
    metrics.size() == 2
    metrics.contains(new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_FORCED_RUN.getName(), 123, EventType.MODULE))
    metrics.contains(new CiVisibilityMetricData(CiVisibilityCountMetric.ITR_SKIPPED.getName(), 456, EventType.MODULE))
  }

  def "test exception is thrown when a distribution metric is tagged with a tag that is not allowed for it"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityDistributionMetric.GIT_COMMAND_MS, 123, Library.JACOCO)

    then:
    thrown IllegalArgumentException
  }

  def "test exception is thrown when a count metric is tagged with a tag that is not allowed for it"() {
    setup:
    def collector = new CiVisibilityMetricCollectorImpl()

    when:
    collector.add(CiVisibilityCountMetric.ITR_FORCED_RUN, 123, Library.JACOCO)

    then:
    thrown IllegalArgumentException
  }

  /**
   * This test enumerates all possible metric+tags variants,
   * then tries submitting all possible variant pairs (combinations of 2 different metric+tags).
   * The goal is to ensure that index calculation logic and card-marking are done right.
   */
  def "test submission of all possible count metric pairs"() {
    setup:
    List<PossibleMetric> possibleMetrics = []

    for (CiVisibilityCountMetric metric : CiVisibilityCountMetric.values()) {
      def metricTags = metric.getTags()
      // iterate over all possible combinations of metric tags
      for (TagValue[] tags : cartesianProduct(metricTags)) {
        // append in place; "+=" would copy the whole list on every iteration
        possibleMetrics.add(new PossibleMetric(metric, tags))
      }
    }

    def collector = new CiVisibilityMetricCollectorImpl()

    expect:
    for (int i = 0; i < possibleMetrics.size() - 1; i++) {
      def firstMetric = possibleMetrics.get(i)
      for (int j = i + 1; j < possibleMetrics.size(); j++) {
        def secondMetric = possibleMetrics.get(j)

        // deriving metric values from indices (as indices are unique, it's convenient to check that every metric has the correct value when drained)
        // +1 is needed because 0 cannot be used as a metric value - metrics with value 0 are not submitted
        int firstMetricValue = i + 1
        int secondMetricValue = j + 1

        collector.add(firstMetric.metric, firstMetricValue, firstMetric.tags)
        collector.add(secondMetric.metric, secondMetricValue, secondMetric.tags)
        collector.prepareMetrics()

        def metrics = collector.drain()
        assert metrics.size() == 2
        assert metrics.contains(new CiVisibilityMetricData(firstMetric.metric.getName(), firstMetricValue, firstMetric.tags))
        assert metrics.contains(new CiVisibilityMetricData(secondMetric.metric.getName(), secondMetricValue, secondMetric.tags))
      }
    }
  }

  /**
   * Builds every tuple that can be formed by picking at most one enum constant from each of the
   * given tag classes (a tag class may also be omitted from a tuple entirely).
   */
  private Collection<TagValue[]> cartesianProduct(Class<? extends TagValue>[] sets) {
    Collection<TagValue[]> tuples = new ArrayList<>()
    cartesianProductBacktrack(sets, tuples, new ArrayDeque<>(), 0)
    return tuples
  }

  /**
   * Recursive step of {@link #cartesianProduct}: decides what to do with the tag class at
   * {@code offset} and, once all classes have been processed, records the accumulated tuple.
   */
  private void cartesianProductBacktrack(Class<? extends TagValue>[] sets, Collection<TagValue[]> tuples, Deque<TagValue> currentTuple, int offset) {
    if (offset == sets.length) {
      int idx = 0
      TagValue[] tuple = new TagValue[currentTuple.size()]
      // the deque is iterated from the most recently pushed element,
      // so the array is filled back to front to restore push order
      for (TagValue element : currentTuple) {
        tuple[tuple.length - ++idx] = element
      }
      tuples.add(tuple)
      return
    }

    // a branch where we omit current tag
    cartesianProductBacktrack(sets, tuples, currentTuple, offset + 1)

    for (TagValue element : sets[offset].getEnumConstants()) {
      currentTuple.push(element)
      cartesianProductBacktrack(sets, tuples, currentTuple, offset + 1)
      currentTuple.pop()
    }
  }

  /** A count metric paired with one concrete combination of its tag values. */
  private static final class PossibleMetric {
    private final CiVisibilityCountMetric metric
    private final TagValue[] tags

    PossibleMetric(CiVisibilityCountMetric metric, TagValue[] tags) {
      this.metric = metric
      this.tags = tags
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class CiVisibilityConfig {
public static final String CIVISIBILITY_FLAKY_RETRY_ENABLED = "civisibility.flaky.retry.enabled";
public static final String CIVISIBILITY_FLAKY_RETRY_COUNT = "civisibility.flaky.retry.count";
public static final String CIVISIBILITY_MODULE_NAME = "civisibility.module.name";
public static final String CIVISIBILITY_TELEMETRY_ENABLED = "civisibility.telemetry.enabled";

/* COVERAGE SETTINGS */
public static final String CIVISIBILITY_CODE_COVERAGE_ENABLED =
Expand Down
8 changes: 7 additions & 1 deletion internal-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,14 @@ excludedClassesCoverage += [
"datadog.trace.api.civisibility.coverage.TestReport",
"datadog.trace.api.civisibility.coverage.TestReportFileEntry",
"datadog.trace.api.civisibility.coverage.TestReportFileEntry.Segment",
"datadog.trace.api.civisibility.InstrumentationBridge",
"datadog.trace.api.civisibility.events.BuildEventsHandler.ModuleInfo",
"datadog.trace.api.civisibility.telemetry.tag.ErrorType",
"datadog.trace.api.civisibility.telemetry.tag.ExitCode",
"datadog.trace.api.civisibility.telemetry.CiVisibilityCountMetric.IndexHolder",
"datadog.trace.api.civisibility.telemetry.CiVisibilityDistributionMetric",
"datadog.trace.api.civisibility.telemetry.CiVisibilityMetricData",
"datadog.trace.api.civisibility.telemetry.NoOpMetricCollector",
"datadog.trace.api.civisibility.InstrumentationBridge",
// POJO
"datadog.trace.api.git.GitInfo",
"datadog.trace.api.git.GitInfoProvider",
Expand Down
Loading
Loading