Merge pull request #40 from rhauch/dbz-29b
DBZ-29 Refactored ColumnMappers
rhauch committed May 12, 2016
2 parents 18995ab + b1e6eb1 commit 7ce096a
Showing 13 changed files with 501 additions and 130 deletions.
@@ -41,12 +41,12 @@
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.relational.ColumnId;
import io.debezium.relational.ColumnMappers;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.Metronome;
@@ -33,14 +33,14 @@

import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.ColumnId;
import io.debezium.relational.ColumnMappers;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Tables;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.mapping.ColumnMappers;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;

@@ -153,7 +153,7 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
start(MySqlConnector.class, config);

// Wait for records to become available ...
//Testing.Print.enable();
Testing.Print.enable();
waitForAvailableRecords(15, TimeUnit.SECONDS);

// Now consume the records ...
@@ -166,10 +166,12 @@ public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLExcep
fail("The 'order_number' field was found but should not exist");
} catch ( DataException e ) {
// expected
printJson(record);
}
} else if ( record.topic().endsWith(".customers")) {
Struct value = (Struct) record.value();
assertThat(value.getString("email")).isEqualTo("************");
printJson(record);
}
});
stopConnector();

This file was deleted.

Large diffs are not rendered by default.

@@ -0,0 +1,46 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.config.Configuration;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;

/**
 * A factory for a function used to map values of a column.
 *
 * @author Randall Hauch
 */
@FunctionalInterface
public interface ColumnMapper {

    /**
     * Initialize the ColumnMapper instance based upon the connector's configuration.
     * @param config the connector's configuration
     */
    default void initialize( Configuration config ) {
        // do nothing
    }

    /**
     * Create for the given column a function that maps values.
     *
     * @param column the column description; never null
     * @return the function that converts the value; may be null
     */
    ValueConverter create(Column column);

    /**
     * Optionally annotate the schema with properties to better capture the mapping behavior.
     * @param schemaBuilder the builder for the {@link Field}'s schema; never null
     */
    default void alterFieldSchema( SchemaBuilder schemaBuilder) {
        // do nothing
    }
}
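
The contract above is compact: initialize(Configuration) lets a mapper read connector options, create(Column) returns the per-column ValueConverter, and alterFieldSchema(SchemaBuilder) can annotate the Kafka Connect field schema. As a minimal, hypothetical sketch (not part of this commit; the class name and behavior are invented for illustration), a custom mapper that upper-cases string values could look like this:

package io.debezium.relational.mapping;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.config.Configuration;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;

/**
 * Hypothetical example only (not part of this commit): upper-cases every string value
 * of the columns it is mapped to.
 */
public class UpperCaseStrings implements ColumnMapper {

    @Override
    public void initialize(Configuration config) {
        // this example needs no configuration; a real mapper could read options here
    }

    @Override
    public ValueConverter create(Column column) {
        // only transform actual strings; leave every other value untouched
        return (value) -> value instanceof String ? ((String) value).toUpperCase() : value;
    }

    @Override
    public void alterFieldSchema(SchemaBuilder schemaBuilder) {
        // record the transformation on the field schema, mirroring MaskStrings below
        schemaBuilder.parameter("upperCased", "true");
    }
}

Because ColumnMapper is a @FunctionalInterface whose only abstract method is create(Column), trivial mappers can also be supplied to the ColumnMappers builder as a lambda.
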
@@ -3,9 +3,8 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational;
package io.debezium.relational.mapping;

import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -14,7 +13,14 @@
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.function.Predicates;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnId;
import io.debezium.relational.Selectors;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.ValueConverter;
import io.debezium.util.Strings;

/**
@@ -68,7 +74,21 @@ public Builder map(String fullyQualifiedColumnNames, ColumnMapper mapper) {
* @return this object so that methods can be chained together; never null
*/
public Builder map(String fullyQualifiedColumnNames, Class<ColumnMapper> mapperClass) {
return map(fullyQualifiedColumnNames, instantiateMapper(mapperClass));
return map(fullyQualifiedColumnNames,mapperClass,null);
}

/**
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
* expression patterns.
*
* @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null
* @param mapperClass the Java class that implements {@code BiFunction<Column, Object, Object>} and that
* will be used to map actual values into values used in the output record; may not be null
* @param config the configuration to pass to the {@link ColumnMapper} instance; may be null
* @return this object so that methods can be chained together; never null
*/
public Builder map(String fullyQualifiedColumnNames, Class<ColumnMapper> mapperClass, Configuration config) {
return map(fullyQualifiedColumnNames, instantiateMapper(mapperClass, config));
}

/**
@@ -80,15 +100,7 @@ public Builder map(String fullyQualifiedColumnNames, Class<ColumnMapper> mapperC
* @return this object so that methods can be chained together; never null
*/
public Builder truncateStrings(String fullyQualifiedColumnNames, int maxLength) {
return map(fullyQualifiedColumnNames, (column) -> {
return (value) -> {
if (value instanceof String) {
String str = (String) value;
if (str.length() > maxLength) return str.substring(0, maxLength);
}
return value;
};
});
return map(fullyQualifiedColumnNames, new TruncateStrings(maxLength));
}

/**
@@ -125,22 +137,7 @@ public Builder maskStrings(String fullyQualifiedColumnNames, int numberOfChars,
* @return this object so that methods can be chained together; never null
*/
public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) {
return map(fullyQualifiedColumnNames, (column) -> {
switch (column.jdbcType()) {
case Types.CHAR: // variable-length
case Types.VARCHAR: // variable-length
case Types.LONGVARCHAR: // variable-length
case Types.CLOB: // variable-length
case Types.NCHAR: // fixed-length
case Types.NVARCHAR: // fixed-length
case Types.LONGNVARCHAR: // fixed-length
case Types.NCLOB: // fixed-length
case Types.DATALINK:
return (input) -> maskValue;
default:
return (input) -> input;
}
});
return map(fullyQualifiedColumnNames, new MaskStrings(maskValue));
}

/**
@@ -153,8 +150,23 @@ public Builder maskStrings(String fullyQualifiedColumnNames, String maskValue) {
* an existing mapping function should be removed
* @return this object so that methods can be chained together; never null
*/
@SuppressWarnings("unchecked")
public Builder map(String fullyQualifiedColumnNames, String mapperClassName) {
return map(fullyQualifiedColumnNames,mapperClassName,null);
}

/**
* Set a mapping function for the columns with fully-qualified names that match the given comma-separated list of regular
* expression patterns.
*
* @param fullyQualifiedColumnNames the comma-separated list of fully-qualified column names; may not be null
* @param mapperClassName the name of the Java class that implements {@code BiFunction<Column, Object, Object>} and that
* will be used to map actual values into values used in the output record; null if
* an existing mapping function should be removed
* @param config the configuration to pass to the {@link ColumnMapper} instance; may be null
* @return this object so that methods can be chained together; never null
*/
@SuppressWarnings("unchecked")
public Builder map(String fullyQualifiedColumnNames, String mapperClassName, Configuration config) {
Class<ColumnMapper> mapperClass = null;
if (mapperClassName != null) {
try {
Expand All @@ -167,7 +179,7 @@ public Builder map(String fullyQualifiedColumnNames, String mapperClassName) {
e);
}
}
return map(fullyQualifiedColumnNames, mapperClass);
return map(fullyQualifiedColumnNames, mapperClass, config);
}

/**
@@ -194,8 +206,20 @@ private ColumnMappers(List<MapperRule> rules) {
* @param column the column; may not be null
* @return the mapping function, or null if there is no mapping function
*/
public ValueConverter mapperFor(Table table, Column column) {
return mapperFor(table.id(),column);
public ValueConverter mappingConverterFor(Table table, Column column) {
return mappingConverterFor(table.id(), column);
}

/**
* Get the value mapping function for the given column.
*
* @param tableId the identifier of the table to which the column belongs; may not be null
* @param column the column; may not be null
* @return the mapping function, or null if there is no mapping function
*/
public ValueConverter mappingConverterFor(TableId tableId, Column column) {
ColumnMapper mapper = mapperFor(tableId,column);
return mapper != null ? mapper.create(column) : null;
}

/**
@@ -205,11 +229,11 @@ public ValueConverter mapperFor(Table table, Column column) {
* @param column the column; may not be null
* @return the mapping function, or null if there is no mapping function
*/
public ValueConverter mapperFor(TableId tableId, Column column) {
public ColumnMapper mapperFor(TableId tableId, Column column) {
ColumnId id = new ColumnId(tableId, column.name());
Optional<MapperRule> matchingRule = rules.stream().filter(rule -> rule.matches(id)).findFirst();
if (matchingRule.isPresent()) {
return matchingRule.get().mapper.create(column);
return matchingRule.get().mapper;
}
return null;
}
@@ -229,13 +253,19 @@ protected boolean matches(ColumnId id) {
}
}

protected static <T> T instantiateMapper(Class<T> clazz) {
protected static ColumnMapper instantiateMapper(Class<ColumnMapper> clazz, Configuration config) {
try {
return clazz.newInstance();
ColumnMapper mapper = clazz.newInstance();
if ( config != null ) {
mapper.initialize(config);
}
return mapper;
} catch (InstantiationException e) {
throw new ConnectException("Unable to instantiate column mapper class " + clazz.getName() + ": " + e.getMessage(), e);
} catch (IllegalAccessException e) {
throw new ConnectException("Unable to access column mapper class " + clazz.getName() + ": " + e.getMessage(), e);
} catch (Throwable e) {
throw new ConnectException("Unable to initialize the column mapper class " + clazz.getName() + ": " + e.getMessage(), e);
}
}
}
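
The net effect of the refactoring in this file: mapperFor(TableId, Column) now returns the ColumnMapper itself, the new mappingConverterFor methods return the ValueConverter that the mapper creates for a column, and the map(...) overloads accept an optional Configuration that is passed to ColumnMapper.initialize. A sketch of how a connector might use the reworked API follows; the static ColumnMappers.build() factory and the terminal build() call are assumptions (they do not appear in this excerpt), the customers.email mask matches the integration test earlier in this commit, and the orders column name is purely illustrative:

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.relational.Column;
import io.debezium.relational.TableId;
import io.debezium.relational.ValueConverter;
import io.debezium.relational.mapping.ColumnMapper;
import io.debezium.relational.mapping.ColumnMappers;

public class ColumnMappingSketch {

    // Assumption: ColumnMappers exposes a static build() factory and the Builder a terminal
    // build() method; neither is visible in the diff above. The customers.email mask matches
    // the test in this commit; the orders column is invented for the example.
    private final ColumnMappers mappers = ColumnMappers.build()
            .maskStrings("connector_test.customers.email", "************")
            .truncateStrings("connector_test.orders.comment", 255)
            .build();

    /** Annotate one field's schema and convert one raw value, the way a connector would. */
    public Object map(TableId tableId, Column column, SchemaBuilder fieldSchema, Object rawValue) {
        ColumnMapper mapper = mappers.mapperFor(tableId, column);
        if (mapper != null) {
            mapper.alterFieldSchema(fieldSchema);   // e.g. MaskStrings adds "masked" = "true"
        }
        ValueConverter converter = mappers.mappingConverterFor(tableId, column);
        return converter != null ? converter.convert(rawValue) : rawValue;
    }
}
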
@@ -0,0 +1,73 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.relational.mapping;

import java.sql.Types;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.annotation.Immutable;
import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;

/**
 * A {@link ColumnMapper} implementation that ensures that string values are masked by a predefined value.
 *
 * @author Randall Hauch
 */
public class MaskStrings implements ColumnMapper {

    private final MaskingValueConverter converter;

    /**
     * Create a {@link ColumnMapper} that masks string values with a predefined value.
     *
     * @param maskValue the value that should be used in place of the actual value; may not be null
     * @throws IllegalArgumentException if the {@code maskValue} is null
     */
    public MaskStrings(String maskValue) {
        if (maskValue == null) throw new IllegalArgumentException("Mask value may not be null");
        this.converter = new MaskingValueConverter(maskValue);
    }

    @Override
    public ValueConverter create(Column column) {
        switch (column.jdbcType()) {
            case Types.CHAR: // variable-length
            case Types.VARCHAR: // variable-length
            case Types.LONGVARCHAR: // variable-length
            case Types.CLOB: // variable-length
            case Types.NCHAR: // fixed-length
            case Types.NVARCHAR: // fixed-length
            case Types.LONGNVARCHAR: // fixed-length
            case Types.NCLOB: // fixed-length
            case Types.DATALINK:
                return converter;
            default:
                return ValueConverter.passthrough();
        }
    }

    @Override
    public void alterFieldSchema(SchemaBuilder schemaBuilder) {
        schemaBuilder.parameter("masked", "true");
    }

    @Immutable
    protected static final class MaskingValueConverter implements ValueConverter {
        protected final String maskValue;

        public MaskingValueConverter(String maskValue) {
            this.maskValue = maskValue;
            assert this.maskValue != null;
        }

        @Override
        public Object convert(Object value) {
            return maskValue;
        }
    }
}
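
Tying MaskStrings back to the integration test near the top of this commit, which expects the customers.email value to arrive as "************", here is a small illustrative sketch. The Column.editor() fluent construction is an assumption borrowed from Debezium's relational model, and the column definitions are invented for the example:

import java.sql.Types;

import io.debezium.relational.Column;
import io.debezium.relational.ValueConverter;
import io.debezium.relational.mapping.MaskStrings;

public class MaskStringsSketch {
    public static void main(String[] args) {
        MaskStrings mask = new MaskStrings("************");

        // Assumed Column.editor() fluent API; these columns are illustrative only.
        Column email = Column.editor().name("email").jdbcType(Types.VARCHAR).create();
        Column id = Column.editor().name("id").jdbcType(Types.INTEGER).create();

        ValueConverter forEmail = mask.create(email);   // masking converter for string types
        ValueConverter forId = mask.create(id);         // passthrough for non-string types

        System.out.println(forEmail.convert("jane@example.com")); // prints ************
        System.out.println(forId.convert(1001));                  // prints 1001
    }
}
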