From caacf297023d0d6e4a6bfe2f7ccd6edb73914d89 Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 17 Oct 2016 11:21:02 -0700 Subject: [PATCH] Support waitUntilFinish, cancel in the DirectRunner --- .../beam/runners/direct/DirectRunner.java | 43 +++--- .../ExecutorServiceParallelExecutor.java | 137 +++++++++++++----- .../beam/runners/direct/PipelineExecutor.java | 23 ++- .../direct/TransformExecutorService.java | 6 + .../direct/TransformExecutorServices.java | 22 ++- .../beam/runners/direct/DirectRunnerTest.java | 55 +++++++ .../main/resources/beam/findbugs-filter.xml | 34 +++++ 7 files changed, 264 insertions(+), 56 deletions(-) diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 06aa3b1356b22..4992c6a7a1865 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -420,14 +419,34 @@ public MetricResults metrics() { * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false, * this method will never return. * - *

See also {@link PipelineExecutor#awaitCompletion()}. + *

See also {@link PipelineExecutor#waitUntilFinish(Duration)}. */ @Override public State waitUntilFinish() { - if (!state.isTerminal()) { + return waitUntilFinish(Duration.ZERO); + } + + @Override + public State cancel() { + this.state = executor.getPipelineState(); + if (!this.state.isTerminal()) { + executor.stop(); + this.state = executor.getPipelineState(); + } + return executor.getPipelineState(); + } + + @Override + public State waitUntilFinish(Duration duration) { + State startState = this.state; + if (!startState.isTerminal()) { try { - executor.awaitCompletion(); - state = State.DONE; + state = executor.waitUntilFinish(duration); + } catch (UserCodeException uce) { + // Emulates the behavior of Pipeline#run(), where a stack trace caused by a + // UserCodeException is truncated and replaced with the stack starting at the call to + // waitToFinish + throw new Pipeline.PipelineExecutionException(uce.getCause()); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -438,19 +457,7 @@ public State waitUntilFinish() { throw new RuntimeException(e); } } - return state; - } - - @Override - public State cancel() throws IOException { - throw new UnsupportedOperationException("DirectPipelineResult does not support cancel."); - } - - @Override - public State waitUntilFinish(Duration duration) { - throw new UnsupportedOperationException( - "DirectPipelineResult does not support waitUntilFinish with a Duration parameter. See" - + " BEAM-596."); + return this.state; } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 935104ac4140e..8b9f9951ca003 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -25,6 +25,8 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; @@ -48,6 +50,7 @@ import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.UserCodeException; @@ -55,6 +58,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +104,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { * {@link CompletionCallback} decrement this value. */ private final AtomicLong outstandingWork = new AtomicLong(); + private AtomicReference pipelineState = new AtomicReference<>(State.RUNNING); public static ExecutorServiceParallelExecutor create( int targetParallelism, @@ -138,7 +144,10 @@ private ExecutorServiceParallelExecutor( // Executing TransformExecutorServices have a strong reference to their TransformExecutorService // which stops the TransformExecutorServices from being prematurely garbage collected executorServices = - CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader()); + CacheBuilder.newBuilder() + .weakValues() + .removalListener(shutdownExecutorServiceListener()) + .build(serialTransformExecutorServiceCacheLoader()); this.allUpdates = new ConcurrentLinkedQueue<>(); this.visibleUpdates = new LinkedBlockingQueue<>(); @@ -159,6 +168,19 @@ public TransformExecutorService load(StepAndKey stepAndKey) throws Exception { }; } + private RemovalListener shutdownExecutorServiceListener() { + return new RemovalListener() { + @Override + public void onRemoval( + RemovalNotification notification) { + TransformExecutorService service = notification.getValue(); + if (service != null) { + service.shutdown(); + } + } + }; + } + @Override public void start(Collection> roots) { int numTargetSplits = Math.max(3, targetParallelism); @@ -179,7 +201,7 @@ public void start(Collection> roots) { } @SuppressWarnings("unchecked") - public void scheduleConsumption( + private void scheduleConsumption( AppliedPTransform consumer, CommittedBundle bundle, CompletionCallback onComplete) { @@ -219,7 +241,9 @@ private void evaluateBundle( onComplete, transformExecutor); outstandingWork.incrementAndGet(); - transformExecutor.schedule(callable); + if (!pipelineState.get().isTerminal()) { + transformExecutor.schedule(callable); + } } private boolean isKeyed(PValue pvalue) { @@ -234,20 +258,66 @@ private void scheduleConsumers(ExecutorUpdate update) { } @Override - public void awaitCompletion() throws Exception { - VisibleExecutorUpdate update; - do { - // Get an update; don't block forever if another thread has handled it - update = visibleUpdates.poll(2L, TimeUnit.SECONDS); - if (update == null && executorService.isShutdown()) { + public State waitUntilFinish(Duration duration) throws Exception { + Instant completionTime; + if (duration.equals(Duration.ZERO)) { + completionTime = new Instant(Long.MAX_VALUE); + } else { + completionTime = Instant.now().plus(duration); + } + + VisibleExecutorUpdate update = null; + while (Instant.now().isBefore(completionTime) + && (update == null || isTerminalStateUpdate(update))) { + // Get an update; don't block forever if another thread has handled it. The call to poll will + // wait the entire timeout; this call primarily exists to relinquish any core. + update = visibleUpdates.poll(25L, TimeUnit.MILLISECONDS); + if (update == null && pipelineState.get().isTerminal()) { // there are no updates to process and no updates will ever be published because the // executor is shutdown - return; + return pipelineState.get(); } else if (update != null && update.exception.isPresent()) { throw update.exception.get(); } - } while (update == null || !update.isDone()); + } + return pipelineState.get(); + } + + @Override + public State getPipelineState() { + return pipelineState.get(); + } + + private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) { + return !(update.getNewState() == null && update.getNewState().isTerminal()); + } + + @Override + public void stop() { + shutdownIfNecessary(State.CANCELLED); + while (!visibleUpdates.offer(VisibleExecutorUpdate.cancelled())) { + // Make sure "This Pipeline was Cancelled" notification arrives. + visibleUpdates.poll(); + } + } + + private void shutdownIfNecessary(State newState) { + if (!newState.isTerminal()) { + return; + } + LOG.debug("Pipeline has terminated. Shutting down."); + pipelineState.compareAndSet(State.RUNNING, newState); + // Stop accepting new work before shutting down the executor. This ensures that thread don't try + // to add work to the shutdown executor. + executorServices.invalidateAll(); + executorServices.cleanUp(); + parallelExecutorService.shutdown(); executorService.shutdown(); + try { + registry.cleanup(); + } catch (Exception e) { + visibleUpdates.add(VisibleExecutorUpdate.fromException(e)); + } } /** @@ -341,29 +411,35 @@ public static ExecutorUpdate fromException(Exception e) { } /** - * An update of interest to the user. Used in {@link #awaitCompletion} to decide whether to + * An update of interest to the user. Used in {@link #waitUntilFinish} to decide whether to * return normally or throw an exception. */ private static class VisibleExecutorUpdate { private final Optional exception; - private final boolean done; + @Nullable + private final State newState; public static VisibleExecutorUpdate fromException(Exception e) { - return new VisibleExecutorUpdate(false, e); + return new VisibleExecutorUpdate(null, e); } public static VisibleExecutorUpdate finished() { - return new VisibleExecutorUpdate(true, null); + return new VisibleExecutorUpdate(State.DONE, null); + } + + public static VisibleExecutorUpdate cancelled() { + return new VisibleExecutorUpdate(State.CANCELLED, null); } - private VisibleExecutorUpdate(boolean done, @Nullable Exception exception) { + private VisibleExecutorUpdate(State newState, @Nullable Exception exception) { this.exception = Optional.fromNullable(exception); - this.done = done; + this.newState = newState; } - public boolean isDone() { - return done; + public State getNewState() { + return newState; } + } private class MonitorRunnable implements Runnable { @@ -475,22 +551,15 @@ private void fireTimers() throws Exception { } private boolean shouldShutdown() { - boolean shouldShutdown = exceptionThrown || evaluationContext.isDone(); - if (shouldShutdown) { - LOG.debug("Pipeline has terminated. Shutting down."); - executorService.shutdown(); - try { - registry.cleanup(); - } catch (Exception e) { - visibleUpdates.add(VisibleExecutorUpdate.fromException(e)); - } - if (evaluationContext.isDone()) { - while (!visibleUpdates.offer(VisibleExecutorUpdate.finished())) { - visibleUpdates.poll(); - } - } + State nextState = State.UNKNOWN; + if (exceptionThrown) { + nextState = State.FAILED; + } else if (evaluationContext.isDone()) { + visibleUpdates.offer(VisibleExecutorUpdate.finished()); + nextState = State.DONE; } - return shouldShutdown; + shutdownIfNecessary(nextState); + return pipelineState.get().isTerminal(); } /** diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java index f900a22d32b1e..82f59a7c32882 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PipelineExecutor.java @@ -19,8 +19,11 @@ import java.util.Collection; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.joda.time.Duration; /** * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both @@ -40,8 +43,24 @@ interface PipelineExecutor { * root {@link AppliedPTransform AppliedPTransforms} have completed, and all * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally. * - * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the + *

Waits for up to the provided duration, or forever if the provided duration is less than or + * equal to zero. + * + * @return The terminal state of the Pipeline. + * @throws Exception whenever an executor thread throws anything, transfers to the * waiting thread and rethrows it */ - void awaitCompletion() throws Exception; + State waitUntilFinish(Duration duration) throws Exception; + + /** + * Gets the current state of the {@link Pipeline}. + */ + State getPipelineState(); + + /** + * Shuts down the executor. + * + *

The executor may continue to run for a short time after this method returns. + */ + void stop(); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java index 837b858020f1e..c6f770f8012e1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorService.java @@ -32,4 +32,10 @@ interface TransformExecutorService { * {@link TransformExecutor TransformExecutors} to be evaluated. */ void complete(TransformExecutor completed); + + /** + * Cancel any outstanding work, if possible. Any future calls to schedule should ignore any + * work. + */ + void shutdown(); } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java index 876da9d5cd2ea..673375827b8e1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java @@ -21,6 +21,7 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** @@ -56,6 +57,7 @@ public static TransformExecutorService serial(ExecutorService executor) { */ private static class ParallelEvaluationState implements TransformExecutorService { private final ExecutorService executor; + private final AtomicBoolean active = new AtomicBoolean(true); private ParallelEvaluationState(ExecutorService executor) { this.executor = executor; @@ -63,12 +65,19 @@ private ParallelEvaluationState(ExecutorService executor) { @Override public void schedule(TransformExecutor work) { - executor.submit(work); + if (active.get()) { + executor.submit(work); + } } @Override public void complete(TransformExecutor completed) { } + + @Override + public void shutdown() { + active.set(false); + } } /** @@ -84,6 +93,7 @@ private static class SerialEvaluationState implements TransformExecutorService { private AtomicReference> currentlyEvaluating; private final Queue> workQueue; + private boolean active = true; private SerialEvaluationState(ExecutorService executor) { this.executor = executor; @@ -113,12 +123,20 @@ public void complete(TransformExecutor completed) { updateCurrentlyEvaluating(); } + @Override + public void shutdown() { + synchronized (this) { + active = false; + } + workQueue.clear(); + } + private void updateCurrentlyEvaluating() { if (currentlyEvaluating.get() == null) { // Only synchronize if we need to update what's currently evaluating synchronized (this) { TransformExecutor newWork = workQueue.poll(); - if (newWork != null) { + if (active && newWork != null) { if (currentlyEvaluating.compareAndSet(null, newWork)) { executor.submit(newWork); } else { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index d2b6d1d53f2a3..e601fcf34a944 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -32,9 +32,16 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -49,6 +56,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -65,6 +73,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TypeDescriptor; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -221,6 +230,52 @@ public void splitsInputs() { p.run(); } + @Test + public void cancelShouldStopPipeline() throws Exception { + PipelineOptions opts = TestPipeline.testingPipelineOptions(); + opts.as(DirectOptions.class).setBlockOnRun(false); + opts.setRunner(DirectRunner.class); + + final Pipeline p = Pipeline.create(opts); + p.apply(CountingInput.unbounded().withRate(1L, Duration.standardSeconds(1))); + + final BlockingQueue resultExchange = new ArrayBlockingQueue<>(1); + Runnable cancelRunnable = new Runnable() { + @Override + public void run() { + try { + resultExchange.take().cancel(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + }; + + Callable runPipelineRunnable = new Callable() { + @Override + public PipelineResult call() { + PipelineResult res = p.run(); + try { + resultExchange.put(res); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + return res; + } + }; + + ExecutorService executor = Executors.newCachedThreadPool(); + executor.submit(cancelRunnable); + Future result = executor.submit(runPipelineRunnable); + + // If cancel doesn't work, this will hang forever + result.get().waitUntilFinish(); + } + @Test public void transformDisplayDataExceptionShouldFail() { DoFn brokenDoFn = new DoFn() { diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 714fd00c13bd9..2799b00dfdbd8 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -154,6 +154,40 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + +