[source-mysql]: support Planetscale MySQL's per-query row limit (#40561)
Fixes airbytehq/oncall#5051

PlanetScale MySQL defines several system limits. One of our customers has been hitting the 100K per-query row limit, which causes our connector to emit system errors. This patch adds a maximum chunk size to the connector. By default it is `Long.MAX_VALUE` (effectively unlimited); when the configured host contains `psdb.cloud`, indicating a PlanetScale MySQL database, the limit is lowered to 100K rows.
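The clamping described above can be illustrated in isolation. The following is a minimal sketch, not the connector's code: the class `ChunkSizeLimitSketch`, its method names, and its constants are hypothetical names chosen for this example. It assumes, as the patch does, that a host containing `psdb.cloud` identifies a PlanetScale database.

```java
import java.util.Locale;

/** Hypothetical helper for illustration only; it is not part of source-mysql. */
final class ChunkSizeLimitSketch {

  // Per-query row limit for PlanetScale, as stated in the commit message.
  static final long PLANETSCALE_MAX_ROWS_PER_QUERY = 100_000L;

  // Substring the patch looks for in the configured host.
  static final String PLANETSCALE_HOST_MARKER = "psdb.cloud";

  private ChunkSizeLimitSketch() {}

  /** Returns the upper bound on chunk size for the given host. */
  static long maxChunkSizeForHost(final String host) {
    if (host != null && host.toLowerCase(Locale.ROOT).contains(PLANETSCALE_HOST_MARKER)) {
      return PLANETSCALE_MAX_ROWS_PER_QUERY;
    }
    // No known limit: leave the calculated chunk size effectively unbounded.
    return Long.MAX_VALUE;
  }

  /** Clamps a calculated chunk size to the bound for the given host. */
  static long effectiveChunkSize(final long calculatedChunkSize, final String host) {
    return Math.min(calculatedChunkSize, maxChunkSizeForHost(host));
  }
}
```

With this sketch, a calculated chunk size of 1,000,000 rows would be reduced to 100,000 for a host such as `aws.connect.psdb.cloud` (an example host name), and left unchanged for any other host. Chunk sizes already below the limit pass through untouched.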
theyueli committed Jun 27, 2024
1 parent 48aa409 commit 42124f6
Showing 3 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
- dockerImageTag: 3.4.10
+ dockerImageTag: 3.4.11
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
@@ -12,6 +12,7 @@
import com.mysql.cj.MysqlType;
import io.airbyte.cdk.db.jdbc.AirbyteRecordData;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants;
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
@@ -57,6 +58,7 @@ public class MySqlInitialLoadHandler implements InitialLoadHandler<MysqlType> {

private static final long QUERY_TARGET_SIZE_GB = 1_073_741_824;
private static final long DEFAULT_CHUNK_SIZE = 1_000_000;
private long MAX_CHUNK_SIZE = Long.MAX_VALUE;
final Map<AirbyteStreamNameNamespacePair, TableSizeInfo> tableSizeInfoMap;

public MySqlInitialLoadHandler(final JsonNode config,
@@ -73,6 +75,16 @@ public MySqlInitialLoadHandler(final JsonNode config,
this.initialLoadStateManager = initialLoadStateManager;
this.streamStateForIncrementalRunSupplier = streamStateForIncrementalRunSupplier;
this.tableSizeInfoMap = tableSizeInfoMap;
adjustChunkSizeLimitForMySQLVariants();
}

private void adjustChunkSizeLimitForMySQLVariants() {
// For PSDB, we need to limit the chunk size to 100k rows to avoid the query being killed by the
// server.
// Reference:
// https://planetscale.com/docs/reference/planetscale-system-limits
if (config.get(JdbcUtils.HOST_KEY).asText().toLowerCase().contains("psdb.cloud"))
MAX_CHUNK_SIZE = 100_000;
}

public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
@@ -122,7 +134,7 @@ public AutoCloseableIterator<AirbyteMessage> getIteratorForStream(
.collect(Collectors.toList());
final AutoCloseableIterator<AirbyteRecordData> queryStream =
new MySqlInitialLoadRecordIterator(database, sourceOperations, quoteString, initialLoadStateManager, selectedDatabaseFields, pair,
- calculateChunkSize(tableSizeInfoMap.get(pair), pair), isCompositePrimaryKey(airbyteStream));
+ Long.min(calculateChunkSize(tableSizeInfoMap.get(pair), pair), MAX_CHUNK_SIZE), isCompositePrimaryKey(airbyteStream));
final AutoCloseableIterator<AirbyteMessage> recordIterator =
getRecordIterator(queryStream, streamName, namespace, emittedAt.toEpochMilli());
final AutoCloseableIterator<AirbyteMessage> recordAndMessageIterator = augmentWithState(recordIterator, airbyteStream, pair);
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
@@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.11 | 2024-06-26 | [40561](https://github.com/airbytehq/airbyte/pull/40561) | Support PlanetScale MySQL's per-query row limit. |
| 3.4.10 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. |
| 3.4.8 | 2024-06-05 | [39144](https://github.com/airbytehq/airbyte/pull/39144) | Upgrade Debezium to 2.5.4 |