diff --git a/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java b/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java index 74774a3582..8d92e07d31 100644 --- a/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java +++ b/apm-agent-benchmarks/src/main/java/co/elastic/apm/benchmark/objectpool/ObjectPoolBenchmark.java @@ -60,13 +60,13 @@ public static void main(String[] args) throws RunnerException { @Setup public void setUp() { tracer = new ElasticApmTracerBuilder().build(); - blockingQueueObjectPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer)); - jctoolsQueueObjectPool = new QueueBasedObjectPool<>(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer)); - jctoolsAtomicQueueObjectPool = new QueueBasedObjectPool<>(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer)); - agronaQueueObjectPool = new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)); + blockingQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer)); + jctoolsQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer)); + jctoolsAtomicQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer)); + agronaQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)); mixedObjectPool = new MixedObjectPool<>(() -> new Transaction(tracer), new ThreadLocalObjectPool<>(256, true, () -> new Transaction(tracer)), - new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer))); + QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer))); threadLocalObjectPool = new ThreadLocalObjectPool<>(64, true, () -> new Transaction(tracer)); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java index 9571b1a48e..b10861d01d 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java @@ -30,7 +30,7 @@ import co.elastic.apm.impl.transaction.TraceContext; import co.elastic.apm.impl.transaction.Transaction; import co.elastic.apm.objectpool.ObjectPool; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; import co.elastic.apm.report.Reporter; import co.elastic.apm.report.ReporterConfiguration; @@ -85,23 +85,23 @@ protected Deque initialValue() { this.spanListeners = spanListeners; int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2; coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class); - transactionPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, - new RecyclableObjectFactory() { + transactionPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, + new Allocator() { @Override public Transaction createInstance() { return new Transaction(ElasticApmTracer.this); } }); - spanPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, - new RecyclableObjectFactory() { + spanPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false, + new Allocator() { @Override public Span createInstance() { return new Span(ElasticApmTracer.this); } }); // we are assuming that we don't need as many errors as spans or transactions - errorPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements / 2)), false, - new RecyclableObjectFactory() { + errorPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements / 2)), false, + new Allocator() { @Override public ErrorCapture createInstance() { return new ErrorCapture(ElasticApmTracer.this); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java index 75198e1d1c..d49016c0d2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Db.java @@ -20,9 +20,16 @@ package co.elastic.apm.impl.transaction; +import co.elastic.apm.objectpool.Allocator; +import co.elastic.apm.objectpool.ObjectPool; import co.elastic.apm.objectpool.Recyclable; +import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; +import co.elastic.apm.objectpool.impl.Resetter; +import co.elastic.apm.report.serialize.DslJsonSerializer; +import org.jctools.queues.atomic.MpmcAtomicArrayQueue; import javax.annotation.Nullable; +import java.nio.CharBuffer; /** @@ -30,6 +37,20 @@ */ public class Db implements Recyclable { + private final ObjectPool charBufferPool = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue(128), false, + new Allocator() { + @Override + public CharBuffer createInstance() { + return CharBuffer.allocate(DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH); + } + }, + new Resetter() { + @Override + public void recycle(CharBuffer object) { + object.clear(); + } + }); + /** * Database instance name */ @@ -40,6 +61,10 @@ public class Db implements Recyclable { */ @Nullable private String statement; + + @Nullable + private CharBuffer statementBuffer; + /** * Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis" */ @@ -83,6 +108,37 @@ public Db withStatement(@Nullable String statement) { return this; } + /** + * Gets a pooled {@link CharBuffer} to record the DB statement and associates it with this instance. + *

+ * Note: you may not hold a reference to the returned {@link CharBuffer} as it will be reused. + *

+ *

+ * Note: This method is not thread safe + *

+ * + * @return a {@link CharBuffer} to record the DB statement + */ + public CharBuffer withStatementBuffer() { + if (this.statementBuffer == null) { + this.statementBuffer = charBufferPool.createInstance(); + } + return this.statementBuffer; + } + + /** + * Returns the associated pooled {@link CharBuffer} to record the DB statement. + *

+ * Note: returns {@code null} unless {@link #withStatementBuffer()} has previously been called + *

+ * + * @return a {@link CharBuffer} to record the DB statement, or {@code null} + */ + @Nullable + public CharBuffer getStatementBuffer() { + return statementBuffer; + } + /** * Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis" */ @@ -121,13 +177,18 @@ public void resetState() { statement = null; type = null; user = null; + if (statementBuffer != null) { + charBufferPool.recycle(statementBuffer); + } + statementBuffer = null; } public boolean hasContent() { return instance != null || statement != null || type != null || - user != null; + user != null || + statementBuffer != null; } public void copyFrom(Db other) { diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/RecyclableObjectFactory.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/Allocator.java similarity index 91% rename from apm-agent-core/src/main/java/co/elastic/apm/objectpool/RecyclableObjectFactory.java rename to apm-agent-core/src/main/java/co/elastic/apm/objectpool/Allocator.java index 514917f193..23aaf6f1fe 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/RecyclableObjectFactory.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/Allocator.java @@ -19,7 +19,7 @@ */ package co.elastic.apm.objectpool; -public interface RecyclableObjectFactory { +public interface Allocator { T createInstance(); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java index 0a91ed1ec8..2fd6f1ed1c 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/NoopObjectPool.java @@ -24,21 +24,21 @@ public class NoopObjectPool implements ObjectPool { - private final RecyclableObjectFactory recyclableObjectFactory; + private final Allocator allocator; - public NoopObjectPool(RecyclableObjectFactory recyclableObjectFactory) { - this.recyclableObjectFactory = recyclableObjectFactory; + public NoopObjectPool(Allocator allocator) { + this.allocator = allocator; } @Nullable @Override public T tryCreateInstance() { - return recyclableObjectFactory.createInstance(); + return allocator.createInstance(); } @Override public T createInstance() { - return recyclableObjectFactory.createInstance(); + return allocator.createInstance(); } @Override diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java index a313fa4b6f..734b40cf71 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/ObjectPool.java @@ -22,7 +22,7 @@ import javax.annotation.Nullable; import java.io.Closeable; -public interface ObjectPool extends Closeable { +public interface ObjectPool extends Closeable { @Nullable T tryCreateInstance(); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java index f74b33d034..bdb170f938 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/AbstractObjectPool.java @@ -20,18 +20,17 @@ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.ObjectPool; -import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import java.util.concurrent.atomic.AtomicInteger; -public abstract class AbstractObjectPool implements ObjectPool { +public abstract class AbstractObjectPool implements ObjectPool { - protected final RecyclableObjectFactory recyclableObjectFactory; + protected final Allocator allocator; private final AtomicInteger garbageCreated = new AtomicInteger(); - protected AbstractObjectPool(RecyclableObjectFactory recyclableObjectFactory) { - this.recyclableObjectFactory = recyclableObjectFactory; + protected AbstractObjectPool(Allocator allocator) { + this.allocator = allocator; } @Override @@ -40,7 +39,7 @@ public T createInstance() { if (recyclable == null) { // queue is empty, falling back to creating a new instance garbageCreated.incrementAndGet(); - return recyclableObjectFactory.createInstance(); + return allocator.createInstance(); } else { return recyclable; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java index eb1589c4b5..3b80f64f06 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/MixedObjectPool.java @@ -20,19 +20,18 @@ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.ObjectPool; -import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import javax.annotation.Nullable; import java.io.IOException; -public class MixedObjectPool extends AbstractObjectPool { +public class MixedObjectPool extends AbstractObjectPool { private final ObjectPool primaryPool; private final ObjectPool secondaryPool; - public MixedObjectPool(final RecyclableObjectFactory recyclableObjectFactory, ObjectPool primaryPool, ObjectPool secondaryPool) { - super(recyclableObjectFactory); + public MixedObjectPool(final Allocator allocator, ObjectPool primaryPool, ObjectPool secondaryPool) { + super(allocator); this.primaryPool = primaryPool; this.secondaryPool = secondaryPool; } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java index 873a710e92..d42b05f056 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java @@ -19,8 +19,8 @@ */ package co.elastic.apm.objectpool.impl; +import co.elastic.apm.objectpool.Allocator; import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; import com.lmax.disruptor.EventFactory; import javax.annotation.Nullable; @@ -28,23 +28,33 @@ import java.util.Iterator; import java.util.Queue; -public class QueueBasedObjectPool extends AbstractObjectPool implements Collection { +public class QueueBasedObjectPool extends AbstractObjectPool implements Collection { private final Queue queue; + private final Resetter resetter; /** * @param queue the underlying queue - * @param preAllocate when set to true, the recyclableObjectFactory will be used to create maxPooledElements objects + * @param preAllocate when set to true, the allocator will be used to create maxPooledElements objects * which are then stored in the queue - * @param recyclableObjectFactory a factory method which is used to create new instances of the recyclable object. This factory is + * @param allocator a factory method which is used to create new instances of the recyclable object. This factory is * used when there are no objects in the queue and to preallocate the queue. */ - public QueueBasedObjectPool(Queue queue, boolean preAllocate, RecyclableObjectFactory recyclableObjectFactory) { - super(recyclableObjectFactory); + public static QueueBasedObjectPool ofRecyclable(Queue queue, boolean preAllocate, Allocator allocator) { + return new QueueBasedObjectPool<>(queue, preAllocate, allocator, Resetter.ForRecyclable.get()); + } + + public static QueueBasedObjectPool of(Queue queue, boolean preAllocate, Allocator allocator, Resetter resetter) { + return new QueueBasedObjectPool<>(queue, preAllocate, allocator, resetter); + } + + private QueueBasedObjectPool(Queue queue, boolean preAllocate, Allocator allocator, Resetter resetter) { + super(allocator); this.queue = queue; + this.resetter = resetter; if (preAllocate) { for (int i = 0; i < this.queue.size(); i++) { - this.queue.offer(recyclableObjectFactory.createInstance()); + this.queue.offer(allocator.createInstance()); } } } @@ -57,7 +67,7 @@ public T tryCreateInstance() { @Override public void recycle(T obj) { - obj.resetState(); + resetter.recycle(obj); queue.offer(obj); } diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java new file mode 100644 index 0000000000..9577220570 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/Resetter.java @@ -0,0 +1,41 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package co.elastic.apm.objectpool.impl; + +import co.elastic.apm.objectpool.Recyclable; + +public interface Resetter { + + void recycle(T object); + + class ForRecyclable implements Resetter { + private static ForRecyclable INSTANCE = new ForRecyclable(); + + public static Resetter get() { + return INSTANCE; + } + + @Override + public void recycle(Recyclable object) { + object.resetState(); + } + } + +} diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java index 691c86975d..9d7588ebc1 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/ThreadLocalObjectPool.java @@ -20,7 +20,7 @@ package co.elastic.apm.objectpool.impl; import co.elastic.apm.objectpool.Recyclable; -import co.elastic.apm.objectpool.RecyclableObjectFactory; +import co.elastic.apm.objectpool.Allocator; import javax.annotation.Nullable; @@ -30,8 +30,8 @@ public class ThreadLocalObjectPool extends AbstractObjectP private final int maxNumPooledObjectsPerThread; private final boolean preAllocate; - public ThreadLocalObjectPool(final int maxNumPooledObjectsPerThread, final boolean preAllocate, final RecyclableObjectFactory recyclableObjectFactory) { - super(recyclableObjectFactory); + public ThreadLocalObjectPool(final int maxNumPooledObjectsPerThread, final boolean preAllocate, final Allocator allocator) { + super(allocator); this.maxNumPooledObjectsPerThread = maxNumPooledObjectsPerThread; this.preAllocate = preAllocate; } @@ -78,7 +78,7 @@ private FixedSizeStack createStack(boolean preAllocate) { FixedSizeStack stack = new FixedSizeStack<>(maxNumPooledObjectsPerThread); if (preAllocate) { for (int i = 0; i < maxNumPooledObjectsPerThread; i++) { - stack.push(recyclableObjectFactory.createInstance()); + stack.push(allocator.createInstance()); } } return stack; diff --git a/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java b/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java index a2fdf14aee..6e8ea8cbe2 100644 --- a/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java +++ b/apm-agent-core/src/main/java/co/elastic/apm/report/serialize/DslJsonSerializer.java @@ -59,6 +59,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.CharBuffer; import java.nio.charset.Charset; import java.util.Arrays; import java.util.Collection; @@ -559,7 +560,16 @@ private void serializeDbContext(final Db db) { writeFieldName("db"); jw.writeByte(OBJECT_START); writeField("instance", db.getInstance()); - writeLongStringField("statement", db.getStatement()); + if (db.getStatement() != null) { + writeLongStringField("statement", db.getStatement()); + } else { + final CharBuffer statementBuffer = db.getStatementBuffer(); + if (statementBuffer != null && statementBuffer.length() > 0) { + writeFieldName("statement"); + jw.writeString(statementBuffer); + jw.writeByte(COMMA); + } + } writeField("type", db.getType()); writeLastField("user", db.getUser()); jw.writeByte(OBJECT_END); diff --git a/apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java b/apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java new file mode 100644 index 0000000000..d7f9e314f4 --- /dev/null +++ b/apm-agent-core/src/main/java/co/elastic/apm/util/IOUtils.java @@ -0,0 +1,90 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package co.elastic.apm.util; + +import co.elastic.apm.bci.VisibleForAdvice; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.StandardCharsets; + +@VisibleForAdvice +public class IOUtils { + private static ThreadLocal threadLocalByteBuffer = new ThreadLocal() { + @Override + protected ByteBuffer initialValue() { + return ByteBuffer.allocate(1024); + } + }; + private static ThreadLocal threadLocalCharsetDecoder = new ThreadLocal() { + @Override + protected CharsetDecoder initialValue() { + return StandardCharsets.UTF_8.newDecoder(); + } + }; + + /** + * Reads the provided {@link InputStream} into the {@link CharBuffer} without causing allocations. + *

+ * The {@link InputStream} is assumed to yield an UTF-8 encoded string. + *

+ *

+ * If the {@link InputStream} yields more chars than the {@link CharBuffer#limit()} of the provided {@link CharBuffer}, + * the rest of the input is silently ignored. + *

+ * + * @param is the source {@link InputStream}, which should be encoded with UTF-8. + * @param charBuffer the {@link CharBuffer} the {@link InputStream} should be written into + * @return {@code true}, if the input stream could be decoded with the UTF-8 charset, {@code false} otherwise. + * @throws IOException in case of errors reading from the provided {@link InputStream} + */ + @VisibleForAdvice + public static boolean readUtf8Stream(final InputStream is, final CharBuffer charBuffer) throws IOException { + final ByteBuffer buffer = threadLocalByteBuffer.get(); + final CharsetDecoder charsetDecoder = threadLocalCharsetDecoder.get(); + try { + final byte[] bufferArray = buffer.array(); + for (int read = is.read(bufferArray); read != -1; read = is.read(bufferArray)) { + buffer.limit(read); + final CoderResult coderResult = charsetDecoder.decode(buffer, charBuffer, true); + buffer.clear(); + if (coderResult.isError()) { + // this is not UTF-8 + charBuffer.clear(); + return false; + } else if (coderResult.isOverflow()) { + // stream yields more chars than the charBuffer can hold + break; + } + } + charsetDecoder.flush(charBuffer); + return true; + } finally { + charBuffer.flip(); + buffer.clear(); + charsetDecoder.reset(); + is.close(); + } + } +} diff --git a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java index 3cc8a16f64..4ca54ce0bb 100644 --- a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java +++ b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java @@ -35,7 +35,7 @@ public class ObjectPoolTest { @BeforeEach void setUp() { // objectPool = new ThreadLocalObjectPool<>(10, false, TestRecyclable::new); - objectPool = new QueueBasedObjectPool<>(new MpmcAtomicArrayQueue<>(MAX_SIZE), true, TestRecyclable::new); + objectPool = QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<>(MAX_SIZE), true, TestRecyclable::new); } @Test diff --git a/apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java b/apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java new file mode 100644 index 0000000000..161dfab9b9 --- /dev/null +++ b/apm-agent-core/src/test/java/co/elastic/apm/util/IOUtilsTest.java @@ -0,0 +1,90 @@ +/*- + * #%L + * Elastic APM Java agent + * %% + * Copyright (C) 2018 Elastic and contributors + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package co.elastic.apm.util; + +import co.elastic.apm.objectpool.impl.QueueBasedObjectPool; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.concurrent.ArrayBlockingQueue; + +import static java.nio.charset.StandardCharsets.UTF_16; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +class IOUtilsTest { + + @Test + void readUtf8Stream() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(8); + assertThat(IOUtils.readUtf8Stream(toInputStream("{foo}", UTF_8), charBuffer)).isTrue(); + assertThat(charBuffer.toString()).isEqualTo("{foo}"); + } + + @Test + void testStringLargerThanBuffer() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(2000); + final String longString = RandomStringUtils.randomAlphanumeric(2000); + assertThat(IOUtils.readUtf8Stream(toInputStream(longString, UTF_8), charBuffer)).isTrue(); + assertThat(charBuffer.toString()).isEqualTo(longString); + } + + @Test + void testReusedBuffer() throws IOException { + final QueueBasedObjectPool charBuffers = QueueBasedObjectPool.of(new ArrayBlockingQueue<>(1), true, + () -> CharBuffer.allocate(8), CharBuffer::clear); + + final CharBuffer charBuffer1 = charBuffers.createInstance(); + assertThat(IOUtils.readUtf8Stream(toInputStream("foo", UTF_8), charBuffer1)).isTrue(); + assertThat(charBuffer1.toString()).isEqualTo("foo"); + + charBuffers.recycle(charBuffer1); + + final CharBuffer charBuffer2 = charBuffers.createInstance(); + assertThat(IOUtils.readUtf8Stream(toInputStream("barbaz", UTF_8), charBuffer2)).isTrue(); + assertThat(charBuffer2.toString()).isEqualTo("barbaz"); + assertThat((Object) charBuffer1).isSameAs(charBuffer2); + + } + + @Test + void testOverflow() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(8); + assertThat(IOUtils.readUtf8Stream(toInputStream("foobarbaz", UTF_8), charBuffer)).isTrue(); + assertThat(charBuffer.toString()).isEqualTo("foobarba"); + } + + @Test + void readUtf16Stream() throws IOException { + final CharBuffer charBuffer = CharBuffer.allocate(16); + assertThat(IOUtils.readUtf8Stream(toInputStream("{foo}", UTF_16), charBuffer)).isFalse(); + assertThat(charBuffer.length()).isZero(); + } + + @Nonnull + private ByteArrayInputStream toInputStream(String s, Charset charset) { + return new ByteArrayInputStream(s.getBytes(charset)); + } +} diff --git a/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java b/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java index 957bc4786a..b1240c5f59 100644 --- a/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java +++ b/apm-agent-plugins/apm-api-plugin/src/main/java/co/elastic/apm/plugin/api/ApiScopeInstrumentation.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java deleted file mode 100644 index 6064c2113a..0000000000 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ESRestClientInstrumentationHelper.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 Elastic and contributors - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package co.elastic.apm.es.restclient.v5_6; - -import co.elastic.apm.bci.VisibleForAdvice; -import co.elastic.apm.report.serialize.DslJsonSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -@VisibleForAdvice -public class ESRestClientInstrumentationHelper { - private static final Logger logger = LoggerFactory.getLogger(ESRestClientInstrumentationHelper.class); - - private static ThreadLocal bodyReadBuffer = new ThreadLocal<>(); - private static final byte CURLY_BRACKET_UTF8 = '{'; - - @Nullable - @VisibleForAdvice - public static String readRequestBody(InputStream bodyIS, String endpoint) throws IOException { - String body = null; - try { - byte[] data = bodyReadBuffer.get(); - if (data == null) { - // The DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH is actually used to count chars and not bytes, but that's not - // that important, the most important is that we limit the payload size we read and decode - data = new byte[DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH]; - bodyReadBuffer.set(data); - } - int length = bodyIS.read(data, 0, data.length); - - // read only if UTF8-encoded (based on the first byte being UTF8 of curly bracket char) - if(data[0] == CURLY_BRACKET_UTF8) { - body = new String(data, 0, length, StandardCharsets.UTF_8); - } - } catch (IOException e) { - logger.info("Failed to read request body for " + endpoint); - } finally { - bodyIS.close(); - } - - return body; - } -} diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java index 36f4807194..e24ee5b24c 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/main/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentation.java @@ -23,6 +23,7 @@ import co.elastic.apm.bci.VisibleForAdvice; import co.elastic.apm.impl.transaction.AbstractSpan; import co.elastic.apm.impl.transaction.Span; +import co.elastic.apm.util.IOUtils; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; @@ -57,7 +58,7 @@ public class ElasticsearchRestClientInstrumentation extends ElasticApmInstrument @Advice.OnMethodEnter private static void onBeforeExecute(@Advice.Argument(0) String method, @Advice.Argument(1) String endpoint, - @Advice.Argument(3) HttpEntity entity, + @Advice.Argument(3) @Nullable HttpEntity entity, @Advice.Local("span") Span span) { if (tracer == null) { return; @@ -77,10 +78,7 @@ private static void onBeforeExecute(@Advice.Argument(0) String method, if (endpoint.endsWith(SEARCH_QUERY_PATH_SUFFIX)) { if (entity != null && entity.isRepeatable()) { try { - String body = ESRestClientInstrumentationHelper.readRequestBody(entity.getContent(), endpoint); - if (body != null && !body.isEmpty()) { - span.getContext().getDb().withStatement(body); - } + IOUtils.readUtf8Stream(entity.getContent(), span.getContext().getDb().withStatementBuffer()); } catch (IOException e) { // We can't log from here } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java index 7b4eb43d7a..bb6f3c7a6a 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-5_6/src/test/java/co/elastic/apm/es/restclient/v5_6/ElasticsearchRestClientInstrumentationIT.java @@ -180,7 +180,8 @@ private void validateHttpContextContent(Http http, int statusCode, String method private void validateDbContextContent(Span span, String statement) { Db db = span.getContext().getDb(); assertThat(db.getType()).isEqualTo(DB_CONTEXT_TYPE); - assertThat(db.getStatement()).isEqualTo(statement); + assertThat((Object) db.getStatementBuffer()).isNotNull(); + assertThat(db.getStatementBuffer().toString()).isEqualTo(statement); } @Test diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java deleted file mode 100644 index ccd4c321aa..0000000000 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ESRestClientInstrumentationHelper.java +++ /dev/null @@ -1,65 +0,0 @@ -/*- - * #%L - * Elastic APM Java agent - * %% - * Copyright (C) 2018 Elastic and contributors - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package co.elastic.apm.es.restclient.v6_4; - -import co.elastic.apm.bci.VisibleForAdvice; -import co.elastic.apm.report.serialize.DslJsonSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -@VisibleForAdvice -public class ESRestClientInstrumentationHelper { - private static final Logger logger = LoggerFactory.getLogger(ESRestClientInstrumentationHelper.class); - - private static ThreadLocal bodyReadBuffer = new ThreadLocal<>(); - private static final byte CURLY_BRACKET_UTF8 = '{'; - - @Nullable - @VisibleForAdvice - public static String readRequestBody(InputStream bodyIS, String endpoint) throws IOException { - String body = null; - try { - byte[] data = bodyReadBuffer.get(); - if (data == null) { - // The DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH is actually used to count chars and not bytes, but that's not - // that important, the most important is that we limit the payload size we read and decode - data = new byte[DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH]; - bodyReadBuffer.set(data); - } - int length = bodyIS.read(data, 0, data.length); - - // read only if UTF8-encoded (based on the first byte being UTF8 of curly bracket char) - if(data[0] == CURLY_BRACKET_UTF8) { - body = new String(data, 0, length, StandardCharsets.UTF_8); - } - } catch (IOException e) { - logger.info("Failed to read request body for " + endpoint); - } finally { - bodyIS.close(); - } - - return body; - } -} diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java index a290d971f9..ed972d4157 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/main/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentation.java @@ -23,6 +23,7 @@ import co.elastic.apm.bci.VisibleForAdvice; import co.elastic.apm.impl.transaction.AbstractSpan; import co.elastic.apm.impl.transaction.Span; +import co.elastic.apm.util.IOUtils; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; @@ -75,10 +76,7 @@ private static void onBeforeExecute(@Advice.Argument(0) Request request, HttpEntity entity = request.getEntity(); if (entity != null && entity.isRepeatable()) { try { - String body = ESRestClientInstrumentationHelper.readRequestBody(entity.getContent(), request.getEndpoint()); - if (body != null && !body.isEmpty()) { - span.getContext().getDb().withStatement(body); - } + IOUtils.readUtf8Stream(entity.getContent(), span.getContext().getDb().withStatementBuffer()); } catch (IOException e) { // We can't log from here } diff --git a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT.java b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT.java index 0b887ba5f2..cd3d942f7c 100644 --- a/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT.java +++ b/apm-agent-plugins/apm-es-restclient-plugin/apm-es-restclient-plugin-6_4/src/test/java/co/elastic/apm/es/restclient/v6_4/ElasticsearchRestClientInstrumentationIT.java @@ -167,7 +167,7 @@ private void validateSpanContent(Span span, String expectedName, int statusCode, assertThat(span.getContext().getDb().getType()).isEqualTo(DB_CONTEXT_TYPE); if (!expectedName.contains(SEARCH_QUERY_PATH_SUFFIX)) { - assertThat(span.getContext().getDb().getStatement()).isNull(); + assertThat((CharSequence) (span.getContext().getDb().getStatementBuffer())).isNull(); } } @@ -181,7 +181,8 @@ private void validateHttpContextContent(Http http, int statusCode, String method private void validateDbContextContent(Span span, String statement) { Db db = span.getContext().getDb(); assertThat(db.getType()).isEqualTo(DB_CONTEXT_TYPE); - assertThat(db.getStatement()).isEqualTo(statement); + assertThat((CharSequence) db.getStatementBuffer()).isNotNull(); + assertThat(db.getStatementBuffer().toString()).isEqualTo(statement); } @Test