Skip to content

Commit

Permalink
Merge pull request debezium#33 in N4FRA/debezium from DSCON-125_colum…
Browse files Browse the repository at this point in the history
…n_blacklist_Oracle to master

Squashed commit of the following:

commit 2db5385f4bb688976a77b7ef6f6f8a401515216d
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 18:01:17 2020 -0800

    DSCON-125, make column blacklist working for Oracle

commit c78c368
Merge: 90bcc19 4619fcd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Mar 6 06:52:42 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90bcc19
Merge: b5d1ea7 3e3aeea
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Mar 2 14:31:07 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit b5d1ea7
Merge: 9686041 51f0dcb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 17:17:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9686041
Merge: 926c648 4996a49
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 12:02:35 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 926c648
Merge: 92140a3 829206c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Feb 26 10:49:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 92140a3
Merge: 9fa48df 15a6c6c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue Feb 25 18:14:58 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9fa48df
Merge: d3da472 27eb9af
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Feb 14 16:11:29 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d3da472
Merge: 86f3f65 081731f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:18:33 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 86f3f65
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 16:02:43 2020 -0800

    DSCON-117, DBConnector exception while incremental loading - revert

    This reverts commit c3a6023.

commit c3a6023
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 15:56:18 2020 -0800

    DSCON-117, DBConnector exception while incremental loading

commit 90f1823
Merge: 605d22f 8f53bba
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Feb 3 13:52:09 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 605d22f
Merge: 276c19b e68ada6
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Jan 24 07:32:11 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 276c19b
Merge: 9b5a3f3 bc8e4be
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jan 20 14:32:44 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b5a3f3
Merge: 9b0ee98 f9de59c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Jan 15 13:41:38 2020 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 9b0ee98
Merge: 7ab9af3 0fe3cf3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:55:19 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 7ab9af3
Merge: 90979c1 d8872ce
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 27 11:48:18 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 90979c1
Merge: d174eab 4086b3e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri Dec 13 13:39:26 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d174eab
Merge: 0894de1 7e77a83
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Dec 9 08:58:41 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 0894de1
Merge: 8353b85 12021bd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed Nov 27 15:12:39 2019 -0800

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

... and 9 more commits
  • Loading branch information
Ankati Mallik authored and Ankati Mallik committed Mar 7, 2020
1 parent 4619fcd commit a23eb5a
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.connector.oracle.xstream.OracleVersion;
import io.debezium.document.Document;
import io.debezium.function.Predicates;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnId;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.KafkaDatabaseHistory;
Expand All @@ -27,6 +30,8 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;

import java.util.function.Predicate;

/**
* Connector configuration for Oracle.
* Includes both XStream and LogMiner configs
Expand Down Expand Up @@ -183,19 +188,41 @@ public class OracleConnectorConfig extends HistorizedRelationalDatabaseConnector
private final boolean tablenameCaseInsensitive;
private final OracleVersion oracleVersion;
private final String schemaName;
private final Tables.ColumnNameFilter columnFilter;

public OracleConnectorConfig(Configuration config) {
super(config, config.getString(SERVER_NAME), new SystemTablesPredicate());

this.databaseName = config.getString(DATABASE_NAME);
this.pdbName = config.getString(PDB_NAME);
this.databaseName = setUpperCase(config.getString(DATABASE_NAME));
this.pdbName = setUpperCase(config.getString(PDB_NAME));
this.xoutServerName = config.getString(XSTREAM_SERVER_NAME);
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.tablenameCaseInsensitive = config.getBoolean(TABLENAME_CASE_INSENSITIVE);
this.oracleVersion = OracleVersion.parse(config.getString(ORACLE_VERSION));
this.schemaName = config.getString(SCHEMA_NAME);
this.schemaName = setUpperCase(config.getString(SCHEMA_NAME));
String blacklistedColumns = setUpperCase(config.getString(RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST));
this.columnFilter = getColumnNameFilter(blacklistedColumns);
}

private String setUpperCase(String property) {
property = property == null ? null : property.toUpperCase();
return property;
}

protected Tables.ColumnNameFilter getColumnNameFilter(String excludedColumnPatterns) {
return new Tables.ColumnNameFilter() {

Predicate<ColumnId> delegate = Predicates.excludes(excludedColumnPatterns, ColumnId::toString);

@Override
public boolean matches(String catalogName, String schemaName, String tableName, String columnName) {
// ignore database name and schema name, we are supposed to capture from one database and one schema
return delegate.test(new ColumnId(new TableId(null, null, tableName), columnName));
}
};
}


public static ConfigDef configDef() {
ConfigDef config = new ConfigDef();

Expand All @@ -206,6 +233,7 @@ public static ConfigDef configDef() {
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY);
Field.group(config, "Events", RelationalDatabaseConnectorConfig.TABLE_WHITELIST,
RelationalDatabaseConnectorConfig.TABLE_BLACKLIST,
RelationalDatabaseConnectorConfig.COLUMN_BLACKLIST,
RelationalDatabaseConnectorConfig.TABLE_IGNORE_BUILTIN,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX
);
Expand Down Expand Up @@ -245,6 +273,10 @@ public String getSchemaName(){
return schemaName;
}

public Tables.ColumnNameFilter getColumnFilter() {
return columnFilter;
}

@Override
protected HistoryRecordComparator getHistoryRecordComparator() {
return new HistoryRecordComparator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class OracleDatabaseSchema extends HistorizedRelationalDatabaseSchema {
private static final Logger LOGGER = LoggerFactory.getLogger(OracleDatabaseSchema.class);

public OracleDatabaseSchema(OracleConnectorConfig connectorConfig, SchemaNameAdjuster schemaNameAdjuster, TopicSelector<TableId> topicSelector, OracleConnection connection) {
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), null,
super(connectorConfig, topicSelector, connectorConfig.getTableFilters().dataCollectionFilter(), connectorConfig.getColumnFilter(),
new TableSchemaBuilder(
new OracleValueConverters(connection),
schemaNameAdjuster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -209,15 +212,17 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Snapsh
snapshotContext.tables,
snapshotContext.catalogName,
schema,
null,
connectorConfig.getColumnFilter(),
false,
snapshotContext.capturedTables
);
}
}

//@Override // todo uncomment Override when we submit a PR to the core to implement conditional snapshot with "as of scn"
protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect){
@Override
protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect, TableId tableId){
String columnString = buildSelectColumns(connectorConfig.getConfig().getString(connectorConfig.COLUMN_BLACKLIST), snapshotContext.tables.forTable(tableId));
overriddenSelect = overriddenSelect.replaceFirst("\\*", columnString);
long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
String token = connectorConfig.getTokenToReplaceInSnapshotPredicate();
if (token != null){
Expand Down Expand Up @@ -245,8 +250,39 @@ protected SchemaChangeEvent getCreateTableEvent(SnapshotContext snapshotContext,

@Override
protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
String columnString = buildSelectColumns(connectorConfig.getConfig().getString(connectorConfig.COLUMN_BLACKLIST), snapshotContext.tables.forTable(tableId));

long snapshotOffset = (Long) snapshotContext.offset.getOffset().get("scn");
return "SELECT * FROM " + tableId.schema() + "." + tableId.table() + " AS OF SCN " + snapshotOffset;
return "SELECT " + columnString + " FROM " + tableId.schema() + "." + tableId.table() + " AS OF SCN " + snapshotOffset;
}

/**
* This is to build "whitelisted" column list
* @param blackListColumnStr comma separated columns blacklist
* @param table the table
* @return column list for select
*/
public static String buildSelectColumns(String blackListColumnStr, Table table) {
String columnsToSelect = "*";
if (blackListColumnStr != null && blackListColumnStr.trim().length() > 0
&& blackListColumnStr.toUpperCase().contains(table.id().table())) {
String allTableColumns = table.retrieveColumnNames().stream()
.map(columnName -> {
StringBuilder sb = new StringBuilder();
if (!columnName.contains(table.id().table())){
sb.append(table.id().table()).append(".").append(columnName);
} else {
sb.append(columnName);
}
return sb.toString();
}).collect(Collectors.joining(","));
String catalog = table.id().catalog();
List<String> blackList = new ArrayList<>(Arrays.asList(blackListColumnStr.trim().toUpperCase().replaceAll(catalog + ".", "").split(",")));
List<String> allColumns = new ArrayList<>(Arrays.asList(allTableColumns.toUpperCase().split(",")));
allColumns.removeAll(blackList);
columnsToSelect = String.join(",", allColumns);
}
return columnsToSelect;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,17 @@ private void setNewValues(List<Expression> expressions, List<net.sf.jsqlparser.s
String value = ParserUtils.stripeQuotes(expressions.get(i).toString());
Object stripedValue = ParserUtils.removeApostrophes(value);
Column column = table.columnWithName(columnName);
if (column == null) {
LOGGER.trace("blacklisted column: {}", columnName);
continue;
}
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);

ColumnValueHolder columnValueHolder = newColumnValues.get(columnName);
columnValueHolder.setProcessed(true);
columnValueHolder.getColumnValue().setColumnData(valueObject);
if (columnValueHolder != null) {
columnValueHolder.setProcessed(true);
columnValueHolder.getColumnValue().setColumnData(valueObject);
}
}
}

Expand All @@ -241,6 +247,10 @@ public void visit(EqualsTo expr) {
columnName = ParserUtils.stripeQuotes(columnName);

Column column = table.columnWithName(columnName);
if (column == null) {
LOGGER.trace("blacklisted column in where clause: {}", columnName);
return;
}
value = ParserUtils.removeApostrophes(value);

ColumnValueHolder columnValueHolder = oldColumnValues.get(columnName.toUpperCase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.Table;
Expand Down Expand Up @@ -154,6 +155,16 @@ public int processResult(ResultSet resultSet) {
LOGGER.error("Following statement was not parsed: {}, details: {}", redo_sql, logMessage);
continue;
}

// this will happen for instance on a blacklisted column change, we will omit this update
if (rowLcr.getCommandType().equals(Envelope.Operation.UPDATE)
&& rowLcr.getOldValues().size() == rowLcr.getNewValues().size()
&& rowLcr.getNewValues().containsAll(rowLcr.getOldValues())) {
LOGGER.trace("Following DML was skipped, " +
"most likely because of ignored blacklisted column change: {}, details: {}", redo_sql, logMessage);
continue;
}

rowLcr.setObjectOwner(segOwner);
rowLcr.setSourceTime(changeTime);
rowLcr.setTransactionId(txId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.connector.oracle.logminer.valueholder;

import java.util.Objects;

/**
* This class mimics the API of oracle.streams.DefaultColumnValue implementation
*
Expand Down Expand Up @@ -48,4 +50,19 @@ public String toString() {
", columnType=" + columnType +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LogMinerColumnValueImpl that = (LogMinerColumnValueImpl) o;
return columnType == that.columnType &&
Objects.equals(columnName, that.columnName) &&
Objects.equals(columnData, that.columnData);
}

@Override
public int hashCode() {
return Objects.hash(columnName, columnData, columnType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,25 @@ record = sqlDmlParser.getDmlChange();
record = sqlDmlParser.getDmlChange();
}

@Test
public void shouldParseUpdateNoChangesTable() throws Exception {

String createStatement = IoUtil.read(IoUtil.getResourceAsStream("ddl/create_table.sql", null, getClass(), null, null));
ddlParser.parse(createStatement, tables);

String dml = "update \"" + FULL_TABLE_NAME + "\" set \"col1\" = '6', col2 = 'text', col3 = 'text', col4 = NULL " +
"where ID = 5 and COL1 = 6 and \"COL2\" = 'text' " +
"and COL3 = 'text' and COL4 IS NULL and \"COL5\" IS NULL and COL6 IS NULL and COL7 IS NULL and COL9 IS NULL and COL10 IS NULL and COL12 IS NULL " +
"and COL8 = TO_TIMESTAMP('2019-05-14 02:28:32') and col11 = " + SPATIAL_DATA + ";";

sqlDmlParser.parse(dml, tables, "");
LogMinerRowLcr record = sqlDmlParser.getDmlChange();
boolean pass = record.getCommandType().equals(Envelope.Operation.UPDATE)
&& record.getOldValues().size() == record.getNewValues().size()
&& record.getNewValues().containsAll(record.getOldValues());
assertThat(pass);
}

private void verifyUpdate(LogMinerRowLcr record, boolean checkGeometry, boolean checkOldValues, int oldValuesNumber) {
// validate
assertThat(record.getCommandType()).isEqualTo(Envelope.Operation.UPDATE);
Expand Down
Loading

0 comments on commit a23eb5a

Please sign in to comment.