From 6f202b020d95303e42a38efea16db9f8fda4f002 Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 10 Jan 2017 11:49:37 -0800 Subject: [PATCH 1/2] [BEAM-1258] BigQueryServicesImpl.getTable() returns null when tables not found. --- .../io/gcp/bigquery/BigQueryServicesImpl.java | 30 ++++++--- .../bigquery/BigQueryServicesImplTest.java | 63 +++++++++++++++++++ 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 4eb8e7b451c49..6c550901d5ab3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -398,14 +398,30 @@ public Table getTable(String projectId, String datasetId, String tableId) BackOff backoff = FluentBackoff.DEFAULT .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); - return executeWithRetries( - client.tables().get(projectId, datasetId, tableId), - String.format( - "Unable to get table: %s, aborting after %d retries.", - tableId, MAX_RPC_RETRIES), - Sleeper.DEFAULT, + return getTable( + new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId), backoff, - DONT_RETRY_NOT_FOUND); + Sleeper.DEFAULT); + } + + @VisibleForTesting + Table getTable(TableReference ref, BackOff backoff, Sleeper sleeper) + throws IOException, InterruptedException { + try { + return executeWithRetries( + client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()), + String.format( + "Unable to get table: %s, aborting after %d retries.", + ref.getTableId(), MAX_RPC_RETRIES), + sleeper, + backoff, + DONT_RETRY_NOT_FOUND); + } catch (IOException e) { + if (errorExtractor.itemNotFound(e)) { + return null; + } + throw e; + } } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 10ed8bd12c9d2..9b812e4cb6447 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -319,6 +319,69 @@ public void testGetJobThrows() throws Exception { jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF); } + @Test + public void testGetTableSucceeds() throws Exception { + TableReference tableRef = new TableReference() + .setProjectId("projectId") + .setDatasetId("datasetId") + .setTableId("tableId"); + + Table testTable = new Table(); + testTable.setTableReference(tableRef); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testTable)); + + BigQueryServicesImpl.DatasetServiceImpl datasetService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + + Table table = datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); + + assertEquals(testTable, table); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + @Test + public void testGetTableNotFound() throws IOException, InterruptedException { + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(404); + + BigQueryServicesImpl.DatasetServiceImpl datasetService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + + TableReference tableRef = new TableReference() + .setProjectId("projectId") + .setDatasetId("datasetId") + .setTableId("tableId"); + Table table = datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); + + assertNull(table); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + @Test + public void testGetTableThrows() throws Exception { + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(401); + + TableReference tableRef = new TableReference() + .setProjectId("projectId") + .setDatasetId("datasetId") + .setTableId("tableId"); + + thrown.expect(IOException.class); + thrown.expectMessage(String.format("Unable to get table: %s", tableRef.getTableId())); + + BigQueryServicesImpl.DatasetServiceImpl datasetService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); + datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); + } + @Test public void testExecuteWithRetries() throws IOException, InterruptedException { Table testTable = new Table(); From a5384ec6fa15600932496ba803ff919ab7b9fe7e Mon Sep 17 00:00:00 2001 From: Pei He Date: Tue, 10 Jan 2017 12:47:14 -0800 Subject: [PATCH 2/2] fixup! address comments. --- .../sdk/io/gcp/bigquery/BigQueryServices.java | 5 ++++- .../io/gcp/bigquery/BigQueryServicesImpl.java | 2 ++ .../gcp/bigquery/BigQueryServicesImplTest.java | 16 +++++++++------- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 8ca473d79af54..7173996ba3cf1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -114,8 +114,11 @@ JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig) */ interface DatasetService { /** - * Gets the specified {@link Table} resource by table ID or {@code null} if no table exists. + * Gets the specified {@link Table} resource by table ID. + * + *

Returns null if the table is not found. */ + @Nullable Table getTable(String projectId, String datasetId, String tableId) throws InterruptedException, IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 6c550901d5ab3..c4c734464898d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -393,6 +393,7 @@ private DatasetServiceImpl(BigQueryOptions bqOptions) { * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override + @Nullable public Table getTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { BackOff backoff = @@ -405,6 +406,7 @@ public Table getTable(String projectId, String datasetId, String tableId) } @VisibleForTesting + @Nullable Table getTable(TableReference ref, BackOff backoff, Sleeper sleeper) throws IOException, InterruptedException { try { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 9b812e4cb6447..bfd1319fa26df 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -330,18 +330,20 @@ public void testGetTableSucceeds() throws Exception { testTable.setTableReference(tableRef); when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); - when(response.getStatusCode()).thenReturn(200); - when(response.getContent()).thenReturn(toStream(testTable)); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(testTable)); BigQueryServicesImpl.DatasetServiceImpl datasetService = new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); - Table table = datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); + Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT); assertEquals(testTable, table); - verify(response, times(1)).getStatusCode(); - verify(response, times(1)).getContent(); - verify(response, times(1)).getContentType(); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); } @Test @@ -356,7 +358,7 @@ public void testGetTableNotFound() throws IOException, InterruptedException { .setProjectId("projectId") .setDatasetId("datasetId") .setTableId("tableId"); - Table table = datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT); + Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT); assertNull(table); verify(response, times(1)).getStatusCode();