Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import co.elastic.apm.objectpool.impl.QueueBasedObjectPool;
import co.elastic.apm.report.Reporter;
import co.elastic.apm.report.ReporterConfiguration;
import org.jctools.queues.atomic.AtomicQueueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stagemonitor.configuration.ConfigurationOption;
Expand All @@ -42,10 +41,9 @@

import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.jctools.queues.spec.ConcurrentQueueSpec.createBoundedMpmc;

/**
* This is the tracer implementation which provides access to lower level agent functionality.
* <p>
Expand Down Expand Up @@ -76,22 +74,22 @@ public class ElasticApmTracer {
this.spanListeners = spanListeners;
int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2;
coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class);
transactionPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<Transaction>newQueue(createBoundedMpmc(maxPooledElements)), false,
transactionPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<Transaction>(maxPooledElements), false,
new RecyclableObjectFactory<Transaction>() {
@Override
public Transaction createInstance() {
return new Transaction(ElasticApmTracer.this);
}
});
spanPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<Span>newQueue(createBoundedMpmc(maxPooledElements)), false,
spanPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<Span>(maxPooledElements), false,
new RecyclableObjectFactory<Span>() {
@Override
public Span createInstance() {
return new Span(ElasticApmTracer.this);
}
});
// we are assuming that we don't need as many errors as spans or transactions
errorPool = new QueueBasedObjectPool<>(AtomicQueueFactory.<ErrorCapture>newQueue(createBoundedMpmc(maxPooledElements / 2)), false,
errorPool = new QueueBasedObjectPool<>(new ArrayBlockingQueue<ErrorCapture>(maxPooledElements / 2), false,
new RecyclableObjectFactory<ErrorCapture>() {
@Override
public ErrorCapture createInstance() {
Expand Down Expand Up @@ -126,7 +124,7 @@ public Transaction startTransaction(@Nullable String traceContextHeader, Sampler
if (!coreConfiguration.isActive()) {
transaction = noopTransaction();
} else {
transaction = transactionPool.createInstance().start(traceContextHeader, nanoTime, sampler);
transaction = createTransaction().start(traceContextHeader, nanoTime, sampler);
}
if (logger.isDebugEnabled()) {
logger.debug("startTransaction {} {", transaction);
Expand All @@ -139,7 +137,20 @@ public Transaction startTransaction(@Nullable String traceContextHeader, Sampler
}

public Transaction noopTransaction() {
return transactionPool.createInstance().startNoop();
return createTransaction().startNoop();
}

private Transaction createTransaction() {
Transaction transaction;
do {
transaction = transactionPool.createInstance();
if (transaction.isStarted()) {
String message = "This transaction has already been started: %s";
logger.warn(String.format(message, transaction));
assert false : String.format(message, transaction);
}
} while (transaction.isStarted());
return transaction;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class AbstractSpan<T extends AbstractSpan> implements Recyclable
* (Required)
*/
protected double duration;
protected volatile long startTimestampNanos;
@Nullable
private volatile AbstractSpan<?> previouslyActive;
/**
Expand Down Expand Up @@ -130,6 +131,7 @@ public void resetState() {
timestamp = 0;
duration = 0;
type = null;
traceContext.resetState();
// don't reset previouslyActive, as deactivate can be called after end
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public Span start(Transaction transaction, @Nullable Span parentSpan, long nanoT
traceContext.setRecorded(false);
}
if (traceContext.isSampled()) {
start = (nanoTime - transaction.getDuration()) / MS_IN_NANOS;
duration = nanoTime;
start = (nanoTime - transaction.startTimestampNanos) / MS_IN_NANOS;
startTimestampNanos = nanoTime;
timestamp = System.currentTimeMillis();
}
return this;
Expand Down Expand Up @@ -131,7 +131,7 @@ public void end() {
@Override
public void end(long nanoTime) {
if (isSampled()) {
this.duration = (nanoTime - duration) / MS_IN_NANOS;
this.duration = (nanoTime - startTimestampNanos) / MS_IN_NANOS;
}
this.tracer.endSpan(this);
}
Expand All @@ -145,7 +145,6 @@ public void resetState() {
stacktrace = null;
start = 0;
transaction = null;
traceContext.resetState();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class Transaction extends AbstractSpan<Transaction> {
* Transactions that are 'sampled' will include all available information. Transactions that are not sampled will not have 'spans' or 'context'. Defaults to true.
*/
private boolean noop;
private volatile boolean started;

public Transaction(ElasticApmTracer tracer) {
super(tracer);
Expand All @@ -88,10 +89,11 @@ public Transaction start(@Nullable String traceParentHeader, long startTimestamp
traceContext.asRootSpan(sampler);
}

this.duration = startTimestampNanos;
this.startTimestampNanos = startTimestampNanos;
this.timestamp = System.currentTimeMillis();
this.id.setToRandomValue();
this.noop = false;
this.started = true;
return this;
}

Expand Down Expand Up @@ -204,7 +206,7 @@ public void end() {

@Override
public void end(long nanoTime) {
this.duration = (nanoTime - duration) / ElasticApmTracer.MS_IN_NANOS;
this.duration = (nanoTime - startTimestampNanos) / ElasticApmTracer.MS_IN_NANOS;
if (!isSampled()) {
context.resetState();
}
Expand Down Expand Up @@ -245,7 +247,7 @@ public void resetState() {
spanCount.resetState();
spanIdCounter.set(0);
noop = false;
traceContext.resetState();
started = false;
}

@Override
Expand All @@ -261,8 +263,12 @@ public boolean isNoop() {
return noop;
}

public boolean isStarted() {
return started;
}

@Override
public String toString() {
return String.format("'%s' %s", name, id);
return String.format("'%s' %s (%d)", name, id, System.identityHashCode(this));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import co.elastic.apm.objectpool.Recyclable;
import co.elastic.apm.objectpool.RecyclableObjectFactory;
import com.lmax.disruptor.EventFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.Collection;
Expand All @@ -31,6 +33,7 @@
public class QueueBasedObjectPool<T extends Recyclable> extends AbstractObjectPool<T> implements Collection<T> {

private final Queue<T> queue;
private static final Logger logger = LoggerFactory.getLogger(QueueBasedObjectPool.class);

/**
* @param queue the underlying queue
Expand All @@ -57,6 +60,9 @@ public T tryCreateInstance() {

@Override
public void recycle(T obj) {
if (logger.isDebugEnabled()) {
logger.debug("recycling {}", System.identityHashCode(obj));
}
obj.resetState();
queue.offer(obj);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void writeTo(BufferedSink sink) throws IOException {
payloadSerializer.serializePayload(os, payload);
} finally {
os.close();
payload.recycle();
}
}
})
Expand All @@ -109,6 +108,8 @@ public void writeTo(BufferedSink sink) throws IOException {
} catch (IOException e) {
logger.debug("Sending payload to APM server failed", e);
dropped += payload.getPayloadObjects().size();
} finally {
payload.recycle();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
Expand All @@ -47,6 +49,8 @@
*/
public class ApmServerReporter implements Reporter {

private static final Logger logger = LoggerFactory.getLogger(ApmServerReporter.class);

private static final EventTranslatorOneArg<ReportingEvent, Transaction> TRANSACTION_EVENT_TRANSLATOR = new EventTranslatorOneArg<ReportingEvent, Transaction>() {
@Override
public void translateTo(ReportingEvent event, long sequence, Transaction t) {
Expand Down Expand Up @@ -227,6 +231,7 @@ private <E extends Recyclable> boolean tryAddEventToRingBuffer(E event, EventTra
boolean queueFull = !disruptor.getRingBuffer().tryPublishEvent(eventTranslator, event);
if (queueFull) {
dropped.incrementAndGet();
logger.debug("Dropping event {} because ring buffer is full", event);
return false;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018 Elastic and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package co.elastic.apm.impl;

import co.elastic.apm.configuration.CoreConfiguration;
import co.elastic.apm.configuration.SpyConfiguration;
import co.elastic.apm.impl.payload.Payload;
import co.elastic.apm.impl.payload.ProcessFactory;
import co.elastic.apm.impl.payload.ServiceFactory;
import co.elastic.apm.impl.payload.SystemInfo;
import co.elastic.apm.impl.transaction.Span;
import co.elastic.apm.impl.transaction.Transaction;
import co.elastic.apm.report.ApmServerReporter;
import co.elastic.apm.report.IntakeV1ReportingEventHandler;
import co.elastic.apm.report.PayloadSender;
import co.elastic.apm.report.ReporterConfiguration;
import co.elastic.apm.report.processor.ProcessorEventHandler;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.stagemonitor.configuration.ConfigurationRegistry;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ElasticApmTracerConcurrencyTest {

private static final Logger logger = LoggerFactory.getLogger(ElasticApmTracerConcurrencyTest.class);

private ExecutorService executorService;
private ElasticApmTracer tracer;

@BeforeEach
void setUp() {
ConfigurationRegistry config = SpyConfiguration.createSpyConfig();
CoreConfiguration coreConfiguration = config.getConfig(CoreConfiguration.class);
ReporterConfiguration reporterConfiguration = config.getConfig(ReporterConfiguration.class);
tracer = new ElasticApmTracerBuilder().reporter(
new ApmServerReporter(true, reporterConfiguration,
coreConfiguration,
new IntakeV1ReportingEventHandler(
new ServiceFactory().createService(coreConfiguration, null, null),
ProcessFactory.ForCurrentVM.INSTANCE.getProcessInformation(),
SystemInfo.create(),
new PayloadSender() {
@Override
public void sendPayload(Payload payload) {
logger.info("Sending payload with {} elements", payload.getPayloadSize());
payload.recycle();
}

@Override
public long getReported() {
return 0;
}

@Override
public long getDropped() {
return 0;
}
}, reporterConfiguration, new ProcessorEventHandler(Collections.emptyList()))))
.configurationRegistry(config)
.build();
executorService = Executors.newFixedThreadPool(100);
}

// @Test
void testCreateTransactions() throws Exception {
for (int i = 0; i < 100000; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
Transaction transaction = tracer.startTransaction();
transaction.withName("test").withType("test");
try {
Span span = transaction.createSpan();
span.withName("SELECT").withType("db");
Thread.sleep(3);
span.end();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
transaction.end();
}
});
}
executorService.shutdown();
executorService.awaitTermination(60, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -66,10 +67,13 @@ void testThreadLocalStorage() {
assertThat(span.isChildOf(transaction)).isTrue();
span.end();
}
assertThat(span.getDuration()).isLessThan(TimeUnit.SECONDS.toMillis(10));
assertThat(span.getStart()).isLessThan(TimeUnit.SECONDS.toMillis(10));
assertThat(tracerImpl.currentSpan()).isNull();
transaction.end();
}
assertThat(tracerImpl.currentTransaction()).isNull();
assertThat(transaction.getDuration()).isLessThan(TimeUnit.SECONDS.toMillis(10));
}

@Test
Expand Down
Loading