Skip to content

Commit

Permalink
Speed up retrieval of data for flamegraphs (#93448)
Browse files Browse the repository at this point in the history
With this commit we parallelize the retrieval of stacktraces via the
`mget` API in the profiling plugin as experiments have shown that we can
better utilize the available resources and thus decrease latency.
Furthermore, we prepare the parallel retrieval of stackframes and
executables using the same approach. Our experiments have not shown a clear
indication of the optimal value for this setting, therefore we set the default
value to `1` to effectively keep the prior behavior. Finally, we also
introduce a setting that can be used to toggle the `realtime` (default kept as
`true`) to allow for further experimentation with that flag.
  • Loading branch information
danielmitterdorfer committed Feb 6, 2023
1 parent 725da76 commit ccd4bfc
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 61 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/93448.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 93448
summary: Speed up retrieval of data for flamegraphs
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,12 @@ public List<RestHandler> getRestHandlers(

@Override
public List<Setting<?>> getSettings() {
return List.of(PROFILING_ENABLED);
return List.of(
PROFILING_ENABLED,
TransportGetProfilingAction.PROFILING_MAX_STACKTRACE_QUERY_SLICES,
TransportGetProfilingAction.PROFILING_MAX_DETAIL_QUERY_SLICES,
TransportGetProfilingAction.PROFILING_QUERY_REALTIME
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
Expand All @@ -27,28 +29,65 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.ObjectPath;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

public class TransportGetProfilingAction extends HandledTransportAction<GetProfilingRequest, GetProfilingResponse> {
private static final Logger log = LogManager.getLogger(TransportGetProfilingAction.class);

public static final Setting<Integer> PROFILING_MAX_STACKTRACE_QUERY_SLICES = Setting.intSetting(
"xpack.profiling.query.stacktrace.max_slices",
16,
1,
Setting.Property.NodeScope
);

public static final Setting<Integer> PROFILING_MAX_DETAIL_QUERY_SLICES = Setting.intSetting(
"xpack.profiling.query.details.max_slices",
1,
1,
Setting.Property.NodeScope
);

public static final Setting<Boolean> PROFILING_QUERY_REALTIME = Setting.boolSetting(
"xpack.profiling.query.realtime",
true,
Setting.Property.NodeScope
);

private final NodeClient nodeClient;
private final TransportService transportService;
private final int desiredSlices;
private final int desiredDetailSlices;
private final boolean realtime;

@Inject
public TransportGetProfilingAction(TransportService transportService, ActionFilters actionFilters, NodeClient nodeClient) {
public TransportGetProfilingAction(
Settings settings,
TransportService transportService,
ActionFilters actionFilters,
NodeClient nodeClient
) {
super(GetProfilingAction.NAME, transportService, actionFilters, GetProfilingRequest::new);
this.nodeClient = nodeClient;
this.transportService = transportService;
this.desiredSlices = PROFILING_MAX_STACKTRACE_QUERY_SLICES.get(settings);
this.desiredDetailSlices = PROFILING_MAX_DETAIL_QUERY_SLICES.get(settings);
this.realtime = PROFILING_QUERY_REALTIME.get(settings);
}

@Override
protected void doExecute(Task submitTask, GetProfilingRequest request, ActionListener<GetProfilingResponse> submitListener) {
long start = System.nanoTime();
Client client = new ParentTaskAssigningClient(this.nodeClient, transportService.getLocalNode(), submitTask);
EventsIndex mediumDownsampled = EventsIndex.MEDIUM_DOWNSAMPLED;
client.prepareSearch(mediumDownsampled.getName())
Expand All @@ -60,6 +99,7 @@ protected void doExecute(Task submitTask, GetProfilingRequest request, ActionLis
public void onResponse(SearchResponse searchResponse) {
long sampleCount = searchResponse.getHits().getTotalHits().value;
EventsIndex resampledIndex = mediumDownsampled.getResampledIndex(request.getSampleSize(), sampleCount);
log.debug("getResampledIndex took [" + (System.nanoTime() - start) / 1_000_000.0d + " ms].");
searchEventGroupByStackTrace(client, request, resampledIndex, submitListener);
}

Expand All @@ -86,6 +126,7 @@ private void searchEventGroupByStackTrace(
EventsIndex eventsIndex,
ActionListener<GetProfilingResponse> submitListener
) {
long start = System.nanoTime();
GetProfilingResponseBuilder responseBuilder = new GetProfilingResponseBuilder();
client.prepareSearch(eventsIndex.getName())
.setTrackTotalHits(false)
Expand Down Expand Up @@ -119,6 +160,7 @@ public void onResponse(SearchResponse searchResponse) {
stackTraceEvents.put(bucket.getKeyAsString(), finalCount);
}
}
log.debug("searchEventGroupByStackTrace took [" + (System.nanoTime() - start) / 1_000_000.0d + " ms].");
if (stackTraceEvents.isEmpty() == false) {
responseBuilder.setStackTraceEvents(stackTraceEvents);
retrieveStackTraces(client, responseBuilder, submitListener);
Expand All @@ -139,39 +181,85 @@ private void retrieveStackTraces(
GetProfilingResponseBuilder responseBuilder,
ActionListener<GetProfilingResponse> submitListener
) {
client.prepareMultiGet()
.addIds("profiling-stacktraces", responseBuilder.getStackTraceEvents().keySet())
.setRealtime(true)
.execute(new ActionListener<>() {
List<String> eventIds = new ArrayList<>(responseBuilder.getStackTraceEvents().keySet());
List<List<String>> slicedEventIds = sliced(eventIds, desiredSlices);
StackTraceHandler handler = new StackTraceHandler(client, responseBuilder, submitListener, slicedEventIds.size());
for (List<String> slice : slicedEventIds) {
client.prepareMultiGet().setRealtime(realtime).addIds("profiling-stacktraces", slice).execute(new ActionListener<>() {
@Override
public void onResponse(MultiGetResponse multiGetItemResponses) {
Map<String, StackTrace> stackTracePerId = new HashMap<>();
// sort items lexicographically to access Lucene's term dictionary more efficiently when issuing an mget request.
// The term dictionary is lexicographically sorted and using the same order reduces the number of page faults
// needed to load it.
Set<String> stackFrameIds = new TreeSet<>();
Set<String> executableIds = new TreeSet<>();
int totalFrames = 0;
for (MultiGetItemResponse trace : multiGetItemResponses) {
if (trace.isFailed() == false && trace.getResponse().isExists()) {
String id = trace.getId();
StackTrace stacktrace = StackTrace.fromSource(trace.getResponse().getSource());
stackTracePerId.put(id, stacktrace);
totalFrames += stacktrace.frameIds.length;
stackFrameIds.addAll(Arrays.asList(stacktrace.frameIds));
executableIds.addAll(Arrays.asList(stacktrace.fileIds));
}
}
responseBuilder.setStackTraces(stackTracePerId);
responseBuilder.setTotalFrames(totalFrames);
retrieveStackTraceDetails(client, responseBuilder, stackFrameIds, executableIds, submitListener);
handler.onResponse(multiGetItemResponses);
}

@Override
public void onFailure(Exception e) {
submitListener.onFailure(e);
}
});
}
}

// package private for testing
static <T> List<List<T>> sliced(List<T> c, int slices) {
if (c.size() <= slices) {
return List.of(c);
}
List<List<T>> slicedList = new ArrayList<>();
int batchSize = c.size() / slices;
if (c.size() % slices != 0) {
batchSize += 1;
}
for (int slice = 0; slice < slices; slice++) {
List<T> ids = c.subList(slice * batchSize, Math.min((slice + 1) * batchSize, c.size()));
slicedList.add(ids);
}
return Collections.unmodifiableList(slicedList);
}

private class StackTraceHandler {
private final AtomicInteger remainingSlices;
private final Client client;
private final GetProfilingResponseBuilder responseBuilder;
private final ActionListener<GetProfilingResponse> submitListener;
private final Map<String, StackTrace> stackTracePerId = new ConcurrentHashMap<>();
// sort items lexicographically to access Lucene's term dictionary more efficiently when issuing an mget request.
// The term dictionary is lexicographically sorted and using the same order reduces the number of page faults
// needed to load it.
private final Set<String> stackFrameIds = new ConcurrentSkipListSet<>();
private final Set<String> executableIds = new ConcurrentSkipListSet<>();
private final AtomicInteger totalFrames = new AtomicInteger();
private final long start = System.nanoTime();

private StackTraceHandler(
Client client,
GetProfilingResponseBuilder responseBuilder,
ActionListener<GetProfilingResponse> submitListener,
int slices
) {
this.remainingSlices = new AtomicInteger(slices);
this.client = client;
this.responseBuilder = responseBuilder;
this.submitListener = submitListener;
}

public void onResponse(MultiGetResponse multiGetItemResponses) {
for (MultiGetItemResponse trace : multiGetItemResponses) {
if (trace.isFailed() == false && trace.getResponse().isExists()) {
String id = trace.getId();
StackTrace stacktrace = StackTrace.fromSource(trace.getResponse().getSource());
stackTracePerId.put(id, stacktrace);
totalFrames.addAndGet(stacktrace.frameIds.length);
stackFrameIds.addAll(Arrays.asList(stacktrace.frameIds));
executableIds.addAll(Arrays.asList(stacktrace.fileIds));
}
}
if (this.remainingSlices.decrementAndGet() == 0) {
responseBuilder.setStackTraces(stackTracePerId);
responseBuilder.setTotalFrames(totalFrames.get());
log.debug("retrieveStackTraces took [" + (System.nanoTime() - start) / 1_000_000.0d + " ms].");
retrieveStackTraceDetails(client, responseBuilder, stackFrameIds, executableIds, submitListener);
}
}
}

private void retrieveStackTraceDetails(
Expand All @@ -181,39 +269,49 @@ private void retrieveStackTraceDetails(
Set<String> executableIds,
ActionListener<GetProfilingResponse> submitListener
) {

DetailsHandler handler = new DetailsHandler(responseBuilder, submitListener);
List<List<String>> slicedStackFrameIds = sliced(new ArrayList<>(stackFrameIds), desiredDetailSlices);
List<List<String>> slicedExecutableIds = sliced(new ArrayList<>(executableIds), desiredDetailSlices);
DetailsHandler handler = new DetailsHandler(
responseBuilder,
submitListener,
slicedExecutableIds.size(),
slicedStackFrameIds.size()
);

if (stackFrameIds.isEmpty()) {
handler.onStackFramesResponse(new MultiGetResponse(new MultiGetItemResponse[0]));
} else {
client.prepareMultiGet().addIds("profiling-stackframes", stackFrameIds).setRealtime(true).execute(new ActionListener<>() {
@Override
public void onResponse(MultiGetResponse multiGetItemResponses) {
handler.onStackFramesResponse(multiGetItemResponses);
}
for (List<String> slice : slicedStackFrameIds) {
client.prepareMultiGet().addIds("profiling-stackframes", slice).setRealtime(realtime).execute(new ActionListener<>() {
@Override
public void onResponse(MultiGetResponse multiGetItemResponses) {
handler.onStackFramesResponse(multiGetItemResponses);
}

@Override
public void onFailure(Exception e) {
submitListener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
submitListener.onFailure(e);
}
});
}
}
// no data dependency - we can do this concurrently
if (executableIds.isEmpty()) {
handler.onExecutableDetailsResponse(new MultiGetResponse(new MultiGetItemResponse[0]));
} else {
client.prepareMultiGet().addIds("profiling-executables", executableIds).setRealtime(true).execute(new ActionListener<>() {
@Override
public void onResponse(MultiGetResponse multiGetItemResponses) {
handler.onExecutableDetailsResponse(multiGetItemResponses);
}
for (List<String> slice : slicedExecutableIds) {
client.prepareMultiGet().addIds("profiling-executables", slice).setRealtime(realtime).execute(new ActionListener<>() {
@Override
public void onResponse(MultiGetResponse multiGetItemResponses) {
handler.onExecutableDetailsResponse(multiGetItemResponses);
}

@Override
public void onFailure(Exception e) {
submitListener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
submitListener.onFailure(e);
}
});
}
}
}

Expand Down Expand Up @@ -270,42 +368,51 @@ public int adjustSampleCount(int originalCount) {
private static class DetailsHandler {
private final GetProfilingResponseBuilder builder;
private final ActionListener<GetProfilingResponse> submitListener;
private volatile Map<String, String> executables;
private volatile Map<String, StackFrame> stackFrames;

private DetailsHandler(GetProfilingResponseBuilder builder, ActionListener<GetProfilingResponse> submitListener) {
private final Map<String, String> executables;
private final Map<String, StackFrame> stackFrames;
private final AtomicInteger expectedExecutableSlices;
private final AtomicInteger expectedStackFrameSlices;
private final long start = System.nanoTime();

private DetailsHandler(
GetProfilingResponseBuilder builder,
ActionListener<GetProfilingResponse> submitListener,
int expectedExecutableSlices,
int expectedStackFrameSlices
) {
this.builder = builder;
this.submitListener = submitListener;
this.executables = new ConcurrentHashMap<>();
this.stackFrames = new ConcurrentHashMap<>();
this.expectedExecutableSlices = new AtomicInteger(expectedExecutableSlices);
this.expectedStackFrameSlices = new AtomicInteger(expectedStackFrameSlices);
}

public void onStackFramesResponse(MultiGetResponse multiGetItemResponses) {
Map<String, StackFrame> stackFrames = new HashMap<>();
for (MultiGetItemResponse frame : multiGetItemResponses) {
if (frame.isFailed() == false && frame.getResponse().isExists()) {
stackFrames.put(frame.getId(), StackFrame.fromSource(frame.getResponse().getSource()));
}
}
// publish to object state only when completely done, otherwise mayFinish() could run twice
this.stackFrames = stackFrames;
expectedStackFrameSlices.decrementAndGet();
mayFinish();
}

public void onExecutableDetailsResponse(MultiGetResponse multiGetItemResponses) {
Map<String, String> executables = new HashMap<>();
for (MultiGetItemResponse executable : multiGetItemResponses) {
if (executable.isFailed() == false && executable.getResponse().isExists()) {
executables.put(executable.getId(), ObjectPath.eval("Executable.file.name", executable.getResponse().getSource()));
}
}
// publish to object state only when completely done, otherwise mayFinish() could run twice
this.executables = executables;
expectedExecutableSlices.decrementAndGet();
mayFinish();
}

public void mayFinish() {
if (executables != null && stackFrames != null) {
if (expectedStackFrameSlices.get() == 0 && expectedExecutableSlices.get() == 0) {
builder.setExecutables(executables);
builder.setStackFrames(stackFrames);
log.debug("retrieveStackTraceDetails took [" + (System.nanoTime() - start) / 1_000_000.0d + " ms].");
submitListener.onResponse(builder.build());
}
}
Expand Down

0 comments on commit ccd4bfc

Please sign in to comment.