Source Postgres: Number(38,0) is incorrectly translated to a destination Double instead of BigInt (#25898)

* Translate Numeric with no decimals to BigInt + relevant tests
* Update CDC to handle numeric/decimal arrays correctly
* Revert spacing
* Preserve Decimal scale of numeric columns
* Bumped versions to 2.0.29 + docs
* Bumped versions in metadata.yaml
* Automated Change

---------

Co-authored-by: nguyenaiden <nguyenaiden@users.noreply.github.com>
nguyenaiden committed May 18, 2023
1 parent f3b2037 commit 1a45a66
Showing 13 changed files with 347 additions and 36 deletions.
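The core rule behind the change: a NUMERIC/DECIMAL column declared with an explicit precision and a scale of 0 (for example NUMERIC(38,0), or NUMERIC(38), where the scale defaults to 0) can only hold whole numbers, so the source now surfaces it as an integer instead of a double. The diff below relies on JDBC reporting precision 0 for an unconstrained NUMERIC. A minimal, self-contained sketch of that decision rule; the class and enum names are hypothetical, not the connector's actual API:

// Illustrative sketch only; NumericMappingSketch and AirbyteType are stand-ins,
// not part of the Airbyte source-postgres connector.
public final class NumericMappingSketch {

  enum AirbyteType { INTEGER, NUMBER }

  // An unconstrained NUMERIC is reported with precision 0, in which case values
  // may carry arbitrary decimals and must remain a NUMBER.
  static AirbyteType mapNumeric(final int precision, final int scale) {
    if (precision != 0 && scale == 0) {
      return AirbyteType.INTEGER; // e.g. NUMERIC(38,0) -> BigInt at the destination
    }
    return AirbyteType.NUMBER; // e.g. NUMERIC or NUMERIC(28,2) -> Number/Double
  }

  public static void main(final String[] args) {
    System.out.println(mapNumeric(38, 0)); // INTEGER
    System.out.println(mapNumeric(28, 2)); // NUMBER
    System.out.println(mapNumeric(0, 0));  // NUMBER (unconstrained NUMERIC)
  }
}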


@@ -20,7 +20,12 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
-import java.time.*;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.time.format.DateTimeParseException;
import java.util.Collections;
@@ -10,6 +10,7 @@
import static io.airbyte.db.jdbc.DateTimeConverter.convertToTimestampWithTimezone;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
+import static org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;

import io.airbyte.db.jdbc.DateTimeConverter;
@@ -54,6 +55,11 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
private final String BYTEA_TYPE = "BYTEA";

+// Debezium manually sets the variable scale decimal length (precision) of numeric_array
+// columns to 131089 when it is not specified, e.g. NUMERIC vs NUMERIC(38,0).
+// https://github.com/debezium/debezium/blob/main/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresValueConverter.java#L113
+private final int VARIABLE_SCALE_DECIMAL_LENGTH = 131089;

@Override
public void configure(final Properties props) {}

@@ -76,10 +82,19 @@ public void converterFor(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
}
}

-private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
+private void registerArray(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final String fieldType = field.typeName().toUpperCase();
final SchemaBuilder arraySchema = switch (fieldType) {
case "_NUMERIC", "_MONEY" -> SchemaBuilder.array(OPTIONAL_FLOAT64_SCHEMA);
case "_NUMERIC" -> {
// If a numeric_array column does not have variable precision AND scale is 0
// then we know the precision and scale are purposefully chosen
if (numericArrayColumnPrecisionIsNotVariable(field) && field.scale().orElse(0) == 0) {
yield SchemaBuilder.array(OPTIONAL_INT64_SCHEMA);
} else {
yield SchemaBuilder.array(OPTIONAL_FLOAT64_SCHEMA);
}
}
case "_MONEY" -> SchemaBuilder.array(OPTIONAL_FLOAT64_SCHEMA);
case "_NAME", "_DATE", "_TIME", "_TIMESTAMP", "_TIMESTAMPTZ", "_TIMETZ", "_BYTEA" -> SchemaBuilder.array(OPTIONAL_STRING_SCHEMA);
case "_BIT" -> SchemaBuilder.array(OPTIONAL_BOOLEAN_SCHEMA);
default -> SchemaBuilder.array(OPTIONAL_STRING_SCHEMA);
@@ -164,7 +179,17 @@ private Object convertArray(final Object x, final RelationalColumn field) {
.map(Double::valueOf)
.collect(Collectors.toList());
case "_NUMERIC":
-  return Arrays.stream(getArray(x)).map(value -> value == null ? null : Double.valueOf(value.toString())).collect(Collectors.toList());
+  return Arrays.stream(getArray(x)).map(value -> {
+    if (value == null) {
+      return null;
+    } else {
+      if (numericArrayColumnPrecisionIsNotVariable(field) && field.scale().orElse(0) == 0) {
+        return Long.parseLong(value.toString());
+      } else {
+        return Double.valueOf(value.toString());
+      }
+    }
+  }).collect(Collectors.toList());
case "_TIME":
return Arrays.stream(getArray(x)).map(value -> value == null ? null : convertToTime(value)).collect(Collectors.toList());
case "_DATE":
@@ -330,4 +355,8 @@ private boolean isNegativeTime(final PGInterval pgInterval) {
|| pgInterval.getWholeSeconds() < 0;
}

+private boolean numericArrayColumnPrecisionIsNotVariable(final RelationalColumn column) {
+  return column.length().orElse(VARIABLE_SCALE_DECIMAL_LENGTH) != VARIABLE_SCALE_DECIMAL_LENGTH;
+}

}
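In the CDC path the same rule cannot use precision 0: Debezium substitutes a fixed length of 131089 for an unconstrained NUMERIC (see the PostgresValueConverter link in the comment above), which is why the converter checks for that sentinel. A rough, self-contained sketch of the array-schema decision, with the Debezium and Kafka Connect types replaced by plain stand-ins (OptionalInt instead of RelationalColumn, an enum instead of SchemaBuilder):

import java.util.OptionalInt;

// Illustrative sketch; only the 131089 sentinel and the decision rule come from the
// diff above, the surrounding names are hypothetical stand-ins.
public final class NumericArraySchemaSketch {

  // Length Debezium substitutes when a NUMERIC column has no explicit precision.
  private static final int VARIABLE_SCALE_DECIMAL_LENGTH = 131089;

  enum ElementSchema { INT64, FLOAT64 }

  static ElementSchema elementSchemaFor(final OptionalInt length, final OptionalInt scale) {
    final boolean precisionIsExplicit =
        length.orElse(VARIABLE_SCALE_DECIMAL_LENGTH) != VARIABLE_SCALE_DECIMAL_LENGTH;
    // Only a purposefully chosen precision combined with a scale of 0 guarantees whole numbers.
    return precisionIsExplicit && scale.orElse(0) == 0 ? ElementSchema.INT64 : ElementSchema.FLOAT64;
  }

  public static void main(final String[] args) {
    System.out.println(elementSchemaFor(OptionalInt.of(38), OptionalInt.of(0)));    // INT64   (NUMERIC(38,0)[])
    System.out.println(elementSchemaFor(OptionalInt.empty(), OptionalInt.empty())); // FLOAT64 (NUMERIC[])
  }
}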
@@ -302,5 +302,5 @@ protected void executeQuery(final String query) {
throw new RuntimeException(e);
}
}

}
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=2.0.28
+LABEL io.airbyte.version=2.0.29
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
@@ -12,7 +12,7 @@ data:
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
maxSecondsBetweenMessages: 7200
-dockerImageTag: 2.0.28
+dockerImageTag: 2.0.29
dockerRepository: airbyte/source-postgres-strict-encrypt
githubIssueLabel: source-postgres
icon: postgresql.svg
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

-LABEL io.airbyte.version=2.0.28
+LABEL io.airbyte.version=2.0.29
LABEL io.airbyte.name=airbyte/source-postgres
@@ -27,6 +27,7 @@ dependencies {
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation project(":airbyte-json-validation")
testImplementation project(':airbyte-test-utils')
+testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation libs.connectors.testcontainers.jdbc
testImplementation libs.connectors.testcontainers.postgresql
testImplementation libs.junit.jupiter.system.stubs
@@ -6,7 +6,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
-dockerImageTag: 2.0.28
+dockerImageTag: 2.0.29
maxSecondsBetweenMessages: 7200
dockerRepository: airbyte/source-postgres
githubIssueLabel: source-postgres
@@ -8,6 +8,7 @@
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME;
+import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_DECIMAL_DIGITS;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME;
import static io.airbyte.integrations.source.postgres.PostgresType.safeGetJdbcType;
@@ -175,7 +176,15 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
case "_varchar", "_char", "_bpchar", "_text", "_name" -> putArray(json, columnName, resultSet, colIndex);
case "_int2", "_int4", "_int8", "_oid" -> putLongArray(json, columnName, resultSet, colIndex);
case "_numeric", "_decimal" -> putBigDecimalArray(json, columnName, resultSet, colIndex);
case "_numeric", "_decimal" -> {
// If a numeric_array column precision is not 0 AND scale is 0,
// then we know the precision and scale are purposefully chosen
if (metadata.getPrecision(colIndex) != 0 && metadata.getScale(colIndex) == 0) {
putBigIntArray(json, columnName, resultSet, colIndex);
} else {
putBigDecimalArray(json, columnName, resultSet, colIndex);
}
}
case "_money" -> putMoneyArray(json, columnName, resultSet, colIndex);
case "_float4", "_float8" -> putDoubleArray(json, columnName, resultSet, colIndex);
case "_bool" -> putBooleanArray(json, columnName, resultSet, colIndex);
@@ -194,7 +203,13 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case BIGINT -> putBigInt(json, columnName, resultSet, colIndex);
case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex);
case REAL -> putFloat(json, columnName, resultSet, colIndex);
-case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex);
+case NUMERIC, DECIMAL -> {
+  if (metadata.getPrecision(colIndex) != 0 && metadata.getScale(colIndex) == 0) {
+    putBigInt(json, columnName, resultSet, colIndex);
+  } else {
+    putBigDecimal(json, columnName, resultSet, colIndex);
+  }
+}
// BIT is a bit string in Postgres, e.g. '0100'
case BIT, CHAR, VARCHAR, LONGVARCHAR -> json.put(columnName, value);
case DATE -> putDate(json, columnName, resultSet, colIndex);
@@ -332,6 +347,17 @@ private void putBigDecimalArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
node.set(columnName, arrayNode);
}

+private void putBigIntArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
+  final ArrayNode arrayNode = Jsons.arrayNode();
+  final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
+  while (arrayResultSet.next()) {
+    final long value = DataTypeUtils.returnNullIfInvalid(() -> arrayResultSet.getLong(2));
+    arrayNode.add(value);
+  }
+  node.set(columnName, arrayNode);
+}

private void putDoubleArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
@@ -371,7 +397,6 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase();
// Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN
return switch (typeName) {

case "_bit" -> PostgresType.BIT_ARRAY;
case "_bool" -> PostgresType.BOOL_ARRAY;
case "_name" -> PostgresType.NAME_ARRAY;
@@ -398,6 +423,13 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
// https://www.postgresql.org/docs/14/datatype-binary.html
case "bytea" -> PostgresType.VARCHAR;
case "numeric", "decimal" -> {
if (field.get(INTERNAL_DECIMAL_DIGITS) != null && field.get(INTERNAL_DECIMAL_DIGITS).asInt() == 0) {
yield PostgresType.BIGINT;
} else {
yield PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
}
}
case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
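During schema discovery the column metadata arrives as a JSON node rather than ResultSetMetaData, so the same whole-number rule is keyed off the decimal-digits entry (INTERNAL_DECIMAL_DIGITS) shown in the getDatabaseFieldType hunk above. A short sketch of that lookup using Jackson directly; the "decimal_digits" key and the class name are illustrative assumptions, not the connector's actual constants:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only; "decimal_digits" stands in for whatever key INTERNAL_DECIMAL_DIGITS
// resolves to, and the class is not part of the connector.
public final class DiscoveredNumericTypeSketch {

  static String numericFieldType(final JsonNode field) {
    final JsonNode decimalDigits = field.get("decimal_digits");
    // A numeric/decimal column whose reported scale is exactly 0 only holds whole
    // numbers, so discovery can surface it as BIGINT instead of a generic NUMERIC.
    if (decimalDigits != null && decimalDigits.asInt() == 0) {
      return "BIGINT";
    }
    return "NUMERIC";
  }

  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    System.out.println(numericFieldType(mapper.readTree("{\"decimal_digits\":0}"))); // BIGINT
    System.out.println(numericFieldType(mapper.readTree("{\"decimal_digits\":2}"))); // NUMERIC
    System.out.println(numericFieldType(mapper.readTree("{}")));                     // NUMERIC
  }
}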
@@ -293,11 +293,59 @@ protected void initTests() {
"08:00:2b:01:02:03:04:07")
.build());

+/*
+ * Verify that NUMERIC/DECIMAL data types with
+ * - the default precision of 131089 (see PostgresConverter), and
+ * - an unspecified scale
+ * preserve any decimal value.
+ */
+addDataTypeTestData(
+    TestDataHolder.builder()
+        .sourceType("numeric")
+        .fullSourceDataType("NUMERIC")
+        .airbyteType(JsonSchemaType.NUMBER)
+        .addInsertValues("'33'")
+        .addExpectedValues("33")
+        .build());
+
+addDataTypeTestData(
+    TestDataHolder.builder()
+        .sourceType("numeric")
+        .fullSourceDataType("NUMERIC")
+        .airbyteType(JsonSchemaType.NUMBER)
+        .addInsertValues("'33.345'")
+        .addExpectedValues("33.345")
+        .build());
+
+// Case of a column declared as a NUMERIC data type with a precision but no scale
+addDataTypeTestData(
+    TestDataHolder.builder()
+        .sourceType("numeric")
+        .fullSourceDataType("NUMERIC(38)")
+        .airbyteType(JsonSchemaType.INTEGER)
+        .addInsertValues("'33'")
+        .addExpectedValues("33")
+        .build());
+
+addDataTypeTestData(
+    TestDataHolder.builder()
+        .sourceType("numeric")
+        .fullSourceDataType("NUMERIC(28,2)")
+        .airbyteType(JsonSchemaType.NUMBER)
+        .addInsertValues(
+            "'123'", "null", "'14525.22'")
+        // Postgres source does not support these special values yet
+        // https://github.com/airbytehq/airbyte/issues/8902
+        // "'infinity'", "'-infinity'", "'nan'"
+        .addExpectedValues("123", null, "14525.22")
+        .build());

// Blocked by https://github.com/airbytehq/airbyte/issues/8902
for (final String type : Set.of("numeric", "decimal")) {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType(type)
.fullSourceDataType("NUMERIC(20,7)")
.airbyteType(JsonSchemaType.NUMBER)
.addInsertValues(
"'123'", "null", "'1234567890.1234567'")
@@ -726,29 +774,53 @@ private void addArraysTestData() {
.addExpectedValues("[\"object\",\"integer\"]")
.build());

-addDataTypeTestData(
-    TestDataHolder.builder()
-        .sourceType("numeric_array")
-        .fullSourceDataType("NUMERIC[]")
-        .airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
-            .withItems(JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER)
-                .build())
-            .build())
-        .addInsertValues("'{131070.23,231072.476596593}'")
-        .addExpectedValues("[131070.23,231072.476596593]")
-        .build());
+for (final String type : Set.of("numeric", "decimal")) {
+  /*
+   * Verify that NUMERIC[]/DECIMAL[] data types with
+   * - the default precision of 131089 (see PostgresConverter), and
+   * - an unspecified scale
+   * preserve any decimal value.
+   */
+  addDataTypeTestData(
+      TestDataHolder.builder()
+          .sourceType(String.format("%s_array", type))
+          .fullSourceDataType(String.format("%s[]", type.toUpperCase()))
+          .airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
+              .withItems(JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER)
+                  .build())
+              .build())
+          .addInsertValues("'{131070.23,231072.476596593}'")
+          .addExpectedValues("[131070.23,231072.476596593]")
+          .build());
+  /*
+   * Verify that NUMERIC(`anyNumber`)[]/DECIMAL(`anyNumber`)[] data types default
+   * to a scale of 0 when the precision is set.
+   */
+  addDataTypeTestData(
+      TestDataHolder.builder()
+          .sourceType(String.format("%s_array", type))
+          .fullSourceDataType(String.format("%s(20)[]", type.toUpperCase()))
+          .airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
+              .withItems(JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER)
+                  .build())
+              .build())
+          .addInsertValues("'{131070,231072}'")
+          .addExpectedValues("[131070,231072]")
+          .build());

-addDataTypeTestData(
-    TestDataHolder.builder()
-        .sourceType("decimal_array")
-        .fullSourceDataType("DECIMAL[]")
-        .airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
-            .withItems(JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER)
-                .build())
-            .build())
-        .addInsertValues("'{131070.23,231072.476596593}'")
-        .addExpectedValues("[131070.23,231072.476596593]")
-        .build());
+  addDataTypeTestData(
+      TestDataHolder.builder()
+          .sourceType(String.format("%s_array", type))
+          .fullSourceDataType(String.format("%s(30,2)[]", type.toUpperCase()))
+          .airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
+              .withItems(JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER)
+                  .build())
+              .build())
+          // When a decimal scale is explicitly chosen (2 in this case),
+          // Postgres stores the rounded-off value.
+          .addInsertValues("'{131070.23,231072.476596593}'")
+          .addExpectedValues("[131070.23,231072.48]")
+          .build());
+}

addDataTypeTestData(
TestDataHolder.builder()