From 25478d15b8907c834ad1bfa21ce1d40af5c67435 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Tue, 30 May 2017 13:15:31 -0700 Subject: [PATCH] [BEAM-1347] Remove the usage of a thread local on a potentially hot path --- .../harness/logging/BeamFnLoggingClient.java | 36 ++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index c8d11ed8ff91..d56ee6d6ae95 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -38,7 +38,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Formatter; import java.util.logging.Handler; @@ -179,11 +178,14 @@ private class LogRecordHandler extends Handler implements Runnable { private final BlockingDeque bufferedLogEntries = new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT); private final Future bufferedLogWriter; - private final ThreadLocal> logEntryHandler; + /** + * Safe object publishing is not required since we only care if the thread that set + * this field is equal to the thread also attempting to add a log entry. + */ + private Thread logEntryHandlerThread; private LogRecordHandler(ExecutorService executorService) { bufferedLogWriter = executorService.submit(this); - logEntryHandler = new ThreadLocal<>(); } @Override @@ -204,19 +206,18 @@ public void publish(LogRecord record) { builder.setTrace(getStackTraceAsString(record.getThrown())); } // The thread that sends log records should never perform a blocking publish and - // only insert log records best effort. We detect which thread is logging - // by using the thread local, defaulting to the blocking publish. - MoreObjects.firstNonNull( - logEntryHandler.get(), this::blockingPublish).accept(builder.build()); - } - - /** Blocks caller till enough space exists to publish this log entry. */ - private void blockingPublish(BeamFnApi.LogEntry logEntry) { - try { - bufferedLogEntries.put(logEntry); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); + // only insert log records best effort. + if (Thread.currentThread() != logEntryHandlerThread) { + // Blocks caller till enough space exists to publish this log entry. + try { + bufferedLogEntries.put(builder.build()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else { + // Never blocks caller, will drop log message if buffer is full. + bufferedLogEntries.offer(builder.build()); } } @@ -225,7 +226,8 @@ public void run() { // Logging which occurs in this thread will attempt to publish log entries into the // above handler which should never block if the queue is full otherwise // this thread will get stuck. - logEntryHandler.set(bufferedLogEntries::offer); + logEntryHandlerThread = Thread.currentThread(); + List additionalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT); try {