diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java
index 74861c771..0f1cae0fc 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/MergeQueries.java
@@ -27,6 +27,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.TableId;
 import com.google.common.annotations.VisibleForTesting;
+import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
 import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
 import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
 import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException;
@@ -35,7 +36,6 @@
 import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
 import com.wepay.kafka.connect.bigquery.write.row.BigQueryErrorResponses;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +49,8 @@
 import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable;
 
 public class MergeQueries {
+
+  private static final int WAIT_MAX_JITTER = 1000;
   public static final String INTERMEDIATE_TABLE_KEY_FIELD_NAME = "key";
   public static final String INTERMEDIATE_TABLE_VALUE_FIELD_NAME = "value";
   public static final String INTERMEDIATE_TABLE_ITERATION_FIELD_NAME = "i";
@@ -62,6 +64,8 @@ public class MergeQueries {
   private final boolean insertPartitionTime;
   private final boolean upsertEnabled;
   private final boolean deleteEnabled;
+  private final int bigQueryRetry;
+  private final long bigQueryRetryWait;
   private final MergeBatches mergeBatches;
   private final KCBQThreadPoolExecutor executor;
   private final BigQuery bigQuery;
@@ -78,9 +82,11 @@ public MergeQueries(BigQuerySinkTaskConfig config,
         config.getKafkaKeyFieldName().orElseThrow(() ->
             new ConnectException("Kafka key field must be configured when upsert/delete is enabled")
         ),
-        config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG),
-        config.getBoolean(config.UPSERT_ENABLED_CONFIG),
-        config.getBoolean(config.DELETE_ENABLED_CONFIG),
+        config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG),
+        config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG),
+        config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG),
+        config.getInt(BigQuerySinkConfig.BIGQUERY_RETRY_CONFIG),
+        config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG),
         mergeBatches,
         executor,
         bigQuery,
@@ -94,6 +100,8 @@ public MergeQueries(BigQuerySinkTaskConfig config,
                boolean insertPartitionTime,
                boolean upsertEnabled,
                boolean deleteEnabled,
+               int bigQueryRetry,
+               long bigQueryRetryWait,
                MergeBatches mergeBatches,
                KCBQThreadPoolExecutor executor,
                BigQuery bigQuery,
@@ -103,6 +111,8 @@ public MergeQueries(BigQuerySinkTaskConfig config,
     this.insertPartitionTime = insertPartitionTime;
     this.upsertEnabled = upsertEnabled;
     this.deleteEnabled = deleteEnabled;
+    this.bigQueryRetry = bigQueryRetry;
+    this.bigQueryRetryWait = bigQueryRetryWait;
     this.mergeBatches = mergeBatches;
     this.executor = executor;
     this.bigQuery = bigQuery;
@@ -150,16 +160,21 @@ private void mergeFlush(
     boolean success = false;
     while (!success) {
       try {
+        if (attempt > 0) {
+          SleepUtils.waitRandomTime(this.bigQueryRetryWait, WAIT_MAX_JITTER);
+        }
         bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery));
         success = true;
       } catch (BigQueryException e) {
-        if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
+        if (attempt >= bigQueryRetry) {
+          throw new BigQueryConnectException("Failed to merge rows to destination table `" + destinationTable + "` within " + attempt
+              + " attempts.", e);
+        } else if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
+          logger.warn("Serialization error while merging from {} to {}, attempting retry", intermediateTable, destinationTable);
+          attempt++;
+        } else if (BigQueryErrorResponses.isJobInternalError(e)) {
+          logger.warn("Job internal error while merging from {} to {}, attempting retry", intermediateTable, destinationTable);
           attempt++;
-          if (attempt == 30) {
-            throw new BigQueryConnectException("Failed to merge rows to destination table `" + destinationTable + "` within " + attempt
-                + " attempts due to BQ write serialization error.", e);
-          }
-          SleepUtils.waitRandomTime(10000, 20000);
         } else {
           throw e;
         }
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SleepUtils.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SleepUtils.java
index 5cc7a55d1..3d5576e76 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SleepUtils.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/utils/SleepUtils.java
@@ -4,7 +4,7 @@
 
 public final class SleepUtils {
 
-  public static void waitRandomTime(int sleepMs, int jitterMs) throws InterruptedException {
-    Thread.sleep(sleepMs + ThreadLocalRandom.current().nextInt(jitterMs));
+  public static void waitRandomTime(long sleepMs, long jitterMs) throws InterruptedException {
+    Thread.sleep(sleepMs + ThreadLocalRandom.current().nextLong(jitterMs));
   }
 }
diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryErrorResponses.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryErrorResponses.java
index 9101ff096..982d28dc6 100644
--- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryErrorResponses.java
+++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryErrorResponses.java
@@ -44,6 +44,7 @@ public class BigQueryErrorResponses {
   private static final String BAD_REQUEST_REASON = "badRequest";
   private static final String INVALID_REASON = "invalid";
   private static final String INVALID_QUERY_REASON = "invalidQuery";
+  private static final String JOB_INTERNAL_ERROR = "jobInternalError";
   private static final String NOT_FOUND_REASON = "notFound";
   private static final String QUOTA_EXCEEDED_REASON = "quotaExceeded";
   private static final String RATE_LIMIT_EXCEEDED_REASON = "rateLimitExceeded";
@@ -84,6 +85,11 @@ public static boolean isUnspecifiedBadRequestError(BigQueryException exception)
         && exception.getReason() == null;
   }
 
+  public static boolean isJobInternalError(BigQueryException exception) {
+    return BAD_REQUEST_CODE == exception.getCode()
+        && JOB_INTERNAL_ERROR.equals(exception.getReason());
+  }
+
   public static boolean isQuotaExceededError(BigQueryException exception) {
     return FORBIDDEN_CODE == exception.getCode()
         // TODO: May be able to use exception.getReason() instead of (indirectly) exception.getError().getReason()
diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java
index 7a8c2024a..0797a04b6 100644
--- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java
+++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/MergeQueriesTest.java
@@ -19,11 +19,8 @@
 
 package com.wepay.kafka.connect.bigquery;
 
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.Field;
-import com.google.cloud.bigquery.LegacySQLTypeName;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.*;
+import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
 import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor;
 import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -38,9 +35,12 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.*;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MergeQueriesTest {
@@ -48,6 +48,8 @@ public class MergeQueriesTest {
   private static final String KEY = "kafkaKey";
   private static final int BATCH_NUMBER = 42;
+  private static final int BIGQUERY_RETRY = 3;
+  private static final int BIGQUERY_RETRY_WAIT = 1000;
   private static final TableId DESTINATION_TABLE = TableId.of("ds1", "t");
   private static final TableId INTERMEDIATE_TABLE = TableId.of("ds1", "t_tmp_6_uuid_epoch");
   private static final Schema INTERMEDIATE_TABLE_SCHEMA = constructIntermediateTable();
 
@@ -66,7 +68,7 @@ public void setUp() {
   }
 
   private MergeQueries mergeQueries(boolean insertPartitionTime, boolean upsert, boolean delete) {
     return new MergeQueries(
-        KEY, insertPartitionTime, upsert, delete, mergeBatches, executor, bigQuery, schemaManager, context
+        KEY, insertPartitionTime, upsert, delete, BIGQUERY_RETRY, BIGQUERY_RETRY_WAIT, mergeBatches, executor, bigQuery, schemaManager, context
     );
   }
@@ -336,6 +338,95 @@ public void testBatchCreation() {
     assertEquals(1, mergeBatches.incrementBatch(INTERMEDIATE_TABLE));
   }
 
+  @Test
+  public void testBigQueryJobInternalErrorRetry() throws InterruptedException {
+    // Arrange
+    mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());
+
+    TableResult tableResultResponse = mock(TableResult.class);
+    BigQueryError jobInternalError = new BigQueryError("jobInternalError", null, "The job encountered an internal error during execution and was unable to complete successfully.");
+    when(bigQuery.query(anyObject()))
+        .thenThrow(new BigQueryException(400, "mock job internal error", jobInternalError))
+        .thenReturn(tableResultResponse);
+    when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
+    when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
+    when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      Runnable runnable = invocation.getArgument(0);
+      runnable.run();
+      latch.countDown();
+      return null;
+    }).when(executor).execute(any());
+    MergeQueries mergeQueries = spy(mergeQueries(false, true, true));
+
+    // Act
+    mergeQueries.mergeFlush(INTERMEDIATE_TABLE);
+
+    // Assert
+    latch.await();
+    verify(bigQuery, times(3)).query(anyObject());
+  }
+
+  @Test
+  public void testBigQueryInvalidQueryErrorRetry() throws InterruptedException {
+    // Arrange
+    mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());
+
+
+    TableResult tableResultResponse = mock(TableResult.class);
+    BigQueryError invalidQueryError = new BigQueryError("invalidQuery", null, "Could not serialize access to table my_table due to concurrent update");
+    when(bigQuery.query(anyObject()))
+        .thenThrow(new BigQueryException(400, "mock invalid query", invalidQueryError))
+        .thenReturn(tableResultResponse);
+    when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
+    when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
+    when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      Runnable runnable = invocation.getArgument(0);
+      runnable.run();
+      latch.countDown();
+      return null;
+    }).when(executor).execute(any());
+    MergeQueries mergeQueries = mergeQueries(false, true, true);
+
+    // Act
+    mergeQueries.mergeFlush(INTERMEDIATE_TABLE);
+
+    // Assert
+    latch.await();
+    verify(bigQuery, times(3)).query(anyObject());
+  }
+
+
+  @Test(expected = BigQueryConnectException.class)
+  public void testBigQueryRetryExceeded() throws InterruptedException {
+    // Arrange
+    mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());
+
+    BigQueryError invalidQueryError = new BigQueryError("invalidQuery", null, "Could not serialize access to table my_table due to concurrent update");
+    when(bigQuery.query(anyObject()))
+        .thenThrow(new BigQueryException(400, "mock invalid query", invalidQueryError));
+    when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
+    when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
+    when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      Runnable runnable = invocation.getArgument(0);
+      runnable.run();
+      latch.countDown();
+      return null;
+    }).when(executor).execute(any());
+    MergeQueries mergeQueries = mergeQueries(false, true, true);
+
+    // Act
+    mergeQueries.mergeFlush(INTERMEDIATE_TABLE);
+
+    // Assert
+    latch.await();
+  }
+
   private String table(TableId table) {
     return String.format("`%s`.`%s`", table.getDataset(), table.getTable());
   }