Skip to content

Commit

Permalink
mysql dv2 raw table impl
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Apr 15, 2024
1 parent 94a3dc0 commit 7a3147d
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.30.2'
features = ['db-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

//remove once upgrading the CDK version to 0.4.x or later
Expand All @@ -27,9 +27,3 @@ dependencies {
implementation 'mysql:mysql-connector-java:8.0.22'
integrationTestJavaImplementation libs.testcontainers.mysql
}

configurations.all {
resolutionStrategy {
force libs.jooq
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
Expand All @@ -37,6 +39,7 @@
import java.util.Map;
import javax.sql.DataSource;
import org.jetbrains.annotations.NotNull;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -141,14 +144,24 @@ public JsonNode toJdbcConfig(final JsonNode config) {

@Override
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
throw new UnsupportedOperationException("mysql does not yet support DV2");
return new RawOnlySqlGenerator(new MySQLNameTransformer());
}

@Override
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
return new PropertyNameSimplifyingDataTransformer();
}

@Override
public boolean isV2Destination() {
return true;
}

@Override
protected boolean shouldAlwaysDisableTypeDedupe() {
return true;
}

public static void main(final String[] args) throws Exception {
final Destination destination = MySQLDestination.sshWrappedDestination();
LOGGER.info("starting destination: {}", MySQLDestination.class);
Expand All @@ -161,7 +174,7 @@ public static void main(final String[] args) throws Exception {
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
@NotNull JdbcDatabase database,
@NotNull String rawTableSchema) {
throw new UnsupportedOperationException("Mysql does not yet support DV2");
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
"title": "JDBC URL Params",
"type": "string",
"order": 6
},
"raw_table_database": {
"type": "string",
"description": "The database to write raw tables into",
"title": "Raw table database (defaults to airbyte_internal)",
"order": 7
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,11 @@ protected boolean supportObjectDataTypeTest() {

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
return getConfigFromTestContainer(db);
}

public static ObjectNode getConfigFromTestContainer(final MySQLContainer<?> db) {
return (ObjectNode) Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(db))
.put(JdbcUtils.USERNAME_KEY, db.getUsername())
.put(JdbcUtils.PASSWORD_KEY, db.getPassword())
Expand Down Expand Up @@ -132,23 +136,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}
SQLDialect.MYSQL);
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}

@Override
Expand All @@ -163,36 +166,39 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) {
db = new MySQLContainer<>("mysql:8.0");
db.start();
setLocalInFileToTrue();
revokeAllPermissions();
grantCorrectPermissions();
configureTestContainer(db);
}

public static void configureTestContainer(final MySQLContainer<?> db) {
setLocalInFileToTrue(db);
revokeAllPermissions(db);
grantCorrectPermissions(db);
}

private void setLocalInFileToTrue() {
executeQuery("set global local_infile=true");
private static void setLocalInFileToTrue(final MySQLContainer<?> db) {
executeQuery(db, "set global local_infile=true");
}

private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
private static void revokeAllPermissions(final MySQLContainer<?> db) {
executeQuery(db, "REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + db.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery("GRANT ALTER, CREATE, INSERT, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
private static void grantCorrectPermissions(final MySQLContainer<?> db) {
executeQuery(db, "GRANT ALTER, CREATE, INSERT, INDEX, UPDATE, DELETE, SELECT, DROP ON *.* TO " + db.getUsername() + "@'%';");
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
private static void executeQuery(final MySQLContainer<?> db, final String query) {
final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
SQLDialect.MYSQL);
try {
new Database(dslContext).query(ctx -> ctx.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand All @@ -208,7 +214,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {
@Test
public void testCustomDbtTransformations() throws Exception {
// We need to create view for testing custom dbt transformations
executeQuery("GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
executeQuery(db, "GRANT CREATE VIEW ON *.* TO " + db.getUsername() + "@'%';");
super.testCustomDbtTransformations();
}

Expand Down Expand Up @@ -330,7 +336,7 @@ public void testCheckIncorrectDataBaseFailure() {
unit = SECONDS)
@Test
public void testUserHasNoPermissionToDataBase() {
executeQuery("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n");
executeQuery(db, "create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n");
final JsonNode config = ((ObjectNode) getConfigForBareMetalConnection()).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION);
((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION);
final MySQLDestination destination = new MySQLDestination();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TES

@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dslContext.close();
db.stop();
db.close();
}
Expand Down Expand Up @@ -128,18 +127,17 @@ private void grantCorrectPermissions() {
}

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.DEFAULT)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
SQLDialect.DEFAULT);
try {
new Database(dslContext).query(ctx -> ctx.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.airbyte.integrations.destination.mysql

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations
import io.airbyte.commons.json.Jsons
import java.sql.ResultSet
import java.sql.SQLException
import java.util.Locale

class MysqlTestSourceOperations : JdbcSourceOperations() {
@Throws(SQLException::class)
override fun copyToJsonField(resultSet: ResultSet, colIndex: Int, json: ObjectNode) {
val columnName = resultSet.metaData.getColumnName(colIndex)
val columnTypeName = resultSet.metaData.getColumnTypeName(colIndex).lowercase(Locale.getDefault())

// JSON has no equivalent in JDBCType
if ("json" == columnTypeName) {
json.set<JsonNode>(columnName, Jsons.deserializeExact(resultSet.getString(colIndex)))
} else {
super.copyToJsonField(resultSet, colIndex, json)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.airbyte.integrations.destination.mysql.typing_deduping;

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcTypingDedupingTest
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator
import io.airbyte.integrations.destination.mysql.MySQLDestination
import io.airbyte.integrations.destination.mysql.MySQLDestinationAcceptanceTest
import io.airbyte.integrations.destination.mysql.MySQLNameTransformer
import io.airbyte.integrations.destination.mysql.MysqlTestSourceOperations
import javax.sql.DataSource
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.testcontainers.containers.MySQLContainer

abstract class AbstractMysqlTypingDedupingTest(
override val imageName: String = "airbyte/destination-mysql:dev",
override val sqlGenerator: SqlGenerator = RawOnlySqlGenerator(MySQLNameTransformer()),
override val sourceOperations: MysqlTestSourceOperations = MysqlTestSourceOperations(),
override val baseConfig: ObjectNode = Companion.config,
) : JdbcTypingDedupingTest() {

override fun getDataSource(config: JsonNode?): DataSource =
MySQLDestination().getDataSource(baseConfig)

override fun disableFinalTableComparison(): Boolean {
// TODO delete this in the next stacked PR
return true
}

companion object {
private lateinit var testContainer: MySQLContainer<*>
private lateinit var config: ObjectNode

@JvmStatic
@BeforeAll
@Throws(Exception::class)
fun setupMysql() {
testContainer = MySQLContainer("mysql:8.0")
testContainer.start()
MySQLDestinationAcceptanceTest.configureTestContainer(testContainer)

config = MySQLDestinationAcceptanceTest.getConfigFromTestContainer(testContainer)
}

@JvmStatic
@AfterAll
fun teardownMysql() {
testContainer.stop()
testContainer.close()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.airbyte.integrations.destination.mysql.typing_deduping

import com.fasterxml.jackson.databind.node.ObjectNode

class MysqlRawOverrideTypingDedupingTest : AbstractMysqlTypingDedupingTest() {

override val baseConfig: ObjectNode =
super.baseConfig.put("raw_table_database", "overridden_raw_dataset")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package io.airbyte.integrations.destination.mysql.typing_deduping

// Just a concrete instantiation of the abstract class. No overrides needed.
class MysqlTypingDedupingTest: AbstractMysqlTypingDedupingTest()

0 comments on commit 7a3147d

Please sign in to comment.