Skip to content

Commit

Permalink
[pinpoint-apm#9595] Improve async state propagation in NonSampling state
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jan 12, 2023
1 parent ad883b3 commit adfd9d4
Show file tree
Hide file tree
Showing 26 changed files with 563 additions and 175 deletions.
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.profiler.context.id.LocalTraceRoot;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;

/**
Expand All @@ -31,4 +32,5 @@ public interface AsyncContextFactory {

AsyncContext newAsyncContext(TraceRoot traceRoot, AsyncId asyncId, boolean canSampled, AsyncState asyncState);

AsyncContext newDisableAsyncContext(LocalTraceRoot traceRoot);
}
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.profiler.context.id.LocalTraceRoot;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;


Expand All @@ -25,11 +26,9 @@
*/
public interface AsyncTraceContext {

// Reference<Trace> continueAsyncTraceObject(TraceRoot traceRoot, int asyncId, short asyncSequence);
Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId);

Reference<Trace> continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId, boolean canSampled);

Trace newAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId, boolean canSampled);
Trace continueDisableAsyncContextTraceObject(LocalTraceRoot traceRoot);

Reference<Trace> currentRawTraceObject();

Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.common.annotations.InterfaceAudience;
import com.navercorp.pinpoint.profiler.context.id.LocalTraceRoot;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;

/**
Expand All @@ -34,7 +35,9 @@ public interface BaseTraceFactory {
@InterfaceAudience.LimitedPrivate("vert.x")
Trace continueAsyncTraceObject(TraceId traceId);

Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId, boolean canSampled);
Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId);

Trace continueDisableAsyncContextTraceObject(LocalTraceRoot traceRoot);

Trace newTraceObject();

Expand Down
Expand Up @@ -23,8 +23,8 @@
import com.navercorp.pinpoint.common.trace.ServiceType;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Objects;

Expand All @@ -38,21 +38,23 @@ public class DefaultAsyncContext implements AsyncContext {

private final TraceRoot traceRoot;
private final AsyncId asyncId;
private final boolean canSampled;

private final AsyncTraceContext asyncTraceContext;
private final Binder<Trace> binder;

private final int asyncMethodApiId;


public DefaultAsyncContext(AsyncTraceContext asyncTraceContext, TraceRoot traceRoot, AsyncId asyncId, int asyncMethodApiId, boolean canSampled) {
public DefaultAsyncContext(AsyncTraceContext asyncTraceContext,
Binder<Trace> binder,
TraceRoot traceRoot,
AsyncId asyncId, int asyncMethodApiId) {
this.asyncTraceContext = Objects.requireNonNull(asyncTraceContext, "asyncTraceContext");
this.binder = Objects.requireNonNull(binder, "binder");
this.traceRoot = Objects.requireNonNull(traceRoot, "traceRoot");
this.asyncId = Objects.requireNonNull(asyncId, "asyncId");


this.asyncMethodApiId = asyncMethodApiId;
this.canSampled = canSampled;
}


Expand All @@ -63,7 +65,7 @@ public TraceRoot getTraceRoot() {
@Override
public Trace continueAsyncTraceObject() {

final Reference<Trace> reference = asyncTraceContext.currentRawTraceObject();
final Reference<Trace> reference = binder.get();
final Trace nestedTrace = reference.get();
if (nestedTrace != null) {
// return Nested Trace Object?
Expand All @@ -80,7 +82,7 @@ private Trace newAsyncContextTrace(Reference<Trace> reference) {
// final int asyncId = this.asyncId.getAsyncId();
// final short asyncSequence = this.asyncId.nextAsyncSequence();
final LocalAsyncId localAsyncId = this.asyncId.nextLocalAsyncId();
final Trace asyncTrace = asyncTraceContext.newAsyncContextTraceObject(traceRoot, localAsyncId, canSampled);
final Trace asyncTrace = asyncTraceContext.continueAsyncContextTraceObject(traceRoot, localAsyncId);


bind(reference, asyncTrace);
Expand Down Expand Up @@ -112,10 +114,7 @@ private Trace newAsyncContextTrace(Reference<Trace> reference) {
recorder.recordApiId(asyncMethodApiId);
}

if (asyncTrace.canSampled()) {
return asyncTrace;
}
return null;
return asyncTrace;
}

private void bind(Reference<Trace> reference, Trace asyncTrace) {
Expand All @@ -127,14 +126,27 @@ private void bind(Reference<Trace> reference, Trace asyncTrace) {

@Override
public Trace currentAsyncTraceObject() {
final Reference<Trace> reference = asyncTraceContext.currentTraceObject();
final Reference<Trace> reference = currentTraceObject();
return reference.get();
}


private Reference<Trace> currentTraceObject() {
final Reference<Trace> reference = binder.get();
final Trace trace = reference.get();
if (trace == null) {
return DefaultReference.emptyReference();
}
if (trace.canSampled()) {
return reference;
}
return DefaultReference.emptyReference();
}


@Override
public void close() {
asyncTraceContext.removeTraceObject();
binder.remove();
}

@Override
Expand Down
Expand Up @@ -19,7 +19,9 @@
import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.MethodDescriptor;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.profiler.context.id.AsyncIdGenerator;
import com.navercorp.pinpoint.profiler.context.id.LocalTraceRoot;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;
import com.navercorp.pinpoint.profiler.context.method.PredefinedMethodDescriptorRegistry;

Expand All @@ -32,12 +34,17 @@ public class DefaultAsyncContextFactory implements AsyncContextFactory {

private final AsyncTraceContext asyncTraceContext;
private final AsyncIdGenerator asyncIdGenerator;
private final Binder<Trace> binder;
private final PredefinedMethodDescriptorRegistry predefinedMethodDescriptorRegistry;
private final int asyncMethodApiId;

public DefaultAsyncContextFactory(AsyncTraceContext asyncTraceContext, AsyncIdGenerator asyncIdGenerator, PredefinedMethodDescriptorRegistry predefinedMethodDescriptorRegistry) {
public DefaultAsyncContextFactory(AsyncTraceContext asyncTraceContext,
Binder<Trace> binder,
AsyncIdGenerator asyncIdGenerator,
PredefinedMethodDescriptorRegistry predefinedMethodDescriptorRegistry) {
this.asyncTraceContext = Objects.requireNonNull(asyncTraceContext, "traceFactoryProvider");
this.asyncIdGenerator = Objects.requireNonNull(asyncIdGenerator, "asyncIdGenerator");
this.binder = Objects.requireNonNull(binder, "binder");

this.predefinedMethodDescriptorRegistry = Objects.requireNonNull(predefinedMethodDescriptorRegistry, "predefinedMethodDescriptorRegistry");

Expand All @@ -59,7 +66,11 @@ public AsyncContext newAsyncContext(TraceRoot traceRoot, AsyncId asyncId, boolea
Objects.requireNonNull(traceRoot, "traceRoot");
Objects.requireNonNull(asyncId, "asyncId");

return new DefaultAsyncContext(asyncTraceContext, traceRoot, asyncId, this.asyncMethodApiId, canSampled);
if (canSampled) {
return new DefaultAsyncContext(asyncTraceContext, binder, traceRoot, asyncId, this.asyncMethodApiId);
} else {
return newDisableAsyncContext(traceRoot);
}
}

@Override
Expand All @@ -68,8 +79,18 @@ public AsyncContext newAsyncContext(TraceRoot traceRoot, AsyncId asyncId, boolea
Objects.requireNonNull(asyncId, "asyncId");
Objects.requireNonNull(asyncState, "asyncState");

return new StatefulAsyncContext(asyncTraceContext, traceRoot, asyncId, asyncMethodApiId, asyncState, canSampled);
if (canSampled) {
return new StatefulAsyncContext(asyncTraceContext, binder, traceRoot, asyncId, asyncMethodApiId, asyncState);
} else {
// TODO
return new StatefulDisableAsyncContext(asyncTraceContext, binder, traceRoot, asyncState);
}

}

@Override
public AsyncContext newDisableAsyncContext(LocalTraceRoot traceRoot) {
return new DisableAsyncContext(asyncTraceContext, binder, traceRoot);
}

}
Expand Up @@ -18,21 +18,18 @@

import com.google.inject.Provider;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import java.util.Objects;
import com.navercorp.pinpoint.exception.PinpointException;
import com.navercorp.pinpoint.profiler.context.id.LocalTraceRoot;
import com.navercorp.pinpoint.profiler.context.id.TraceRoot;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.util.Objects;


/**
* @author Woonduk Kang(emeroad)
*/
public class DefaultAsyncTraceContext implements AsyncTraceContext {

private final Logger logger = LogManager.getLogger(this.getClass());

private static final Reference<Trace> EMPTY = DefaultReference.emptyReference();
private static final Reference<Trace> EMPTY_REF = DefaultReference.emptyReference();

private final Provider<BaseTraceFactory> baseTraceFactoryProvider;
private final Binder<Trace> binder;
Expand All @@ -43,19 +40,15 @@ public DefaultAsyncTraceContext(Provider<BaseTraceFactory> baseTraceFactoryProvi
}

@Override
public Reference<Trace> continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId, boolean canSampled) {
final Reference<Trace> reference = checkAndGet();

final Trace trace = newAsyncContextTraceObject(traceRoot, localAsyncId, canSampled);

bind(reference, trace);
return reference;
public Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId) {
final BaseTraceFactory baseTraceFactory = baseTraceFactoryProvider.get();
return baseTraceFactory.continueAsyncContextTraceObject(traceRoot, localAsyncId);
}

@Override
public Trace newAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId, boolean canSampled) {
public Trace continueDisableAsyncContextTraceObject(LocalTraceRoot traceRoot) {
final BaseTraceFactory baseTraceFactory = baseTraceFactoryProvider.get();
return baseTraceFactory.continueAsyncContextTraceObject(traceRoot, localAsyncId, canSampled);
return baseTraceFactory.continueDisableAsyncContextTraceObject(traceRoot);
}


Expand All @@ -69,12 +62,12 @@ public Reference<Trace> currentTraceObject() {
final Reference<Trace> reference = binder.get();
final Trace trace = reference.get();
if (trace == null) {
return EMPTY;
return EMPTY_REF;
}
if (trace.canSampled()) {
return reference;
}
return EMPTY;
return EMPTY_REF;
}


Expand All @@ -83,22 +76,4 @@ public void removeTraceObject() {
binder.remove();
}


private Reference<Trace> checkAndGet() {
final Reference<Trace> reference = this.binder.get();
final Trace old = reference.get();
if (old != null) {
final PinpointException exception = new PinpointException("already Trace Object exist.");
if (logger.isWarnEnabled()) {
logger.warn("beforeTrace:{}", old, exception);
}
throw exception;
}
return reference;
}

private void bind(Reference<Trace> reference, Trace trace) {
reference.set(trace);
}

}
Expand Up @@ -17,6 +17,7 @@
package com.navercorp.pinpoint.profiler.context;

import com.navercorp.pinpoint.bootstrap.context.AsyncState;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.SpanRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
Expand Down Expand Up @@ -143,21 +144,25 @@ Trace newTraceObject(TraceSampler.State state) {

// internal async trace.
@Override
public Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId, boolean sampling) {
if (sampling) {
final SpanChunkFactory spanChunkFactory = new AsyncSpanChunkFactory(traceRoot, localAsyncId);
final Storage storage = storageFactory.createStorage(spanChunkFactory);
public Trace continueAsyncContextTraceObject(TraceRoot traceRoot, LocalAsyncId localAsyncId) {
final SpanChunkFactory spanChunkFactory = new AsyncSpanChunkFactory(traceRoot, localAsyncId);
final Storage storage = storageFactory.createStorage(spanChunkFactory);

final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final SpanRecorder spanRecorder = recorderFactory.newTraceRootSpanRecorder(traceRoot);
final SpanRecorder spanRecorder = recorderFactory.newTraceRootSpanRecorder(traceRoot);

final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);

return new AsyncChildTrace(traceRoot, callStack, storage, spanRecorder, wrappedSpanEventRecorder, localAsyncId);
} else {
return new DisableAsyncChildTrace(traceRoot, localAsyncId);
}
return new AsyncChildTrace(traceRoot, callStack, storage, spanRecorder, wrappedSpanEventRecorder, localAsyncId);
}

@Override
public Trace continueDisableAsyncContextTraceObject(LocalTraceRoot traceRoot) {
final AsyncState asyncState = new DisableAsyncState();
SpanRecorder spanRecorder = recorderFactory.newDisableSpanRecorder(traceRoot);
SpanEventRecorder spanEventRecorder = recorderFactory.newDisableSpanEventRecorder(traceRoot, asyncState);
return new DisableAsyncChildTrace(traceRoot, spanRecorder, spanEventRecorder);
}


Expand All @@ -175,7 +180,7 @@ public Trace continueAsyncTraceObject(final TraceId traceId) {
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final SpanAsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final ListenableAsyncState.AsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final AsyncState asyncState = new ListenableAsyncState(traceRoot, asyncStateListener, handle, uriStatStorage);

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
Expand Down Expand Up @@ -210,7 +215,7 @@ Trace newAsyncTraceObject(TraceSampler.State state) {
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();

final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final SpanAsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final ListenableAsyncState.AsyncStateListener asyncStateListener = new SpanAsyncStateListener(span, storageFactory);
final AsyncState asyncState = new ListenableAsyncState(traceRoot, asyncStateListener, handle, uriStatStorage);

final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span);
Expand All @@ -232,9 +237,10 @@ public Trace disableSampling() {
private Trace newLocalTrace(long nextDisabledId) {
final LocalTraceRoot traceRoot = traceRootFactory.newDisableTraceRoot(nextDisabledId);
final SpanRecorder spanRecorder = recorderFactory.newDisableSpanRecorder(traceRoot);
final SpanEventRecorder spanEventRecorder = recorderFactory.newDisableSpanEventRecorder(traceRoot, new DisableAsyncState());
final long traceStartTime = traceRoot.getTraceStartTime();
final long threadId = Thread.currentThread().getId();
final ActiveTraceHandle activeTraceHandle = registerActiveTrace(nextDisabledId, traceStartTime, threadId);
return new DisableTrace(traceRoot, spanRecorder, activeTraceHandle, uriStatStorage);
return new DisableTrace(traceRoot, spanRecorder, spanEventRecorder, activeTraceHandle, uriStatStorage);
}
}

0 comments on commit adfd9d4

Please sign in to comment.