Fix #349 retry on jobInternalError
SebHeuze committed Oct 31, 2023
1 parent 1c1e808 commit caa684d
Showing 4 changed files with 131 additions and 19 deletions.
@@ -27,6 +27,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.google.common.annotations.VisibleForTesting;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException;
@@ -35,7 +36,6 @@
import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
import com.wepay.kafka.connect.bigquery.write.row.BigQueryErrorResponses;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,8 @@
import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable;

public class MergeQueries {

private static final int WAIT_MAX_JITTER = 1000;
public static final String INTERMEDIATE_TABLE_KEY_FIELD_NAME = "key";
public static final String INTERMEDIATE_TABLE_VALUE_FIELD_NAME = "value";
public static final String INTERMEDIATE_TABLE_ITERATION_FIELD_NAME = "i";
@@ -62,6 +64,8 @@ public class MergeQueries {
private final boolean insertPartitionTime;
private final boolean upsertEnabled;
private final boolean deleteEnabled;
private final int bigQueryRetry;
private final long bigQueryRetryWait;
private final MergeBatches mergeBatches;
private final KCBQThreadPoolExecutor executor;
private final BigQuery bigQuery;
@@ -78,9 +82,11 @@ public MergeQueries(BigQuerySinkTaskConfig config,
config.getKafkaKeyFieldName().orElseThrow(() ->
new ConnectException("Kafka key field must be configured when upsert/delete is enabled")
),
config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG),
config.getBoolean(config.UPSERT_ENABLED_CONFIG),
config.getBoolean(config.DELETE_ENABLED_CONFIG),
config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG),
config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG),
config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG),
config.getInt(BigQuerySinkConfig.BIGQUERY_RETRY_CONFIG),
config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG),
mergeBatches,
executor,
bigQuery,
@@ -94,6 +100,8 @@ public MergeQueries(BigQuerySinkTaskConfig config,
boolean insertPartitionTime,
boolean upsertEnabled,
boolean deleteEnabled,
int bigQueryRetry,
long bigQueryRetryWait,
MergeBatches mergeBatches,
KCBQThreadPoolExecutor executor,
BigQuery bigQuery,
@@ -103,6 +111,8 @@ public MergeQueries(BigQuerySinkTaskConfig config,
this.insertPartitionTime = insertPartitionTime;
this.upsertEnabled = upsertEnabled;
this.deleteEnabled = deleteEnabled;
this.bigQueryRetry = bigQueryRetry;
this.bigQueryRetryWait = bigQueryRetryWait;
this.mergeBatches = mergeBatches;
this.executor = executor;
this.bigQuery = bigQuery;
@@ -150,16 +160,21 @@ private void mergeFlush(
boolean success = false;
while (!success) {
try {
if (attempt > 0) {
SleepUtils.waitRandomTime(this.bigQueryRetryWait, WAIT_MAX_JITTER);
}
bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery));
success = true;
} catch (BigQueryException e) {
if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
if (attempt >= bigQueryRetry) {
throw new BigQueryConnectException("Failed to merge rows to destination table `" + destinationTable + "` within " + attempt
+ " attempts.", e);
} else if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
logger.warn("Serialization error while merging from {} to {}, attempting retry", intermediateTable, destinationTable);
attempt++;
} else if (BigQueryErrorResponses.isJobInternalError(e)) {
logger.warn("Job internal error while merging from {} to {}, attempting retry", intermediateTable, destinationTable);
attempt++;
if (attempt == 30) {
throw new BigQueryConnectException("Failed to merge rows to destination table `" + destinationTable + "` within " + attempt
+ " attempts due to BQ write serialization error.", e);
}
SleepUtils.waitRandomTime(10000, 20000);
} else {
throw e;
}
@@ -4,7 +4,7 @@

public final class SleepUtils {

public static void waitRandomTime(int sleepMs, int jitterMs) throws InterruptedException {
Thread.sleep(sleepMs + ThreadLocalRandom.current().nextInt(jitterMs));
public static void waitRandomTime(long sleepMs, long jitterMs) throws InterruptedException {
Thread.sleep(sleepMs + ThreadLocalRandom.current().nextLong(jitterMs));
}
}
@@ -44,6 +44,7 @@ public class BigQueryErrorResponses {
private static final String BAD_REQUEST_REASON = "badRequest";
private static final String INVALID_REASON = "invalid";
private static final String INVALID_QUERY_REASON = "invalidQuery";
private static final String JOB_INTERNAL_ERROR = "jobInternalError";
private static final String NOT_FOUND_REASON = "notFound";
private static final String QUOTA_EXCEEDED_REASON = "quotaExceeded";
private static final String RATE_LIMIT_EXCEEDED_REASON = "rateLimitExceeded";
@@ -84,6 +85,11 @@ public static boolean isUnspecifiedBadRequestError(BigQueryException exception)
&& exception.getReason() == null;
}

public static boolean isJobInternalError(BigQueryException exception) {
return BAD_REQUEST_CODE == exception.getCode()
&& JOB_INTERNAL_ERROR.equals(exception.getReason());
}

public static boolean isQuotaExceededError(BigQueryException exception) {
return FORBIDDEN_CODE == exception.getCode()
// TODO: May be able to use exception.getReason() instead of (indirectly) exception.getError().getReason()
@@ -19,11 +19,8 @@

package com.wepay.kafka.connect.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.*;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor;
import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -38,16 +35,21 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.*;

@RunWith(MockitoJUnitRunner.class)
public class MergeQueriesTest {

private static final String KEY = "kafkaKey";

private static final int BATCH_NUMBER = 42;
private static final int BIGQUERY_RETRY = 3;
private static final int BIGQUERY_RETRY_WAIT = 1000;
private static final TableId DESTINATION_TABLE = TableId.of("ds1", "t");
private static final TableId INTERMEDIATE_TABLE = TableId.of("ds1", "t_tmp_6_uuid_epoch");
private static final Schema INTERMEDIATE_TABLE_SCHEMA = constructIntermediateTable();
@@ -66,7 +68,7 @@ public void setUp() {

private MergeQueries mergeQueries(boolean insertPartitionTime, boolean upsert, boolean delete) {
return new MergeQueries(
KEY, insertPartitionTime, upsert, delete, mergeBatches, executor, bigQuery, schemaManager, context
KEY, insertPartitionTime, upsert, delete, BIGQUERY_RETRY, BIGQUERY_RETRY_WAIT, mergeBatches, executor, bigQuery, schemaManager, context
);
}

@@ -336,6 +338,95 @@ public void testBatchCreation() {
assertEquals(1, mergeBatches.incrementBatch(INTERMEDIATE_TABLE));
}

@Test
public void testBigQueryJobInternalErrorRetry() throws InterruptedException {
// Arrange
mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());

TableResult tableResultReponse = mock(TableResult.class);
BigQueryError jobInternalError = new BigQueryError("jobInternalError", null, "The job encountered an internal error during execution and was unable to complete successfully.");
when(bigQuery.query(anyObject()))
.thenThrow(new BigQueryException(400, "mock job internal error", jobInternalError))
.thenReturn(tableResultReponse);
when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
latch.countDown();
return null;
}).when(executor).execute(any());
MergeQueries mergeQueries = spy(mergeQueries(false, true, true));

// Act
mergeQueries.mergeFlush(INTERMEDIATE_TABLE);

// Assert
latch.await();
verify(bigQuery, times(3)).query(anyObject());
}

@Test
public void testBigQueryInvalidQueryErrorRetry() throws InterruptedException {
// Arrange
mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());


TableResult tableResultReponse = mock(TableResult.class);
BigQueryError jobInternalError = new BigQueryError("invalidQuery", null, "Could not serialize access to table my_table due to concurrent update");
when(bigQuery.query(anyObject()))
.thenThrow(new BigQueryException(400, "mock invalid query", jobInternalError))
.thenReturn(tableResultReponse);
when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
latch.countDown();
return null;
}).when(executor).execute(any());
MergeQueries mergeQueries = mergeQueries(false, true, true);

// Act
mergeQueries.mergeFlush(INTERMEDIATE_TABLE);

// Assert
latch.await();
verify(bigQuery, times(3)).query(anyObject());
}


@Test(expected = BigQueryConnectException.class)
public void testBigQueryRetryExceeded() throws InterruptedException {
// Arrange
mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());

BigQueryError jobInternalError = new BigQueryError("invalidQuery", null, "Could not serialize access to table my_table due to concurrent update");
when(bigQuery.query(anyObject()))
.thenThrow(new BigQueryException(400, "mock invalid query", jobInternalError));
when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
latch.countDown();
return null;
}).when(executor).execute(any());
MergeQueries mergeQueries = mergeQueries(false, true, true);

// Act
mergeQueries.mergeFlush(INTERMEDIATE_TABLE);

//Assert
latch.await();
}

private String table(TableId table) {
return String.format("`%s`.`%s`", table.getDataset(), table.getTable());
}
