Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed Jun 1, 2020
2 parents fc66328 + c2d8ec5 commit b14ea98
Show file tree
Hide file tree
Showing 31 changed files with 578 additions and 432 deletions.
Expand Up @@ -12,7 +12,7 @@
import io.debezium.util.Clock;

/**
* Base class to emit change data based on a single LogMinerRowLcr or RowLCR event.
* Base class to emit change data based on a single entry event.
*/
public abstract class BaseChangeRecordEmitter<T> extends RelationalChangeRecordEmitter {

Expand Down
Expand Up @@ -10,7 +10,7 @@
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.oracle.antlr.listener.OracleDmlParserListener;
import io.debezium.connector.oracle.logminer.OracleChangeRecordValueConverter;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.ddl.parser.oracle.generated.PlSqlLexer;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.SystemVariables;
Expand All @@ -24,7 +24,7 @@
*/
public class OracleDmlParser extends AntlrDdlParser<PlSqlLexer, PlSqlParser> {

private LogMinerRowLcr rowLCR;
private LogMinerDmlEntry dmlEntry;
protected String catalogName;
protected String schemaName;
private OracleChangeRecordValueConverter converter;
Expand All @@ -36,12 +36,12 @@ public OracleDmlParser(boolean throwErrorsFromTreeWalk, final String catalogName
this.converter = converter;
}

public LogMinerRowLcr getDmlChange(){
return rowLCR;
public LogMinerDmlEntry getDmlEntry(){
return dmlEntry;
}

public void setRowLCR(LogMinerRowLcr rowLCR) {
this.rowLCR = rowLCR;
public void setDmlEntry(LogMinerDmlEntry dml) {
this.dmlEntry = dml;
}

@Override
Expand Down
Expand Up @@ -8,8 +8,8 @@

import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.connector.oracle.logminer.OracleChangeRecordValueConverter;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
import io.debezium.relational.Column;
Expand All @@ -33,8 +33,8 @@ abstract class BaseDmlParserListener<T> extends PlSqlParserBaseListener {

protected OracleDmlParser parser;

Map<T, ColumnValueHolder> newColumnValues = new LinkedHashMap<>();
Map<T, ColumnValueHolder> oldColumnValues = new LinkedHashMap<>();
Map<T, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
Map<T, LogMinerColumnValueWrapper> oldColumnValues = new LinkedHashMap<>();

BaseDmlParserListener(String catalogName, String schemaName, OracleDmlParser parser) {
this.parser = parser;
Expand All @@ -43,7 +43,7 @@ abstract class BaseDmlParserListener<T> extends PlSqlParserBaseListener {
this.converter = parser.getConverters();
}

// Defines the key of the Map of ColumnValueHolder. It could be String or Integer
// Defines the key of the Map of LogMinerColumnValueWrapper. It could be String or Integer
abstract protected T getKey(Column column, int index);

/**
Expand All @@ -61,8 +61,8 @@ void init(PlSqlParser.Dml_table_expression_clauseContext ctx) {
int type = column.jdbcType();
T key = getKey(column, i);
String name = ParserUtils.stripeQuotes(column.name().toUpperCase());
newColumnValues.put(key, new ColumnValueHolder(new LogMinerColumnValueImpl(name, type)));
oldColumnValues.put(key, new ColumnValueHolder(new LogMinerColumnValueImpl(name, type)));
newColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name, type)));
oldColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name, type)));
}
}
}
Expand Up @@ -5,7 +5,7 @@
*/
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
Expand Down Expand Up @@ -57,11 +57,11 @@ void parseRecursively(PlSqlParser.Logical_expressionContext logicalExpression)
Column column = table.columnWithName(columnName);
Object stripedValue = ParserUtils.removeApostrophes(value);

ColumnValueHolder columnValueHolder = oldColumnValues.get(columnName);
if (columnValueHolder != null) { //todo this used to happen for ROWID pseudo column. Test if this is not a problem after NO_ROWID_IN_STMT option
LogMinerColumnValueWrapper logMinerColumnValueWrapper = oldColumnValues.get(columnName);
if (logMinerColumnValueWrapper != null) { //todo this used to happen for ROWID pseudo column. Test if this is not a problem after NO_ROWID_IN_STMT option
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);
columnValueHolder.setProcessed(true);
columnValueHolder.getColumnValue().setColumnData(valueObject);
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);
}

}
Expand Down
Expand Up @@ -6,9 +6,9 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcrImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.data.Envelope;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
Expand Down Expand Up @@ -58,9 +58,9 @@ public void enterDelete_statement(PlSqlParser.Delete_statementContext ctx) {
@Override
public void exitDelete_statement(PlSqlParser.Delete_statementContext ctx) {
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values()
.stream().map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
LogMinerRowLcr newRecord = new LogMinerRowLcrImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues);
parser.setRowLCR(newRecord);
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues);
parser.setDmlEntry(newRecord);
super.exitDelete_statement(ctx);
}
}
Expand Up @@ -6,9 +6,9 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcrImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.data.Envelope;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void enterValues_clause(PlSqlParser.Values_clauseContext ctx) {
List<PlSqlParser.ExpressionContext> values = ctx.expressions().expression();
for (int i = 0; i < values.size(); i++) {
PlSqlParser.ExpressionContext value = values.get(i);
ColumnValueHolder columnObject = newColumnValues.get(i);
LogMinerColumnValueWrapper columnObject = newColumnValues.get(i);

String columnName = columnObject.getColumnValue().getColumnName();
Column column = table.columnWithName(columnName);
Expand All @@ -79,9 +79,9 @@ public void enterValues_clause(PlSqlParser.Values_clauseContext ctx) {
@Override
public void exitSingle_table_insert(PlSqlParser.Single_table_insertContext ctx) {
List<LogMinerColumnValue> actualNewValues = newColumnValues.values()
.stream().map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
LogMinerRowLcr newRecord = new LogMinerRowLcrImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList());
parser.setRowLCR(newRecord);
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList());
parser.setDmlEntry(newRecord);
super.exitSingle_table_insert(ctx);
}
}
Expand Up @@ -6,7 +6,7 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.OracleChangeRecordValueConverter;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
Expand Down Expand Up @@ -66,11 +66,11 @@ public static String stripeAlias(String text, String alias){
* @param oldColumnValues values in WHERE clause
* @param table Debezium Table object
*/
public static void cloneOldToNewColumnValues(Map<String, ColumnValueHolder> newColumnValues, Map<String, ColumnValueHolder> oldColumnValues, Table table) {
public static void cloneOldToNewColumnValues(Map<String, LogMinerColumnValueWrapper> newColumnValues, Map<String, LogMinerColumnValueWrapper> oldColumnValues, Table table) {
for (Column column : table.columns()) {
final ColumnValueHolder newColumnValue = newColumnValues.get(column.name());
final LogMinerColumnValueWrapper newColumnValue = newColumnValues.get(column.name());
if (!newColumnValue.isProcessed()) {
final ColumnValueHolder oldColumnValue = oldColumnValues.get(column.name());
final LogMinerColumnValueWrapper oldColumnValue = oldColumnValues.get(column.name());
newColumnValue.setProcessed(true);
newColumnValue.getColumnValue().setColumnData(oldColumnValue.getColumnValue().getColumnData());
}
Expand Down
Expand Up @@ -6,10 +6,10 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcrImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.data.Envelope;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
Expand Down Expand Up @@ -83,21 +83,21 @@ public void enterColumn_based_update_set_clause(PlSqlParser.Column_based_update_
Column column = table.columnWithName(stripedName);
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);

ColumnValueHolder columnValueHolder = newColumnValues.get(stripedName);
columnValueHolder.setProcessed(true);
columnValueHolder.getColumnValue().setColumnData(valueObject);
LogMinerColumnValueWrapper logMinerColumnValueWrapper = newColumnValues.get(stripedName);
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);

super.enterColumn_based_update_set_clause(ctx);
}

@Override
public void exitUpdate_statement(PlSqlParser.Update_statementContext ctx) {
List<LogMinerColumnValue> actualNewValues = newColumnValues.values().stream()
.filter(ColumnValueHolder::isProcessed).map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values().stream()
.filter(ColumnValueHolder::isProcessed).map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
LogMinerRowLcr newRecord = new LogMinerRowLcrImpl(Envelope.Operation.UPDATE, actualNewValues, actualOldValues);
parser.setRowLCR(newRecord);
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.UPDATE, actualNewValues, actualOldValues);
parser.setDmlEntry(newRecord);
super.exitUpdate_statement(ctx);
}

Expand Down

0 comments on commit b14ea98

Please sign in to comment.