diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index eacd8a73ad20..01b99a20bc24 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.3.4", + "dockerImageTag": "0.3.5", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql", "icon": "mysql.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 350e4b200e07..cd90e1865ead 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -71,7 +71,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.3.4 + dockerImageTag: 0.3.5 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg - sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77 diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index d1dfaa75e26a..c539ddc70ca0 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.4 +LABEL io.airbyte.version=0.3.5 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java index d7ec84d0bc1f..ef4a68abb03e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordIterator.java @@ -24,6 +24,7 @@ package io.airbyte.integrations.source.mysql; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.AbstractIterator; import io.airbyte.commons.concurrency.VoidCallable; import io.airbyte.commons.json.Jsons; @@ -115,20 +116,24 @@ private boolean shouldSignalClose(ChangeEvent event) { return false; } - String file = Jsons.deserialize(event.value()).get("source").get("file").asText(); - int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt(); - if (!file.equals(targetFilePosition.get().fileName)) { - return false; - } + JsonNode valueAsJson = Jsons.deserialize(event.value()); + String file = valueAsJson.get("source").get("file").asText(); + int position = valueAsJson.get("source").get("pos").asInt(); + + boolean isSnapshot = SnapshotMetadata.TRUE == SnapshotMetadata.valueOf( + valueAsJson.get("source").get("snapshot").asText().toUpperCase()); - if (targetFilePosition.get().position >= position) { + if (isSnapshot || targetFilePosition.get().fileName.compareTo(file) > 0 + || (targetFilePosition.get().fileName.compareTo(file) == 0 && targetFilePosition.get().position >= position)) { return false; } - // if not snapshot or is snapshot but last record in snapshot. - return SnapshotMetadata.TRUE != SnapshotMetadata.valueOf( - Jsons.deserialize(event.value()).get("source").get("snapshot").asText() - .toUpperCase()); + LOGGER.info( + "Signalling close because record's binlog file : " + file + " , position : " + position + + " is after target file : " + + targetFilePosition.get().fileName + " , target position : " + targetFilePosition + .get().position); + return true; } private void requestClose() { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java index cf93dc75c575..07bb6738454a 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/DebeziumRecordPublisher.java @@ -139,6 +139,10 @@ protected Properties getDebeziumProperties(JsonNode config, props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString()); props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer + // https://debezium.io/documentation/reference/connectors/mysql.html#mysql-boolean-values + props.setProperty("converters", "boolean"); + props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter"); + // snapshot config // https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode props.setProperty("snapshot.mode", "initial"); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index 53372ed9ead3..79c772b96a81 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -115,34 +115,34 @@ public List> getCheckOperations(JsonNod final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); if (isCdc(config)) { checkOperations.add(database -> { - List matchingSlots = database.resultSetQuery(connection -> { + List log = database.resultSetQuery(connection -> { final String sql = "show variables where Variable_name = 'log_bin'"; return connection.createStatement().executeQuery(sql); }, resultSet -> resultSet.getString("Value")).collect(toList()); - if (matchingSlots.size() != 1) { + if (log.size() != 1) { throw new RuntimeException("Could not query the variable log_bin"); } - String logBin = matchingSlots.get(0); + String logBin = log.get(0); if (!logBin.equalsIgnoreCase("ON")) { throw new RuntimeException("The variable log_bin should be set to ON, but it is : " + logBin); } }); checkOperations.add(database -> { - List matchingSlots = database.resultSetQuery(connection -> { + List format = database.resultSetQuery(connection -> { final String sql = "show variables where Variable_name = 'binlog_format'"; return connection.createStatement().executeQuery(sql); }, resultSet -> resultSet.getString("Value")).collect(toList()); - if (matchingSlots.size() != 1) { + if (format.size() != 1) { throw new RuntimeException("Could not query the variable binlog_format"); } - String binlogFormat = matchingSlots.get(0); + String binlogFormat = format.get(0); if (!binlogFormat.equalsIgnoreCase("ROW")) { throw new RuntimeException("The variable binlog_format should be set to ROW, but it is : " + binlogFormat); } @@ -150,17 +150,17 @@ public List> getCheckOperations(JsonNod } checkOperations.add(database -> { - List matchingSlots = database.resultSetQuery(connection -> { + List image = database.resultSetQuery(connection -> { final String sql = "show variables where Variable_name = 'binlog_row_image'"; return connection.createStatement().executeQuery(sql); }, resultSet -> resultSet.getString("Value")).collect(toList()); - if (matchingSlots.size() != 1) { + if (image.size() != 1) { throw new RuntimeException("Could not query the variable binlog_row_image"); } - String binlogRowImage = matchingSlots.get(0); + String binlogRowImage = image.get(0); if (!binlogRowImage.equalsIgnoreCase("FULL")) { throw new RuntimeException("The variable binlog_row_image should be set to FULL, but it is : " + binlogRowImage); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java index c241fae2aa83..165cae328e14 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceTest.java @@ -48,6 +48,7 @@ import io.airbyte.commons.util.AutoCloseableIterators; import io.airbyte.db.Database; import io.airbyte.db.Databases; +import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; @@ -67,6 +68,7 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -257,6 +259,89 @@ public void tearDown() { } } + @Test + public void fullRefreshAndCDCShouldReturnSameRecords() throws Exception { + JsonNode record1 = Jsons.jsonNode(ImmutableMap.of( + "id", 1, + "bool_col", true, + "tiny_int_one_col", true)); + ((ObjectNode) record1).put("tiny_int_two_col", (short) 80); + JsonNode record2 = Jsons.jsonNode(ImmutableMap.of( + "id", 2, + "bool_col", false, + "tiny_int_one_col", false)); + ((ObjectNode) record2).put("tiny_int_two_col", (short) 90); + ImmutableList records = ImmutableList.of(record1, record2); + Set originalData = new HashSet<>(records); + setupForComparisonBetweenFullRefreshAndCDCSnapshot(records); + + AirbyteCatalog discover = source.discover(config); + List streams = discover.getStreams(); + + assertEquals(streams.size(), 1); + JsonNode jsonSchema = streams.get(0).getJsonSchema().get("properties"); + assertEquals(jsonSchema.get("id").get("type").asText(), "number"); + assertEquals(jsonSchema.get("bool_col").get("type").asText(), "boolean"); + assertEquals(jsonSchema.get("tiny_int_one_col").get("type").asText(), "boolean"); + assertEquals(jsonSchema.get("tiny_int_two_col").get("type").asText(), "number"); + + AirbyteCatalog catalog = new AirbyteCatalog().withStreams(streams); + final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers + .toDefaultConfiguredCatalog(catalog); + configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.FULL_REFRESH)); + + Set dataFromFullRefresh = extractRecordMessages( + AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) + .stream() + .map(AirbyteRecordMessage::getData).collect(Collectors.toSet()); + + configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL)); + Set dataFromDebeziumSnapshot = extractRecordMessages( + AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null))) + .stream() + .map( + airbyteRecordMessage -> { + JsonNode data = airbyteRecordMessage.getData(); + removeCDCColumns((ObjectNode) data); + /** + * Debezium reads TINYINT (expect for TINYINT(1)) as IntNode while FullRefresh reads it as Short Ref + * : {@link io.airbyte.db.jdbc.JdbcUtils#setJsonField(java.sql.ResultSet, int, ObjectNode)} -> case + * TINYINT, SMALLINT -> o.put(columnName, r.getShort(i)); + */ + ((ObjectNode) data) + .put("tiny_int_two_col", (short) data.get("tiny_int_two_col").asInt()); + return data; + }) + .collect(Collectors.toSet()); + + assertEquals(dataFromFullRefresh, originalData); + assertEquals(dataFromFullRefresh, dataFromDebeziumSnapshot); + } + + private void setupForComparisonBetweenFullRefreshAndCDCSnapshot( + ImmutableList data) { + executeQuery("CREATE DATABASE " + "test_schema" + ";"); + executeQuery(String.format( + "CREATE TABLE %s.%s(%s INTEGER, %s Boolean, %s TINYINT(1), %s TINYINT(2), PRIMARY KEY (%s));", + "test_schema", "table_with_tiny_int", "id", "bool_col", "tiny_int_one_col", + "tiny_int_two_col", "id")); + + executeQuery(String + .format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema", + "table_with_tiny_int", + "id", "bool_col", "tiny_int_one_col", "tiny_int_two_col", + data.get(0).get("id").asInt(), data.get(0).get("bool_col").asBoolean(), + data.get(0).get("tiny_int_one_col").asBoolean() ? 99 : -99, data.get(0).get("tiny_int_two_col").asInt())); + + executeQuery(String + .format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema", + "table_with_tiny_int", + "id", "bool_col", "tiny_int_one_col", "tiny_int_two_col", + data.get(1).get("id").asInt(), data.get(1).get("bool_col").asBoolean(), + data.get(1).get("tiny_int_one_col").asBoolean() ? 99 : -99, data.get(1).get("tiny_int_two_col").asInt())); + ((ObjectNode) config).put("database", "test_schema"); + } + @Test @DisplayName("On the first sync, produce returns records that exist in the database.") void testExistingData() throws Exception { @@ -267,6 +352,27 @@ void testExistingData() throws Exception { final Set recordMessages = extractRecordMessages(actualRecords); final List stateMessages = extractStateMessages(actualRecords); + JdbcDatabase jdbcDatabase = Databases.createJdbcDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:mysql://%s:%s", + config.get("host").asText(), + config.get("port").asInt()), + DRIVER_CLASS); + + Optional targetFilePosition = TargetFilePosition.targetFilePosition(jdbcDatabase); + assertTrue(targetFilePosition.isPresent()); + /** + * Debezium sets the binlog file name and position values for all the records fetched during + * snapshot to the latest log position fetched via query SHOW MASTER STATUS Ref : + * {@linkplain io.debezium.connector.mysql.SnapshotReader#readBinlogPosition(int, io.debezium.connector.mysql.SourceInfo, io.debezium.jdbc.JdbcConnection, java.util.concurrent.atomic.AtomicReference)} + */ + recordMessages.forEach(record -> { + assertEquals(record.getData().get(CDC_LOG_FILE).asText(), + targetFilePosition.get().fileName); + assertEquals(record.getData().get(CDC_LOG_POS).asInt(), targetFilePosition.get().position); + }); + assertExpectedRecords( new HashSet<>(MODEL_RECORDS), recordMessages); assertExpectedStateMessages(stateMessages); @@ -653,10 +759,7 @@ private static void assertExpectedRecords(Set expectedRecords, assertNull(data.get(CDC_DELETED_AT)); } - ((ObjectNode) data).remove(CDC_LOG_FILE); - ((ObjectNode) data).remove(CDC_LOG_POS); - ((ObjectNode) data).remove(CDC_UPDATED_AT); - ((ObjectNode) data).remove(CDC_DELETED_AT); + removeCDCColumns((ObjectNode) data); return data; }) @@ -665,4 +768,11 @@ private static void assertExpectedRecords(Set expectedRecords, assertEquals(expectedRecords, actualData); } + private static void removeCDCColumns(ObjectNode data) { + data.remove(CDC_LOG_FILE); + data.remove(CDC_LOG_POS); + data.remove(CDC_UPDATED_AT); + data.remove(CDC_DELETED_AT); + } + }