diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index d4bda578a5de..8f6e252d153e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -997,7 +997,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 1.0.14 + dockerImageTag: 1.0.15 documentationUrl: https://docs.airbyte.com/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index ebbb941253ef..3efd4372f865 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8790,7 +8790,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:1.0.14" +- dockerImage: "airbyte/source-mysql:1.0.15" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index f432e604e8d6..c4ef1974c963 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.14 +LABEL io.airbyte.version=1.0.15 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 803a8528b87d..cd6eada119a6 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.14 +LABEL io.airbyte.version=1.0.15 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java index cd56c7fdae65..ece36cf5b27e 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java @@ -5,8 +5,10 @@ package io.airbyte.integrations.source.mysql.helpers; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.db.jdbc.JdbcDatabase; +import java.sql.SQLException; import java.time.ZoneId; import java.util.List; import java.util.Optional; @@ -38,12 +40,28 @@ public class CdcConfigurationHelper { * @return list of List> */ public static List> getCheckOperations() { - return List.of(getCheckOperation(LOG_BIN, "ON"), + return List.of(getMasterStatusOperation(), + getCheckOperation(LOG_BIN, "ON"), getCheckOperation(BINLOG_FORMAT, "ROW"), getCheckOperation(BINLOG_ROW_IMAGE, "FULL")); } + // Checks whether the user has REPLICATION CLIENT privilege needed to query status information about + // the binary log files, which are needed for CDC. + private static CheckedConsumer getMasterStatusOperation() { + return database -> { + try { + database.unsafeResultSetQuery( + connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), + resultSet -> resultSet); + } catch (final SQLException e) { + throw new ConfigErrorException("Please grant REPLICATION CLIENT privilege, so that binary log files are available" + + " for CDC mode."); + } + }; + } + private static CheckedConsumer getCheckOperation(final String name, final String value) { return database -> { final List result = database.queryStrings( diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java index f23848336e05..9ec0e5b076cd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java @@ -32,6 +32,8 @@ import io.airbyte.integrations.base.Source; import io.airbyte.integrations.debezium.CdcSourceTest; import io.airbyte.integrations.debezium.CdcTargetPosition; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -106,6 +108,10 @@ private void revokeAllPermissions() { executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';"); } + private void revokeReplicationClientPermission() { + executeQuery("REVOKE REPLICATION CLIENT ON *.* FROM " + container.getUsername() + "@'%';"); + } + private void grantCorrectPermissions() { executeQuery("GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + container.getUsername() + "@'%';"); } @@ -213,6 +219,16 @@ protected String randomTableSchema() { return MODELS_SCHEMA; } + @Test + protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception { + revokeReplicationClientPermission(); + final AirbyteConnectionStatus status = getSource().check(getConfig()); + final String expectedErrorMessage = "Please grant REPLICATION CLIENT privilege, so that binary log files are available" + + " for CDC mode."; + assertTrue(status.getStatus().equals(Status.FAILED)); + assertTrue(status.getMessage().contains(expectedErrorMessage)); + } + @Test protected void syncShouldHandlePurgedLogsGracefully() throws Exception { diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 2ce57e435ead..4fcb17231452 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -255,7 +255,8 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------- | +|:--------|:-----------| :--------------------------------------------------------- |:-------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.15 | 2022-12-06 | [20000](https://github.com/airbytehq/airbyte/pull/20000) | Add check and better messaging when user does not have permission to access binary log in CDC mode | | 1.0.14 | 2022-11-22 | [19514](https://github.com/airbytehq/airbyte/pull/19514) | Adjust batch selection memory limits databases. | | 1.0.13 | 2022-11-14 | [18956](https://github.com/airbytehq/airbyte/pull/18956) | Clean up Tinyint Unsigned data type identification | | 1.0.12 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |