Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public static void main(String[] args) throws RunnerException {
@Setup
public void setUp() {
tracer = new ElasticApmTracerBuilder().build();
blockingQueueObjectPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer));
jctoolsQueueObjectPool = new QueueBasedObjectPool<>(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer));
jctoolsAtomicQueueObjectPool = new QueueBasedObjectPool<>(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer));
agronaQueueObjectPool = new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer));
blockingQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ArrayBlockingQueue<>(256), true, () -> new Transaction(tracer));
jctoolsQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcArrayQueue<>(256), true, () -> new Transaction(tracer));
jctoolsAtomicQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new MpmcAtomicArrayQueue<>(256), true, () -> new Transaction(tracer));
agronaQueueObjectPool = QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer));
mixedObjectPool = new MixedObjectPool<>(() -> new Transaction(tracer),
new ThreadLocalObjectPool<>(256, true, () -> new Transaction(tracer)),
new QueueBasedObjectPool<>(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)));
QueueBasedObjectPool.ofRecyclable(new ManyToManyConcurrentArrayQueue<>(256), true, () -> new Transaction(tracer)));
threadLocalObjectPool = new ThreadLocalObjectPool<>(64, true, () -> new Transaction(tracer));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import co.elastic.apm.impl.transaction.TraceContext;
import co.elastic.apm.impl.transaction.Transaction;
import co.elastic.apm.objectpool.ObjectPool;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import co.elastic.apm.objectpool.Allocator;
import co.elastic.apm.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.report.Reporter;
import co.elastic.apm.report.ReporterConfiguration;
Expand Down Expand Up @@ -85,23 +85,23 @@ protected Deque<Object> initialValue() {
this.spanListeners = spanListeners;
int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2;
coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class);
transactionPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<Transaction>newQueue(createBoundedMpmc(maxPooledElements)), false,
new RecyclableObjectFactory<Transaction>() {
transactionPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<Transaction>newQueue(createBoundedMpmc(maxPooledElements)), false,
new Allocator<Transaction>() {
@Override
public Transaction createInstance() {
return new Transaction(ElasticApmTracer.this);
}
});
spanPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<Span>newQueue(createBoundedMpmc(maxPooledElements)), false,
new RecyclableObjectFactory<Span>() {
spanPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<Span>newQueue(createBoundedMpmc(maxPooledElements)), false,
new Allocator<Span>() {
@Override
public Span createInstance() {
return new Span(ElasticApmTracer.this);
}
});
// we are assuming that we don't need as many errors as spans or transactions
errorPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<ErrorCapture>newQueue(createBoundedMpmc(maxPooledElements / 2)), false,
new RecyclableObjectFactory<ErrorCapture>() {
errorPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<ErrorCapture>newQueue(createBoundedMpmc(maxPooledElements / 2)), false,
new Allocator<ErrorCapture>() {
@Override
public ErrorCapture createInstance() {
return new ErrorCapture(ElasticApmTracer.this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,37 @@

package co.elastic.apm.impl.transaction;

import co.elastic.apm.objectpool.Allocator;
import co.elastic.apm.objectpool.ObjectPool;
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.objectpool.impl.Resetter;
import co.elastic.apm.report.serialize.DslJsonSerializer;
import org.jctools.queues.atomic.MpmcAtomicArrayQueue;

import javax.annotation.Nullable;
import java.nio.CharBuffer;


/**
* An object containing contextual data for database spans
*/
public class Db implements Recyclable {

private final ObjectPool<CharBuffer> charBufferPool = QueueBasedObjectPool.of(new MpmcAtomicArrayQueue<CharBuffer>(128), false,
new Allocator<CharBuffer>() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stay consistent with the other pool initialization, not through usage of new

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean I should add a static initializer for creating a QueueBasedObjectPool with an Allocator and Resetter? Or do you mean I should not use an anonymous inner class for Allocator?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created QueueBasedObjectPool.of static initializer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was referring to the ObjectPool- I saw you changed all others to use a static initializer, so I wondered why not this one as well.
Anyway, not very important

@Override
public CharBuffer createInstance() {
return CharBuffer.allocate(DslJsonSerializer.MAX_LONG_STRING_VALUE_LENGTH);
}
},
new Resetter<CharBuffer>() {
@Override
public void recycle(CharBuffer object) {
object.clear();
}
});

/**
* Database instance name
*/
Expand All @@ -40,6 +61,10 @@ public class Db implements Recyclable {
*/
@Nullable
private String statement;

@Nullable
private CharBuffer statementBuffer;

/**
* Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis"
*/
Expand Down Expand Up @@ -83,6 +108,37 @@ public Db withStatement(@Nullable String statement) {
return this;
}

/**
* Gets a pooled {@link CharBuffer} to record the DB statement and associates it with this instance.
* <p>
* Note: you may not hold a reference to the returned {@link CharBuffer} as it will be reused.
* </p>
* <p>
* Note: This method is not thread safe
* </p>
*
* @return a {@link CharBuffer} to record the DB statement
*/
public CharBuffer withStatementBuffer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation that user of this method is not allowed to hold a reference to the buffer

if (this.statementBuffer == null) {
this.statementBuffer = charBufferPool.createInstance();
}
return this.statementBuffer;
}

/**
* Returns the associated pooled {@link CharBuffer} to record the DB statement.
* <p>
* Note: returns {@code null} unless {@link #withStatementBuffer()} has previously been called
* </p>
*
* @return a {@link CharBuffer} to record the DB statement, or {@code null}
*/
@Nullable
public CharBuffer getStatementBuffer() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same

return statementBuffer;
}

/**
* Database type. For any SQL database, "sql". For others, the lower-case database category, e.g. "cassandra", "hbase", or "redis"
*/
Expand Down Expand Up @@ -121,13 +177,18 @@ public void resetState() {
statement = null;
type = null;
user = null;
if (statementBuffer != null) {
charBufferPool.recycle(statementBuffer);
}
statementBuffer = null;
}

public boolean hasContent() {
return instance != null ||
statement != null ||
type != null ||
user != null;
user != null ||
statementBuffer != null;
}

public void copyFrom(Db other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
package co.elastic.apm.objectpool;

public interface RecyclableObjectFactory<T extends Recyclable> {
public interface Allocator<T> {

T createInstance();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,21 @@

public class NoopObjectPool<T extends Recyclable> implements ObjectPool<T> {

private final RecyclableObjectFactory<T> recyclableObjectFactory;
private final Allocator<T> allocator;

public NoopObjectPool(RecyclableObjectFactory<T> recyclableObjectFactory) {
this.recyclableObjectFactory = recyclableObjectFactory;
public NoopObjectPool(Allocator<T> allocator) {
this.allocator = allocator;
}

@Nullable
@Override
public T tryCreateInstance() {
return recyclableObjectFactory.createInstance();
return allocator.createInstance();
}

@Override
public T createInstance() {
return recyclableObjectFactory.createInstance();
return allocator.createInstance();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import javax.annotation.Nullable;
import java.io.Closeable;

public interface ObjectPool<T extends Recyclable> extends Closeable {
public interface ObjectPool<T> extends Closeable {
@Nullable
T tryCreateInstance();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@
package co.elastic.apm.objectpool.impl;

import co.elastic.apm.objectpool.ObjectPool;
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import co.elastic.apm.objectpool.Allocator;

import java.util.concurrent.atomic.AtomicInteger;

public abstract class AbstractObjectPool<T extends Recyclable> implements ObjectPool<T> {
public abstract class AbstractObjectPool<T> implements ObjectPool<T> {

protected final RecyclableObjectFactory<T> recyclableObjectFactory;
protected final Allocator<T> allocator;
private final AtomicInteger garbageCreated = new AtomicInteger();

protected AbstractObjectPool(RecyclableObjectFactory<T> recyclableObjectFactory) {
this.recyclableObjectFactory = recyclableObjectFactory;
protected AbstractObjectPool(Allocator<T> allocator) {
this.allocator = allocator;
}

@Override
Expand All @@ -40,7 +39,7 @@ public T createInstance() {
if (recyclable == null) {
// queue is empty, falling back to creating a new instance
garbageCreated.incrementAndGet();
return recyclableObjectFactory.createInstance();
return allocator.createInstance();
} else {
return recyclable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,18 @@
package co.elastic.apm.objectpool.impl;

import co.elastic.apm.objectpool.ObjectPool;
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import co.elastic.apm.objectpool.Allocator;

import javax.annotation.Nullable;
import java.io.IOException;

public class MixedObjectPool<T extends Recyclable> extends AbstractObjectPool<T> {
public class MixedObjectPool<T> extends AbstractObjectPool<T> {

private final ObjectPool<T> primaryPool;
private final ObjectPool<T> secondaryPool;

public MixedObjectPool(final RecyclableObjectFactory<T> recyclableObjectFactory, ObjectPool<T> primaryPool, ObjectPool<T> secondaryPool) {
super(recyclableObjectFactory);
public MixedObjectPool(final Allocator<T> allocator, ObjectPool<T> primaryPool, ObjectPool<T> secondaryPool) {
super(allocator);
this.primaryPool = primaryPool;
this.secondaryPool = secondaryPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,42 @@
*/
package co.elastic.apm.objectpool.impl;

import co.elastic.apm.objectpool.Allocator;
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import com.lmax.disruptor.EventFactory;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;

public class QueueBasedObjectPool<T extends Recyclable> extends AbstractObjectPool<T> implements Collection<T> {
public class QueueBasedObjectPool<T> extends AbstractObjectPool<T> implements Collection<T> {

private final Queue<T> queue;
private final Resetter<T> resetter;

/**
* @param queue the underlying queue
* @param preAllocate when set to true, the recyclableObjectFactory will be used to create maxPooledElements objects
* @param preAllocate when set to true, the allocator will be used to create maxPooledElements objects
* which are then stored in the queue
* @param recyclableObjectFactory a factory method which is used to create new instances of the recyclable object. This factory is
* @param allocator a factory method which is used to create new instances of the recyclable object. This factory is
* used when there are no objects in the queue and to preallocate the queue.
*/
public QueueBasedObjectPool(Queue<T> queue, boolean preAllocate, RecyclableObjectFactory<T> recyclableObjectFactory) {
super(recyclableObjectFactory);
public static <T extends Recyclable> QueueBasedObjectPool<T> ofRecyclable(Queue<T> queue, boolean preAllocate, Allocator<T> allocator) {
return new QueueBasedObjectPool<>(queue, preAllocate, allocator, Resetter.ForRecyclable.<T>get());
}

public static <T> QueueBasedObjectPool<T> of(Queue<T> queue, boolean preAllocate, Allocator<T> allocator, Resetter<T> resetter) {
return new QueueBasedObjectPool<>(queue, preAllocate, allocator, resetter);
}

private QueueBasedObjectPool(Queue<T> queue, boolean preAllocate, Allocator<T> allocator, Resetter<T> resetter) {
super(allocator);
this.queue = queue;
this.resetter = resetter;
if (preAllocate) {
for (int i = 0; i < this.queue.size(); i++) {
this.queue.offer(recyclableObjectFactory.createInstance());
this.queue.offer(allocator.createInstance());
}
}
}
Expand All @@ -57,7 +67,7 @@ public T tryCreateInstance() {

@Override
public void recycle(T obj) {
obj.resetState();
resetter.recycle(obj);
queue.offer(obj);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018 Elastic and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package co.elastic.apm.objectpool.impl;

import co.elastic.apm.objectpool.Recyclable;

public interface Resetter<T> {

void recycle(T object);

class ForRecyclable<T extends Recyclable> implements Resetter<T> {
private static ForRecyclable INSTANCE = new ForRecyclable();

public static <T extends Recyclable> Resetter<T> get() {
return INSTANCE;
}

@Override
public void recycle(Recyclable object) {
object.resetState();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package co.elastic.apm.objectpool.impl;

import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import co.elastic.apm.objectpool.Allocator;

import javax.annotation.Nullable;

Expand All @@ -30,8 +30,8 @@ public class ThreadLocalObjectPool<T extends Recyclable> extends AbstractObjectP
private final int maxNumPooledObjectsPerThread;
private final boolean preAllocate;

public ThreadLocalObjectPool(final int maxNumPooledObjectsPerThread, final boolean preAllocate, final RecyclableObjectFactory<T> recyclableObjectFactory) {
super(recyclableObjectFactory);
public ThreadLocalObjectPool(final int maxNumPooledObjectsPerThread, final boolean preAllocate, final Allocator<T> allocator) {
super(allocator);
this.maxNumPooledObjectsPerThread = maxNumPooledObjectsPerThread;
this.preAllocate = preAllocate;
}
Expand Down Expand Up @@ -78,7 +78,7 @@ private FixedSizeStack<T> createStack(boolean preAllocate) {
FixedSizeStack<T> stack = new FixedSizeStack<>(maxNumPooledObjectsPerThread);
if (preAllocate) {
for (int i = 0; i < maxNumPooledObjectsPerThread; i++) {
stack.push(recyclableObjectFactory.createInstance());
stack.push(allocator.createInstance());
}
}
return stack;
Expand Down
Loading