Skip to content

Commit

Permalink
Add support for kafka bytebuffers in 3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
manuel-alvarez-alvarez committed Feb 27, 2024
1 parent de4b6e7 commit 2c9cf72
Show file tree
Hide file tree
Showing 28 changed files with 1,419 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public void methodAdvice(MethodTransformer transformer) {
takesArguments(String.class)
.or(takesArguments(InputStream.class))
.or(takesArguments(Reader.class))
.or(takesArguments(URL.class))),
.or(takesArguments(URL.class))
.or(takesArguments(byte[].class))
.or(takesArguments(byte[].class, int.class, int.class))),
Json1FactoryInstrumentation.class.getName() + "$InstrumenterAdvice");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Json1FactoryInstrumentationTest extends AgentTestRunner {
then:
result != null
1 * propagationModule.taintIfTainted(_ as JsonParser, is)
2 * is.read(_,_,_)
2 * is.read(_, _, _)
0 * _
}
Expand Down Expand Up @@ -92,4 +92,25 @@ class Json1FactoryInstrumentationTest extends AgentTestRunner {
1 * ssrfModule.onURLConnection(url)
0 * _
}
/**
 * Checks that creating a parser from a byte[] payload, either alone or with an (offset, length) pair,
 * triggers exactly one taintIfTainted(parser, bytes) call on the propagation module and nothing else.
 */
void 'test createParser(byte[])'() {
setup:
final propagationModule = Mock(PropagationModule)
InstrumentationBridge.registerIastModule(propagationModule)
// The byte[] is the first element of every data row, whichever overload is exercised
final bytes = args[0]
when:
// A method pointer invoked with the row's argument list lets one feature body drive both overloads
final parser = new JsonFactory().&createJsonParser.call(args as Object[])
then:
parser != null
1 * propagationModule.taintIfTainted(_ as JsonParser, bytes)
// Any other interaction with a mock fails the feature
0 * _
where:
args | _
['Hello'.bytes] | _
['Hello'.bytes, 0, 3] | _
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void methodAdvice(MethodTransformer transformer) {
.or(takesArguments(InputStream.class))
.or(takesArguments(Reader.class))
.or(takesArguments(URL.class))
.or(takesArguments(byte[].class)))),
.or(takesArguments(byte[].class))
.or(takesArguments(byte[].class, int.class, int.class)))),
Json2FactoryInstrumentation.class.getName() + "$InstrumenterAdvice");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,24 @@ class Json2FactoryInstrumentationTest extends AgentTestRunner {
1 * ssrfModule.onURLConnection(url)
0 * _
}
/**
 * Checks that creating a parser from a byte[] payload, either alone or with an (offset, length) pair,
 * triggers exactly one taintIfTainted(parser, bytes) call on the propagation module and nothing else.
 */
void 'test createParser(byte[])'() {
setup:
final propagationModule = Mock(PropagationModule)
InstrumentationBridge.registerIastModule(propagationModule)
// The byte[] is the first element of every data row, whichever overload is exercised
final bytes = args[0]
when:
// A method pointer invoked with the row's argument list lets one feature body drive both overloads.
// NOTE(review): the feature name says createParser but the call goes through createJsonParser
// (presumably the deprecated Jackson 2.x alias that delegates to createParser) — confirm this is intended.
final parser = new JsonFactory().&createJsonParser.call(args as Object[])
then:
parser != null
1 * propagationModule.taintIfTainted(_ as JsonParser, bytes)
// Any other interaction with a mock fails the feature
0 * _
where:
args | _
['Hello'.bytes] | _
['Hello'.bytes, 0, 3] | _
}
}
19 changes: 18 additions & 1 deletion dd-java-agent/instrumentation/kafka-clients-0.11/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ muzzle {
apply from: "$rootDir/gradle/java.gradle"

addTestSuite('latestDepTest')
addTestSuite('iastLatestDepTest3')

dependencies {
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
Expand All @@ -28,7 +29,7 @@ dependencies {
testRuntimeOnly project(':dd-java-agent:instrumentation:java-lang')
testRuntimeOnly project(':dd-java-agent:instrumentation:java-io')
testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
testRuntimeOnly(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10')
testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10')

// Include latest version of kafka itself along with latest version of client libs.
// This seems to help with jar compatibility hell.
Expand All @@ -38,9 +39,25 @@ dependencies {
latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.+'
latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
latestDepTestImplementation deps.guava

// Add kafka version 3.x for IAST
iastLatestDepTest3Implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.+'
iastLatestDepTest3Implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '3.+'
iastLatestDepTest3RuntimeOnly project(':dd-java-agent:instrumentation:iast-instrumenter')
iastLatestDepTest3RuntimeOnly project(':dd-java-agent:instrumentation:java-lang')
iastLatestDepTest3RuntimeOnly project(':dd-java-agent:instrumentation:java-io')
iastLatestDepTest3RuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
iastLatestDepTest3Implementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.15.3')
}

configurations.testRuntimeClasspath {
// spock-core depends on an assertj version that is not compatible with kafka-clients,
// so assertj-core is pinned to 2.9.1 on the test runtime classpath
resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
}

// Runs the iastLatestDepTest3 suite on a Java 17 launcher with java.base/java.util opened to unnamed modules.
// NOTE(review): presumably Java 17 is needed because spring-kafka 3.x targets it — confirm.
// NOTE(review): assigning jvmArgs replaces any JVM arguments configured earlier for this task — confirm intended.
iastLatestDepTest3.configure {
javaLauncher = getJavaLauncherFor(17)
jvmArgs = ['--add-opens', 'java.base/java.util=ALL-UNNAMED']
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package iast

import com.fasterxml.jackson.core.JsonParser
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.iast.InstrumentationBridge
import datadog.trace.api.iast.SourceTypes
import datadog.trace.api.iast.Taintable
import datadog.trace.api.iast.VulnerabilityMarks
import datadog.trace.api.iast.propagation.CodecModule
import datadog.trace.api.iast.propagation.PropagationModule
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.ByteBufferDeserializer
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.support.serializer.JsonDeserializer

import java.nio.ByteBuffer

import static datadog.trace.api.iast.VulnerabilityMarks.NOT_MARKED

/**
 * Exercises Kafka deserializers (String, byte[], ByteBuffer and Spring's JsonDeserializer) through every
 * deserialize(...) entry point listed in {@link Method}, for both the key and the value source type, and
 * pins the exact PropagationModule / CodecModule interactions expected for each combination.
 */
class KafkaIastDeserializerTest extends AgentTestRunner {

// Number of padding bytes placed in front of the payload by Method.WITH_BYTE_BUFFER_OFFSET
private static final int BUFF_OFFSET = 10

// Turns IAST on for this spec through the dd.iast.enabled setting
@Override
protected void configurePreAgent() {
injectSysConfig('dd.iast.enabled', 'true')
}

/** StringDeserializer: the payload is tainted at the source and the taint is carried over to the decoded String. */
void 'test string deserializer: #test'() {
given:
final source = test.source
final propagationModule = Mock(PropagationModule)
final codecModule = Mock(CodecModule)
[propagationModule, codecModule].each { InstrumentationBridge.registerIastModule(it) }

and:
final payload = "Hello World!".bytes
final deserializer = new StringDeserializer()

when:
// Second argument of configure is true only when the suite targets the message key
deserializer.configure([:], source == SourceTypes.KAFKA_MESSAGE_KEY)
test.method.deserialize(deserializer, "test", payload)

then:
// Expected interactions depend on which deserialize(...) entry point was used
switch (test.method) {
case Method.DEFAULT:
1 * propagationModule.taint(payload, source) // taint byte[]
1 * codecModule.onStringFromBytes(payload, 0, payload.length, _, _ as String) // taint byte[] => string
break
case Method.WITH_HEADERS:
1 * propagationModule.taint(payload, source) // taint byte[]
1 * codecModule.onStringFromBytes(payload, 0, payload.length, _, _ as String) // taint byte[] => string
break
case Method.WITH_BYTE_BUFFER:
1 * propagationModule.taint(_ as ByteBuffer, source, 0, payload.length) // taint ByteBuffer
1 * propagationModule.taintIfTainted(payload, _ as ByteBuffer, 0, payload.length, false, NOT_MARKED) // taint ByteBuffer => byte[]
1 * codecModule.onStringFromBytes(payload, 0, payload.length, _, _ as String) // taint byte[] => string
break
case Method.WITH_BYTE_BUFFER_OFFSET:
1 * propagationModule.taint(_ as ByteBuffer, source, BUFF_OFFSET, payload.length) // taint ByteBuffer
1 * propagationModule.taintIfTainted(_ as byte[], _ as ByteBuffer, true, NOT_MARKED) // taint ByteBuffer => byte[]
1 * codecModule.onStringFromBytes(_ as byte[], BUFF_OFFSET, payload.length, _, _ as String) // taint byte[] => string
break
}
// Any interaction not listed above fails the feature
0 * _

where:
test << testSuite()
}

/** ByteArrayDeserializer: the payload is tainted at the source; ByteBuffer inputs also propagate to the resulting byte[]. */
void 'test byte array deserializer: #test'() {
given:
final source = test.source
final propagationModule = Mock(PropagationModule)
InstrumentationBridge.registerIastModule(propagationModule)

and:
final payload = "Hello World!".bytes
final deserializer = new ByteArrayDeserializer()

when:
// Second argument of configure is true only when the suite targets the message key
deserializer.configure([:], source == SourceTypes.KAFKA_MESSAGE_KEY)
test.method.deserialize(deserializer, "test", payload)

then:
// Expected interactions depend on which deserialize(...) entry point was used
switch (test.method) {
case Method.DEFAULT:
1 * propagationModule.taint(payload, source) // taint byte[]
break
case Method.WITH_HEADERS:
1 * propagationModule.taint(payload, source) // taint byte[]
break
case Method.WITH_BYTE_BUFFER:
1 * propagationModule.taint(_ as ByteBuffer, source, 0, payload.length) // taint ByteBuffer
1 * propagationModule.taintIfTainted(payload, _ as ByteBuffer, 0, payload.length, false, NOT_MARKED) // taint ByteBuffer => byte[]
break
case Method.WITH_BYTE_BUFFER_OFFSET:
1 * propagationModule.taint(_ as ByteBuffer, source, BUFF_OFFSET, payload.length) // taint ByteBuffer
1 * propagationModule.taintIfTainted(payload, _ as ByteBuffer, BUFF_OFFSET, payload.length, false, NOT_MARKED) // taint ByteBuffer => byte[]
break
}
// Any interaction not listed above fails the feature
0 * _

where:
test << testSuite()
}

/** ByteBufferDeserializer: byte[] inputs propagate to the produced ByteBuffer; ByteBuffer inputs are only tainted at the source. */
void 'test byte buffer deserializer: #test'() {
given:
final source = test.source
final propagationModule = Mock(PropagationModule)
InstrumentationBridge.registerIastModule(propagationModule)

and:
final payload = "Hello World!".bytes
final deserializer = new ByteBufferDeserializer()

when:
// Second argument of configure is true only when the suite targets the message key
deserializer.configure([:], source == SourceTypes.KAFKA_MESSAGE_KEY)
test.method.deserialize(deserializer, "test", payload)

then:
// Expected interactions depend on which deserialize(...) entry point was used
switch (test.method) {
case Method.DEFAULT:
1 * propagationModule.taint(payload, source) // taint byte[]
1 * propagationModule.taintIfTainted(_ as ByteBuffer, payload, true, NOT_MARKED) // taint byte[] => ByteBuffer
break
case Method.WITH_HEADERS:
1 * propagationModule.taint(payload, source) // taint byte[]
1 * propagationModule.taintIfTainted(_ as ByteBuffer, payload, true, NOT_MARKED) // taint byte[] => ByteBuffer
break
case Method.WITH_BYTE_BUFFER:
1 * propagationModule.taint(_ as ByteBuffer, source, 0, payload.length) // taint ByteBuffer
break
case Method.WITH_BYTE_BUFFER_OFFSET:
1 * propagationModule.taint(_ as ByteBuffer, source, BUFF_OFFSET, payload.length) // taint ByteBuffer
break
}
// Any interaction not listed above fails the feature
0 * _

where:
test << testSuite()
}

/** Spring JsonDeserializer: source tainting as for byte[], plus taint propagation into the JSON parser and the parsed field name/value. */
void 'test json deserialization: #test'() {
given:
final source = test.source
final propagationModule = Mock(PropagationModule)
InstrumentationBridge.registerIastModule(propagationModule)

and:
final json = '{ "name": "Mr Bean" }'
final payload = json.bytes
final deserializer = new JsonDeserializer(TestBean)

when:
// Second argument of configure is true only when the suite targets the message key
deserializer.configure([:], source == SourceTypes.KAFKA_MESSAGE_KEY)
test.method.deserialize(deserializer, 'test', payload)

then:
// Expected source-level interactions depend on which deserialize(...) entry point was used
switch (test.method) {
case Method.DEFAULT:
1 * propagationModule.taint(payload, source) // taint byte[]
break
case Method.WITH_HEADERS:
1 * propagationModule.taint(payload, source) // taint byte[]
break
case Method.WITH_BYTE_BUFFER:
1 * propagationModule.taint(_ as ByteBuffer, source, 0, payload.length) // taint ByteBuffer
1 * propagationModule.taintIfTainted(payload, _ as ByteBuffer, 0, payload.length, false, NOT_MARKED) // taint ByteBuffer => byte[]
break
case Method.WITH_BYTE_BUFFER_OFFSET:
1 * propagationModule.taint(_ as ByteBuffer, source, BUFF_OFFSET, payload.length) // taint ByteBuffer
1 * propagationModule.taintIfTainted(payload, _ as ByteBuffer, BUFF_OFFSET, payload.length, false, NOT_MARKED) // taint ByteBuffer => byte[]
break
}
// taint JSON: the parser is tainted from the payload, then the stubbed source (same origin, raw JSON as value)
// is expected to be applied to both the field name and the field value
1 * propagationModule.taintIfTainted(_ as JsonParser, payload)
1 * propagationModule.findSource(_) >> Stub(Taintable.Source) {
getOrigin() >> source
getValue() >> json
}
1 * propagationModule.taint(_, 'name', source, 'name', json)
1 * propagationModule.taint(_, 'Mr Bean', source, 'name', json)
// Any interaction not listed above fails the feature
0 * _

where:
test << testSuite()
}

// Builds the data-driven matrix: every Method for both KAFKA_MESSAGE_KEY and KAFKA_MESSAGE_VALUE (8 suites)
private static List<Suite> testSuite() {
return [SourceTypes.KAFKA_MESSAGE_KEY, SourceTypes.KAFKA_MESSAGE_VALUE].collectMany { source ->
return [
new Suite(source: source, method: Method.DEFAULT),
new Suite(source: source, method: Method.WITH_HEADERS),
new Suite(source: source, method: Method.WITH_BYTE_BUFFER),
new Suite(source: source, method: Method.WITH_BYTE_BUFFER_OFFSET)
]
}
}

/** The different ways a payload is handed to Deserializer#deserialize in these tests. */
enum Method {
// deserialize(topic, byte[])
DEFAULT{
@Override
<T> T deserialize(Deserializer<T> deserializer, String topic, byte[] payload) {
return deserializer.deserialize(topic, payload)
}
},
// deserialize(topic, headers, byte[]) with empty headers
WITH_HEADERS{
@Override
<T> T deserialize(Deserializer<T> deserializer, String topic, byte[] payload) {
return deserializer.deserialize(topic, new RecordHeaders(), payload)
}
},
// deserialize(topic, headers, ByteBuffer) with a direct buffer holding exactly the payload
WITH_BYTE_BUFFER{
@SuppressWarnings('GroovyAssignabilityCheck')
@Override
<T> T deserialize(Deserializer<T> deserializer, String topic, byte[] payload) {
ByteBuffer buffer = ByteBuffer.allocateDirect(payload.length)
buffer.put(payload)
// Move the position back to the start so the deserializer sees the bytes just written
buffer.position(0)
return deserializer.deserialize(topic, new RecordHeaders(), buffer)
}
},
// deserialize(topic, headers, ByteBuffer) with an array-backed buffer whose payload starts at BUFF_OFFSET
WITH_BYTE_BUFFER_OFFSET{
@SuppressWarnings('GroovyAssignabilityCheck')
@Override
<T> T deserialize(Deserializer<T> deserializer, String topic, byte[] payload) {
// Backing array: BUFF_OFFSET zero bytes followed by the payload
final byte[] buffer = new byte[payload.length + BUFF_OFFSET]
System.arraycopy(payload, 0, buffer, BUFF_OFFSET, payload.length)
return deserializer.deserialize(topic, new RecordHeaders(), ByteBuffer.wrap(buffer, BUFF_OFFSET, payload.length))
}
}

// Invokes the deserializer with the payload in the shape this constant represents
abstract <T> T deserialize(Deserializer<T> deserializer, String topic, byte[] payload)
}

/** One row of the data-driven matrix: a source type plus the deserialize entry point to use. */
static class Suite {
// Source type constant; testSuite() supplies SourceTypes.KAFKA_MESSAGE_KEY or KAFKA_MESSAGE_VALUE
byte source
Method method

// Rendered into the unrolled feature names through the #test placeholder
@Override
String toString() {
return "${method.name()}: ${SourceTypes.toString(source)}"
}
}

/** Target type handed to JsonDeserializer in the JSON feature. */
static class TestBean {
String name
}
}

0 comments on commit 2c9cf72

Please sign in to comment.