Skip to content
This repository has been archived by the owner on Nov 18, 2021. It is now read-only.

DBZ-3154 Update test suite to run in CDB & non-CDB modes. #278

Merged
merged 3 commits into from Mar 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -7,7 +7,6 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

Expand All @@ -26,6 +25,7 @@
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.oracle.logminer.HistoryRecorder;
import io.debezium.connector.oracle.logminer.NeverHistoryRecorder;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.xstream.LcrPosition;
import io.debezium.connector.oracle.xstream.OracleVersion;
import io.debezium.document.Document;
Expand Down Expand Up @@ -328,6 +328,10 @@ public static ConfigDef configDef() {
return CONFIG_DEFINITION.configDef();
}

public static final String[] EXCLUDED_SCHEMAS = { "appqossys", "audsys", "ctxsys", "dvsys", "dbsfwuser", "dbsnmp",
"gsmadmin_internal", "lbacsys", "mdsys", "ojvmsys", "olapsys", "orddata", "ordsys", "outln", "sys", "system",
"wmsys", "xdb" };

private final String databaseName;
private final String pdbName;
private final String xoutServerName;
Expand Down Expand Up @@ -358,7 +362,7 @@ public static ConfigDef configDef() {
private final LogMiningDmlParser dmlParser;

public OracleConnectorConfig(Configuration config) {
super(OracleConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(), x -> x.schema() + "." + x.table(), true);
super(OracleConnector.class, config, config.getString(SERVER_NAME), new SystemTablesPredicate(config), x -> x.schema() + "." + x.table(), true);

this.databaseName = toUpperCase(config.getString(DATABASE_NAME));
this.pdbName = toUpperCase(config.getString(PDB_NAME));
Expand Down Expand Up @@ -709,9 +713,24 @@ public static LogMiningDmlParser parse(String value, String defaultValue) {
*/
private static class SystemTablesPredicate implements TableFilter {

private final Configuration config;

SystemTablesPredicate(Configuration config) {
this.config = config;
}

@Override
public boolean isIncluded(TableId t) {
return !OracleConnectorConfig.getExcludedSchemaNames().contains(t.schema().toLowerCase());
return !isExcludedSchema(t) && !isFlushTable(t);
}

private boolean isExcludedSchema(TableId id) {
return Arrays.stream(EXCLUDED_SCHEMAS).anyMatch(id.schema().toLowerCase()::equals);
}

private boolean isFlushTable(TableId id) {
final String schema = config.getString(USER);
return id.table().equalsIgnoreCase(SqlUtils.LOGMNR_FLUSH_TABLE) && id.schema().equalsIgnoreCase(schema);
}
}

Expand Down Expand Up @@ -882,12 +901,6 @@ public String getConnectorName() {
return Module.name();
}

public static List<String> getExcludedSchemaNames() {
return Arrays.asList("appqossys", "audsys", "ctxsys", "dvsys", "dbsfwuser", "dbsnmp", "gsmadmin_internal",
"lbacsys", "mdsys", "ojvmsys", "olapsys", "orddata", "ordsys", "outln", "sys", "system",
"wmsys", "xdb");
}

public static int validateOutServerName(Configuration config, Field field, ValidationOutput problems) {
if (ConnectorAdapter.XSTREAM.equals(ConnectorAdapter.parse(config.getString(CONNECTOR_ADAPTER)))) {
return Field.isRequired(config, field, problems);
Expand Down
Expand Up @@ -9,6 +9,7 @@
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class SqlUtils {
// log writer flush statements
// todo: this table shifted from LOG_MINING_AUDIT to LOG_MINING_FLUSH.
// since this is only used during streaming to flush redo log buffer, accept this change?
static final String LOGMNR_FLUSH_TABLE = "LOG_MINING_FLUSH";
public static final String LOGMNR_FLUSH_TABLE = "LOG_MINING_FLUSH";
static final String FLUSH_TABLE_NOT_EMPTY = "SELECT '1' AS ONE FROM " + LOGMNR_FLUSH_TABLE;
static final String CREATE_FLUSH_TABLE = "CREATE TABLE " + LOGMNR_FLUSH_TABLE + "(LAST_SCN NUMBER(19,0))";
static final String INSERT_FLUSH_TABLE = "INSERT INTO " + LOGMNR_FLUSH_TABLE + " VALUES(0)";
Expand Down Expand Up @@ -230,10 +231,9 @@ static String logMinerContentsQuery(OracleConnectorConfig connectorConfig, Strin

// There are some common schemas that we automatically ignore when building the filter predicates
// and we pull that same list of schemas in here and apply those exclusions in the generated SQL.
List<String> excludedSchemas = OracleConnectorConfig.getExcludedSchemaNames();
if (!excludedSchemas.isEmpty()) {
if (OracleConnectorConfig.EXCLUDED_SCHEMAS.length > 0) {
query.append("AND SEG_OWNER NOT IN (");
for (Iterator<String> i = excludedSchemas.iterator(); i.hasNext();) {
for (Iterator<String> i = Arrays.stream(OracleConnectorConfig.EXCLUDED_SCHEMAS).iterator(); i.hasNext();) {
String excludedSchema = i.next();
query.append("'").append(excludedSchema.toUpperCase()).append("'");
if (i.hasNext()) {
Expand Down
Expand Up @@ -57,6 +57,9 @@ public static void beforeClass() throws SQLException {

@AfterClass
public static void closeConnection() throws SQLException {
if (adminConnection != null) {
adminConnection.close();
}
if (connection != null) {
connection.close();
}
Expand Down
Expand Up @@ -738,12 +738,13 @@ public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithoutDatabaseName(
public void shouldConsumeEventsWithMaskedAndTruncatedColumns(boolean useDatabaseName) throws Exception {
final Configuration config;
if (useDatabaseName) {
final String dbName = TestHelper.getDatabaseName();
config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
.with("column.mask.with.12.chars", "ORCLPDB1.DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME")
.with("column.mask.with.12.chars", dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME")
.with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K",
"ORCLPDB1.DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2,ORCLPDB1.DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3")
.with("column.truncate.to.4.chars", "ORCLPDB1.DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME")
dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2," + dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3")
.with("column.truncate.to.4.chars", dbName + ".DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME")
.build();
}
else {
Expand Down
Expand Up @@ -100,7 +100,7 @@ public void snapshotSchemaChanges() throws Exception {
Assertions.assertThat(schemaRecords).hasSize(3);
schemaRecords.forEach(record -> {
Assertions.assertThat(record.topic()).isEqualTo("server1");
Assertions.assertThat(((Struct) record.key()).getString("databaseName")).isEqualTo("ORCLPDB1");
Assertions.assertThat(((Struct) record.key()).getString("databaseName")).isEqualTo(TestHelper.getDatabaseName());
Assertions.assertThat(record.sourceOffset().get("snapshot")).isEqualTo(true);
});
Assertions.assertThat(((Struct) schemaRecords.get(0).value()).getStruct("source").getString("snapshot")).isEqualTo("true");
Expand Down
Expand Up @@ -135,14 +135,18 @@ record = records.get(2);

// TX End
record = records.get(3);
assertEndTransaction(record, expectedTxId, 2, Collect.hashMapOf("ORCLPDB1.DEBEZIUM.CUSTOMER", 1, "ORCLPDB1.DEBEZIUM.ORDERS", 1));

final String dbName = TestHelper.getDatabaseName();
assertEndTransaction(record, expectedTxId, 2, Collect.hashMapOf(dbName + ".DEBEZIUM.CUSTOMER", 1, dbName + ".DEBEZIUM.ORDERS", 1));
}

@Test
@FixFor("DBZ-3090")
public void transactionMetadataMultipleTransactions() throws Exception {
try (OracleConnection secondaryConn = TestHelper.testConnection()) {

final String dbName = TestHelper.getDatabaseName();

Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER,DEBEZIUM\\.ORDERS")
.with(OracleConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY)
Expand Down Expand Up @@ -184,7 +188,7 @@ record = records.get(1);

// TX End
record = records.get(2);
assertEndTransaction(record, expectedTxId, 1, Collect.hashMapOf("ORCLPDB1.DEBEZIUM.ORDERS", 1));
assertEndTransaction(record, expectedTxId, 1, Collect.hashMapOf(dbName + ".DEBEZIUM.ORDERS", 1));

// TX Begin
record = records.get(3);
Expand Down Expand Up @@ -212,7 +216,7 @@ record = records.get(5);

// TX End
record = records.get(6);
assertEndTransaction(record, expectedTxId, 2, Collect.hashMapOf("ORCLPDB1.DEBEZIUM.CUSTOMER", 1, "ORCLPDB1.DEBEZIUM.ORDERS", 1));
assertEndTransaction(record, expectedTxId, 2, Collect.hashMapOf(dbName + ".DEBEZIUM.CUSTOMER", 1, dbName + ".DEBEZIUM.ORDERS", 1));
}
}
}
Expand Up @@ -12,8 +12,8 @@
import java.sql.SQLRecoverableException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -321,11 +321,10 @@ public void shouldDetectConnectionProblems() {
private String resolveLogMineryContentQueryFromTemplate(String schemaReplacement, String tableReplacement) {
String query = LOG_MINER_CONTENT_QUERY_TEMPLATE;

List<String> excludedSchemas = OracleConnectorConfig.getExcludedSchemaNames();
if (!excludedSchemas.isEmpty()) {
if (OracleConnectorConfig.EXCLUDED_SCHEMAS.length > 0) {
StringBuilder systemPredicate = new StringBuilder();
systemPredicate.append("AND SEG_OWNER NOT IN (");
for (Iterator<String> i = excludedSchemas.iterator(); i.hasNext();) {
for (Iterator<String> i = Arrays.stream(OracleConnectorConfig.EXCLUDED_SCHEMAS).iterator(); i.hasNext();) {
Naros marked this conversation as resolved.
Show resolved Hide resolved
String excludedSchema = i.next();
systemPredicate.append("'").append(excludedSchema.toUpperCase()).append("'");
if (i.hasNext()) {
Expand Down