From a211bd9bb6365f1fe76e9b16355f721fcaa80b47 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Sep 2017 19:35:20 -0700 Subject: [PATCH 1/3] Make PackageUtil a proper class encapsulating its ExecutorService --- .../beam/runners/dataflow/util/GcsStager.java | 8 +- .../runners/dataflow/util/PackageUtil.java | 59 ++++++++---- .../dataflow/util/PackageUtilTest.java | 95 +++++++++++-------- 3 files changed, 103 insertions(+), 59 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java index d18e306cfe87..929be99d19da 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/GcsStager.java @@ -62,9 +62,9 @@ public List stageFiles() { .setMimeType(MimeTypes.BINARY) .build(); - return PackageUtil.stageClasspathElements( - options.getFilesToStage(), - options.getStagingLocation(), - createOptions); + try (PackageUtil packageUtil = PackageUtil.withDefaultThreadPool()) { + return packageUtil.stageClasspathElements( + options.getFilesToStage(), options.getStagingLocation(), createOptions); + } } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 931f7ea4a0a8..9d1e084a48e2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -34,6 +34,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import java.io.Closeable; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -51,6 +52,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; @@ -62,13 +64,18 @@ import org.slf4j.LoggerFactory; /** Helper routines for packages. */ -class PackageUtil { +@Internal +class PackageUtil implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class); + /** * A reasonable upper bound on the number of jars required to launch a Dataflow job. */ private static final int SANE_CLASSPATH_SIZE = 1000; + private static final int DEFAULT_THREAD_POOL_SIZE = 32; + private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5)); @@ -77,6 +84,27 @@ class PackageUtil { */ private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor(); + private final ListeningExecutorService executorService; + + private PackageUtil(ListeningExecutorService executorService) { + this.executorService = executorService; + } + + public static PackageUtil withDefaultThreadPool() { + return PackageUtil.withExecutorService( + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE))); + } + + public static PackageUtil withExecutorService(ListeningExecutorService executorService) { + return new PackageUtil(executorService); + } + + @Override + public void close() { + executorService.shutdown(); + } + + /** * Compute and cache the attributes of a classpath element that we will need to stage it. * @@ -140,9 +168,10 @@ public int compare(PackageAttributes o1, PackageAttributes o2) { * Utility function that computes sizes and hashes of packages so that we can validate whether * they have already been correctly staged. */ - private static List computePackageAttributes( - Collection classpathElements, final String stagingPath, - ListeningExecutorService executorService) { + private List computePackageAttributes( + Collection classpathElements, + final String stagingPath) { + List> futures = new LinkedList<>(); for (String classpathElement : classpathElements) { @Nullable String userPackageName = null; @@ -189,7 +218,7 @@ private static WritableByteChannel makeWriter(String target, CreateOptions creat * Utility to verify whether a package has already been staged and, if not, copy it to the * staging location. */ - private static void stageOnePackage( + private void stageOnePackage( PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, Sleeper retrySleeper, CreateOptions createOptions) { String source = attributes.getSourcePath(); @@ -255,22 +284,16 @@ private static void stageOnePackage( * @param stagingPath The base location to stage the elements to. * @return A list of cloud workflow packages, each representing a classpath element. */ - static List stageClasspathElements( + List stageClasspathElements( Collection classpathElements, String stagingPath, CreateOptions createOptions) { - ListeningExecutorService executorService = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32)); - try { - return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, - executorService, createOptions); - } finally { - executorService.shutdown(); - } + return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, createOptions); } // Visible for testing. - static List stageClasspathElements( - Collection classpathElements, final String stagingPath, - final Sleeper retrySleeper, ListeningExecutorService executorService, + List stageClasspathElements( + Collection classpathElements, + final String stagingPath, + final Sleeper retrySleeper, final CreateOptions createOptions) { LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " + "prepare for execution.", classpathElements.size()); @@ -290,7 +313,7 @@ static List stageClasspathElements( // Inline a copy here because the inner code returns an immutable list and we want to mutate it. List packageAttributes = - new LinkedList<>(computePackageAttributes(classpathElements, stagingPath, executorService)); + new LinkedList<>(computePackageAttributes(classpathElements, stagingPath)); // Compute the returned list of DataflowPackage objects here so that they are returned in the // same order as on the classpath. diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 5d0c0f2d68a6..de6416da4f95 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -86,6 +86,7 @@ import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.hamcrest.Matchers; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -97,22 +98,19 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -/** Tests for PackageUtil. */ +/** Tests for {@link PackageUtil}. */ @RunWith(JUnit4.class) public class PackageUtilTest { @Rule public ExpectedLogs logged = ExpectedLogs.none(PackageUtil.class); - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper(); - @Rule - public FastNanoClockAndSleeper fastNanoClockAndSleeper = new FastNanoClockAndSleeper(); - - @Mock - GcsUtil mockGcsUtil; + @Mock GcsUtil mockGcsUtil; // 128 bits, base64 encoded is 171 bits, rounds to 22 bytes private static final String HASH_PATTERN = "[a-zA-Z0-9+-]{22}"; private CreateOptions createOptions; + private PackageUtil defaultPackageUtil; @Before public void setUp() { @@ -122,6 +120,12 @@ public void setUp() { pipelineOptions.setGcsUtil(mockGcsUtil); FileSystems.setDefaultPipelineOptions(pipelineOptions); createOptions = StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build(); + defaultPackageUtil = PackageUtil.withDefaultThreadPool(); + } + + @After + public void teardown() { + defaultPackageUtil.close(); } private File makeFileWithContents(String name, String contents) throws Exception { @@ -224,7 +228,7 @@ public void testPackageUploadWithLargeClasspathLogsWarning() throws Exception { classpathElements.add(eltName + '=' + tmpFile.getAbsolutePath()); } - PackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions); + defaultPackageUtil.stageClasspathElements(classpathElements, STAGING_PATH, createOptions); logged.verifyWarn("Your classpath contains 1005 elements, which Google Cloud Dataflow"); } @@ -239,8 +243,9 @@ public void testPackageUploadWithFileSucceeds() throws Exception { when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions); + List targets = + defaultPackageUtil.stageClasspathElements( + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); @@ -269,9 +274,11 @@ public SinkChannel answer(InvocationOnMock invocation) throws Throwable { } }); - List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()), - STAGING_PATH, createOptions); + List targets = + defaultPackageUtil.stageClasspathElements( + ImmutableList.of(smallFile.getAbsolutePath(), largeFile.getAbsolutePath()), + STAGING_PATH, + createOptions); // Verify that the packages are returned small, then large, matching input order even though // the large file would be uploaded first. assertThat(targets.get(0).getName(), startsWith("small")); @@ -292,7 +299,7 @@ public void testPackageUploadWithDirectorySucceeds() throws Exception { new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - PackageUtil.stageClasspathElements( + defaultPackageUtil.stageClasspathElements( ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); @@ -320,8 +327,9 @@ public void testPackageUploadWithEmptyDirectorySucceeds() throws Exception { new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); + List targets = + defaultPackageUtil.stageClasspathElements( + ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); @@ -342,10 +350,12 @@ public void testPackageUploadFailsWhenIOExceptionThrown() throws Exception { when(mockGcsUtil.create(any(GcsPath.class), anyString())) .thenThrow(new IOException("Fake Exception: Upload error")); - try { - PackageUtil.stageClasspathElements( + try (PackageUtil directPackageUtil = + PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) { + directPackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), + STAGING_PATH, + fastNanoClockAndSleeper, createOptions); } finally { verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); @@ -365,10 +375,12 @@ public void testPackageUploadFailsWithPermissionsErrorGivesDetailedMessage() thr googleJsonResponseException( HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Permission denied", "Test message"))); - try { - PackageUtil.stageClasspathElements( + try (PackageUtil directPackageUtil = + PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) { + directPackageUtil.stageClasspathElements( ImmutableList.of(tmpFile.getAbsolutePath()), - STAGING_PATH, fastNanoClockAndSleeper, MoreExecutors.newDirectExecutorService(), + STAGING_PATH, + fastNanoClockAndSleeper, createOptions); fail("Expected RuntimeException"); } catch (RuntimeException e) { @@ -400,10 +412,13 @@ public void testPackageUploadEventuallySucceeds() throws Exception { .thenThrow(new IOException("Fake Exception: 410 Gone")) // First attempt fails .thenReturn(pipe.sink()); // second attempt succeeds - try { - PackageUtil.stageClasspathElements( - ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, fastNanoClockAndSleeper, - MoreExecutors.newDirectExecutorService(), createOptions); + try (PackageUtil directPackageUtil = + PackageUtil.withExecutorService(MoreExecutors.newDirectExecutorService())) { + directPackageUtil.stageClasspathElements( + ImmutableList.of(tmpFile.getAbsolutePath()), + STAGING_PATH, + fastNanoClockAndSleeper, + createOptions); } finally { verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verify(mockGcsUtil, times(2)).create(any(GcsPath.class), anyString()); @@ -418,8 +433,8 @@ public void testPackageUploadIsSkippedWhenFileAlreadyExists() throws Exception { .thenReturn(ImmutableList.of(StorageObjectOrIOException.create( createStorageObject(STAGING_PATH, tmpFile.length())))); - PackageUtil.stageClasspathElements(ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, - createOptions); + defaultPackageUtil.stageClasspathElements( + ImmutableList.of(tmpFile.getAbsolutePath()), STAGING_PATH, createOptions); verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); verifyNoMoreInteractions(mockGcsUtil); @@ -438,7 +453,7 @@ public void testPackageUploadIsNotSkippedWhenSizesAreDifferent() throws Exceptio createStorageObject(STAGING_PATH, Long.MAX_VALUE)))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - PackageUtil.stageClasspathElements( + defaultPackageUtil.stageClasspathElements( ImmutableList.of(tmpDirectory.getAbsolutePath()), STAGING_PATH, createOptions); verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); @@ -457,9 +472,11 @@ public void testPackageUploadWithExplicitPackageName() throws Exception { new FileNotFoundException("some/path")))); when(mockGcsUtil.create(any(GcsPath.class), anyString())).thenReturn(pipe.sink()); - List targets = PackageUtil.stageClasspathElements( - ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), STAGING_PATH, - createOptions); + List targets = + defaultPackageUtil.stageClasspathElements( + ImmutableList.of(overriddenName + "=" + tmpFile.getAbsolutePath()), + STAGING_PATH, + createOptions); DataflowPackage target = Iterables.getOnlyElement(targets); verify(mockGcsUtil).getObjects(anyListOf(GcsPath.class)); @@ -473,10 +490,14 @@ public void testPackageUploadWithExplicitPackageName() throws Exception { @Test public void testPackageUploadIsSkippedWithNonExistentResource() throws Exception { - String nonExistentFile = FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true) - .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE).toString(); - assertEquals(Collections.EMPTY_LIST, PackageUtil.stageClasspathElements( - ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions)); + String nonExistentFile = + FileSystems.matchNewResource(tmpFolder.getRoot().getPath(), true) + .resolve("non-existent-file", StandardResolveOptions.RESOLVE_FILE) + .toString(); + assertEquals( + Collections.EMPTY_LIST, + defaultPackageUtil.stageClasspathElements( + ImmutableList.of(nonExistentFile), STAGING_PATH, createOptions)); } /** From a328127b9b0a0f59816bcbe84646446b4f75aafc Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Sep 2017 20:03:51 -0700 Subject: [PATCH 2/3] Use AutoValue for Dataflow PackageAttributes --- .../runners/dataflow/util/PackageUtil.java | 164 +++++++----------- .../dataflow/util/PackageUtilTest.java | 29 ++-- 2 files changed, 84 insertions(+), 109 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 9d1e084a48e2..7496d1c93d3a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -23,6 +23,7 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.DataflowPackage; +import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.collect.Lists; import com.google.common.hash.Funnels; @@ -46,7 +47,6 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; -import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -105,49 +105,6 @@ public void close() { } - /** - * Compute and cache the attributes of a classpath element that we will need to stage it. - * - * @param source the file or directory to be staged. - * @param stagingPath The base location for staged classpath elements. - * @param overridePackageName If non-null, use the given value as the package name - * instead of generating one automatically. - * @return a {@link PackageAttributes} that containing metadata about the object to be staged. - */ - static PackageAttributes createPackageAttributes(File source, - String stagingPath, @Nullable String overridePackageName) { - boolean directory = source.isDirectory(); - - // Compute size and hash in one pass over file or directory. - Hasher hasher = Hashing.md5().newHasher(); - OutputStream hashStream = Funnels.asOutputStream(hasher); - try (CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream)) { - if (!directory) { - // Files are staged as-is. - Files.asByteSource(source).copyTo(countingOutputStream); - } else { - // Directories are recursively zipped. - ZipFiles.zipDirectory(source, countingOutputStream); - } - countingOutputStream.flush(); - - long size = countingOutputStream.getCount(); - String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); - - // Create the DataflowPackage with staging name and location. - String uniqueName = getUniqueContentName(source, hash); - String resourcePath = FileSystems.matchNewResource(stagingPath, true) - .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE).toString(); - DataflowPackage target = new DataflowPackage(); - target.setName(overridePackageName != null ? overridePackageName : uniqueName); - target.setLocation(resourcePath); - - return new PackageAttributes(size, hash, directory, target, source.getPath()); - } catch (IOException e) { - throw new RuntimeException("Package setup failure for " + source, e); - } - } - /** Utility comparator used in uploading packages efficiently. */ private static class PackageUploadOrder implements Comparator { @Override @@ -193,7 +150,11 @@ private List computePackageAttributes( executorService.submit(new Callable() { @Override public PackageAttributes call() throws Exception { - return createPackageAttributes(file, stagingPath, packageName); + PackageAttributes attributes = PackageAttributes.forFileToStage(file, stagingPath); + if (packageName != null) { + attributes = attributes.withPackageName(packageName); + } + return attributes; } }); futures.add(future); @@ -221,8 +182,8 @@ private static WritableByteChannel makeWriter(String target, CreateOptions creat private void stageOnePackage( PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, Sleeper retrySleeper, CreateOptions createOptions) { - String source = attributes.getSourcePath(); - String target = attributes.getDataflowPackage().getLocation(); + File source = attributes.getSource(); + String target = attributes.getDestination().getLocation(); // TODO: Should we attempt to detect the Mime type rather than // always using MimeTypes.BINARY? @@ -231,7 +192,7 @@ private void stageOnePackage( long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes(); if (remoteLength == attributes.getSize()) { LOG.debug("Skipping classpath element already staged: {} at {}", - attributes.getSourcePath(), target); + attributes.getSource(), target); numCached.incrementAndGet(); return; } @@ -245,7 +206,7 @@ private void stageOnePackage( try { LOG.debug("Uploading classpath element {} to {}", source, target); try (WritableByteChannel writer = makeWriter(target, createOptions)) { - copyContent(source, writer); + copyContent(attributes.getSource(), writer); } numUploaded.incrementAndGet(); break; @@ -319,7 +280,7 @@ List stageClasspathElements( // same order as on the classpath. List packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); for (final PackageAttributes attributes : packageAttributes) { - packages.add(attributes.getDataflowPackage()); + packages.add(attributes.getDestination()); } // Order package attributes in descending size order so that we upload the largest files first. @@ -381,67 +342,74 @@ static String getUniqueContentName(File classpathElement, String contentHash) { * *

The output channel is not closed. */ - private static void copyContent(String classpathElement, WritableByteChannel outputChannel) + private static void copyContent(File classpathElement, WritableByteChannel outputChannel) throws IOException { - final File classpathElementFile = new File(classpathElement); - if (classpathElementFile.isDirectory()) { - ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel)); + if (classpathElement.isDirectory()) { + ZipFiles.zipDirectory(classpathElement, Channels.newOutputStream(outputChannel)); } else { - Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel)); + Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel)); } } /** * Holds the metadata necessary to stage a file or confirm that a staged file has not changed. */ - static class PackageAttributes { - private final boolean directory; - private final long size; - private final String hash; - private final String sourcePath; - private DataflowPackage dataflowPackage; - - public PackageAttributes(long size, String hash, boolean directory, - DataflowPackage dataflowPackage, String sourcePath) { - this.size = size; - this.hash = Objects.requireNonNull(hash, "hash"); - this.directory = directory; - this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath"); - this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage"); - } + @AutoValue + abstract static class PackageAttributes { + + public static PackageAttributes forFileToStage(File source, String stagingPath) + throws IOException { + + // Compute size and hash in one pass over file or directory. + long size; + String hash; + Hasher hasher = Hashing.md5().newHasher(); + OutputStream hashStream = Funnels.asOutputStream(hasher); + try (CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream)) { + if (!source.isDirectory()) { + // Files are staged as-is. + Files.asByteSource(source).copyTo(countingOutputStream); + } else { + // Directories are recursively zipped. + ZipFiles.zipDirectory(source, countingOutputStream); + } + countingOutputStream.flush(); - /** - * @return the dataflowPackage - */ - public DataflowPackage getDataflowPackage() { - return dataflowPackage; - } + size = countingOutputStream.getCount(); + hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); + } - /** - * @return the directory - */ - public boolean isDirectory() { - return directory; - } + String uniqueName = getUniqueContentName(source, hash); - /** - * @return the size - */ - public long getSize() { - return size; - } + String resourcePath = + FileSystems.matchNewResource(stagingPath, true) + .resolve(uniqueName, StandardResolveOptions.RESOLVE_FILE) + .toString(); + DataflowPackage target = new DataflowPackage(); + target.setName(uniqueName); + target.setLocation(resourcePath); - /** - * @return the hash - */ - public String getHash() { - return hash; + return new AutoValue_PackageUtil_PackageAttributes(source, target, size, hash); } - /** - * @return the file to be uploaded - */ - public String getSourcePath() { - return sourcePath; + public PackageAttributes withPackageName(String overridePackageName) { + DataflowPackage newDestination = new DataflowPackage(); + newDestination.setName(overridePackageName); + newDestination.setLocation(getDestination().getLocation()); + + return new AutoValue_PackageUtil_PackageAttributes( + getSource(), newDestination, getSize(), getHash()); } + + /** @return the file to be uploaded */ + public abstract File getSource(); + + /** @return the dataflowPackage */ + public abstract DataflowPackage getDestination(); + + /** @return the size */ + public abstract long getSize(); + + /** @return the hash */ + public abstract String getHash(); } } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index de6416da4f95..0b94f7ca1650 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -71,6 +71,7 @@ import java.util.List; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; +import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.util.PackageUtil.PackageAttributes; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.io.FileSystems; @@ -137,8 +138,14 @@ private File makeFileWithContents(String name, String contents) throws Exception static final GcsPath STAGING_GCS_PATH = GcsPath.fromComponents("somebucket", "base/path/"); static final String STAGING_PATH = STAGING_GCS_PATH.toString(); - private static PackageAttributes makePackageAttributes(File file, String overridePackageName) { - return PackageUtil.createPackageAttributes(file, STAGING_PATH, overridePackageName); + + private static PackageAttributes makePackageAttributes( + File file, @Nullable String overridePackageName) throws IOException { + PackageAttributes attributes = PackageUtil.PackageAttributes.forFileToStage(file, STAGING_PATH); + if (overridePackageName != null) { + attributes = attributes.withPackageName(overridePackageName); + } + return attributes; } @Test @@ -146,7 +153,7 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception { String contents = "This is a test!"; File tmpFile = makeFileWithContents("file.txt", contents); PackageAttributes attr = makePackageAttributes(tmpFile, null); - DataflowPackage target = attr.getDataflowPackage(); + DataflowPackage target = attr.getDestination(); assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN + ".txt")); assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); @@ -156,7 +163,7 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception { @Test public void testPackageNamingWithFileNoExtension() throws Exception { File tmpFile = makeFileWithContents("file", "This is a test!"); - DataflowPackage target = makePackageAttributes(tmpFile, null).getDataflowPackage(); + DataflowPackage target = makePackageAttributes(tmpFile, null).getDestination(); assertThat(target.getName(), RegexMatcher.matches("file-" + HASH_PATTERN)); assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); @@ -165,7 +172,7 @@ public void testPackageNamingWithFileNoExtension() throws Exception { @Test public void testPackageNamingWithDirectory() throws Exception { File tmpDirectory = tmpFolder.newFolder("folder"); - DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDataflowPackage(); + DataflowPackage target = makePackageAttributes(tmpDirectory, null).getDestination(); assertThat(target.getName(), RegexMatcher.matches("folder-" + HASH_PATTERN + ".jar")); assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); @@ -175,11 +182,11 @@ public void testPackageNamingWithDirectory() throws Exception { public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Exception { File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA"); makeFileWithContents("folder1/folderA/sameName", "This is a test!"); - DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage(); + DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination(); File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); makeFileWithContents("folder2/folderA/sameName", "This is a test!"); - DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage(); + DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination(); assertEquals(target1.getName(), target2.getName()); assertEquals(target1.getLocation(), target2.getLocation()); @@ -189,11 +196,11 @@ public void testPackageNamingWithFilesHavingSameContentsAndSameNames() throws Ex public void testPackageNamingWithFilesHavingSameContentsButDifferentNames() throws Exception { File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA"); makeFileWithContents("folder1/folderA/uniqueName1", "This is a test!"); - DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage(); + DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination(); File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); makeFileWithContents("folder2/folderA/uniqueName2", "This is a test!"); - DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage(); + DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination(); assertNotEquals(target1.getName(), target2.getName()); assertNotEquals(target1.getLocation(), target2.getLocation()); @@ -204,11 +211,11 @@ public void testPackageNamingWithDirectoriesHavingSameContentsButDifferentNames( throws Exception { File tmpDirectory1 = tmpFolder.newFolder("folder1", "folderA"); tmpFolder.newFolder("folder1", "folderA", "uniqueName1"); - DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDataflowPackage(); + DataflowPackage target1 = makePackageAttributes(tmpDirectory1, null).getDestination(); File tmpDirectory2 = tmpFolder.newFolder("folder2", "folderA"); tmpFolder.newFolder("folder2", "folderA", "uniqueName2"); - DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDataflowPackage(); + DataflowPackage target2 = makePackageAttributes(tmpDirectory2, null).getDestination(); assertNotEquals(target1.getName(), target2.getName()); assertNotEquals(target1.getLocation(), target2.getLocation()); From 58b6453f3ff934e8c453ab4d17bf7fd15c7d479c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 28 Sep 2017 20:07:16 -0700 Subject: [PATCH 3/3] Refactor PackageUtil for more and simpler asynchrony --- .../runners/dataflow/util/PackageUtil.java | 336 +++++++++++------- 1 file changed, 202 insertions(+), 134 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 7496d1c93d3a..449b36d2fb46 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -25,12 +25,13 @@ import com.google.api.services.dataflow.model.DataflowPackage; import com.google.auto.value.AutoValue; import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.common.collect.Lists; +import com.google.common.base.Function; import com.google.common.hash.Funnels; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.CountingOutputStream; import com.google.common.io.Files; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -42,22 +43,22 @@ import java.io.OutputStream; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.gcp.storage.GcsCreateOptions; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ZipFiles; import org.joda.time.Duration; import org.slf4j.Logger; @@ -76,6 +77,14 @@ class PackageUtil implements Closeable { private static final int DEFAULT_THREAD_POOL_SIZE = 32; + private static final Sleeper DEFAULT_SLEEPER = Sleeper.DEFAULT; + + private static final CreateOptions DEFAULT_CREATE_OPTIONS = + GcsCreateOptions.builder() + .setGcsUploadBufferSizeBytes(1024 * 1024) + .setMimeType(MimeTypes.BINARY) + .build(); + private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds(5)); @@ -121,136 +130,155 @@ public int compare(PackageAttributes o1, PackageAttributes o2) { } } - /** - * Utility function that computes sizes and hashes of packages so that we can validate whether - * they have already been correctly staged. - */ - private List computePackageAttributes( - Collection classpathElements, - final String stagingPath) { - - List> futures = new LinkedList<>(); - for (String classpathElement : classpathElements) { - @Nullable String userPackageName = null; - if (classpathElement.contains("=")) { - String[] components = classpathElement.split("=", 2); - userPackageName = components[0]; - classpathElement = components[1]; - } - @Nullable final String packageName = userPackageName; - - final File file = new File(classpathElement); - if (!file.exists()) { - LOG.warn("Skipping non-existent classpath element {} that was specified.", - classpathElement); - continue; - } + /** Asynchronously computes {@link PackageAttributes} for a single staged file. */ + private ListenableFuture computePackageAttributes( + final DataflowPackage source, final String stagingPath) { + + return executorService.submit( + new Callable() { + @Override + public PackageAttributes call() throws Exception { + final File file = new File(source.getLocation()); + if (!file.exists()) { + throw new FileNotFoundException( + String.format("Non-existent file to stage: %s", file.getAbsolutePath())); + } - ListenableFuture future = - executorService.submit(new Callable() { - @Override - public PackageAttributes call() throws Exception { - PackageAttributes attributes = PackageAttributes.forFileToStage(file, stagingPath); - if (packageName != null) { - attributes = attributes.withPackageName(packageName); - } - return attributes; + PackageAttributes attributes = PackageAttributes.forFileToStage(file, stagingPath); + if (source.getName() != null) { + attributes = attributes.withPackageName(source.getName()); } - }); - futures.add(future); - } + return attributes; + } + }); + } + private boolean alreadyStaged(PackageAttributes attributes) throws IOException { try { - return Futures.allAsList(futures).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while staging packages", e); - } catch (ExecutionException e) { - throw new RuntimeException("Error while staging packages", e.getCause()); + long remoteLength = + FileSystems.matchSingleFileSpec(attributes.getDestination().getLocation()).sizeBytes(); + return remoteLength == attributes.getSize(); + } catch (FileNotFoundException expected) { + // If the file doesn't exist, it means we need to upload it. + return false; } } - private static WritableByteChannel makeWriter(String target, CreateOptions createOptions) - throws IOException { - return FileSystems.create(FileSystems.matchNewResource(target, false), createOptions); + /** Stages one file ("package") if necessary. */ + public ListenableFuture stagePackage( + final PackageAttributes attributes, + final Sleeper retrySleeper, + final CreateOptions createOptions) { + return executorService.submit( + new Callable() { + @Override + public StagingResult call() throws Exception { + return stagePackageSynchronously(attributes, retrySleeper, createOptions); + } + }); } - /** - * Utility to verify whether a package has already been staged and, if not, copy it to the - * staging location. - */ - private void stageOnePackage( - PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, - Sleeper retrySleeper, CreateOptions createOptions) { + /** Synchronously stages a package, with retry and backoff for resiliency. */ + private StagingResult stagePackageSynchronously( + PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions) + throws IOException, InterruptedException { File source = attributes.getSource(); String target = attributes.getDestination().getLocation(); - // TODO: Should we attempt to detect the Mime type rather than - // always using MimeTypes.BINARY? + if (alreadyStaged(attributes)) { + LOG.debug("Skipping file already staged: {} at {}", source, target); + return StagingResult.cached(attributes); + } + try { + return tryStagePackageWithRetry(attributes, retrySleeper, createOptions); + } catch (Exception miscException) { + throw new RuntimeException( + String.format("Could not stage %s to %s", source, target), miscException); + } + } + + private StagingResult tryStagePackageWithRetry( + PackageAttributes attributes, Sleeper retrySleeper, CreateOptions createOptions) + throws IOException, InterruptedException { + File source = attributes.getSource(); + String target = attributes.getDestination().getLocation(); + BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); + + while (true) { try { - long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes(); - if (remoteLength == attributes.getSize()) { - LOG.debug("Skipping classpath element already staged: {} at {}", - attributes.getSource(), target); - numCached.incrementAndGet(); - return; + return tryStagePackage(attributes, createOptions); + } catch (IOException ioException) { + + if (ERROR_EXTRACTOR.accessDenied(ioException)) { + String errorMessage = + String.format( + "Uploaded failed due to permissions error, will NOT retry staging " + + "of %s. Please verify credentials are valid and that you have " + + "write access to %s. Stale credentials can be resolved by executing " + + "'gcloud auth application-default login'.", + source, target); + LOG.error(errorMessage); + throw new IOException(errorMessage, ioException); } - } catch (FileNotFoundException expected) { - // If the file doesn't exist, it means we need to upload it. - } - // Upload file, retrying on failure. - BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff()); - while (true) { - try { - LOG.debug("Uploading classpath element {} to {}", source, target); - try (WritableByteChannel writer = makeWriter(target, createOptions)) { - copyContent(attributes.getSource(), writer); - } - numUploaded.incrementAndGet(); - break; - } catch (IOException e) { - if (ERROR_EXTRACTOR.accessDenied(e)) { - String errorMessage = String.format( - "Uploaded failed due to permissions error, will NOT retry staging " - + "of classpath %s. Please verify credentials are valid and that you have " - + "write access to %s. Stale credentials can be resolved by executing " - + "'gcloud auth application-default login'.", source, target); - LOG.error(errorMessage); - throw new IOException(errorMessage, e); - } - long sleep = backoff.nextBackOffMillis(); - if (sleep == BackOff.STOP) { - // Rethrow last error, to be included as a cause in the catch below. - LOG.error("Upload failed, will NOT retry staging of classpath: {}", - source, e); - throw e; - } else { - LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", - source, e); - retrySleeper.sleep(sleep); - } + long sleep = backoff.nextBackOffMillis(); + if (sleep == BackOff.STOP) { + LOG.error("Upload failed, will NOT retry staging of package: {}", source, ioException); + throw new RuntimeException("Could not stage %s to %s", ioException); + } else { + LOG.warn( + "Upload attempt failed, sleeping before retrying staging of package: {}", + source, + ioException); + retrySleeper.sleep(sleep); } } - } catch (Exception e) { - throw new RuntimeException("Could not stage classpath element: " + source, e); } } + private StagingResult tryStagePackage( + PackageAttributes attributes, CreateOptions createOptions) + throws IOException, InterruptedException { + File source = attributes.getSource(); + String target = attributes.getDestination().getLocation(); + + LOG.info("Uploading {} to {}", source, target); + try (WritableByteChannel writer = + FileSystems.create(FileSystems.matchNewResource(target, false), createOptions)) { + copyContent(attributes.getSource(), writer); + } + return StagingResult.uploaded(attributes); + } + /** - * Transfers the classpath elements to the staging location. + * Transfers the classpath elements to the staging location using a default {@link Sleeper}. * - * @param classpathElements The elements to stage. - * @param stagingPath The base location to stage the elements to. - * @return A list of cloud workflow packages, each representing a classpath element. + * @see {@link #stageClasspathElements(Collection, String, Sleeper, CreateOptions)} */ List stageClasspathElements( Collection classpathElements, String stagingPath, CreateOptions createOptions) { - return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, createOptions); + return stageClasspathElements(classpathElements, stagingPath, DEFAULT_SLEEPER, createOptions); + } + + /** + * Transfers the classpath elements to the staging location using default settings. + * + * @see {@link #stageClasspathElements(Collection, String, Sleeper, CreateOptions)} + */ + List stageClasspathElements( + Collection classpathElements, String stagingPath) { + return stageClasspathElements( + classpathElements, stagingPath, DEFAULT_SLEEPER, DEFAULT_CREATE_OPTIONS); } - // Visible for testing. + /** + * Transfers the classpath elements to the staging location. + * + * @param classpathElements The elements to stage. + * @param stagingPath The base location to stage the elements to. + * @return A list of cloud workflow packages, each representing a classpath element. + */ List stageClasspathElements( Collection classpathElements, final String stagingPath, @@ -272,45 +300,69 @@ List stageClasspathElements( stagingPath != null, "Can't stage classpath elements because no staging location has been provided"); - // Inline a copy here because the inner code returns an immutable list and we want to mutate it. - List packageAttributes = - new LinkedList<>(computePackageAttributes(classpathElements, stagingPath)); - - // Compute the returned list of DataflowPackage objects here so that they are returned in the - // same order as on the classpath. - List packages = Lists.newArrayListWithExpectedSize(packageAttributes.size()); - for (final PackageAttributes attributes : packageAttributes) { - packages.add(attributes.getDestination()); - } - - // Order package attributes in descending size order so that we upload the largest files first. - Collections.sort(packageAttributes, new PackageUploadOrder()); final AtomicInteger numUploaded = new AtomicInteger(0); final AtomicInteger numCached = new AtomicInteger(0); + List> destinationPackages = new ArrayList<>(); - List> futures = new LinkedList<>(); - for (final PackageAttributes attributes : packageAttributes) { - futures.add(executorService.submit(new Runnable() { - @Override - public void run() { - stageOnePackage(attributes, numUploaded, numCached, retrySleeper, createOptions); - } - })); + for (String classpathElement : classpathElements) { + DataflowPackage sourcePackage = new DataflowPackage(); + if (classpathElement.contains("=")) { + String[] components = classpathElement.split("=", 2); + sourcePackage.setName(components[0]); + sourcePackage.setLocation(components[1]); + } else { + sourcePackage.setName(null); + sourcePackage.setLocation(classpathElement); + } + + File sourceFile = new File(sourcePackage.getLocation()); + if (!sourceFile.exists()) { + LOG.warn("Skipping non-existent file to stage {}.", sourceFile); + continue; + } + + // TODO: Java 8 / Guava 23.0: FluentFuture + ListenableFuture stagingResult = + Futures.transformAsync( + computePackageAttributes(sourcePackage, stagingPath), + new AsyncFunction() { + @Override + public ListenableFuture apply( + final PackageAttributes packageAttributes) throws Exception { + return stagePackage(packageAttributes, retrySleeper, createOptions); + } + }); + + ListenableFuture stagedPackage = + Futures.transform( + stagingResult, + new Function() { + @Override + public DataflowPackage apply(StagingResult stagingResult) { + if (stagingResult.alreadyStaged()) { + numCached.incrementAndGet(); + } else { + numUploaded.incrementAndGet(); + } + return stagingResult.getPackageAttributes().getDestination(); + } + }); + + destinationPackages.add(stagedPackage); } + try { - Futures.allAsList(futures).get(); + List stagedPackages = Futures.allAsList(destinationPackages).get(); + LOG.info( + "Staging files complete: {} files cached, {} files newly uploaded", + numCached.get(), numUploaded.get()); + return stagedPackages; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Interrupted while staging packages", e); } catch (ExecutionException e) { throw new RuntimeException("Error while staging packages", e.getCause()); } - - LOG.info( - "Staging files complete: {} files cached, {} files newly uploaded", - numCached.get(), numUploaded.get()); - - return packages; } /** @@ -350,6 +402,22 @@ private static void copyContent(File classpathElement, WritableByteChannel outpu Files.asByteSource(classpathElement).copyTo(Channels.newOutputStream(outputChannel)); } } + + @AutoValue + abstract static class StagingResult { + abstract PackageAttributes getPackageAttributes(); + + abstract boolean alreadyStaged(); + + public static StagingResult cached(PackageAttributes attributes) { + return new AutoValue_PackageUtil_StagingResult(attributes, true); + } + + public static StagingResult uploaded(PackageAttributes attributes) { + return new AutoValue_PackageUtil_StagingResult(attributes, false); + } + } + /** * Holds the metadata necessary to stage a file or confirm that a staged file has not changed. */