From 649907cf814b15bce65010907538c9a97fe3156f Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Tue, 25 Jul 2017 16:54:48 -0700 Subject: [PATCH] Add logging when Staging takes a long time --- .../beam/runners/dataflow/util/PackageUtil.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 565e965d4eb8..2d2c63a6f643 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -52,6 +52,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; @@ -389,7 +391,19 @@ public DataflowPackage apply(StagingResult stagingResult) { } try { - List stagedPackages = Futures.allAsList(destinationPackages).get(); + ListenableFuture> stagingFutures = + Futures.allAsList(destinationPackages); + boolean finished = false; + do { + try { + stagingFutures.get(3L, TimeUnit.MINUTES); + finished = true; + } catch (TimeoutException e) { + // finished will still be false + LOG.info("Still staging {} files", classpathElements.size()); + } + } while (!finished); + List stagedPackages = stagingFutures.get(); LOG.info( "Staging files complete: {} files cached, {} files newly uploaded", numCached.get(), numUploaded.get());