Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
Expand Down Expand Up @@ -179,11 +178,14 @@ private class LogRecordHandler extends Handler implements Runnable {
private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries =
new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
private final Future<?> bufferedLogWriter;
private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;
/**
* Safe object publishing is not required since we only care if the thread that set
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on the logic here? As far as I can tell, the goal is that the log publishing thread (which is responsible for draining the queue) will only ever offer the message, while other threads will attempt to wait. So we don't need a thread local, we just need to know if this is the thread responsible for draining the queue.

Copy link
Copy Markdown
Member Author

@lukecwik lukecwik May 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of a comment about not needing the volatile key word since the thread that sets it is the only thread that cares that it was set. All other threads compute the same result if it is null. Also, I believe it is adequately explained in the comments surrounding usage/setting in the code.

* this field is equal to the thread also attempting to add a log entry.
*/
private Thread logEntryHandlerThread;

private LogRecordHandler(ExecutorService executorService) {
bufferedLogWriter = executorService.submit(this);
logEntryHandler = new ThreadLocal<>();
}

@Override
Expand All @@ -204,19 +206,18 @@ public void publish(LogRecord record) {
builder.setTrace(getStackTraceAsString(record.getThrown()));
}
// The thread that sends log records should never perform a blocking publish and
// only insert log records best effort. We detect which thread is logging
// by using the thread local, defaulting to the blocking publish.
MoreObjects.firstNonNull(
logEntryHandler.get(), this::blockingPublish).accept(builder.build());
}

/** Blocks caller till enough space exists to publish this log entry. */
private void blockingPublish(BeamFnApi.LogEntry logEntry) {
try {
bufferedLogEntries.put(logEntry);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
// only insert log records best effort.
if (Thread.currentThread() != logEntryHandlerThread) {
// Blocks caller till enough space exists to publish this log entry.
try {
bufferedLogEntries.put(builder.build());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have a time limit and noisily complain if the queue isn't drained in a reasonable amount of time? I'm thinking about cases where the logging thread mysteriously dies, it would be good to say "unable to put log entry after N minutes".

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noisily complaining should be part of the container health check or logged to the docker container log. When those exist, this is an excellent place to wire logging as being an issue/bottleneck.

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
} else {
// Never blocks caller, will drop log message if buffer is full.
bufferedLogEntries.offer(builder.build());
}
}

Expand All @@ -225,7 +226,8 @@ public void run() {
// Logging which occurs in this thread will attempt to publish log entries into the
// above handler which should never block if the queue is full otherwise
// this thread will get stuck.
logEntryHandler.set(bufferedLogEntries::offer);
logEntryHandlerThread = Thread.currentThread();

List<BeamFnApi.LogEntry> additionalLogEntries =
new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
try {
Expand Down