Skip to content

Commit

Permalink
Merge pull request #38 from rhauch/dbz-29
Browse files Browse the repository at this point in the history
DBZ-29 Changed MySQL connector to be able to hide, truncate, and mask specific columns
  • Loading branch information
rhauch committed May 12, 2016
2 parents 5c83d40 + ff9d0fc commit 18995ab
Show file tree
Hide file tree
Showing 21 changed files with 1,531 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,37 +93,77 @@ public class MySqlConnectorConfig {

public static final Field TABLE_BLACKLIST = Field.create("table.blacklist")
.withValidation(MySqlConnectorConfig::validateTableBlacklist)
.withDescription("A comma-separated list of table identifiers to be excluded from monitoring, where each identifer is of the form "
+ "'<databaseName>.<tableName>'.");
.withDescription("A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring. "
+ "Fully-qualified names for tables are of the form "
+ "'<databaseName>.<tableName>' or '<databaseName>.<schemaName>.<tableName>'");

public static final Field TABLE_WHITELIST = Field.create("table.whitelist")
.withDescription("A comma-separated list of table identifiers to be monitored, where each identifer is of the form "
+ "'<databaseName>.<tableName>'. May not be used with '" + TABLE_BLACKLIST
+ "'. "
.withDescription("A comma-separated list of regular expressions that match the fully-qualified names of tables to be monitored. "
+ "Fully-qualified names for tables are of the form "
+ "'<databaseName>.<tableName>' or '<databaseName>.<schemaName>.<tableName>'. "
+ "May not be used with '" + TABLE_BLACKLIST + "'. "
+ "The named table will be monitored only if its database is allowed by the `database.whitelist` or "
+ "not disallowed by '" + TABLE_BLACKLIST + "'.");

public static final Field DATABASE_WHITELIST = Field.create("database.whitelist")
.withDescription("A comma-separated list of database names to be monitored. "
.withDescription("A comma-separated list of regular expressions that match database names to be monitored. "
+ "May not be used with 'database.blacklist'.");

public static final Field DATABASE_BLACKLIST = Field.create("database.blacklist")
.withValidation(MySqlConnectorConfig::validateDatabaseBlacklist)
.withDescription("A comma-separated list of database names to be excluded from monitoring. "
.withDescription("A comma-separated list of regular expressions that match database names to be excluded from monitoring. "
+ "May not be used with '" + DATABASE_WHITELIST + "'.");

public static final Field TABLES_IGNORE_BUILTIN = Field.create("table.ignore.builtin")
.withValidation(Field::isBoolean)
.withDescription("Flag specifying whether built-in tables should be ignored. This applies regardless of the table whitelist or blacklists.")
.withDefault(true);

public static final Field COLUMN_BLACKLIST = Field.create("column.blacklist")
.withValidation(MySqlConnectorConfig::validateColumnBlacklist)
.withDescription("A comma-separated list of regular expressions that match fully-qualified names of columns to be excluded from monitoring and change messages. "
+ "Fully-qualified names for columns are of the form "
+ "'<databaseName>.<tableName>.<columnName>' or '<databaseName>.<schemaName>.<tableName>.<columnName>'.");

/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values truncated to be no longer than the specified number of characters.
*
* @param length the maximum length of the column's string values written in source records; must be positive
* @return the field; never null
*/
public static final Field TRUNCATE_COLUMN(int length) {
if (length <= 0) throw new IllegalArgumentException("The truncation length must be positive");
return Field.create("column.truncate.to." + length + ".chars")
.withValidation(Field::isInteger)
.withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
+ "be truncated to " + length + " characters.");
}

/**
* Method that generates a Field for specifying that string columns whose names match a set of regular expressions should
* have their values masked by the specified number of asterisk ('*') characters.
*
* @param length the number of asterisks that should appear in place of the column's string values written in source records;
* must be positive
* @return the field; never null
*/
public static final Field MASK_COLUMN(int length) {
if (length <= 0) throw new IllegalArgumentException("The mask length must be positive");
return Field.create("column.mask.with." + length + ".chars")
.withValidation(Field::isInteger)
.withDescription("A comma-separated list of regular expressions matching fully-qualified names of columns that should "
+ "be masked with " + length + " asterisk ('*') characters.");
}

public static Collection<Field> ALL_FIELDS = Collect.arrayListOf(USER, PASSWORD, HOSTNAME, PORT, SERVER_ID,
SERVER_NAME, INITIAL_BINLOG_FILENAME,
CONNECTION_TIMEOUT_MS, KEEP_ALIVE,
MAX_QUEUE_SIZE, MAX_BATCH_SIZE, POLL_INTERVAL_MS,
DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES,
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST);
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST);

private static int validateMaxQueueSize(Configuration config, Field field, Consumer<String> problems) {
int maxQueueSize = config.getInteger(field);
Expand Down Expand Up @@ -163,6 +203,11 @@ private static int validateTableBlacklist(Configuration config, Field field, Con
return 0;
}

private static int validateColumnBlacklist(Configuration config, Field field, Consumer<String> problems) {
// String blacklist = config.getString(COLUMN_BLACKLIST);
return 0;
}

private static int randomServerId() {
int lowestServerId = 5400;
int highestServerId = 6400;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.relational.ColumnId;
import io.debezium.relational.ColumnMappers;
import io.debezium.relational.Selectors;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
Expand Down Expand Up @@ -145,24 +148,36 @@ public void start(Map<String, String> props) {
metronome = Metronome.parker(pollIntervalMs, TimeUnit.MILLISECONDS, Clock.SYSTEM);

// Define the filter using the whitelists and blacklists for tables and database names ...
Predicate<TableId> tableFilter = TableId.filter(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST),
config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST),
config.getString(MySqlConnectorConfig.TABLE_WHITELIST),
config.getString(MySqlConnectorConfig.TABLE_BLACKLIST));
Predicate<TableId> tableFilter = Selectors.tableSelector()
.includeDatabases(config.getString(MySqlConnectorConfig.DATABASE_WHITELIST))
.excludeDatabases(config.getString(MySqlConnectorConfig.DATABASE_BLACKLIST))
.includeTables(config.getString(MySqlConnectorConfig.TABLE_WHITELIST))
.excludeTables(config.getString(MySqlConnectorConfig.TABLE_BLACKLIST))
.build();
if (config.getBoolean(MySqlConnectorConfig.TABLES_IGNORE_BUILTIN)) {
Predicate<TableId> isBuiltin = (id) -> {
return BUILT_IN_DB_NAMES.contains(id.catalog().toLowerCase()) || BUILT_IN_TABLE_NAMES.contains(id.table().toLowerCase());
};
tableFilter = tableFilter.and(isBuiltin.negate());
}

// Define the filter that excludes blacklisted columns, truncated columns, and masked columns ...
Predicate<ColumnId> columnFilter = Selectors.excludeColumns(config.getString(MySqlConnectorConfig.COLUMN_BLACKLIST));

// Define the truncated, masked, and mapped columns ...
ColumnMappers.Builder columnMapperBuilder = ColumnMappers.create();
config.forEachMatchingFieldNameWithInteger("column\\.truncate\\.to\\.(\\d+)\\.chars", columnMapperBuilder::truncateStrings);
config.forEachMatchingFieldNameWithInteger("column\\.mask\\.with\\.(\\d+)\\.chars", columnMapperBuilder::maskStrings);
ColumnMappers columnMappers = columnMapperBuilder.build();

// Create the queue ...
events = new LinkedBlockingDeque<>(maxQueueSize);
batchEvents = new ArrayDeque<>(maxBatchSize);

// Set up our handlers for specific kinds of events ...
tables = new Tables();
tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter);
tableConverters = new TableConverters(topicSelector, dbHistory, includeSchemaChanges, tables, tableFilter, columnFilter,
columnMappers);
eventHandlers.put(EventType.ROTATE, tableConverters::rotateLogs);
eventHandlers.put(EventType.TABLE_MAP, tableConverters::updateTableMetadata);
eventHandlers.put(EventType.QUERY, tableConverters::updateTableCommand);
Expand Down Expand Up @@ -213,15 +228,15 @@ public void start(Map<String, String> props) {
} else {
// initializes this position, though it will be reset when we see the first event (should be a rotate event) ...
client.setBinlogFilename(initialBinLogFilename);
logger.info("Starting MySQL connector from beginning of binlog file {}, position {}",
source.binlogFilename(), source.binlogPosition());
logger.info("Starting MySQL connector '{}' from beginning of binlog file {}, position {}",
serverName, source.binlogFilename(), source.binlogPosition());
}

// Start the log reader, which starts background threads ...
try {
logger.debug("Connecting to MySQL server");
client.connect(timeoutInMilliseconds);
logger.debug("Successfully connected to MySQL server and beginning to read binlog");
logger.info("Successfully started MySQL Connector '{}' and beginning to read binlog", serverName);
} catch (TimeoutException e) {
double seconds = TimeUnit.MILLISECONDS.toSeconds(timeoutInMilliseconds);
throw new ConnectException("Timed out after " + seconds + " seconds while waiting to connect to the MySQL database at " + host
Expand Down Expand Up @@ -272,8 +287,8 @@ public List<SourceRecord> poll() throws InterruptedException {
source.setRowInEvent(0);
}
}
if ( !running.get()) break;

if (!running.get()) break;

// If there is a handler for this event, forward the event to it ...
EventHandler handler = eventHandlers.get(eventType);
Expand All @@ -297,6 +312,7 @@ public List<SourceRecord> poll() throws InterruptedException {
@Override
public void stop() {
try {
logger.info("Stopping MySQL Connector '{}'", serverName);
// Signal to the 'poll()' method that it should stop what its doing ...
this.running.set(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.relational.ColumnId;
import io.debezium.relational.ColumnMappers;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
Expand Down Expand Up @@ -60,18 +62,22 @@ final class TableConverters {
private final Map<String, Long> tableNumbersByTableName = new HashMap<>();
private final boolean recordSchemaChangesInSourceRecords;
private final Predicate<TableId> tableFilter;
private final Predicate<ColumnId> columnFilter;
private final ColumnMappers columnMappers;
private final Set<String> ignoredQueryStatements = Collect.unmodifiableSet("BEGIN", "END", "FLUSH PRIVILEGES");
private final Set<TableId> unknownTableIds = new HashSet<>();

public TableConverters(TopicSelector topicSelector, DatabaseHistory dbHistory,
boolean recordSchemaChangesInSourceRecords, Tables tables,
Predicate<TableId> tableFilter) {
Predicate<TableId> tableFilter, Predicate<ColumnId> columnFilter, ColumnMappers columnSelectors) {
Objects.requireNonNull(topicSelector, "A topic selector is required");
Objects.requireNonNull(dbHistory, "Database history storage is required");
Objects.requireNonNull(tables, "A Tables object is required");
this.topicSelector = topicSelector;
this.dbHistory = dbHistory;
this.tables = tables;
this.columnFilter = columnFilter;
this.columnMappers = columnSelectors;
this.ddlParser = new MySqlDdlParser(false); // don't include views
this.recordSchemaChangesInSourceRecords = recordSchemaChangesInSourceRecords;
Predicate<TableId> knownTables = (id) -> !unknownTableIds.contains(id); // known if not unknown
Expand All @@ -82,7 +88,7 @@ public void loadTables() {
// Create TableSchema instances for any existing table ...
this.tables.tableIds().forEach(id -> {
Table table = this.tables.forTable(id);
TableSchema schema = schemaBuilder.create(table);
TableSchema schema = schemaBuilder.create(table,columnFilter, columnMappers);
tableSchemaByTableId.put(id, schema);
});
}
Expand Down Expand Up @@ -129,7 +135,7 @@ public void updateTableCommand(Event event, SourceInfo source, Consumer<SourceRe
if (table == null) { // removed
tableSchemaByTableId.remove(tableId);
} else {
TableSchema schema = schemaBuilder.create(table);
TableSchema schema = schemaBuilder.create(table,columnFilter, columnMappers);
tableSchemaByTableId.put(tableId, schema);
}
});
Expand Down

0 comments on commit 18995ab

Please sign in to comment.