Skip to content

Commit

Permalink
Delayed rollforward of commit 8fb311b.
Browse files Browse the repository at this point in the history
This was rolled back due to Tensorflow breakage but the patch I exported to gerrit (https://bazel-review.googlesource.com/c/bazel/+/18590) passed Tensorflow (https://ci.bazel.io/job/bazel/job/presubmit/52/Downstream_projects/). Confirmed with jcater@ that the "newly failing" projects in the Global Tests are known issues. I think we can check this in now.

Additionally I had attempted to reproduce any tensorflow issues with this by building and testing TensorFlow locally with this patch, and all tests which passed with the released bazel had also passed with this patch.

================= Original change description ==========================

Reinstate idleness checks where the server self-terminates when it's idle and there is either too much memory pressure or the workspace directory is gone.

Arguably, it should kill itself when the workspace directory is gone regardless of whether it's idle or not, but let's first get us back to a known good state, then we can think about improvements.

RELNOTES: None.
PiperOrigin-RevId: 172361085
  • Loading branch information
kush-c authored and buchgr committed Oct 18, 2017
1 parent 0faf171 commit 4869c4e
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 37 deletions.
117 changes: 87 additions & 30 deletions src/main/java/com/google/devtools/build/lib/server/GrpcServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,25 @@ public class GrpcServerImpl implements RPCServer {

private static final long NANOSECONDS_IN_MS = TimeUnit.MILLISECONDS.toNanos(1);

private static final long NANOS_PER_IDLE_CHECK =
TimeUnit.NANOSECONDS.convert(5, TimeUnit.SECONDS);

private class RunningCommand implements AutoCloseable {
private final Thread thread;
private final String id;

private RunningCommand() {
private RunningCommand() throws InterruptedException {
thread = Thread.currentThread();
id = UUID.randomUUID().toString();
synchronized (runningCommands) {
if (runningCommands.isEmpty()) {
busy();
}

if (shuttingDown) {
throw new InterruptedException();
}

runningCommands.put(id, this);
runningCommands.notify();
}
Expand Down Expand Up @@ -443,21 +451,57 @@ public void write(int byteAsInt) throws IOException {
}
}

// The synchronized block is here so that if the "PID file deleted" timer or the idle shutdown
// mechanism kicks in during a regular shutdown, they don't race.
@VisibleForTesting // productionVisibility = Visibility.PRIVATE
void signalShutdown() {
synchronized (runningCommands) {
shuttingDown = true;
server.shutdown();
}
}

/**
* A thread that watches if the PID file changes and shuts down the server immediately if so.
* A thread that shuts the server down under the following conditions:
*
* <ul>
* <li>The PID file changes (in this case, *very* quickly)
* <li>The workspace directory is deleted
* <li>There is too much memory pressure on the host
* </ul>
*/
private class PidFileWatcherThread extends Thread {
private boolean shuttingDown = false;
private class ShutdownWatcherThread extends Thread {
private long lastIdleCheckNanos;

private PidFileWatcherThread() {
super("pid-file-watcher");
private ShutdownWatcherThread() {
super("grpc-server-shutdown-watcher");
setDaemon(true);
}

// The synchronized block is here so that if the "PID file deleted" timer kicks in during a
// regular shutdown, they don't race.
private synchronized void signalShutdown() {
shuttingDown = true;
private void doIdleChecksMaybe() {
synchronized (runningCommands) {
if (!runningCommands.isEmpty()) {
lastIdleCheckNanos = -1;
return;
}

long currentNanos = BlazeClock.nanoTime();
if (lastIdleCheckNanos == -1) {
lastIdleCheckNanos = currentNanos;
return;
}

if (currentNanos - lastIdleCheckNanos < NANOS_PER_IDLE_CHECK) {
return;
}

if (!idleServerTasks.continueProcessing()) {
signalShutdown();
server.shutdown();
}

lastIdleCheckNanos = currentNanos;
}
}

@Override
Expand All @@ -473,8 +517,12 @@ public void run() {
// Handled by virtue of ok not being set to true
}

if (ok) {
doIdleChecksMaybe();
}

if (!ok) {
synchronized (PidFileWatcherThread.this) {
synchronized (ShutdownWatcherThread.this) {
if (shuttingDown) {
logger.warning("PID file deleted or overwritten but shutdown is already in progress");
break;
Expand Down Expand Up @@ -510,24 +558,21 @@ public void run() {
private final String responseCookie;
private final AtomicLong interruptCounter = new AtomicLong(0);
private final int maxIdleSeconds;
private final PidFileWatcherThread pidFileWatcherThread;
private final ShutdownWatcherThread shutdownWatcherThread;
private final Path pidFile;
private final String pidInFile;
private final List<Path> filesToDeleteAtExit = new ArrayList<>();
private final int port;

private Server server;
private IdleServerTasks idleServerTasks;
boolean serving;
private InetSocketAddress address;
private boolean serving;
private boolean shuttingDown = false;

public GrpcServerImpl(CommandExecutor commandExecutor, Clock clock, int port,
Path workspace, Path serverDirectory, int maxIdleSeconds) throws IOException {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
shutdownHook();
}
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdownHook()));

// server.pid was written in the C++ launcher after fork() but before exec() .
// The client only accesses the pid file after connecting to the socket
Expand Down Expand Up @@ -556,12 +601,22 @@ public void run() {
requestCookie = generateCookie(random, 16);
responseCookie = generateCookie(random, 16);

pidFileWatcherThread = new PidFileWatcherThread();
pidFileWatcherThread.start();
shutdownWatcherThread = new ShutdownWatcherThread();
shutdownWatcherThread.start();
idleServerTasks = new IdleServerTasks(workspace);
idleServerTasks.idle();
}

@VisibleForTesting // productionVisibility = Visibility.PRIVATE
String getRequestCookie() {
return requestCookie;
}

@VisibleForTesting // productionVisibility = Visibility.PRIVATE
InetSocketAddress getAddress() {
return address;
}

private void idle() {
Preconditions.checkState(idleServerTasks == null);
idleServerTasks = new IdleServerTasks(workspace);
Expand Down Expand Up @@ -646,7 +701,7 @@ private void timeoutThread() {
}
}

server.shutdown();
signalShutdown();
}

/**
Expand All @@ -672,7 +727,7 @@ private void timeoutThread() {
@Override
public void prepareForAbruptShutdown() {
disableShutdownHooks();
pidFileWatcherThread.signalShutdown();
signalShutdown();
}

@Override
Expand Down Expand Up @@ -719,7 +774,7 @@ public void serve() throws IOException {
timeoutThread.start();
}
serving = true;

this.address = new InetSocketAddress(address.getAddress(), server.getPort());
writeServerFile(
PORT_FILE, InetAddresses.toUriString(address.getAddress()) + ":" + server.getPort());
writeServerFile(REQUEST_COOKIE_FILE, requestCookie);
Expand Down Expand Up @@ -761,9 +816,7 @@ private void shutdownHook() {
}
}

/**
* Schedule the specified file for (attempted) deletion at JVM exit.
*/
/** Schedule the specified file for (attempted) deletion at JVM exit. */
protected void deleteAtExit(final Path path) {
synchronized (filesToDeleteAtExit) {
filesToDeleteAtExit.add(path);
Expand All @@ -783,8 +836,8 @@ static void printStack(IOException e) {
logger.severe(err.toString());
}

private void executeCommand(
RunRequest request, StreamObserver<RunResponse> observer, GrpcSink sink) {
@VisibleForTesting // productionVisibility = Visibility.PRIVATE
void executeCommand(RunRequest request, StreamObserver<RunResponse> observer, GrpcSink sink) {
sink.setCommandThread(Thread.currentThread());

if (!request.getCookie().equals(requestCookie) || request.getClientDescription().isEmpty()) {
Expand Down Expand Up @@ -880,7 +933,7 @@ private void executeCommand(

boolean shutdown = commandExecutor.shutdown();
if (shutdown) {
server.shutdown();
signalShutdown();
}
RunResponse response =
RunResponse.newBuilder()
Expand Down Expand Up @@ -927,6 +980,8 @@ public void ping(PingRequest pingRequest, StreamObserver<PingResponse> streamObs

streamObserver.onNext(response.build());
streamObserver.onCompleted();
} catch (InterruptedException e) {
// Ignore, we are shutting down anyway
}
}

Expand Down Expand Up @@ -969,6 +1024,8 @@ private void doCancel(
logger.info(
"Client cancelled RPC of cancellation request for " + request.getCommandId());
}
} catch (InterruptedException e) {
// Ignore, we are shutting down anyway
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.devtools.build.lib.server;

import com.google.devtools.build.lib.clock.BlazeClock;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.unix.ProcMeminfoParser;
import com.google.devtools.build.lib.util.LoggingUtil;
Expand All @@ -35,11 +36,12 @@
*/
class IdleServerTasks {

private long idleStart;
private final Path workspaceDir;
private final ScheduledThreadPoolExecutor executor;
private static final Logger logger = Logger.getLogger(IdleServerTasks.class.getName());

private static final long FIVE_MIN_MILLIS = 1000 * 60 * 5;
private static final long FIVE_MIN_NANOS = 1000L * 1000 * 1000 * 60 * 5;

/**
* Must be called from the main thread.
Expand All @@ -56,6 +58,7 @@ public IdleServerTasks(@Nullable Path workspaceDir) {
public void idle() {
Preconditions.checkState(!executor.isShutdown());

idleStart = BlazeClock.nanoTime();
// Do a GC cycle while the server is idle.
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
Expand Down Expand Up @@ -99,11 +102,11 @@ public void busy() {
}

/**
* Return true iff the server should continue processing requests.
* Called from the main thread, so it should return quickly.
* Return true iff the server should continue processing requests. Called from the main thread, so
* it should return quickly.
*/
public boolean continueProcessing(long idleMillis) {
if (!memoryHeuristic(idleMillis)) {
public boolean continueProcessing() {
if (!memoryHeuristic()) {
return false;
}
if (workspaceDir == null) {
Expand All @@ -121,8 +124,10 @@ public boolean continueProcessing(long idleMillis) {
return stat != null && stat.isDirectory();
}

private boolean memoryHeuristic(long idleMillis) {
if (idleMillis < FIVE_MIN_MILLIS) {
private boolean memoryHeuristic() {
Preconditions.checkState(!executor.isShutdown());
long idleNanos = BlazeClock.nanoTime() - idleStart;
if (idleNanos < FIVE_MIN_NANOS) {
// Don't check memory health until after five minutes.
return true;
}
Expand Down
4 changes: 4 additions & 0 deletions src/test/java/com/google/devtools/build/lib/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,13 @@ java_test(
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib:unix",
"//src/main/java/com/google/devtools/build/lib:util",
"//src/main/java/com/google/devtools/build/lib/clock",
"//src/main/java/com/google/devtools/build/lib/collect",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs",
"//src/main/protobuf:command_server_java_grpc",
"//src/main/protobuf:command_server_java_proto",
"//src/main/protobuf:invocation_policy_java_proto",
"//third_party:guava",
"//third_party:guava-testlib",
"//third_party:jsr305",
Expand Down
Loading

0 comments on commit 4869c4e

Please sign in to comment.