Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,13 @@ private static void dryRunQuery(BigQueryOptions options, String query) {
request.setQuery(query);
request.setDryRun(true);

String queryValidationErrorMsg = String.format(QUERY_VALIDATION_FAILURE_ERROR, query);
try {
BigQueryTableRowIterator.executeWithBackOff(
client.jobs().query(options.getProject(), request), QUERY_VALIDATION_FAILURE_ERROR,
query);
client.jobs().query(options.getProject(), request),
queryValidationErrorMsg);
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
throw new IllegalArgumentException(queryValidationErrorMsg, e);
}
}

Expand Down Expand Up @@ -2314,17 +2314,17 @@ private Write() {}
}

private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) {
String resourceNotFoundMsg =
String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table));
try {
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableRowIterator.executeWithBackOff(
client.datasets().get(table.getProjectId(), table.getDatasetId()),
RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table));
resourceNotFoundMsg);
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
throw new IllegalArgumentException(
String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)),
e);
throw new IllegalArgumentException(resourceNotFoundMsg, e);
} else {
throw new RuntimeException(
String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset",
Expand All @@ -2335,16 +2335,17 @@ private static void verifyDatasetPresence(BigQueryOptions options, TableReferenc
}

private static void verifyTablePresence(BigQueryOptions options, TableReference table) {
String resourceNotFoundMsg =
String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table));
try {
Bigquery client = Transport.newBigQueryClient(options).build();
BigQueryTableRowIterator.executeWithBackOff(
client.tables().get(table.getProjectId(), table.getDatasetId(), table.getTableId()),
RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table));
resourceNotFoundMsg);
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
throw new IllegalArgumentException(
String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e);
throw new IllegalArgumentException(resourceNotFoundMsg, e);
} else {
throw new RuntimeException(
String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import com.google.api.client.util.Data;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
Expand Down Expand Up @@ -138,16 +138,7 @@ public void open() throws IOException, InterruptedException {
ref = executeQueryAndWaitForCompletion();
}
// Get table schema.
Bigquery.Tables.Get get =
client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());

Table table =
executeWithBackOff(
get,
"Error opening BigQuery table %s of dataset %s : {}",
ref.getTableId(),
ref.getDatasetId());
schema = table.getSchema();
schema = getTable(ref).getSchema();
}

public boolean advance() throws IOException, InterruptedException {
Expand All @@ -172,9 +163,10 @@ public boolean advance() throws IOException, InterruptedException {
TableDataList result =
executeWithBackOff(
list,
"Error reading from BigQuery table %s of dataset %s : {}",
ref.getTableId(),
ref.getDatasetId());
String.format(
"Error reading from BigQuery table %s of dataset %s.",
ref.getTableId(),
ref.getDatasetId()));

pageToken = result.getPageToken();
iteratorOverCurrentBatch =
Expand Down Expand Up @@ -331,36 +323,55 @@ private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Obj
return row;
}

// Fetches the metadata of the given BigQuery table, retrying with exponential backoff.
private Table getTable(TableReference ref) throws IOException, InterruptedException {
  // Build the failure message up front so it can be reused by the backoff helper.
  String errorMessage = String.format(
      "Error opening BigQuery table %s of dataset %s.",
      ref.getTableId(),
      ref.getDatasetId());

  return executeWithBackOff(
      client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()),
      errorMessage);
}

// Create a new BigQuery dataset
private void createDataset(String datasetId) throws IOException, InterruptedException {
private void createDataset(String datasetId, String location)
throws IOException, InterruptedException {
Dataset dataset = new Dataset();
DatasetReference reference = new DatasetReference();
reference.setProjectId(projectId);
reference.setDatasetId(datasetId);
dataset.setDatasetReference(reference);
dataset.setLocation(location);

String createDatasetError =
"Error when trying to create the temporary dataset " + datasetId + " in project "
+ projectId;
executeWithBackOff(
client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
client.datasets().insert(projectId, dataset),
String.format(
"Error when trying to create the temporary dataset %s in project %s",
datasetId, projectId));
}

// Delete the given table that is available in the given dataset.
private void deleteTable(String datasetId, String tableId)
throws IOException, InterruptedException {
executeWithBackOff(
client.tables().delete(projectId, datasetId, tableId),
"Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId
+ " of project " + projectId + ". Manual deletion may be required. Error message : {}");
String.format(
"Error when trying to delete the temporary table %s in dataset %s of project %s. "
+ "Manual deletion may be required.",
tableId, datasetId, projectId));
}

// Delete the given dataset. This will fail if the given dataset has any tables.
private void deleteDataset(String datasetId) throws IOException, InterruptedException {
executeWithBackOff(
client.datasets().delete(projectId, datasetId),
"Error when trying to delete the temporary dataset " + datasetId + " in project "
+ projectId + ". Manual deletion may be required. Error message : {}");
String.format(
"Error when trying to delete the temporary dataset %s in project %s. "
+ "Manual deletion may be required.",
datasetId, projectId));
}

/**
Expand All @@ -371,13 +382,30 @@ private void deleteDataset(String datasetId) throws IOException, InterruptedExce
*/
private TableReference executeQueryAndWaitForCompletion()
throws IOException, InterruptedException {
// Dry run query to get source table location
Job dryRunJob = new Job()
.setConfiguration(new JobConfiguration()
.setQuery(new JobConfigurationQuery()
.setQuery(query))
.setDryRun(true));
JobStatistics jobStats = executeWithBackOff(
client.jobs().insert(projectId, dryRunJob),
String.format("Error when trying to dry run query %s.", query)).getStatistics();

TableReference sourceTable = jobStats.getQuery()
.getReferencedTables()
.get(0);

Table table = getTable(sourceTable);
String location = table.getLocation();

// Create a temporary dataset to store results.
// Starting dataset name with an "_" so that it is hidden.
Random rnd = new Random(System.currentTimeMillis());
temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);

createDataset(temporaryDatasetId);
createDataset(temporaryDatasetId, location);
Job job = new Job();
JobConfiguration config = new JobConfiguration();
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
Expand All @@ -393,15 +421,15 @@ private TableReference executeQueryAndWaitForCompletion()
destinationTable.setTableId(temporaryTableId);
queryConfig.setDestinationTable(destinationTable);

Insert insert = client.jobs().insert(projectId, job);
Job queryJob = executeWithBackOff(
insert, "Error when trying to execute the job for query " + query + " :{}");
client.jobs().insert(projectId, job),
String.format("Error when trying to execute the job for query %s.", query));
JobReference jobId = queryJob.getJobReference();

while (true) {
Job pollJob = executeWithBackOff(
client.jobs().get(projectId, jobId.getJobId()),
"Error when trying to get status of the job for query " + query + " :{}");
String.format("Error when trying to get status of the job for query %s.", query));
JobStatus status = pollJob.getStatus();
if (status.getState().equals("DONE")) {
// Job is DONE, but did not necessarily succeed.
Expand All @@ -419,12 +447,23 @@ private TableReference executeQueryAndWaitForCompletion()
}
}

/**
 * Executes a BigQuery request with exponential backoff and returns its result.
 *
 * <p>The error message is built by applying {@link String#format} to {@code error} with
 * {@code errorArgs}, then delegating to the non-varargs overload.
 *
 * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, String)}.
 */
@Deprecated
public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
    Object... errorArgs) throws IOException, InterruptedException {
  String formattedError = String.format(error, errorArgs);
  return executeWithBackOff(client, formattedError);
}

// Execute a BQ request with exponential backoff and return the result.
// client - BQ request to be executed
// error - Formatted message to log if when a request fails. Takes exception message as a
// formatter parameter.
public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
Object... errorArgs) throws IOException, InterruptedException {
public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks backwards-incompatible. Add a handler for the old method that indirects through String.format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backOff =
new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
Expand All @@ -435,11 +474,12 @@ public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, St
result = client.execute();
break;
} catch (IOException e) {
LOG.error(String.format(error, errorArgs), e.getMessage());
LOG.error("{}", error, e);
if (!BackOffUtils.next(sleeper, backOff)) {
LOG.error(
String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
throw e;
String errorMessage = String.format(
"%s Failing to execute job after %d attempts.", error, MAX_RETRIES + 1);
LOG.error("{}", errorMessage, e);
throw new IOException(errorMessage, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@

import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
Expand All @@ -44,6 +45,7 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.ImmutableList;

import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -126,6 +128,11 @@ private static Table tableWithBasicSchema() {
new TableFieldSchema().setName("answer").setType("INTEGER"))));
}

// Builds a Table stub whose only populated field is a non-US location ("EU"),
// used to mock the table-location lookup performed before creating the temp dataset.
private static Table tableWithLocation() {
  Table table = new Table();
  table.setLocation("EU");
  return table;
}

private TableRow rawRow(Object... args) {
List<TableCell> cells = new LinkedList<>();
for (Object a : args) {
Expand All @@ -145,8 +152,11 @@ private TableDataList rawDataList(TableRow... rows) {
@Test
public void testReadFromQuery() throws IOException, InterruptedException {
// Mock job inserting.
Job dryRunJob = new Job().setStatistics(
new JobStatistics().setQuery(new JobStatistics2().setReferencedTables(
ImmutableList.of(new TableReference()))));
Job insertedJob = new Job().setJobReference(new JobReference());
when(mockJobsInsert.execute()).thenReturn(insertedJob);
when(mockJobsInsert.execute()).thenReturn(dryRunJob, insertedJob);

// Mock job polling.
JobStatus status = new JobStatus().setState("DONE");
Expand All @@ -161,7 +171,7 @@ public void testReadFromQuery() throws IOException, InterruptedException {
when(mockJobsGet.execute()).thenReturn(getJob);

// Mock table schema fetch.
when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
when(mockTablesGet.execute()).thenReturn(tableWithLocation(), tableWithBasicSchema());

// Mock table data fetch.
when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 42)));
Expand Down Expand Up @@ -189,15 +199,15 @@ public void testReadFromQuery() throws IOException, InterruptedException {
verify(mockDatasets).delete(anyString(), anyString());
verify(mockDatasetsDelete).execute();
// Job inserted to run the query, polled once.
verify(mockClient, times(2)).jobs();
verify(mockJobs).insert(anyString(), any(Job.class));
verify(mockJobsInsert).execute();
verify(mockClient, times(3)).jobs();
verify(mockJobs, times(2)).insert(anyString(), any(Job.class));
verify(mockJobsInsert, times(2)).execute();
verify(mockJobs).get(anyString(), anyString());
verify(mockJobsGet).execute();
// Temp table get after query finish, deleted after reading.
verify(mockClient, times(2)).tables();
verify(mockTables).get("project", "dataset", "table");
verify(mockTablesGet).execute();
verify(mockClient, times(3)).tables();
verify(mockTables, times(2)).get(anyString(), anyString(), anyString());
verify(mockTablesGet, times(2)).execute();
verify(mockTables).delete(anyString(), anyString(), anyString());
verify(mockTablesDelete).execute();
// Table data read.
Expand All @@ -213,43 +223,29 @@ public void testReadFromQuery() throws IOException, InterruptedException {
*/
@Test
public void testQueryFailed() throws IOException {
// Job can be created.
JobReference ref = new JobReference();
Job insertedJob = new Job().setJobReference(ref);
when(mockJobsInsert.execute()).thenReturn(insertedJob);

// Job state polled with an error.
String errorReason = "bad query";
JobStatus status =
new JobStatus().setState("DONE").setErrorResult(new ErrorProto().setMessage(errorReason));
Job getJob = new Job().setJobReference(ref).setStatus(status);
when(mockJobsGet.execute()).thenReturn(getJob);
Exception exception = new IOException(errorReason);
when(mockJobsInsert.execute()).thenThrow(exception, exception, exception, exception);

String query = "NOT A QUERY";
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {

try {
iterator.open();
fail();
} catch (Exception expected) {
// Verify message explains cause and reports the query.
assertThat(expected.getMessage(), containsString("failed"));
assertThat(expected.getMessage(), containsString(errorReason));
assertThat(expected.getMessage(), containsString("Error"));
assertThat(expected.getMessage(), containsString(query));
assertThat(expected.getCause().getMessage(), containsString(errorReason));
}
}

// Temp dataset created and then later deleted.
verify(mockClient, times(2)).datasets();
verify(mockDatasets).insert(anyString(), any(Dataset.class));
verify(mockDatasetsInsert).execute();
verify(mockDatasets).delete(anyString(), anyString());
verify(mockDatasetsDelete).execute();
// Job inserted to run the query, then polled once.
verify(mockClient, times(2)).jobs();
verify(mockClient, times(1)).jobs();
verify(mockJobs).insert(anyString(), any(Job.class));
verify(mockJobsInsert).execute();
verify(mockJobs).get(anyString(), anyString());
verify(mockJobsGet).execute();
verify(mockJobsInsert, times(3)).execute();
}
}