Skip to content
Closed
15 changes: 15 additions & 0 deletions hadoop-common-project/hadoop-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,21 @@
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-extension-autoconfigure</artifactId>
<version>${opentelemetry.version}-alpha</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,9 @@ export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
# By default, Hadoop uses jsvc which needs to know to launch a
# server jvm.
# export HADOOP_REGISTRYDNS_SECURE_EXTRA_OPTS="-jvm server"

###
# Opentelemetry Trace Exporters Configuration
###
export OTEL_TRACES_EXPORTER=none
export OTEL_METRICS_EXPORTER=none
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,54 @@
*/
package org.apache.hadoop.tracing;

import io.opentelemetry.context.Scope;

import java.io.Closeable;

public class Span implements Closeable {

private io.opentelemetry.api.trace.Span span = null;
public Span() {
}

public Span(io.opentelemetry.api.trace.Span span){
this.span = span;
}

public Span addKVAnnotation(String key, String value) {
if(span != null){
span.setAttribute(key, value);
}
return this;
}

public Span addTimelineAnnotation(String msg) {
if(span != null){
span.addEvent(msg);
}
return this;
}

public SpanContext getContext() {
if(span != null){
return new SpanContext(span.getSpanContext());
}
return null;
}

public void finish() {
close();
}

public void close() {
if(span != null){
span.end();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set span to null, presumably

}
}

public Scope makeCurrent() {
if(span != null){
return span.makeCurrent();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,69 @@
package org.apache.hadoop.tracing;

import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.TraceStateBuilder;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper class for SpanContext to avoid using OpenTracing/OpenTelemetry
* SpanContext class directly for better separation.
*/
public class SpanContext implements Closeable {
public SpanContext() {
public class SpanContext implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(SpanContext.class.getName());
private static final String TRACE_ID = "TRACE_ID";
private static final String SPAN_ID = "SPAN_ID";
private static final String TRACE_FLAGS = "TRACE_FLAGS";


private io.opentelemetry.api.trace.SpanContext spanContext = null;
public SpanContext(io.opentelemetry.api.trace.SpanContext spanContext) {
this.spanContext = spanContext;
}

public void close() {

}

public Map<String, String> getKVSpanContext(){
if(spanContext != null){
//TODO: may we should move this to Proto
Map<String, String> kvMap = new HashMap<>();
kvMap.put(TRACE_ID, spanContext.getTraceId());
kvMap.put(SPAN_ID, spanContext.getSpanId());
kvMap.put(TRACE_FLAGS, spanContext.getTraceFlags().asHex());
kvMap.putAll(spanContext.getTraceState().asMap());
return kvMap;
}
return null;
}

static SpanContext buildFromKVMap(Map<String, String> kvMap){
String traceId = kvMap.get(TRACE_ID);
kvMap.remove(TRACE_ID);
String spanId = kvMap.get(SPAN_ID);
kvMap.remove(SPAN_ID);
String traceFlagsHex = kvMap.get(TRACE_FLAGS);
kvMap.remove(TRACE_FLAGS);
TraceFlags traceFlags = TraceFlags.fromHex(traceFlagsHex, 0);
TraceStateBuilder traceStateBuilder = TraceState.builder();
for(Map.Entry<String, String> keyValue: kvMap.entrySet()){
traceStateBuilder.put(keyValue.getKey(), keyValue.getValue());
}
TraceState traceState = traceStateBuilder.build();
io.opentelemetry.api.trace.SpanContext spanContext = io.opentelemetry.api.trace.SpanContext.createFromRemoteParent(traceId, spanId, traceFlags, traceState );

return new SpanContext(spanContext);
}

public io.opentelemetry.api.trace.SpanContext getSpanContext() {
return spanContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,32 @@
*/
package org.apache.hadoop.tracing;

import io.opentelemetry.context.Scope;

import java.io.Closeable;

public class TraceScope implements Closeable {
Span span;

Scope scope;
public TraceScope(Span span) {
this.span = span;
if(span != null){
this.scope = span.makeCurrent();
}
}

// Add tag to the span
public void addKVAnnotation(String key, String value) {
span.addKVAnnotation(key, value);
}

public void addKVAnnotation(String key, Number value) {
span.addKVAnnotation(key, value.toString());
}


public void addTimelineAnnotation(String msg) {
span.addTimelineAnnotation(msg);
}

public Span span() {
Expand All @@ -51,6 +60,9 @@ public void detach() {
}

public void close() {
if(scope != null){
scope.close();
}
if (span != null) {
span.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.HashMap;
import java.util.Map;

/**
* This class provides utility functions for tracing.
*/
@InterfaceAudience.Private
public class TraceUtils {
public static final Logger LOG = LoggerFactory.getLogger(TraceUtils.class.getName());
static final String DEFAULT_HADOOP_TRACE_PREFIX = "hadoop.htrace.";

public static TraceConfiguration wrapHadoopConf(final String prefix,
Expand All @@ -38,10 +45,40 @@ public static Tracer createAndRegisterTracer(String name) {
}

public static SpanContext byteStringToSpanContext(ByteString byteString) {
return null;
return deserialize(byteString);
}

public static ByteString spanContextToByteString(SpanContext context) {
return null;
Map<String, String> kvMap = context.getKVSpanContext();
ByteString byteString = serialize(kvMap);
return byteString;
}

//Added this for tracing will remove this after having
// a discussion
static ByteString serialize(Object obj){
try{
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream(out);
os.writeObject(obj);
os.flush();
byte[] byteArray = out.toByteArray();
return ByteString.copyFrom(byteArray);
} catch (Exception e){
LOG.error("Error in searializing the object:", e);
return null;
}
}

static SpanContext deserialize(ByteString spanContextByteString) {
try {
ByteArrayInputStream in = new ByteArrayInputStream(spanContextByteString.toByteArray());
ObjectInputStream is = new ObjectInputStream(in);
Map<String, String> kvMap = (Map<String, String>) is.readObject();
return SpanContext.buildFromKVMap(kvMap);
} catch (Exception e) {
LOG.error("Error in deserializing the object:", e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,28 @@
*/
package org.apache.hadoop.tracing;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.autoconfigure.OpenTelemetrySdkAutoConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* No-Op Tracer (for now) to remove HTrace without changing too many files.
*/
public class Tracer {
public static final Logger LOG = LoggerFactory.getLogger(Tracer.class.getName());
// Singleton
private static final Tracer globalTracer = null;
private static Tracer globalTracer = null;
io.opentelemetry.api.trace.Tracer OTelTracer;
private final NullTraceScope nullTraceScope;
private final String name;

public final static String SPAN_RECEIVER_CLASSES_KEY =
"span.receiver.classes";

public Tracer(String name) {
private Tracer(String name, io.opentelemetry.api.trace.Tracer tracer) {
this.name = name;
OTelTracer = tracer;
nullTraceScope = NullTraceScope.INSTANCE;
}

Expand All @@ -45,28 +53,32 @@ public static Tracer curThreadTracer() {
* @return org.apache.hadoop.tracing.Span
*/
public static Span getCurrentSpan() {
return null;
io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.current();
return span.getSpanContext().isValid()? new Span(span): null;
}

public TraceScope newScope(String description) {
return nullTraceScope;
Span span = new Span(OTelTracer.spanBuilder(description).startSpan());
return new TraceScope(span);
}

public Span newSpan(String description, SpanContext spanCtx) {
return new Span();
io.opentelemetry.api.trace.Span parentSpan = io.opentelemetry.api.trace.Span.wrap(spanCtx.getSpanContext());
io.opentelemetry.api.trace.Span span = OTelTracer.spanBuilder(description).setParent(Context.current().with(parentSpan)).startSpan();
return new Span(span);
}

public TraceScope newScope(String description, SpanContext spanCtx) {
return nullTraceScope;
return new TraceScope(newSpan(description, spanCtx));
}

public TraceScope newScope(String description, SpanContext spanCtx,
boolean finishSpanOnClose) {
return nullTraceScope;
return new TraceScope(newSpan(description, spanCtx));
}

public TraceScope activateSpan(Span span) {
return nullTraceScope;
return new TraceScope(span);
}

public void close() {
Expand All @@ -76,6 +88,8 @@ public String getName() {
return name;
}



public static class Builder {
static Tracer globalTracer;
private String name;
Expand All @@ -88,9 +102,32 @@ public Builder conf(TraceConfiguration conf) {
return this;
}

public Tracer build() {
static OpenTelemetry initialiseTracer(String name) {
//added to avoid test failures
setOTelEnvVariables("none", "none");
System.setProperty("otel.resource.attributes", String.format("service.name=%s", name));
OpenTelemetry openTelemetry = OpenTelemetrySdkAutoConfiguration.initialize();
return openTelemetry;
}

//this method is added to set the environment variables for testing
private static void setOTelEnvVariables(String metricsExporter, String tracesExporter) {
if(System.getenv().get("OTEL_TRACES_EXPORTER") == null){
System.setProperty("otel.metrics.exporter", metricsExporter);
} else {
LOG.info("Tracing Span Exporter set to :" + System.getenv().get("OTEL_TRACES_EXPORTER"));
}
if(System.getenv().get("OTEL_METRICS_EXPORTER") == null){
System.setProperty("otel.traces.exporter", tracesExporter);
} else {
LOG.info("Tracing Span Exporter set to :" + System.getenv().get("OTEL_METRICS_EXPORTER"));
}
}

public synchronized Tracer build() {
if (globalTracer == null) {
globalTracer = new Tracer(name);
globalTracer = new Tracer(name, initialiseTracer(name).getTracer(name));
Tracer.globalTracer = globalTracer;
}
return globalTracer;
}
Expand Down
Loading