Skip to content

Commit

Permalink
Merge pull request #41 from rhauch/dbz-50
Browse files Browse the repository at this point in the history
DBZ-50 Added parameters for length, maxLength and whether the field is masked
  • Loading branch information
rhauch committed May 12, 2016
2 parents 7ce096a + 6d56a8f commit 83967e0
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,40 +301,16 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId,
* @param mapper the mapping function for the column; may be null if the column is not to be mapped to different values
*/
protected void addField(SchemaBuilder builder, Column column, ColumnMapper mapper) {
addField(builder, column.name(), column.jdbcType(), column.typeName(), column.length(),
column.scale(), column.isOptional(), mapper);
}

/**
* Add to the supplied {@link SchemaBuilder} a field for the column with the given information.
* <p>
* Subclasses that wish to override or extend the mappings of JDBC/DBMS types to Kafka Connect value types can override
* this method and delegate to this method before and/or after the custom logic. Similar behavior should be addressed
* in a specialized {@link #createValueConverterFor(Column, Field)} as well.
*
* @param parentBuilder the builder for the schema used to {@link SchemaBuilder#field(String, Schema) define} the new field;
* never null
* @param columnName the name of the column
* @param jdbcType the column's {@link Types JDBC type}
* @param typeName the column's DBMS-specific type name
* @param columnLength the length of the column
* @param columnScale the scale of the column values, or 0 if not a decimal value
* @param optional {@code true} if the column is optional, or {@code false} if the column is known to always have a value
* @param mapper the mapping function for the column; may be null if the column is not to be mapped to different values
*/
protected void addField(SchemaBuilder parentBuilder, String columnName, int jdbcType, String typeName, int columnLength,
int columnScale, boolean optional, ColumnMapper mapper) {
SchemaBuilder fieldBuilder = null;
switch (jdbcType) {
switch (column.jdbcType()) {
case Types.NULL:
LOGGER.warn("Unexpected JDBC type: NULL");
break;

// Single- and multi-bit values ...
case Types.BIT:
if (columnLength > 1) {
if (column.length() > 1) {
fieldBuilder = Bits.builder();
fieldBuilder.parameter("length", Integer.toString(columnLength));
break;
}
// otherwise, it is just one bit so use a boolean ...
Expand Down Expand Up @@ -386,7 +362,7 @@ protected void addField(SchemaBuilder parentBuilder, String columnName, int jdbc
case Types.DECIMAL:
// values are fixed-precision decimal values with exact precision.
// Use Kafka Connect's arbitrary precision decimal type and use the column's specified scale ...
fieldBuilder = Decimal.builder(columnScale);
fieldBuilder = Decimal.builder(column.scale());
break;

// Fixed-length string values
Expand Down Expand Up @@ -439,16 +415,16 @@ protected void addField(SchemaBuilder parentBuilder, String columnName, int jdbc
case Types.REF_CURSOR:
case Types.STRUCT:
default:
fieldBuilder = addOtherField(columnName, jdbcType, typeName, columnLength, columnScale, optional, mapper);
fieldBuilder = addOtherField(column, mapper);
break;
}
if (fieldBuilder != null) {
if (mapper != null) {
// Let the mapper add properties to the schema ...
mapper.alterFieldSchema(fieldBuilder);
mapper.alterFieldSchema(column,fieldBuilder);
}
if (optional) fieldBuilder.optional();
parentBuilder.field(columnName, fieldBuilder.build());
if (column.isOptional()) fieldBuilder.optional();
builder.field(column.name(), fieldBuilder.build());
}
}

Expand All @@ -457,20 +433,14 @@ protected void addField(SchemaBuilder parentBuilder, String columnName, int jdbc
* <p>
* Subclasses that wish to override or extend the mappings of JDBC/DBMS types to Kafka Connect value types can override
* this method and delegate to this method before and/or after the custom logic. Similar behavior should be addressed
* in a specialized {@link #addField(SchemaBuilder, String, int, String, int, int, boolean, ColumnMapper)} as well.
* in a specialized {@link #addField(SchemaBuilder, Column, ColumnMapper)} as well.
*
* @param columnName the name of the column
* @param jdbcType the column's {@link Types JDBC type}
* @param typeName the column's DBMS-specific type name
* @param columnLength the length of the column
* @param columnScale the scale of the column values, or 0 if not a decimal value
* @param optional {@code true} if the column is optional, or {@code false} if the column is known to always have a value
* @param column the column
* @param mapper the mapping function for the column; may be null if the column is not to be mapped to different values
* @return the {@link SchemaBuilder} for the new field, ready to be {@link SchemaBuilder#build() build}; may be null
*/
protected SchemaBuilder addOtherField(String columnName, int jdbcType, String typeName, int columnLength,
int columnScale, boolean optional, ColumnMapper mapper) {
LOGGER.warn("Unexpected JDBC type: {}", jdbcType);
protected SchemaBuilder addOtherField(Column column, ColumnMapper mapper) {
    // Fallback for JDBC types not handled by an explicit case in addField:
    // log a warning and return null so no field is added to the schema.
    LOGGER.warn("Unexpected JDBC type: {}", column.jdbcType());
    return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ default void initialize( Configuration config ) {

/**
* Optionally annotate the schema with properties to better capture the mapping behavior.
* @param column the column definition; never null
* @param schemaBuilder the builder for the {@link Field}'s schema; never null
*/
default void alterFieldSchema( SchemaBuilder schemaBuilder) {
default void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
    // Default is a no-op: implementations that change values (e.g. masking or
    // truncation) override this to annotate the field's schema with parameters.
    // do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ValueConverter create(Column column) {
}

@Override
public void alterFieldSchema(SchemaBuilder schemaBuilder) {
public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
    // Annotate the schema so consumers can tell the field's values are masked.
    schemaBuilder.parameter("masked", "true");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;

Expand All @@ -34,27 +33,21 @@ public TruncateStrings(int maxLength) {

@Override
public ValueConverter create(Column column) {
return converter;
return isTruncationPossible(column) ? converter : ValueConverter.passthrough();
}

@Override
public void alterFieldSchema(SchemaBuilder schemaBuilder) {
Configuration params = Configuration.from(schemaBuilder.parameters());
Integer length = params.getInteger("length");
if ( length != null && converter.maxLength < length ) {
// Overwrite the parameter ...
schemaBuilder.parameter("length",Integer.toString(converter.maxLength));
}
Integer maxLength = params.getInteger("maxLength");
if ( maxLength != null && converter.maxLength < maxLength ) {
// Overwrite the parameter ...
schemaBuilder.parameter("maxLength",Integer.toString(converter.maxLength));
}
if ( maxLength == null && length == null ) {
schemaBuilder.parameter("maxLength",Integer.toString(converter.maxLength));
public void alterFieldSchema(Column column, SchemaBuilder schemaBuilder) {
    // Only annotate columns whose values may actually be truncated; columns
    // already shorter than the limit are passed through unchanged (see create()).
    if (isTruncationPossible(column)) {
        // Record the maximum length that values are truncated to.
        schemaBuilder.parameter("truncateLength", Integer.toString(converter.maxLength));
    }
}

/**
 * Determine whether values of the given column might be truncated by this mapper.
 * Truncation is possible when the declared column length is unknown (negative)
 * or exceeds the converter's maximum allowed length.
 */
protected boolean isTruncationPossible(Column column) {
    final int declaredLength = column.length();
    if (declaredLength < 0) {
        // Length is unknown, so values could be longer than the limit ...
        return true;
    }
    return declaredLength > converter.maxLength;
}

@Immutable
protected static final class TruncatingValueConverter implements ValueConverter {
protected final int maxLength;
Expand Down

0 comments on commit 83967e0

Please sign in to comment.