Utility methods to simplify tracing RxJava when using manual instrumentation.
// For RxJava 2
dependencies {
  implementation "io.github.ikstewa:opentelemetry-rxjava2-tracer:«version»"
}
// For RxJava 3
dependencies {
  implementation "io.github.ikstewa:opentelemetry-rxjava3-tracer:«version»"
}
To solve parenting issues with async operations, the standard OpenTelemetry instrumentation for RxJava propagates the OpenTelemetry Context through observables by registering RxJava plugins via the TracingAssembly.
Even so, using the standard OpenTelemetry Tracer API for manual instrumentation can be cumbersome when working with RxJava.
Tracing a Completable might look something like:
class MyClass {
  private static final Tracer tracer =
      openTelemetry.getTracer("instrumentation-library-name", "1.0.0");

  void doWork() {
    final Completable operation1 =
        Completable.fromRunnable(
            () -> {
              Span execute = tracer.spanBuilder("Execute 1").startSpan();
              try (var ss = execute.makeCurrent()) {
                LOG.trace("Executing step 1");
              } finally {
                execute.end();
              }
            });
    final Completable operation2 =
        Completable.fromRunnable(
            () -> {
              Span execute = tracer.spanBuilder("Execute 2").startSpan();
              try (var ss = execute.makeCurrent()) {
                LOG.trace("Executing step 2");
              } finally {
                execute.end();
              }
            });
    Span subscribe = tracer.spanBuilder("Subscribe").startSpan();
    try (var s = subscribe.makeCurrent()) {
      Completable.concatArray(operation1, operation2).subscribe();
    } finally {
      subscribe.end();
    }
  }
}
Using the RxTracer, this can be simplified to:
class MyClass {
  private static final Tracer tracer =
      openTelemetry.getTracer("instrumentation-library-name", "1.0.0");

  void doWork() {
    final Completable operation1 =
        Completable.fromRunnable(() -> LOG.trace("Executing step 1"))
            .compose(RxTracer.traceCompletable(tracer.spanBuilder("Execute 1")));
    final Completable operation2 =
        Completable.fromRunnable(() -> LOG.trace("Executing step 2"))
            .compose(RxTracer.traceCompletable(tracer.spanBuilder("Execute 2")));
    Completable.concatArray(operation1, operation2)
        .compose(RxTracer.traceCompletable(tracer.spanBuilder("Subscribe")))
        .subscribe();
  }
}
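The idea behind this kind of simplification can be sketched without RxJava or OpenTelemetry: a higher-order wrapper opens a span before the wrapped action runs and closes it in a finally block, so the boilerplate lives in one place. The sketch below is only an illustration under that assumption; `SpanWrapperSketch`, `traced`, and `EVENTS` are hypothetical names and are not part of this library or of the OpenTelemetry API.

```java
import java.util.ArrayList;
import java.util.List;

public class SpanWrapperSketch {
  // Hypothetical event log standing in for a span exporter.
  static final List<String> EVENTS = new ArrayList<>();

  // Decorates an action so a "span" is opened before it runs and always closed afterwards.
  static Runnable traced(String spanName, Runnable action) {
    return () -> {
      EVENTS.add("start " + spanName);
      try {
        action.run();
      } finally {
        EVENTS.add("end " + spanName);
      }
    };
  }

  public static void main(String[] args) {
    Runnable step1 = traced("Execute 1", () -> EVENTS.add("step 1"));
    Runnable step2 = traced("Execute 2", () -> EVENTS.add("step 2"));
    // The outer wrapper plays the role of the 'Subscribe' span around both steps.
    traced("Subscribe", () -> {
      step1.run();
      step2.run();
    }).run();
    EVENTS.forEach(System.out::println);
  }
}
```

Running the sketch prints the start and end of 'Subscribe' around the two inner spans, mirroring the nesting the manual try/finally version produces.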
The default TracingAssembly support does not handle propagation through schedulers. This library adds an additional RxJava plugin so that the context can propagate to another scheduler.
Given the following example:
void longRunningOperation() {
  Span execute = tracer.spanBuilder("LongRunningOperation").startSpan();
  try (var s = execute.makeCurrent()) {
    LOG.info("Starting my long running operation");
    // ...
    LOG.info("Finished my long running operation");
  } finally {
    execute.end();
  }
}

void doStuff() {
  final var span = tracer.spanBuilder("Subscribe");
  Completable.fromRunnable(this::longRunningOperation)
      .subscribeOn(Schedulers.io())
      .compose(RxTracer.traceCompletable(span))
      .subscribe();
}
The default tracing produces two spans, 'Subscribe' and 'LongRunningOperation'. However, neither has a parent, so 'LongRunningOperation' is not linked to 'Subscribe'.
The RxTracer can be configured to propagate the context across schedulers as well, so that the 'LongRunningOperation' span has the 'Subscribe' span as its parent.
RxTracingAssembly.builder().setEnableSchedulerPropagation(true).build().enable();
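Why a scheduler hop loses the parent can be illustrated with plain JDK classes. The sketch below assumes the current span is tracked per thread, and uses a `ThreadLocal` as a stand-in: a task handed to another thread sees no current span unless it is wrapped to capture and restore the caller's context. `ContextPropagationSketch`, `CURRENT_SPAN`, `wrap`, and `parentSeenOnWorker` are hypothetical names for illustration only, and this is not a description of how this library implements its plugin.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationSketch {
  // Hypothetical stand-in for a thread-bound tracing context (not the OpenTelemetry API).
  static final ThreadLocal<String> CURRENT_SPAN = new ThreadLocal<>();

  // Captures the caller's context now and restores it around the task on whichever thread runs it.
  static Runnable wrap(Runnable task) {
    final String captured = CURRENT_SPAN.get();
    return () -> {
      final String previous = CURRENT_SPAN.get();
      CURRENT_SPAN.set(captured);
      try {
        task.run();
      } finally {
        CURRENT_SPAN.set(previous);
      }
    };
  }

  // Runs a task on the worker thread and reports which span that thread sees as current.
  static String parentSeenOnWorker(ExecutorService worker, boolean propagate) throws Exception {
    final String[] seen = new String[1];
    Runnable task = () -> seen[0] = String.valueOf(CURRENT_SPAN.get());
    // Future.get() waits for completion and makes the write to seen[0] visible here.
    worker.submit(propagate ? wrap(task) : task).get();
    return seen[0];
  }

  public static void main(String[] args) throws Exception {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    CURRENT_SPAN.set("Subscribe");
    try {
      System.out.println("without propagation: " + parentSeenOnWorker(worker, false));
      System.out.println("with propagation: " + parentSeenOnWorker(worker, true));
    } finally {
      CURRENT_SPAN.remove();
      worker.shutdown();
    }
  }
}
```

Without wrapping, the worker thread reports `null` as its current span; with wrapping, it reports `Subscribe`. This corresponds to the difference between an orphaned 'LongRunningOperation' span and one parented to 'Subscribe'.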
import io.github.ikstewa.opentelemetry.rxjava3.RxTracer;
// Via compose
Completable tracedCompletable = completable.compose(RxTracer.traceCompletable(SpanBuilder))
// Via static methods
Completable tracedCompletable = RxTracer.traceCompletable(Completable, SpanBuilder)
import io.github.ikstewa.opentelemetry.rxjava3.RxTracer;
// Via compose
Single tracedSingle = single.compose(RxTracer.traceSingle(SpanBuilder))
// Via static methods
Single tracedSingle = RxTracer.traceSingle(Single, SpanBuilder)
import io.github.ikstewa.opentelemetry.rxjava3.RxTracer;
// Via compose
Maybe tracedMaybe = maybe.compose(RxTracer.traceMaybe(SpanBuilder))
// Via static methods
Maybe tracedMaybe = RxTracer.traceMaybe(Maybe, SpanBuilder)
import io.github.ikstewa.opentelemetry.rxjava3.RxTracer;
// Via compose
Observable tracedObservable = observable.compose(RxTracer.traceObservable(SpanBuilder))
// Via static methods
Observable tracedObservable = RxTracer.traceObservable(Observable, SpanBuilder)
import io.github.ikstewa.opentelemetry.rxjava3.RxTracer;
// Via compose
Flowable tracedFlowable = flowable.compose(RxTracer.traceFlowable(SpanBuilder))
// Via static methods
Flowable tracedFlowable = RxTracer.traceFlowable(Flowable, SpanBuilder)
To publish a new release:
./gradlew publishToSonatype closeAndReleaseSonatypeStagingRepository