From 23e2b913946acb2690fbac2d751a5672d80121aa Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 25 Jan 2017 21:04:20 -0800 Subject: [PATCH 1/2] Recommit "DataflowRunner: parallelize staging of files" Revert "This closes #1847" This reverts commit 1c6e667414788fe99f583fac39d458a4984ae162, reversing changes made to 6413299a20be57de849684479134479fa1acee2d. --- runners/google-cloud-dataflow-java/pom.xml | 5 + .../beam/runners/dataflow/util/GcsStager.java | 18 +- .../runners/dataflow/util/PackageUtil.java | 349 +++++++++++------- .../dataflow/util/PackageUtilTest.java | 42 ++- .../apache/beam/sdk/options/GcsOptions.java | 4 +- .../org/apache/beam/sdk/util/GcsUtil.java | 12 + 6 files changed, 281 insertions(+), 149 deletions(-) diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index eea5502c293c2..9858b3dbee5ee 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -202,6 +202,11 @@ google-api-services-clouddebugger + + com.google.apis + google-api-services-storage + + com.google.auth google-auth-library-credentials diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index 6ca4c3f789130..53822e3fa3cf1 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -17,13 +17,19 @@ */ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.MoreObjects.firstNonNull; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.api.services.storage.Storage; import java.util.List; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; +import org.apache.beam.sdk.util.Transport; /** * Utility class for staging files to GCS. @@ -35,6 +41,7 @@ private GcsStager(DataflowPipelineOptions options) { this.options = options; } + @SuppressWarnings("unused") // used via reflection public static GcsStager fromOptions(PipelineOptions options) { return new GcsStager(options.as(DataflowPipelineOptions.class)); } @@ -48,7 +55,16 @@ public List stageFiles() { if (windmillBinary != null) { filesToStage.add("windmill_main=" + windmillBinary); } + int uploadSizeBytes = firstNonNull(options.getGcsUploadBufferSizeBytes(), 1024 * 1024); + checkArgument(uploadSizeBytes > 0, "gcsUploadBufferSizeBytes must be > 0"); + uploadSizeBytes = Math.min(uploadSizeBytes, 1024 * 1024); + Storage.Builder storageBuilder = Transport.newStorageClient(options); + GcsUtil util = GcsUtilFactory.create( + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), + options.getExecutorService(), + uploadSizeBytes); return PackageUtil.stageClasspathElements( - options.getFilesToStage(), options.getStagingLocation()); + options.getFilesToStage(), options.getStagingLocation(), util); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 6d910baba3724..fa8c94d95cf7c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -17,89 +17,81 @@ */ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.Preconditions.checkArgument; + import com.fasterxml.jackson.core.Base64Variants; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.common.collect.Lists; import com.google.common.hash.Funnels; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.CountingOutputStream; import com.google.common.io.Files; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.GcsIOChannelFactory; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ZipFiles; +import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Helper routines for packages. */ -public class PackageUtil { +class PackageUtil { private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class); /** * A reasonable upper bound on the number of jars required to launch a Dataflow job. */ - public static final int SANE_CLASSPATH_SIZE = 1000; - /** - * The initial interval to use between package staging attempts. - */ - private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5); - /** - * The maximum number of retries when staging a file. - */ - private static final int MAX_RETRIES = 4; + private static final int SANE_CLASSPATH_SIZE = 1000; private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT - .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL); + FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5)); /** * Translates exceptions from API calls. */ private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor(); - /** - * Creates a DataflowPackage containing information about how a classpath element should be - * staged, including the staging destination as well as its size and hash. - * - * @param classpathElement The local path for the classpath element. - * @param stagingPath The base location for staged classpath elements. - * @param overridePackageName If non-null, use the given value as the package name - * instead of generating one automatically. - * @return The package. - */ - @Deprecated - public static DataflowPackage createPackage(File classpathElement, - String stagingPath, String overridePackageName) { - return createPackageAttributes(classpathElement, stagingPath, overridePackageName) - .getDataflowPackage(); - } - /** * Compute and cache the attributes of a classpath element that we will need to stage it. * - * @param classpathElement the file or directory to be staged. + * @param source the file or directory to be staged. * @param stagingPath The base location for staged classpath elements. * @param overridePackageName If non-null, use the given value as the package name * instead of generating one automatically. * @return a {@link PackageAttributes} that containing metadata about the object to be staged. */ - static PackageAttributes createPackageAttributes(File classpathElement, - String stagingPath, String overridePackageName) { + static PackageAttributes createPackageAttributes(File source, + String stagingPath, @Nullable String overridePackageName) { try { - boolean directory = classpathElement.isDirectory(); + boolean directory = source.isDirectory(); // Compute size and hash in one pass over file or directory. Hasher hasher = Hashing.md5().newHasher(); @@ -108,142 +100,232 @@ static PackageAttributes createPackageAttributes(File classpathElement, if (!directory) { // Files are staged as-is. - Files.asByteSource(classpathElement).copyTo(countingOutputStream); + Files.asByteSource(source).copyTo(countingOutputStream); } else { // Directories are recursively zipped. - ZipFiles.zipDirectory(classpathElement, countingOutputStream); + ZipFiles.zipDirectory(source, countingOutputStream); } long size = countingOutputStream.getCount(); String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); // Create the DataflowPackage with staging name and location. - String uniqueName = getUniqueContentName(classpathElement, hash); + String uniqueName = getUniqueContentName(source, hash); String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName); DataflowPackage target = new DataflowPackage(); target.setName(overridePackageName != null ? overridePackageName : uniqueName); target.setLocation(resourcePath); - return new PackageAttributes(size, hash, directory, target); + return new PackageAttributes(size, hash, directory, target, source.getPath()); } catch (IOException e) { - throw new RuntimeException("Package setup failure for " + classpathElement, e); + throw new RuntimeException("Package setup failure for " + source, e); } } - /** - * Transfers the classpath elements to the staging location. - * - * @param classpathElements The elements to stage. - * @param stagingPath The base location to stage the elements to. - * @return A list of cloud workflow packages, each representing a classpath element. - */ - public static List stageClasspathElements( - Collection classpathElements, String stagingPath) { - return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT); - } - - // Visible for testing. - static List stageClasspathElements( - Collection classpathElements, String stagingPath, - Sleeper retrySleeper) { - LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " - + "prepare for execution.", classpathElements.size()); - - if (classpathElements.size() > SANE_CLASSPATH_SIZE) { - LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically " - + "copies to all workers. Having this many entries on your classpath may be indicative " - + "of an issue in your pipeline. You may want to consider trimming the classpath to " - + "necessary dependencies only, using --filesToStage pipeline option to override " - + "what files are being staged, or bundling several dependencies into one.", - classpathElements.size()); - } - - ArrayList packages = new ArrayList<>(); + /** Utility comparator used in uploading packages efficiently. */ + private static class PackageUploadOrder implements Comparator { + @Override + public int compare(PackageAttributes o1, PackageAttributes o2) { + // Smaller size compares high so that bigger packages are uploaded first. + long sizeDiff = o2.getSize() - o1.getSize(); + if (sizeDiff != 0) { + // returns sign of long + return Long.signum(sizeDiff); + } - if (stagingPath == null) { - throw new IllegalArgumentException( - "Can't stage classpath elements on because no staging location has been provided"); + // Otherwise, choose arbitrarily based on hash. + return o1.getHash().compareTo(o2.getHash()); } + } - int numUploaded = 0; - int numCached = 0; + /** + * Utility function that computes sizes and hashes of packages so that we can validate whether + * they have already been correctly staged. + */ + private static List computePackageAttributes( + Collection classpathElements, final String stagingPath, + ListeningExecutorService executorService) { + List> futures = new LinkedList<>(); for (String classpathElement : classpathElements) { - String packageName = null; + @Nullable String userPackageName = null; if (classpathElement.contains("=")) { String[] components = classpathElement.split("=", 2); - packageName = components[0]; + userPackageName = components[0]; classpathElement = components[1]; } + @Nullable final String packageName = userPackageName; - File file = new File(classpathElement); + final File file = new File(classpathElement); if (!file.exists()) { LOG.warn("Skipping non-existent classpath element {} that was specified.", classpathElement); continue; } - PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName); + ListenableFuture future = + executorService.submit(new Callable() { + @Override + public PackageAttributes call() throws Exception { + return createPackageAttributes(file, stagingPath, packageName); + } + }); + futures.add(future); + } + + try { + return Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while staging packages", e); + } catch (ExecutionException e) { + throw new RuntimeException("Error while staging packages", e.getCause()); + } + } + + private static WritableByteChannel makeWriter(String target, GcsUtil gcsUtil) + throws IOException { + IOChannelFactory factory = IOChannelUtils.getFactory(target); + if (factory instanceof GcsIOChannelFactory) { + return gcsUtil.create(GcsPath.fromUri(target), MimeTypes.BINARY); + } else { + return factory.create(target, MimeTypes.BINARY); + } + } - DataflowPackage workflowPackage = attributes.getDataflowPackage(); - packages.add(workflowPackage); - String target = workflowPackage.getLocation(); + /** + * Utility to verify whether a package has already been staged and, if not, copy it to the + * staging location. + */ + private static void stageOnePackage( + PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, + Sleeper retrySleeper, GcsUtil gcsUtil) { + String source = attributes.getSourcePath(); + String target = attributes.getDataflowPackage().getLocation(); - // TODO: Should we attempt to detect the Mime type rather than - // always using MimeTypes.BINARY? + // TODO: Should we attempt to detect the Mime type rather than + // always using MimeTypes.BINARY? + try { try { - try { - long remoteLength = IOChannelUtils.getSizeBytes(target); - if (remoteLength == attributes.getSize()) { - LOG.debug("Skipping classpath element already staged: {} at {}", - classpathElement, target); - numCached++; - continue; - } - } catch (FileNotFoundException expected) { - // If the file doesn't exist, it means we need to upload it. + long remoteLength = IOChannelUtils.getSizeBytes(target); + if (remoteLength == attributes.getSize()) { + LOG.debug("Skipping classpath element already staged: {} at {}", + attributes.getSourcePath(), target); + numCached.incrementAndGet(); + return; } + } catch (FileNotFoundException expected) { + // If the file doesn't exist, it means we need to upload it. + } - // Upload file, retrying on failure. - BackOff backoff = BACKOFF_FACTORY.backoff(); - while (true) { - try { - LOG.debug("Uploading classpath element {} to {}", classpathElement, target); - try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) { - copyContent(classpathElement, writer); - } - numUploaded++; - break; - } catch (IOException e) { - if (ERROR_EXTRACTOR.accessDenied(e)) { - String errorMessage = String.format( - "Uploaded failed due to permissions error, will NOT retry staging " - + "of classpath %s. Please verify credentials are valid and that you have " - + "write access to %s. Stale credentials can be resolved by executing " - + "'gcloud auth login'.", classpathElement, target); - LOG.error(errorMessage); - throw new IOException(errorMessage, e); - } - long sleep = backoff.nextBackOffMillis(); - if (sleep == BackOff.STOP) { - // Rethrow last error, to be included as a cause in the catch below. - LOG.error("Upload failed, will NOT retry staging of classpath: {}", - classpathElement, e); - throw e; - } else { - LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", - classpathElement, e); - retrySleeper.sleep(sleep); - } + // Upload file, retrying on failure. + BackOff backoff = BACKOFF_FACTORY.backoff(); + while (true) { + try { + LOG.debug("Uploading classpath element {} to {}", source, target); + try (WritableByteChannel writer = makeWriter(target, gcsUtil)) { + copyContent(source, writer); + } + numUploaded.incrementAndGet(); + break; + } catch (IOException e) { + if (ERROR_EXTRACTOR.accessDenied(e)) { + String errorMessage = String.format( + "Uploaded failed due to permissions error, will NOT retry staging " + + "of classpath %s. Please verify credentials are valid and that you have " + + "write access to %s. Stale credentials can be resolved by executing " + + "'gcloud auth application-default login'.", source, target); + LOG.error(errorMessage); + throw new IOException(errorMessage, e); + } + long sleep = backoff.nextBackOffMillis(); + if (sleep == BackOff.STOP) { + // Rethrow last error, to be included as a cause in the catch below. + LOG.error("Upload failed, will NOT retry staging of classpath: {}", + source, e); + throw e; + } else { + LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", + source, e); + retrySleeper.sleep(sleep); } } - } catch (Exception e) { - throw new RuntimeException("Could not stage classpath element: " + classpathElement, e); } + } catch (Exception e) { + throw new RuntimeException("Could not stage classpath element: " + source, e); } + } - LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, " - + "{} files cached", - numUploaded, numCached); + /** + * Transfers the classpath elements to the staging location. + * + * @param classpathElements The elements to stage. + * @param stagingPath The base location to stage the elements to. + * @return A list of cloud workflow packages, each representing a classpath element. + */ + static List stageClasspathElements( + Collection classpathElements, String stagingPath, GcsUtil gcsUtil) { + ListeningExecutorService executorService = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32)); + try { + return stageClasspathElements( + classpathElements, stagingPath, Sleeper.DEFAULT, executorService, gcsUtil); + } finally { + executorService.shutdown(); + } + } + + // Visible for testing. + static List stageClasspathElements( + Collection classpathElements, final String stagingPath, + final Sleeper retrySleeper, ListeningExecutorService executorService, final GcsUtil gcsUtil) { + LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " + + "prepare for execution.", classpathElements.size()); + + if (classpathElements.size() > SANE_CLASSPATH_SIZE) { + LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically " + + "copies to all workers. Having this many entries on your classpath may be indicative " + + "of an issue in your pipeline. You may want to consider trimming the classpath to " + + "necessary dependencies only, using --filesToStage pipeline option to override " + + "what files are being staged, or bundling several dependencies into one.", + classpathElements.size()); + } + + checkArgument( + stagingPath != null, + "Can't stage classpath elements because no staging location has been provided"); + + // Inline a copy here because the inner code returns an immutable list and we want to mutate it. + List packageAttributes = + new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService)); + // Order package attributes in descending size order so that we upload the largest files first. + Collections.sort(packageAttributes, new PackageUploadOrder()); + + List packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); + final AtomicInteger numUploaded = new AtomicInteger(0); + final AtomicInteger numCached = new AtomicInteger(0); + + List> futures = new LinkedList<>(); + for (final PackageAttributes attributes : packageAttributes) { + packages.add(attributes.getDataflowPackage()); + futures.add(executorService.submit(new Runnable() { + @Override + public void run() { + stageOnePackage(attributes, numUploaded, numCached, retrySleeper, gcsUtil); + } + })); + } + try { + Futures.allAsList(futures).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while staging packages", e); + } catch (ExecutionException e) { + throw new RuntimeException("Error while staging packages", e.getCause()); + } + + LOG.info( + "Staging files complete: {} files cached, {} files newly uploaded", + numUploaded.get(), numCached.get()); return packages; } @@ -293,13 +375,15 @@ static class PackageAttributes { private final boolean directory; private final long size; private final String hash; + private final String sourcePath; private DataflowPackage dataflowPackage; public PackageAttributes(long size, String hash, boolean directory, - DataflowPackage dataflowPackage) { + DataflowPackage dataflowPackage, String sourcePath) { this.size = size; this.hash = Objects.requireNonNull(hash, "hash"); this.directory = directory; + this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath"); this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage"); } @@ -330,5 +414,12 @@ public long getSize() { public String getHash() { return hash; } + + /** + * @return the file to be uploaded + */ + public String getSourcePath() { + return sourcePath; + } } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 05a87dd754474..3828415576fbf 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -18,12 +18,12 @@ package org.apache.beam.runners.dataflow.util; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; @@ -53,6 +53,7 @@ import com.google.common.collect.Lists; import com.google.common.io.Files; import com.google.common.io.LineReader; +import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -235,7 +236,7 @@ public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception { classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath()); } - PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH); + PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, mockGcsUtil); logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow"); } @@ -250,7 +251,7 @@ public void testPackageUploadWithFileSucceeds() throws Exception { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -277,7 +278,7 @@ public void testPackageUploadWithDirectorySucceeds() throws Exception { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -304,7 +305,7 @@ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -327,7 +328,8 @@ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper); + STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), + mockGcsUtil); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil, times(5)).create(any(GcsPath.class), anyString()); @@ -348,16 +350,20 @@ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() thr try { PackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper); + STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), + mockGcsUtil); fail("Expected RuntimeException"); } catch (RuntimeException e) { - assertTrue("Expected IOException containing detailed message.", - e.getCause() instanceof IOException); - assertThat(e.getCause().getMessage(), + assertThat("Expected RuntimeException wrapping IOException.", + e.getCause(), instanceOf(RuntimeException.class)); + assertThat("Expected IOException containing detailed message.", + e.getCause().getCause(), instanceOf(IOException.class)); + assertThat(e.getCause().getCause().getMessage(), Matchers.allOf( Matchers.containsString("Uploaded failed due to permissions error"), Matchers.containsString( - "Stale credentials can be resolved by executing 'gcloud auth login'"))); + "Stale credentials can be resolved by executing 'gcloud auth application-default " + + "login'"))); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -377,9 +383,8 @@ public void testPackageUploadEventuallySucceeds() throws Exception { try { PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, - fastNanoClockAndSleeper); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, + MoreExecutors.newDirectExecutorService(), mockGcsUtil); } finally { verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString()); @@ -393,7 +398,7 @@ public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception { when(mockGcsUtil.fileSize(any(GcsPath.class))).thenReturn(tmpFile.length()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, mockGcsUtil); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -411,7 +416,7 @@ public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exceptio when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, mockGcsUtil); verify(mockGcsUtil).fileSize(any(GcsPath.class)); verify(mockGcsUtil).create(any(GcsPath.class), anyString()); @@ -429,7 +434,8 @@ public void testPackageUploadWithExplicitPackageName() throws Exception { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH); + ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH, + mockGcsUtil); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).fileSize(any(GcsPath.class)); @@ -446,7 +452,7 @@ public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception String nonExistentFile = IOChannelUtils.resolve(tmpFolder.getRoot().getPath(), "non-existent-file"); assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements( - ImmutableList.of(nonExistentFile), STAGING_PATH)); + ImmutableList.of(nonExistentFile), STAGING_PATH, mockGcsUtil)); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java index 0553efc27ae7d..72e106d439214 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcsOptions.java @@ -25,6 +25,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; import org.apache.beam.sdk.util.AppEngineEnvironment; import org.apache.beam.sdk.util.GcsPathValidator; import org.apache.beam.sdk.util.GcsUtil; @@ -81,8 +82,9 @@ public interface GcsOptions extends + "information on the restrictions and performance implications of this value.\n\n" + "https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/" + "com/google/cloud/hadoop/util/AbstractGoogleAsyncWriteChannel.java") + @Nullable Integer getGcsUploadBufferSizeBytes(); - void setGcsUploadBufferSizeBytes(Integer bytes); + void setGcsUploadBufferSizeBytes(@Nullable Integer bytes); /** * The class of the validator that should be created and used to validate paths. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index a10ea28aec43c..5e8358433d050 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -101,6 +101,18 @@ public GcsUtil create(PipelineOptions options) { gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes()); } + + /** + * Returns an instance of {@link GcsUtil} based on the given parameters. + */ + public static GcsUtil create( + Storage storageClient, + HttpRequestInitializer httpRequestInitializer, + ExecutorService executorService, + @Nullable Integer uploadBufferSizeBytes) { + return new GcsUtil( + storageClient, httpRequestInitializer, executorService, uploadBufferSizeBytes); + } } private static final Logger LOG = LoggerFactory.getLogger(GcsUtil.class); From b0b91c842e09aa7fdb5c1dc216574daa43b437ea Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 25 Jan 2017 22:15:59 -0800 Subject: [PATCH 2/2] PackageUtil: preserve classpath ordering when uploading Also add a test --- .../runners/dataflow/util/PackageUtil.java | 11 +++++--- .../dataflow/util/PackageUtilTest.java | 27 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index fa8c94d95cf7c..685d48c4672d5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -297,16 +297,21 @@ static List stageClasspathElements( // Inline a copy here because the inner code returns an immutable list and we want to mutate it. List packageAttributes = new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService)); - // Order package attributes in descending size order so that we upload the largest files first. - Collections.sort(packageAttributes, new PackageUploadOrder()); + // Compute the returned list of DataflowPackage objects here so that they are returned in the + // same order as on the classpath. List packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); + for (final PackageAttributes attributes : packageAttributes) { + packages.add(attributes.getDataflowPackage()); + } + + // Order package attributes in descending size order so that we upload the largest files first. + Collections.sort(packageAttributes, new PackageUploadOrder()); final AtomicInteger numUploaded = new AtomicInteger(0); final AtomicInteger numCached = new AtomicInteger(0); List> futures = new LinkedList<>(); for (final PackageAttributes attributes : packageAttributes) { - packages.add(attributes.getDataflowPackage()); futures.add(executorService.submit(new Runnable() { @Override public void run() { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 3828415576fbf..800c5a99a62de 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @@ -59,6 +60,7 @@ import java.io.IOException; import java.nio.channels.Channels; import java.nio.channels.Pipe; +import java.nio.channels.Pipe.SinkChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -86,6 +88,8 @@ import org.junit.runners.JUnit4; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** Tests for PackageUtil. */ @RunWith(JUnit4.class) @@ -264,6 +268,29 @@ public void testPackageUploadWithFileSucceeds() throws Exception { equalTo(contents)); } + @Test + public void testStagingPreservesClasspath() throws Exception { + File smallFile = makeFileWithContents("small.txt", "small"); + File largeFile = makeFileWithContents("large.txt", "large contents"); + when(mockGcsUtil.fileSize(any(GcsPath.class))) + .thenThrow(new FileNotFoundException("some/path")); + when(mockGcsUtil.create(any(GcsPath.class), anyString())) + .thenAnswer(new Answer() { + @Override + public SinkChannel answer(InvocationOnMock invocation) throws Throwable { + return Pipe.open().sink(); + } + }); + + List targets = PackageUtil.stageClasspathElements( + ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()), + STAGING_PATH, mockGcsUtil); + // Verify that the packages are returned small, then large, matching input order even though + // the large file would be uploaded first. + assertThat(targets.get(0).getName(), startsWith("small")); + assertThat(targets.get(1).getName(), startsWith("large")); + } + @Test public void testPackageUploadWithDirectorySucceeds() throws Exception { Pipe pipe = Pipe.open();