Skip to content

Commit

Permalink
fix mysql-cdc: handle tinyint(1) and boolean correctly + fix target f…
Browse files Browse the repository at this point in the history
…ile comparison (#3890)

* fix mysql-cdc: handle tinyint(1) as boolean + fix taget file comparison

* fix formatting

* simplify shouldSignalClose method

* only deserialize once

* address review comments

* upgrade version

* upgrade version
  • Loading branch information
subodh1810 committed Jun 7, 2021
1 parent cea80b4 commit 4e084a1
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.3.4",
"dockerImageTag": "0.3.5",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.3.4
dockerImageTag: 0.3.5
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.version=0.3.5

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.source.mysql;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.AbstractIterator;
import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -115,20 +116,24 @@ private boolean shouldSignalClose(ChangeEvent<String, String> event) {
return false;
}

String file = Jsons.deserialize(event.value()).get("source").get("file").asText();
int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt();
if (!file.equals(targetFilePosition.get().fileName)) {
return false;
}
JsonNode valueAsJson = Jsons.deserialize(event.value());
String file = valueAsJson.get("source").get("file").asText();
int position = valueAsJson.get("source").get("pos").asInt();

boolean isSnapshot = SnapshotMetadata.TRUE == SnapshotMetadata.valueOf(
valueAsJson.get("source").get("snapshot").asText().toUpperCase());

if (targetFilePosition.get().position >= position) {
if (isSnapshot || targetFilePosition.get().fileName.compareTo(file) > 0
|| (targetFilePosition.get().fileName.compareTo(file) == 0 && targetFilePosition.get().position >= position)) {
return false;
}

// if not snapshot or is snapshot but last record in snapshot.
return SnapshotMetadata.TRUE != SnapshotMetadata.valueOf(
Jsons.deserialize(event.value()).get("source").get("snapshot").asText()
.toUpperCase());
LOGGER.info(
"Signalling close because record's binlog file : " + file + " , position : " + position
+ " is after target file : "
+ targetFilePosition.get().fileName + " , target position : " + targetFilePosition
.get().position);
return true;
}

private void requestClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ protected Properties getDebeziumProperties(JsonNode config,
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer

// https://debezium.io/documentation/reference/connectors/mysql.html#mysql-boolean-values
props.setProperty("converters", "boolean");
props.setProperty("boolean.type", "io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter");

// snapshot config
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-mode
props.setProperty("snapshot.mode", "initial");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,52 +115,52 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(JsonNod
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(super.getCheckOperations(config));
if (isCdc(config)) {
checkOperations.add(database -> {
List<String> matchingSlots = database.resultSetQuery(connection -> {
List<String> log = database.resultSetQuery(connection -> {
final String sql = "show variables where Variable_name = 'log_bin'";

return connection.createStatement().executeQuery(sql);
}, resultSet -> resultSet.getString("Value")).collect(toList());

if (matchingSlots.size() != 1) {
if (log.size() != 1) {
throw new RuntimeException("Could not query the variable log_bin");
}

String logBin = matchingSlots.get(0);
String logBin = log.get(0);
if (!logBin.equalsIgnoreCase("ON")) {
throw new RuntimeException("The variable log_bin should be set to ON, but it is : " + logBin);
}
});

checkOperations.add(database -> {
List<String> matchingSlots = database.resultSetQuery(connection -> {
List<String> format = database.resultSetQuery(connection -> {
final String sql = "show variables where Variable_name = 'binlog_format'";

return connection.createStatement().executeQuery(sql);
}, resultSet -> resultSet.getString("Value")).collect(toList());

if (matchingSlots.size() != 1) {
if (format.size() != 1) {
throw new RuntimeException("Could not query the variable binlog_format");
}

String binlogFormat = matchingSlots.get(0);
String binlogFormat = format.get(0);
if (!binlogFormat.equalsIgnoreCase("ROW")) {
throw new RuntimeException("The variable binlog_format should be set to ROW, but it is : " + binlogFormat);
}
});
}

checkOperations.add(database -> {
List<String> matchingSlots = database.resultSetQuery(connection -> {
List<String> image = database.resultSetQuery(connection -> {
final String sql = "show variables where Variable_name = 'binlog_row_image'";

return connection.createStatement().executeQuery(sql);
}, resultSet -> resultSet.getString("Value")).collect(toList());

if (matchingSlots.size() != 1) {
if (image.size() != 1) {
throw new RuntimeException("Could not query the variable binlog_row_image");
}

String binlogRowImage = matchingSlots.get(0);
String binlogRowImage = image.get(0);
if (!binlogRowImage.equalsIgnoreCase("FULL")) {
throw new RuntimeException("The variable binlog_row_image should be set to FULL, but it is : " + binlogRowImage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
Expand All @@ -67,6 +68,7 @@
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -257,6 +259,89 @@ public void tearDown() {
}
}

@Test
public void fullRefreshAndCDCShouldReturnSameRecords() throws Exception {
JsonNode record1 = Jsons.jsonNode(ImmutableMap.of(
"id", 1,
"bool_col", true,
"tiny_int_one_col", true));
((ObjectNode) record1).put("tiny_int_two_col", (short) 80);
JsonNode record2 = Jsons.jsonNode(ImmutableMap.of(
"id", 2,
"bool_col", false,
"tiny_int_one_col", false));
((ObjectNode) record2).put("tiny_int_two_col", (short) 90);
ImmutableList<JsonNode> records = ImmutableList.of(record1, record2);
Set<JsonNode> originalData = new HashSet<>(records);
setupForComparisonBetweenFullRefreshAndCDCSnapshot(records);

AirbyteCatalog discover = source.discover(config);
List<AirbyteStream> streams = discover.getStreams();

assertEquals(streams.size(), 1);
JsonNode jsonSchema = streams.get(0).getJsonSchema().get("properties");
assertEquals(jsonSchema.get("id").get("type").asText(), "number");
assertEquals(jsonSchema.get("bool_col").get("type").asText(), "boolean");
assertEquals(jsonSchema.get("tiny_int_one_col").get("type").asText(), "boolean");
assertEquals(jsonSchema.get("tiny_int_two_col").get("type").asText(), "number");

AirbyteCatalog catalog = new AirbyteCatalog().withStreams(streams);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers
.toDefaultConfiguredCatalog(catalog);
configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.FULL_REFRESH));

Set<JsonNode> dataFromFullRefresh = extractRecordMessages(
AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null)))
.stream()
.map(AirbyteRecordMessage::getData).collect(Collectors.toSet());

configuredCatalog.getStreams().forEach(c -> c.setSyncMode(SyncMode.INCREMENTAL));
Set<JsonNode> dataFromDebeziumSnapshot = extractRecordMessages(
AutoCloseableIterators.toListAndClose(source.read(config, configuredCatalog, null)))
.stream()
.map(
airbyteRecordMessage -> {
JsonNode data = airbyteRecordMessage.getData();
removeCDCColumns((ObjectNode) data);
/**
* Debezium reads TINYINT (expect for TINYINT(1)) as IntNode while FullRefresh reads it as Short Ref
* : {@link io.airbyte.db.jdbc.JdbcUtils#setJsonField(java.sql.ResultSet, int, ObjectNode)} -> case
* TINYINT, SMALLINT -> o.put(columnName, r.getShort(i));
*/
((ObjectNode) data)
.put("tiny_int_two_col", (short) data.get("tiny_int_two_col").asInt());
return data;
})
.collect(Collectors.toSet());

assertEquals(dataFromFullRefresh, originalData);
assertEquals(dataFromFullRefresh, dataFromDebeziumSnapshot);
}

private void setupForComparisonBetweenFullRefreshAndCDCSnapshot(
ImmutableList<JsonNode> data) {
executeQuery("CREATE DATABASE " + "test_schema" + ";");
executeQuery(String.format(
"CREATE TABLE %s.%s(%s INTEGER, %s Boolean, %s TINYINT(1), %s TINYINT(2), PRIMARY KEY (%s));",
"test_schema", "table_with_tiny_int", "id", "bool_col", "tiny_int_one_col",
"tiny_int_two_col", "id"));

executeQuery(String
.format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema",
"table_with_tiny_int",
"id", "bool_col", "tiny_int_one_col", "tiny_int_two_col",
data.get(0).get("id").asInt(), data.get(0).get("bool_col").asBoolean(),
data.get(0).get("tiny_int_one_col").asBoolean() ? 99 : -99, data.get(0).get("tiny_int_two_col").asInt()));

executeQuery(String
.format("INSERT INTO %s.%s (%s, %s, %s, %s) VALUES (%s, %s, %s, %s);", "test_schema",
"table_with_tiny_int",
"id", "bool_col", "tiny_int_one_col", "tiny_int_two_col",
data.get(1).get("id").asInt(), data.get(1).get("bool_col").asBoolean(),
data.get(1).get("tiny_int_one_col").asBoolean() ? 99 : -99, data.get(1).get("tiny_int_two_col").asInt()));
((ObjectNode) config).put("database", "test_schema");
}

@Test
@DisplayName("On the first sync, produce returns records that exist in the database.")
void testExistingData() throws Exception {
Expand All @@ -267,6 +352,27 @@ void testExistingData() throws Exception {
final Set<AirbyteRecordMessage> recordMessages = extractRecordMessages(actualRecords);
final List<AirbyteStateMessage> stateMessages = extractStateMessages(actualRecords);

JdbcDatabase jdbcDatabase = Databases.createJdbcDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:mysql://%s:%s",
config.get("host").asText(),
config.get("port").asInt()),
DRIVER_CLASS);

Optional<TargetFilePosition> targetFilePosition = TargetFilePosition.targetFilePosition(jdbcDatabase);
assertTrue(targetFilePosition.isPresent());
/**
* Debezium sets the binlog file name and position values for all the records fetched during
* snapshot to the latest log position fetched via query SHOW MASTER STATUS Ref :
* {@linkplain io.debezium.connector.mysql.SnapshotReader#readBinlogPosition(int, io.debezium.connector.mysql.SourceInfo, io.debezium.jdbc.JdbcConnection, java.util.concurrent.atomic.AtomicReference)}
*/
recordMessages.forEach(record -> {
assertEquals(record.getData().get(CDC_LOG_FILE).asText(),
targetFilePosition.get().fileName);
assertEquals(record.getData().get(CDC_LOG_POS).asInt(), targetFilePosition.get().position);
});

assertExpectedRecords(
new HashSet<>(MODEL_RECORDS), recordMessages);
assertExpectedStateMessages(stateMessages);
Expand Down Expand Up @@ -653,10 +759,7 @@ private static void assertExpectedRecords(Set<JsonNode> expectedRecords,
assertNull(data.get(CDC_DELETED_AT));
}

((ObjectNode) data).remove(CDC_LOG_FILE);
((ObjectNode) data).remove(CDC_LOG_POS);
((ObjectNode) data).remove(CDC_UPDATED_AT);
((ObjectNode) data).remove(CDC_DELETED_AT);
removeCDCColumns((ObjectNode) data);

return data;
})
Expand All @@ -665,4 +768,11 @@ private static void assertExpectedRecords(Set<JsonNode> expectedRecords,
assertEquals(expectedRecords, actualData);
}

private static void removeCDCColumns(ObjectNode data) {
data.remove(CDC_LOG_FILE);
data.remove(CDC_LOG_POS);
data.remove(CDC_UPDATED_AT);
data.remove(CDC_DELETED_AT);
}

}

0 comments on commit 4e084a1

Please sign in to comment.