-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
MySqlConnectorConfig.java
340 lines (291 loc) · 14.5 KB
/
MySqlConnectorConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigDefinition;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.connector.mysql.gtid.MySqlGtidSetFactory;
import io.debezium.connector.mysql.history.MySqlHistoryRecordComparator;
import io.debezium.function.Predicates;
import io.debezium.relational.history.HistoryRecordComparator;
/**
* The configuration properties for the MySQL connector.
*/
public class MySqlConnectorConfig extends BinlogConnectorConfig {
// Logger scoped to this configuration class.
// NOTE(review): not referenced anywhere in the visible body of this class — confirm whether it is still needed.
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlConnectorConfig.class);
/**
 * The predefined options for the snapshot locking mode.
 */
public enum SnapshotLockingMode implements EnumeratedValue {

    /**
     * Blocks all writes for the entire duration of the snapshot.
     *
     * Replaces deprecated configuration option snapshot.locking.minimal with a value of false.
     */
    EXTENDED("extended"),

    /**
     * Holds the global read lock only for the initial portion of the snapshot, while the connector reads the
     * database schemas and other metadata. The rest of the snapshot selects all rows from each table, which can be
     * done consistently within the REPEATABLE READ transaction even after the global read lock has been released
     * and while other MySQL clients are updating the database.
     *
     * Replaces deprecated configuration option snapshot.locking.minimal with a value of true.
     */
    MINIMAL("minimal"),

    /**
     * Holds a (Percona-specific) backup lock only for the initial portion of the snapshot, while the connector reads
     * the database schemas and other metadata. That lock only blocks DDL and DML on non-transactional tables
     * (MyISAM etc.). The rest of the snapshot selects all rows from each table, which can be done consistently
     * within the REPEATABLE READ transaction even after the lock has been released and while other MySQL clients
     * are updating the database.
     */
    MINIMAL_PERCONA("minimal_percona"),

    /**
     * Avoids using ANY table locks during the snapshot process. This mode can only be used with SnapShotMode
     * set to schema_only or schema_only_recovery.
     */
    NONE("none"),

    /**
     * Injects a custom mode, which allows for more control over snapshot locking.
     */
    CUSTOM("custom");

    /** The textual form of this option as it appears in the configuration. */
    private final String value;

    SnapshotLockingMode(String value) {
        this.value = value;
    }

    /**
     * @return the configuration string that identifies this option
     */
    @Override
    public String getValue() {
        return value;
    }

    /**
     * @return {@code true} only for {@link #MINIMAL} and {@link #MINIMAL_PERCONA}
     */
    public boolean usesMinimalLocking() {
        return this == MINIMAL || this == MINIMAL_PERCONA;
    }

    /**
     * @return {@code true} for every option except {@link #NONE}
     */
    public boolean usesLocking() {
        return this != NONE;
    }

    /**
     * @return {@code true} for every option except {@link #MINIMAL_PERCONA}
     */
    public boolean flushResetsIsolationLevel() {
        return this != MINIMAL_PERCONA;
    }

    /**
     * Look up the predefined option matching the supplied text. Surrounding whitespace is trimmed and the
     * comparison ignores case.
     *
     * @param value the configuration property value; a {@code null} value yields {@code null}
     * @return the matching option, or null if no match is found
     */
    public static SnapshotLockingMode parse(String value) {
        if (value != null) {
            final String candidate = value.trim();
            for (SnapshotLockingMode mode : values()) {
                if (candidate.equalsIgnoreCase(mode.value)) {
                    return mode;
                }
            }
        }
        return null;
    }

    /**
     * Look up the predefined option matching the supplied text, trying the default text when the first lookup
     * finds nothing.
     *
     * @param value the configuration property value; a {@code null} value is treated as "no match"
     * @param defaultValue the default value; may be null
     * @return the matching option, or null if no match is found and the default is null or invalid
     */
    public static SnapshotLockingMode parse(String value, String defaultValue) {
        final SnapshotLockingMode parsed = parse(value);
        // parse(null) yields null, so a missing default needs no separate branch.
        return parsed != null ? parsed : parse(defaultValue);
    }
}
/**
* {@link Integer#MIN_VALUE Minimum value} used for fetch size hint; this constant is handed to the superclass
* constructor as the default snapshot fetch size.
* See <a href="https://issues.jboss.org/browse/DBZ-94">DBZ-94</a> for details.
*/
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = Integer.MIN_VALUE;
/**
* Class name of the JDBC driver used to connect to the MySQL database server.
* Defaults to the name of {@code com.mysql.cj.jdbc.Driver}; the value is validated with {@code Field::isClassName}.
*/
public static final Field JDBC_DRIVER = Field.create(DATABASE_CONFIG_PREFIX + "jdbc.driver")
.withDisplayName("JDBC Driver Class Name")
.withType(Type.CLASS)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 41))
.withWidth(Width.MEDIUM)
.withDefault(com.mysql.cj.jdbc.Driver.class.getName())
.withImportance(Importance.LOW)
.withValidation(Field::isClassName)
.withDescription("JDBC Driver class name used to connect to the MySQL database server.");
/**
* JDBC protocol to use with the driver; defaults to {@code jdbc:mysql}.
*/
public static final Field JDBC_PROTOCOL = Field.create(DATABASE_CONFIG_PREFIX + "protocol")
.withDisplayName("JDBC Protocol")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTION, 42))
.withWidth(Width.MEDIUM)
.withDefault("jdbc:mysql")
.withImportance(Importance.LOW)
.withDescription("JDBC protocol to use with the driver.");
/**
* A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog
* position in the MySQL server. Only the GTID ranges that have sources matching one of these include patterns will
* be used.
* Must not be used with {@link #GTID_SOURCE_EXCLUDES}.
* <p>
* Derived from {@link BinlogConnectorConfig#GTID_SOURCE_INCLUDES}, with the description replaced by MySQL-specific text.
*/
public static final Field GTID_SOURCE_INCLUDES = BinlogConnectorConfig.GTID_SOURCE_INCLUDES
.withDescription("The source UUIDs used to include GTID ranges when determine the starting "
+ "position in the MySQL server's binlog.");
/**
* A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog
* position in the MySQL server. Only the GTID ranges that have sources matching none of these exclude patterns will
* be used.
* Must not be used with {@link #GTID_SOURCE_INCLUDES}.
* <p>
* Derived from {@link BinlogConnectorConfig#GTID_SOURCE_EXCLUDES}, with the description replaced by MySQL-specific text.
*/
public static final Field GTID_SOURCE_EXCLUDES = BinlogConnectorConfig.GTID_SOURCE_EXCLUDES
.withDescription("The source UUIDs used to exclude GTID ranges when determine the starting "
+ "position in the MySQL server's binlog.");
/**
* Selects the {@link SnapshotLockingMode} applied while a snapshot is taken; the default is
* {@link SnapshotLockingMode#MINIMAL}. The value is validated by {@code validateSnapshotLockingMode}.
*/
public static final Field SNAPSHOT_LOCKING_MODE = Field.create(SNAPSHOT_LOCKING_MODE_PROPERTY_NAME)
.withDisplayName("Snapshot locking mode")
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.MINIMAL)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 1))
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withDescription("Controls how long the connector holds onto the global read lock while it is performing a snapshot. The default is 'minimal', "
+ "which means the connector holds the global read lock (and thus prevents any updates) for just the initial portion of the snapshot "
+ "while the database schemas and other metadata are being read. The remaining work in a snapshot involves selecting all rows from "
+ "each table, and this can be done using the snapshot process' REPEATABLE READ transaction even when the lock is no longer held and "
+ "other operations are updating the database. However, in some cases it may be desirable to block all writes for the entire duration "
+ "of the snapshot; in such cases set this property to 'extended'. Using a value of 'none' will prevent the connector from acquiring any "
+ "table locks during the snapshot process. This mode can only be used in combination with snapshot.mode values of 'schema_only' or "
+ "'schema_only_recovery' and is only safe to use if no schema changes are happening while the snapshot is taken.")
.withValidation(MySqlConnectorConfig::validateSnapshotLockingMode);
/**
* The common source-info struct maker field, with the class name of {@code MySqlSourceInfoStructMaker} as its default.
*/
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(MySqlSourceInfoStructMaker.class.getName());
// Configuration definition for this connector, built by editing the binlog connector's definition:
// it is named "MySQL", drops the binlog-level GTID include/exclude fields, and registers the fields
// declared in this class (JDBC driver/protocol, snapshot locking mode, the MySQL GTID include/exclude
// variants and the source-info struct maker).
private static final ConfigDefinition CONFIG_DEFINITION = BinlogConnectorConfig.CONFIG_DEFINITION.edit()
.name("MySQL")
.excluding(
BinlogConnectorConfig.GTID_SOURCE_INCLUDES,
BinlogConnectorConfig.GTID_SOURCE_EXCLUDES)
.type(
JDBC_DRIVER,
JDBC_PROTOCOL)
.connector(SNAPSHOT_LOCKING_MODE)
.events(
GTID_SOURCE_INCLUDES,
GTID_SOURCE_EXCLUDES,
SOURCE_INFO_STRUCT_MAKER)
.create();
/**
 * Exposes the Kafka {@link ConfigDef} produced by this connector's configuration definition.
 *
 * @return the {@link ConfigDef} obtained from {@code CONFIG_DEFINITION}
 */
protected static ConfigDef configDef() {
    final ConfigDef definition = CONFIG_DEFINITION.configDef();
    return definition;
}
// NOTE(review): this public static field is not declared final, so it can be reassigned — confirm that is intentional.
/**
* The set of {@link Field}s defined as part of this configuration, built from every field of {@code CONFIG_DEFINITION}.
*/
public static Field.Set ALL_FIELDS = Field.setOf(CONFIG_DEFINITION.all());
// Factory for GTID sets; the constructor always assigns a MySqlGtidSetFactory.
private final GtidSetFactory gtidSetFactory;
// Filter over GTID source UUIDs built from the include/exclude settings; null when both settings resolve to null.
private final Predicate<String> gtidSourceFilter;
// Locking mode parsed from the snapshot locking mode setting, falling back to the field's default value.
private final SnapshotLockingMode snapshotLockingMode;
// Strategy object wrapping snapshotLockingMode; returned by getSnapshotLockingStrategy().
private final SnapshotLockingStrategy snapshotLockingStrategy;
/**
 * Creates the MySQL connector configuration from the supplied configuration.
 *
 * @param config the configuration to read the connector settings from
 */
public MySqlConnectorConfig(Configuration config) {
    super(MySqlConnector.class, config, DEFAULT_SNAPSHOT_FETCH_SIZE);

    this.gtidSetFactory = new MySqlGtidSetFactory();

    // Resolve the locking mode; the field's default is used when the configured text matches no option.
    final String configuredLockingMode = config.getString(SNAPSHOT_LOCKING_MODE);
    final String defaultLockingMode = SNAPSHOT_LOCKING_MODE.defaultValueAsString();
    this.snapshotLockingMode = SnapshotLockingMode.parse(configuredLockingMode, defaultLockingMode);
    this.snapshotLockingStrategy = new MySqlSnapshotLockingStrategy(this.snapshotLockingMode);

    // Set up the GTID filter: includes take precedence over excludes, and no filter is used when neither is set.
    final String includedSources = config.getString(GTID_SOURCE_INCLUDES);
    final String excludedSources = config.getString(GTID_SOURCE_EXCLUDES);
    if (includedSources != null) {
        this.gtidSourceFilter = Predicates.includesUuids(includedSources);
    }
    else if (excludedSources != null) {
        this.gtidSourceFilter = Predicates.excludesUuids(excludedSources);
    }
    else {
        this.gtidSourceFilter = null;
    }
}
/**
 * Returns the snapshot locking mode that was resolved when this configuration was constructed.
 *
 * @return an {@link Optional} wrapping the locking mode
 */
public Optional<SnapshotLockingMode> getSnapshotLockingMode() {
    final SnapshotLockingMode mode = snapshotLockingMode;
    return Optional.of(mode);
}
/**
 * @return the locking strategy created in the constructor from the resolved snapshot locking mode
 */
@Override
protected SnapshotLockingStrategy getSnapshotLockingStrategy() {
    return this.snapshotLockingStrategy;
}
/**
 * Resolves the source-info struct maker through the inherited four-argument overload, passing the
 * {@link #SOURCE_INFO_STRUCT_MAKER} field, the module name and version, and this configuration.
 *
 * @param version not consulted by this implementation
 * @return the struct maker returned by the delegate overload
 */
@Override
protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStructMaker(Version version) {
    return getSourceInfoStructMaker(
            SOURCE_INFO_STRUCT_MAKER,
            Module.name(),
            Module.version(),
            this);
}
/**
 * @return the context name reported by {@code Module.contextName()}
 */
@Override
public String getContextName() {
    final String contextName = Module.contextName();
    return contextName;
}
/**
 * @return the connector name reported by {@code Module.name()}
 */
@Override
public String getConnectorName() {
    final String connectorName = Module.name();
    return connectorName;
}
/**
 * @return the GTID source filter built in the constructor; {@code null} when no filter was set up
 */
@Override
public Predicate<String> getGtidSourceFilter() {
    return this.gtidSourceFilter;
}
/**
 * @return the GTID set factory created in the constructor
 */
@Override
public GtidSetFactory getGtidSetFactory() {
    return this.gtidSetFactory;
}
/**
 * Creates a new history record comparator from the GTID source filter and the GTID set factory.
 *
 * @return a fresh {@code MySqlHistoryRecordComparator} on every call
 */
@Override
protected HistoryRecordComparator getHistoryRecordComparator() {
    final GtidSetFactory factory = getGtidSetFactory();
    return new MySqlHistoryRecordComparator(gtidSourceFilter, factory);
}
/**
 * MySQL implementation of {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy}
 * that answers every question by delegating to a {@link SnapshotLockingMode}.
 */
public static class MySqlSnapshotLockingStrategy implements SnapshotLockingStrategy {

    /** The mode every query is forwarded to. */
    private final SnapshotLockingMode lockingMode;

    /**
     * @param snapshotLockingMode the locking mode this strategy delegates to
     */
    public MySqlSnapshotLockingStrategy(SnapshotLockingMode snapshotLockingMode) {
        lockingMode = snapshotLockingMode;
    }

    /**
     * @return the result of {@link SnapshotLockingMode#usesLocking()}
     */
    @Override
    public boolean isLockingEnabled() {
        return lockingMode.usesLocking();
    }

    /**
     * @return the result of {@link SnapshotLockingMode#usesMinimalLocking()}
     */
    @Override
    public boolean isMinimalLockingEnabled() {
        return lockingMode.usesMinimalLocking();
    }

    /**
     * @return the result of {@link SnapshotLockingMode#flushResetsIsolationLevel()}
     */
    @Override
    public boolean isIsolationLevelResetOnFlush() {
        return lockingMode.flushResetsIsolationLevel();
    }
}
/**
 * Validate the snapshot.locking.mode configuration.
 *
 * The check only applies when the property is explicitly present in the configuration; in that case the
 * configured text must match one of the {@link SnapshotLockingMode} options. When the property is absent,
 * nothing is reported.
 *
 * @param config the configuration being validated
 * @param field the field being validated; not consulted, {@link #SNAPSHOT_LOCKING_MODE} is used directly
 * @param problems the sink that receives the validation problem, if any
 * @return 1 when an explicitly configured value matches no known option, otherwise 0
 */
private static int validateSnapshotLockingMode(Configuration config, Field field, ValidationOutput problems) {
    // Nothing to check unless the property was explicitly configured.
    if (!config.hasKey(SNAPSHOT_LOCKING_MODE.name())) {
        return 0;
    }

    final String configuredValue = config.getString(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);

    // Sanity check, validate the configured value is a valid option.
    if (SnapshotLockingMode.parse(configuredValue) == null) {
        // Report the raw configured text: the parsed mode is necessarily null here and would hide the bad value.
        problems.accept(SNAPSHOT_LOCKING_MODE, configuredValue, "Must be a valid snapshot.locking.mode value");
        return 1;
    }

    // Everything checks out ok.
    return 0;
}
}