From c0ad9fbc36f942570016785a5b0c58f57b53089e Mon Sep 17 00:00:00 2001
From: Liquan Pei
Date: Wed, 29 Jun 2016 17:04:09 -0700
Subject: [PATCH 1/2] Add schema source connector

---
 .../kafka/connect/runtime/AbstractHerder.java |   6 +-
 .../connect/tools/SchemaSourceConnector.java  |  68 +++++++
 .../kafka/connect/tools/SchemaSourceTask.java | 189 ++++++++++++++++++
 3 files changed, 261 insertions(+), 2 deletions(-)
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 1130268907362..44da0424118c9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.connect.tools.MockConnector;
 import org.apache.kafka.connect.tools.MockSinkConnector;
 import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -91,8 +92,9 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     private static final Object LOCK = new Object();
     private Thread classPathTraverser;
     private static final List<Class<? extends Connector>> EXCLUDES = Arrays.asList(
-        VerifiableSourceConnector.class, VerifiableSinkConnector.class,
-        MockConnector.class, MockSourceConnector.class, MockSinkConnector.class);
+        VerifiableSourceConnector.class, VerifiableSinkConnector.class,
+        MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
+        SchemaSourceConnector.class);
 
     public AbstractHerder(Worker worker,
                           String workerId,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
new file mode 100644
index 0000000000000..249ed71794f47
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceConnector.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaSourceConnector extends SourceConnector {
+
+    private Map<String, String> config;
+
+    @Override
+    public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        this.config = props;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return SchemaSourceTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        ArrayList<Map<String, String>> configs = new ArrayList<>();
+        for (Integer i = 0; i < maxTasks; i++) {
+            Map<String, String> props = new HashMap<>(config);
+            props.put(SchemaSourceTask.ID_CONFIG, i.toString());
+            configs.add(props);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+}
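
Note: taskConfigs() above fans a single connector configuration out to tasks by copying the connector props and stamping each copy with a distinct "id". A minimal sketch of that behavior (the demo class and the "topic" value are illustrative, not part of the patch; it assumes the connect-runtime jar on the classpath):

    import org.apache.kafka.connect.tools.SchemaSourceConnector;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class TaskConfigDemo {
        public static void main(String[] args) {
            SchemaSourceConnector connector = new SchemaSourceConnector();
            connector.start(Collections.singletonMap("topic", "test-topic"));
            // Two task configs, identical except for the "id" key,
            // e.g. {topic=test-topic, id=0} and {topic=test-topic, id=1}
            List<Map<String, String>> configs = connector.taskConfigs(2);
            configs.forEach(System.out::println);
        }
    }
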
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
new file mode 100644
index 0000000000000..657f944c908d9
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.tools;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class SchemaSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(SchemaSourceTask.class);
+
+    public static final String NAME_CONFIG = "name";
+    public static final String ID_CONFIG = "id";
+    public static final String TOPIC_CONFIG = "topic";
+    public static final String NUM_MSGS_CONFIG = "num.messages";
+    public static final String THROUGHPUT_CONFIG = "throughput";
+    public static final String MULTIPLE_SCHEMA_CONFIG = "multiple.schema";
+    public static final String PARTITION_COUNT_CONFIG = "partition.count";
+
+    private static final String ID_FIELD = "id";
+    private static final String SEQNO_FIELD = "seqno";
+
+    private String name; // Connector name
+    private int id; // Task ID
+    private String topic;
+    private Map<String, Object> partition;
+    private long startingSeqno;
+    private long seqno;
+    private long count;
+    private long maxNumMsgs;
+    private boolean multipleSchema;
+    private int partitionCount;
+
+    // Until we can use ThroughputThrottler from Kafka, use a fixed sleep interval. This isn't perfect, but close enough
+    // for system testing purposes
+    private long intervalMs;
+    private int intervalNanos;
+
+    private static Schema valueSchema = SchemaBuilder.struct().version(1).name("record")
+        .field("boolean", Schema.BOOLEAN_SCHEMA)
+        .field("int", Schema.INT32_SCHEMA)
+        .field("long", Schema.INT64_SCHEMA)
+        .field("float", Schema.FLOAT32_SCHEMA)
+        .field("double", Schema.FLOAT64_SCHEMA)
+        .field("partitioning", Schema.INT32_SCHEMA)
+        .field("id", Schema.INT32_SCHEMA)
+        .field("seqno", Schema.INT64_SCHEMA)
+        .build();
+
+    private static Schema valueSchema2 = SchemaBuilder.struct().version(2).name("record")
+        .field("boolean", Schema.BOOLEAN_SCHEMA)
+        .field("int", Schema.INT32_SCHEMA)
+        .field("long", Schema.INT64_SCHEMA)
+        .field("float", Schema.FLOAT32_SCHEMA)
+        .field("double", Schema.FLOAT64_SCHEMA)
+        .field("partitioning", Schema.INT32_SCHEMA)
+        .field("string", SchemaBuilder.string().defaultValue("abc").build())
+        .field("id", Schema.INT32_SCHEMA)
+        .field("seqno", Schema.INT64_SCHEMA)
+        .build();
+
+    public String version() {
+        return new SchemaSourceConnector().version();
+    }
+
+    @Override
+    public void start(Map<String, String> props) {
+        try {
+            name = props.get(NAME_CONFIG);
+            id = Integer.parseInt(props.get(ID_CONFIG));
+            topic = props.get(TOPIC_CONFIG);
+            maxNumMsgs = Long.parseLong(props.get(NUM_MSGS_CONFIG));
+            multipleSchema = Boolean.parseBoolean(props.get(MULTIPLE_SCHEMA_CONFIG));
+            partitionCount = Integer.parseInt(props.containsKey(PARTITION_COUNT_CONFIG) ? props.get(PARTITION_COUNT_CONFIG)
+                    : "1");
+            String throughputStr = props.get(THROUGHPUT_CONFIG);
+            if (throughputStr != null) {
+                long throughput = Long.parseLong(throughputStr);
+                long intervalTotalNanos = 1_000_000_000L / throughput;
+                intervalMs = intervalTotalNanos / 1_000_000L;
+                intervalNanos = (int) (intervalTotalNanos % 1_000_000L);
+            } else {
+                intervalMs = 0;
+                intervalNanos = 0;
+            }
+        } catch (NumberFormatException e) {
+            throw new ConnectException("Invalid SchemaSourceTask configuration", e);
+        }
+
+        partition = Collections.singletonMap(ID_FIELD, id);
+        Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
+        if (previousOffset != null) {
+            seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
+        } else {
+            seqno = 0;
+        }
+        startingSeqno = seqno;
+        count = 0;
+        log.info("Started SchemaSourceTask {}-{} producing to topic {} resuming from seqno {}",
+                name, id, topic, startingSeqno);
+    }
+
+    @Override
+    public List<SourceRecord> poll() throws InterruptedException {
+        if (count < maxNumMsgs) {
+            if (intervalMs > 0 || intervalNanos > 0) {
+                synchronized (this) {
+                    this.wait(intervalMs, intervalNanos);
+                }
+            }
+
+            Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
+            int partitionVal = (int) (seqno % partitionCount);
+            final Struct data;
+            final SourceRecord srcRecord;
+            if (!multipleSchema || count % 2 == 0) {
+                data = new Struct(valueSchema)
+                    .put("boolean", true)
+                    .put("int", 12)
+                    .put("long", 12L)
+                    .put("float", 12.2f)
+                    .put("double", 12.2)
+                    .put("partitioning", partitionVal)
+                    .put("id", id)
+                    .put("seqno", seqno);
+
+                srcRecord = new SourceRecord(partition, ccOffset, topic, id, Schema.STRING_SCHEMA, "key", valueSchema, data);
+            } else {
+                data = new Struct(valueSchema2)
+                    .put("boolean", true)
+                    .put("int", 12)
+                    .put("long", 12L)
+                    .put("float", 12.2f)
+                    .put("double", 12.2)
+                    .put("partitioning", partitionVal)
+                    .put("string", "def")
+                    .put("id", id)
+                    .put("seqno", seqno);
+
+                srcRecord = new SourceRecord(partition, ccOffset, topic, id, Schema.STRING_SCHEMA, "key", valueSchema2, data);
+            }
+
+            System.out.println("{\"task\": " + id + ", \"seqno\": " + seqno + "}");
+            List<SourceRecord> result = Arrays.asList(srcRecord);
+            seqno++;
+            count++;
+            return result;
+        } else {
+            synchronized (this) {
+                this.wait();
+            }
+            return new ArrayList<>();
+        }
+    }
+
+    @Override
+    public void stop() {
+        synchronized (this) {
+            this.notifyAll();
+        }
+    }
+}
\ No newline at end of file
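
Patch 1's interim throttle converts the target throughput into a per-record sleep for Object.wait(long, int), splitting the interval into millisecond and nanosecond parts. A standalone sketch of that arithmetic, with an illustrative target of 500 records/sec (the demo class is not part of the patch):

    public class IntervalCheck {
        public static void main(String[] args) {
            long throughput = 500; // target records per second (illustrative)
            long intervalTotalNanos = 1_000_000_000L / throughput;        // 2,000,000 ns per record
            long intervalMs = intervalTotalNanos / 1_000_000L;            // -> 2 ms
            int intervalNanos = (int) (intervalTotalNanos % 1_000_000L);  // -> 0 ns remainder
            System.out.println("wait(" + intervalMs + " ms, " + intervalNanos + " ns) between records");
        }
    }
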
ID_FIELD = "id"; private static final String SEQNO_FIELD = "seqno"; + private ThroughputThrottler throttler; private String name; // Connector name private int id; // Task ID @@ -58,11 +60,6 @@ public class SchemaSourceTask extends SourceTask { private boolean multipleSchema; private int partitionCount; - // Until we can use ThroughputThrottler from Kafka, use a fixed sleep interval. This isn't perfect, but close enough - // for system testing purposes - private long intervalMs; - private int intervalNanos; - private static Schema valueSchema = SchemaBuilder.struct().version(1).name("record") .field("boolean", Schema.BOOLEAN_SCHEMA) .field("int", Schema.INT32_SCHEMA) @@ -92,28 +89,20 @@ public String version() { @Override public void start(Map props) { + final long throughput; try { name = props.get(NAME_CONFIG); id = Integer.parseInt(props.get(ID_CONFIG)); topic = props.get(TOPIC_CONFIG); maxNumMsgs = Long.parseLong(props.get(NUM_MSGS_CONFIG)); multipleSchema = Boolean.parseBoolean(props.get(MULTIPLE_SCHEMA_CONFIG)); - partitionCount = Integer.parseInt(props.containsKey(PARTITION_COUNT_CONFIG) ? props.get(PARTITION_COUNT_CONFIG) - : "1"); - String throughputStr = props.get(THROUGHPUT_CONFIG); - if (throughputStr != null) { - long throughput = Long.parseLong(throughputStr); - long intervalTotalNanos = 1_000_000_000L / throughput; - intervalMs = intervalTotalNanos / 1_000_000L; - intervalNanos = (int) (intervalTotalNanos % 1_000_000L); - } else { - intervalMs = 0; - intervalNanos = 0; - } + partitionCount = Integer.parseInt(props.containsKey(PARTITION_COUNT_CONFIG) ? props.get(PARTITION_COUNT_CONFIG) : "1"); + throughput = Long.parseLong(props.get(THROUGHPUT_CONFIG)); } catch (NumberFormatException e) { throw new ConnectException("Invalid SchemaSourceTask configuration", e); } + throttler = new ThroughputThrottler(throughput, System.currentTimeMillis()); partition = Collections.singletonMap(ID_FIELD, id); Map previousOffset = this.context.offsetStorageReader().offset(partition); if (previousOffset != null) { @@ -123,17 +112,15 @@ public void start(Map props) { } startingSeqno = seqno; count = 0; - log.info("Started SchemaSourceTask {}-{} producing to topic {} resuming from seqno {}", - name, id, topic, startingSeqno); + log.info("Started SchemaSourceTask {}-{} producing to topic {} resuming from seqno {}", name, id, topic, startingSeqno); } @Override public List poll() throws InterruptedException { if (count < maxNumMsgs) { - if (intervalMs > 0 || intervalNanos > 0) { - synchronized (this) { - this.wait(intervalMs, intervalNanos); - } + long sendStartMs = System.currentTimeMillis(); + if (throttler.shouldThrottle(seqno - startingSeqno, sendStartMs)) { + throttler.throttle(); } Map ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno); @@ -182,8 +169,6 @@ public List poll() throws InterruptedException { @Override public void stop() { - synchronized (this) { - this.notifyAll(); - } + throttler.wakeup(); } } \ No newline at end of file diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index c7f532bf5ed53..e8ee93d70ace9 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -42,6 +42,7 @@ import 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index c7f532bf5ed53..e8ee93d70ace9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.connect.tools.MockConnector;
 import org.apache.kafka.connect.tools.MockSinkConnector;
 import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.easymock.EasyMock;
@@ -171,6 +172,7 @@ public void testListConnectorPlugins() {
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class.getCanonicalName())));
         assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
     }
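
For reference, the per-task configuration keys that SchemaSourceTask.start() reads, collected in an illustrative sketch (key names come from the constants in patch 1; the values are made up; note that after patch 2, "throughput" is parsed unconditionally, so it is effectively required):

    import java.util.HashMap;
    import java.util.Map;

    public class SchemaSourceProps {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("name", "schema-source");   // NAME_CONFIG: connector name
            props.put("id", "0");                 // ID_CONFIG: set per task by taskConfigs()
            props.put("topic", "schema-topic");   // TOPIC_CONFIG: destination topic
            props.put("num.messages", "1000");    // NUM_MSGS_CONFIG: stop after this many records
            props.put("throughput", "500");       // THROUGHPUT_CONFIG: target records/sec
            props.put("multiple.schema", "true"); // MULTIPLE_SCHEMA_CONFIG: alternate schema v1/v2
            props.put("partition.count", "2");    // PARTITION_COUNT_CONFIG: defaults to 1
            System.out.println(props);
        }
    }
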