Commit c78c368
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
AndreyIg committed Mar 6, 2020
2 parents 90bcc19 + 4619fcd commit c78c368
Showing 5 changed files with 70 additions and 31 deletions.

File 1 of 5
@@ -67,10 +67,10 @@ public String version() {

     @Override
     public void start(Configuration config) {
-        if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
+        /*if (!state.compareAndSet(State.STOPPED, State.RUNNING)) {
             LOGGER.info("Connector has already been started");
             return;
-        }
+        }*/

         final SqlServerConnectorConfig connectorConfig = new SqlServerConnectorConfig(config);
         final TopicSelector<TableId> topicSelector = SqlServerTopicSelector.defaultSelector(connectorConfig);
@@ -187,10 +187,10 @@ public void stop() {
     }

     private void cleanupResources() {
-        if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
+        /*if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
             LOGGER.info("Connector has already been stopped");
             return;
-        }
+        }*/

         try {
             if (coordinator != null) {
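
Both hunks above disable the task's lifecycle guard rather than delete it, so start() and cleanupResources() now run their bodies unconditionally, even when the state machine believes the connector is already RUNNING or already STOPPED. A minimal, runnable sketch of what the commented-out guard used to enforce (class and enum names here are illustrative, not part of the commit):

    import java.util.concurrent.atomic.AtomicReference;

    public class GuardDemo {
        enum State { STOPPED, RUNNING }

        public static void main(String[] args) {
            // Simulate a task whose previous run never made it back to STOPPED,
            // e.g. because stop() failed partway through cleanup.
            AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

            // The guard this commit comments out: start() only proceeds when the
            // state flips STOPPED -> RUNNING atomically.
            boolean started = state.compareAndSet(State.STOPPED, State.RUNNING);
            System.out.println(started
                    ? "start() proceeds"
                    : "guard would skip start(); the commit removes that skip");
        }
    }

With the guard gone, a restart after an unclean shutdown always re-runs initialization; the trade-off is that the idempotency check against double starts is lost.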

File 2 of 5
@@ -40,11 +40,13 @@ public class SqlServerSnapshotChangeEventSource extends HistorizedRelationalSnapshot

     private final SqlServerConnectorConfig connectorConfig;
     private final SqlServerConnection jdbcConnection;
+    private final SqlServerDatabaseSchema sqlServerDatabaseSchema;

     public SqlServerSnapshotChangeEventSource(SqlServerConnectorConfig connectorConfig, SqlServerOffsetContext previousOffset, SqlServerConnection jdbcConnection, SqlServerDatabaseSchema schema, EventDispatcher<TableId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener) {
         super(connectorConfig, previousOffset, jdbcConnection, schema, dispatcher, clock, snapshotProgressListener);
         this.connectorConfig = connectorConfig;
         this.jdbcConnection = jdbcConnection;
+        this.sqlServerDatabaseSchema = schema;
     }

     @Override
@@ -172,7 +174,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, Snapsh
                 snapshotContext.catalogName,
                 schema,
                 connectorConfig.getTableFilters().dataCollectionFilter(),
-                null,
+                connectorConfig.getColumnFilter(),
                 false
         );
     }
@@ -202,9 +204,42 @@ protected void complete(SnapshotContext snapshotContext) {
      */
     @Override
     protected String getSnapshotSelect(SnapshotContext snapshotContext, TableId tableId) {
+        String modifiedColumns = checkBlacklistedColumns(tableId);
+        if (modifiedColumns != null){
+            return String.format("SELECT %s FROM [%s].[%s]", modifiedColumns, tableId.schema(), tableId.table());
+        }
         return String.format("SELECT * FROM [%s].[%s]", tableId.schema(), tableId.table());
     }

+    @Override
+    protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect, TableId tableId){
+        String modifiedColumns = checkBlacklistedColumns(tableId);
+        if (modifiedColumns != null){
+            overriddenSelect = overriddenSelect.replaceAll("\\*", modifiedColumns);
+        }
+        return overriddenSelect;
+    }
+
+    private String checkBlacklistedColumns(TableId tableId){
+        String modifiedColumns = null;
+        String blackListColumnStr = connectorConfig.getConfig().getString(connectorConfig.COLUMN_BLACKLIST);
+        if (blackListColumnStr != null && blackListColumnStr.trim().length() > 0
+                && blackListColumnStr.contains(tableId.table())) {
+            Table table = sqlServerDatabaseSchema.tableFor(tableId);
+            modifiedColumns = table.retrieveColumnNames().stream()
+                    .map(s->{
+                        StringBuilder sb = new StringBuilder();
+                        if (!s.contains(tableId.table())){
+                            sb.append(tableId.table()).append(".").append(s);
+                        } else {
+                            sb.append(s);
+                        }
+                        return sb.toString();
+                    }).collect(Collectors.joining(","));
+        }
+        return modifiedColumns;
+    }
+
     @Override
     protected ChangeRecordEmitter getChangeRecordEmitter(SnapshotContext snapshotContext, TableId tableId, Object[] row) {
         ((SqlServerOffsetContext) snapshotContext.offset).setSourceTime(Instant.ofEpochMilli(getClock().currentTimeInMillis()));
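
Taken together, the three hunks in this file change how the initial snapshot query is built when a column blacklist is configured: readTableStructure(...) now applies connectorConfig.getColumnFilter(), so blacklisted columns never enter the in-memory Table, and checkBlacklistedColumns(...) turns the remaining columns into an explicit, table-qualified column list instead of SELECT *. A runnable sketch of the list-building logic (table and column names are hypothetical):

    import java.util.List;
    import java.util.stream.Collectors;

    public class SnapshotSelectDemo {
        public static void main(String[] args) {
            // Assume dbo.customers(id, name, ssn) with ssn blacklisted; the column
            // filter has already removed it, mirroring the readTableStructure change.
            String table = "customers";
            List<String> columnNames = List.of("id", "name");

            // Same qualification rule as checkBlacklistedColumns: prefix each column
            // with the table name unless it already contains it.
            String modifiedColumns = columnNames.stream()
                    .map(s -> s.contains(table) ? s : table + "." + s)
                    .collect(Collectors.joining(","));

            System.out.printf("SELECT %s FROM [%s].[%s]%n", modifiedColumns, "dbo", table);
            // -> SELECT customers.id,customers.name FROM [dbo].[customers]
        }
    }

Two rough edges in the committed version are worth noting: blackListColumnStr.contains(tableId.table()) is a plain substring match, so a table named "order" also triggers for a blacklist entry on "order_lines", and s.contains(tableId.table()) can misfire when a column name happens to contain the table name.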

File 3 of 5
@@ -15,7 +15,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.net.SocketException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Duration;
@@ -243,28 +242,33 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
                     lastProcessedPosition = TxLogPosition.valueOf(currentMaxLsn);
                     // Terminate the transaction otherwise CDC could not be disabled for tables
                     dataConnection.rollback();
-                } catch (SQLException e) {
-                    tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get()));
-                    if (e.getCause() instanceof SocketException) {
-                        LOGGER.warn("Exception while processing table " + tablesSlot.get(), e);
+                } catch (Exception e) {
+                    LOGGER.warn("Exception while processing table ", e);
+                    try {
+                        if (e.getCause() instanceof SQLException) {
+                            tablesSlot.set(processErrorFromChangeTableQuery((SQLException) e, tablesSlot.get()));
+                        }
                         dataConnection.close();
                         dataConnection.connection(false);
                         metadataConnection.close();
                         metadataConnection.connection(false);
-                    } else {
-                        // Terminate the transaction otherwise CDC could not be disabled for tables
-                        dataConnection.rollback();
+                    } catch (Exception ignore){
+                        LOGGER.warn(ignore.getMessage(), ignore);
                     }
                 }
-            } catch (SQLException e) {
-                if (e.getCause() instanceof SocketException) {
-                    LOGGER.warn("Exception while fetching max LSN", e);
+            } catch (Exception e) {
+                LOGGER.warn("Exception while fetching max LSN", e);
+                try {
+                    //if (e.getCause() instanceof SocketException) {
                     dataConnection.close();
                     dataConnection.connection(false);
                     metadataConnection.close();
                     metadataConnection.connection(false);
-                } else {
-                    throw e;
+                    //} else {
+                    //    throw e;
+                    //}
+                } catch (Exception ignore){
+                    LOGGER.warn(ignore.getMessage(), ignore);
                 }
             }
         }
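
Both catch blocks in the streaming loop now follow the same recover-by-reconnecting pattern, and both were widened from catch (SQLException e) to catch (Exception e), so errors that previously escaped the loop (the commented-out throw e) are now logged and absorbed. A sketch of the shared recovery logic as a helper (hypothetical; the commit inlines it twice), assuming Debezium's JdbcConnection semantics where close() discards the current JDBC connection and connection(false) re-establishes it without re-running on-connect statements:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import io.debezium.connector.sqlserver.SqlServerConnection;

    // Hypothetical helper equivalent to the inlined recovery code above.
    class StreamingReconnect {
        private static final Logger LOGGER = LoggerFactory.getLogger(StreamingReconnect.class);

        static void reconnect(SqlServerConnection dataConnection, SqlServerConnection metadataConnection) {
            try {
                dataConnection.close();            // drop the (possibly broken) connection
                dataConnection.connection(false);  // eagerly reopen it
                metadataConnection.close();
                metadataConnection.connection(false);
            }
            catch (Exception ignore) {
                // Recovery is best effort: the streaming loop retries on its next iteration.
                LOGGER.warn(ignore.getMessage(), ignore);
            }
        }
    }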

File 4 of 5
@@ -419,7 +419,7 @@ private String determineSnapshotSelect(SnapshotContext snapshotContext, TableId
             overriddenSelect = connectorConfig.getSnapshotSelectOverridesByTable().get(new TableId(null, tableId.schema(), tableId.table()));
         }

-        return overriddenSelect != null ? enhanceOverriddenSelect(snapshotContext, overriddenSelect) :
+        return overriddenSelect != null ? enhanceOverriddenSelect(snapshotContext, overriddenSelect, tableId) :
                 getSnapshotSelect(snapshotContext, tableId);
     }

@@ -429,7 +429,7 @@ private String determineSnapshotSelect(SnapshotContext snapshotContext, TableId
      * @param overriddenSelect conditional snapshot select
      * @return enhanced select statement. By default it just returns original select statements.
      */
-    protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect){
+    protected String enhanceOverriddenSelect(SnapshotContext snapshotContext, String overriddenSelect, TableId tableId){
         return overriddenSelect;
     }

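
This base-class change threads the TableId through to the enhanceOverriddenSelect(...) hook so a subclass (here the SQL Server snapshot source) can rewrite a per-table snapshot override. A runnable sketch of the rewrite the SQL Server override performs, with a hypothetical user-supplied snapshot.select.statement.overrides statement:

    public class OverrideDemo {
        public static void main(String[] args) {
            // Hypothetical user-supplied override for dbo.customers.
            String overriddenSelect = "SELECT * FROM [dbo].[customers] WHERE deleted = 0";
            // Table-qualified column list as produced by checkBlacklistedColumns(...).
            String modifiedColumns = "customers.id,customers.name";

            // Mirrors the subclass implementation: replaceAll treats "*" as a regex
            // metacharacter, hence the "\\*" escape.
            System.out.println(overriddenSelect.replaceAll("\\*", modifiedColumns));
            // -> SELECT customers.id,customers.name FROM [dbo].[customers] WHERE deleted = 0
        }
    }

One caveat: replaceAll also treats $ and \ in the replacement string specially, so an escaped or quoted column name could corrupt the rewritten statement; Matcher.quoteReplacement(...) would sidestep that.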

File 5 of 5
@@ -10,6 +10,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
+import java.util.stream.Collectors;

 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -225,16 +226,19 @@ private void validateIncomingRowToInternalMetadata(int[] recordIndexes, Field[]
     protected Function<Object[], Struct> createValueGenerator(Schema schema, TableId tableId, List<Column> columns,
                                                               ColumnNameFilter filter, ColumnMappers mappers) {
         if (schema != null) {
-            int[] recordIndexes = indexesForColumns(columns);
-            Field[] fields = fieldsForColumns(schema, columns);
-            int numFields = recordIndexes.length;
-            ValueConverter[] converters = convertersForColumns(schema, tableId, columns, filter, mappers);
+            List<Column> columnsThatShouldBeAdded = columns.stream()
+                    .filter(column -> filter == null || filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name()))
+                    .collect(Collectors.toList());
+            int[] recordIndexes = indexesForColumns(columnsThatShouldBeAdded);
+            Field[] fields = fieldsForColumns(schema, columnsThatShouldBeAdded);
+            int numFields = columnsThatShouldBeAdded.size();
+            ValueConverter[] converters = convertersForColumns(schema, tableId, columnsThatShouldBeAdded, filter, mappers);
             return (row) -> {
                 Struct result = new Struct(schema);
                 for (int i = 0; i != numFields; ++i) {
-                    validateIncomingRowToInternalMetadata(recordIndexes, fields, converters, row, i);
-                    Object value = row[recordIndexes[i]];
-
+                    //validateIncomingRowToInternalMetadata(recordIndexes, fields, converters, row, i);
+                    //Object value = row[recordIndexes[i]];
+                    Object value = row[i];
                     ValueConverter converter = converters[i];

                     if (converter != null) {
@@ -304,10 +308,6 @@ protected ValueConverter[] convertersForColumns(Schema schema, TableId tableId,
         for (int i = 0; i < columns.size(); i++) {
             Column column = columns.get(i);

-            if (filter != null && !filter.matches(tableId.catalog(), tableId.schema(), tableId.table(), column.name())) {
-                continue;
-            }
-
             ValueConverter converter = createValueConverterFor(column, schema.field(column.name()));
             converter = wrapInMappingConverterIfNeeded(mappers, tableId, column, converter);

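
The TableSchemaBuilder hunks move the column-filter check to the front of createValueGenerator(...): columns are filtered once into columnsThatShouldBeAdded, and the per-column skip inside convertersForColumns(...) is deleted. That keeps fields, converters, and the incoming row positionally aligned, which is what lets the generator read row[i] directly instead of row[recordIndexes[i]]. A small runnable sketch of that invariant (names and data are illustrative):

    import java.util.List;
    import java.util.stream.Collectors;

    public class AlignmentDemo {
        public static void main(String[] args) {
            List<String> allColumns = List.of("id", "name", "ssn");
            List<String> blacklist = List.of("ssn");

            // Filter up front, as createValueGenerator now does.
            List<String> kept = allColumns.stream()
                    .filter(c -> !blacklist.contains(c))
                    .collect(Collectors.toList());

            // Because the snapshot SELECT (see the SqlServerSnapshotChangeEventSource
            // change above) excludes the same columns, the row arrives pre-aligned.
            Object[] row = { 42, "Ada" };
            for (int i = 0; i < kept.size(); i++) {
                System.out.println(kept.get(i) + " = " + row[i]);
            }
            // id = 42
            // name = Ada
        }
    }

Note that validateIncomingRowToInternalMetadata(...) is commented out rather than adapted, so the old index-consistency check no longer runs on this path.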
