Skip to content

Commit

Permalink
allow multiple connections in kafka connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Skorikov committed Oct 12, 2018
1 parent ccddc20 commit 48dbad0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 30 deletions.
3 changes: 1 addition & 2 deletions integrations/apache-kafka/config/source.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ limitations under the License.
name=plc-source-test
connector.class=org.apache.plc4x.kafka.Plc4xSourceConnector
topic=test
url=test:unused
queries=RANDOM/foo:INTEGER,RANDOM/bar:STRING
queries=test:unused#RANDOM/foo:INTEGER,test:another#RANDOM/bar:STRING
rate=2000
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,28 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.apache.plc4x.kafka.util.VersionUtil;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;

public class Plc4xSourceConnector extends SourceConnector {
static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Kafka topic to publish to";

static final String URL_CONFIG = "url";
private static final String URL_DOC = "Connection string used by PLC4X to connect to the PLC";

static final String QUERIES_CONFIG = "queries";
private static final String QUERIES_CONFIG = "queries";
private static final String QUERIES_DOC = "Field queries to be sent to the PLC";

static final String RATE_CONFIG = "rate";
private static final String RATE_CONFIG = "rate";
private static final Integer RATE_DEFAULT = 1000;
private static final String RATE_DOC = "Polling rate";

static final ConfigDef CONFIG_DEF = new ConfigDef()
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
.define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
.define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);

private String topic;
private String url;
private List<String> queries;
private Integer rate;

Expand All @@ -63,23 +55,30 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new LinkedList<>();
List<List<String>> queryGroups = ConnectorUtils.groupPartitions(queries, maxTasks);
for (List<String> queryGroup: queryGroups) {
Map<String, List<String>> groupedByHost = new HashMap<>();
queries.stream().map(query -> query.split("#", 2)).collect(Collectors.groupingBy(parts -> parts[0])).forEach((host, queries) -> {
groupedByHost.put(host, queries.stream().map(parts -> parts[1]).collect(Collectors.toList()));
});
if (groupedByHost.size() > maxTasks) {
// Not enough tasks
// TODO: throw exception?
return Collections.emptyList();
}
groupedByHost.forEach((host, qs) -> {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put(TOPIC_CONFIG, topic);
taskConfig.put(URL_CONFIG, url);
taskConfig.put(QUERIES_CONFIG, String.join(",", queryGroup));
taskConfig.put(RATE_CONFIG, rate.toString());
taskConfig.put(Plc4xSourceTask.TOPIC_CONFIG, topic);
taskConfig.put(Plc4xSourceTask.URL_CONFIG, host);
taskConfig.put(Plc4xSourceTask.QUERIES_CONFIG, String.join(",", qs));
taskConfig.put(Plc4xSourceTask.RATE_CONFIG, rate.toString());
configs.add(taskConfig);
}
});
return configs;
}

@Override
public void start(Map<String, String> props) {
AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
topic = config.getString(TOPIC_CONFIG);
url = config.getString(URL_CONFIG);
queries = config.getList(QUERIES_CONFIG);
rate = config.getInt(RATE_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Licensed to the Apache Software Foundation (ASF) under one
package org.apache.plc4x.kafka;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -43,6 +44,25 @@ Licensed to the Apache Software Foundation (ASF) under one
* If the flag does not become true, the method returns null, otherwise a fetch is performed.
*/
public class Plc4xSourceTask extends SourceTask {
static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Kafka topic to publish to";

static final String URL_CONFIG = "url";
private static final String URL_DOC = "PLC URL";

static final String QUERIES_CONFIG = "queries";
private static final String QUERIES_DOC = "Field queries to be sent to the PLC";

static final String RATE_CONFIG = "rate";
private static final Integer RATE_DEFAULT = 1000;
private static final String RATE_DOC = "Polling rate";

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, URL_DOC)
.define(QUERIES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, QUERIES_DOC)
.define(RATE_CONFIG, ConfigDef.Type.INT, RATE_DEFAULT, ConfigDef.Importance.MEDIUM, RATE_DOC);

private final static long WAIT_LIMIT_MILLIS = 100;
private final static long TIMEOUT_LIMIT_MILLIS = 5000;

Expand Down Expand Up @@ -72,18 +92,18 @@ public String version() {

@Override
public void start(Map<String, String> props) {
AbstractConfig config = new AbstractConfig(Plc4xSourceConnector.CONFIG_DEF, props);
topic = config.getString(Plc4xSourceConnector.TOPIC_CONFIG);
url = config.getString(Plc4xSourceConnector.URL_CONFIG);
queries = config.getList(Plc4xSourceConnector.QUERIES_CONFIG);
AbstractConfig config = new AbstractConfig(CONFIG_DEF, props);
topic = config.getString(TOPIC_CONFIG);
url = config.getString(URL_CONFIG);
queries = config.getList(QUERIES_CONFIG);

openConnection();

if (!plcConnection.readRequestBuilder().isPresent()) {
throw new ConnectException("Reading not supported on this connection");
}

int rate = Integer.valueOf(props.get(Plc4xSourceConnector.RATE_CONFIG));
int rate = Integer.valueOf(props.get(RATE_CONFIG));
scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(Plc4xSourceTask.this::scheduleFetch, rate, rate, TimeUnit.MILLISECONDS);
}
Expand Down

0 comments on commit 48dbad0

Please sign in to comment.