Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;

/**
Expand All @@ -30,10 +31,10 @@ public class DBLatencyThresholdsAndWatcher extends ConfigChangeWatcher {
private AtomicReference<Map<String, Integer>> thresholds;
private AtomicReference<String> settingsString;

DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) {
public DBLatencyThresholdsAndWatcher(String config, TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "slowDBAccessThreshold");
thresholds = new AtomicReference(new HashMap<>());
settingsString = new AtomicReference<>("");
thresholds = new AtomicReference<>(new HashMap<>());
settingsString = new AtomicReference<>(Const.EMPTY_STRING);

activeSetting(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,21 @@ public TraceModuleProvider() {

moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);

SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
}
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));
segmentProducer = new SegmentParse.Producer(getManager(), listenerManager(), moduleConfig);
segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager(), moduleConfig);

segmentProducer = new SegmentParse.Producer(getManager(), listenerManager, moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
}

listenerManager = new SegmentParserListenerManager();
public SegmentParserListenerManager listenerManager() {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
}
listenerManager.add(new SegmentSpanListener.Factory(moduleConfig.getSampleRate()));

segmentProducerV2 = new SegmentParseV2.Producer(getManager(), listenerManager, moduleConfig);

this.registerServiceImplementation(ISegmentParserService.class, new SegmentParserServiceImpl(segmentProducerV2));
return listenerManager;
}

@Override public void start() throws ModuleStartException {
Expand All @@ -98,15 +94,11 @@ public TraceModuleProvider() {
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandler(segmentProducerV2, getManager()));
jettyHandlerRegister.addHandler(new TraceSegmentServletHandler(segmentProducer));

SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(),
false);
SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(getManager(), segmentProducer, moduleConfig.getBufferPath() + "v5", moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), false);
segmentProducer.setStandardizationWorker(standardizationWorker);

SegmentStandardizationWorker standardizationWorker2 = new SegmentStandardizationWorker(getManager(), segmentProducer,
moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(),
true);
segmentProducerV2.setStandardizationWorker(standardizationWorker2);
SegmentStandardizationWorker standardizationWorkerV2 = new SegmentStandardizationWorker(getManager(), segmentProducerV2, moduleConfig.getBufferPath(), moduleConfig.getBufferOffsetMaxFileSize(), moduleConfig.getBufferDataMaxFileSize(), moduleConfig.isBufferFileCleanWhenRestart(), true);
segmentProducerV2.setStandardizationWorker(standardizationWorkerV2);
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.buffer.BufferStream;
import org.apache.skywalking.oap.server.library.buffer.*;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParse;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
Expand All @@ -42,7 +41,7 @@ public class SegmentStandardizationWorker extends AbstractWorker<SegmentStandard
private CounterMetrics traceBufferFileIn;

public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
SegmentParse.Producer segmentParseCreator, String path, int offsetFileMaxSize,
DataStreamReader.CallBack<UpstreamSegment> segmentParse, String path, int offsetFileMaxSize,
int dataFileMaxSize, boolean cleanWhenRestart, boolean isV6) throws IOException {
super(moduleDefineHolder);

Expand All @@ -51,7 +50,7 @@ public SegmentStandardizationWorker(ModuleDefineHolder moduleDefineHolder,
builder.dataFileMaxSize(dataFileMaxSize);
builder.offsetFileMaxSize(offsetFileMaxSize);
builder.parser(UpstreamSegment.parser());
builder.callBack(segmentParseCreator);
builder.callBack(segmentParse);

BufferStream<UpstreamSegment> stream = builder.build();
stream.initialize();
Expand Down