diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java
index 973365d022..f154ff47b6 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/ElasticApmTracer.java
@@ -33,7 +33,6 @@
import co.elastic.apm.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.report.Reporter;
import co.elastic.apm.report.ReporterConfiguration;
-import org.jctools.queues.atomic.AtomicQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stagemonitor.configuration.ConfigurationOption;
@@ -42,10 +41,9 @@
import javax.annotation.Nullable;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
-import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;
-
/**
* This is the tracer implementation which provides access to lower level agent functionality.
*
@@ -76,14 +74,14 @@ public class ElasticApmTracer {
this.spanListeners = spanListeners;
int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2;
coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class);
- transactionPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false,
+ transactionPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue(maxPooledElements), false,
new RecyclableObjectFactory() {
@Override
public Transaction createInstance() {
return new Transaction(ElasticApmTracer.this);
}
});
- spanPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements)), false,
+ spanPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue(maxPooledElements), false,
new RecyclableObjectFactory() {
@Override
public Span createInstance() {
@@ -91,7 +89,7 @@ public Span createInstance() {
}
});
// we are assuming that we don't need as many errors as spans or transactions
- errorPool = new QueueBasedObjectPool<>(AtomicQueueFactory.newQueue(createBoundedMpmc(maxPooledElements / 2)), false,
+ errorPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue(maxPooledElements / 2), false,
new RecyclableObjectFactory() {
@Override
public ErrorCapture createInstance() {
@@ -126,7 +124,7 @@ public Transaction startTransaction(@Nullable String traceContextHeader, Sampler
if (!coreConfiguration.isActive()) {
transaction = noopTransaction();
} else {
- transaction = transactionPool.createInstance().start(traceContextHeader, nanoTime, sampler);
+ transaction = createTransaction().start(traceContextHeader, nanoTime, sampler);
}
if (logger.isDebugEnabled()) {
logger.debug("startTransaction {} {", transaction);
@@ -139,7 +137,20 @@ public Transaction startTransaction(@Nullable String traceContextHeader, Sampler
}
public Transaction noopTransaction() {
- return transactionPool.createInstance().startNoop();
+ return createTransaction().startNoop();
+ }
+
+ private Transaction createTransaction() {
+ Transaction transaction;
+ do {
+ transaction = transactionPool.createInstance();
+ if (transaction.isStarted()) {
+ String message = "This transaction has already been started: %s";
+ logger.warn(String.format(message, transaction));
+ assert false : String.format(message, transaction);
+ }
+ } while (transaction.isStarted());
+ return transaction;
}
@Nullable
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/AbstractSpan.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/AbstractSpan.java
index 3791a15073..e231d327e8 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/AbstractSpan.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/AbstractSpan.java
@@ -47,6 +47,7 @@ public abstract class AbstractSpan implements Recyclable
* (Required)
*/
protected double duration;
+ protected volatile long startTimestampNanos;
@Nullable
private volatile AbstractSpan> previouslyActive;
/**
@@ -130,6 +131,7 @@ public void resetState() {
timestamp = 0;
duration = 0;
type = null;
+ traceContext.resetState();
// don't reset previouslyActive, as deactivate can be called after end
}
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Span.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Span.java
index fee8d5ff1a..9d4c8faa2c 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Span.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Span.java
@@ -71,8 +71,8 @@ public Span start(Transaction transaction, @Nullable Span parentSpan, long nanoT
traceContext.setRecorded(false);
}
if (traceContext.isSampled()) {
- start = (nanoTime - transaction.getDuration()) / MS_IN_NANOS;
- duration = nanoTime;
+ start = (nanoTime - transaction.startTimestampNanos) / MS_IN_NANOS;
+ startTimestampNanos = nanoTime;
timestamp = System.currentTimeMillis();
}
return this;
@@ -131,7 +131,7 @@ public void end() {
@Override
public void end(long nanoTime) {
if (isSampled()) {
- this.duration = (nanoTime - duration) / MS_IN_NANOS;
+ this.duration = (nanoTime - startTimestampNanos) / MS_IN_NANOS;
}
this.tracer.endSpan(this);
}
@@ -145,7 +145,6 @@ public void resetState() {
stacktrace = null;
start = 0;
transaction = null;
- traceContext.resetState();
}
@Nullable
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Transaction.java b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Transaction.java
index 58fa408bcb..863d9f8c50 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Transaction.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/impl/transaction/Transaction.java
@@ -76,6 +76,7 @@ public class Transaction extends AbstractSpan {
* Transactions that are 'sampled' will include all available information. Transactions that are not sampled will not have 'spans' or 'context'. Defaults to true.
*/
private boolean noop;
+ private volatile boolean started;
public Transaction(ElasticApmTracer tracer) {
super(tracer);
@@ -88,10 +89,11 @@ public Transaction start(@Nullable String traceParentHeader, long startTimestamp
traceContext.asRootSpan(sampler);
}
- this.duration = startTimestampNanos;
+ this.startTimestampNanos = startTimestampNanos;
this.timestamp = System.currentTimeMillis();
this.id.setToRandomValue();
this.noop = false;
+ this.started = true;
return this;
}
@@ -204,7 +206,7 @@ public void end() {
@Override
public void end(long nanoTime) {
- this.duration = (nanoTime - duration) / ElasticApmTracer.MS_IN_NANOS;
+ this.duration = (nanoTime - startTimestampNanos) / ElasticApmTracer.MS_IN_NANOS;
if (!isSampled()) {
context.resetState();
}
@@ -245,7 +247,7 @@ public void resetState() {
spanCount.resetState();
spanIdCounter.set(0);
noop = false;
- traceContext.resetState();
+ started = false;
}
@Override
@@ -261,8 +263,12 @@ public boolean isNoop() {
return noop;
}
+ public boolean isStarted() {
+ return started;
+ }
+
@Override
public String toString() {
- return String.format("'%s' %s", name, id);
+ return String.format("'%s' %s (%d)", name, id, System.identityHashCode(this));
}
}
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java
index 873a710e92..3ac2a9661c 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/objectpool/impl/QueueBasedObjectPool.java
@@ -22,6 +22,8 @@
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import com.lmax.disruptor.EventFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Collection;
@@ -31,6 +33,7 @@
public class QueueBasedObjectPool extends AbstractObjectPool implements Collection {
private final Queue queue;
+ private static final Logger logger = LoggerFactory.getLogger(QueueBasedObjectPool.class);
/**
* @param queue the underlying queue
@@ -57,6 +60,9 @@ public T tryCreateInstance() {
@Override
public void recycle(T obj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("recycling {}", System.identityHashCode(obj));
+ }
obj.resetState();
queue.offer(obj);
}
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerHttpPayloadSender.java b/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerHttpPayloadSender.java
index 981ce0cd54..c795a641f1 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerHttpPayloadSender.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerHttpPayloadSender.java
@@ -87,7 +87,6 @@ public void writeTo(BufferedSink sink) throws IOException {
payloadSerializer.serializePayload(os, payload);
} finally {
os.close();
- payload.recycle();
}
}
})
@@ -109,6 +108,8 @@ public void writeTo(BufferedSink sink) throws IOException {
} catch (IOException e) {
logger.debug("Sending payload to APM server failed", e);
dropped += payload.getPayloadObjects().size();
+ } finally {
+ payload.recycle();
}
}
diff --git a/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java b/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java
index 8c3440ba9f..841b2a78af 100644
--- a/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java
+++ b/apm-agent-core/src/main/java/co/elastic/apm/report/ApmServerReporter.java
@@ -32,6 +32,8 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
@@ -47,6 +49,8 @@
*/
public class ApmServerReporter implements Reporter {
+ private static final Logger logger = LoggerFactory.getLogger(ApmServerReporter.class);
+
private static final EventTranslatorOneArg TRANSACTION_EVENT_TRANSLATOR = new EventTranslatorOneArg() {
@Override
public void translateTo(ReportingEvent event, long sequence, Transaction t) {
@@ -227,6 +231,7 @@ private boolean tryAddEventToRingBuffer(E event, EventTra
boolean queueFull = !disruptor.getRingBuffer().tryPublishEvent(eventTranslator, event);
if (queueFull) {
dropped.incrementAndGet();
+ logger.debug("Dropping event {} because ring buffer is full", event);
return false;
}
} else {
diff --git a/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerConcurrencyTest.java b/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerConcurrencyTest.java
new file mode 100644
index 0000000000..7c5bf27b65
--- /dev/null
+++ b/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerConcurrencyTest.java
@@ -0,0 +1,110 @@
+/*-
+ * #%L
+ * Elastic APM Java agent
+ * %%
+ * Copyright (C) 2018 Elastic and contributors
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package co.elastic.apm.impl;
+
+import co.elastic.apm.configuration.CoreConfiguration;
+import co.elastic.apm.configuration.SpyConfiguration;
+import co.elastic.apm.impl.payload.Payload;
+import co.elastic.apm.impl.payload.ProcessFactory;
+import co.elastic.apm.impl.payload.ServiceFactory;
+import co.elastic.apm.impl.payload.SystemInfo;
+import co.elastic.apm.impl.transaction.Span;
+import co.elastic.apm.impl.transaction.Transaction;
+import co.elastic.apm.report.ApmServerReporter;
+import co.elastic.apm.report.IntakeV1ReportingEventHandler;
+import co.elastic.apm.report.PayloadSender;
+import co.elastic.apm.report.ReporterConfiguration;
+import co.elastic.apm.report.processor.ProcessorEventHandler;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.stagemonitor.configuration.ConfigurationRegistry;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+class ElasticApmTracerConcurrencyTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticApmTracerConcurrencyTest.class);
+
+ private ExecutorService executorService;
+ private ElasticApmTracer tracer;
+
+ @BeforeEach
+ void setUp() {
+ ConfigurationRegistry config = SpyConfiguration.createSpyConfig();
+ CoreConfiguration coreConfiguration = config.getConfig(CoreConfiguration.class);
+ ReporterConfiguration reporterConfiguration = config.getConfig(ReporterConfiguration.class);
+ tracer = new ElasticApmTracerBuilder().reporter(
+ new ApmServerReporter(true, reporterConfiguration,
+ coreConfiguration,
+ new IntakeV1ReportingEventHandler(
+ new ServiceFactory().createService(coreConfiguration, null, null),
+ ProcessFactory.ForCurrentVM.INSTANCE.getProcessInformation(),
+ SystemInfo.create(),
+ new PayloadSender() {
+ @Override
+ public void sendPayload(Payload payload) {
+ logger.info("Sending payload with {} elements", payload.getPayloadSize());
+ payload.recycle();
+ }
+
+ @Override
+ public long getReported() {
+ return 0;
+ }
+
+ @Override
+ public long getDropped() {
+ return 0;
+ }
+ }, reporterConfiguration, new ProcessorEventHandler(Collections.emptyList()))))
+ .configurationRegistry(config)
+ .build();
+ executorService = Executors.newFixedThreadPool(100);
+ }
+
+// @Test
+ void testCreateTransactions() throws Exception {
+ for (int i = 0; i < 100000; i++) {
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ Transaction transaction = tracer.startTransaction();
+ transaction.withName("test").withType("test");
+ try {
+ Span span = transaction.createSpan();
+ span.withName("SELECT").withType("db");
+ Thread.sleep(3);
+ span.end();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ transaction.end();
+ }
+ });
+ }
+ executorService.shutdown();
+ executorService.awaitTermination(60, TimeUnit.SECONDS);
+ }
+}
diff --git a/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerTest.java b/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerTest.java
index b2cd0ee497..df5b4d9b40 100644
--- a/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerTest.java
+++ b/apm-agent-core/src/test/java/co/elastic/apm/impl/ElasticApmTracerTest.java
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.assertj.core.api.Assertions.assertThat;
@@ -66,10 +67,13 @@ void testThreadLocalStorage() {
assertThat(span.isChildOf(transaction)).isTrue();
span.end();
}
+ assertThat(span.getDuration()).isLessThan(TimeUnit.SECONDS.toMillis(10));
+ assertThat(span.getStart()).isLessThan(TimeUnit.SECONDS.toMillis(10));
assertThat(tracerImpl.currentSpan()).isNull();
transaction.end();
}
assertThat(tracerImpl.currentTransaction()).isNull();
+ assertThat(transaction.getDuration()).isLessThan(TimeUnit.SECONDS.toMillis(10));
}
@Test
diff --git a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolErrorSimulator.java b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolErrorSimulator.java
new file mode 100644
index 0000000000..7f8317c73c
--- /dev/null
+++ b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolErrorSimulator.java
@@ -0,0 +1,65 @@
+package co.elastic.apm.objectpool;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.stream.Stream;
+
+public class ObjectPoolErrorSimulator {
+
+ public static final String START_TRANSACTION = "startTransaction";
+ public static final String RECYCLING = "recycling";
+ public static Queue objectPool = new ArrayBlockingQueue<>(512);
+ public static Set allTransactions = new HashSet<>();
+
+ public static void main(String[] args) throws Exception {
+ try (Stream lineStream = Files.lines(Paths.get(args[0]))) {
+ lineStream
+ .forEach(log -> {
+ if (log.contains(START_TRANSACTION)) {
+ simulateStartTransaction(log, Integer.parseInt(log, log.lastIndexOf('(') + 1, log.lastIndexOf(')'), 10));
+ } else if (log.contains(RECYCLING)) {
+ simulateRecycling(log, Integer.parseInt(log, log.indexOf(RECYCLING) + RECYCLING.length() + 1, log.length(), 10));
+ } else if (log.contains("This transaction has already been started")) {
+ System.out.println(log);
+ }
+ });
+ }
+ System.out.println("objectPool.size(): " + objectPool.size());
+ System.out.println("allTransactions.size(): " + allTransactions.size());
+ System.out.println("objectPool = " + objectPool);
+ System.out.println("allTransactions = " + allTransactions);
+ }
+
+ private static void simulateStartTransaction(String log, int identityHash) {
+ allTransactions.add(identityHash);
+ final Integer expectedIdentityHash = objectPool.peek();
+ if (expectedIdentityHash != null) {
+ if (!expectedIdentityHash.equals(identityHash)) {
+ System.out.println(String.format("Expected %d but got %d", expectedIdentityHash, identityHash));
+ System.out.println(log);
+ }
+ if (!objectPool.remove(identityHash)) {
+ System.out.println(String.format("Object pool does not contain %d", identityHash));
+ System.out.println(log);
+ }
+ }
+ }
+
+ private static void simulateRecycling(String log, int identityHash) {
+ if (isTransaction(identityHash)) {
+ if (objectPool.contains(identityHash)) {
+ System.out.println(String.format("Object pool already contains %d", identityHash));
+ System.out.println(log);
+ }
+ objectPool.offer(identityHash);
+ }
+ }
+
+ private static boolean isTransaction(int identityHash) {
+ return allTransactions.contains(identityHash);
+ }
+}
diff --git a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java
index 3cc8a16f64..b941201b86 100644
--- a/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java
+++ b/apm-agent-core/src/test/java/co/elastic/apm/objectpool/ObjectPoolTest.java
@@ -24,6 +24,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.ArrayBlockingQueue;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.SoftAssertions.assertSoftly;
@@ -100,6 +102,20 @@ public void testRecycleInDifferentThread() throws Exception {
assertThat(objectPool.getObjectsInPool()).isEqualTo(1);
}
+ @Test
+ void testNoPreAllocation() {
+ objectPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<>(MAX_SIZE), false, TestRecyclable::new);
+ assertThat(objectPool.getSize()).isZero();
+ final TestRecyclable instance1 = objectPool.createInstance();
+ final TestRecyclable instance2 = objectPool.createInstance();
+ assertThat(objectPool.getSize()).isZero();
+ objectPool.recycle(instance1);
+ assertThat(objectPool.getSize()).isOne();
+ objectPool.recycle(instance2);
+ assertThat(objectPool.getSize()).isEqualTo(2);
+ assertThat(objectPool.createInstance()).isSameAs(instance1);
+ }
+
private static class TestRecyclable implements Recyclable {
private int state;