Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
520 changes: 260 additions & 260 deletions .circleci/config.yml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ public void close() {}
public void close(final boolean closeContinuationScope) {}
}

static class NoopTraceScope implements TraceScope {
static final NoopTraceScope INSTANCE = new NoopTraceScope();
public static class NoopTraceScope implements TraceScope {
public static final NoopTraceScope INSTANCE = new NoopTraceScope();

@Override
public Continuation capture() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ testSets {
latestDepTest {
dirName = 'test'
}

lettuceTest
}

dependencies {
Expand All @@ -33,4 +35,11 @@ dependencies {
latestDepTestCompile group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestCompile group: 'io.micrometer', name: 'micrometer-core', version: '1.+'

lettuceTestCompile project(':dd-java-agent:instrumentation:lettuce-5')
lettuceTestCompile group: 'io.lettuce', name: 'lettuce-core', version: '5.+'
lettuceTestCompile group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
lettuceTestCompile group: 'com.github.kstyrc', name: 'embedded-redis', version: '0.6'
}

test.finalizedBy lettuceTest
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.utils.PortUtils
import datadog.trace.agent.test.utils.TraceUtils
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.reactive.RedisReactiveCommands
import reactor.core.scheduler.Schedulers
import redis.embedded.RedisServer
import spock.lang.Shared

class LettuceReactiveTest extends AgentTestRunner {
public static final String HOST = "127.0.0.1"
public static final int DB_INDEX = 0
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()

@Shared
String embeddedDbUri

@Shared
RedisServer redisServer

RedisClient redisClient
StatefulConnection connection
RedisReactiveCommands<String, ?> reactive

def setupSpec() {
int port = PortUtils.randomOpenPort()
String dbAddr = HOST + ":" + port + "/" + DB_INDEX
embeddedDbUri = "redis://" + dbAddr

redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(port).build()
}

def setup() {
redisClient = RedisClient.create(embeddedDbUri)

println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()

reactive = connection.reactive()
reactive.set("test", "test").block()

TEST_WRITER.waitForTraces(2)
TEST_WRITER.clear()
}

def cleanup() {
connection.close()
redisServer.stop()
}

def "blocking subscriber"() {
when:
TraceUtils.runUnderTrace("test-parent") {
reactive.set("a", "1")
.then(reactive.get("a")) // The get here is ending up in another trace
.block()
}
TEST_WRITER.waitForTraces(1)

def traces = TEST_WRITER.collect()

then:
traces.size() == 1
traces.get(0).size() == 3
}

def "async subscriber"() {
when:
TraceUtils.runUnderTrace("test-parent") {
reactive.set("a", "1")
.then(reactive.get("a")) // The get here is ending up in another trace
.subscribe()
}
TEST_WRITER.waitForTraces(1)

def traces = TEST_WRITER.collect()

then:
traces.size() == 1
traces.get(0).size() == 3
}

def "async subscriber with specific thread pool"() {
when:
TraceUtils.runUnderTrace("test-parent") {
reactive.set("a", "1")
.then(reactive.get("a")) // The get here is ending up in another trace
.subscribeOn(Schedulers.elastic())
.subscribe()
}
TEST_WRITER.waitForTraces(1)

def traces = TEST_WRITER.collect()

then:
traces.size() == 1
traces.get(0).size() == 3
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
package datadog.trace.instrumentation.reactor.core;

import static datadog.trace.agent.tooling.ClassLoaderMatcher.hasClassesNamed;
import static datadog.trace.agent.tooling.bytebuddy.matcher.DDElementMatchers.extendsClass;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isAbstract;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
Expand All @@ -26,36 +19,19 @@ public FluxAndMonoInstrumentation() {
}

@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
// Optimization for expensive typeMatcher.
return hasClassesNamed("reactor.core.publisher.Mono");
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactorHooksAdvice", packageName + ".ReactorHooksAdvice$TracingSubscriber"
};
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isAbstract())
.and(
extendsClass(
named("reactor.core.publisher.Mono").or(named("reactor.core.publisher.Flux"))));
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactorCoreAdviceUtils",
packageName + ".ReactorCoreAdviceUtils$TracingSubscriber",
};
return named("reactor.core.publisher.Hooks");
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isPublic())
.and(named("subscribe"))
.and(takesArgument(0, named("reactor.core.CoreSubscriber")))
.and(takesArguments(1)),
// Cannot reference class directly here because it would lead to class load failure on Java7
packageName + ".FluxAndMonoSubscribeAdvice");
return singletonMap(isTypeInitializer(), packageName + ".ReactorHooksAdvice");
}
}

This file was deleted.

This file was deleted.

Loading