From 5e8800ca02f4a9d4d21e72ecd9eaded9cd24b09b Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 13 Dec 2016 17:16:12 -0800 Subject: [PATCH 1/3] GcsUtil: set timeout and retry for BatchRequest with HttpRequestInitializer. --- .../java/org/apache/beam/sdk/util/GcsUtil.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index dcdba464bca54..e0d6501d03dd4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -25,6 +25,7 @@ import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.storage.Storage; @@ -93,8 +94,10 @@ public static class GcsUtilFactory implements DefaultValueFactory { public GcsUtil create(PipelineOptions options) { LOG.debug("Creating new GcsUtil"); GcsOptions gcsOptions = options.as(GcsOptions.class); + Storage.Builder storageBuilder = Transport.newStorageClient(gcsOptions); return new GcsUtil( - Transport.newStorageClient(gcsOptions).build(), + storageBuilder.build(), + storageBuilder.getHttpRequestInitializer(), gcsOptions.getExecutorService(), gcsOptions.getGcsUploadBufferSizeBytes()); } @@ -132,6 +135,7 @@ public GcsUtil create(PipelineOptions options) { /** Client for the GCS API. */ private Storage storageClient; + private final HttpRequestInitializer httpRequestInitializer; /** Buffer size for GCS uploads (in bytes). */ @Nullable private final Integer uploadBufferSizeBytes; @@ -156,9 +160,11 @@ public boolean isGcsPatternSupported(String gcsPattern) { private GcsUtil( Storage storageClient, + HttpRequestInitializer httpRequestInitializer, ExecutorService executorService, @Nullable Integer uploadBufferSizeBytes) { this.storageClient = storageClient; + this.httpRequestInitializer = httpRequestInitializer; this.uploadBufferSizeBytes = uploadBufferSizeBytes; this.executorService = executorService; } @@ -526,7 +532,7 @@ List makeGetBatches( List batches = new LinkedList<>(); for (List filesToGet : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = storageClient.batch(); + BatchRequest batch = storageClient.batch(httpRequestInitializer); for (GcsPath path : filesToGet) { results.add(enqueueGetFileSize(path, batch)); } @@ -548,14 +554,14 @@ List makeCopyBatches(List srcFilenames, List destF destFilenames.size()); List batches = new LinkedList<>(); - BatchRequest batch = storageClient.batch(); + BatchRequest batch = storageClient.batch(httpRequestInitializer); for (int i = 0; i < srcFilenames.size(); i++) { final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i)); final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); enqueueCopy(sourcePath, destPath, batch); if (batch.size() >= MAX_REQUESTS_PER_BATCH) { batches.add(batch); - batch = storageClient.batch(); + batch = storageClient.batch(httpRequestInitializer); } } if (batch.size() > 0) { @@ -568,7 +574,7 @@ List makeRemoveBatches(Collection filenames) throws IOExce List batches = new LinkedList<>(); for (List filesToDelete : Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = storageClient.batch(); + BatchRequest batch = storageClient.batch(httpRequestInitializer); for (String file : filesToDelete) { enqueueDelete(GcsPath.fromUri(file), batch); } From e0e72ed7c2604a7a0e170a02254047027c97cc0a Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 19 Dec 2016 15:57:48 -0800 Subject: [PATCH 2/3] fixup! add tests. --- .../org/apache/beam/sdk/util/GcsUtilTest.java | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 6ca87f9112c32..815d624f5c978 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -38,6 +38,7 @@ import com.google.api.client.http.HttpStatusCodes; import com.google.api.client.http.HttpTransport; import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; import com.google.api.client.json.GenericJson; import com.google.api.client.json.Json; import com.google.api.client.json.JsonFactory; @@ -55,11 +56,15 @@ import com.google.cloud.hadoop.util.ClientRequestHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; + +import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.math.BigInteger; import java.net.SocketTimeoutException; import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.Arrays; @@ -400,17 +405,30 @@ public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { + "\n" + error.toString(); thrown.expect(FileNotFoundException.class); - MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() - .setContentType("multipart/mixed; boundary=" + contentBoundary) - .setContent(content) - .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); + final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); + when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); + + // 429: Too many requests, then 200: OK. + when(mockResponse.getStatusCode()).thenReturn(429, 200); + when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); + + // A mock transport that lets us mock the API responses. MockHttpTransport mockTransport = - new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + return mockResponse; + } + }) + .build(); GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.setStorageClient( + new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); } @@ -723,4 +741,9 @@ public void testMakeGetBatches() throws IOException { assertThat(sumBatchSizes(batches), equalTo(501)); assertEquals(501, results.size()); } + + /** A helper to wrap a {@link GenericJson} object in a content stream. */ + private static InputStream toStream(String content) throws IOException { + return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); + } } From 76629ccf041e951d8ee25a138e5623464f26b2d1 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 6 Jan 2017 11:05:48 -0800 Subject: [PATCH 3/3] fixup! address comments. --- .../org/apache/beam/sdk/util/GcsUtil.java | 12 ++++-- .../org/apache/beam/sdk/util/GcsUtilTest.java | 39 +++++++++++++++++-- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index e0d6501d03dd4..521673cbe9dfa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -532,7 +532,7 @@ List makeGetBatches( List batches = new LinkedList<>(); for (List filesToGet : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = storageClient.batch(httpRequestInitializer); + BatchRequest batch = createBatchRequest(); for (GcsPath path : filesToGet) { results.add(enqueueGetFileSize(path, batch)); } @@ -554,14 +554,14 @@ List makeCopyBatches(List srcFilenames, List destF destFilenames.size()); List batches = new LinkedList<>(); - BatchRequest batch = storageClient.batch(httpRequestInitializer); + BatchRequest batch = createBatchRequest(); for (int i = 0; i < srcFilenames.size(); i++) { final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i)); final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i)); enqueueCopy(sourcePath, destPath, batch); if (batch.size() >= MAX_REQUESTS_PER_BATCH) { batches.add(batch); - batch = storageClient.batch(httpRequestInitializer); + batch = createBatchRequest(); } } if (batch.size() > 0) { @@ -574,7 +574,7 @@ List makeRemoveBatches(Collection filenames) throws IOExce List batches = new LinkedList<>(); for (List filesToDelete : Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = storageClient.batch(httpRequestInitializer); + BatchRequest batch = createBatchRequest(); for (String file : filesToDelete) { enqueueDelete(GcsPath.fromUri(file), batch); } @@ -654,6 +654,10 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOE }); } + private BatchRequest createBatchRequest() { + return storageClient.batch(httpRequestInitializer); + } + /** * Expands glob expressions to regular expressions. * diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 815d624f5c978..d592761ba3907 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -56,7 +56,6 @@ import com.google.cloud.hadoop.util.ClientRequestHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; - import java.io.ByteArrayInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -397,6 +396,38 @@ public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { .set("error", new GenericJson().set("code", 404)); error.setFactory(jsonFactory); + String content = contentBoundary + "\n" + + "Content-Type: application/http\n" + + "\n" + + "HTTP/1.1 404 Not Found\n" + + "Content-Length: 105\n" + + "\n" + + error.toString(); + thrown.expect(FileNotFoundException.class); + MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse() + .setContentType("multipart/mixed; boundary=" + contentBoundary) + .setContent(content) + .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); + + MockHttpTransport mockTransport = + new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); + + GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); + + gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); + gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); + } + + @Test + public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { + JsonFactory jsonFactory = new JacksonFactory(); + + String contentBoundary = "batch_foobarbaz"; + + GenericJson error = new GenericJson() + .set("error", new GenericJson().set("code", 404)); + error.setFactory(jsonFactory); + String content = contentBoundary + "\n" + "Content-Type: application/http\n" + "\n" @@ -427,7 +458,7 @@ public LowLevelHttpResponse execute() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); - gcsUtil.setStorageClient( + gcsUtil.setStorageClient( new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); } @@ -742,7 +773,9 @@ public void testMakeGetBatches() throws IOException { assertEquals(501, results.size()); } - /** A helper to wrap a {@link GenericJson} object in a content stream. */ + /** + * A helper to wrap a {@link GenericJson} object in a content stream. + */ private static InputStream toStream(String content) throws IOException { return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); }