Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apm-agent-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>3.3.0</version>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import org.jctools.queues.atomic.AtomicQueueFactory;

import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;

public class ObjectPoolFactory {

protected <T extends Recyclable> ObjectPool<T> createRecyclableObjectPool(int maxCapacity, Allocator<T> allocator) {
return QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<T>newQueue(createBoundedMpmc(maxCapacity)), false, allocator);
public <T extends Recyclable> ObjectPool<T> createRecyclableObjectPool(int maxCapacity, Allocator<T> allocator) {
return QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<T>((maxCapacity)), false, allocator);
}

public ObjectPool<Transaction> createTransactionPool(int maxCapacity, final ElasticApmTracer tracer) {
Expand All @@ -54,19 +52,19 @@ public Span createInstance() {

public ObjectPool<ErrorCapture> createErrorPool(int maxCapacity, final ElasticApmTracer tracer) {
return createRecyclableObjectPool(maxCapacity, new Allocator<ErrorCapture>() {
@Override
public ErrorCapture createInstance() {
return new ErrorCapture(tracer);
}
});
@Override
public ErrorCapture createInstance() {
return new ErrorCapture(tracer);
}
});
}

public ObjectPool<TraceContext> createSpanLinkPool(int maxCapacity, final ElasticApmTracer tracer) {
return createRecyclableObjectPool(maxCapacity, new Allocator<TraceContext>() {
@Override
public TraceContext createInstance() {
return TraceContext.with64BitId(tracer);
}
});
@Override
public TraceContext createInstance() {
return TraceContext.with64BitId(tracer);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class TestObjectPoolFactory extends ObjectPoolFactory {
private BookkeeperObjectPool<TraceContext> spanLinksPool;

@Override
protected <T extends Recyclable> ObjectPool<T> createRecyclableObjectPool(int maxCapacity, Allocator<T> allocator) {
public <T extends Recyclable> ObjectPool<T> createRecyclableObjectPool(int maxCapacity, Allocator<T> allocator) {
ObjectPool<T> pool = super.createRecyclableObjectPool(maxCapacity, allocator);
BookkeeperObjectPool<T> wrappedPool = new BookkeeperObjectPool<>(pool);
createdPools.add(wrappedPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
*/
package co.elastic.apm.agent.httpclient.helper;

import co.elastic.apm.agent.impl.GlobalTracer;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.objectpool.ObjectPoolFactory;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;
import org.jctools.queues.atomic.AtomicQueueFactory;

import javax.annotation.Nullable;

import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

public class ApacheHttpAsyncClientHelper {

private static final int MAX_POOLED_ELEMENTS = 256;
Expand All @@ -40,13 +38,9 @@ public class ApacheHttpAsyncClientHelper {
private final ObjectPool<FutureCallbackWrapper<?>> futureCallbackWrapperObjectPool;

public ApacheHttpAsyncClientHelper() {
requestProducerWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<HttpAsyncRequestProducerWrapper>newQueue(createBoundedMpmc(MAX_POOLED_ELEMENTS)),
false, new RequestProducerWrapperAllocator());

futureCallbackWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<FutureCallbackWrapper<?>>newQueue(createBoundedMpmc(MAX_POOLED_ELEMENTS)),
false, new FutureCallbackWrapperAllocator());
ObjectPoolFactory factory = GlobalTracer.requireTracerImpl().getObjectPoolFactory();
requestProducerWrapperObjectPool = factory.createRecyclableObjectPool(MAX_POOLED_ELEMENTS, new RequestProducerWrapperAllocator());
futureCallbackWrapperObjectPool = factory.createRecyclableObjectPool(MAX_POOLED_ELEMENTS, new FutureCallbackWrapperAllocator());
}

private class RequestProducerWrapperAllocator implements Allocator<HttpAsyncRequestProducerWrapper> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,21 @@
import co.elastic.apm.agent.matcher.WildcardMatcher;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.util.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.jctools.queues.atomic.AtomicQueueFactory;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;

import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

public class ElasticsearchRestClientInstrumentationHelper {

private static final Logger logger = LoggerFactory.getLogger(ElasticsearchRestClientInstrumentationHelper.class);
Expand All @@ -70,10 +66,7 @@ public static ElasticsearchRestClientInstrumentationHelper get() {

private ElasticsearchRestClientInstrumentationHelper(ElasticApmTracer tracer) {
this.tracer = tracer;
responseListenerObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<ResponseListenerWrapper>newQueue(createBoundedMpmc(MAX_POOLED_ELEMENTS)),
false,
new ResponseListenerAllocator());
this.responseListenerObjectPool = tracer.getObjectPoolFactory().createRecyclableObjectPool(MAX_POOLED_ELEMENTS, new ResponseListenerAllocator());
}

private class ResponseListenerAllocator implements Allocator<ResponseListenerWrapper> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,17 @@
import co.elastic.apm.agent.matcher.WildcardMatcher;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.jctools.queues.atomic.AtomicQueueFactory;

import javax.annotation.Nullable;
import java.util.List;

import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

public class KafkaInstrumentationHelper {

public static final Logger logger = LoggerFactory.getLogger(KafkaInstrumentationHelper.class);
Expand All @@ -55,10 +51,8 @@ public static KafkaInstrumentationHelper get() {

public KafkaInstrumentationHelper(ElasticApmTracer tracer) {
this.tracer = tracer;
messagingConfiguration = tracer.getConfig(MessagingConfiguration.class);
this.callbackWrapperObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<CallbackWrapper>newQueue(createBoundedMpmc(256)),
false,
this.messagingConfiguration = tracer.getConfig(MessagingConfiguration.class);
this.callbackWrapperObjectPool = tracer.getObjectPoolFactory().createRecyclableObjectPool(256,
new CallbackWrapperAllocator()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.servlet.ServletTransactionHelper;
import org.jctools.queues.atomic.AtomicQueueFactory;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.ServletRequest;

import static co.elastic.apm.agent.servlet.ServletTransactionHelper.ASYNC_ATTRIBUTE;
import static co.elastic.apm.agent.servlet.ServletTransactionHelper.TRANSACTION_ATTRIBUTE;
import static co.elastic.apm.agent.servlet.helper.AsyncConstants.ASYNC_LISTENER_ADDED;
import static co.elastic.apm.agent.servlet.helper.AsyncConstants.MAX_POOLED_ELEMENTS;
import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

public class JakartaAsyncContextAdviceHelper implements AsyncContextAdviceHelper<AsyncContext> {

Expand All @@ -44,11 +40,8 @@ public class JakartaAsyncContextAdviceHelper implements AsyncContextAdviceHelper

public JakartaAsyncContextAdviceHelper(ElasticApmTracer tracer) {
this.tracer = tracer;
servletTransactionHelper = new ServletTransactionHelper(tracer);

asyncListenerObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<JakartaApmAsyncListener>newQueue(createBoundedMpmc(MAX_POOLED_ELEMENTS)),
false,
this.servletTransactionHelper = new ServletTransactionHelper(tracer);
this.asyncListenerObjectPool = tracer.getObjectPoolFactory().createRecyclableObjectPool(MAX_POOLED_ELEMENTS,
new JakartaAsyncContextAdviceHelper.JakartaApmAsyncListenerAllocator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.objectpool.Allocator;
import co.elastic.apm.agent.objectpool.ObjectPool;
import co.elastic.apm.agent.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.agent.servlet.ServletTransactionHelper;
import org.jctools.queues.atomic.AtomicQueueFactory;

import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
Expand All @@ -34,7 +32,6 @@
import static co.elastic.apm.agent.servlet.ServletTransactionHelper.TRANSACTION_ATTRIBUTE;
import static co.elastic.apm.agent.servlet.helper.AsyncConstants.ASYNC_LISTENER_ADDED;
import static co.elastic.apm.agent.servlet.helper.AsyncConstants.MAX_POOLED_ELEMENTS;
import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

public class JavaxAsyncContextAdviceHelper implements AsyncContextAdviceHelper<AsyncContext> {

Expand All @@ -44,11 +41,8 @@ public class JavaxAsyncContextAdviceHelper implements AsyncContextAdviceHelper<A

public JavaxAsyncContextAdviceHelper(ElasticApmTracer tracer) {
this.tracer = tracer;
servletTransactionHelper = new ServletTransactionHelper(tracer);

asyncListenerObjectPool = QueueBasedObjectPool.ofRecyclable(
AtomicQueueFactory.<JavaxApmAsyncListener>newQueue(createBoundedMpmc(MAX_POOLED_ELEMENTS)),
false,
this.servletTransactionHelper = new ServletTransactionHelper(tracer);
this.asyncListenerObjectPool = tracer.getObjectPoolFactory().createRecyclableObjectPool(MAX_POOLED_ELEMENTS,
new JavaxAsyncContextAdviceHelper.ApmAsyncListenerAllocator());
}

Expand Down