diff --git a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/metadata.yaml index 6d888524977cde..eb3f6bc231a9fb 100644 --- a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/metadata.yaml @@ -7,7 +7,7 @@ data: connectorSubtype: database connectorType: destination definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42 - dockerImageTag: 1.0.1 + dockerImageTag: 1.0.2 dockerRepository: airbyte/destination-mysql-strict-encrypt githubIssueLabel: destination-mysql icon: mysql.svg diff --git a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestinationStrictEncrypt.java deleted file mode 100644 index d54cbb5f3de4ba..00000000000000 --- a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestinationStrictEncrypt.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.mysql; - -import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingDestination; -import io.airbyte.commons.json.Jsons; -import io.airbyte.protocol.models.v0.ConnectorSpecification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MySQLDestinationStrictEncrypt extends SpecModifyingDestination implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestinationStrictEncrypt.class); - - public MySQLDestinationStrictEncrypt() { - super(MySQLDestination.sshWrappedDestination()); - } - - @Override - public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { - final ConnectorSpecification spec = Jsons.clone(originalSpec); - ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY); - return spec; - } - - public static void main(final String[] args) throws Exception { - final Destination destination = new MySQLDestinationStrictEncrypt(); - LOGGER.info("starting destination: {}", MySQLDestinationStrictEncrypt.class); - try { - new IntegrationRunner(destination).run(args); - } catch (Exception e) { - MySQLDestination.handleException(e); - } - LOGGER.info("completed destination: {}", MySQLDestinationStrictEncrypt.class); - } - -} diff --git a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLDestinationStrictEncrypt.kt b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLDestinationStrictEncrypt.kt new file mode 100644 index 00000000000000..4d01ef42fa9b16 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLDestinationStrictEncrypt.kt @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.mysql + +import com.fasterxml.jackson.databind.node.ObjectNode +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner +import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingDestination +import io.airbyte.commons.json.Jsons +import io.airbyte.protocol.models.v0.ConnectorSpecification +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +class MySQLDestinationStrictEncrypt : SpecModifyingDestination(MySQLDestination.sshWrappedDestination()), Destination { + override fun modifySpec(originalSpec: ConnectorSpecification): ConnectorSpecification { + val spec: ConnectorSpecification = Jsons.clone(originalSpec) + (spec.connectionSpecification["properties"] as ObjectNode).remove(JdbcUtils.SSL_KEY) + return spec + } + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(MySQLDestinationStrictEncrypt::class.java) + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + val destination: Destination = MySQLDestinationStrictEncrypt() + LOGGER.info("starting destination: {}", MySQLDestinationStrictEncrypt::class.java) + try { + IntegrationRunner(destination).run(args) + } catch (e: Exception) { + MySQLDestination.handleException(e) + } + LOGGER.info("completed destination: {}", MySQLDestinationStrictEncrypt::class.java) + } + } +} diff --git a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java index d883dcec7a2e7c..296ad95dcefe50 100644 --- a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java @@ -33,6 +33,7 @@ import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.Disabled; diff --git a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java index caa587109f6fde..d47787ea4af5b7 100644 --- a/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java @@ -10,6 +10,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; public class MySqlTestDataComparator extends AdvancedTestDataComparator { diff --git a/airbyte-integrations/connectors/destination-mysql/metadata.yaml b/airbyte-integrations/connectors/destination-mysql/metadata.yaml index 466f5db22da0ba..de7c34574efcde 100644 --- a/airbyte-integrations/connectors/destination-mysql/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mysql/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42 - dockerImageTag: 1.0.1 + dockerImageTag: 1.0.2 dockerRepository: airbyte/destination-mysql githubIssueLabel: destination-mysql icon: mysql.svg diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java deleted file mode 100644 index d3c3e375dcc1bd..00000000000000 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.mysql; - -import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage; - -import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; -import io.airbyte.cdk.db.factory.DataSourceFactory; -import io.airbyte.cdk.db.factory.DatabaseDriver; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility; -import io.airbyte.cdk.integrations.base.Destination; -import io.airbyte.cdk.integrations.base.IntegrationRunner; -import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; -import io.airbyte.cdk.integrations.destination.PropertyNameSimplifyingDataTransformer; -import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer; -import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; -import io.airbyte.commons.exceptions.ConfigErrorException; -import io.airbyte.commons.exceptions.ConnectionErrorException; -import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.map.MoreMaps; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationV1V2Migrator; -import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; -import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; -import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; -import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; -import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility; -import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlDestinationHandler; -import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator; -import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlV1V2Migrator; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; -import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; -import java.sql.SQLSyntaxErrorException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import javax.sql.DataSource; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MySQLDestination extends AbstractJdbcDestination implements Destination { - - private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDestination.class); - public static final String DRIVER_CLASS = DatabaseDriver.MYSQL.getDriverClassName(); - - static final Map DEFAULT_JDBC_PARAMETERS = ImmutableMap.of( - // zero dates by default cannot be parsed into java date objects (they will throw an error) - // in addition, users don't always have agency in fixing them e.g: maybe they don't own the database - // and can't - // remove zero date values. - // since zero dates are placeholders, we convert them to null by default - "zeroDateTimeBehavior", "convertToNull", - "allowLoadLocalInfile", "true"); - - static final Map DEFAULT_SSL_JDBC_PARAMETERS = MoreMaps.merge(ImmutableMap.of( - "useSSL", "true", - "requireSSL", "true", - "verifyServerCertificate", "false"), - DEFAULT_JDBC_PARAMETERS); - - @Override - @NotNull - protected String getConfigSchemaKey() { - return JdbcUtils.DATABASE_KEY; - } - - public static Destination sshWrappedDestination() { - return new SshWrappedDestination(new MySQLDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY); - } - - @Override - public AirbyteConnectionStatus check(final JsonNode config) { - final DataSource dataSource = getDataSource(config); - try { - final JdbcDatabase database = getDatabase(dataSource); - final MySQLSqlOperations mySQLSqlOperations = (MySQLSqlOperations) getSqlOperations(); - - final String outputSchema = getNamingResolver().getIdentifier(config.get(JdbcUtils.DATABASE_KEY).asText()); - attemptSQLCreateAndDropTableOperations(outputSchema, database, getNamingResolver(), - mySQLSqlOperations); - - mySQLSqlOperations.verifyLocalFileEnabled(database); - - final VersionCompatibility compatibility = mySQLSqlOperations.isCompatibleVersion(database); - if (!compatibility.isCompatible()) { - throw new RuntimeException(String - .format("Your MySQL version %s is not compatible with Airbyte", - compatibility.getVersion())); - } - - return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); - } catch (final ConnectionErrorException e) { - final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e); - AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message); - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage(message); - } catch (final Exception e) { - LOGGER.error("Exception while checking connection: ", e); - return new AirbyteConnectionStatus() - .withStatus(Status.FAILED) - .withMessage("Could not connect with provided configuration. \n" + e.getMessage()); - } finally { - try { - DataSourceFactory.close(dataSource); - } catch (final Exception e) { - LOGGER.warn("Unable to close data source.", e); - } - } - } - - public MySQLDestination() { - super(DRIVER_CLASS, new MySQLNameTransformer(), new MySQLSqlOperations()); - } - - @Override - protected Map getDefaultConnectionProperties(final JsonNode config) { - if (JdbcUtils.useSsl(config)) { - return DEFAULT_SSL_JDBC_PARAMETERS; - } else { - return DEFAULT_JDBC_PARAMETERS; - } - } - - @Override - public JsonNode toJdbcConfig(final JsonNode config) { - final String jdbcUrl = String.format("jdbc:mysql://%s:%s", - config.get(JdbcUtils.HOST_KEY).asText(), - config.get(JdbcUtils.PORT_KEY).asText()); - - final ImmutableMap.Builder configBuilder = ImmutableMap.builder() - .put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText()) - .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl); - - if (config.has(JdbcUtils.PASSWORD_KEY)) { - configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText()); - } - if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { - configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY)); - } - - return Jsons.jsonNode(configBuilder.build()); - } - - @Override - protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) { - return new MysqlSqlGenerator(); - } - - @NotNull - @Override - protected JdbcDestinationHandler getDestinationHandler(@NotNull String databaseName, - @NotNull JdbcDatabase database, - @NotNull String rawTableSchema) { - return new MysqlDestinationHandler(database, rawTableSchema); - } - - @NotNull - @Override - protected List> getMigrations(@NotNull JdbcDatabase database, - @NotNull String databaseName, - @NotNull SqlGenerator sqlGenerator, - @NotNull DestinationHandler destinationHandler) { - return Collections.emptyList(); - } - - @Override - protected DestinationV1V2Migrator getV1V2Migrator(JdbcDatabase database, String databaseName) { - return new MysqlV1V2Migrator(database); - } - - @Override - protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) { - return new PropertyNameSimplifyingDataTransformer(); - } - - @Override - public boolean isV2Destination() { - return true; - } - - static void handleException(Exception e) throws Exception { - if (e instanceof SQLSyntaxErrorException s) { - if (s.getMessage().toLowerCase().contains("access denied")) { - throw new ConfigErrorException("Access denied. Please check your configuration", s); - } - } - - throw e; - } - - public static void main(final String[] args) throws Exception { - final Destination destination = MySQLDestination.sshWrappedDestination(); - LOGGER.info("starting destination: {}", MySQLDestination.class); - try { - new IntegrationRunner(destination).run(args); - } catch (Exception e) { - handleException(e); - } - LOGGER.info("completed destination: {}", MySQLDestination.class); - } - -} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java deleted file mode 100644 index c711f634eef2c4..00000000000000 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.mysql; - -import io.airbyte.cdk.integrations.destination.StandardNameTransformer; - -/** - * Note that MySQL documentation discusses about identifiers case sensitivity using the - * lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a - * consistent convention, such as always creating and referring to databases and tables using - * lowercase names. This convention is recommended for maximum portability and ease of use. - * - * Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html" - * - * As a result, we are here forcing all identifier (table, schema and columns) names to lowercase. - */ -public class MySQLNameTransformer extends StandardNameTransformer { - - // These constants must match those in destination_name_transformer.py - public static final int MAX_MYSQL_NAME_LENGTH = 64; - // DBT appends a suffix to table names - public static final int TRUNCATE_DBT_RESERVED_SIZE = 12; - // 4 charachters for 1 underscore and 3 suffix (e.g. _ab1) - // 4 charachters for 1 underscore and 3 schema hash - public static final int TRUNCATE_RESERVED_SIZE = 8; - public static final int TRUNCATION_MAX_NAME_LENGTH = MAX_MYSQL_NAME_LENGTH - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE; - - @Override - public String getIdentifier(final String name) { - final String identifier = applyDefaultCase(super.getIdentifier(name)); - return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH); - } - - @Override - public String getTmpTableName(final String streamName) { - final String tmpTableName = applyDefaultCase(super.getTmpTableName(streamName)); - return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH); - } - - @Override - public String getRawTableName(final String streamName) { - final String rawTableName = applyDefaultCase(super.getRawTableName(streamName)); - return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH); - } - - static String truncateName(final String name, final int maxLength) { - if (name.length() <= maxLength) { - return name; - } - - final int allowedLength = maxLength - 2; - final String prefix = name.substring(0, allowedLength / 2); - final String suffix = name.substring(name.length() - allowedLength / 2); - return prefix + "__" + suffix; - } - - @Override - public String applyDefaultCase(final String input) { - return input.toLowerCase(); - } - -} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java deleted file mode 100644 index 9164bac3e23f3d..00000000000000 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.mysql; - -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT; -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT; -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META; -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID; -import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.name; -import static org.jooq.impl.DSL.table; - -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; -import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage; -import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; -import java.io.File; -import java.io.IOException; -import java.nio.file.Files; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import org.jooq.SQLDialect; -import org.jooq.impl.DSL; - -@SuppressFBWarnings( - value = {"SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"}, - justification = "There is little chance of SQL injection. There is also little need for statement reuse. The basic statement is more readable than the prepared statement.") -public class MySQLSqlOperations extends JdbcSqlOperations { - - private boolean isLocalFileEnabled = false; - - @Override - public void executeTransaction(final JdbcDatabase database, final List queries) throws Exception { - database.executeWithinTransaction(queries); - } - - @Override - public void insertRecordsInternal(final JdbcDatabase database, - final List records, - final String schemaName, - final String tmpTableName) - throws SQLException { - throw new UnsupportedOperationException("Mysql requires V2"); - } - - @Override - protected void insertRecordsInternalV2(final JdbcDatabase database, - final List records, - final String schemaName, - final String tableName) - throws Exception { - if (records.isEmpty()) { - return; - } - - verifyLocalFileEnabled(database); - try { - final File tmpFile = Files.createTempFile(tableName + "-", ".tmp").toFile(); - - loadDataIntoTable( - database, - records, - schemaName, - tableName, - tmpFile, - COLUMN_NAME_AB_RAW_ID, - COLUMN_NAME_DATA, - COLUMN_NAME_AB_EXTRACTED_AT, - COLUMN_NAME_AB_LOADED_AT, - COLUMN_NAME_AB_META); - Files.delete(tmpFile.toPath()); - } catch (final IOException e) { - throw new SQLException(e); - } - } - - private void loadDataIntoTable(final JdbcDatabase database, - final List records, - final String schemaName, - final String tmpTableName, - final File tmpFile, - final String... columnNames) - throws SQLException { - database.execute(connection -> { - try { - writeBatchToFile(tmpFile, records); - - final String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'"; - - /* - * We want to generate a query like: - * - * LOAD DATA LOCAL INFILE '/a/b/c' INTO TABLE foo.bar FIELDS TERMINATED BY ',' ENCLOSED BY - * '"' ESCAPED BY '\"' LINES TERMINATED BY '\r\n' (@c0, @c1, @c2, @c3, @c4) SET _airybte_raw_id = - * NULLIF(@c0, ''), _airbyte_data = NULLIF(@c1, ''), _airbyte_extracted_at = NULLIF(@c2, ''), - * _airbyte_loaded_at = NULLIF(@c3, ''), _airbyte_meta = NULLIF(@c4, '') - * - * This is to avoid weird default values (e.g. 0000-00-00 00:00:00) when the value should be NULL. - */ - - final String colVarDecls = "(" - + IntStream.range(0, columnNames.length).mapToObj(i -> "@c" + i).collect(Collectors.joining(",")) - + ")"; - final String colAssignments = IntStream.range(0, columnNames.length) - .mapToObj(i -> columnNames[i] + " = NULLIF(@c" + i + ", '')") - .collect(Collectors.joining(",")); - - final String query = String.format( - """ - LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s - FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\"' - LINES TERMINATED BY '\\r\\n' - %s - SET - %s - """, - absoluteFile, - schemaName, - tmpTableName, - colVarDecls, - colAssignments); - try (final Statement stmt = connection.createStatement()) { - stmt.execute(query); - } - } catch (final Exception e) { - throw new RuntimeException(e); - } - }); - } - - void verifyLocalFileEnabled(final JdbcDatabase database) throws SQLException { - final boolean localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database); - if (!localFileEnabled) { - tryEnableLocalFile(database); - } - isLocalFileEnabled = true; - } - - private void tryEnableLocalFile(final JdbcDatabase database) throws SQLException { - database.execute(connection -> { - try (final Statement statement = connection.createStatement()) { - statement.execute("set global local_infile=true"); - } catch (final Exception e) { - throw new RuntimeException( - "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", - e); - } - }); - } - - private double getVersion(final JdbcDatabase database) throws SQLException { - final List versions = database.queryStrings( - connection -> connection.createStatement().executeQuery("select version()"), - resultSet -> resultSet.getString("version()")); - return Double.parseDouble(versions.get(0).substring(0, 3)); - } - - VersionCompatibility isCompatibleVersion(final JdbcDatabase database) throws SQLException { - final double version = getVersion(database); - return new VersionCompatibility(version, version >= 5.7); - } - - @Override - public boolean isSchemaRequired() { - return false; - } - - private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException { - final List localFiles = database.queryStrings( - connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), - resultSet -> resultSet.getString("Value")); - return localFiles.get(0).equalsIgnoreCase("on"); - } - - @Override - public void createTableIfNotExists( - JdbcDatabase database, - String schemaName, - String tableName) - throws SQLException { - super.createTableIfNotExists(database, schemaName, tableName); - - // mysql doesn't have a "create index if not exists" method, and throws an error - // if you create an index that already exists. - // So we can't just override postCreateTableQueries. - // Instead, we manually query for index existence and create the index if needed. - // jdbc metadata is... weirdly painful to use for finding indexes: - // (getIndexInfo requires isUnique / isApproximate, which sounds like an easy thing to get wrong), - // and jooq doesn't support `show` queries, - // so manually build the query string. We can at least use jooq to render the table name. - String tableId = DSL.using(SQLDialect.MYSQL).render(table(name(schemaName, tableName))); - // This query returns a list of columns in the index, or empty list if the index does not exist. - boolean unloadedExtractedAtIndexNotExists = - database.queryJsons("show index from " + tableId + " where key_name='unloaded_extracted_at'").isEmpty(); - if (unloadedExtractedAtIndexNotExists) { - database.execute(DSL.using(SQLDialect.MYSQL).createIndex("unloaded_extracted_at") - .on( - table(name(schemaName, tableName)), - field(name(COLUMN_NAME_AB_LOADED_AT)), - field(name(COLUMN_NAME_AB_EXTRACTED_AT))) - .getSQL()); - } - boolean extractedAtIndexNotExists = database.queryJsons("show index from " + tableId + " where key_name='extracted_at'").isEmpty(); - if (extractedAtIndexNotExists) { - database.execute(DSL.using(SQLDialect.MYSQL).createIndex("extracted_at") - .on( - table(name(schemaName, tableName)), - field(name(COLUMN_NAME_AB_EXTRACTED_AT))) - .getSQL()); - } - } - - @Override - protected String createTableQueryV1(String schemaName, String tableName) { - throw new UnsupportedOperationException("Mysql requires V2"); - } - - @Override - protected String createTableQueryV2(String schemaName, String tableName) { - // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, - // 256 is enough - return String.format( - """ - CREATE TABLE IF NOT EXISTS %s.%s (\s - %s VARCHAR(256) PRIMARY KEY, - %s JSON, - %s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), - %s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), - %s JSON - ); - """, - schemaName, - tableName, - JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, - JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, - JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, - JavaBaseConstants.COLUMN_NAME_AB_META); - } - - public static class VersionCompatibility { - - private final double version; - private final boolean isCompatible; - - public VersionCompatibility(final double version, final boolean isCompatible) { - this.version = version; - this.isCompatible = isCompatible; - } - - public double getVersion() { - return version; - } - - public boolean isCompatible() { - return isCompatible; - } - - } - -} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLDestination.kt b/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLDestination.kt new file mode 100644 index 00000000000000..9de31e2a04c8c5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLDestination.kt @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.mysql + +import com.fasterxml.jackson.databind.JsonNode +import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.db.factory.DataSourceFactory +import io.airbyte.cdk.db.factory.DatabaseDriver +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.db.jdbc.JdbcUtils +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility +import io.airbyte.cdk.integrations.base.Destination +import io.airbyte.cdk.integrations.base.IntegrationRunner +import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage +import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination +import io.airbyte.cdk.integrations.destination.PropertyNameSimplifyingDataTransformer +import io.airbyte.cdk.integrations.destination.async.deser.StreamAwareDataTransformer +import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator +import io.airbyte.commons.exceptions.ConfigErrorException +import io.airbyte.commons.exceptions.ConnectionErrorException +import io.airbyte.commons.json.Jsons +import io.airbyte.commons.map.MoreMaps +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler +import io.airbyte.integrations.base.destination.typing_deduping.DestinationV1V2Migrator +import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState +import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlDestinationHandler +import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator +import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlV1V2Migrator +import io.airbyte.protocol.models.v0.AirbyteConnectionStatus +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.sql.SQLSyntaxErrorException +import java.util.* + +class MySQLDestination : AbstractJdbcDestination( + DRIVER_CLASS, MySQLNameTransformer(), MySQLSqlOperations() +), + Destination { + override val configSchemaKey: String + get() = JdbcUtils.DATABASE_KEY + + override fun check(config: JsonNode): AirbyteConnectionStatus { + val dataSource = getDataSource(config) + try { + val database = getDatabase(dataSource) + val mySQLSqlOperations = sqlOperations as MySQLSqlOperations + + val outputSchema: String = namingResolver.getIdentifier(config[JdbcUtils.DATABASE_KEY].asText()) + attemptTableOperations( + outputSchema, database, namingResolver, + mySQLSqlOperations, false + ) + + mySQLSqlOperations.verifyLocalFileEnabled(database) + + val compatibility = mySQLSqlOperations.isCompatibleVersion(database) + if (!compatibility.isCompatible) { + throw RuntimeException( + String.format( + "Your MySQL version %s is not compatible with Airbyte", + compatibility.version + ) + ) + } + + return AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED) + } catch (e: ConnectionErrorException) { + val message = ErrorMessage.getErrorMessage(e.stateCode, e.errorCode, e.exceptionMessage, e) + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message) + return AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(message) + } catch (e: Exception) { + LOGGER.error("Exception while checking connection: ", e) + return AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage( + """ + Could not connect with provided configuration. + ${e.message} + """.trimIndent() + ) + } finally { + try { + DataSourceFactory.close(dataSource) + } catch (e: Exception) { + LOGGER.warn("Unable to close data source.", e) + } + } + } + + override fun getDefaultConnectionProperties(config: JsonNode): Map { + return if (JdbcUtils.useSsl(config)) { + DEFAULT_SSL_JDBC_PARAMETERS + } else { + DEFAULT_JDBC_PARAMETERS + } + } + + override fun toJdbcConfig(config: JsonNode): JsonNode { + val jdbcUrl = String.format( + "jdbc:mysql://%s:%s", + config[JdbcUtils.HOST_KEY].asText(), + config[JdbcUtils.PORT_KEY].asText() + ) + + val configBuilder = ImmutableMap.builder() + .put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText()) + .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl) + + if (config.has(JdbcUtils.PASSWORD_KEY)) { + configBuilder.put(JdbcUtils.PASSWORD_KEY, config[JdbcUtils.PASSWORD_KEY].asText()) + } + if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) { + configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config[JdbcUtils.JDBC_URL_PARAMS_KEY]) + } + + return Jsons.jsonNode(configBuilder.build()) + } + + override fun getSqlGenerator(config: JsonNode): JdbcSqlGenerator { + return MysqlSqlGenerator() + } + + override fun getDestinationHandler( + databaseName: String, + database: JdbcDatabase, + rawTableSchema: String + ): JdbcDestinationHandler { + return MysqlDestinationHandler(database, rawTableSchema) + } + + override fun getMigrations( + database: JdbcDatabase, + databaseName: String, + sqlGenerator: SqlGenerator, + destinationHandler: DestinationHandler + ): List> { + return emptyList() + } + + override fun getV1V2Migrator(database: JdbcDatabase, databaseName: String): DestinationV1V2Migrator { + return MysqlV1V2Migrator(database) + } + + override fun getDataTransformer(parsedCatalog: ParsedCatalog?, defaultNamespace: String?): StreamAwareDataTransformer { + return PropertyNameSimplifyingDataTransformer() + } + + override val isV2Destination: Boolean + get() = true + + companion object { + private val LOGGER: Logger = LoggerFactory.getLogger(MySQLDestination::class.java) + val DRIVER_CLASS: String = DatabaseDriver.MYSQL.driverClassName + + val DEFAULT_JDBC_PARAMETERS: Map = + ImmutableMap.of( // zero dates by default cannot be parsed into java date objects (they will throw an error) + // in addition, users don't always have agency in fixing them e.g: maybe they don't own the database + // and can't + // remove zero date values. + // since zero dates are placeholders, we convert them to null by default + "zeroDateTimeBehavior", "convertToNull", + "allowLoadLocalInfile", "true" + ) + + val DEFAULT_SSL_JDBC_PARAMETERS: Map = MoreMaps.merge( + ImmutableMap.of( + "useSSL", "true", + "requireSSL", "true", + "verifyServerCertificate", "false" + ), + DEFAULT_JDBC_PARAMETERS + ) + + fun sshWrappedDestination(): Destination { + return SshWrappedDestination(MySQLDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY) + } + + @Throws(Exception::class) + fun handleException(e: Exception) { + if (e is SQLSyntaxErrorException) { + if (e.message!!.lowercase(Locale.getDefault()).contains("access denied")) { + throw ConfigErrorException("Access denied. Please check your configuration", e) + } + } + + throw e + } + + @Throws(Exception::class) + @JvmStatic + fun main(args: Array) { + val destination = sshWrappedDestination() + LOGGER.info("starting destination: {}", MySQLDestination::class.java) + try { + IntegrationRunner(destination).run(args) + } catch (e: Exception) { + handleException(e) + } + LOGGER.info("completed destination: {}", MySQLDestination::class.java) + } + } +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.kt b/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.kt new file mode 100644 index 00000000000000..8b7c322c58d842 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLNameTransformer.kt @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.mysql + +import io.airbyte.cdk.integrations.destination.StandardNameTransformer +import java.util.* + +/** + * Note that MySQL documentation discusses about identifiers case sensitivity using the + * lower_case_table_names system variable. As one of their recommendation is: "It is best to adopt a + * consistent convention, such as always creating and referring to databases and tables using + * lowercase names. This convention is recommended for maximum portability and ease of use. + * + * Source: https://dev.mysql.com/doc/refman/8.0/en/identifier-case-sensitivity.html" + * + * As a result, we are here forcing all identifier (table, schema and columns) names to lowercase. + */ +class MySQLNameTransformer : StandardNameTransformer() { + override fun getIdentifier(name: String): String { + val identifier = applyDefaultCase(super.getIdentifier(name)) + return truncateName(identifier, TRUNCATION_MAX_NAME_LENGTH) + } + + override fun getTmpTableName(streamName: String): String { + val tmpTableName = applyDefaultCase(super.getTmpTableName(streamName)) + return truncateName(tmpTableName, TRUNCATION_MAX_NAME_LENGTH) + } + + override fun getRawTableName(streamName: String): String { + val rawTableName = applyDefaultCase(super.getRawTableName(streamName)) + return truncateName(rawTableName, TRUNCATION_MAX_NAME_LENGTH) + } + + override fun applyDefaultCase(input: String): String { + return input.lowercase(Locale.getDefault()) + } + + companion object { + // These constants must match those in destination_name_transformer.py + const val MAX_MYSQL_NAME_LENGTH: Int = 64 + + // DBT appends a suffix to table names + const val TRUNCATE_DBT_RESERVED_SIZE: Int = 12 + + // 4 charachters for 1 underscore and 3 suffix (e.g. _ab1) + // 4 charachters for 1 underscore and 3 schema hash + const val TRUNCATE_RESERVED_SIZE: Int = 8 + const val TRUNCATION_MAX_NAME_LENGTH: Int = MAX_MYSQL_NAME_LENGTH - TRUNCATE_DBT_RESERVED_SIZE - TRUNCATE_RESERVED_SIZE + + @JvmStatic + fun truncateName(name: String, maxLength: Int): String { + if (name.length <= maxLength) { + return name + } + + val allowedLength = maxLength - 2 + val prefix = name.substring(0, allowedLength / 2) + val suffix = name.substring(name.length - allowedLength / 2) + return prefix + "__" + suffix + } + } +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.kt b/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.kt new file mode 100644 index 00000000000000..7517bb101288c8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mysql/src/main/kotlin/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.kt @@ -0,0 +1,258 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.destination.mysql + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.db.jdbc.JdbcDatabase +import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage +import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations +import io.airbyte.commons.functional.CheckedConsumer +import org.jooq.SQLDialect +import org.jooq.impl.DSL +import java.io.File +import java.io.IOException +import java.nio.file.Files +import java.sql.Connection +import java.sql.ResultSet +import java.sql.SQLException +import java.util.stream.Collectors +import java.util.stream.IntStream + +@SuppressFBWarnings( + value = ["SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"], + justification = "There is little chance of SQL injection. There is also little need for statement reuse. The basic statement is more readable than the prepared statement." +) +class MySQLSqlOperations : JdbcSqlOperations() { + private var isLocalFileEnabled = false + + @Throws(Exception::class) + override fun executeTransaction(database: JdbcDatabase, queries: List) { + database.executeWithinTransaction(queries) + } + + @Throws(SQLException::class) + public override fun insertRecordsInternal( + database: JdbcDatabase, + records: List, + schemaName: String?, + tmpTableName: String? + ) { + throw UnsupportedOperationException("Mysql requires V2") + } + + @Throws(Exception::class) + override fun insertRecordsInternalV2( + database: JdbcDatabase, + records: List, + schemaName: String?, + tableName: String? + ) { + if (records.isEmpty()) { + return + } + + verifyLocalFileEnabled(database) + try { + val tmpFile = Files.createTempFile("$tableName-", ".tmp").toFile() + + loadDataIntoTable( + database, + records, + schemaName, + tableName, + tmpFile, + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_DATA, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_LOADED_AT, + COLUMN_NAME_AB_META + ) + Files.delete(tmpFile.toPath()) + } catch (e: IOException) { + throw SQLException(e) + } + } + + @Throws(SQLException::class) + private fun loadDataIntoTable( + database: JdbcDatabase, + records: List, + schemaName: String?, + tmpTableName: String?, + tmpFile: File, + vararg columnNames: String + ) { + database.execute(CheckedConsumer { connection: Connection -> + try { + writeBatchToFile(tmpFile, records) + + val absoluteFile = "'" + tmpFile.absolutePath + "'" + + /* + * We want to generate a query like: + * + * LOAD DATA LOCAL INFILE '/a/b/c' INTO TABLE foo.bar FIELDS TERMINATED BY ',' ENCLOSED BY + * '"' ESCAPED BY '\"' LINES TERMINATED BY '\r\n' (@c0, @c1, @c2, @c3, @c4) SET _airybte_raw_id = + * NULLIF(@c0, ''), _airbyte_data = NULLIF(@c1, ''), _airbyte_extracted_at = NULLIF(@c2, ''), + * _airbyte_loaded_at = NULLIF(@c3, ''), _airbyte_meta = NULLIF(@c4, '') + * + * This is to avoid weird default values (e.g. 0000-00-00 00:00:00) when the value should be NULL. + */ + val colVarDecls = ("(" + + IntStream.range(0, columnNames.size).mapToObj { i: Int -> "@c$i" }.collect(Collectors.joining(",")) + + ")") + val colAssignments = IntStream.range(0, columnNames.size) + .mapToObj { i: Int -> columnNames[i] + " = NULLIF(@c" + i + ", '')" } + .collect(Collectors.joining(",")) + + val query = String.format( + """ + LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s + FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\${'"'}' + LINES TERMINATED BY '\r\ + ' + %s + SET + %s + + """.trimIndent(), + absoluteFile, + schemaName, + tmpTableName, + colVarDecls, + colAssignments + ) + connection.createStatement().use { stmt -> + stmt.execute(query) + } + } catch (e: Exception) { + throw RuntimeException(e) + } + }) + } + + @Throws(SQLException::class) + fun verifyLocalFileEnabled(database: JdbcDatabase) { + val localFileEnabled = isLocalFileEnabled || checkIfLocalFileIsEnabled(database) + if (!localFileEnabled) { + tryEnableLocalFile(database) + } + isLocalFileEnabled = true + } + + @Throws(SQLException::class) + private fun tryEnableLocalFile(database: JdbcDatabase) { + database.execute(CheckedConsumer { connection: Connection -> + try { + connection.createStatement().use { statement -> + statement.execute("set global local_infile=true") + } + } catch (e: Exception) { + throw RuntimeException( + "The DB user provided to airbyte was unable to switch on the local_infile attribute on the MySQL server. As an admin user, you will need to run \"SET GLOBAL local_infile = true\" before syncing data with Airbyte.", + e + ) + } + }) + } + + @Throws(SQLException::class) + private fun getVersion(database: JdbcDatabase): Double { + val versions = database.queryStrings( + { connection: Connection -> connection.createStatement().executeQuery("select version()") }, + { resultSet: ResultSet -> resultSet.getString("version()") }) + return versions[0].substring(0, 3).toDouble() + } + + @Throws(SQLException::class) + fun isCompatibleVersion(database: JdbcDatabase): VersionCompatibility { + val version = getVersion(database) + return VersionCompatibility(version, version >= 5.7) + } + + override val isSchemaRequired: Boolean + get() = false + + @Throws(SQLException::class) + private fun checkIfLocalFileIsEnabled(database: JdbcDatabase): Boolean { + val localFiles = database.queryStrings( + { connection: Connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'") }, + { resultSet: ResultSet -> resultSet.getString("Value") }) + return localFiles[0].equals("on", ignoreCase = true) + } + + @Throws(SQLException::class) + override fun createTableIfNotExists( + database: JdbcDatabase, + schemaName: String?, + tableName: String? + ) { + super.createTableIfNotExists(database, schemaName, tableName) + + // mysql doesn't have a "create index if not exists" method, and throws an error + // if you create an index that already exists. + // So we can't just override postCreateTableQueries. + // Instead, we manually query for index existence and create the index if needed. + // jdbc metadata is... weirdly painful to use for finding indexes: + // (getIndexInfo requires isUnique / isApproximate, which sounds like an easy thing to get wrong), + // and jooq doesn't support `show` queries, + // so manually build the query string. We can at least use jooq to render the table name. + val tableId = DSL.using(SQLDialect.MYSQL).render(DSL.table(DSL.name(schemaName, tableName))) + // This query returns a list of columns in the index, or empty list if the index does not exist. + val unloadedExtractedAtIndexNotExists = + database.queryJsons("show index from $tableId where key_name='unloaded_extracted_at'").isEmpty() + if (unloadedExtractedAtIndexNotExists) { + database.execute( + DSL.using(SQLDialect.MYSQL).createIndex("unloaded_extracted_at") + .on( + DSL.table(DSL.name(schemaName, tableName)), + DSL.field(DSL.name(COLUMN_NAME_AB_LOADED_AT)), + DSL.field(DSL.name(COLUMN_NAME_AB_EXTRACTED_AT)) + ) + .sql + ) + } + val extractedAtIndexNotExists = database.queryJsons("show index from $tableId where key_name='extracted_at'").isEmpty() + if (extractedAtIndexNotExists) { + database.execute( + DSL.using(SQLDialect.MYSQL).createIndex("extracted_at") + .on( + DSL.table(DSL.name(schemaName, tableName)), + DSL.field(DSL.name(COLUMN_NAME_AB_EXTRACTED_AT)) + ) + .getSQL() + ) + } + } + + override fun createTableQueryV1(schemaName: String?, tableName: String?): String { + throw UnsupportedOperationException("Mysql requires V2") + } + + override fun createTableQueryV2(schemaName: String?, tableName: String?): String { + // MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column, + // 256 is enough + return String.format( + """ + CREATE TABLE IF NOT EXISTS %s.%s ( + %s VARCHAR(256) PRIMARY KEY, + %s JSON, + %s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), + %s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), + %s JSON + ); + + """.trimIndent(), + schemaName, + tableName, + COLUMN_NAME_AB_RAW_ID, + COLUMN_NAME_DATA, + COLUMN_NAME_AB_EXTRACTED_AT, + COLUMN_NAME_AB_LOADED_AT, + COLUMN_NAME_AB_META + ) + } + + class VersionCompatibility(val version: Double, val isCompatible: Boolean) +} diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java index 55cbb6edd79b43..e3adeffecae661 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java @@ -36,6 +36,8 @@ import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; +import io.airbyte.integrations.destination.mysql.MySQLDestination; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.Disabled; diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java index caa587109f6fde..d47787ea4af5b7 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySqlTestDataComparator.java @@ -10,6 +10,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; public class MySqlTestDataComparator extends AdvancedTestDataComparator { diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java index 45d8582912aa77..692716427ae1e9 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SshMySQLDestinationAcceptanceTest.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; import org.apache.commons.lang3.RandomStringUtils; import org.jooq.DSLContext; import org.jooq.SQLDialect; diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java index f2e29a2fbbd93f..544511e14d02d6 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java @@ -21,6 +21,8 @@ import java.util.HashSet; import java.util.List; import java.util.stream.Collectors; +import io.airbyte.integrations.destination.mysql.MySQLDestination; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.junit.jupiter.api.Disabled; diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java index 51588600846337..ef677317fb7abd 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLDestinationTest.java @@ -11,6 +11,7 @@ import io.airbyte.cdk.db.jdbc.JdbcUtils; import io.airbyte.commons.json.Jsons; import java.util.Map; +import io.airbyte.integrations.destination.mysql.MySQLDestination; import org.junit.jupiter.api.Test; public class MySQLDestinationTest { diff --git a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformerTest.java b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformerTest.java index 461fb3a0f81158..72f6478d921036 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformerTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test/java/io/airbyte/integrations/destination/mysql/MySQLNameTransformerTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.*; +import io.airbyte.integrations.destination.mysql.MySQLNameTransformer; import org.junit.jupiter.api.Test; class MySQLNameTransformerTest { diff --git a/airbyte-integrations/connectors/destination-mysql/src/testFixtures/kotlin/io/airbyte/integrations/destination/mysql/MysqlContainerFactory.kt b/airbyte-integrations/connectors/destination-mysql/src/testFixtures/kotlin/io/airbyte/integrations/destination/mysql/MysqlContainerFactory.kt index 5062720b7b32ce..b59ea6c6749c69 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/testFixtures/kotlin/io/airbyte/integrations/destination/mysql/MysqlContainerFactory.kt +++ b/airbyte-integrations/connectors/destination-mysql/src/testFixtures/kotlin/io/airbyte/integrations/destination/mysql/MysqlContainerFactory.kt @@ -11,6 +11,6 @@ import org.testcontainers.utility.DockerImageName /** Much like the destination-postgres PostgresTestDatabase, this was copied from source-mysql. */ class MySQLContainerFactory : ContainerFactory>() { override fun createNewContainer(imageName: DockerImageName?): MySQLContainer<*> { - return MySQLContainer(imageName?.asCompatibleSubstituteFor("mysql")) + return MySQLContainer(imageName?.asCompatibleSubstituteFor("io/airbyte/integrations/destination/mysql")) } } diff --git a/docs/integrations/destinations/mysql.md b/docs/integrations/destinations/mysql.md index 12eae066c22b01..98fba0d8c99d75 100644 --- a/docs/integrations/destinations/mysql.md +++ b/docs/integrations/destinations/mysql.md @@ -111,6 +111,7 @@ Using this feature requires additional configuration, when creating the destinat | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------- | +| 1.0.2 | 2024-06-26 | [40553](https://github.com/airbytehq/airbyte/pull/40553) | Convert prod code to kotlin | | 1.0.1 | 2024-06-25 | [40513](https://github.com/airbytehq/airbyte/pull/40513) | Improve error reporting for "access denied" error | | 1.0.0 | 2024-04-26 | [37322](https://github.com/airbytehq/airbyte/pull/37322) | Remove normalization and upgrade to DV2 output format | | 0.3.1 | 2024-04-12 | [36926](https://github.com/airbytehq/airbyte/pull/36926) | Upgrade to Kotlin CDK |