Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,9 @@ public void deleteDataset(String projectId, String datasetId)
backoff);
}

@Override
public long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
throws IOException, InterruptedException {
@VisibleForTesting
long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException {
checkNotNull(ref, "ref");
if (executor == null) {
this.executor = options.as(GcsOptions.class).getExecutorService();
Expand All @@ -556,8 +555,6 @@ public long insertAll(
+ "as many elements as rowList");
}

BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();

long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
// These lists contain the rows to publish. Initially the contain the entire list.
Expand Down Expand Up @@ -607,7 +604,7 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
if (new ApiErrorExtractor().rateLimited(e)) {
LOG.info("BigQuery insertAll exceeded rate limit, retrying");
try {
Thread.sleep(backoff.nextBackOffMillis());
sleeper.sleep(backoff.nextBackOffMillis());
} catch (InterruptedException interrupted) {
throw new IOException(
"Interrupted while waiting before retrying insertAll");
Expand Down Expand Up @@ -662,23 +659,31 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
break;
}
try {
Thread.sleep(backoff.nextBackOffMillis());
sleeper.sleep(nextBackoffMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(
"Interrupted while waiting before retrying insert of " + retryRows);
}
LOG.info("Retrying failed inserts to BigQuery");
rowsToPublish = retryRows;
idsToPublish = retryIds;
allErrors.clear();
LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
}
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
} else {
return retTotalDataSize;
}
}

@Override
public long insertAll(
    TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
    throws IOException, InterruptedException {
  // Production entry point: delegates to the testable overload, supplying the
  // default insert backoff policy and a real (blocking) sleeper.
  BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();
  return insertAll(ref, rowList, insertIdList, backoff, Sleeper.DEFAULT);
}
}

private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -33,6 +36,7 @@
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.util.MockSleeper;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
Expand All @@ -42,6 +46,7 @@
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
Expand All @@ -67,6 +72,8 @@
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
* Tests for {@link BigQueryServicesImpl}.
Expand Down Expand Up @@ -345,13 +352,95 @@ public void testInsertRetry() throws Exception {

DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
dataService.insertAll(ref, rows, null);
dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
}

/**
* Tests that {@link DatasetServiceImpl#insertAll} retries selected rows on failure.
*/
@Test
public void testInsertRetrySelectRows() throws Exception {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<TableRow> rows = ImmutableList.of(
new TableRow().set("row", "a"), new TableRow().set("row", "b"));
List<String> insertIds = ImmutableList.of("a", "b");

final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
.setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));

final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();

when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(200).thenReturn(200);
when(response.getContent())
.thenReturn(toStream(bFailed)).thenReturn(toStream(allRowsSucceeded));

DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah you are verifying this through log message. I was trying to verify this through a mock of the BQ client req/response and that was not as easy.

}

// A BackOff that makes a total of 4 attempts
private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3);

/**
* Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when persistent issues.
*/
@Test
public void testInsertFailsGracefully() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the test that catches the issue with reusing the backoff multiple times instead of using the cached nextBackoffMillis.

TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<TableRow> rows = ImmutableList.of(new TableRow(), new TableRow());

final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse()
.setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));

final TableDataInsertAllResponse row0Failed = new TableDataInsertAllResponse()
.setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L)));

when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
// Always return 200.
when(response.getStatusCode()).thenReturn(200);
// Return row 1 failing, then we retry row 1 as row 0, and row 0 persistently fails.
when(response.getContent())
.thenReturn(toStream(row1Failed))
.thenAnswer(new Answer<InputStream>() {
@Override
public InputStream answer(InvocationOnMock invocation) throws Throwable {
return toStream(row0Failed);
}
});

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove extraneous new line


DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());

// Expect it to fail.
try {
dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
fail();
} catch (IOException e) {
assertThat(e, instanceOf(IOException.class));
assertThat(e.getMessage(), containsString("Insert failed:"));
assertThat(e.getMessage(), containsString("[{\"index\":0}]"));
}

// Verify the exact number of retries as well as log messages.
verify(response, times(4)).getStatusCode();
verify(response, times(4)).getContent();
verify(response, times(4)).getContentType();
expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
}

/**
* Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts.
*/
Expand All @@ -377,7 +466,7 @@ public void testInsertDoesNotRetry() throws Throwable {
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());

try {
dataService.insertAll(ref, rows, null);
dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
fail();
} catch (RuntimeException e) {
verify(response, times(1)).getStatusCode();
Expand Down