Skip to content

Commit

Permalink
Merge pull request debezium#56 in N4FRA/debezium from DSCON-372_clean…
Browse files Browse the repository at this point in the history
…_Oracle_connector_code to master

Squashed commit of the following:

commit 689177d9e67bc2ac158b9056c27411d01dbb30cb
Author: AndreyIg <gnyiny@gmail.com>
Date:   Mon Jun 1 15:54:23 2020 -0700

    DSCON-372, clean up code of Oracle connector

commit cc57c1fc2180fab60793d11ade18715993fb4b01
Merge: d5797919 fc66328
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 14:09:13 2020 -0700

    Merge branch 'master' into DSCON-372_clean_Oracle_connector_code

commit fc66328
Merge: 539c214 88315df
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 14:09:07 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit d579791933667c2a6a2f4be5d14de6c0fe3a3161
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 13:47:16 2020 -0700

    DSCON-372, clean up code of Oracle connector

commit 5f08f2c3bf56a0b8e00b8479c652b50960610322
Merge: 88132371 539c214
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 13:43:42 2020 -0700

    Merge branch 'master' into DSCON-372_clean_Oracle_connector_code

commit 539c214
Merge: a54f7a3 57a0b1c
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 13:43:27 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 88132371b02e0a4f8a285b3521163c60d2bcb5c0
Merge: e0ce77ec a54f7a3
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 12:16:35 2020 -0700

    merge conflicts

commit e0ce77eca443cd6315cb7f5402b0007a04b7aba4
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 28 12:10:43 2020 -0700

    DSCON-372, clean up code of Oracle connector

commit a54f7a3
Merge: 796bf4b 7c49efd
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 22 14:31:05 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit dd4b64677940330a48f32dc5e2c6a76dcbb18cc2
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 22 14:26:47 2020 -0700

    DSCON-364, JSQLParserException with new tables

commit aaa85742ecd60ace8e6baaa85aafb84fb3b7ff37
Merge: df3d36ac 796bf4b
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:19:28 2020 -0700

    Merge branch 'master' into ARGO-209312_DBC_Oracle_RAC

commit 796bf4b
Merge: 8b36431 2c1fc9e
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:19:16 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit df3d36ac7d0d8a7927d90b47fd6c92555644f285
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:16:18 2020 -0700

    ARGO-209312, db connector  on Oracle RAC

commit aac6d3a4110a4ec08b39f68a39ec1e37eb66e7eb
Merge: 44bc0414 8b36431
Author: AndreyIg <gnyiny@gmail.com>
Date:   Fri May 15 12:15:50 2020 -0700

    Merge branch 'master' into ARGO-209312_DBC_Oracle_RAC

commit 8b36431
Merge: 66c207f 70ad303
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed May 13 16:37:01 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit 44bc04142f38740940ef247540e1b370d1754e11
Author: AndreyIg <gnyiny@gmail.com>
Date:   Wed May 13 16:34:20 2020 -0700

    ARGO-209312, db connector  on Oracle RAC

commit 34e10b92b354dbc3804f4c9ed90eb27a9d8f5fa4
Merge: c953356a 66c207f
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 11:18:25 2020 -0700

    Merge branch 'master' into DSCON-301_DBC_Crashed_after_hours_downtime

commit 66c207f
Merge: f1810b9 d5de8d8
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 11:17:49 2020 -0700

    Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium

commit c953356a2b5aed88846be8be41f97249f0f96570
Author: AndreyIg <gnyiny@gmail.com>
Date:   Tue May 12 09:51:42 2020 -0700

    DSCON-301, DBC Crashed after 2 hours of downtime

commit 3ec1c0bc00d9f7e9737292660cf7f4989b26888a
Merge: bccb2c81 f1810b9
Author: AndreyIg <gnyiny@gmail.com>
Date:   Thu May 7 15:00:23 2020 -0700

    Merge branch 'master' into DSCON-268_manage_millisecondToSleepBetweenMiningQuery

... and 55 more commits
  • Loading branch information
Ignatenko Andrey committed Jun 1, 2020
1 parent 88315df commit c2d8ec5
Show file tree
Hide file tree
Showing 31 changed files with 578 additions and 432 deletions.
Expand Up @@ -12,7 +12,7 @@
import io.debezium.util.Clock;

/**
* Base class to emit change data based on a single LogMinerRowLcr or RowLCR event.
* Base class to emit change data based on a single entry event.
*/
public abstract class BaseChangeRecordEmitter<T> extends RelationalChangeRecordEmitter {

Expand Down
Expand Up @@ -10,7 +10,7 @@
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.oracle.antlr.listener.OracleDmlParserListener;
import io.debezium.connector.oracle.logminer.OracleChangeRecordValueConverter;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.ddl.parser.oracle.generated.PlSqlLexer;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.SystemVariables;
Expand All @@ -24,7 +24,7 @@
*/
public class OracleDmlParser extends AntlrDdlParser<PlSqlLexer, PlSqlParser> {

private LogMinerRowLcr rowLCR;
private LogMinerDmlEntry dmlEntry;
protected String catalogName;
protected String schemaName;
private OracleChangeRecordValueConverter converter;
Expand All @@ -36,12 +36,12 @@ public OracleDmlParser(boolean throwErrorsFromTreeWalk, final String catalogName
this.converter = converter;
}

public LogMinerRowLcr getDmlChange(){
return rowLCR;
public LogMinerDmlEntry getDmlEntry(){
return dmlEntry;
}

public void setRowLCR(LogMinerRowLcr rowLCR) {
this.rowLCR = rowLCR;
public void setDmlEntry(LogMinerDmlEntry dml) {
this.dmlEntry = dml;
}

@Override
Expand Down
Expand Up @@ -8,8 +8,8 @@

import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.connector.oracle.logminer.OracleChangeRecordValueConverter;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
import io.debezium.relational.Column;
Expand All @@ -33,8 +33,8 @@ abstract class BaseDmlParserListener<T> extends PlSqlParserBaseListener {

protected OracleDmlParser parser;

Map<T, ColumnValueHolder> newColumnValues = new LinkedHashMap<>();
Map<T, ColumnValueHolder> oldColumnValues = new LinkedHashMap<>();
Map<T, LogMinerColumnValueWrapper> newColumnValues = new LinkedHashMap<>();
Map<T, LogMinerColumnValueWrapper> oldColumnValues = new LinkedHashMap<>();

BaseDmlParserListener(String catalogName, String schemaName, OracleDmlParser parser) {
this.parser = parser;
Expand All @@ -43,7 +43,7 @@ abstract class BaseDmlParserListener<T> extends PlSqlParserBaseListener {
this.converter = parser.getConverters();
}

// Defines the key of the Map of ColumnValueHolder. It could be String or Integer
// Defines the key of the Map of LogMinerColumnValueWrapper. It could be String or Integer
abstract protected T getKey(Column column, int index);

/**
Expand All @@ -61,8 +61,8 @@ void init(PlSqlParser.Dml_table_expression_clauseContext ctx) {
int type = column.jdbcType();
T key = getKey(column, i);
String name = ParserUtils.stripeQuotes(column.name().toUpperCase());
newColumnValues.put(key, new ColumnValueHolder(new LogMinerColumnValueImpl(name, type)));
oldColumnValues.put(key, new ColumnValueHolder(new LogMinerColumnValueImpl(name, type)));
newColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name, type)));
oldColumnValues.put(key, new LogMinerColumnValueWrapper(new LogMinerColumnValueImpl(name, type)));
}
}
}
Expand Up @@ -5,7 +5,7 @@
*/
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
Expand Down Expand Up @@ -57,11 +57,11 @@ void parseRecursively(PlSqlParser.Logical_expressionContext logicalExpression)
Column column = table.columnWithName(columnName);
Object stripedValue = ParserUtils.removeApostrophes(value);

ColumnValueHolder columnValueHolder = oldColumnValues.get(columnName);
if (columnValueHolder != null) { //todo this used to happen for ROWID pseudo column. Test if this is not a problem after NO_ROWID_IN_STMT option
LogMinerColumnValueWrapper logMinerColumnValueWrapper = oldColumnValues.get(columnName);
if (logMinerColumnValueWrapper != null) { //todo this used to happen for ROWID pseudo column. Test if this is not a problem after NO_ROWID_IN_STMT option
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);
columnValueHolder.setProcessed(true);
columnValueHolder.getColumnValue().setColumnData(valueObject);
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);
}

}
Expand Down
Expand Up @@ -6,9 +6,9 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcrImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.data.Envelope;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
Expand Down Expand Up @@ -58,9 +58,9 @@ public void enterDelete_statement(PlSqlParser.Delete_statementContext ctx) {
@Override
public void exitDelete_statement(PlSqlParser.Delete_statementContext ctx) {
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values()
.stream().map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
LogMinerRowLcr newRecord = new LogMinerRowLcrImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues);
parser.setRowLCR(newRecord);
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.DELETE, Collections.emptyList(), actualOldValues);
parser.setDmlEntry(newRecord);
super.exitDelete_statement(ctx);
}
}
Expand Up @@ -6,9 +6,9 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcrImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.data.Envelope;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void enterValues_clause(PlSqlParser.Values_clauseContext ctx) {
List<PlSqlParser.ExpressionContext> values = ctx.expressions().expression();
for (int i = 0; i < values.size(); i++) {
PlSqlParser.ExpressionContext value = values.get(i);
ColumnValueHolder columnObject = newColumnValues.get(i);
LogMinerColumnValueWrapper columnObject = newColumnValues.get(i);

String columnName = columnObject.getColumnValue().getColumnName();
Column column = table.columnWithName(columnName);
Expand All @@ -79,9 +79,9 @@ public void enterValues_clause(PlSqlParser.Values_clauseContext ctx) {
@Override
public void exitSingle_table_insert(PlSqlParser.Single_table_insertContext ctx) {
List<LogMinerColumnValue> actualNewValues = newColumnValues.values()
.stream().map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
LogMinerRowLcr newRecord = new LogMinerRowLcrImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList());
parser.setRowLCR(newRecord);
.stream().map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.CREATE, actualNewValues, Collections.emptyList());
parser.setDmlEntry(newRecord);
super.exitSingle_table_insert(ctx);
}
}
Expand Up @@ -6,7 +6,7 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.logminer.OracleChangeRecordValueConverter;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
Expand Down Expand Up @@ -66,11 +66,11 @@ public static String stripeAlias(String text, String alias){
* @param oldColumnValues values in WHERE clause
* @param table Debezium Table object
*/
public static void cloneOldToNewColumnValues(Map<String, ColumnValueHolder> newColumnValues, Map<String, ColumnValueHolder> oldColumnValues, Table table) {
public static void cloneOldToNewColumnValues(Map<String, LogMinerColumnValueWrapper> newColumnValues, Map<String, LogMinerColumnValueWrapper> oldColumnValues, Table table) {
for (Column column : table.columns()) {
final ColumnValueHolder newColumnValue = newColumnValues.get(column.name());
final LogMinerColumnValueWrapper newColumnValue = newColumnValues.get(column.name());
if (!newColumnValue.isProcessed()) {
final ColumnValueHolder oldColumnValue = oldColumnValues.get(column.name());
final LogMinerColumnValueWrapper oldColumnValue = oldColumnValues.get(column.name());
newColumnValue.setProcessed(true);
newColumnValue.getColumnValue().setColumnData(oldColumnValue.getColumnValue().getColumnData());
}
Expand Down
Expand Up @@ -6,10 +6,10 @@
package io.debezium.connector.oracle.antlr.listener;

import io.debezium.connector.oracle.antlr.OracleDmlParser;
import io.debezium.connector.oracle.logminer.valueholder.ColumnValueHolder;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValueWrapper;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerColumnValue;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcr;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerRowLcrImpl;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntryImpl;
import io.debezium.data.Envelope;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
Expand Down Expand Up @@ -83,21 +83,21 @@ public void enterColumn_based_update_set_clause(PlSqlParser.Column_based_update_
Column column = table.columnWithName(stripedName);
Object valueObject = ParserUtils.convertValueToSchemaType(column, stripedValue, converter);

ColumnValueHolder columnValueHolder = newColumnValues.get(stripedName);
columnValueHolder.setProcessed(true);
columnValueHolder.getColumnValue().setColumnData(valueObject);
LogMinerColumnValueWrapper logMinerColumnValueWrapper = newColumnValues.get(stripedName);
logMinerColumnValueWrapper.setProcessed(true);
logMinerColumnValueWrapper.getColumnValue().setColumnData(valueObject);

super.enterColumn_based_update_set_clause(ctx);
}

@Override
public void exitUpdate_statement(PlSqlParser.Update_statementContext ctx) {
List<LogMinerColumnValue> actualNewValues = newColumnValues.values().stream()
.filter(ColumnValueHolder::isProcessed).map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
List<LogMinerColumnValue> actualOldValues = oldColumnValues.values().stream()
.filter(ColumnValueHolder::isProcessed).map(ColumnValueHolder::getColumnValue).collect(Collectors.toList());
LogMinerRowLcr newRecord = new LogMinerRowLcrImpl(Envelope.Operation.UPDATE, actualNewValues, actualOldValues);
parser.setRowLCR(newRecord);
.filter(LogMinerColumnValueWrapper::isProcessed).map(LogMinerColumnValueWrapper::getColumnValue).collect(Collectors.toList());
LogMinerDmlEntry newRecord = new LogMinerDmlEntryImpl(Envelope.Operation.UPDATE, actualNewValues, actualOldValues);
parser.setDmlEntry(newRecord);
super.exitUpdate_statement(ctx);
}

Expand Down

0 comments on commit c2d8ec5

Please sign in to comment.