Skip to content

Commit

Permalink
[source-mysql] Fix NPE on cursor based full refresh (#37824)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaohansong committed May 3, 2024
1 parent 0311db0 commit d3864c2
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 5 deletions.
3 changes: 2 additions & 1 deletion airbyte-cdk/java/airbyte-cdk/README.md
Expand Up @@ -173,7 +173,8 @@ corresponds to that version.
### Java CDK

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.33.1 | 2024-05-03 | [\#37824](https://github.com/airbytehq/airbyte/pull/37824) | Add a unit test for cursor based sync |
| 0.33.0 | 2024-05-03 | [\#36935](https://github.com/airbytehq/airbyte/pull/36935) | Destinations: Enable non-safe-casting DV2 tests |
| 0.32.0 | 2024-05-03 | [\#36929](https://github.com/airbytehq/airbyte/pull/36929) | Destinations: Assorted DV2 changes for mysql |
| 0.31.7 | 2024-05-02 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | changes for destination-snowflake |
Expand Down
@@ -1 +1 @@
version=0.33.0
version=0.33.1
Expand Up @@ -479,6 +479,46 @@ abstract class JdbcSourceAcceptanceTest<S : Source, T : TestDatabase<*, T, *>> {
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
}

@Test
@Throws(Exception::class)
protected fun testReadBothIncrementalAndFullRefreshStreams() {
val catalog = getConfiguredCatalogWithOneStream(defaultNamespace)
val expectedMessages: MutableList<AirbyteMessage> = ArrayList(testMessages)

val streamName2 = streamName() + 2
val tableName = getFullyQualifiedTableName(TABLE_NAME + 2)
testdb!!
.with(createTableQuery(tableName, "id INTEGER, name VARCHAR(200)", ""))
.with("INSERT INTO %s(id, name) VALUES (1,'picard')", tableName)
.with("INSERT INTO %s(id, name) VALUES (2, 'crusher')", tableName)
.with("INSERT INTO %s(id, name) VALUES (3, 'vash')", tableName)

val airbyteStream2 =
CatalogHelpers.createConfiguredAirbyteStream(
streamName2,
defaultNamespace,
Field.of(COL_ID, JsonSchemaType.NUMBER),
Field.of(COL_NAME, JsonSchemaType.STRING)
)
airbyteStream2.syncMode = SyncMode.INCREMENTAL
airbyteStream2.cursorField = java.util.List.of(COL_ID)
airbyteStream2.destinationSyncMode = DestinationSyncMode.APPEND
catalog.streams.add(airbyteStream2)

expectedMessages.addAll(getAirbyteMessagesSecondSync(streamName2))

System.out.println("catalog: " + catalog)

val actualMessages = MoreIterators.toList(source()!!.read(config(), catalog, null))
val actualRecordMessages = filterRecords(actualMessages)

setEmittedAtToNull(actualMessages)

Assertions.assertEquals(expectedMessages.size, actualRecordMessages.size)
Assertions.assertTrue(expectedMessages.containsAll(actualRecordMessages))
Assertions.assertTrue(actualRecordMessages.containsAll(expectedMessages))
}

protected open fun getAirbyteMessagesSecondSync(streamName: String?): List<AirbyteMessage> {
return testMessages
.stream()
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.31.8'
cdkVersionRequired = '0.33.1'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.4.0
dockerImageTag: 3.4.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Expand Up @@ -175,7 +175,9 @@ public static Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair,
}

LOGGER.info("Querying max cursor value for {}.{}", namespace, name);

final String cursorField = cursorInfoOptional.get().getCursorField();
LOGGER.info("cursor field", cursorField);
final String quotedCursorField = getIdentifierWithQuoting(cursorField, quoteString);
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY,
quotedCursorField,
Expand Down
Expand Up @@ -14,6 +14,7 @@
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.logStreamSyncStatus;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.convertNameNamespacePairFromV0;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.filterStreamInIncrementalMode;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.getMySqlFullRefreshInitialLoadHandler;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.getMySqlInitialLoadGlobalStateManager;
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.initPairToPrimaryKeyInfoMap;
Expand Down Expand Up @@ -467,7 +468,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
if (isAnyStreamIncrementalSyncMode(catalog)) {
LOGGER.info("Syncing via Primary Key");
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
final InitialLoadStreams initialLoadStreams =
filterStreamInIncrementalMode(streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog));
final Map<AirbyteStreamNameNamespacePair, CursorBasedStatus> pairToCursorBasedStatus =
getCursorBasedSyncStatusForStreams(database, initialLoadStreams.streamsForInitialLoad(), stateManager, getQuoteString());
final CursorBasedStreams cursorBasedStreams =
Expand Down
Expand Up @@ -50,6 +50,7 @@
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
Expand Down Expand Up @@ -368,6 +369,13 @@ private static boolean streamHasPrimaryKey(final ConfiguredAirbyteStream stream)
return stream.getStream().getSourceDefinedPrimaryKey().size() > 0;
}

public static InitialLoadStreams filterStreamInIncrementalMode(final InitialLoadStreams stream) {
return new InitialLoadStreams(
stream.streamsForInitialLoad.stream().filter(airbyteStream -> airbyteStream.getSyncMode() == SyncMode.INCREMENTAL)
.collect(Collectors.toList()),
stream.pairToInitialLoadStatus);
}

public static List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final ConfiguredAirbyteCatalog catalog,
final Set<AirbyteStreamNameNamespacePair> alreadySyncedStreams) {
final Set<AirbyteStreamNameNamespacePair> allStreams = AirbyteStreamNameNamespacePair.fromConfiguredCatalog(catalog);
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Expand Up @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.4.1 | 2024-05-03 | [37824](https://github.com/airbytehq/airbyte/pull/37824) | Fixed a bug on Resumeable full refresh where cursor based source throw NPE. |
| 3.4.0 | 2024-05-02 | [36932](https://github.com/airbytehq/airbyte/pull/36932) | Resumeable full refresh. Note please upgrade your platform - minimum platform version is 0.58.0. |
| 3.3.25 | 2024-05-02 | [37781](https://github.com/airbytehq/airbyte/pull/37781) | Adopt latest CDK. |
| 3.3.24 | 2024-05-01 | [37742](https://github.com/airbytehq/airbyte/pull/37742) | Adopt latest CDK. Remove Debezium retries. |
Expand Down

0 comments on commit d3864c2

Please sign in to comment.