Skip to content

Commit

Permalink
Merge branch 'master' of http://git.navis.lan/scm/n4fra/debezium
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyIg committed Dec 13, 2019
2 parents d174eab + 4086b3e commit 90979c1
Showing 1 changed file with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -235,15 +241,20 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
}
});
lastProcessedPosition = TxLogPosition.valueOf(currentMaxLsn);
} catch (SQLException e) {
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
} catch (SQLException e) {
tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get()));
LOGGER.warn("Exception while processing table " + tablesSlot.get(), e);
dataConnection.close();
dataConnection.connection(false);
metadataConnection.close();
metadataConnection.connection(false);
if (e.getCause() instanceof SocketException) {
LOGGER.warn("Exception while processing table " + tablesSlot.get(), e);
dataConnection.close();
dataConnection.connection(false);
metadataConnection.close();
metadataConnection.connection(false);
} else {
// Terminate the transaction otherwise CDC could not be disabled for tables
dataConnection.rollback();
}
}
} catch (SQLException e) {
if (e.getCause() instanceof SocketException) {
Expand Down

0 comments on commit 90979c1

Please sign in to comment.