From 84f8194520326de922f9558e57fe5d2dce76085a Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Sat, 3 Nov 2018 19:02:17 +0100 Subject: [PATCH 1/3] Make ES query recording allocation free --- .../objectpool/ObjectPoolBenchmark.java | 10 +-- .../co/elastic/apm/impl/ElasticApmTracer.java | 14 ++-- .../co/elastic/apm/impl/transaction/Db.java | 40 ++++++++++- ...lableObjectFactory.java => Allocator.java} | 2 +- .../apm/objectpool/NoopObjectPool.java | 10 +-- .../co/elastic/apm/objectpool/ObjectPool.java | 2 +- .../objectpool/impl/AbstractObjectPool.java | 13 ++-- .../apm/objectpool/impl/MixedObjectPool.java | 9 ++- .../objectpool/impl/QueueBasedObjectPool.java | 22 +++--- .../elastic/apm/objectpool/impl/Resetter.java | 22 ++++++ .../impl/ThreadLocalObjectPool.java | 8 +-- .../report/serialize/DslJsonSerializer.java | 12 +++- .../apm/objectpool/ObjectPoolTest.java | 2 +- .../ESRestClientInstrumentationHelper.java | 61 +++++++++++++++- ...lasticsearchRestClientInstrumentation.java | 5 +- ...ESRestClientInstrumentationHelperTest.java | 71 +++++++++++++++++++ ...sticsearchRestClientInstrumentationIT.java | 5 +- 17 files changed, 253 insertions(+), 55 deletions(-) rename apm-agent-core/src/main/java/co/elastic/apm/objectpool/{RecyclableObjectFactory.java => Allocator.java} (91%) create mode 100644 apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java create mode 100644 apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java diff --git a/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java b/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java index 74774a3582..8d92e07d31 100644 --- a/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java +++ b/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java @@ -60,13 +60,13 @@ public static void main(String[] args) throws RunnerException { @Setup public void setUp() { tracer = new ElasticApmTracerBuilder().build(); - blockingQueueObjectPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer)); - jctoolsQueueObjectPool = new QueueBasedObjectPool<>(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer)); - jctoolsAtomicQueueObjectPool = new QueueBasedObjectPool<>(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer)); - agronaQueueObjectPool = new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)); + blockingQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer)); + jctoolsQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer)); + jctoolsAtomicQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer)); + agronaQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)); mixedObjectPool = new MixedObjectPool<>(() -> new Transaction(tracer), new ThreadLocalObjectPool<>(256, true, () -> new Transaction(tracer)), - new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer))); + QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer))); threadLocalObjectPool = new ThreadLocalObjectPool<>(64, true, () -> new Transaction(tracer)); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java index 9571b1a48e..b10861d01d 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java @@ -30,7 +30,7 @@ import co.elastic.apm.impl.transaction.TraceContext; import co.elastic.apm.impl.transaction.Transaction; import co.elastic.apm.objectpool.ObjectPool; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; import co.elastic.apm.report.Reporter; import co.elastic.apm.report.ReporterConfiguration; @@ -85,23 +85,23 @@ protected Deque initialValue() { this.spanListeners = spanListeners; int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2; coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class); - transactionPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, - new RecyclableObjectFactory() { + transactionPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, + new Allocator() { @Override public Transaction createInstance() { return new Transaction(ElasticApmTracer.this); } }); - spanPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, - new RecyclableObjectFactory() { + spanPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, + new Allocator() { @Override public Span createInstance() { return new Span(ElasticApmTracer.this); } }); // we are assuming that we don't need as many errors as spans or transactions - errorPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements / 2)), false, - new RecyclableObjectFactory() { + errorPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements / 2)), false, + new Allocator() { @Override public ErrorCapture createInstance() { return new ErrorCapture(ElasticApmTracer.this); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java index 75198e1d1c..c83e433316 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java @@ -20,9 +20,16 @@ package co.elastic.apm.impl.transaction; +import co.elastic.apm.objectpool.Allocator; +import co.elastic.apm.objectpool.ObjectPool; import co.elastic.apm.objectpool.Recyclable; +import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; +import co.elastic.apm.objectpool.impl.Resetter; +import co.elastic.apm.report.serialize.DslJsonSerializer; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; import javax.annotation.Nullable; +import java.nio.CharBuffer; /** @@ -30,6 +37,20 @@ */ public class Db implements Recyclable { + private final ObjectPool charBufferPool = new QueueBasedObjectPool(new MpmcAtomicArrayQueue<>(128), false, + new Allocator() { + @Override + public CharBuffer createInstance() { + return CharBuffer.allocate(DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH); + } + }, + new Resetter() { + @Override + public void recycle(CharBuffer object) { + object.clear(); + } + }); + /** * Database instance name */ @@ -40,6 +61,10 @@ public class Db implements Recyclable { */ @Nullable private String statement; + + @Nullable + private CharBuffer statementBuffer; + /** * Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis" */ @@ -83,6 +108,16 @@ public Db withStatement(@Nullable String statement) { return this; } + public CharBuffer withStatementBuffer() { + this.statementBuffer = charBufferPool.createInstance(); + return this.statementBuffer; + } + + @Nullable + public CharBuffer getStatementBuffer() { + return statementBuffer; + } + /** * Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis" */ @@ -121,13 +156,16 @@ public void resetState() { statement = null; type = null; user = null; + charBufferPool.recycle(statementBuffer); + statementBuffer = null; } public boolean hasContent() { return instance != null || statement != null || type != null || - user != null; + user != null || + statementBuffer != null; } public void copyFrom(Db other) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/RecyclableObjectFactory.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/Allocator.java similarity index 91% rename from apm-agent-core/src/main/java/co/elastic/apm/objectpool/RecyclableObjectFactory.java rename to apm-agent-core/src/main/java/co/elastic/apm/objectpool/Allocator.java index 514917f193..23aaf6f1fe 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/RecyclableObjectFactory.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/Allocator.java @@ -19,7 +19,7 @@ */ package co.elastic.apm.objectpool; -public interface RecyclableObjectFactory { +public interface Allocator { T createInstance(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java index 0a91ed1ec8..2fd6f1ed1c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java @@ -24,21 +24,21 @@ public class NoopObjectPool implements ObjectPool { - private final RecyclableObjectFactory recyclableObjectFactory; + private final Allocator allocator; - public NoopObjectPool(RecyclableObjectFactory recyclableObjectFactory) { - this.recyclableObjectFactory = recyclableObjectFactory; + public NoopObjectPool(Allocator allocator) { + this.allocator = allocator; } @Nullable @Override public T tryCreateInstance() { - return recyclableObjectFactory.createInstance(); + return allocator.createInstance(); } @Override public T createInstance() { - return recyclableObjectFactory.createInstance(); + return allocator.createInstance(); } @Override diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java index a313fa4b6f..734b40cf71 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java @@ -22,7 +22,7 @@ import javax.annotation.Nullable; import java.io.Closeable; -public interface ObjectPool extends Closeable { +public interface ObjectPool extends Closeable { @Nullable T tryCreateInstance(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java index f74b33d034..bdb170f938 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java @@ -20,18 +20,17 @@ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.ObjectPool; -import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import java.util.concurrent.atomic.AtomicInteger; -public abstract class AbstractObjectPool implements ObjectPool { +public abstract class AbstractObjectPool implements ObjectPool { - protected final RecyclableObjectFactory recyclableObjectFactory; + protected final Allocator allocator; private final AtomicInteger garbageCreated = new AtomicInteger(); - protected AbstractObjectPool(RecyclableObjectFactory recyclableObjectFactory) { - this.recyclableObjectFactory = recyclableObjectFactory; + protected AbstractObjectPool(Allocator allocator) { + this.allocator = allocator; } @Override @@ -40,7 +39,7 @@ public T createInstance() { if (recyclable == null) { // queue is empty, falling back to creating a new instance garbageCreated.incrementAndGet(); - return recyclableObjectFactory.createInstance(); + return allocator.createInstance(); } else { return recyclable; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java index eb1589c4b5..3b80f64f06 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java @@ -20,19 +20,18 @@ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.ObjectPool; -import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import javax.annotation.Nullable; import java.io.IOException; -public class MixedObjectPool extends AbstractObjectPool { +public class MixedObjectPool extends AbstractObjectPool { private final ObjectPool primaryPool; private final ObjectPool secondaryPool; - public MixedObjectPool(final RecyclableObjectFactory recyclableObjectFactory, ObjectPool primaryPool, ObjectPool secondaryPool) { - super(recyclableObjectFactory); + public MixedObjectPool(final Allocator allocator, ObjectPool primaryPool, ObjectPool secondaryPool) { + super(allocator); this.primaryPool = primaryPool; this.secondaryPool = secondaryPool; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java index 873a710e92..05985d5b40 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java @@ -19,8 +19,8 @@ */ package co.elastic.apm.objectpool.impl; +import co.elastic.apm.objectpool.Allocator; import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; import com.lmax.disruptor.EventFactory; import javax.annotation.Nullable; @@ -28,23 +28,29 @@ import java.util.Iterator; import java.util.Queue; -public class QueueBasedObjectPool extends AbstractObjectPool implements Collection { +public class QueueBasedObjectPool extends AbstractObjectPool implements Collection { private final Queue queue; + private final Resetter resetter; /** * @param queue the underlying queue - * @param preAllocate when set to true, the recyclableObjectFactory will be used to create maxPooledElements objects + * @param preAllocate when set to true, the allocator will be used to create maxPooledElements objects * which are then stored in the queue - * @param recyclableObjectFactory a factory method which is used to create new instances of the recyclable object. This factory is + * @param allocator a factory method which is used to create new instances of the recyclable object. This factory is * used when there are no objects in the queue and to preallocate the queue. */ - public QueueBasedObjectPool(Queue queue, boolean preAllocate, RecyclableObjectFactory recyclableObjectFactory) { - super(recyclableObjectFactory); + public static QueueBasedObjectPool ofRecyclable(Queue queue, boolean preAllocate, Allocator allocator) { + return new QueueBasedObjectPool(queue, preAllocate, allocator, Resetter.ForRecyclable.get()); + } + + public QueueBasedObjectPool(Queue queue, boolean preAllocate, Allocator allocator, Resetter resetter) { + super(allocator); this.queue = queue; + this.resetter = resetter; if (preAllocate) { for (int i = 0; i < this.queue.size(); i++) { - this.queue.offer(recyclableObjectFactory.createInstance()); + this.queue.offer(allocator.createInstance()); } } } @@ -57,7 +63,7 @@ public T tryCreateInstance() { @Override public void recycle(T obj) { - obj.resetState(); + resetter.recycle(obj); queue.offer(obj); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java new file mode 100644 index 0000000000..ddb571141a --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java @@ -0,0 +1,22 @@ +package co.elastic.apm.objectpool.impl; + +import co.elastic.apm.objectpool.Recyclable; + +public interface Resetter { + + void recycle(T object); + + class ForRecyclable implements Resetter { + private static ForRecyclable INSTANCE = new ForRecyclable(); + + public static Resetter get() { + return INSTANCE; + } + + @Override + public void recycle(Recyclable object) { + object.resetState(); + } + } + +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java index 691c86975d..9d7588ebc1 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java @@ -20,7 +20,7 @@ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import javax.annotation.Nullable; @@ -30,8 +30,8 @@ public class ThreadLocalObjectPool extends AbstractObjectP private final int maxNumPooledObjectsPerThread; private final boolean preAllocate; - public ThreadLocalObjectPool(final int maxNumPooledObjectsPerThread, final boolean preAllocate, final RecyclableObjectFactory recyclableObjectFactory) { - super(recyclableObjectFactory); + public ThreadLocalObjectPool(final int maxNumPooledObjectsPerThread, final boolean preAllocate, final Allocator allocator) { + super(allocator); this.maxNumPooledObjectsPerThread = maxNumPooledObjectsPerThread; this.preAllocate = preAllocate; } @@ -78,7 +78,7 @@ private FixedSizeStack createStack(boolean preAllocate) { FixedSizeStack stack = new FixedSizeStack<>(maxNumPooledObjectsPerThread); if (preAllocate) { for (int i = 0; i < maxNumPooledObjectsPerThread; i++) { - stack.push(recyclableObjectFactory.createInstance()); + stack.push(allocator.createInstance()); } } return stack; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java index a2fdf14aee..6e8ea8cbe2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java @@ -59,6 +59,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collection; @@ -559,7 +560,16 @@ private void serializeDbContext(final Db db) { writeFieldName("db"); jw.writeByte(OBJECT_START); writeField("instance", db.getInstance()); - writeLongStringField("statement", db.getStatement()); + if (db.getStatement() != null) { + writeLongStringField("statement", db.getStatement()); + } else { + final CharBuffer statementBuffer = db.getStatementBuffer(); + if (statementBuffer != null && statementBuffer.length() > 0) { + writeFieldName("statement"); + jw.writeString(statementBuffer); + jw.writeByte(COMMA); + } + } writeField("type", db.getType()); writeLastField("user", db.getUser()); jw.writeByte(OBJECT_END); diff --git a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java index 3cc8a16f64..4ca54ce0bb 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java @@ -35,7 +35,7 @@ public class ObjectPoolTest { @BeforeEach void setUp() { // objectPool = new ThreadLocalObjectPool<>(10, false, TestRecyclable::new); - objectPool = new QueueBasedObjectPool<>(new MpmcAtomicArrayQueue<>(MAX_SIZE), true, TestRecyclable::new); + objectPool = QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<>(MAX_SIZE), true, TestRecyclable::new); } @Test diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java b/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java index bdc9001500..492b38c7f3 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java @@ -27,14 +27,29 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; import java.nio.charset.StandardCharsets; @VisibleForAdvice public class ESRestClientInstrumentationHelper { private static final Logger logger = LoggerFactory.getLogger(ESRestClientInstrumentationHelper.class); - - private static ThreadLocal bodyReadBuffer = new ThreadLocal<>(); private static final byte CURLY_BRACKET_UTF8 = '{'; + private static ThreadLocal bodyReadBuffer = new ThreadLocal<>(); + private static ThreadLocal bodyBuffer = new ThreadLocal<>() { + @Override + protected ByteBuffer initialValue() { + return ByteBuffer.allocate(1024); + } + }; + private static ThreadLocal threadLocalCharsetDecoder = new ThreadLocal<>() { + @Override + protected CharsetDecoder initialValue() { + return StandardCharsets.UTF_8.newDecoder(); + } + }; @Nullable @VisibleForAdvice @@ -51,7 +66,7 @@ public static String readRequestBody(InputStream bodyIS, String endpoint) throws int length = bodyIS.read(data, 0, data.length); // read only if UTF8-encoded (based on the first byte being UTF8 of curly bracket char) - if(data[0] == CURLY_BRACKET_UTF8) { + if (data[0] == CURLY_BRACKET_UTF8) { body = new String(data, 0, length, StandardCharsets.UTF_8); } } catch (IOException e) { @@ -62,4 +77,44 @@ public static String readRequestBody(InputStream bodyIS, String endpoint) throws return body; } + + /** + * Reads the provided {@link InputStream} into the {@link CharBuffer} without causing allocations. + *

+ * The {@link InputStream} is assumed to yield an UTF-8 encoded string. + *

+ *

+ * If the {@link InputStream} yields more chars than the {@link CharBuffer#limit()} of the provided {@link CharBuffer}, + * the rest of the input is silently ignored. + *

+ * + * @param is the source {@link InputStream}, which should be encoded with UTF-8. + * @param charBuffer the {@link CharBuffer} the {@link InputStream} should be written into + * @return {@code true}, if the input stream could be decoded with the UTF-8 charset, {@code false} otherwise. + * @throws IOException in case of errors reading from the provided {@link InputStream} + */ + @VisibleForAdvice + public static boolean readUtf8Stream(InputStream is, CharBuffer charBuffer) throws IOException { + final ByteBuffer buffer = bodyBuffer.get(); + final CharsetDecoder charsetDecoder = threadLocalCharsetDecoder.get(); + try (is) { + final byte[] bufferArray = buffer.array(); + for (int read = is.read(bufferArray); read != -1; read = is.read(bufferArray)) { + buffer.limit(read); + final CoderResult coderResult = charsetDecoder.decode(buffer, charBuffer, true); + buffer.clear(); + if (coderResult.isError()) { + return false; + } else if (coderResult.isOverflow()) { + return true; + } + } + return true; + } finally { + charsetDecoder.flush(charBuffer); + charBuffer.flip(); + buffer.clear(); + charsetDecoder.reset(); + } + } } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentation.java index 8bd28ddd7c..d80db4870c 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentation.java @@ -75,10 +75,7 @@ private static void onBeforeExecute(@Advice.Argument(0) Request request, HttpEntity entity = request.getEntity(); if (entity != null && entity.isRepeatable()) { try { - String body = ESRestClientInstrumentationHelper.readRequestBody(entity.getContent(), request.getEndpoint()); - if (body != null && !body.isEmpty()) { - span.getContext().getDb().withStatement(body); - } + ESRestClientInstrumentationHelper.readUtf8Stream(entity.getContent(), span.getContext().getDb().withStatementBuffer()); } catch (IOException e) { // We can't log from here } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java b/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java new file mode 100644 index 0000000000..84f7e29be4 --- /dev/null +++ b/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java @@ -0,0 +1,71 @@ +package co.elastic.apm.es.restclient; + +import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; +import org.apache.commons.lang3.RandomStringUtils; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.concurrent.ArrayBlockingQueue; + +import static java.nio.charset.StandardCharsets.UTF_16; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +class ESRestClientInstrumentationHelperTest { + + @Test + void readUtf8Stream() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(8); + assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("{foo}", UTF_8), charBuffer)).isTrue(); + assertThat(charBuffer.toString()).isEqualTo("{foo}"); + } + + @Test + void testStringLargerThanBuffer() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(2000); + final String longString = RandomStringUtils.randomAlphanumeric(2000); + assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream(longString, UTF_8), charBuffer)).isTrue(); + assertThat(charBuffer.toString()).isEqualTo(longString); + } + + @Test + void testReusedBuffer() throws IOException { + final QueueBasedObjectPool charBuffers = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(1), true, + () -> CharBuffer.allocate(8), CharBuffer::clear); + + final CharBuffer charBuffer1 = charBuffers.createInstance(); + assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("foo", UTF_8), charBuffer1)).isTrue(); + assertThat(charBuffer1.toString()).isEqualTo("foo"); + + charBuffers.recycle(charBuffer1); + + final CharBuffer charBuffer2 = charBuffers.createInstance(); + assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("barbaz", UTF_8), charBuffer2)).isTrue(); + assertThat(charBuffer2.toString()).isEqualTo("barbaz"); + assertThat((Object) charBuffer1).isSameAs(charBuffer2); + + } + + @Test + void testOverflow() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(8); + assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("foobarbaz", UTF_8), charBuffer)).isTrue(); + assertThat(charBuffer.toString()).isEqualTo("foobarba"); + } + + @Test + void readUtf16Stream() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(16); + assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("{foo}", UTF_16), charBuffer)).isFalse(); + } + + @Nonnull + private ByteArrayInputStream toInputStream(String s, Charset charset) { + return new ByteArrayInputStream(s.getBytes(charset)); + } +} diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentationIT.java b/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentationIT.java index 77894fff11..de87608837 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentationIT.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ElasticsearchRestClientInstrumentationIT.java @@ -167,7 +167,7 @@ private void validateSpanContent(Span span, String expectedName, int statusCode, assertThat(span.getContext().getDb().getType()).isEqualTo(DB_CONTEXT_TYPE); if (!expectedName.contains(SEARCH_QUERY_PATH_SUFFIX)) { - assertThat(span.getContext().getDb().getStatement()).isNull(); + assertThat((CharSequence) (span.getContext().getDb().getStatementBuffer())).isNull(); } } @@ -181,7 +181,8 @@ private void validateHttpContextContent(Http http, int statusCode, String method private void validateDbContextContent(Span span, String statement) { Db db = span.getContext().getDb(); assertThat(db.getType()).isEqualTo(DB_CONTEXT_TYPE); - assertThat(db.getStatement()).isEqualTo(statement); + assertThat((CharSequence) db.getStatementBuffer()).isNotNull(); + assertThat(db.getStatementBuffer().toString()).isEqualTo(statement); } @Test From 59eacfee05f28d2db80d16b22206bd8174a0498c Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Sun, 4 Nov 2018 14:48:59 +0100 Subject: [PATCH 2/3] Incorporate feedback --- .../co/elastic/apm/impl/transaction/Db.java | 29 +++++++++-- .../objectpool/impl/QueueBasedObjectPool.java | 8 ++- .../elastic/apm/objectpool/impl/Resetter.java | 19 +++++++ .../plugin/api/ApiScopeInstrumentation.java | 19 +++++++ .../ESRestClientInstrumentationHelper.java | 52 ++++--------------- ...ESRestClientInstrumentationHelperTest.java | 23 +++++++- docs/configuration.asciidoc | 2 +- 7 files changed, 103 insertions(+), 49 deletions(-) diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java index c83e433316..d49016c0d2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java @@ -37,7 +37,7 @@ */ public class Db implements Recyclable { - private final ObjectPool charBufferPool = new QueueBasedObjectPool(new MpmcAtomicArrayQueue<>(128), false, + private final ObjectPool charBufferPool = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue(128), false, new Allocator() { @Override public CharBuffer createInstance() { @@ -108,11 +108,32 @@ public Db withStatement(@Nullable String statement) { return this; } + /** + * Gets a pooled {@link CharBuffer} to record the DB statement and associates it with this instance. + *

+ * Note: you may not hold a reference to the returned {@link CharBuffer} as it will be reused. + *

+ *

+ * Note: This method is not thread safe + *

+ * + * @return a {@link CharBuffer} to record the DB statement + */ public CharBuffer withStatementBuffer() { - this.statementBuffer = charBufferPool.createInstance(); + if (this.statementBuffer == null) { + this.statementBuffer = charBufferPool.createInstance(); + } return this.statementBuffer; } + /** + * Returns the associated pooled {@link CharBuffer} to record the DB statement. + *

+ * Note: returns {@code null} unless {@link #withStatementBuffer()} has previously been called + *

+ * + * @return a {@link CharBuffer} to record the DB statement, or {@code null} + */ @Nullable public CharBuffer getStatementBuffer() { return statementBuffer; @@ -156,7 +177,9 @@ public void resetState() { statement = null; type = null; user = null; - charBufferPool.recycle(statementBuffer); + if (statementBuffer != null) { + charBufferPool.recycle(statementBuffer); + } statementBuffer = null; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java index 05985d5b40..d42b05f056 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java @@ -41,10 +41,14 @@ public class QueueBasedObjectPool extends AbstractObjectPool implements Co * used when there are no objects in the queue and to preallocate the queue. */ public static QueueBasedObjectPool ofRecyclable(Queue queue, boolean preAllocate, Allocator allocator) { - return new QueueBasedObjectPool(queue, preAllocate, allocator, Resetter.ForRecyclable.get()); + return new QueueBasedObjectPool<>(queue, preAllocate, allocator, Resetter.ForRecyclable.get()); } - public QueueBasedObjectPool(Queue queue, boolean preAllocate, Allocator allocator, Resetter resetter) { + public static QueueBasedObjectPool of(Queue queue, boolean preAllocate, Allocator allocator, Resetter resetter) { + return new QueueBasedObjectPool<>(queue, preAllocate, allocator, resetter); + } + + private QueueBasedObjectPool(Queue queue, boolean preAllocate, Allocator allocator, Resetter resetter) { super(allocator); this.queue = queue; this.resetter = resetter; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java index ddb571141a..9577220570 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java @@ -1,3 +1,22 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.Recyclable; diff --git a/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java b/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java index 8f059c7a4d..b1240c5f59 100644 --- a/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java +++ b/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java @@ -1,3 +1,22 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ package co.elastic.apm.plugin.api; import co.elastic.apm.bci.ElasticApmInstrumentation; diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java b/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java index 492b38c7f3..6a2ece87bd 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/src/main/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelper.java @@ -20,11 +20,7 @@ package co.elastic.apm.es.restclient; import co.elastic.apm.bci.VisibleForAdvice; -import co.elastic.apm.report.serialize.DslJsonSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -35,49 +31,19 @@ @VisibleForAdvice public class ESRestClientInstrumentationHelper { - private static final Logger logger = LoggerFactory.getLogger(ESRestClientInstrumentationHelper.class); - private static final byte CURLY_BRACKET_UTF8 = '{'; - private static ThreadLocal bodyReadBuffer = new ThreadLocal<>(); - private static ThreadLocal bodyBuffer = new ThreadLocal<>() { + private static ThreadLocal threadLocalByteBuffer = new ThreadLocal() { @Override protected ByteBuffer initialValue() { return ByteBuffer.allocate(1024); } }; - private static ThreadLocal threadLocalCharsetDecoder = new ThreadLocal<>() { + private static ThreadLocal threadLocalCharsetDecoder = new ThreadLocal() { @Override protected CharsetDecoder initialValue() { return StandardCharsets.UTF_8.newDecoder(); } }; - @Nullable - @VisibleForAdvice - public static String readRequestBody(InputStream bodyIS, String endpoint) throws IOException { - String body = null; - try { - byte[] data = bodyReadBuffer.get(); - if (data == null) { - // The DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH is actually used to count chars and not bytes, but that's not - // that important, the most important is that we limit the payload size we read and decode - data = new byte[DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH]; - bodyReadBuffer.set(data); - } - int length = bodyIS.read(data, 0, data.length); - - // read only if UTF8-encoded (based on the first byte being UTF8 of curly bracket char) - if (data[0] == CURLY_BRACKET_UTF8) { - body = new String(data, 0, length, StandardCharsets.UTF_8); - } - } catch (IOException e) { - logger.info("Failed to read request body for " + endpoint); - } finally { - bodyIS.close(); - } - - return body; - } - /** * Reads the provided {@link InputStream} into the {@link CharBuffer} without causing allocations. *

@@ -94,27 +60,31 @@ public static String readRequestBody(InputStream bodyIS, String endpoint) throws * @throws IOException in case of errors reading from the provided {@link InputStream} */ @VisibleForAdvice - public static boolean readUtf8Stream(InputStream is, CharBuffer charBuffer) throws IOException { - final ByteBuffer buffer = bodyBuffer.get(); + public static boolean readUtf8Stream(final InputStream is, final CharBuffer charBuffer) throws IOException { + final ByteBuffer buffer = threadLocalByteBuffer.get(); final CharsetDecoder charsetDecoder = threadLocalCharsetDecoder.get(); - try (is) { + try { final byte[] bufferArray = buffer.array(); for (int read = is.read(bufferArray); read != -1; read = is.read(bufferArray)) { buffer.limit(read); final CoderResult coderResult = charsetDecoder.decode(buffer, charBuffer, true); buffer.clear(); if (coderResult.isError()) { + // this is not UTF-8 + charBuffer.clear(); return false; } else if (coderResult.isOverflow()) { - return true; + // stream yields more chars than the charBuffer can hold + break; } } + charsetDecoder.flush(charBuffer); return true; } finally { - charsetDecoder.flush(charBuffer); charBuffer.flip(); buffer.clear(); charsetDecoder.reset(); + is.close(); } } } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java b/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java index 84f7e29be4..ac3f39195b 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java @@ -1,8 +1,26 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ package co.elastic.apm.es.restclient; import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; import org.apache.commons.lang3.RandomStringUtils; -import org.jctools.queues.atomic.MpmcAtomicArrayQueue; import org.junit.jupiter.api.Test; import javax.annotation.Nonnull; @@ -35,7 +53,7 @@ void testStringLargerThanBuffer() throws IOException { @Test void testReusedBuffer() throws IOException { - final QueueBasedObjectPool charBuffers = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(1), true, + final QueueBasedObjectPool charBuffers = QueueBasedObjectPool.of(new ArrayBlockingQueue<>(1), true, () -> CharBuffer.allocate(8), CharBuffer::clear); final CharBuffer charBuffer1 = charBuffers.createInstance(); @@ -62,6 +80,7 @@ void testOverflow() throws IOException { void readUtf16Stream() throws IOException { final CharBuffer charBuffer = CharBuffer.allocate(16); assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("{foo}", UTF_16), charBuffer)).isFalse(); + assertThat(charBuffer.length()).isZero(); } @Nonnull diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 9d960e5ca3..471612b959 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -1109,7 +1109,7 @@ The default unit for this option is `ms` # # Events like transactions and spans are buffered when the agent can't keep up with sending them to the APM Server or if the APM server is down. # -# If the queue is full, events are rejected which means you will transactions and spans in that case. +# If the queue is full, events are rejected which means you will lose transactions and spans in that case. # This guards the application from crashing in case the APM server is unavailable for a longer period of time. # # A lower value will decrease the heap overhead of the agent, From 5c2d8aaf58cc6b2e8e08f4ecf6d079ef621b544b Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Sun, 4 Nov 2018 15:14:08 +0100 Subject: [PATCH 3/3] Move ESRestClientInstrumentationHelper to core to avoid duplication --- .../java/co/elastic/apm/util/IOUtils.java | 4 +- .../java/co/elastic/apm/util/IOUtilsTest.java | 16 ++--- .../ESRestClientInstrumentationHelper.java | 65 ------------------- ...lasticsearchRestClientInstrumentation.java | 8 +-- ...sticsearchRestClientInstrumentationIT.java | 3 +- ...lasticsearchRestClientInstrumentation.java | 3 +- 6 files changed, 17 insertions(+), 82 deletions(-) rename apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java => apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java (97%) rename apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java => apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java (77%) delete mode 100644 apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java b/apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java similarity index 97% rename from apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java rename to apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java index c3b61bd71c..d7f9e314f4 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package co.elastic.apm.es.restclient.v6_4; +package co.elastic.apm.util; import co.elastic.apm.bci.VisibleForAdvice; @@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets; @VisibleForAdvice -public class ESRestClientInstrumentationHelper { +public class IOUtils { private static ThreadLocal threadLocalByteBuffer = new ThreadLocal() { @Override protected ByteBuffer initialValue() { diff --git a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java b/apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java similarity index 77% rename from apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java rename to apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java index ac3f39195b..161dfab9b9 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/src/test/java/co/elastic/apm/es/restclient/ESRestClientInstrumentationHelperTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java @@ -17,7 +17,7 @@ * limitations under the License. * #L% */ -package co.elastic.apm.es.restclient; +package co.elastic.apm.util; import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; import org.apache.commons.lang3.RandomStringUtils; @@ -34,12 +34,12 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; -class ESRestClientInstrumentationHelperTest { +class IOUtilsTest { @Test void readUtf8Stream() throws IOException { final CharBuffer charBuffer = CharBuffer.allocate(8); - assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("{foo}", UTF_8), charBuffer)).isTrue(); + assertThat(IOUtils.readUtf8Stream(toInputStream("{foo}", UTF_8), charBuffer)).isTrue(); assertThat(charBuffer.toString()).isEqualTo("{foo}"); } @@ -47,7 +47,7 @@ void readUtf8Stream() throws IOException { void testStringLargerThanBuffer() throws IOException { final CharBuffer charBuffer = CharBuffer.allocate(2000); final String longString = RandomStringUtils.randomAlphanumeric(2000); - assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream(longString, UTF_8), charBuffer)).isTrue(); + assertThat(IOUtils.readUtf8Stream(toInputStream(longString, UTF_8), charBuffer)).isTrue(); assertThat(charBuffer.toString()).isEqualTo(longString); } @@ -57,13 +57,13 @@ void testReusedBuffer() throws IOException { () -> CharBuffer.allocate(8), CharBuffer::clear); final CharBuffer charBuffer1 = charBuffers.createInstance(); - assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("foo", UTF_8), charBuffer1)).isTrue(); + assertThat(IOUtils.readUtf8Stream(toInputStream("foo", UTF_8), charBuffer1)).isTrue(); assertThat(charBuffer1.toString()).isEqualTo("foo"); charBuffers.recycle(charBuffer1); final CharBuffer charBuffer2 = charBuffers.createInstance(); - assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("barbaz", UTF_8), charBuffer2)).isTrue(); + assertThat(IOUtils.readUtf8Stream(toInputStream("barbaz", UTF_8), charBuffer2)).isTrue(); assertThat(charBuffer2.toString()).isEqualTo("barbaz"); assertThat((Object) charBuffer1).isSameAs(charBuffer2); @@ -72,14 +72,14 @@ void testReusedBuffer() throws IOException { @Test void testOverflow() throws IOException { final CharBuffer charBuffer = CharBuffer.allocate(8); - assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("foobarbaz", UTF_8), charBuffer)).isTrue(); + assertThat(IOUtils.readUtf8Stream(toInputStream("foobarbaz", UTF_8), charBuffer)).isTrue(); assertThat(charBuffer.toString()).isEqualTo("foobarba"); } @Test void readUtf16Stream() throws IOException { final CharBuffer charBuffer = CharBuffer.allocate(16); - assertThat(ESRestClientInstrumentationHelper.readUtf8Stream(toInputStream("{foo}", UTF_16), charBuffer)).isFalse(); + assertThat(IOUtils.readUtf8Stream(toInputStream("{foo}", UTF_16), charBuffer)).isFalse(); assertThat(charBuffer.length()).isZero(); } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java deleted file mode 100644 index 6064c2113a..0000000000 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 Elastic and contributors - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package co.elastic.apm.es.restclient.v5_6; - -import co.elastic.apm.bci.VisibleForAdvice; -import co.elastic.apm.report.serialize.DslJsonSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -@VisibleForAdvice -public class ESRestClientInstrumentationHelper { - private static final Logger logger = LoggerFactory.getLogger(ESRestClientInstrumentationHelper.class); - - private static ThreadLocal bodyReadBuffer = new ThreadLocal<>(); - private static final byte CURLY_BRACKET_UTF8 = '{'; - - @Nullable - @VisibleForAdvice - public static String readRequestBody(InputStream bodyIS, String endpoint) throws IOException { - String body = null; - try { - byte[] data = bodyReadBuffer.get(); - if (data == null) { - // The DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH is actually used to count chars and not bytes, but that's not - // that important, the most important is that we limit the payload size we read and decode - data = new byte[DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH]; - bodyReadBuffer.set(data); - } - int length = bodyIS.read(data, 0, data.length); - - // read only if UTF8-encoded (based on the first byte being UTF8 of curly bracket char) - if(data[0] == CURLY_BRACKET_UTF8) { - body = new String(data, 0, length, StandardCharsets.UTF_8); - } - } catch (IOException e) { - logger.info("Failed to read request body for " + endpoint); - } finally { - bodyIS.close(); - } - - return body; - } -} diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java index 36f4807194..e24ee5b24c 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java @@ -23,6 +23,7 @@ import co.elastic.apm.bci.VisibleForAdvice; import co.elastic.apm.impl.transaction.AbstractSpan; import co.elastic.apm.impl.transaction.Span; +import co.elastic.apm.util.IOUtils; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; @@ -57,7 +58,7 @@ public class ElasticsearchRestClientInstrumentation extends ElasticApmInstrument @Advice.OnMethodEnter private static void onBeforeExecute(@Advice.Argument(0) String method, @Advice.Argument(1) String endpoint, - @Advice.Argument(3) HttpEntity entity, + @Advice.Argument(3) @Nullable HttpEntity entity, @Advice.Local("span") Span span) { if (tracer == null) { return; @@ -77,10 +78,7 @@ private static void onBeforeExecute(@Advice.Argument(0) String method, if (endpoint.endsWith(SEARCH_QUERY_PATH_SUFFIX)) { if (entity != null && entity.isRepeatable()) { try { - String body = ESRestClientInstrumentationHelper.readRequestBody(entity.getContent(), endpoint); - if (body != null && !body.isEmpty()) { - span.getContext().getDb().withStatement(body); - } + IOUtils.readUtf8Stream(entity.getContent(), span.getContext().getDb().withStatementBuffer()); } catch (IOException e) { // We can't log from here } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java index 7b4eb43d7a..bb6f3c7a6a 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java @@ -180,7 +180,8 @@ private void validateHttpContextContent(Http http, int statusCode, String method private void validateDbContextContent(Span span, String statement) { Db db = span.getContext().getDb(); assertThat(db.getType()).isEqualTo(DB_CONTEXT_TYPE); - assertThat(db.getStatement()).isEqualTo(statement); + assertThat((Object) db.getStatementBuffer()).isNotNull(); + assertThat(db.getStatementBuffer().toString()).isEqualTo(statement); } @Test diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java index b36bff9d26..ed972d4157 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java @@ -23,6 +23,7 @@ import co.elastic.apm.bci.VisibleForAdvice; import co.elastic.apm.impl.transaction.AbstractSpan; import co.elastic.apm.impl.transaction.Span; +import co.elastic.apm.util.IOUtils; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; @@ -75,7 +76,7 @@ private static void onBeforeExecute(@Advice.Argument(0) Request request, HttpEntity entity = request.getEntity(); if (entity != null && entity.isRepeatable()) { try { - ESRestClientInstrumentationHelper.readUtf8Stream(entity.getContent(), span.getContext().getDb().withStatementBuffer()); + IOUtils.readUtf8Stream(entity.getContent(), span.getContext().getDb().withStatementBuffer()); } catch (IOException e) { // We can't log from here }