From 32dd178e8f44555bbdb20a3d1e7e085e704996b0 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Tue, 28 Nov 2023 11:35:42 +0100 Subject: [PATCH] DBZ-7191 Null value will be used instead of default for optional field --- .../debezium/connector/jdbc/RecordWriter.java | 10 +++++- .../AbstractJdbcSinkInsertModeTest.java | 33 +++++++++++++++++++ .../AbstractJdbcSinkSchemaEvolutionTest.java | 20 +++++------ 3 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/RecordWriter.java b/src/main/java/io/debezium/connector/jdbc/RecordWriter.java index 2f7b6372..039bc917 100644 --- a/src/main/java/io/debezium/connector/jdbc/RecordWriter.java +++ b/src/main/java/io/debezium/connector/jdbc/RecordWriter.java @@ -117,9 +117,17 @@ private int bindNonKeyValuesToQuery(SinkRecordDescriptor record, QueryBinder que } private int bindFieldValuesToQuery(SinkRecordDescriptor record, QueryBinder query, int index, Struct source, List fields) { + for (String fieldName : fields) { final SinkRecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName); - List boundValues = dialect.bindValue(field, index, source.get(fieldName)); + + Object value; + if (field.getSchema().isOptional()) { + value = source.getWithoutDefault(fieldName); + } else { + value = source.get(fieldName); + } + List boundValues = dialect.bindValue(field, index, value); boundValues.forEach(query::bind); index += boundValues.size(); diff --git a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkInsertModeTest.java b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkInsertModeTest.java index 5279c8c3..fb4a6be5 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkInsertModeTest.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkInsertModeTest.java @@ -7,8 +7,12 @@ import static org.fest.assertions.Assertions.assertThat; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import io.debezium.doc.FixFor; +import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.sink.SinkRecord; import org.assertj.db.api.TableAssert; import org.assertj.db.type.ValueType; @@ -348,4 +352,33 @@ public void testInsertModeUpdateWithPrimaryKeyModeRecordValue(SinkRecordFactory getSink().assertColumnType(tableAssert, "name", ValueType.TEXT); getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT); } + + @FixFor("DBZ-7191") + @ParameterizedTest + @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) + public void testRecordDefaultValueUsedOnlyWithRequiredFieldWithNullValue(SinkRecordFactory factory) { + final Map properties = getDefaultSinkConfig(); + properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_KEY.getValue()); + properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue()); + startSinkConnector(properties); + assertSinkConnectorIsRunning(); + + final String tableName = randomTableName(); + final String topicName = topicName("server1", "schema", tableName); + + final SinkRecord createRecord = factory.createRecordWithSchemaValue(topicName, + (byte) 1, + List.of( "optional_with_default_null_value"), + List.of(SchemaBuilder.string().defaultValue("default").optional().build()), + Arrays.asList(new Object[]{null})); + + consume(createRecord); + + final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createRecord)); + tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(2); + + getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1); + getSink().assertColumnHasNullValue(tableAssert, "optional_with_default_null_value"); + } } diff --git a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java index 20fda4a9..cb38f1a9 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java @@ -363,28 +363,28 @@ public void testNonKeyColumnTypeResolutionFromKafkaSchemaTypeWithOptionalsWithDe getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1); getSink().assertColumnType(tableAssert, "col_int8", ValueType.NUMBER, (byte) 10); - getSink().assertColumnType(tableAssert, "col_int8_optional", ValueType.NUMBER, (byte) 10); + getSink().assertColumnHasNullValue(tableAssert, "col_int8_optional"); getSink().assertColumnType(tableAssert, "col_int16", ValueType.NUMBER, (short) 15); - getSink().assertColumnType(tableAssert, "col_int16_optional", ValueType.NUMBER, (short) 15); + getSink().assertColumnHasNullValue(tableAssert, "col_int16_optional"); getSink().assertColumnType(tableAssert, "col_int32", ValueType.NUMBER, 1024); - getSink().assertColumnType(tableAssert, "col_int32_optional", ValueType.NUMBER, 1024); + getSink().assertColumnHasNullValue(tableAssert, "col_int32_optional"); getSink().assertColumnType(tableAssert, "col_int64", ValueType.NUMBER, 1024L); - getSink().assertColumnType(tableAssert, "col_int64_optional", ValueType.NUMBER, 1024L); + getSink().assertColumnHasNullValue(tableAssert, "col_int64_optional"); getSink().assertColumnType(tableAssert, "col_float32", ValueType.NUMBER, 3.14f); - getSink().assertColumnType(tableAssert, "col_float32_optional", ValueType.NUMBER, 3.14f); + getSink().assertColumnHasNullValue(tableAssert, "col_float32_optional"); getSink().assertColumnType(tableAssert, "col_float64", ValueType.NUMBER, 3.14d); - getSink().assertColumnType(tableAssert, "col_float64_optional", ValueType.NUMBER, 3.14d); + getSink().assertColumnHasNullValue(tableAssert, "col_float64_optional"); getSink().assertColumnType(tableAssert, "col_string", ValueType.TEXT, text); - getSink().assertColumnType(tableAssert, "col_string_optional", ValueType.TEXT, text); + getSink().assertColumnHasNullValue(tableAssert, "col_string_optional"); getSink().assertColumnType(tableAssert, "col_bytes", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8)); - getSink().assertColumnType(tableAssert, "col_bytes_optional", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8)); + getSink().assertColumnHasNullValue(tableAssert, "col_bytes_optional"); if (getSink().getType().is(SinkType.ORACLE)) { getSink().assertColumnType(tableAssert, "col_bool", ValueType.NUMBER, 1); - getSink().assertColumnType(tableAssert, "col_bool_optional", ValueType.NUMBER, true); + getSink().assertColumnHasNullValue(tableAssert, "col_bool_optional"); } else { getSink().assertColumnType(tableAssert, "col_bool", ValueType.BOOLEAN, true); - getSink().assertColumnType(tableAssert, "col_bool_optional", ValueType.BOOLEAN, true); + getSink().assertColumnHasNullValue(tableAssert, "col_bool_optional"); } }