Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySql destination : Enable DAT tests #12866

Merged
merged 6 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ protected JsonNode getJsonFromRecord(Record record) {
var value = record.get(field);

switch (field.getDataType().getTypeName()) {
case "varchar", "nvarchar", "jsonb", "other":
case "varchar", "nvarchar", "jsonb", "json", "other":
var stringValue = (value != null ? value.toString() : null);
DestinationAcceptanceTestUtils.putStringIntoJson(stringValue, field.getName(), node);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
Expand All @@ -29,7 +28,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -110,15 +108,16 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL);
db.getDatabaseName()),
SQLDialect.MYSQL);
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

@Override
Expand Down Expand Up @@ -164,16 +163,17 @@ private void grantCorrectPermissions() {

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL)) {
db.getUsername(),
db.getPassword(),
db.getDriverClassName(),
String.format("jdbc:mysql://%s:%s/%s?useSSL=true&requireSSL=true&verifyServerCertificate=false",
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -27,16 +26,14 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
public class MySQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
Expand All @@ -61,6 +58,26 @@ protected boolean supportsNormalization() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new MySqlTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
Expand Down Expand Up @@ -101,7 +118,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -113,14 +130,14 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL)) {
db.getDatabaseName()),
SQLDialect.MYSQL)) {
return new Database(dslContext).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.map(this::getJsonFromRecord)
.collect(Collectors.toList()));
}
}
Expand All @@ -133,18 +150,6 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv test
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected void setup(final TestDestinationEnv testEnv) {
db = new MySQLContainer<>("mysql:8.0");
Expand All @@ -168,16 +173,17 @@ private void grantCorrectPermissions() {

private void executeQuery(final String query) {
try (final DSLContext dslContext = DSLContextFactory.create(
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()), SQLDialect.MYSQL)) {
"root",
"test",
db.getDriverClassName(),
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
db.getHost(),
db.getFirstMappedPort(),
db.getDatabaseName()),
SQLDialect.MYSQL)) {
new Database(dslContext).query(
ctx -> ctx
.execute(query));
ctx -> ctx
.execute(query));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mysql;

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.util.ArrayList;
import java.util.List;

public class MySqlTestDataComparator extends AdvancedTestDataComparator {

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
}
return result;
}

@Override
protected boolean compareBooleanValues(String firstBooleanValue, String secondBooleanValue) {
if (secondBooleanValue.equalsIgnoreCase("true") || secondBooleanValue.equalsIgnoreCase("false")) {
return super.compareBooleanValues(firstBooleanValue, secondBooleanValue);
} else {
return super.compareBooleanValues(firstBooleanValue, String.valueOf(secondBooleanValue.equals("1")));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,22 @@

package io.airbyte.integrations.destination.mysql;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
Expand All @@ -33,7 +28,7 @@
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
public abstract class SshMySQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private final List<String> HOST_KEY = List.of(MySQLDestination.HOST_KEY);
Expand Down Expand Up @@ -73,7 +68,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
throws Exception {
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace)
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
.collect(Collectors.toList());
}

Expand All @@ -92,6 +87,26 @@ protected boolean implementsNamespaces() {
return true;
}

@Override
protected TestDataComparator getTestDataComparator() {
return new MySqlTestDataComparator();
}

@Override
protected boolean supportBasicDataTypeTest() {
return true;
}

@Override
protected boolean supportArrayDataTypeTest() {
return true;
}

@Override
protected boolean supportObjectDataTypeTest() {
return true;
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
final String streamName,
Expand All @@ -102,27 +117,15 @@ protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv env,
return retrieveRecordsFromTable(tableName, schema);
}

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

private static Database getDatabaseFromConfig(final JsonNode config) {
final DSLContext dslContext = DSLContextFactory.create(
config.get("username").asText(),
config.get("password").asText(),
DatabaseDriver.MYSQL.getDriverClassName(),
String.format("jdbc:mysql://%s:%s",
config.get("host").asText(),
config.get("port").asText()), SQLDialect.MYSQL);
config.get("port").asText()),
SQLDialect.MYSQL);
return new Database(dslContext);
}

Expand All @@ -138,8 +141,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName.toLowerCase(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.map(this::getJsonFromRecord)
.collect(Collectors.toList())));
}

Expand Down Expand Up @@ -167,13 +169,4 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception {
});
}

protected void assertSameValue(final JsonNode expectedValue, final JsonNode actualValue) {
if (expectedValue.isBoolean()) {
// Boolean in MySQL are stored as TINYINT (0 or 1) so we force them to boolean values here
assertEquals(expectedValue.asBoolean(), actualValue.asBoolean());
} else {
assertEquals(expectedValue, actualValue);
}
}

}
Loading