Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 source: implementation for mysql cdc #3505

Merged
merged 15 commits into from
May 24, 2021
Merged

🎉 source: implementation for mysql cdc #3505

merged 15 commits into from
May 24, 2021

Conversation

subodh1810
Copy link
Contributor

@subodh1810 subodh1810 commented May 20, 2021

What

Issue : #2847

MySQL CDC is built on top of debezium MySQL connector. The CDC works on the basis of binlog files and position.
Few things to highlight about the PR :

  1. The debezium connector for MySQL monitors the database schema evolution over the time and stores the data in database history file. Without this file we can't fetch the records from binlog. We store the history information as part of our state. The way it works is debezium writes the schema information in a file. At the end of the sync we save the contents of the file in state. At the beginning of sync, we create the file and write the saved contents into the file so that debezium can fetch the data.
    We slightly modified the writing logic, since a single Airbyte connector can only fetch data for 1 database, we only need to store the schema evolution of tables in that database. In order to create this filtering we created our own class FilteredFileDatabaseHistory. The FilteredFileDatabaseHistory is used to filter out the schema information only related to the database that the connector is syncing. Since we need to store this information, FilteredFileDatabaseHistory enables us to reduce the size of the data that we save.
    AirbyteSchemaHistoryStorage is responsible to persist and reload the data i.e. read the contents of the file and save it in state at the end of the sync and create the file and copy the contents saved in state at the beginning of the sync.
    You can find more information about this here : https://debezium.io/documentation/reference/1.4/operations/debezium-server.html#debezium-source-database-history-class
    https://debezium.io/documentation/reference/development/engine.html#_in_the_code
  2. A lot of code is similar between MySQL and Postgres CDC implementation with slight differences. For now I have not abstracted out the common pieces.
  3. The offset logic is dependent on the binlog file name and position. Just like Postgres CDC, MySQL debezium connector writes the offset into a file and we save it in our state. When the connector starts fetching data, we create the file and store the saved data back in there so that debezium can start from a certain offset.
  4. MySQL debezium connector creates a snapshot of the data when it runs the first time. But the problem is that for snapshot, it grabs a global read lock that blocks writes by other database clients which is bad. In order to avoid this we use the parameter snapshot.locking.mode as none. This prevents the connector from acquiring any table locks during the snapshot but it is safe to use if and only if no schema changes are happening while the snapshot is running.
    Ref : https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-snapshots
    https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-locking-mode

Pre-merge Checklist

  • Run integration tests
  • Publish Docker images

Recommended reading order

  1. MySqlSource.java#getIncrementalIterators
  2. DebeziumRecordPublisher
  3. the rest

┆Issue is synchronized with this Asana task by Unito

@subodh1810 subodh1810 self-assigned this May 20, 2021
@subodh1810 subodh1810 marked this pull request as ready for review May 20, 2021 17:44
@auto-assign auto-assign bot requested a review from michel-tricot May 20, 2021 17:44
@subodh1810 subodh1810 changed the title source: implementation for mysql cdc 🎉 source: implementation for mysql cdc May 21, 2021
@subodh1810
Copy link
Contributor Author

subodh1810 commented May 21, 2021

/test connector=source-mysql

🕑 source-mysql https://github.com/airbytehq/airbyte/actions/runs/863999219
✅ source-mysql https://github.com/airbytehq/airbyte/actions/runs/863999219

Copy link
Contributor

@cgardens cgardens left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@subodh1810 this looks great! a lot of tough stuff to work through here. I think your approach makes a lot of sense.

@@ -383,7 +385,7 @@ public void testEmptyStateIncrementalIdenticalToFullRefresh() throws Exception {
.collect(Collectors.toList());
}

private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) {
public ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you making this public and overriding it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah! I guess I was playing around with something and forgot to revert the change. Will fix this. Its not required

"type": "string",
"order": 5
},
"replication_method": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in slack you asked about how we make this backwards compatible! I think you can just handle this in the connector code, so that if it find replication_method not set, it defaults to STANDARD. I think this isn't an ideal solution, but it's the best we have for right now until we have a better upgrade path for connectors.

@sherifnada do you agree? or is there some other option you'd prefer?

Copy link
Contributor

@sherifnada sherifnada May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to push this configuration out of spec.json entirely and just rely on the sync mode being "cdc" (so introducing a 3rd mode next to full refresh and incremental)? If not, then this approach makes sense. We'll also want to remove this from the list of required parameters in spec.json I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cgardens @sherifnada if I understand correctly then this is what you meant right?

  private static boolean isCdc(JsonNode config) {
    final boolean isCdc = config.hasNonNull("replication_method")
        && ReplicationMethod.valueOf(config.get("replication_method").asText())
            .equals(ReplicationMethod.CDC);

    return isCdc;
  }

We already have this here
https://github.com/airbytehq/airbyte/pull/3505/files#diff-5e69a6c3136273688c785c97c6a40bef79049c430f8aba721b79705fe4079736R150

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also want to add a clause where if replication_method is not set then it assumes it is standard right?


/**
* This implementation is is kind of similar to
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} ()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} ()}
* {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)}

LOGGER.info("using CDC: {}", true);
// TODO: Figure out how to set the isCDC of stateManager to true. Its always false
final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager);
AirbyteFileDatabaseHistoryStorageOperations dbHistoryStorageManager = initializeDBHistory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your linter is being too aggressive on new lines. Maybe double check that you have intellij to hardwrap at 150 chars?

Screen Shot 2021-05-21 at 7 55 52 AM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for this variable name maybe just historyManager or schemaHistoryManager?

return offsetManager;
}

private AirbyteFileDatabaseHistoryStorageOperations initializeDBHistory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense for this method to just be a static method on AirbyteFileDatabaseHistoryStorageOperations?

}
}
} catch (IOException e) {
throw new RuntimeException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IllegaltStateException?

public DebeziumRecordPublisher(JsonNode config,
ConfiguredAirbyteCatalog catalog,
AirbyteFileOffsetBackingStore offsetManager,
AirbyteFileDatabaseHistoryStorageOperations airbyteFileDatabaseHistoryStorageOperations) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you change this variable name in MySqlSource maybe change it here too?

* function smoothly. Check {@link #persist(CdcState)} To understand more about file, please refer
* {@link FilteredFileDatabaseHistory}
*/
public class AirbyteFileDatabaseHistoryStorageOperations {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just AirbyteSchemaHistoryStorage? or something a little shorter? nbd if you prefer the verbose name.

@subodh1810
Copy link
Contributor Author

subodh1810 commented May 21, 2021

/test connector=source-mysql

🕑 source-mysql https://github.com/airbytehq/airbyte/actions/runs/864774776
✅ source-mysql https://github.com/airbytehq/airbyte/actions/runs/864774776

# Conflicts:
#	airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java
@subodh1810
Copy link
Contributor Author

subodh1810 commented May 24, 2021

/test connector=source-mysql

🕑 source-mysql https://github.com/airbytehq/airbyte/actions/runs/870938053
✅ source-mysql https://github.com/airbytehq/airbyte/actions/runs/870938053

Copy link
Contributor

@davinchia davinchia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great!

  1. I appreciated all the tests you added.
  2. I appreciated all the helpful javadocs and comments.

I only had very minor readability comments, all of which are nits except for the shouldSignalClose function. There is also one question for my understanding. My suggestion changes should be pretty straightforward, so feel free to merge after addressing.

@@ -16,13 +16,16 @@ dependencies {

implementation 'mysql:mysql-connector-java:8.0.22'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'io.debezium:debezium-embedded:1.4.2.Final'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: alphabetise

import org.slf4j.LoggerFactory;

/**
* The record iterator is the consumer (in the producer / consumer relationship with debezium) is
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* The record iterator is the consumer (in the producer / consumer relationship with debezium) is
* The record iterator is the consumer (in the producer / consumer relationship with debezium)

* this signal either when the publisher had not produced a new record for a long time or when it
* has processed at least all of the records that were present in the database when the source was
* started. Because the publisher might publish more records between the consumer sending this
* signal and the publisher acutally shutting down, the consumer must stay alive as long as the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* signal and the publisher acutally shutting down, the consumer must stay alive as long as the
* signal and the publisher actually shutting down, the consumer must stay alive as long as the

* has processed at least all of the records that were present in the database when the source was
* started. Because the publisher might publish more records between the consumer sending this
* signal and the publisher acutally shutting down, the consumer must stay alive as long as the
* publisher is not closed or if there are any new records for it to process (even if the publisher
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* publisher is not closed or if there are any new records for it to process (even if the publisher
* publisher is not closed. Even after the publisher is closed, the consumer will finish processing any produced records before closing.

* started. Because the publisher might publish more records between the consumer sending this
* signal and the publisher acutally shutting down, the consumer must stay alive as long as the
* publisher is not closed or if there are any new records for it to process (even if the publisher
* is closed).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* is closed).

requestClose.call();
}

private boolean shouldSignalClose(ChangeEvent<String, String> event) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's worth rewriting these conditionals to simplify the boolean logic for readability. something like

  private boolean shouldSignalClos(ChangeEvent<String, String> event) {
    if (targetFilePosition.isEmpty()) {
      return false;
    }

    String file = Jsons.deserialize(event.value()).get("source").get("file").asText();
    int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt();

    if (!file.equals(targetFilePosition.get().fileName)) {
      return false;
    }

    if (targetFilePosition.get().position >= position) {
      return false;
    }

    // if not snapshot or is snapshot but last record in snapshot.
    return SnapshotMetadata.TRUE != SnapshotMetadata.valueOf(
        Jsons.deserialize(event.value()).get("source").get("snapshot").asText()
            .toUpperCase());
  }

I generally try to avoid running right bracket since it can get confusing & return false/errors earlier to make clear what is happening.

if (targetFilePosition.isPresent()) {
String file = Jsons.deserialize(event.value()).get("source").get("file").asText();
int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt();
if (file.equals(targetFilePosition.get().fileName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writing the above example made me think about this line.

are we comparing file names here? (slightly confused since we are compared file to filename) what happens if the names don't match up? should we error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be multiple files, we target the file with the latest records as end point. If the file name doesn't match then we are behind the latest file


/**
* MySQL Debezium connector monitors the database schema evolution over the time and stores the data
* in database history file. Without this file we can't fetch the records from binlog. We need to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* in database history file. Without this file we can't fetch the records from binlog. We need to
* in a database history file. Without this file we can't fetch the records from binlog. We need to

return isCdc;
}

static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
private static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {

@@ -66,6 +208,93 @@ public JsonNode toJdbcConfig(JsonNode config) {
return Jsons.jsonNode(configBuilder.build());
}

private static boolean isCdc(JsonNode config) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I usually prefer separating stuff out. I think in this case returning the inline variable is better.

So

    return config.hasNonNull("replication_method")
        && ReplicationMethod.valueOf(config.get("replication_method").asText())
            .equals(ReplicationMethod.CDC);

This is a nit.

@subodh1810
Copy link
Contributor Author

subodh1810 commented May 24, 2021

/test connector=source-mysql

🕑 source-mysql https://github.com/airbytehq/airbyte/actions/runs/872094205
✅ source-mysql https://github.com/airbytehq/airbyte/actions/runs/872094205

@subodh1810
Copy link
Contributor Author

subodh1810 commented May 24, 2021

/publish connector=connectors/source-mysql

🕑 connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/872177590
✅ connectors/source-mysql https://github.com/airbytehq/airbyte/actions/runs/872177590

@subodh1810 subodh1810 merged commit 7ccc4fa into master May 24, 2021
@subodh1810 subodh1810 deleted the mysql-cdc branch May 24, 2021 19:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants