From 3c55bbd7b83dca3e03309860da02e3becd8e467c Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 29 Aug 2016 16:23:29 -0700 Subject: [PATCH 1/4] Create the temp dataset in the same location as the queried table in BigQueryTableRowIterator --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 23 ++--- .../sdk/util/BigQueryTableRowIterator.java | 94 ++++++++++++------- .../util/BigQueryTableRowIteratorTest.java | 54 +++++------ 3 files changed, 98 insertions(+), 73 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index fefc84e8be..2d5c8cc824 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -549,13 +549,13 @@ private static void dryRunQuery(BigQueryOptions options, String query) { request.setQuery(query); request.setDryRun(true); + String queryValidationErrorMsg = String.format(QUERY_VALIDATION_FAILURE_ERROR, query); try { BigQueryTableRowIterator.executeWithBackOff( - client.jobs().query(options.getProject(), request), QUERY_VALIDATION_FAILURE_ERROR, - query); + client.jobs().query(options.getProject(), request), + queryValidationErrorMsg); } catch (Exception e) { - throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e); + throw new IllegalArgumentException(queryValidationErrorMsg, e); } } @@ -2314,17 +2314,17 @@ private Write() {} } private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) { + String resourceNotFoundMsg = + String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)); try { Bigquery client = Transport.newBigQueryClient(options).build(); BigQueryTableRowIterator.executeWithBackOff( client.datasets().get(table.getProjectId(), table.getDatasetId()), - RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)); + resourceNotFoundMsg); } catch (Exception e) { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { - throw new IllegalArgumentException( - String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)), - e); + throw new IllegalArgumentException(resourceNotFoundMsg, e); } else { throw new RuntimeException( String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", @@ -2335,16 +2335,17 @@ private static void verifyDatasetPresence(BigQueryOptions options, TableReferenc } private static void verifyTablePresence(BigQueryOptions options, TableReference table) { + String resourceNotFoundMsg = + String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)); try { Bigquery client = Transport.newBigQueryClient(options).build(); BigQueryTableRowIterator.executeWithBackOff( client.tables().get(table.getProjectId(), table.getDatasetId(), table.getTableId()), - RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)); + resourceNotFoundMsg); } catch (Exception e) { ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) { - throw new IllegalArgumentException( - String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e); + throw new IllegalArgumentException(resourceNotFoundMsg, e); } else { throw new RuntimeException( String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 75b3bb9271..08c3d65777 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -27,7 +27,6 @@ import com.google.api.client.util.Data; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Jobs.Insert; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.ErrorProto; @@ -35,6 +34,7 @@ import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; @@ -138,16 +138,7 @@ public void open() throws IOException, InterruptedException { ref = executeQueryAndWaitForCompletion(); } // Get table schema. - Bigquery.Tables.Get get = - client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); - - Table table = - executeWithBackOff( - get, - "Error opening BigQuery table %s of dataset %s : {}", - ref.getTableId(), - ref.getDatasetId()); - schema = table.getSchema(); + schema = getTable(ref).getSchema(); } public boolean advance() throws IOException, InterruptedException { @@ -172,9 +163,10 @@ public boolean advance() throws IOException, InterruptedException { TableDataList result = executeWithBackOff( list, - "Error reading from BigQuery table %s of dataset %s : {}", - ref.getTableId(), - ref.getDatasetId()); + String.format( + "Error reading from BigQuery table %s of dataset %s.", + ref.getTableId(), + ref.getDatasetId())); pageToken = result.getPageToken(); iteratorOverCurrentBatch = @@ -331,19 +323,34 @@ private TableRow getTypedTableRow(List fields, Map T executeWithBackOff(AbstractGoogleClientRequest client, String error, - Object... errorArgs) throws IOException, InterruptedException { + public static T executeWithBackOff(AbstractGoogleClientRequest client, String error) + throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis()); @@ -435,11 +463,11 @@ public static T executeWithBackOff(AbstractGoogleClientRequest client, St result = client.execute(); break; } catch (IOException e) { - LOG.error(String.format(error, errorArgs), e.getMessage()); + String errorMessage = error + String.format(" Cause: %s", e.getMessage()); + LOG.error(errorMessage); if (!BackOffUtils.next(sleeper, backOff)) { - LOG.error( - String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times."); - throw e; + LOG.error(error + String.format(" Failing after retrying %d times.", MAX_RETRIES)); + throw new IOException(errorMessage, e); } } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java index b82e62595c..306c01d6e0 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java @@ -31,11 +31,12 @@ import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Dataset; -import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.JobStatistics2; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableCell; @@ -44,6 +45,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.collect.ImmutableList; import org.junit.After; import org.junit.Before; @@ -126,6 +128,11 @@ private static Table tableWithBasicSchema() { new TableFieldSchema().setName("answer").setType("INTEGER")))); } + private static Table tableWithLocation() { + return new Table() + .setLocation("EU"); + } + private TableRow rawRow(Object... args) { List cells = new LinkedList<>(); for (Object a : args) { @@ -145,8 +152,11 @@ private TableDataList rawDataList(TableRow... rows) { @Test public void testReadFromQuery() throws IOException, InterruptedException { // Mock job inserting. + Job dryRunJob = new Job().setStatistics( + new JobStatistics().setQuery(new JobStatistics2().setReferencedTables( + ImmutableList.of(new TableReference())))); Job insertedJob = new Job().setJobReference(new JobReference()); - when(mockJobsInsert.execute()).thenReturn(insertedJob); + when(mockJobsInsert.execute()).thenReturn(dryRunJob, insertedJob); // Mock job polling. JobStatus status = new JobStatus().setState("DONE"); @@ -161,7 +171,7 @@ public void testReadFromQuery() throws IOException, InterruptedException { when(mockJobsGet.execute()).thenReturn(getJob); // Mock table schema fetch. - when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema()); + when(mockTablesGet.execute()).thenReturn(tableWithLocation(), tableWithBasicSchema()); // Mock table data fetch. when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 42))); @@ -189,15 +199,15 @@ public void testReadFromQuery() throws IOException, InterruptedException { verify(mockDatasets).delete(anyString(), anyString()); verify(mockDatasetsDelete).execute(); // Job inserted to run the query, polled once. - verify(mockClient, times(2)).jobs(); - verify(mockJobs).insert(anyString(), any(Job.class)); - verify(mockJobsInsert).execute(); + verify(mockClient, times(3)).jobs(); + verify(mockJobs, times(2)).insert(anyString(), any(Job.class)); + verify(mockJobsInsert, times(2)).execute(); verify(mockJobs).get(anyString(), anyString()); verify(mockJobsGet).execute(); // Temp table get after query finish, deleted after reading. - verify(mockClient, times(2)).tables(); - verify(mockTables).get("project", "dataset", "table"); - verify(mockTablesGet).execute(); + verify(mockClient, times(3)).tables(); + verify(mockTables, times(2)).get(anyString(), anyString(), anyString()); + verify(mockTablesGet, times(2)).execute(); verify(mockTables).delete(anyString(), anyString(), anyString()); verify(mockTablesDelete).execute(); // Table data read. @@ -213,43 +223,29 @@ public void testReadFromQuery() throws IOException, InterruptedException { */ @Test public void testQueryFailed() throws IOException { - // Job can be created. - JobReference ref = new JobReference(); - Job insertedJob = new Job().setJobReference(ref); - when(mockJobsInsert.execute()).thenReturn(insertedJob); - // Job state polled with an error. String errorReason = "bad query"; - JobStatus status = - new JobStatus().setState("DONE").setErrorResult(new ErrorProto().setMessage(errorReason)); - Job getJob = new Job().setJobReference(ref).setStatus(status); - when(mockJobsGet.execute()).thenReturn(getJob); + Exception exception = new IOException(errorReason); + when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception); String query = "NOT A QUERY"; try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) { + try { iterator.open(); fail(); } catch (Exception expected) { // Verify message explains cause and reports the query. - assertThat(expected.getMessage(), containsString("failed")); + assertThat(expected.getMessage(), containsString("Error")); assertThat(expected.getMessage(), containsString(errorReason)); assertThat(expected.getMessage(), containsString(query)); } } - // Temp dataset created and then later deleted. - verify(mockClient, times(2)).datasets(); - verify(mockDatasets).insert(anyString(), any(Dataset.class)); - verify(mockDatasetsInsert).execute(); - verify(mockDatasets).delete(anyString(), anyString()); - verify(mockDatasetsDelete).execute(); // Job inserted to run the query, then polled once. - verify(mockClient, times(2)).jobs(); + verify(mockClient, times(1)).jobs(); verify(mockJobs).insert(anyString(), any(Job.class)); - verify(mockJobsInsert).execute(); - verify(mockJobs).get(anyString(), anyString()); - verify(mockJobsGet).execute(); + verify(mockJobsInsert, times(3)).execute(); } } From 188a241cd8a81b172529676c6a8b16597434c263 Mon Sep 17 00:00:00 2001 From: Pei He Date: Thu, 1 Sep 2016 12:35:56 -0700 Subject: [PATCH 2/4] address feedback --- .../sdk/util/BigQueryTableRowIterator.java | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index 08c3d65777..ab7ae93668 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -447,6 +447,17 @@ private TableReference executeQueryAndWaitForCompletion() } } + /** + * Execute a BQ request with exponential backoff and return the result. + * + * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, String)}. + */ + @Deprecated + public static T executeWithBackOff(AbstractGoogleClientRequest client, String error, + Object... errorArgs) throws IOException, InterruptedException { + return executeWithBackOff(client, String.format(error, errorArgs)); + } + // Execute a BQ request with exponential backoff and return the result. // client - BQ request to be executed // error - Formatted message to log if when a request fails. Takes exception message as a @@ -463,10 +474,11 @@ public static T executeWithBackOff(AbstractGoogleClientRequest client, St result = client.execute(); break; } catch (IOException e) { - String errorMessage = error + String.format(" Cause: %s", e.getMessage()); - LOG.error(errorMessage); + LOG.error("{}", error, e); if (!BackOffUtils.next(sleeper, backOff)) { - LOG.error(error + String.format(" Failing after retrying %d times.", MAX_RETRIES)); + String errorMessage = String.format( + "%s. Failing to execute job after %d attempts.", error, MAX_RETRIES + 1); + LOG.error(errorMessage); throw new IOException(errorMessage, e); } } From c7fd6452ac6b170e0221047cde487731e7020935 Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 2 Sep 2016 13:43:16 -0700 Subject: [PATCH 3/4] address feedback --- .../cloud/dataflow/sdk/util/BigQueryTableRowIterator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java index ab7ae93668..e1b7079347 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIterator.java @@ -477,8 +477,8 @@ public static T executeWithBackOff(AbstractGoogleClientRequest client, St LOG.error("{}", error, e); if (!BackOffUtils.next(sleeper, backOff)) { String errorMessage = String.format( - "%s. Failing to execute job after %d attempts.", error, MAX_RETRIES + 1); - LOG.error(errorMessage); + "%s Failing to execute job after %d attempts.", error, MAX_RETRIES + 1); + LOG.error("{}", errorMessage, e); throw new IOException(errorMessage, e); } } From 14ece6727119da4cbb16f6d716440c789ed50c0b Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 2 Sep 2016 15:55:15 -0700 Subject: [PATCH 4/4] fix failed tests --- .../cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java index 306c01d6e0..cecbcb70fb 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableRowIteratorTest.java @@ -238,8 +238,8 @@ public void testQueryFailed() throws IOException { } catch (Exception expected) { // Verify message explains cause and reports the query. assertThat(expected.getMessage(), containsString("Error")); - assertThat(expected.getMessage(), containsString(errorReason)); assertThat(expected.getMessage(), containsString(query)); + assertThat(expected.getCause().getMessage(), containsString(errorReason)); } }