Skip to content

Commit

Permalink
MySql destination : Enable DAT tests (#12866)
Browse files Browse the repository at this point in the history
* add MySqlTestDataComparator

* enable new DAT

* enable new DAT

* handle json type + fix boolean compare for both cases

* format
  • Loading branch information
DoNotPanicUA committed May 19, 2022
1 parent 1edc0de commit 5aa7fe7
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected JsonNode getJsonFromRecord(Record record) {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "nvarchar", "jsonb", "other":
case "varchar", "nvarchar", "jsonb", "json", "other":
var stringValue = (value != null ? value.toString() : null);
DestinationAcceptanceTestUtils.putStringIntoJson(stringValue, field.getName(), node);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
Expand All @@ -29,7 +28,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -110,15 +108,16 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL);
db.getDatabaseName()),
SQLDialect.MYSQL);
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -164,16 +163,17 @@ private void grantCorrectPermissions() {

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL)) {
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -27,16 +26,14 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
public class MySQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
Expand All @@ -61,6 +58,26 @@ protected boolean supportsNormalization() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new MySqlTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
Expand Down Expand Up @@ -101,7 +118,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -113,14 +130,14 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL)) {
db.getDatabaseName()),
SQLDialect.MYSQL)) {
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}
}
Expand All @@ -133,18 +150,6 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected void setup(final TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
Expand All @@ -168,16 +173,17 @@ private void grantCorrectPermissions() {

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL)) {
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.util.ArrayList;
import java.util.List;

public class MySqlTestDataComparator extends AdvancedTestDataComparator {

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected boolean compareBooleanValues(String firstBooleanValue, String secondBooleanValue) {
if (secondBooleanValue.equalsIgnoreCase("true") || secondBooleanValue.equalsIgnoreCase("false")) {
return super.compareBooleanValues(firstBooleanValue, secondBooleanValue);
} else {
return super.compareBooleanValues(firstBooleanValue, String.valueOf(secondBooleanValue.equals("1")));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,22 @@

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
Expand All @@ -33,7 +28,7 @@
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class SshMySQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private final List<String> HOST_KEY = List.of(MySQLDestination.HOST_KEY);
Expand Down Expand Up @@ -73,7 +68,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -92,6 +87,26 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new MySqlTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
final String streamName,
Expand All @@ -102,27 +117,15 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private static Database getDatabaseFromConfig(final JsonNode config) {
final DSLContext dslContext = DSLContextFactory.create(
config.get("username").asText(),
config.get("password").asText(),
DatabaseDriver.MYSQL.getDriverClassName(),
String.format("jdbc:mysql://%s:%s",
config.get("host").asText(),
config.get("port").asText()), SQLDialect.MYSQL);
config.get("port").asText()),
SQLDialect.MYSQL);
return new Database(dslContext);
}

Expand All @@ -138,8 +141,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName.toLowerCase(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.map(this::getJsonFromRecord)
.collect(Collectors.toList())));
}

Expand Down Expand Up @@ -167,13 +169,4 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
});
}

protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) {
if (expectedValue.isBoolean()) {
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
} else {
assertEquals(expectedValue, actualValue);
}
}

}
Loading

0 comments on commit 5aa7fe7

Please sign in to comment.