Skip to content

Commit

Permalink
Merge pull request #13702 from [BEAM-11474] Set log entry transform i…
Browse files Browse the repository at this point in the history
…d with best effort in Java SDK harness

[BEAM-11474] Set log entry transform id with best effort in Java SDK harness
  • Loading branch information
boyuanzz committed Jan 12, 2021
2 parents dd71c9a + 327ec71 commit 6c9da02
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
* don't use a ThreadLocal to allow testing the implementation of this class without having to run
* from multiple threads.
*/
private static final Map<Thread, ExecutionStateTracker> CURRENT_TRACKERS =
private static final Map<Long, ExecutionStateTracker> CURRENT_TRACKERS =
new ConcurrentHashMap<>();

private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
Expand Down Expand Up @@ -180,7 +180,17 @@ public int compareTo(ExecutionStateTracker o) {
* either is no current state or if the current thread is not currently tracking the state.
*/
public static @Nullable ExecutionState getCurrentExecutionState() {
ExecutionStateTracker tracker = CURRENT_TRACKERS.get(Thread.currentThread());
ExecutionStateTracker tracker = CURRENT_TRACKERS.get(Thread.currentThread().getId());
return tracker == null ? null : tracker.currentState;
}

/**
* Return the current {@link ExecutionState} of the thread with thread id, or {@code null} if
* there either is no current state or if the corresponding thread is not currently tracking the
* state.
*/
public static @Nullable ExecutionState getCurrentExecutionState(long threadId) {
ExecutionStateTracker tracker = CURRENT_TRACKERS.get(threadId);
return tracker == null ? null : tracker.currentState;
}

Expand All @@ -203,7 +213,7 @@ public synchronized Closeable activate(Thread thread) {
checkState(
trackedThread == null, "Cannot activate an ExecutionStateTracker that is already in use.");

ExecutionStateTracker other = CURRENT_TRACKERS.put(thread, this);
ExecutionStateTracker other = CURRENT_TRACKERS.put(thread.getId(), this);
checkState(
other == null,
"Execution state of thread {} was already being tracked by {}",
Expand All @@ -224,7 +234,9 @@ public Thread getTrackedThread() {
private synchronized void deactivate() {
sampler.removeTracker(this);
Thread thread = this.trackedThread;
CURRENT_TRACKERS.remove(thread);
if (thread != null) {
CURRENT_TRACKERS.remove(thread.getId());
}
this.trackedThread = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.SimpleExecutionState;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
Expand All @@ -53,6 +57,7 @@
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientResponseObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -215,6 +220,16 @@ public void publish(LogRecord record) {
if (loggerName != null) {
builder.setLogLocation(loggerName);
}
ExecutionState state = ExecutionStateTracker.getCurrentExecutionState(record.getThreadID());
if (state instanceof SimpleExecutionState) {
String transformId =
((SimpleExecutionState) state)
.getLabels()
.getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, "");
if (!Strings.isNullOrEmpty(transformId)) {
builder.setTransformId(transformId);
}
}

// The thread that sends log records should never perform a blocking publish and
// only insert log records best effort.
Expand Down

0 comments on commit 6c9da02

Please sign in to comment.