Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Postgres Source : Support JSONB datatype" #23642

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,7 +18,7 @@ class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(17, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Expand Up @@ -45,7 +45,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.49
dockerImageTag: 1.0.51
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -1462,7 +1462,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.50
dockerImageTag: 1.0.51
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Expand Up @@ -370,7 +370,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.49"
- dockerImage: "airbyte/source-alloydb:1.0.51"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -11633,7 +11633,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.50"
- dockerImage: "airbyte/source-postgres:1.0.51"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Expand Up @@ -13,8 +13,6 @@
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
Expand Down Expand Up @@ -55,8 +53,6 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
private final String BYTEA_TYPE = "BYTEA";
private final String JSONB_TYPE = "JSONB";
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(final Properties props) {}
Expand All @@ -73,28 +69,13 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
} else if (JSONB_TYPE.equalsIgnoreCase(field.typeName())) {
registerJsonb(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
} else if (Arrays.stream(ARRAY_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerArray(field, registration);
}
}

/**
 * Registers a converter for a {@code jsonb} column that emits the value as JSON text using an
 * optional string schema.
 *
 * <p>
 * A {@code null} input is replaced by {@link DebeziumConverterUtils#convertDefaultValue}. Any other
 * input is parsed with Jackson and re-serialized, so malformed JSON is rejected here.
 *
 * @param field the column the converter is registered for
 * @param registration the Debezium registration callback
 * @throws RuntimeException (from the converter) when the value cannot be parsed as JSON; the
 *         original {@link JsonProcessingException} is attached as the cause
 */
private void registerJsonb(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
  registration.register(SchemaBuilder.string().optional(), x -> {
    if (x == null) {
      return DebeziumConverterUtils.convertDefaultValue(field);
    }
    try {
      return objectMapper.readTree(x.toString()).toString();
    } catch (JsonProcessingException e) {
      // Keep the message text as before, but chain the cause so the parser's stack trace survives.
      throw new RuntimeException("Could not parse 'jsonb' value:" + e, e);
    }
  });
}

private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
final String fieldType = field.typeName().toUpperCase();
final SchemaBuilder arraySchema = switch (fieldType) {
Expand Down
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-alloydb
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.50
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.50
LABEL io.airbyte.version=1.0.51
LABEL io.airbyte.name=airbyte/source-postgres
Expand Up @@ -173,7 +173,6 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
case "jsonb" -> putJsonb(json, columnName, resultSet, colIndex);
case "_varchar", "_char", "_bpchar", "_text", "_name" -> putArray(json, columnName, resultSet, colIndex);
case "_int2", "_int4", "_int8", "_oid" -> putLongArray(json, columnName, resultSet, colIndex);
case "_numeric", "_decimal" -> putBigDecimalArray(json, columnName, resultSet, colIndex);
Expand All @@ -187,7 +186,6 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "_timestamp" -> putTimestampArray(json, columnName, resultSet, colIndex);
case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex);
case "_time" -> putTimeArray(json, columnName, resultSet, colIndex);
case "_jsonb" -> putJsonbArray(json, columnName, resultSet, colIndex);
default -> {
switch (columnInfo.columnType) {
case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t"));
Expand All @@ -211,33 +209,6 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
}
}

/**
 * Reads a {@code jsonb[]} column and stores it under {@code columnName} as a JSON array whose
 * elements are the parsed JSON values.
 *
 * @param node the record being built
 * @param columnName the field name to set on {@code node}
 * @param resultSet the row currently being read
 * @param colIndex the 1-based index of the array column in {@code resultSet}
 * @throws SQLException if the array or one of its elements cannot be read
 * @throws RuntimeException if an element is not parseable JSON; the original
 *         {@link JsonProcessingException} is attached as the cause
 */
private void putJsonbArray(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
  final ArrayNode arrayNode = Jsons.arrayNode();
  final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
  // One mapper for the whole array instead of a new instance per element.
  final ObjectMapper mapper = new ObjectMapper();

  while (arrayResultSet.next()) {
    // java.sql.Array#getResultSet yields rows of (index, value): the element is always in column 2,
    // independent of where the array column sits in the outer result set.
    final PGobject object = getObject(arrayResultSet, 2, PGobject.class);
    final JsonNode value;
    try {
      value = mapper.readTree(object.getValue());
    } catch (JsonProcessingException e) {
      // Same message as before; the cause is chained so the parser's stack trace is not lost.
      throw new RuntimeException("Could not parse 'jsonb' value:" + e, e);
    }
    arrayNode.add(value);
  }
  node.set(columnName, arrayNode);
}

/**
 * Reads a {@code jsonb} column and stores its parsed JSON value under {@code columnName}.
 *
 * @param node the record being built
 * @param columnName the field name to set on {@code node}
 * @param resultSet the row currently being read
 * @param colIndex the 1-based index of the column in {@code resultSet}
 * @throws SQLException if the column cannot be read
 * @throws RuntimeException if the column value is not parseable JSON; the original
 *         {@link JsonProcessingException} is attached as the cause
 */
private void putJsonb(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
  final PGobject object = getObject(resultSet, colIndex, PGobject.class);

  try {
    // set(...) is the non-deprecated way to attach a JsonNode and matches putJsonbArray.
    node.set(columnName, new ObjectMapper().readTree(object.getValue()));
  } catch (JsonProcessingException e) {
    // Same message as before; the cause is chained so the parser's stack trace is not lost.
    throw new RuntimeException("Could not parse 'jsonb' value:" + e, e);
  }
}

private void putTimeArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
Expand Down Expand Up @@ -422,13 +393,11 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
case "_time" -> PostgresType.TIME_ARRAY;
case "_date" -> PostgresType.DATE_ARRAY;
case "_bytea" -> PostgresType.BYTEA_ARRAY;
case "_jsonb" -> PostgresType.JSONB_ARRAY;
case "bool", "boolean" -> PostgresType.BOOLEAN;
// BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
// https://www.postgresql.org/docs/14/datatype-binary.html
case "bytea" -> PostgresType.VARCHAR;
case "jsonb" -> PostgresType.JSONB;
case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
Expand Down Expand Up @@ -527,10 +496,7 @@ public JsonSchemaType getAirbyteType(final PostgresType jdbcType) {
case DATE_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.STRING_DATE)
.build();
case JSONB_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build();
case JSONB -> JsonSchemaType.JSONB;

case DATE -> JsonSchemaType.STRING_DATE;
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE;
Expand Down Expand Up @@ -625,7 +591,6 @@ private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData m
}

private static class ColumnInfo {

public String columnTypeName;
public PostgresType columnType;

Expand Down
Expand Up @@ -70,9 +70,7 @@ public enum PostgresType implements SQLType {
OID_ARRAY(Types.ARRAY),
FLOAT4_ARRAY(Types.ARRAY),
FLOAT8_ARRAY(Types.ARRAY),
BYTEA_ARRAY(Types.ARRAY),
JSONB_ARRAY(Types.ARRAY),
JSONB(Types.JAVA_OBJECT);
BYTEA_ARRAY(Types.ARRAY);

/**
* The Integer value for the JDBCType. It maps to a value in {@code Types.java}
Expand Down Expand Up @@ -124,15 +122,15 @@ public Integer getVendorTypeNumber() {
* {@code Types} value
* @see Types
*/
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
if (postgresTypeMap.containsKey(type)) {
return postgresTypeMap.get(type);
}
throw new IllegalArgumentException("Type:" + type + " is not a valid "
+ "Types.java value.");
}

public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
try {
return PostgresType.valueOf(columnTypeInt, postgresTypeMap);
} catch (final Exception e) {
Expand Down
Expand Up @@ -46,18 +46,6 @@ public boolean testCatalog() {
return true;
}

/**
 * Renders a JSON node as the string used for comparison.
 *
 * <p>
 * Arrays and objects are returned via {@code toString()}; every other node is returned via
 * {@code asText()}, except that the text {@code "null"} is mapped to a Java {@code null}.
 *
 * @param jsonNode the node to render, may be {@code null}
 * @return the rendered value, or {@code null} for a {@code null} node or a node whose text is
 *         {@code "null"}
 */
protected String getValueFromJsonNode(final JsonNode jsonNode) {
  if (jsonNode == null) {
    return null;
  }

  final boolean isContainer = jsonNode.isArray() || jsonNode.isObject();
  if (isContainer) {
    return jsonNode.toString();
  }

  final String text = jsonNode.asText();
  if ("null".equals(text)) {
    return null;
  }
  return text;
}

// Test cases are sorted alphabetically based on the source type
// See https://www.postgresql.org/docs/14/datatype.html
@Override
Expand Down Expand Up @@ -265,12 +253,9 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.JSONB)
.withLegacyAirbyteTypeProperty("json")
.build())
.addInsertValues("null", "'10000'::jsonb", "'true'::jsonb", "'[1,2,3]'::jsonb",
"'{\"Janet\": 1, \"Melissa\": {\"loves\": \"trees\", \"married\": true}}'::jsonb")
.addExpectedValues(null, "10000", "true", "[1,2,3]", "{\"Janet\":1,\"Melissa\":{\"loves\":\"trees\",\"married\":true}}")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'[1, 2, 3]'::jsonb")
.addExpectedValues(null, "[1, 2, 3]")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -593,23 +578,6 @@ protected void initTests() {

addTimeWithTimeZoneTest();
addArraysTestData();
addJsonbArrayTest();
}

/**
 * Adds the {@code JSONB[]} data type test case.
 *
 * <p>
 * The expected values are nested JSON: each array element appears as a JSON value (array, boolean,
 * object) rather than as a string.
 */
protected void addJsonbArrayTest() {
  final JsonSchemaType jsonbArrayType = JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
      .withItems(JsonSchemaType.JSONB)
      .build();

  final String mixedElementsInsert = "ARRAY['[1,2,1]', 'false']::jsonb[]";
  final String objectElementsInsert =
      "ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]";

  final String mixedElementsExpected = "[[1,2,1],false]";
  final String objectElementsExpected = "[{\"digit\":30,\"letter\":\"A\"},{\"digit\":31,\"letter\":\"B\"}]";

  addDataTypeTestData(
      TestDataHolder.builder()
          .sourceType("jsonb_array")
          .fullSourceDataType("JSONB[]")
          .airbyteType(jsonbArrayType)
          .addInsertValues(mixedElementsInsert, objectElementsInsert)
          .addExpectedValues(mixedElementsExpected, objectElementsExpected)
          .build());
}

protected void addTimeWithTimeZoneTest() {
Expand Down
Expand Up @@ -11,11 +11,8 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;
Expand Down Expand Up @@ -98,24 +95,6 @@ protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

/**
 * Adds the {@code JSONB[]} data type test case for this test class.
 *
 * <p>
 * Here every array element is expected as a JSON string holding the element's text (for example
 * {@code "[1, 2, 1]"}), not as nested JSON.
 */
@Override
protected void addJsonbArrayTest() {
  final JsonSchemaType jsonbArrayType = JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
      .withItems(JsonSchemaType.JSONB)
      .build();

  final String mixedElementsInsert = "ARRAY['[1,2,1]', 'false']::jsonb[]";
  final String objectElementsInsert =
      "ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]";

  final String mixedElementsExpected = "[\"[1, 2, 1]\",\"false\"]";
  final String objectElementsExpected =
      "[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]";

  addDataTypeTestData(
      TestDataHolder.builder()
          .sourceType("jsonb_array")
          .fullSourceDataType("JSONB[]")
          .airbyteType(jsonbArrayType)
          .addInsertValues(mixedElementsInsert, objectElementsInsert)
          .addExpectedValues(mixedElementsExpected, objectElementsExpected)
          .build());
}

public boolean testCatalog() {
return true;
}
Expand Down
Expand Up @@ -14,7 +14,6 @@
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -187,22 +186,4 @@ protected void addTimestampWithInfinityValuesTest() {
}
}

/**
 * Adds the {@code JSONB[]} data type test case for this test class.
 *
 * <p>
 * Each array element is expected as a JSON string containing the element's text (for example
 * {@code "false"}), rather than as a nested JSON value.
 */
@Override
protected void addJsonbArrayTest() {
  final JsonSchemaType arrayOfJsonb = JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
      .withItems(JsonSchemaType.JSONB)
      .build();

  final String insertMixed = "ARRAY['[1,2,1]', 'false']::jsonb[]";
  final String insertObjects =
      "ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]";

  final String expectedMixed = "[\"[1, 2, 1]\",\"false\"]";
  final String expectedObjects =
      "[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]";

  addDataTypeTestData(
      TestDataHolder.builder()
          .sourceType("jsonb_array")
          .fullSourceDataType("JSONB[]")
          .airbyteType(arrayOfJsonb)
          .addInsertValues(insertMixed, insertObjects)
          .addExpectedValues(expectedMixed, expectedObjects)
          .build());
}

}