Skip to content

Commit

Permalink
MySQL Source: fixed unencrypted CDC connections (#18851)
Browse files Browse the repository at this point in the history
* MySQL Source: fixed unencrypted CDC connections

* updated changelog

* bump version

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
VitaliiMaltsev and octavia-squidington-iii committed Nov 3, 2022
1 parent 9231e3e commit 8bb9701
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.10
dockerImageTag: 1.0.11
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7727,7 +7727,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.10"
- dockerImage: "airbyte/source-mysql:1.0.11"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.10
LABEL io.airbyte.version=1.0.11

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.10
LABEL io.airbyte.version=1.0.11

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private static Properties commonProperties(final JdbcDatabase database) {
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
if (dbConfig.has(SSL_MODE) && !dbConfig.get(SSL_MODE).asText().isEmpty()) {
props.setProperty("database.sslmode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText())));
props.setProperty("database.ssl.mode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText())));
props.setProperty("database.history.producer.security.protocol", "SSL");
props.setProperty("database.history.consumer.security.protocol", "SSL");

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package io.airbyte.integrations.io.airbyte.integration_tests.sources;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

import java.util.List;
import java.util.stream.Collectors;

import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils.TestConstants.INITIAL_CDC_WAITING_SECONDS;
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

public class CdcMySqlSslRequiredSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private MySQLContainer<?> container;
private JsonNode config;

@Override
protected String getImageName() {
return "airbyte/source-mysql:dev";
}

@Override
protected ConnectorSpecification getSpec() throws Exception {
return SshHelpers.getSpecAndInjectSsh();
}

@Override
protected JsonNode getConfig() {
return config;
}

@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME),
String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
new ConfiguredAirbyteStream()
.withSyncMode(INCREMENTAL)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s", STREAM_NAME2),
String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
}

@Override
protected JsonNode getState() {
return null;
}

@Override
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();

final var sslMode = ImmutableMap.builder()
.put(JdbcUtils.MODE_KEY, "required")
.build();
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("initial_waiting_seconds", INITIAL_CDC_WAITING_SECONDS)
.build());

config = Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, container.getHost())
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
.put(JdbcUtils.SSL_KEY, true)
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
.put("replication_method", replicationMethod)
.put("is_test", true)
.build());

revokeAllPermissions();
grantCorrectPermissions();
alterUserRequireSsl();
createAndPopulateTables();
}

private void alterUserRequireSsl() {
executeQuery("ALTER USER " + container.getUsername() + " REQUIRE SSL;");
}

private void createAndPopulateTables() {
executeQuery("CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));");
executeQuery(
"INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
executeQuery("CREATE TABLE starships(id INTEGER PRIMARY KEY, name VARCHAR(200));");
executeQuery(
"INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery(
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "
+ container.getUsername() + "@'%';");
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
DatabaseDriver.MYSQL.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
container.getHost(),
container.getFirstMappedPort(),
container.getDatabaseName()),
SQLDialect.MYSQL)) {
final Database database = new Database(dslContext);
database.query(
ctx -> ctx
.execute(query));
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

@Test
public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog());
// only sync incremental streams
configuredCatalog.setStreams(
configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList()));

final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalog, getState());
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
final List<AirbyteStateMessage> stateMessages = airbyteMessages
.stream()
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
.map(AirbyteMessage::getState)
.collect(Collectors.toList());
assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records");
assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages");

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages)));
// RESET MASTER removes all binary log files that are listed in the index file,
// leaving only a single, empty binary log file with a numeric suffix of .000001
executeQuery("RESET MASTER;");

assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size());
}
}
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.11 | 2022-11-03 | [18851](https://github.com/airbytehq/airbyte/pull/18851) | Fix bug with unencrypted CDC connections |
| 1.0.10 | 2022-11-02 | [18619](https://github.com/airbytehq/airbyte/pull/18619) | Fix bug with handling Tinyint(1) Unsigned values as boolean |
| 1.0.9 | 2022-10-31 | [18538](https://github.com/airbytehq/airbyte/pull/18538) | Encode database name |
| 1.0.8 | 2022-10-25 | [18383](https://github.com/airbytehq/airbyte/pull/18383) | Better SSH error handling + messages |
Expand Down

0 comments on commit 8bb9701

Please sign in to comment.