Skip to content

Commit

Permalink
馃帀 Source MySQL: support all MySQL 8.0 types (#7970)
Browse files Browse the repository at this point in the history
* Add jdbc compatible layer

* Support routine mysql types

* Format code

* Fix build

* Refactor abstract jdbc source and operation classes

* Update mysql source operations

* Test discover command for mysql

* Remove abstract jdbc compatible source layer

* Format code

* Update template

* Fix more types

* Bump version

* Log original field type

* Update comments

* Bump version in seed
  • Loading branch information
tuliren committed Dec 12, 2021
1 parent 2093b19 commit 6843bc1
Show file tree
Hide file tree
Showing 63 changed files with 1,037 additions and 553 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.4.13",
"dockerImageTag": "0.4.14",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql",
"icon": "mysql.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.4.13
dockerImageTag: 0.4.14
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4054,7 +4054,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.4.13"
- dockerImage: "airbyte/source-mysql:0.4.14"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
14 changes: 2 additions & 12 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,30 +132,20 @@ public static JdbcDatabase createJdbcDatabase(final String username,
final String jdbcConnectionString,
final String driverClassName,
final String connectionProperties,
final JdbcSourceOperations sourceOperations) {
final JdbcCompatibleSourceOperations<?> sourceOperations) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));

return new DefaultJdbcDatabase(connectionPool, sourceOperations);
}

public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final JdbcStreamingQueryConfiguration jdbcStreamingQuery,
final String connectionProperties) {
return createStreamingJdbcDatabase(username, password, jdbcConnectionString, driverClassName, jdbcStreamingQuery, connectionProperties,
JdbcUtils.getDefaultSourceOperations());
}

public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName,
final JdbcStreamingQueryConfiguration jdbcStreamingQuery,
final String connectionProperties,
final JdbcSourceOperations sourceOperations) {
final JdbcCompatibleSourceOperations<?> sourceOperations) {
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperations<ResultSet, SourceType> {

/**
* Read from a result set, and copy the value of the column at colIndex to the Json object.
* <p/>
*
* @param colIndex 1-based column index.
*/
void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException;

/**
* Set the cursor field in incremental table query.
*/
void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final SourceType cursorFieldType,
final String value)
throws SQLException;

/**
* Determine the database specific type of the input field based on its column metadata.
*/
SourceType getFieldType(final JsonNode field);

/**
* @return the input identifiers with quotes and delimiters.
*/
String enquoteIdentifierList(final Connection connection, final List<String> identifiers) throws SQLException;

/**
* @return the input identifier with quotes.
*/
String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException;

/**
* @return fully qualified table name with the schema (if a schema exists).
*/
String getFullyQualifiedTableName(final String schemaName, final String tableName);

/**
* @return fully qualified table name with the schema (if a schema exists) in quotes.
*/
String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.sql.SQLException;

public interface SourceOperations<QueryResult, SourceType> {

JsonNode rowToJson(QueryResult queryResult) throws Exception;
JsonNode rowToJson(QueryResult queryResult) throws SQLException;

JsonSchemaPrimitive getType(SourceType sourceType);
JsonSchemaPrimitive getJsonType(SourceType sourceType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Date getDateValue(final FieldValue fieldValue, final DateFormat dateForma
}

@Override
public JsonSchemaPrimitive getType(final StandardSQLTypeName bigQueryType) {
public JsonSchemaPrimitive getJsonType(final StandardSQLTypeName bigQueryType) {
return switch (bigQueryType) {
case BOOL -> JsonSchemaPrimitive.BOOLEAN;
case INT64, FLOAT64, NUMERIC, BIGNUMERIC -> JsonSchemaPrimitive.NUMBER;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.jdbc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import javax.xml.bind.DatatypeConverter;

/**
* Source operation skeleton for JDBC compatible databases.
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
// attempt to access the column. this allows us to know if it is null before we do type-specific
// parsing. if it is null, we can move on. while awkward, this seems to be the agreed upon way of
// checking for null values with jdbc.
queryContext.getObject(i);
if (queryContext.wasNull()) {
continue;
}

// convert to java types that will convert into reasonable json.
setJsonField(queryContext, i, jsonNode);
}

return jsonNode;
}

protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getBoolean(index));
}

/**
* In some sources Short might have value larger than {@link Short#MAX_VALUE}. E.q. MySQL has
* unsigned smallint type, which can contain value 65535. If we fail to cast Short value, we will
* try to cast Integer.
*/
protected void putShortInt(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
try {
node.put(columnName, resultSet.getShort(index));
} catch (final SQLException e) {
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getInt(index)));
}
}

/**
* In some sources Integer might have value larger than {@link Integer#MAX_VALUE}. E.q. MySQL has
* unsigned Integer type, which can contain value 3428724653. If we fail to cast Integer value, we
* will try to cast Long.
*/
protected void putInteger(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
try {
node.put(columnName, resultSet.getInt(index));
} catch (final SQLException e) {
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getLong(index)));
}
}

protected void putBigInt(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getLong(index)));
}

protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getDouble(index), Double::isFinite));
}

protected void putFloat(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getFloat(index), Float::isFinite));
}

protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index)));
}

protected void putString(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getString(index));
}

protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DataTypeUtils.toISO8601String(resultSet.getDate(index)));
}

protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DataTypeUtils.toISO8601String(resultSet.getTime(index)));
}

protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
// https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html
final Timestamp t = resultSet.getTimestamp(index);
final java.util.Date d = new java.util.Date(t.getTime() + (t.getNanos() / 1000000));
node.put(columnName, DataTypeUtils.toISO8601String(d));
}

protected void putBinary(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getBytes(index));
}

protected void putDefault(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, resultSet.getString(index));
}

protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
setTimestamp(preparedStatement, parameterIndex, value);
}

protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
// parse time, and timestamp the same way. this seems to not cause an problems and allows us
// to treat them all as ISO8601. if this causes any problems down the line, we can adjust.
// Parsing TIME as a TIMESTAMP might potentially break for ClickHouse cause it doesn't expect TIME
// value in the following format
try {
preparedStatement.setTimestamp(parameterIndex, Timestamp
.from(DataTypeUtils.DATE_FORMAT.parse(value).toInstant()));
} catch (final ParseException e) {
throw new RuntimeException(e);
}
}

protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
try {
final Timestamp from = Timestamp.from(DataTypeUtils.DATE_FORMAT.parse(value).toInstant());
preparedStatement.setDate(parameterIndex, new Date(from.getTime()));
} catch (final ParseException e) {
throw new RuntimeException(e);
}
}

protected void setBit(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
// todo (cgardens) - currently we do not support bit because it requires special handling in the
// prepared statement.
// see
// https://www.postgresql-archive.org/Problems-with-BIT-datatype-and-preparedStatment-td5733533.html.
throw new RuntimeException("BIT value is not supported as incremental parameter!");
}

protected void setBoolean(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setBoolean(parameterIndex, Boolean.parseBoolean(value));
}

protected void setShortInt(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setShort(parameterIndex, Short.parseShort(value));
}

protected void setInteger(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setInt(parameterIndex, Integer.parseInt(value));
}

protected void setBigInteger(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setLong(parameterIndex, Long.parseLong(value));
}

protected void setDouble(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setDouble(parameterIndex, Double.parseDouble(value));
}

protected void setReal(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setFloat(parameterIndex, Float.parseFloat(value));
}

protected void setDecimal(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setBigDecimal(parameterIndex, new BigDecimal(value));
}

protected void setString(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setString(parameterIndex, value);
}

protected void setBinary(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseHexBinary(value));
}

@Override
public String enquoteIdentifierList(final Connection connection, final List<String> identifiers) throws SQLException {
final StringJoiner joiner = new StringJoiner(",");
for (final String col : identifiers) {
final String s = enquoteIdentifier(connection, col);
joiner.add(s);
}
return joiner.toString();
}

@Override
public String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException {
final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString();

return identifierQuoteString + identifier + identifierQuoteString;
}

@Override
public String getFullyQualifiedTableName(final String schemaName, final String tableName) {
return JdbcUtils.getFullyQualifiedTableName(schemaName, tableName);
}

@Override
public String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName)
throws SQLException {
final String quotedTableName = enquoteIdentifier(connection, tableName);
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

}

0 comments on commit 6843bc1

Please sign in to comment.