Merge branch 'master' into failed-serialize-concurrent-merge-update
FreCap committed Mar 20, 2023
2 parents 18a46dd + 578d189 commit 9feb0f1
Showing 28 changed files with 1,045 additions and 125 deletions.
20 changes: 20 additions & 0 deletions README.md
@@ -163,6 +163,26 @@ The `$KCBQ_TEST_FOLDER` variable can be supplied to specify which subfolder of t
be used when testing the GCS batch loading feature; if not supplied, the top-level folder will be
used.

### Adding new GCP Credentials & BigQuery DataSet
This section is optional; follow it only if you want to use a different GCP project and generate new credentials for it.
- **Create a GCP Service Account:** Follow the instructions at https://cloud.google.com/iam/docs/creating-managing-service-accounts, e.g.
```
gcloud iam service-accounts create kcbq-test --description="service account key for bigquery sink integration test" --display-name="kcbq-test"
```
- **Create Service Account Keys:** Follow the instructions at https://cloud.google.com/iam/docs/creating-managing-service-account-keys, e.g.
```
gcloud iam service-accounts keys create /tmp/creds.json --iam-account=kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com
```
- **Give BigQuery & Storage Admin Permissions to the Service Account** (a `gcloud` alternative is sketched below):
  - Open https://console.cloud.google.com/iam-admin/iam?project=<GCP_PROJECT_NAME>
  - Click "Add" and enter the service account created above as a new principal, e.g. `kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com`
  - Add the following two roles from the "Select a role" drop-down menu:
    - BigQuery -> BigQuery Admin
    - Cloud Storage -> Storage Admin
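The same roles can also be granted from the command line (a minimal sketch, assuming the `kcbq-test` service account created in the previous steps), e.g.
```
# Grant the BigQuery Admin and Storage Admin roles to the test service account
gcloud projects add-iam-policy-binding <GCP_PROJECT_NAME> \
  --member="serviceAccount:kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com" \
  --role="roles/bigquery.admin"
gcloud projects add-iam-policy-binding <GCP_PROJECT_NAME> \
  --member="serviceAccount:kcbq-test@<GCP_PROJECT_NAME>.iam.gserviceaccount.com" \
  --role="roles/storage.admin"
```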
- **Add a BigQuery Dataset to the Project** (a `bq` CLI alternative is sketched below):
  - Open https://console.cloud.google.com/bigquery?project=<GCP_PROJECT_NAME>
  - Click the three vertical dots next to the project name, select "Create dataset", and follow the steps there.
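The dataset can also be created from the command line with the `bq` tool (a minimal sketch; `<DATASET_NAME>` is a placeholder for whatever dataset name you choose), e.g.
```
# <DATASET_NAME> is a placeholder; use the dataset name you plan to point the tests at
bq mk --dataset <GCP_PROJECT_NAME>:<DATASET_NAME>
```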

### Running the Integration Tests

2 changes: 1 addition & 1 deletion kcbq-api/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>com.wepay.kcbq</groupId>
<artifactId>kcbq-parent</artifactId>
<version>2.4.0-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

6 changes: 3 additions & 3 deletions kcbq-connector/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>com.wepay.kcbq</groupId>
<artifactId>kcbq-parent</artifactId>
<version>2.4.0-SNAPSHOT</version>
<version>2.6.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

@@ -200,10 +200,10 @@
</tags>

<requirements>
<requirement>Apache Kafka 0.11 or higher / Confluent Platform 3.3 or higher</requirement>
<requirement>Apache Kafka 2.6 or higher / Confluent Platform 6.0 or higher</requirement>
<requirement>Java 1.8 or higher</requirement>
<requirement>Active Google Cloud Platform (GCP) account with authorization to create resources</requirement>
<requirement>Kafka Connect 0.11 or higher / Confluent Platform 3.3 or higher</requirement>
<requirement>Kafka Connect 2.6 or higher / Confluent Platform 6.0 or higher</requirement>
</requirements>
</configuration>
</execution>
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java
@@ -35,6 +35,8 @@
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException;
import com.wepay.kafka.connect.bigquery.utils.FieldNameSanitizer;
import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId;
import com.wepay.kafka.connect.bigquery.utils.SinkRecordConverter;
@@ -51,19 +53,20 @@
import com.wepay.kafka.connect.bigquery.write.row.GCSToBQWriter;
import com.wepay.kafka.connect.bigquery.write.row.SimpleBigQueryWriter;
import com.wepay.kafka.connect.bigquery.write.row.UpsertDeleteBigQueryWriter;
import java.io.IOException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -115,6 +118,11 @@ public class BigQuerySinkTask extends SinkTask {
private ScheduledExecutorService loadExecutor;

private Map<TableId, Table> cache;
private Map<String, String> topic2TableMap;
private int remainingRetries;
private boolean enableRetries;

private ErrantRecordHandler errantRecordHandler;

/**
* Create a new BigquerySinkTask.
@@ -183,25 +191,38 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, Offs
private PartitionedTableId getRecordTable(SinkRecord record) {
String tableName;
String dataset = config.getString(BigQuerySinkConfig.DEFAULT_DATASET_CONFIG);
String[] smtReplacement = record.topic().split(":");

if (smtReplacement.length == 2) {
dataset = smtReplacement[0];
tableName = smtReplacement[1];
} else if (smtReplacement.length == 1) {
tableName = smtReplacement[0];
if (topic2TableMap != null) {
tableName = topic2TableMap.getOrDefault(record.topic(), record.topic());
} else {
throw new ConnectException(String.format(
"Incorrect regex replacement format in topic name '%s'. "
+ "SMT replacement should either produce the <dataset>:<tableName> format "
+ "or just the <tableName> format.",
record.topic()
));
}
String[] smtReplacement = record.topic().split(":");

if (sanitize) {
tableName = FieldNameSanitizer.sanitizeName(tableName);
if (smtReplacement.length == 2) {
dataset = smtReplacement[0];
tableName = smtReplacement[1];
} else if (smtReplacement.length == 1) {
tableName = smtReplacement[0];
} else {
throw new ConnectException(String.format(
"Incorrect regex replacement format in topic name '%s'. "
+ "SMT replacement should either produce the <dataset>:<tableName> format "
+ "or just the <tableName> format.",
record.topic()
));
}
if (sanitize) {
tableName = FieldNameSanitizer.sanitizeName(tableName);
}
}

// TODO: Clarify the order of topic/table name modifications:
// - A regex router SMT rewrites the topic name in the SinkRecord; the result may or may not be ':'-separated.
// - Using the topic2table map together with table name sanitization doesn't really make sense.
// - The table name derived above may still be sanitized further.

TableId baseTableId = TableId.of(dataset, tableName);
if (upsertDelete) {
TableId intermediateTableId = mergeBatches.intermediateTableFor(baseTableId);
@@ -233,9 +254,7 @@ private PartitionedTableId getRecordTable(SinkRecord record) {

return builder.build();
}

@Override
public void put(Collection<SinkRecord> records) {
public void writeSinkRecords(Collection<SinkRecord> records) {
// Periodically poll for errors here instead of doing a stop-the-world check in flush()
executor.maybeThrowEncounteredError();

@@ -251,7 +270,8 @@ public void put(Collection<SinkRecord> records) {
TableWriterBuilder tableWriterBuilder;
if (config.getList(BigQuerySinkConfig.ENABLE_BATCH_CONFIG).contains(record.topic())) {
String topic = record.topic();
String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli();
long offset = record.kafkaOffset();
String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli() + "_" + offset;
String gcsFolderName = config.getString(BigQuerySinkConfig.GCS_FOLDER_NAME_CONFIG);
if (gcsFolderName != null && !"".equals(gcsFolderName)) {
gcsBlobName = gcsFolderName + "/" + gcsBlobName;
@@ -273,7 +293,16 @@ public void put(Collection<SinkRecord> records) {
}
tableWriterBuilders.put(table, tableWriterBuilder);
}
tableWriterBuilders.get(table).addRow(record, table.getBaseTableId());
try {
tableWriterBuilders.get(table).addRow(record, table.getBaseTableId());
} catch (ConversionConnectException ex) {
// Send records to DLQ in case of ConversionConnectException
if (errantRecordHandler.getErrantRecordReporter() != null) {
errantRecordHandler.sendRecordsToDLQ(Collections.singleton(record), ex);
} else {
throw ex;
}
}
}
}

@@ -286,6 +315,25 @@ public void put(Collection<SinkRecord> records) {
checkQueueSize();
}

@Override
public void put(Collection<SinkRecord> records) {
try {
writeSinkRecords(records);
remainingRetries = config.getInt(BigQuerySinkConfig.MAX_RETRIES_CONFIG);
} catch (RetriableException e) {
if(enableRetries) {
if(remainingRetries <= 0) {
throw new ConnectException(e);
} else {
logger.warn("Write of records failed, remainingRetries={}", remainingRetries);
remainingRetries--;
throw e;
}
} else {
throw e;
}
}
}
// Important: this method is only safe to call during put(), flush(), or preCommit(); otherwise,
// a ConcurrentModificationException may be triggered if the Connect framework is in the middle of
// a method invocation on the consumer for this task. This becomes especially likely if all topics
@@ -346,7 +394,14 @@ private Table retrieveTable(TableId tableId) {
try {
return getBigQuery().getTable(tableId);
} catch (BigQueryException e) {
if (BigQueryErrorResponses.isIOError(e)) {
/* 1. An authentication error thrown by BigQuery is a type of IOException with error code 0,
so it needs its own check; otherwise it would fall under the IOError check below. */

/* 2. Authentication errors are not retried; a BigQueryConnectException is thrown directly instead. */
if (BigQueryErrorResponses.isAuthenticationError(e)) {
throw new BigQueryConnectException("Failed to authenticate client for table " + tableId + " with error " + e, e);
} else if (BigQueryErrorResponses.isIOError(e)) {
throw new RetriableException("Failed to retrieve information for table " + tableId, e);
} else {
throw e;
@@ -388,7 +443,7 @@ private SchemaManager newSchemaManager() {
timestampPartitionFieldName, partitionExpiration, clusteringFieldName, timePartitioningType);
}

private BigQueryWriter getBigQueryWriter() {
private BigQueryWriter getBigQueryWriter(ErrantRecordHandler errantRecordHandler) {
boolean autoCreateTables = config.getBoolean(BigQuerySinkConfig.TABLE_CREATE_CONFIG);
boolean allowNewBigQueryFields = config.getBoolean(BigQuerySinkConfig.ALLOW_NEW_BIGQUERY_FIELDS_CONFIG);
boolean allowRequiredFieldRelaxation = config.getBoolean(BigQuerySinkConfig.ALLOW_BIGQUERY_REQUIRED_FIELD_RELAXATION_CONFIG);
@@ -401,15 +456,17 @@ private BigQueryWriter getBigQueryWriter() {
retry,
retryWait,
autoCreateTables,
mergeBatches.intermediateToDestinationTables());
mergeBatches.intermediateToDestinationTables(),
errantRecordHandler);
} else if (autoCreateTables || allowNewBigQueryFields || allowRequiredFieldRelaxation) {
return new AdaptiveBigQueryWriter(bigQuery,
getSchemaManager(),
retry,
retryWait,
autoCreateTables);
autoCreateTables,
errantRecordHandler);
} else {
return new SimpleBigQueryWriter(bigQuery, retry, retryWait);
return new SimpleBigQueryWriter(bigQuery, retry, retryWait, errantRecordHandler);
}
}

@@ -462,6 +519,17 @@ public void start(Map<String, String> properties) {
bigQuery = new AtomicReference<>();
schemaManager = new AtomicReference<>();

// Initialise errantRecordReporter
ErrantRecordReporter errantRecordReporter = null;
try {
errantRecordReporter = context.errantRecordReporter(); // may be null if DLQ not enabled
} catch (NoClassDefFoundError | NullPointerException e) {
// Will occur in Connect runtimes earlier than 2.6
logger.warn("Connect versions prior to Apache Kafka 2.6 do not support the errant record "
+ "reporter");
}
errantRecordHandler = new ErrantRecordHandler(errantRecordReporter);

if (upsertDelete) {
String intermediateTableSuffix = String.format("_%s_%d_%s_%d",
config.getString(BigQuerySinkConfig.INTERMEDIATE_TABLE_SUFFIX_CONFIG),
@@ -473,7 +541,7 @@ public void start(Map<String, String> properties) {
}

cache = getCache();
bigQueryWriter = getBigQueryWriter();
bigQueryWriter = getBigQueryWriter(errantRecordHandler);
gcsToBQWriter = getGcsWriter();
executor = new KCBQThreadPoolExecutor(config, new LinkedBlockingQueue<>());
topicPartitionManager = new TopicPartitionManager();
@@ -492,6 +560,9 @@ public void start(Map<String, String> properties) {
}

recordConverter = getConverter(config);
topic2TableMap = config.getTopic2TableMap().orElse(null);
remainingRetries = config.getInt(BigQuerySinkConfig.MAX_RETRIES_CONFIG);
enableRetries = config.getBoolean(BigQuerySinkConfig.ENABLE_RETRIES_CONFIG);
}

private void startGCSToBQLoadTask() {
kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/ErrantRecordHandler.java
@@ -0,0 +1,59 @@
package com.wepay.kafka.connect.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class ErrantRecordHandler {
private static final Logger logger = LoggerFactory.getLogger(ErrantRecordHandler.class);
private final ErrantRecordReporter errantRecordReporter;

private static final List<String> allowedBigQueryErrorReason = Arrays.asList("invalid");

public ErrantRecordHandler(ErrantRecordReporter errantRecordReporter) {
this.errantRecordReporter = errantRecordReporter;
}

public void sendRecordsToDLQ(Set<SinkRecord> rows, Exception e) {
if(errantRecordReporter != null) {
logger.debug("Sending {} records to DLQ", rows.size());
for (SinkRecord r : rows) {
// Reporting records in async mode
errantRecordReporter.report(r, e);
}
} else {
logger.warn("Cannot send Records to DLQ as ErrantRecordReporter is null");
}
}

public ErrantRecordReporter getErrantRecordReporter() {
return errantRecordReporter;
}

public List<String> getAllowedBigQueryErrorReason() {
return allowedBigQueryErrorReason;
}

public boolean isErrorReasonAllowed(List<BigQueryError> bqErrorList) {
for (BigQueryError bqError: bqErrorList) {
boolean errorMatch = false;
String bqErrorReason = bqError.getReason();
for (String allowedBqErrorReason: allowedBigQueryErrorReason) {
if (bqErrorReason.equalsIgnoreCase(allowedBqErrorReason)) {
errorMatch = true;
break;
}
}
if(!errorMatch)
return false;
}
return true;
}
}
