-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
MySqlConnector.java
113 lines (96 loc) · 3.89 KB
/
MySqlConnector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Strings;
/**
* A Kafka Connect source connector that creates tasks that read the MySQL binary log and generate the corresponding
* data change events.
* <h2>Configuration</h2>
* <p>
* This connector is configured with the set of properties described in {@link MySqlConnectorConfig}.
*
*
* @author Randall Hauch
*/
public class MySqlConnector extends SourceConnector {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String, String> props;
public MySqlConnector() {
}
@Override
public String version() {
return Module.version();
}
@Override
public Class<? extends Task> taskClass() {
return MySqlConnectorTask.class;
}
@Override
public void start(Map<String, String> props) {
this.props = props;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<String, String>(props));
}
@Override
public void stop() {
this.props = null;
}
@Override
public ConfigDef config() {
return MySqlConnectorConfig.configDef();
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
Configuration config = Configuration.from(connectorConfigs);
// First, validate all of the individual fields, which is easy since don't make any of the fields invisible ...
Map<String, ConfigValue> results = config.validate(MySqlConnectorConfig.EXPOSED_FIELDS);
// Get the config values for each of the connection-related fields ...
ConfigValue hostnameValue = results.get(MySqlConnectorConfig.HOSTNAME.name());
ConfigValue portValue = results.get(MySqlConnectorConfig.PORT.name());
ConfigValue userValue = results.get(MySqlConnectorConfig.USER.name());
final String passwordValue = config.getString(MySqlConnectorConfig.PASSWORD);
if (Strings.isNullOrEmpty(passwordValue)) {
logger.warn("The connection password is empty");
}
// If there are no errors on any of these ...
if (hostnameValue.errorMessages().isEmpty()
&& portValue.errorMessages().isEmpty()
&& userValue.errorMessages().isEmpty()) {
// Try to connect to the database ...
try (MySqlJdbcContext jdbcContext = new MySqlJdbcContext(config)) {
jdbcContext.start();
JdbcConnection mysql = jdbcContext.jdbc();
try {
mysql.execute("SELECT version()");
logger.info("Successfully tested connection for {} with user '{}'", jdbcContext.connectionString(), mysql.username());
} catch (SQLException e) {
logger.info("Failed testing connection for {} with user '{}'", jdbcContext.connectionString(), mysql.username());
hostnameValue.addErrorMessage("Unable to connect: " + e.getMessage());
} finally {
jdbcContext.shutdown();
}
}
}
return new Config(new ArrayList<>(results.values()));
}
}