Fix #349 retry on jobInternalError #363

Merged (1 commit) on Dec 18, 2023
@@ -27,6 +27,7 @@
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.google.common.annotations.VisibleForTesting;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.exception.ExpectedInterruptException;
@@ -35,7 +36,6 @@
import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
import com.wepay.kafka.connect.bigquery.write.row.BigQueryErrorResponses;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,8 @@
import static com.wepay.kafka.connect.bigquery.utils.TableNameUtils.intTable;

public class MergeQueries {

private static final int WAIT_MAX_JITTER = 1000;
public static final String INTERMEDIATE_TABLE_KEY_FIELD_NAME = "key";
public static final String INTERMEDIATE_TABLE_VALUE_FIELD_NAME = "value";
public static final String INTERMEDIATE_TABLE_ITERATION_FIELD_NAME = "i";
@@ -62,6 +64,8 @@ public class MergeQueries {
private final boolean insertPartitionTime;
private final boolean upsertEnabled;
private final boolean deleteEnabled;
private final int bigQueryRetry;
private final long bigQueryRetryWait;
private final MergeBatches mergeBatches;
private final KCBQThreadPoolExecutor executor;
private final BigQuery bigQuery;
@@ -78,9 +82,11 @@ public MergeQueries(BigQuerySinkTaskConfig config,
config.getKafkaKeyFieldName().orElseThrow(() ->
new ConnectException("Kafka key field must be configured when upsert/delete is enabled")
),
config.getBoolean(config.BIGQUERY_PARTITION_DECORATOR_CONFIG),
config.getBoolean(config.UPSERT_ENABLED_CONFIG),
config.getBoolean(config.DELETE_ENABLED_CONFIG),
config.getBoolean(BigQuerySinkConfig.BIGQUERY_PARTITION_DECORATOR_CONFIG),
config.getBoolean(BigQuerySinkConfig.UPSERT_ENABLED_CONFIG),
config.getBoolean(BigQuerySinkConfig.DELETE_ENABLED_CONFIG),
config.getInt(BigQuerySinkConfig.BIGQUERY_RETRY_CONFIG),
config.getLong(BigQuerySinkConfig.BIGQUERY_RETRY_WAIT_CONFIG),
mergeBatches,
executor,
bigQuery,
@@ -94,6 +100,8 @@ public MergeQueries(BigQuerySinkTaskConfig config,
boolean insertPartitionTime,
boolean upsertEnabled,
boolean deleteEnabled,
int bigQueryRetry,
long bigQueryRetryWait,
MergeBatches mergeBatches,
KCBQThreadPoolExecutor executor,
BigQuery bigQuery,
@@ -103,6 +111,8 @@ public MergeQueries(BigQuerySinkTaskConfig config,
this.insertPartitionTime = insertPartitionTime;
this.upsertEnabled = upsertEnabled;
this.deleteEnabled = deleteEnabled;
this.bigQueryRetry = bigQueryRetry;
this.bigQueryRetryWait = bigQueryRetryWait;
this.mergeBatches = mergeBatches;
this.executor = executor;
this.bigQuery = bigQuery;
@@ -150,16 +160,19 @@ private void mergeFlush(
boolean success = false;
while (!success) {
try {
if (attempt > 0) {
SleepUtils.waitRandomTime(this.bigQueryRetryWait, WAIT_MAX_JITTER);
}
bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery));
success = true;
} catch (BigQueryException e) {
if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
attempt++;
if (attempt == 30) {
throw new BigQueryConnectException("Failed to merge rows to destination table `" + destinationTable + "` within " + attempt
+ " attempts due to BQ write serialization error.", e);
}
SleepUtils.waitRandomTime(10000, 20000);
if (attempt >= bigQueryRetry) {
throw new BigQueryConnectException("Failed to merge rows to destination table `" + destinationTable + "` within " + attempt
+ " attempts.", e);
} else if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
logger.warn("Serialize access error while merging from {} to {}, retry attempt {}", intermediateTable, destinationTable, ++attempt);
} else if (BigQueryErrorResponses.isJobInternalError(e)) {
logger.warn("Job internal error while merging from {} to {}, retry attempt {}", intermediateTable, destinationTable, ++attempt);
} else {
throw e;
}
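Taken together, the changed lines above drop the hard-coded limit of 30 attempts and the fixed 10 to 20 second sleep in favor of the connector's configured retry count and retry wait, and they add jobInternalError to the set of retriable failures. Pieced together, the loop reads roughly as in the sketch below; the attempt counter, mergeFlushQuery, logger, and the handling of the InterruptedException thrown by SleepUtils.waitRandomTime are assumed from the surrounding mergeFlush method and are not shown in this hunk.

// Sketch only, not part of the diff: approximate shape of the retry loop after this change.
int attempt = 0;
boolean success = false;
while (!success) {
  try {
    if (attempt > 0) {
      // Back off before every retry: the configured wait plus up to WAIT_MAX_JITTER ms of jitter.
      SleepUtils.waitRandomTime(this.bigQueryRetryWait, WAIT_MAX_JITTER);
    }
    bigQuery.query(QueryJobConfiguration.of(mergeFlushQuery));
    success = true;
  } catch (BigQueryException e) {
    if (attempt >= bigQueryRetry) {
      // Retry budget exhausted, regardless of which retriable error kept occurring.
      throw new BigQueryConnectException("Failed to merge rows to destination table `"
          + destinationTable + "` within " + attempt + " attempts.", e);
    } else if (BigQueryErrorResponses.isCouldNotSerializeAccessError(e)) {
      logger.warn("Serialize access error while merging from {} to {}, retry attempt {}",
          intermediateTable, destinationTable, ++attempt);
    } else if (BigQueryErrorResponses.isJobInternalError(e)) {
      // New in this change: jobInternalError responses are retried as well.
      logger.warn("Job internal error while merging from {} to {}, retry attempt {}",
          intermediateTable, destinationTable, ++attempt);
    } else {
      throw e; // everything else is still treated as non-retriable
    }
  }
}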
@@ -4,7 +4,7 @@

public final class SleepUtils {

public static void waitRandomTime(int sleepMs, int jitterMs) throws InterruptedException {
Thread.sleep(sleepMs + ThreadLocalRandom.current().nextInt(jitterMs));
public static void waitRandomTime(long sleepMs, long jitterMs) throws InterruptedException {
Thread.sleep(sleepMs + ThreadLocalRandom.current().nextLong(jitterMs));
}
}
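The parameters widen from int to long so the configured bigQueryRetryWait, a long, can be passed through unchanged. A minimal hypothetical call, using values like those the merge path now passes:

// Hypothetical call: sleeps between sleepMs and sleepMs + jitterMs milliseconds.
// ThreadLocalRandom.nextLong(bound) requires a positive bound, so jitterMs must be greater than zero.
SleepUtils.waitRandomTime(1000L, 1000L); // e.g. bigQueryRetryWait = 1000, WAIT_MAX_JITTER = 1000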
@@ -44,6 +44,7 @@ public class BigQueryErrorResponses {
private static final String BAD_REQUEST_REASON = "badRequest";
private static final String INVALID_REASON = "invalid";
private static final String INVALID_QUERY_REASON = "invalidQuery";
private static final String JOB_INTERNAL_ERROR = "jobInternalError";
private static final String NOT_FOUND_REASON = "notFound";
private static final String QUOTA_EXCEEDED_REASON = "quotaExceeded";
private static final String RATE_LIMIT_EXCEEDED_REASON = "rateLimitExceeded";
@@ -84,6 +85,11 @@ public static boolean isUnspecifiedBadRequestError(BigQueryException exception)
&& exception.getReason() == null;
}

public static boolean isJobInternalError(BigQueryException exception) {
return BAD_REQUEST_CODE == exception.getCode()
&& JOB_INTERNAL_ERROR.equals(exception.getReason());
}

public static boolean isQuotaExceededError(BigQueryException exception) {
return FORBIDDEN_CODE == exception.getCode()
// TODO: May be able to use exception.getReason() instead of (indirectly) exception.getError().getReason()
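For illustration, an exception shaped the way the new unit test builds it, an HTTP 400 whose BigQueryError reason is jobInternalError, should be matched by the predicate added above; this is a sketch, not part of the change:

// Illustrative sketch, mirroring the error object constructed in the new MergeQueriesTest.
BigQueryError error = new BigQueryError("jobInternalError", null,
    "The job encountered an internal error during execution and was unable to complete successfully.");
BigQueryException e = new BigQueryException(400, "job internal error", error);
boolean retriable = BigQueryErrorResponses.isJobInternalError(e); // expected to be true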
@@ -20,10 +20,14 @@
package com.wepay.kafka.connect.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor;
import com.wepay.kafka.connect.bigquery.write.batch.MergeBatches;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -38,8 +42,16 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
@@ -48,6 +60,8 @@ public class MergeQueriesTest {
private static final String KEY = "kafkaKey";

private static final int BATCH_NUMBER = 42;
private static final int BIGQUERY_RETRY = 3;
private static final int BIGQUERY_RETRY_WAIT = 1000;
private static final TableId DESTINATION_TABLE = TableId.of("ds1", "t");
private static final TableId INTERMEDIATE_TABLE = TableId.of("ds1", "t_tmp_6_uuid_epoch");
private static final Schema INTERMEDIATE_TABLE_SCHEMA = constructIntermediateTable();
@@ -66,7 +80,7 @@ public void setUp() {

private MergeQueries mergeQueries(boolean insertPartitionTime, boolean upsert, boolean delete) {
return new MergeQueries(
KEY, insertPartitionTime, upsert, delete, mergeBatches, executor, bigQuery, schemaManager, context
KEY, insertPartitionTime, upsert, delete, BIGQUERY_RETRY, BIGQUERY_RETRY_WAIT, mergeBatches, executor, bigQuery, schemaManager, context
);
}

@@ -336,6 +350,95 @@ public void testBatchCreation() {
assertEquals(1, mergeBatches.incrementBatch(INTERMEDIATE_TABLE));
}

@Test
public void testBigQueryJobInternalErrorRetry() throws InterruptedException {
// Arrange
mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());

TableResult tableResultResponse = mock(TableResult.class);
BigQueryError jobInternalError = new BigQueryError("jobInternalError", null, "The job encountered an internal error during execution and was unable to complete successfully.");
when(bigQuery.query(anyObject()))
.thenThrow(new BigQueryException(400, "mock job internal error", jobInternalError))
.thenReturn(tableResultResponse);
when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
latch.countDown();
return null;
}).when(executor).execute(any());
MergeQueries mergeQueries = spy(mergeQueries(false, true, true));

// Act
mergeQueries.mergeFlush(INTERMEDIATE_TABLE);

// Assert
latch.await();
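// Three calls are expected: the initial query that fails with jobInternalError, the successful retry,
// and, assuming mergeFlush also issues a batch-clear query against the intermediate table after a
// successful merge, one follow-up clean-up query.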
verify(bigQuery, times(3)).query(anyObject());
}

@Test
public void testBigQueryInvalidQueryErrorRetry() throws InterruptedException {
// Arrange
mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());


TableResult tableResultResponse = mock(TableResult.class);
BigQueryError invalidQueryError = new BigQueryError("invalidQuery", null, "Could not serialize access to table my_table due to concurrent update");
when(bigQuery.query(anyObject()))
.thenThrow(new BigQueryException(400, "mock invalid query", invalidQueryError))
.thenReturn(tableResultResponse);
when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
latch.countDown();
return null;
}).when(executor).execute(any());
MergeQueries mergeQueries = mergeQueries(false, true, true);

// Act
mergeQueries.mergeFlush(INTERMEDIATE_TABLE);

// Assert
latch.await();
verify(bigQuery, times(3)).query(anyObject());
}


@Test(expected = BigQueryConnectException.class)
public void testBigQueryRetryExceeded() throws InterruptedException {
// Arrange
mergeBatches.addToBatch(TEST_SINK_RECORD, INTERMEDIATE_TABLE, new HashMap<>());

BigQueryError jobInternalError = new BigQueryError("invalidQuery", null, "Could not serialize access to table my_table due to concurrent update");
when(bigQuery.query(anyObject()))
.thenThrow(new BigQueryException(400, "mock invalid query", invalidQueryError));
when(mergeBatches.destinationTableFor(INTERMEDIATE_TABLE)).thenReturn(DESTINATION_TABLE);
when(mergeBatches.incrementBatch(INTERMEDIATE_TABLE)).thenReturn(0);
when(mergeBatches.prepareToFlush(INTERMEDIATE_TABLE, 0)).thenReturn(true);
CountDownLatch latch = new CountDownLatch(1);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
latch.countDown();
return null;
}).when(executor).execute(any());
MergeQueries mergeQueries = mergeQueries(false, true, true);

// Act
mergeQueries.mergeFlush(INTERMEDIATE_TABLE);

// Assert
latch.await();
}

private String table(TableId table) {
return String.format("`%s`.`%s`", table.getDataset(), table.getTable());
}