Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa committed Feb 21, 2024
1 parent 46818a2 commit 84adc86
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl;
import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
Expand All @@ -32,7 +30,6 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -62,20 +59,21 @@ public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase

public static LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> findExistingTables(final JdbcDatabase database,
final String databaseName,
final List<StreamId> streamIds) throws SQLException {
final List<StreamId> streamIds)
throws SQLException {
final LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> existingTables = new LinkedHashMap<>();
final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?"));
// convert list stream to array
final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new);
final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new);
final String query = """
SELECT table_schema, table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema IN (%s)
AND table_name IN (%s)
ORDER BY table_schema, table_name, ordinal_position;
""".formatted(paramHolder, paramHolder);
SELECT table_schema, table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema IN (%s)
AND table_name IN (%s)
ORDER BY table_schema, table_name, ordinal_position;
""".formatted(paramHolder, paramHolder);
final String[] bindValues = new String[streamIds.size() * 2 + 1];
bindValues[0] = databaseName.toUpperCase();
System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
Expand All @@ -102,12 +100,12 @@ private LinkedHashMap<String, LinkedHashMap<String, Integer>> getFinalTableRowCo
final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new);
final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new);
final String query = """
SELECT table_schema, table_name, row_count
FROM information_schema.tables
WHERE table_catalog = ?
AND table_schema IN (%s)
AND table_name IN (%s)
""".formatted(paramHolder, paramHolder);
SELECT table_schema, table_name, row_count
FROM information_schema.tables
WHERE table_catalog = ?
AND table_schema IN (%s)
AND table_name IN (%s)
""".formatted(paramHolder, paramHolder);
final String[] bindValues = new String[streamIds.size() * 2 + 1];
bindValues[0] = databaseName.toUpperCase();
System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
Expand All @@ -126,48 +124,32 @@ public Optional<TableDefinition> findExistingTable(final StreamId id) throws SQL
// The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates
// VARIANT as VARCHAR
final LinkedHashMap<String, ColumnDefinition> columns = database.queryJsons(
"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema = ?
AND table_name = ?
ORDER BY ordinal_position;
""",
databaseName.toUpperCase(),
id.finalNamespace().toUpperCase(),
id.finalName().toUpperCase()).stream().collect(LinkedHashMap::new,
(map, row) -> map.put(
row.get("COLUMN_NAME").asText(),
new ColumnDefinition(
row.get("COLUMN_NAME").asText(),
row.get("DATA_TYPE").asText(),
0, //unused
fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))),
LinkedHashMap::putAll);
"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_catalog = ?
AND table_schema = ?
AND table_name = ?
ORDER BY ordinal_position;
""",
databaseName.toUpperCase(),
id.finalNamespace().toUpperCase(),
id.finalName().toUpperCase()).stream().collect(LinkedHashMap::new,
(map, row) -> map.put(
row.get("COLUMN_NAME").asText(),
new ColumnDefinition(
row.get("COLUMN_NAME").asText(),
row.get("DATA_TYPE").asText(),
0, // unused
fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))),
LinkedHashMap::putAll);
if (columns.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(new TableDefinition(columns));
}
}

// private boolean isFinalTableEmpty(final StreamId id) throws SQLException {
// final int rowCount = database.queryInt(
// """
// SELECT row_count
// FROM information_schema.tables
// WHERE table_catalog = ?
// AND table_schema = ?
// AND table_name = ?
// """,
// databaseName.toUpperCase(),
// id.finalNamespace().toUpperCase(),
// id.finalName().toUpperCase());
// return rowCount == 0;
// }


public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
final ResultSet tables = database.getMetaData().getTables(
databaseName,
Expand All @@ -183,7 +165,7 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex
final Optional<String> minUnloadedTimestamp = Optional.ofNullable(database.queryStrings(
conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
"""
SELECT to_varchar(
TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")),
'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
Expand All @@ -202,7 +184,7 @@ record -> record.getString("MIN_TIMESTAMP")).get(0));
final Optional<String> maxTimestamp = Optional.ofNullable(database.queryStrings(
conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
"raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
"""
"""
SELECT to_varchar(
MAX("_airbyte_extracted_at"),
'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
Expand Down Expand Up @@ -242,7 +224,6 @@ public void execute(final Sql sql) throws Exception {
}
}


private Set<String> getPks(final StreamConfig stream) {
return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet();
}
Expand All @@ -267,7 +248,8 @@ private boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) {

protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
final Set<String> pks = getPks(stream);
// This is same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig with upper case conversion.
// This is same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig with upper case
// conversion.
// TODO: Unify this using name transformer or something.
if (!isAirbyteRawIdColumnMatch(existingTable) ||
!isAirbyteExtractedAtColumnMatch(existingTable) ||
Expand All @@ -277,18 +259,19 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
}
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())),
LinkedHashMap::putAll);
(map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())),
LinkedHashMap::putAll);

// Filter out Meta columns since they don't exist in stream config.
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
.filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase)
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey(), column.getValue().type()),
LinkedHashMap::putAll);
(map, column) -> map.put(column.getKey(), column.getValue().type()),
LinkedHashMap::putAll);
// soft-resetting https://github.com/airbytehq/airbyte/pull/31082
@SuppressWarnings("deprecation") final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream()
@SuppressWarnings("deprecation")
final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream()
.anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable());

return !hasPksWithNonNullConstraint
Expand Down Expand Up @@ -352,4 +335,5 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
case UNKNOWN -> "VARIANT";
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,14 @@
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;

Expand Down

0 comments on commit 84adc86

Please sign in to comment.