New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Source-MySQL] Enhanced Standard sync with PK initial load -> Cursor based switch over #30270
Conversation
Before Merging a Connector Pull RequestWow! What a great pull request you have here! 🎉 To merge this PR, ensure the following has been done/considered for each connector added or updated:
If the checklist is complete, but the CI check is failing,
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is a change to this file needed?
.../connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java
Show resolved
Hide resolved
LOGGER.info("Querying max cursor value for {}.{}", namespace, name); | ||
final String cursorField = cursorInfoOptional.get().getCursorField(); | ||
final String cursorBasedSyncStatusQuery = String.format(MAX_CURSOR_VALUE_QUERY, | ||
cursorField, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should escape quote cursor fields like in this PR : https://github.com/airbytehq/airbyte/pull/30059/files
this.cdcState = cdcState; | ||
this.pairToPrimaryKeyLoadStatus = initPairToPrimaryKeyLoadStatusMap(initialLoadStreams.pairToInitialLoadStatus()); | ||
this.pairToPrimaryKeyInfo = pairToPrimaryKeyInfo; | ||
this.streamsThatHaveCompletedSnapshot = initStreamsCompletedSnapshot(initialLoadStreams, catalog); | ||
} | ||
|
||
private static Set<AirbyteStreamNameNamespacePair> initStreamsCompletedSnapshot(final InitialLoadStreams initialLoadStreams, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we deleting this code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this to the Utils class so I can use it as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this back now that this is only being used in here! Just trying to minimise the changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm good with that! On it
@@ -338,6 +349,54 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final | |||
LOGGER.info("Using PK + CDC"); | |||
return MySqlInitialReadUtil.getCdcReadIterators(database, catalog, tableNameToTable, stateManager, emittedAt, getQuoteString()); | |||
} else { | |||
if (isAnyStreamIncrementalSyncMode(catalog)) { | |||
final MySqlCursorBasedStateManager cursorBasedStateManager = new MySqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this to MySqlInitialReadUtil
? I assume this is the code that is doing the main logic of figuring out which streams to sync via pk?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved it! I actually eliminated the filtering based on state_type = cursor_based since anything that's not PK is implicitly cursor_based.
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class MySqlCursorBasedStateManager extends StreamStateManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is similar to the PostgresCursorBasedStateManager
class correct? Basically just to enable writing the state_type
and version
keys? Can we add a comment for that
IIRC we had a ticket to refactor this in StreamStateManager
when we implemented source-mysql - do you think we should do that here?
|
||
} | ||
|
||
public record StreamsCategorised(InitialLoadStreams initialLoadStreams, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove this and just keep InitialLoadStreams? Anything that is not in InitialLoadStreams would implicitly be a cursor based streams right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep! Addressed above.
source-snowflake test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-snowflake docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-snowflake/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-snowflake test
Coverage report for source-postgres
|
source-cockroachdb test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-cockroachdb docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-cockroachdb/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-cockroachdb test
source-mysql-strict-encrypt test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ❌ |
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ❌ |
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql-strict-encrypt test
source-oracle-strict-encrypt test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-oracle-strict-encrypt docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ❌ |
Acceptance tests | ❌ |
Validate airbyte-integrations/connectors/source-oracle-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-oracle-strict-encrypt test
source-postgres test report (commit
|
Step | Result |
---|---|
Java Connector Unit Tests | ✅ |
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mysql test report (commit
|
Step | Result |
---|---|
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Unit Tests | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ✅ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-postgres test report (commit
|
Step | Result |
---|---|
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Unit Tests | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mssql test report (commit
|
Step | Result |
---|
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
WIP WIP Functional feature done
372e55d
to
b0c1071
Compare
/approve-and-merge reason="ci passes and features are gated behind flag". Source-postgres and mssql changes are non-functional. " |
source-mysql test report (commit
|
Step | Result |
---|---|
Build connector tar | ✅ |
Build source-mysql docker image for platform linux/x86_64 | ✅ |
Java Connector Unit Tests | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-mysql/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql test
source-mssql test report (commit
|
Step | Result |
---|
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mssql test
source-postgres test report (commit
|
Step | Result |
---|---|
Build connector tar | ✅ |
Build source-postgres docker image for platform linux/x86_64 | ✅ |
Java Connector Unit Tests | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ✅ |
Validate airbyte-integrations/connectors/source-postgres/metadata.yaml | ✅ |
Connector version semver check | ✅ |
Connector version increment check | ❌ |
QA checks | ✅ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-postgres test
source-mysql-strict-encrypt test report (commit
|
Step | Result |
---|---|
Build connector tar | ✅ |
Build source-mysql-strict-encrypt docker image for platform linux/x86_64 | ✅ |
Java Connector Unit Tests | ✅ |
Java Connector Integration Tests | ✅ |
Acceptance tests | ❌ |
Validate airbyte-integrations/connectors/source-mysql-strict-encrypt/metadata.yaml | ✅ |
Connector version semver check | ✅ |
QA checks | ❌ |
☁️ View runs for commit in Dagger Cloud
Please note that tests are only run on PR ready for review. Please set your PR to draft mode to not flood the CI engine and upstream service on following commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool with the following command
airbyte-ci connectors --name=source-mysql-strict-encrypt test
What
Utilize Primary keys for initial sync then switch over to user defined cursor once high water mark is found and persisted in state.
How
With normal incremental syncs now, we will be using Primary key for the initial sync and retrieve the high water mark in the beginning and construct all of the relevant Primary Key and Standard Sync iterators at the beginning of the sync.
This includes a
SELECT MAX
query to retrieve max cursor value and populate the initial and final StreamState to provide a checkpoint so that there won't be any missing records on subsequent syncs.Note: The
getIncrementalIterators
method in theMysqlInitialLoadHandler
class now has logic to handle the edge case of when the customer de-select a primary key column. Solution here is to re-add the column names to theselectedDatabaseFields
in order to construct the correctSELECT
query, but those columns will not be part of the record as theCatalogHelpers.getTopLevelFieldNames(airbyteStream)
will not include the de-selected fields.Tests with flag turned on is in the file
MySqlPkJdbcSourceAcceptanceTest.java