Skip to content

Commit

Permalink
DBZ-7191 Null value will be used instead of default for optional field
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and Naros committed Nov 29, 2023
1 parent 2d83aae commit 2b3c3f1
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
10 changes: 9 additions & 1 deletion src/main/java/io/debezium/connector/jdbc/RecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,17 @@ private int bindNonKeyValuesToQuery(SinkRecordDescriptor record, QueryBinder que
}

private int bindFieldValuesToQuery(SinkRecordDescriptor record, QueryBinder query, int index, Struct source, List<String> fields) {

for (String fieldName : fields) {
final SinkRecordDescriptor.FieldDescriptor field = record.getFields().get(fieldName);
List<ValueBindDescriptor> boundValues = dialect.bindValue(field, index, source.get(fieldName));

Object value;
if (field.getSchema().isOptional()) {
value = source.getWithoutDefault(fieldName);
} else {
value = source.get(fieldName);
}
List<ValueBindDescriptor> boundValues = dialect.bindValue(field, index, value);

boundValues.forEach(query::bind);
index += boundValues.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@

import static org.fest.assertions.Assertions.assertThat;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import io.debezium.doc.FixFor;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.sink.SinkRecord;
import org.assertj.db.api.TableAssert;
import org.assertj.db.type.ValueType;
Expand Down Expand Up @@ -348,4 +352,33 @@ public void testInsertModeUpdateWithPrimaryKeyModeRecordValue(SinkRecordFactory
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT);
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT);
}

@FixFor("DBZ-7191")
@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
public void testRecordDefaultValueUsedOnlyWithRequiredFieldWithNullValue(SinkRecordFactory factory) {
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_KEY.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue());
startSinkConnector(properties);
assertSinkConnectorIsRunning();

final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);

final SinkRecord createRecord = factory.createRecordWithSchemaValue(topicName,
(byte) 1,
List.of( "optional_with_default_null_value"),
List.of(SchemaBuilder.string().defaultValue("default").optional().build()),
Arrays.asList(new Object[]{null}));

consume(createRecord);

final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(2);

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnHasNullValue(tableAssert, "optional_with_default_null_value");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,28 +363,28 @@ public void testNonKeyColumnTypeResolutionFromKafkaSchemaTypeWithOptionalsWithDe

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);
getSink().assertColumnType(tableAssert, "col_int8", ValueType.NUMBER, (byte) 10);
getSink().assertColumnType(tableAssert, "col_int8_optional", ValueType.NUMBER, (byte) 10);
getSink().assertColumnHasNullValue(tableAssert, "col_int8_optional");
getSink().assertColumnType(tableAssert, "col_int16", ValueType.NUMBER, (short) 15);
getSink().assertColumnType(tableAssert, "col_int16_optional", ValueType.NUMBER, (short) 15);
getSink().assertColumnHasNullValue(tableAssert, "col_int16_optional");
getSink().assertColumnType(tableAssert, "col_int32", ValueType.NUMBER, 1024);
getSink().assertColumnType(tableAssert, "col_int32_optional", ValueType.NUMBER, 1024);
getSink().assertColumnHasNullValue(tableAssert, "col_int32_optional");
getSink().assertColumnType(tableAssert, "col_int64", ValueType.NUMBER, 1024L);
getSink().assertColumnType(tableAssert, "col_int64_optional", ValueType.NUMBER, 1024L);
getSink().assertColumnHasNullValue(tableAssert, "col_int64_optional");
getSink().assertColumnType(tableAssert, "col_float32", ValueType.NUMBER, 3.14f);
getSink().assertColumnType(tableAssert, "col_float32_optional", ValueType.NUMBER, 3.14f);
getSink().assertColumnHasNullValue(tableAssert, "col_float32_optional");
getSink().assertColumnType(tableAssert, "col_float64", ValueType.NUMBER, 3.14d);
getSink().assertColumnType(tableAssert, "col_float64_optional", ValueType.NUMBER, 3.14d);
getSink().assertColumnHasNullValue(tableAssert, "col_float64_optional");
getSink().assertColumnType(tableAssert, "col_string", ValueType.TEXT, text);
getSink().assertColumnType(tableAssert, "col_string_optional", ValueType.TEXT, text);
getSink().assertColumnHasNullValue(tableAssert, "col_string_optional");
getSink().assertColumnType(tableAssert, "col_bytes", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8));
getSink().assertColumnType(tableAssert, "col_bytes_optional", ValueType.BYTES, text.getBytes(StandardCharsets.UTF_8));
getSink().assertColumnHasNullValue(tableAssert, "col_bytes_optional");
if (getSink().getType().is(SinkType.ORACLE)) {
getSink().assertColumnType(tableAssert, "col_bool", ValueType.NUMBER, 1);
getSink().assertColumnType(tableAssert, "col_bool_optional", ValueType.NUMBER, true);
getSink().assertColumnHasNullValue(tableAssert, "col_bool_optional");
}
else {
getSink().assertColumnType(tableAssert, "col_bool", ValueType.BOOLEAN, true);
getSink().assertColumnType(tableAssert, "col_bool_optional", ValueType.BOOLEAN, true);
getSink().assertColumnHasNullValue(tableAssert, "col_bool_optional");
}
}

Expand Down

0 comments on commit 2b3c3f1

Please sign in to comment.