Skip to content

Commit

Permalink
Postgres Source : Support JSONB datatype (airbytehq#21695)
Browse files Browse the repository at this point in the history
* Postgres Source Jsonb updated schema with oneOf definition

* updated json schema definition

* updated json schema definition

* updated tests

* refactoring

* fixed checkstyle

* fixed checkstyle

* updated values mapping

* updated test cases and refactoring

* updated test cases

* refactoring

* added jsonb[] support

* refactoring

* updated json schema

* reverted to schema with oneOf

* updated airbyte-protocol version

* deleted protocol files

* bump version

* auto-bump connector version

* manual bump of postgres-source version

* Automated Change

---------

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: VitaliiMaltsev <VitaliiMaltsev@users.noreply.github.com>
  • Loading branch information
3 people committed Feb 27, 2023
1 parent b7808a3 commit 90884d0
Show file tree
Hide file tree
Showing 17 changed files with 149 additions and 19 deletions.
Expand Up @@ -18,7 +18,7 @@ class DataTypeEnumTest {
@Test
void testConversionFromJsonSchemaPrimitiveToDataType() {
assertEquals(5, DataType.class.getEnumConstants().length);
assertEquals(16, JsonSchemaPrimitive.class.getEnumConstants().length);
assertEquals(17, JsonSchemaPrimitive.class.getEnumConstants().length);

assertEquals(DataType.STRING, DataType.fromValue(JsonSchemaPrimitive.STRING.toString().toLowerCase()));
assertEquals(DataType.NUMBER, DataType.fromValue(JsonSchemaPrimitive.NUMBER.toString().toLowerCase()));
Expand Down
Expand Up @@ -45,7 +45,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.48
dockerImageTag: 1.0.49
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down Expand Up @@ -1428,7 +1428,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.49
dockerImageTag: 1.0.50
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Expand Up @@ -370,7 +370,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.48"
- dockerImage: "airbyte/source-alloydb:1.0.49"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -11623,7 +11623,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.49"
- dockerImage: "airbyte/source-postgres:1.0.50"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Expand Up @@ -13,6 +13,8 @@
import static org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA;
import static org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.db.jdbc.DateTimeConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
Expand Down Expand Up @@ -53,6 +55,8 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
private final String[] ARRAY_TYPES = {"_NAME", "_NUMERIC", "_BYTEA", "_MONEY", "_BIT", "_DATE", "_TIME", "_TIMETZ", "_TIMESTAMP", "_TIMESTAMPTZ"};
private final String BYTEA_TYPE = "BYTEA";
private final String JSONB_TYPE = "JSONB";
private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public void configure(final Properties props) {}
Expand All @@ -69,13 +73,28 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
registerMoney(field, registration);
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
registerBytea(field, registration);
} else if (JSONB_TYPE.equalsIgnoreCase(field.typeName())) {
registerJsonb(field, registration);
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerNumber(field, registration);
} else if (Arrays.stream(ARRAY_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
registerArray(field, registration);
}
}

private void registerJsonb(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
}
try {
return objectMapper.readTree(x.toString()).toString();
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
}
});
}

private void registerArray(RelationalColumn field, ConverterRegistration<SchemaBuilder> registration) {
final String fieldType = field.typeName().toUpperCase();
final SchemaBuilder arraySchema = switch (fieldType) {
Expand Down
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.48
LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.48
LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.name=airbyte/source-alloydb
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.version=1.0.50
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.49
LABEL io.airbyte.version=1.0.50
LABEL io.airbyte.name=airbyte/source-postgres
Expand Up @@ -173,6 +173,7 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
case "jsonb" -> putJsonb(json, columnName, resultSet, colIndex);
case "_varchar", "_char", "_bpchar", "_text", "_name" -> putArray(json, columnName, resultSet, colIndex);
case "_int2", "_int4", "_int8", "_oid" -> putLongArray(json, columnName, resultSet, colIndex);
case "_numeric", "_decimal" -> putBigDecimalArray(json, columnName, resultSet, colIndex);
Expand All @@ -186,6 +187,7 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
case "_timestamp" -> putTimestampArray(json, columnName, resultSet, colIndex);
case "_timetz" -> putTimeTzArray(json, columnName, resultSet, colIndex);
case "_time" -> putTimeArray(json, columnName, resultSet, colIndex);
case "_jsonb" -> putJsonbArray(json, columnName, resultSet, colIndex);
default -> {
switch (columnInfo.columnType) {
case BOOLEAN -> json.put(columnName, value.equalsIgnoreCase("t"));
Expand All @@ -209,6 +211,33 @@ public void copyToJsonField(final ResultSet resultSet, final int colIndex, final
}
}

private void putJsonbArray(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();

while (arrayResultSet.next()) {
final PGobject object = getObject(arrayResultSet, colIndex, PGobject.class);
final JsonNode value;
try {
value = new ObjectMapper().readTree(object.getValue());
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
}
arrayNode.add(value);
}
node.set(columnName, arrayNode);
}

private void putJsonb(ObjectNode node, String columnName, ResultSet resultSet, int colIndex) throws SQLException {
final PGobject object = getObject(resultSet, colIndex, PGobject.class);

try {
node.put(columnName, new ObjectMapper().readTree(object.getValue()));
} catch (JsonProcessingException e) {
throw new RuntimeException("Could not parse 'jsonb' value:" + e);
}
}

private void putTimeArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int colIndex) throws SQLException {
final ArrayNode arrayNode = Jsons.arrayNode();
final ResultSet arrayResultSet = resultSet.getArray(colIndex).getResultSet();
Expand Down Expand Up @@ -393,11 +422,13 @@ public PostgresType getDatabaseFieldType(final JsonNode field) {
case "_time" -> PostgresType.TIME_ARRAY;
case "_date" -> PostgresType.DATE_ARRAY;
case "_bytea" -> PostgresType.BYTEA_ARRAY;
case "_jsonb" -> PostgresType.JSONB_ARRAY;
case "bool", "boolean" -> PostgresType.BOOLEAN;
// BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
// https://www.postgresql.org/docs/14/datatype-binary.html
case "bytea" -> PostgresType.VARCHAR;
case "jsonb" -> PostgresType.JSONB;
case TIMESTAMPTZ -> PostgresType.TIMESTAMP_WITH_TIMEZONE;
case TIMETZ -> PostgresType.TIME_WITH_TIMEZONE;
default -> PostgresType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt(), POSTGRES_TYPE_DICT);
Expand Down Expand Up @@ -496,7 +527,10 @@ public JsonSchemaType getAirbyteType(final PostgresType jdbcType) {
case DATE_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.STRING_DATE)
.build();

case JSONB_ARRAY -> JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build();
case JSONB -> JsonSchemaType.JSONB;
case DATE -> JsonSchemaType.STRING_DATE;
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE;
Expand Down Expand Up @@ -591,6 +625,7 @@ private ColumnInfo getColumnInfo(final int colIndex, final PgResultSetMetaData m
}

private static class ColumnInfo {

public String columnTypeName;
public PostgresType columnType;

Expand Down
Expand Up @@ -70,7 +70,9 @@ public enum PostgresType implements SQLType {
OID_ARRAY(Types.ARRAY),
FLOAT4_ARRAY(Types.ARRAY),
FLOAT8_ARRAY(Types.ARRAY),
BYTEA_ARRAY(Types.ARRAY);
BYTEA_ARRAY(Types.ARRAY),
JSONB_ARRAY(Types.ARRAY),
JSONB(Types.JAVA_OBJECT);

/**
* The Integer value for the JDBCType. It maps to a value in {@code Types.java}
Expand Down Expand Up @@ -122,15 +124,15 @@ public Integer getVendorTypeNumber() {
* {@code Types} value
* @see Types
*/
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
public static PostgresType valueOf(final int type, final Map<Integer, PostgresType> postgresTypeMap) {
if (postgresTypeMap.containsKey(type)) {
return postgresTypeMap.get(type);
}
throw new IllegalArgumentException("Type:" + type + " is not a valid "
+ "Types.java value.");
}

public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
public static PostgresType safeGetJdbcType(final int columnTypeInt, final Map<Integer, PostgresType> postgresTypeMap) {
try {
return PostgresType.valueOf(columnTypeInt, postgresTypeMap);
} catch (final Exception e) {
Expand Down
Expand Up @@ -46,6 +46,18 @@ public boolean testCatalog() {
return true;
}

protected String getValueFromJsonNode(final JsonNode jsonNode) {
if (jsonNode != null) {
if (jsonNode.isArray() || jsonNode.isObject()) {
return jsonNode.toString();
}

String value = jsonNode.asText();
return (value != null && value.equals("null") ? null : value);
}
return null;
}

// Test cases are sorted alphabetically based on the source type
// See https://www.postgresql.org/docs/14/datatype.html
@Override
Expand Down Expand Up @@ -253,9 +265,12 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'[1, 2, 3]'::jsonb")
.addExpectedValues(null, "[1, 2, 3]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.JSONB)
.withLegacyAirbyteTypeProperty("json")
.build())
.addInsertValues("null", "'10000'::jsonb", "'true'::jsonb", "'[1,2,3]'::jsonb",
"'{\"Janet\": 1, \"Melissa\": {\"loves\": \"trees\", \"married\": true}}'::jsonb")
.addExpectedValues(null, "10000", "true", "[1,2,3]", "{\"Janet\":1,\"Melissa\":{\"loves\":\"trees\",\"married\":true}}")
.build());

addDataTypeTestData(
Expand Down Expand Up @@ -578,6 +593,23 @@ protected void initTests() {

addTimeWithTimeZoneTest();
addArraysTestData();
addJsonbArrayTest();
}

protected void addJsonbArrayTest() {

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb_array")
.fullSourceDataType("JSONB[]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build())
.addInsertValues(
"ARRAY['[1,2,1]', 'false']::jsonb[]",
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
.addExpectedValues("[[1,2,1],false]", "[{\"digit\":30,\"letter\":\"A\"},{\"digit\":31,\"letter\":\"B\"}]")
.build());
}

protected void addTimeWithTimeZoneTest() {
Expand Down
Expand Up @@ -11,8 +11,11 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.List;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;
Expand Down Expand Up @@ -95,6 +98,24 @@ protected void tearDown(final TestDestinationEnv testEnv) {
container.close();
}

@Override
protected void addJsonbArrayTest() {

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb_array")
.fullSourceDataType("JSONB[]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build())
.addInsertValues(
"ARRAY['[1,2,1]', 'false']::jsonb[]",
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
.addExpectedValues("[\"[1, 2, 1]\",\"false\"]",
"[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]")
.build());
}

public boolean testCatalog() {
return true;
}
Expand Down
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.integrations.standardtest.source.TestDataHolder;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.integrations.util.HostPortResolver;
import io.airbyte.protocol.models.JsonSchemaPrimitiveUtil;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -186,4 +187,22 @@ protected void addTimestampWithInfinityValuesTest() {
}
}

@Override
protected void addJsonbArrayTest() {

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("jsonb_array")
.fullSourceDataType("JSONB[]")
.airbyteType(JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
.withItems(JsonSchemaType.JSONB)
.build())
.addInsertValues(
"ARRAY['[1,2,1]', 'false']::jsonb[]",
"ARRAY['{\"letter\":\"A\", \"digit\":30}', '{\"letter\":\"B\", \"digit\":31}']::jsonb[]")
.addExpectedValues("[\"[1, 2, 1]\",\"false\"]",
"[\"{\\\"digit\\\": 30, \\\"letter\\\": \\\"A\\\"}\",\"{\\\"digit\\\": 31, \\\"letter\\\": \\\"B\\\"}\"]")
.build());
}

}

0 comments on commit 90884d0

Please sign in to comment.