Skip to content

Commit

Permalink
run gradle format
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed Mar 15, 2023
1 parent 9864e4b commit 8c5f44d
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.destination.staging.StagingOperations;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import io.airbyte.commons.json.Jsons;
Expand All @@ -9,14 +13,15 @@
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import java.util.Map;

public abstract class SnowflakeSqlStagingOperations extends SnowflakeSqlOperations implements StagingOperations{
public abstract class SnowflakeSqlStagingOperations extends SnowflakeSqlOperations implements StagingOperations {

/**
* This method is used in Check connection method to make sure that user has the Write permission
*/
protected void attemptWriteToStage(final String outputSchema,
final String stageName,
final JdbcDatabase database) throws Exception {
final String stageName,
final JdbcDatabase database)
throws Exception {

final CsvSerializedBuffer csvSerializedBuffer = new CsvSerializedBuffer(
new FileBuffer(CsvSerializedBuffer.CSV_GZ_SUFFIX),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.source.mssql;

import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcStateHandler;
Expand All @@ -12,16 +15,11 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.debezium.engine.ChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MssqlCdcStateHandler implements CdcStateHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,13 @@ protected boolean verifyCursorColumnValues(final JdbcDatabase database, final St
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
nullValExist = jsonNodes.get(0).get(resultColName).booleanValue();
LOGGER.info("null cursor value for MsSQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName, columnName);
LOGGER.info("null cursor value for MsSQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName,
columnName);
}
}
}
// return !nullValExist;
// will enable after we have sent comms to users this affects
// return !nullValExist;
// will enable after we have sent comms to users this affects
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,16 @@ public void testTableWithNullCursorValueShouldThrowException() throws Exception
});

ConfiguredAirbyteStream configuredAirbyteStream = new ConfiguredAirbyteStream().withSyncMode(
SyncMode.INCREMENTAL)
SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
DB_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING))
STREAM_NAME,
DB_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING))
.withSupportedSyncModes(
Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package io.airbyte.integrations.source.mysql;

import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.debezium.CdcStateHandler;
Expand All @@ -12,16 +15,11 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.debezium.engine.ChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_CDC_OFFSET;
import static io.airbyte.integrations.source.mysql.MySqlSource.MYSQL_DB_HISTORY;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlCdcStateHandler implements CdcStateHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,21 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class);
private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY =
"""
SELECT (EXISTS (SELECT * from `%s`.`%s` where `%s` IS NULL LIMIT 1)) AS %s
""";
"""
SELECT (EXISTS (SELECT * from `%s`.`%s` where `%s` IS NULL LIMIT 1)) AS %s
""";
public static final String NULL_CURSOR_VALUE_WITHOUT_SCHEMA_QUERY =
"""
SELECT (EXISTS (SELECT * from %s where `%s` IS NULL LIMIT 1)) AS %s
""";
"""
SELECT (EXISTS (SELECT * from %s where `%s` IS NULL LIMIT 1)) AS %s
""";
public static final String DESCRIBE_TABLE_WITHOUT_SCHEMA_QUERY =
"""
DESCRIBE %s
""";
"""
DESCRIBE %s
""";
public static final String DESCRIBE_TABLE_WITH_SCHEMA_QUERY =
"""
DESCRIBE `%s`.`%s`
""";
"""
DESCRIBE `%s`.`%s`
""";

public static final String DRIVER_CLASS = DatabaseDriver.MYSQL.getDriverClassName();
public static final String MYSQL_CDC_OFFSET = "mysql_cdc_offset";
Expand Down Expand Up @@ -333,39 +333,40 @@ public Set<String> getExcludedInternalNameSpaces() {

@Override
protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName)
throws SQLException {
throws SQLException {
boolean nullValExist = false;
final String resultColName = "nullValue";
final String descQuery = schema == null || schema.isBlank()
? String.format(DESCRIBE_TABLE_WITHOUT_SCHEMA_QUERY, tableName)
: String.format(DESCRIBE_TABLE_WITH_SCHEMA_QUERY, schema, tableName);
? String.format(DESCRIBE_TABLE_WITHOUT_SCHEMA_QUERY, tableName)
: String.format(DESCRIBE_TABLE_WITH_SCHEMA_QUERY, schema, tableName);
final Optional<JsonNode> field = database.bufferedResultSetQuery(conn -> conn.createStatement()
.executeQuery(descQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet))
.stream().filter(x -> x.get("Field").asText().equalsIgnoreCase(columnName))
.findFirst();
if(field.isPresent()){
.executeQuery(descQuery),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet))
.stream().filter(x -> x.get("Field").asText().equalsIgnoreCase(columnName))
.findFirst();
if (field.isPresent()) {
final JsonNode jsonNode = field.get();
final JsonNode isNullable = jsonNode.get("Null");
if (isNullable!=null){
if (isNullable.asText().equalsIgnoreCase("YES")){
if (isNullable != null) {
if (isNullable.asText().equalsIgnoreCase("YES")) {
final String query = schema == null || schema.isBlank()
? String.format(NULL_CURSOR_VALUE_WITHOUT_SCHEMA_QUERY,
? String.format(NULL_CURSOR_VALUE_WITHOUT_SCHEMA_QUERY,
tableName, columnName, resultColName)
: String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY,
schema, tableName, columnName, resultColName) ;
: String.format(NULL_CURSOR_VALUE_WITH_SCHEMA_QUERY,
schema, tableName, columnName, resultColName);

LOGGER.debug("null value query: {}", query);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(query),
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
nullValExist = convertToBoolean(jsonNodes.get(0).get(resultColName).toString());
LOGGER.info("null cursor value for MySQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName, columnName);
LOGGER.info("null cursor value for MySQL source : {}, shema {} , tableName {}, columnName {} ", nullValExist, schema, tableName,
columnName);
}
}
}
// return !nullValExist;
// will enable after we have sent comms to users this affects
// return !nullValExist;
// will enable after we have sent comms to users this affects
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Database;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -33,8 +32,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.jooq.DSLContext;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,10 +149,10 @@ private ConfiguredAirbyteStream createTableWithNullValueCursor(final Connection
@Disabled("See https://github.com/airbytehq/airbyte/pull/23908#issuecomment-1463753684, enable once communication is out")
public void viewWithNullValueCursorShouldThrowException() throws SQLException {
try (final MySQLContainer<?> db = new MySQLContainer<>("mysql:8.0")
.withUsername(TEST_USER)
.withPassword(TEST_PASSWORD)
.withEnv("MYSQL_ROOT_HOST", "%")
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD)) {
.withUsername(TEST_USER)
.withPassword(TEST_PASSWORD)
.withEnv("MYSQL_ROOT_HOST", "%")
.withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD)) {
db.start();
final JsonNode config = getConfig(db, "test", "");
try (Connection connection = DriverManager.getConnection(db.getJdbcUrl(), "root", config.get(JdbcUtils.PASSWORD_KEY).asText())) {
Expand All @@ -164,8 +161,8 @@ public void viewWithNullValueCursorShouldThrowException() throws SQLException {

final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new MySqlSource().read(config, catalog, null)));
assertThat(throwable).isInstanceOf(ConfigErrorException.class)
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering with no null values as a cursor. {tableName='test.test_view_null_cursor', cursorColumnName='id', cursorSqlType=INT, cause=Cursor column contains NULL value}");
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering with no null values as a cursor. {tableName='test.test_view_null_cursor', cursorColumnName='id', cursorSqlType=INT, cause=Cursor column contains NULL value}");

} finally {
db.stop();
Expand All @@ -178,22 +175,22 @@ private ConfiguredAirbyteStream createViewWithNullValueCursor(final Connection c
connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n");
connection.createStatement().execute("CREATE TABLE IF NOT EXISTS test.test_table_null_cursor(id INTEGER NULL)");
connection.createStatement().execute("""
CREATE VIEW test_view_null_cursor(id) as
SELECT test_table_null_cursor.id
FROM test_table_null_cursor
""");
CREATE VIEW test_view_null_cursor(id) as
SELECT test_table_null_cursor.id
FROM test_table_null_cursor
""");
connection.createStatement().execute("INSERT INTO test.test_table_null_cursor(id) VALUES (1), (2), (NULL)");

return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_view_null_cursor",
"test",
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_view_null_cursor",
"test",
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.debezium.engine.ChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresCdcStateHandler implements CdcStateHandler {

Expand Down Expand Up @@ -64,7 +63,7 @@ public AirbyteMessage saveStateAfterCompletionOfSnapshotOfNewStreams() {
}

@Override
public boolean isSnapshotEvent(final ChangeEvent<String, String> event){
public boolean isSnapshotEvent(final ChangeEvent<String, String> event) {
JsonNode isSnapshotEvent = Jsons.deserialize(event.value()).get("source").get("snapshot");
return isSnapshotEvent != null && isSnapshotEvent.asBoolean();
}
Expand All @@ -77,30 +76,27 @@ public boolean isRecordBehindOffset(final Map<String, String> offset, final Chan

final JsonNode offsetJson = Jsons.deserialize((String) offset.values().toArray()[0]);

final String offset_lsn = offsetJson.get("lsn_commit") != null ?
String.valueOf(offsetJson.get("lsn_commit")) :
String.valueOf(offsetJson.get("lsn"));
final String offset_lsn =
offsetJson.get("lsn_commit") != null ? String.valueOf(offsetJson.get("lsn_commit")) : String.valueOf(offsetJson.get("lsn"));
final String event_lsn = String.valueOf(Jsons.deserialize(event.value()).get("source").get("lsn"));
return Integer.parseInt(event_lsn) > Integer.parseInt(offset_lsn);
}

@Override
public boolean isSameOffset(final Map<String, String> offsetA, final Map<String, String> offsetB) {
if (offsetA == null || offsetA.size() != 1){
if (offsetA == null || offsetA.size() != 1) {
return false;
}
if (offsetB == null || offsetB.size() != 1){
if (offsetB == null || offsetB.size() != 1) {
return false;
}
final JsonNode offsetJsonA = Jsons.deserialize((String) offsetA.values().toArray()[0]);
final JsonNode offsetJsonB = Jsons.deserialize((String) offsetB.values().toArray()[0]);

final String lsnA = offsetJsonA.get("lsn_commit") != null ?
String.valueOf(offsetJsonA.get("lsn_commit")) :
String.valueOf(offsetJsonA.get("lsn"));
final String lsnB = offsetJsonB.get("lsn_commit") != null ?
String.valueOf(offsetJsonB.get("lsn_commit")) :
String.valueOf(offsetJsonB.get("lsn"));
final String lsnA =
offsetJsonA.get("lsn_commit") != null ? String.valueOf(offsetJsonA.get("lsn_commit")) : String.valueOf(offsetJsonA.get("lsn"));
final String lsnB =
offsetJsonB.get("lsn_commit") != null ? String.valueOf(offsetJsonB.get("lsn_commit")) : String.valueOf(offsetJsonB.get("lsn"));

return Integer.parseInt(lsnA) == Integer.parseInt(lsnB);
}
Expand Down
Loading

0 comments on commit 8c5f44d

Please sign in to comment.