Skip to content

Commit

Permalink
Destination Mysql: DV2 (#36936)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed May 3, 2024
1 parent 4894ad2 commit e8e85eb
Show file tree
Hide file tree
Showing 66 changed files with 1,597 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
cdkVersionRequired = '0.33.0'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand All @@ -31,9 +31,3 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
integrationTestJavaImplementation libs.testcontainers.mysql
}

configurations.all {
resolutionStrategy {
force libs.jooq
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,34 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
dockerImageTag: 0.3.1
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mysql-strict-encrypt
githubIssueLabel: destination-mysql
icon: mysql.svg
license: ELv2
name: MySQL
normalizationConfig:
normalizationIntegrationType: mysql
normalizationRepository: airbyte/normalization-mysql
normalizationTag: 0.4.1
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
supportsDbt: true
tags:
- language:java
releases:
breakingChanges:
1.0.0:
message:
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
which provides better error handling, incremental delivery of data for large
syncs, and improved final table structures. To review the breaking changes,
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models,
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination
at their next sync. You can manually sync existing connections prior to
the next scheduled sync to start the upgrade early.
"
upgradeDeadline: "2024-05-15"
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

@Disabled
public class MySQLStrictEncryptDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private MySQLContainer<?> db;
Expand Down Expand Up @@ -113,23 +114,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}
SQLDialect.MYSQL);
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -162,19 +162,18 @@ private void grantCorrectPermissions() {
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
SQLDialect.MYSQL);
try {
new Database(dslContext).query(ctx -> ctx.execute(query));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/mysql",
"supportsIncremental": true,
"supportsNormalization": true,
"supportsNormalization": false,
"supportsDBT": true,
"supported_destination_sync_modes": ["overwrite", "append"],
"supported_destination_sync_modes": ["overwrite", "append", "append_dedup"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "MySQL Destination Spec",
Expand Down Expand Up @@ -165,6 +165,19 @@
}
}
]
},
"raw_data_schema": {
"type": "string",
"description": "The database to write raw tables into",
"title": "Raw table database (defaults to airbyte_internal)",
"order": 7
},
"disable_type_dedupe": {
"type": "boolean",
"default": false,
"description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions",
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
"order": 8
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
cdkVersionRequired = '0.33.0'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
}
Expand All @@ -26,10 +26,5 @@ application {
dependencies {
implementation 'mysql:mysql-connector-java:8.0.22'
integrationTestJavaImplementation libs.testcontainers.mysql
}

configurations.all {
resolutionStrategy {
force libs.jooq
}
testFixturesApi libs.testcontainers.mysql
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# our testcontainer has issues with too much concurrency.
# 4 threads seems to be the sweet spot.
testExecutionConcurrency=4
JunitMethodExecutionTimeout=15 m
24 changes: 17 additions & 7 deletions airbyte-integrations/connectors/destination-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,17 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
dockerImageTag: 0.3.1
dockerImageTag: 1.0.0
dockerRepository: airbyte/destination-mysql
githubIssueLabel: destination-mysql
icon: mysql.svg
license: ELv2
name: MySQL
normalizationConfig:
normalizationIntegrationType: mysql
normalizationRepository: airbyte/normalization-mysql
normalizationTag: 0.4.3
registries:
cloud:
dockerImageTag: 0.2.0
dockerRepository: airbyte/destination-mysql-strict-encrypt
enabled: true
oss:
dockerImageTag: 0.2.0
enabled: true
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
Expand All @@ -29,4 +23,20 @@ data:
sl: 100
ql: 200
supportLevel: community
releases:
breakingChanges:
1.0.0:
message:
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
which provides better error handling and improved final table structures. To review the breaking changes,
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
These changes will likely require updates to downstream dbt / SQL models,
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
Selecting `Upgrade` will upgrade **all** connections using this destination
at their next sync. You can manually sync existing connections prior to
the next scheduled sync to start the upgrade early."
upgradeDeadline: "2024-06-05"
metadataSpecVersion: "1.0"
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationV1V2Migrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlDestinationHandler;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlV1V2Migrator;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import java.util.Collections;
Expand Down Expand Up @@ -60,6 +64,12 @@ public class MySQLDestination extends AbstractJdbcDestination<MinimumDestination
"verifyServerCertificate", "false"),
DEFAULT_JDBC_PARAMETERS);

@Override
@NotNull
protected String getConfigSchemaKey() {
return JdbcUtils.DATABASE_KEY;
}

public static Destination sshWrappedDestination() {
return new SshWrappedDestination(new MySQLDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}
Expand Down Expand Up @@ -120,10 +130,9 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf

@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final String jdbcUrl = String.format("jdbc:mysql://%s:%s/%s",
final String jdbcUrl = String.format("jdbc:mysql://%s:%s",
config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText());
config.get(JdbcUtils.PORT_KEY).asText());

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
Expand All @@ -141,27 +150,15 @@ public JsonNode toJdbcConfig(final JsonNode config) {

@Override
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
throw new UnsupportedOperationException("mysql does not yet support DV2");
}

@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", MySQLDestination.class);
return new MysqlSqlGenerator();
}

@NotNull
@Override
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
@NotNull JdbcDatabase database,
@NotNull String rawTableSchema) {
throw new UnsupportedOperationException("Mysql does not yet support DV2");
return new MysqlDestinationHandler(database, rawTableSchema);
}

@NotNull
Expand All @@ -173,4 +170,26 @@ protected List<Migration<MinimumDestinationState>> getMigrations(@NotNull JdbcDa
return Collections.emptyList();
}

@Override
protected DestinationV1V2Migrator getV1V2Migrator(JdbcDatabase database, String databaseName) {
return new MysqlV1V2Migrator(database);
}

@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
}

@Override
public boolean isV2Destination() {
return true;
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", MySQLDestination.class);
}

}
Loading

0 comments on commit e8e85eb

Please sign in to comment.