Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import javax.annotation.Nullable;

/** Instrumentation scopes have a mandatory name, optional version, and optional schema URL. */
public final class OtelInstrumentationScope {
public final class OtelInstrumentationScope implements Comparable<OtelInstrumentationScope> {

private final UTF8BytesString scopeName;
@Nullable private final UTF8BytesString scopeVersion;
Expand All @@ -32,6 +32,34 @@ public UTF8BytesString getSchemaUrl() {
return schemaUrl;
}

@Override
public int compareTo(OtelInstrumentationScope that) {
int cmp = scopeName.toString().compareTo(that.scopeName.toString());
if (cmp != 0) {
return cmp;
}
if (scopeVersion != that.scopeVersion) {
if (scopeVersion == null) {
return -1;
} else if (that.scopeVersion == null) {
return 1;
}
cmp = scopeVersion.toString().compareTo(that.scopeVersion.toString());
if (cmp != 0) {
Comment thread
AlexeyKuznetsov-DD marked this conversation as resolved.
return cmp;
}
}
if (schemaUrl != that.schemaUrl) {
if (schemaUrl == null) {
return -1;
} else if (that.schemaUrl == null) {
return 1;
}
return schemaUrl.toString().compareTo(that.schemaUrl.toString());
}
return 0;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof OtelInstrumentationScope)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package datadog.trace.bootstrap.otel.logs.data;

import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;
import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord;
import datadog.trace.bootstrap.otlp.logs.OtlpLogsVisitor;
import datadog.trace.bootstrap.otlp.logs.OtlpScopedLogsVisitor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.BiConsumer;

/** Processes log records, grouping them by instrumentation scope. */
public final class OtelLogRecordProcessor {
public static final OtelLogRecordProcessor INSTANCE = new OtelLogRecordProcessor();

private static final Comparator<OtlpLogRecord> BY_SCOPE =
Comparator.comparing(o -> o.instrumentationScope);

private static final Map<ClassLoader, BiConsumer<Map<?, ?>, OtlpAttributeVisitor>>
ATTRIBUTE_READERS = Collections.synchronizedMap(new WeakHashMap<>());

private final Queue<OtlpLogRecord> queue = new ArrayBlockingQueue<>(2048);

public void addLog(OtlpLogRecord logRecord) {
queue.offer(logRecord);
}

public void collectLogs(OtlpLogsVisitor visitor) {
OtlpScopedLogsVisitor scopedVisitor = null;
OtelInstrumentationScope currentScope = null;
BiConsumer<Map<?, ?>, OtlpAttributeVisitor> attributesReader = null;
ClassLoader attributesClassLoader = null;
for (OtlpLogRecord logRecord : batchByScope()) {
if (logRecord.instrumentationScope != currentScope) {
currentScope = logRecord.instrumentationScope;
scopedVisitor = visitor.visitScopedLogs(currentScope);
}
Map<?, ?> attributes = logRecord.attributes;
if (!attributes.isEmpty()) {
ClassLoader cl = getAttributesClassLoader(attributes);
// avoid repeated lookups when attribute class-loader is same for all records
if (attributesReader == null || !Objects.equals(cl, attributesClassLoader)) {
attributesReader = ATTRIBUTE_READERS.get(cl);
attributesClassLoader = cl;
}
if (attributesReader != null) {
attributesReader.accept(attributes, scopedVisitor);
}
}
scopedVisitor.visitLogRecord(logRecord);
}
}

private static ClassLoader getAttributesClassLoader(Map<?, ?> attributes) {
// need to peek at the first key, as the map will be a JDK collection type
return attributes.keySet().iterator().next().getClass().getClassLoader();
}

public static void registerAttributeReader(
ClassLoader cl, BiConsumer<Map<?, ?>, OtlpAttributeVisitor> reader) {
ATTRIBUTE_READERS.put(cl, reader);
}

private List<OtlpLogRecord> batchByScope() {
// capture expected batch size; records emitted after here go into next batch
int batchSize = queue.size();
List<OtlpLogRecord> batch = new ArrayList<>(batchSize);
for (int i = 0; i < batchSize; i++) {
OtlpLogRecord logRecord = queue.poll();
if (logRecord != null) {
batch.add(logRecord);
} else {
break; // should not happen unless another thread is also batching records
}
Comment on lines +75 to +81
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: would it be more human-readable to use something like:

OtlpLogRecord logRecord;
    while ((logRecord = queue.poll()) != null) {
        batch.add(logRecord);
    }

WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would continually grow the batch if another thread is continually adding log records, potentially delaying the export indefinitely. That's why we grab the number of pending records first and then drain only those. Any records added after that initial sizing will be handled by the next scheduled export.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it! probably make sense to add a comment for future generations? :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
batch.sort(BY_SCOPE);
return batch;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
/** A visitor to visit OpenTelemetry attributes. */
public interface OtlpAttributeVisitor {

int STRING = 0; // AttributeType.STRING
int BOOLEAN = 1; // AttributeType.BOOLEAN
int LONG = 2; // AttributeType.LONG
int DOUBLE = 3; // AttributeType.DOUBLE
int STRING_ARRAY = 4; // AttributeType.STRING_ARRAY
int BOOLEAN_ARRAY = 5; // AttributeType.BOOLEAN_ARRAY
int LONG_ARRAY = 6; // AttributeType.LONG_ARRAY
int DOUBLE_ARRAY = 7; // AttributeType.DOUBLE_ARRAY
int STRING_ATTRIBUTE = 0; // AttributeType.STRING
int BOOLEAN_ATTRIBUTE = 1; // AttributeType.BOOLEAN
int LONG_ATTRIBUTE = 2; // AttributeType.LONG
int DOUBLE_ATTRIBUTE = 3; // AttributeType.DOUBLE
int STRING_ARRAY_ATTRIBUTE = 4; // AttributeType.STRING_ARRAY
int BOOLEAN_ARRAY_ATTRIBUTE = 5; // AttributeType.BOOLEAN_ARRAY
int LONG_ARRAY_ATTRIBUTE = 6; // AttributeType.LONG_ARRAY
int DOUBLE_ARRAY_ATTRIBUTE = 7; // AttributeType.DOUBLE_ARRAY

/**
* Visits an attribute.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package datadog.trace.bootstrap.otlp.logs;

import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;
import java.util.Map;
import javax.annotation.Nullable;

public final class OtlpLogRecord {
Comment thread
mcculls marked this conversation as resolved.

public static final int STRING_BODY = 0; // ValueType.STRING
public static final int BOOLEAN_BODY = 1; // ValueType.BOOLEAN
public static final int LONG_BODY = 2; // ValueType.LONG
public static final int DOUBLE_BODY = 3; // ValueType.DOUBLE
public static final int ARRAY_BODY = 4; // ValueType.ARRAY
public static final int KEY_VALUE_LIST_BODY = 5; // ValueType.KEY_VALUE_LIST
public static final int BYTES_BODY = 6; // ValueType.BYTES

public final OtelInstrumentationScope instrumentationScope;

public final long timestampNanos;
public final long observedNanos;
public final int severityNumber;
@Nullable public final String severityText;
public final int bodyType;
@Nullable public final Object bodyValue;
@Nullable public final String eventName;
public final Map<?, ?> attributes;
@Nullable public final AgentSpanContext spanContext;

public OtlpLogRecord(
OtelInstrumentationScope instrumentationScope,
long timestampNanos,
long observedNanos,
int severityNumber,
@Nullable String severityText,
int bodyType,
@Nullable Object bodyValue,
@Nullable String eventName,
Map<?, ?> attributes,
@Nullable AgentSpanContext spanContext) {
this.instrumentationScope = instrumentationScope;
this.timestampNanos = timestampNanos;
this.observedNanos = observedNanos;
this.severityNumber = severityNumber;
this.severityText = severityText;
this.bodyType = bodyType;
this.bodyValue = bodyValue;
this.eventName = eventName;
this.attributes = attributes;
this.spanContext = spanContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package datadog.trace.bootstrap.otlp.logs;

import datadog.trace.bootstrap.otel.common.OtelInstrumentationScope;

/** A visitor to visit OpenTelemetry logs. */
public interface OtlpLogsVisitor {
Comment thread
mcculls marked this conversation as resolved.
/** Visits logs produced by an instrumentation scope. */
OtlpScopedLogsVisitor visitScopedLogs(OtelInstrumentationScope scope);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package datadog.trace.bootstrap.otlp.logs;

import datadog.trace.bootstrap.otlp.common.OtlpAttributeVisitor;

/** A visitor to visit log records produced by an instrumentation scope. */
public interface OtlpScopedLogsVisitor extends OtlpAttributeVisitor {

/** Visits an attribute of the upcoming log record. */
void visitAttribute(int type, String key, Object value);

/** Visits a log record. */
void visitLogRecord(OtlpLogRecord logRecord);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package datadog.opentelemetry.shim.logs;

import static datadog.opentelemetry.shim.trace.OtelExtractedContext.extract;
import static datadog.trace.bootstrap.otlp.logs.OtlpLogRecord.STRING_BODY;
import static io.opentelemetry.api.common.AttributeKey.stringKey;

import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.api.time.TimeSource;
import datadog.trace.bootstrap.otel.logs.data.OtelLogRecordProcessor;
import datadog.trace.bootstrap.otlp.logs.OtlpLogRecord;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.context.Context;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;

@ParametersAreNonnullByDefault
final class OtelLogRecordBuilder implements LogRecordBuilder {
// package-visible for testing
static TimeSource TIME_SOURCE = SystemTimeSource.INSTANCE;

private static final AttributeKey<String> EXCEPTION_TYPE_KEY = stringKey("exception.type");
private static final AttributeKey<String> EXCEPTION_MESSAGE_KEY = stringKey("exception.message");

private final OtelLogger logger;

private long timestampNanos;
private long observedNanos;
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
@Nullable private String severityText;
private int bodyType;
@Nullable private Object bodyValue;
@Nullable private String eventName;
@Nullable private Map<AttributeKey<?>, Object> attributes;
@Nullable private Context context;

private boolean attributesEmitted;

OtelLogRecordBuilder(OtelLogger logger) {
this.logger = logger;
}

@Override
public LogRecordBuilder setTimestamp(long timestamp, TimeUnit unit) {
this.timestampNanos = unit.toNanos(timestamp);
return this;
}

@Override
public LogRecordBuilder setTimestamp(Instant instant) {
this.timestampNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
return this;
}

@Override
public LogRecordBuilder setObservedTimestamp(long timestamp, TimeUnit unit) {
this.observedNanos = unit.toNanos(timestamp);
return this;
}

@Override
public LogRecordBuilder setObservedTimestamp(Instant instant) {
this.observedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
return this;
}

@Override
public LogRecordBuilder setSeverity(Severity severity) {
this.severity = severity;
return this;
}

@Override
public LogRecordBuilder setSeverityText(String severityText) {
this.severityText = severityText;
return this;
}

@Override
public LogRecordBuilder setBody(String value) {
this.bodyType = STRING_BODY;
this.bodyValue = value;
return this;
}

@Override
public LogRecordBuilder setBody(Value<?> body) {
this.bodyType = body.getType().ordinal();
this.bodyValue = body.getValue();
return this;
}

@Override
public <T> LogRecordBuilder setAttribute(@Nullable AttributeKey<T> key, @Nullable T value) {
if (key == null || key.getKey().isEmpty()) {
return this;
}
if (attributesEmitted && attributes != null) {
// defensive copy if builder used after emit
attributes = new HashMap<>(attributes);
attributesEmitted = false;
}
if (value != null) {
if (attributes == null) {
attributes = new HashMap<>();
}
attributes.put(key, value);
} else if (attributes != null) {
attributes.remove(key);
}
return this;
}

@Override
public LogRecordBuilder setContext(Context context) {
this.context = context;
return this;
}

public LogRecordBuilder setEventName(String eventName) {
this.eventName = eventName;
return this;
}

public LogRecordBuilder setException(@Nullable Throwable throwable) {
if (throwable != null) {
setExceptionAttribute(EXCEPTION_TYPE_KEY, throwable.getClass().getName());
setExceptionAttribute(EXCEPTION_MESSAGE_KEY, throwable.getMessage());
}
return this;
}

private void setExceptionAttribute(AttributeKey<String> key, @Nullable String value) {
// avoid overwriting/removing existing exception details
if (value != null && (attributes == null || !attributes.containsKey(key))) {
setAttribute(key, value);
}
}

@Override
public void emit() {
attributesEmitted = true;
Context context = this.context != null ? this.context : Context.current();
if (logger.isEnabled(severity, context)) {
OtelLogRecordProcessor.INSTANCE.addLog(
new OtlpLogRecord(
logger.instrumentationScope,
timestampNanos,
observedNanos != 0 ? observedNanos : TIME_SOURCE.getCurrentTimeNanos(),
severity.getSeverityNumber(),
severityText,
bodyType,
bodyValue,
eventName,
attributes != null ? attributes : Collections.emptyMap(),
extract(context)));
}
}
}
Loading
Loading