Skip to content
Permalink
Browse files
[FLINK-26074] Improve FlameGraphs scalability for high parallelism jobs
  • Loading branch information
afedulov authored and MartijnVisser committed May 25, 2022
1 parent b51084d commit b39d8425750ffe42897b57b001141519173ea27f
Showing 17 changed files with 546 additions and 295 deletions.
@@ -21,30 +21,30 @@
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.List;
import java.util.Collection;

/** Response to the request to collect thread details samples. */
public class TaskThreadInfoResponse implements Serializable {

private static final long serialVersionUID = -4786454630050578031L;

private final List<ThreadInfoSample> samples;
private final Collection<ThreadInfoSample> samples;

/**
* Creates a response to the request to collect thread details samples.
*
* @param samples Thread info samples.
*/
public TaskThreadInfoResponse(List<ThreadInfoSample> samples) {
public TaskThreadInfoResponse(Collection<ThreadInfoSample> samples) {
this.samples = Preconditions.checkNotNull(samples);
}

/**
* Returns a list of ThreadInfoSample.
* Returns a collection of ThreadInfoSample.
*
* @return List of thread info samples for a particular execution attempt (Task)
* @return A collection of thread info samples for a particular execution attempt (Task)
*/
public List<ThreadInfoSample> getSamples() {
public Collection<ThreadInfoSample> getSamples() {
return samples;
}
}
@@ -22,7 +22,9 @@

import java.io.Serializable;
import java.lang.management.ThreadInfo;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* A serializable wrapper container for transferring parts of the {@link
@@ -54,11 +56,27 @@ public static Optional<ThreadInfoSample> from(@Nullable ThreadInfo threadInfo) {
}
}

/**
* Constructs a collection of {@link ThreadInfoSample}s from a collection of {@link ThreadInfo}
* samples.
*
* @param threadInfos the collection of {@link ThreadInfo}.
* @return the collection of the corresponding {@link ThreadInfoSample}s.
*/
public static Collection<ThreadInfoSample> from(Collection<ThreadInfo> threadInfos) {
return threadInfos.stream()
.map(
threadInfo ->
new ThreadInfoSample(
threadInfo.getThreadState(), threadInfo.getStackTrace()))
.collect(Collectors.toList());
}

public Thread.State getThreadState() {
return threadState;
}

public StackTraceElement[] getStackTrace() {
return stackTrace.clone();
return stackTrace;
}
}
@@ -546,23 +546,29 @@ private void stopTaskExecutorServices() throws Exception {

@Override
public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
final ExecutionAttemptID taskExecutionAttemptId,
final Collection<ExecutionAttemptID> taskExecutionAttemptIds,
final ThreadInfoSamplesRequest requestParams,
final Time timeout) {

final Task task = taskSlotTable.getTask(taskExecutionAttemptId);
if (task == null) {
return FutureUtils.completedExceptionally(
new IllegalStateException(
String.format(
"Cannot sample task %s. "
+ "Task is not known to the task manager.",
taskExecutionAttemptId)));
final Collection<Task> tasks = new ArrayList<>();
for (ExecutionAttemptID executionAttemptId : taskExecutionAttemptIds) {
final Task task = taskSlotTable.getTask(executionAttemptId);
if (task == null) {
log.warn(
String.format(
"Cannot sample task %s. "
+ "Task is not known to the task manager.",
executionAttemptId));
} else {
tasks.add(task);
}
}

final CompletableFuture<List<ThreadInfoSample>> stackTracesFuture =
threadInfoSampleService.requestThreadInfoSamples(
SampleableTaskAdapter.fromTask(task), requestParams);
Collection<SampleableTask> sampleableTasks =
tasks.stream().map(SampleableTaskAdapter::fromTask).collect(Collectors.toList());

final CompletableFuture<Collection<ThreadInfoSample>> stackTracesFuture =
threadInfoSampleService.requestThreadInfoSamples(sampleableTasks, requestParams);

return stackTracesFuture.thenApply(TaskThreadInfoResponse::new);
}
@@ -231,10 +231,10 @@ public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {

@Override
public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
ExecutionAttemptID taskExecutionAttemptId,
Collection<ExecutionAttemptID> taskExecutionAttemptIds,
ThreadInfoSamplesRequest requestParams,
Time timeout) {
return originalGateway.requestThreadInfoSamples(
taskExecutionAttemptId, requestParams, timeout);
taskExecutionAttemptIds, requestParams, timeout);
}
}
@@ -23,20 +23,22 @@
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/** RPC gateway for requesting {@link org.apache.flink.runtime.messages.ThreadInfoSample}. */
public interface TaskExecutorThreadInfoGateway {

/**
* Request a thread info sample from the given task.
* Request a thread info sample from the given tasks.
*
* @param taskExecutionAttemptId identifying the task to sample
* @param taskExecutionAttemptIds identifying the task to sample
* @param requestParams parameters of the request
* @param timeout of the request
* @return Future of stack trace sample response
*/
CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
ExecutionAttemptID taskExecutionAttemptId,
Collection<ExecutionAttemptID> taskExecutionAttemptIds,
ThreadInfoSamplesRequest requestParams,
@RpcTimeout Time timeout);
}
@@ -27,11 +27,11 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -46,22 +46,24 @@ class ThreadInfoSampleService implements Closeable {
}

/**
* Returns a future that completes with a given number of thread info samples of a task thread.
* Returns a future that completes with a given number of thread info samples for a set of task
* threads.
*
* @param task The task to be sampled from.
* @param tasks The tasks to be sampled.
* @param requestParams Parameters of the sampling request.
* @return A future containing the stack trace samples.
*/
public CompletableFuture<List<ThreadInfoSample>> requestThreadInfoSamples(
final SampleableTask task, final ThreadInfoSamplesRequest requestParams) {
checkNotNull(task, "task must not be null");
public CompletableFuture<Collection<ThreadInfoSample>> requestThreadInfoSamples(
final Collection<? extends SampleableTask> tasks,
final ThreadInfoSamplesRequest requestParams) {
checkNotNull(tasks, "task must not be null");
checkNotNull(requestParams, "requestParams must not be null");

CompletableFuture<List<ThreadInfoSample>> resultFuture = new CompletableFuture<>();
CompletableFuture<Collection<ThreadInfoSample>> resultFuture = new CompletableFuture<>();
scheduledExecutor.execute(
() ->
requestThreadInfoSamples(
task,
tasks,
requestParams.getNumSamples(),
requestParams.getDelayBetweenSamples(),
requestParams.getMaxStackTraceDepth(),
@@ -71,43 +73,52 @@ public CompletableFuture<List<ThreadInfoSample>> requestThreadInfoSamples(
}

private void requestThreadInfoSamples(
final SampleableTask task,
final Collection<? extends SampleableTask> tasks,
final int numSamples,
final Duration delayBetweenSamples,
final int maxStackTraceDepth,
final List<ThreadInfoSample> currentTraces,
final CompletableFuture<List<ThreadInfoSample>> resultFuture) {
final Collection<ThreadInfoSample> currentTraces,
final CompletableFuture<Collection<ThreadInfoSample>> resultFuture) {

final long threadId = task.getExecutingThread().getId();
final Optional<ThreadInfoSample> threadInfoSample =
JvmUtils.createThreadInfoSample(threadId, maxStackTraceDepth);
final Collection<Long> threadIds =
tasks.stream()
.map(t -> t.getExecutingThread().getId())
.collect(Collectors.toList());

if (threadInfoSample.isPresent()) {
currentTraces.add(threadInfoSample.get());
final Collection<ThreadInfoSample> threadInfoSample =
JvmUtils.createThreadInfoSample(threadIds, maxStackTraceDepth);

if (!threadInfoSample.isEmpty()) {
currentTraces.addAll(threadInfoSample);
if (numSamples > 1) {
scheduledExecutor.schedule(
() ->
requestThreadInfoSamples(
tasks,
numSamples - 1,
delayBetweenSamples,
maxStackTraceDepth,
currentTraces,
resultFuture),
delayBetweenSamples.toMillis(),
TimeUnit.MILLISECONDS);
} else {
resultFuture.complete(currentTraces);
}
} else if (!currentTraces.isEmpty()) {
// Requested tasks are not running anymore, completing with whatever was collected by
// now.
resultFuture.complete(currentTraces);
} else {
final String ids =
tasks.stream()
.map(SampleableTask::getExecutionId)
.map(e -> e == null ? "unknown" : e.toString())
.collect(Collectors.joining(", ", "[", "]"));
resultFuture.completeExceptionally(
new IllegalStateException(
String.format(
"Cannot sample task %s. The task is not running.",
task.getExecutionId())));
}

if (numSamples > 1) {
scheduledExecutor.schedule(
() ->
requestThreadInfoSamples(
task,
numSamples - 1,
delayBetweenSamples,
maxStackTraceDepth,
currentTraces,
resultFuture),
delayBetweenSamples.toMillis(),
TimeUnit.MILLISECONDS);
} else {
resultFuture.complete(currentTraces);
"Cannot sample tasks %s. The tasks are not running.", ids)));
}
}

@@ -20,16 +20,24 @@

import org.apache.flink.runtime.messages.ThreadInfoSample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Utilities for {@link java.lang.management.ManagementFactory}. */
public final class JvmUtils {

private static final Logger LOG = LoggerFactory.getLogger(JvmUtils.class);

/**
* Creates a thread dump of the current JVM.
*
@@ -56,6 +64,39 @@ public static Optional<ThreadInfoSample> createThreadInfoSample(
return ThreadInfoSample.from(threadMxBean.getThreadInfo(threadId, maxStackTraceDepth));
}

/**
* Creates a {@link ThreadInfoSample} for a specific thread. Contains thread traces if
* maxStackTraceDepth > 0.
*
* @param threadIds The IDs of the threads to create the thread dump for.
* @param maxStackTraceDepth The maximum number of entries in the stack trace to be collected.
* @return The thread information for the requested thread IDs.
*/
public static Collection<ThreadInfoSample> createThreadInfoSample(
Collection<Long> threadIds, int maxStackTraceDepth) {
ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
long[] threadIdsArray = threadIds.stream().mapToLong(l -> l).toArray();

ThreadInfo[] threadInfo = threadMxBean.getThreadInfo(threadIdsArray, maxStackTraceDepth);

List<ThreadInfo> threadInfoNoNulls =
IntStream.range(0, threadIdsArray.length)
.filter(
i -> {
if (threadInfo[i] == null) {
LOG.debug(
"FlameGraphs: thread {} is not alive or does not exist.",
threadIdsArray[i]);
return false;
}
return true;
})
.mapToObj(i -> threadInfo[i])
.collect(Collectors.toList());

return ThreadInfoSample.from(threadInfoNoNulls);
}

/** Private default constructor to avoid instantiation. */
private JvmUtils() {}
}

0 comments on commit b39d842

Please sign in to comment.