requests = new ArrayList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(partitionInfo.topic(), partitionInfo.partition());
- TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
- long latestOffset = latestOffsets.get(topicPartition);
- Long start;
- byte[] tableStart = table.read(topicAndPartition.toString());
- if (tableStart != null) {
- start = Bytes.toLong(tableStart);
- } else {
- start = offsets.containsKey(topicAndPartition) ? offsets.get(topicAndPartition) - 1 : null;
- }
+ String topic = partitionInfo.topic();
+ int partition = partitionInfo.partition();
- long earliestOffset = start == null || start == -2 ? earliestOffsets.get(topicPartition) : start;
- if (earliestOffset == -1) {
- earliestOffset = latestOffset;
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
+
+ long startOffset = partitionOffsets.getPartitionOffset(partitionInfo.partition(),
+ earliestOffsets.getOrDefault(topicPartition, -1L));
+ long endOffset = latestOffsets.get(topicPartition);
+ // startOffset shouldn't be negative, as it should either be in partitionOffsets or in earliestOffsets
+ if (startOffset < 0) {
+ throw new IOException("Failed to find start offset for topic " + topic + " and partition " + partition);
+ }
+ // Also, end offset shouldn't be negative.
+ if (endOffset < 0) {
+ throw new IOException("Failed to find end offset for topic " + topic + " and partition " + partition);
}
+
+ // Limit the number of records fetched
if (maxNumberRecords > 0) {
- latestOffset =
- (latestOffset - earliestOffset) <= maxNumberRecords ? latestOffset : (earliestOffset + maxNumberRecords);
+ endOffset = Math.min(endOffset, startOffset + maxNumberRecords);
}
+
- LOG.debug("Getting kafka messages from topic {}, partition {}, with earlistOffset {}, latest offset {}",
- topicAndPartition.topic(), topicAndPartition.partition(), earliestOffset, latestOffset);
- KafkaRequest kafkaRequest = new KafkaRequest(kafkaConf, topicAndPartition.topic(), topicAndPartition.partition());
- kafkaRequest.setLatestOffset(latestOffset);
- kafkaRequest.setEarliestOffset(earliestOffset);
- kafkaRequest.setOffset(earliestOffset);
- requests.add(kafkaRequest);
+ LOG.debug("Getting kafka messages from topic {}, partition {}, with start offset {}, end offset {}",
+ topic, partition, startOffset, endOffset);
+
+ requests.add(new KafkaRequest(topic, partition, kafkaConf, startOffset, endOffset));
}
return requests;
}
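
To make the offset arithmetic above concrete, here is a small illustration (an editor's sketch, not part of this change) of how the fetch range for one partition is derived; all values are hypothetical.

    // Suppose a previous run stored offset 3 for this partition, the broker reports a latest
    // offset of 100, and maxNumberRecords is configured as 3.
    long startOffset = 3L;         // from partitionOffsets, falling back to the earliest offset
    long endOffset = 100L;         // the latest offset reported by the broker
    long maxNumberRecords = 3L;    // plugin property limiting records read per partition per run
    if (maxNumberRecords > 0) {
      endOffset = Math.min(endOffset, startOffset + maxNumberRecords);
    }
    // The resulting KafkaRequest covers offsets [3, 6), i.e. exactly maxNumberRecords messages.
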
diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java
deleted file mode 100644
index 73493e3..0000000
--- a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import com.google.common.collect.ImmutableMap;
-
-import java.util.Map;
-
-
-/**
- * A class that represents the kafka pull request.
- *
- * The class is a container for topic, leaderId, partition, uri and offset. It is
- * used in reading and writing the sequence files used for the extraction job.
- */
-public class KafkaRequest {
-
- public static final long DEFAULT_OFFSET = 0;
-
- private Map conf;
- private String topic = "";
- private int partition = 0;
-
- private long offset = DEFAULT_OFFSET;
- private long latestOffset = -1;
- private long earliestOffset = -2;
- private long avgMsgSize = 1024;
-
- public KafkaRequest(Map conf, String topic, int partition) {
- this(conf, topic, partition, DEFAULT_OFFSET, -1);
- }
-
- public KafkaRequest(Map conf, String topic, int partition, long offset, long latestOffset) {
- this.conf = ImmutableMap.copyOf(conf);
- this.topic = topic;
- this.partition = partition;
- this.latestOffset = latestOffset;
- setOffset(offset);
- }
-
- public void setLatestOffset(long latestOffset) {
- this.latestOffset = latestOffset;
- }
-
- public void setEarliestOffset(long earliestOffset) {
- this.earliestOffset = earliestOffset;
- }
-
- public void setAvgMsgSize(long size) {
- this.avgMsgSize = size;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public Map getConf() {
- return conf;
- }
-
- public String getTopic() {
- return this.topic;
- }
-
- public int getPartition() {
- return this.partition;
- }
-
- public long getOffset() {
- return this.offset;
- }
-
- public long getEarliestOffset() {
- return this.earliestOffset;
- }
-
- public long getLastOffset() {
- return this.latestOffset;
- }
-
- public long estimateDataSize() {
- long endOffset = getLastOffset();
- return (endOffset - offset) * avgMsgSize;
- }
-}
diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java b/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java
deleted file mode 100644
index b837bde..0000000
--- a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-
-/**
- * Kafka split
- */
-public class KafkaSplit extends InputSplit implements Writable {
- private KafkaRequest request;
- private long length = 0;
-
- public KafkaSplit() {
- }
-
- public KafkaSplit(KafkaRequest request) {
- this.request = request;
- length = request.estimateDataSize();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- MapWritable mapWritable = new MapWritable();
- mapWritable.readFields(in);
- String topic = in.readUTF();
- int partition = in.readInt();
- long offset = in.readLong();
- long latestOffset = in.readLong();
- long earliestOffset = in.readLong();
- Map conf = writableToConf(mapWritable);
- request = new KafkaRequest(conf, topic, partition, offset, latestOffset);
- request.setEarliestOffset(earliestOffset);
- length = request.estimateDataSize();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- MapWritable conf = confToWritable(request.getConf());
- conf.write(out);
- out.writeUTF(request.getTopic());
- out.writeInt(request.getPartition());
- out.writeLong(request.getOffset());
- out.writeLong(request.getLastOffset());
- out.writeLong(request.getEarliestOffset());
- }
-
- @Override
- public long getLength() throws IOException {
- return length;
- }
-
- @Override
- public String[] getLocations() throws IOException {
- return new String[] {};
- }
-
- @Nullable
- public KafkaRequest popRequest() {
- KafkaRequest result = request;
- request = null;
- return result;
- }
-
- private MapWritable confToWritable(Map conf) {
- MapWritable mapWritable = new MapWritable();
- for (Map.Entry entry : conf.entrySet()) {
- mapWritable.put(new Text(entry.getKey()), new Text(entry.getValue()));
- }
- return mapWritable;
- }
-
- private Map writableToConf(MapWritable mapWritable) {
- Map conf = new HashMap<>();
- for (Map.Entry entry : mapWritable.entrySet()) {
- conf.put(entry.getKey().toString(), entry.getValue().toString());
- }
- return conf;
- }
-}
diff --git a/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java
index ceec1e5..013763e 100644
--- a/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java
+++ b/kafka-plugins-0.10/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java
@@ -16,235 +16,30 @@
package co.cask.hydrator;
-import co.cask.cdap.api.artifact.ArtifactSummary;
-import co.cask.cdap.api.common.Bytes;
-import co.cask.cdap.api.data.format.StructuredRecord;
-import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.api.dataset.lib.KeyValueTable;
-import co.cask.cdap.api.dataset.table.Table;
-import co.cask.cdap.datapipeline.DataPipelineApp;
-import co.cask.cdap.datapipeline.SmartWorkflow;
-import co.cask.cdap.etl.api.batch.BatchSource;
-import co.cask.cdap.etl.mock.batch.MockSink;
-import co.cask.cdap.etl.mock.test.HydratorTestBase;
-import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
-import co.cask.cdap.etl.proto.v2.ETLPlugin;
-import co.cask.cdap.etl.proto.v2.ETLStage;
-import co.cask.cdap.proto.ProgramRunStatus;
-import co.cask.cdap.proto.artifact.AppRequest;
-import co.cask.cdap.proto.id.ApplicationId;
-import co.cask.cdap.proto.id.ArtifactId;
-import co.cask.cdap.proto.id.NamespaceId;
-import co.cask.cdap.test.ApplicationManager;
-import co.cask.cdap.test.DataSetManager;
-import co.cask.cdap.test.TestConfiguration;
-import co.cask.cdap.test.WorkflowManager;
import co.cask.hydrator.plugin.batch.source.KafkaBatchSource;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
-import kafka.common.TopicAndPartition;
-import org.apache.kafka.clients.consumer.RangeAssignor;
-import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.kafka.client.Compression;
-import org.apache.twill.kafka.client.KafkaClientService;
-import org.apache.twill.kafka.client.KafkaPublisher;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
+import com.google.common.util.concurrent.Service;
-import java.io.File;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
/**
* Unit tests for our plugins.
*/
-public class KafkaBatchSourceTest extends HydratorTestBase {
- private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0");
- @ClassRule
- public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false);
+public class KafkaBatchSourceTest extends AbstractKafkaBatchSourceTest {
- private static ZKClientService zkClient;
- private static KafkaClientService kafkaClient;
- private static InMemoryZKServer zkServer;
- private static EmbeddedKafkaServer kafkaServer;
- private static int kafkaPort;
-
- @BeforeClass
- public static void setupTestClass() throws Exception {
- ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
-
- // add the data-pipeline artifact and mock plugins
- setupBatchArtifacts(parentArtifact, DataPipelineApp.class);
-
- // add our plugins artifact with the data-pipeline artifact as its parent.
- // this will make our plugins available to data-pipeline.
- addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"),
- parentArtifact,
- KafkaBatchSource.class,
- RangeAssignor.class);
-
- zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
- zkServer.startAndWait();
-
- kafkaPort = Networks.getRandomPort();
- kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(),
- kafkaPort, TMP_FOLDER.newFolder()));
- kafkaServer.startAndWait();
-
- zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClient.startAndWait();
-
- kafkaClient = new ZKKafkaClientService(zkClient);
- kafkaClient.startAndWait();
- }
-
- @AfterClass
- public static void cleanup() {
- kafkaClient.stopAndWait();
- kafkaServer.stopAndWait();
- zkClient.stopAndWait();
- zkServer.stopAndWait();
- }
-
- @Test
- public void testKafkaSource() throws Exception {
- Schema schema = Schema.recordOf(
- "user",
- Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
- Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
- Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
-
- // create the pipeline config
- String inputName = "sourceTestInput";
- String outputName = "sourceTestOutput";
-
- Map sourceProperties = new HashMap<>();
- sourceProperties.put("kafkaBrokers", "localhost:" + kafkaPort);
- sourceProperties.put("referenceName", "kafkaTest");
- sourceProperties.put("tableName", "testKafkaSource");
- sourceProperties.put("topic", "users");
- sourceProperties.put("maxNumberRecords", "3");
- sourceProperties.put("schema", schema.toString());
- sourceProperties.put("format", "csv");
- ETLStage source =
- new ETLStage("source", new ETLPlugin(KafkaBatchSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties, null));
- ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputName));
-
- ETLBatchConfig pipelineConfig = ETLBatchConfig.builder("* * * * *")
- .addStage(source)
- .addStage(sink)
- .addConnection(source.getName(), sink.getName())
- .build();
-
- // create the pipeline
- ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSource");
- ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
-
-
- Map messages = new HashMap<>();
- messages.put("a", "1,samuel,jackson");
- messages.put("b", "2,dwayne,johnson");
- messages.put("c", "3,christopher,walken");
- messages.put("d", "4,donald,trump");
- sendKafkaMessage("users", messages);
-
-
- WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
- workflowManager.start();
- workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
-
- // check the pipeline output
- DataSetManager outputManager = getDataset(outputName);
- Set outputRecords = new HashSet<>();
- outputRecords.addAll(MockSink.readOutput(outputManager));
-
- final Map expected = ImmutableMap.of(
- 1L, "samuel jackson",
- 2L, "dwayne johnson",
- 3L, "christopher walken"
- );
-
- Map actual = new HashMap<>();
- for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
- actual.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
- }
-
- Assert.assertEquals(3, outputRecords.size());
- Assert.assertEquals(expected, actual);
- DataSetManager kvTable = getDataset("testKafkaSource");
- KeyValueTable table = kvTable.get();
- byte[] offset = table.read(new TopicAndPartition("users", 0).toString());
- Assert.assertNotNull(offset);
- Assert.assertEquals(3, Bytes.toLong(offset));
-
- messages = new HashMap<>();
- messages.put("d", "5,samuel,jackson");
- messages.put("e", "6,dwayne,johnson");
- sendKafkaMessage("users", messages);
- workflowManager.start();
- TimeUnit.SECONDS.sleep(10);
- workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
- workflowManager.waitForRuns(ProgramRunStatus.COMPLETED, 2, 1, TimeUnit.MINUTES);
- final Map expected2 = ImmutableMap.builder()
- .put(1L, "samuel jackson")
- .put(2L, "dwayne johnson")
- .put(3L, "christopher walken")
- .put(4L, "donald trump")
- .put(5L, "samuel jackson")
- .put(6L, "dwayne johnson")
- .build();
-
- outputRecords = new HashSet<>();
- outputRecords.addAll(MockSink.readOutput(outputManager));
- Map actual2 = new HashMap<>();
- for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
- actual2.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
- }
- Assert.assertEquals(6, outputRecords.size());
- Assert.assertEquals(expected2, actual2);
+ @Override
+ protected Service createKafkaServer(Properties kafkaConfig) {
+ return new EmbeddedKafkaServer(kafkaConfig);
}
- private static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) {
- Properties prop = new Properties();
- prop.setProperty("log.dir", logDir.getAbsolutePath());
- prop.setProperty("port", Integer.toString(port));
- prop.setProperty("broker.id", "1");
- prop.setProperty("num.partitions", "1");
- prop.setProperty("zookeeper.connect", zkConnectStr);
- prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
- prop.setProperty("default.replication.factor", "1");
- return prop;
+ @Override
+ protected List<Class<?>> getPluginClasses() {
+ return Collections.singletonList(KafkaBatchSource.class);
}
- private void sendKafkaMessage(String topic, Map messages) {
- KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE);
-
- // If publish failed, retry up to 20 times, with 100ms delay between each retry
- // This is because leader election in Kafka 08 takes time when a topic is being created upon publish request.
- int count = 0;
- do {
- KafkaPublisher.Preparer preparer = publisher.prepare(topic);
- for (Map.Entry entry : messages.entrySet()) {
- preparer.add(Charsets.UTF_8.encode(entry.getValue()), entry.getKey());
- }
- try {
- preparer.send().get();
- break;
- } catch (Exception e) {
- // Backoff if send failed.
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
- } while (count++ < 20);
+ @Override
+ protected String getKafkaBatchSourceName() {
+ return KafkaBatchSource.NAME;
}
}
diff --git a/kafka-plugins-0.10/widgets/Kafka-batchsource.json b/kafka-plugins-0.10/widgets/Kafka-batchsource.json
index a4c053b..57b8bdb 100644
--- a/kafka-plugins-0.10/widgets/Kafka-batchsource.json
+++ b/kafka-plugins-0.10/widgets/Kafka-batchsource.json
@@ -27,8 +27,8 @@
},
{
"widget-type": "textbox",
- "label": "Offset Table Name",
- "name": "tableName"
+ "label": "Offset Directory",
+ "name": "offsetDir"
},
{
"widget-type": "csv",
diff --git a/kafka-plugins-0.8/docs/Kafka-batchsource.md b/kafka-plugins-0.8/docs/Kafka-batchsource.md
index fdb062d..9ba7b66 100644
--- a/kafka-plugins-0.8/docs/Kafka-batchsource.md
+++ b/kafka-plugins-0.8/docs/Kafka-batchsource.md
@@ -21,8 +21,8 @@ Properties
**topic:** The Kafka topic to read from. (Macro-enabled)
-**tableName:** Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the
-pipeline name to avoid conflict on table names. By default it will be the topic name. (Macro-enabled)
+**offsetDir:** Optional directory path used to track the latest offsets read from Kafka, so that subsequent
+pipeline runs can incrementally process only new data. (Macro-enabled)
**partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled)
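
As a rough illustration of how the new property fits together with the others documented here (an editor's sketch; the broker address and directory path are placeholders, not values taken from this change, and imports of Map/HashMap are omitted):

    // Hypothetical batch source configuration using offsetDir instead of the removed tableName.
    Map<String, String> sourceProperties = new HashMap<>();
    sourceProperties.put("referenceName", "kafkaTest");
    sourceProperties.put("kafkaBrokers", "broker1.example.com:9092");
    sourceProperties.put("topic", "users");
    sourceProperties.put("offsetDir", "hdfs:///tmp/kafka-offsets");  // offsets from each run are persisted here
    sourceProperties.put("format", "csv");
    sourceProperties.put("maxNumberRecords", "3");
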
diff --git a/kafka-plugins-0.8/pom.xml b/kafka-plugins-0.8/pom.xml
index 3952adc..cb47388 100644
--- a/kafka-plugins-0.8/pom.xml
+++ b/kafka-plugins-0.8/pom.xml
@@ -14,6 +14,11 @@
       <version>${project.parent.version}-0.8.2.2</version>
     </dependency>
+    <dependency>
+      <groupId>co.cask.hydrator</groupId>
+      <artifactId>kafka-plugins-common</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
@@ -112,6 +117,13 @@
       <version>${cdap.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>co.cask.hydrator</groupId>
+      <artifactId>kafka-plugins-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/Kafka08Reader.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/Kafka08Reader.java
new file mode 100644
index 0000000..40ed3b0
--- /dev/null
+++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/Kafka08Reader.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright © 2017-2018 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.hydrator.plugin.batch.source;
+
+import kafka.api.PartitionFetchInfo;
+import kafka.cluster.Broker;
+import kafka.common.BrokerNotAvailableException;
+import kafka.common.LeaderNotAvailableException;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchRequest;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A class that reads fetch results from Kafka for a single topic partition.
+ */
+final class Kafka08Reader implements KafkaReader {
+ private static final Logger LOG = LoggerFactory.getLogger(Kafka08Reader.class);
+
+ // Size in bytes of the buffer used for each fetch request
+ private static final int fetchBufferSize = 1024 * 1024;
+ private final KafkaRequest kafkaRequest;
+ private final SimpleConsumer simpleConsumer;
+
+ private long currentOffset;
+ private long lastOffset;
+ private Iterator<MessageAndOffset> messageIter;
+
+ /**
+ * Construct a reader based on the given {@link KafkaRequest}.
+ */
+ Kafka08Reader(KafkaRequest request) {
+ this.kafkaRequest = request;
+ this.currentOffset = request.getStartOffset();
+ this.lastOffset = request.getEndOffset();
+
+ // read data from queue
+ Map<String, String> conf = request.getConf();
+ Broker leader = getLeader(KafkaBatchConfig.parseBrokerMap(conf.get(KafkaInputFormat.KAFKA_BROKERS)),
+ request.getTopic(), request.getPartition());
+ this.simpleConsumer = new SimpleConsumer(leader.host(), leader.port(), 20 * 1000, fetchBufferSize, "client");
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (currentOffset >= lastOffset) {
+ return false;
+ }
+ if (messageIter != null && messageIter.hasNext()) {
+ return true;
+ }
+ return fetch();
+ }
+
+ /**
+ * Fetches the next Kafka message and updates the given key with the offset that was consumed.
+ */
+ @Override
+ public KafkaMessage getNext(KafkaKey kafkaKey) {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No message is available");
+ }
+
+ MessageAndOffset msgAndOffset = messageIter.next();
+ Message message = msgAndOffset.message();
+
+ ByteBuffer payload = message.payload();
+ ByteBuffer key = message.key();
+
+ if (payload == null) {
+ LOG.warn("Received message with null payload for topic {} and partition {}",
+ kafkaKey.getTopic(), kafkaKey.getPartition());
+
+ }
+
+ kafkaKey.set(currentOffset, msgAndOffset.offset() + 1, msgAndOffset.message().size(), message.checksum());
+ currentOffset = msgAndOffset.offset() + 1; // increase offset
+ return new KafkaMessage(payload, key);
+ }
+
+ /**
+ * Fetch messages from Kafka.
+ *
+ * @return {@code true} if there are messages available, {@code false} otherwise
+ */
+ private boolean fetch() {
+ if (currentOffset >= lastOffset) {
+ return false;
+ }
+ TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaRequest.getTopic(), kafkaRequest.getPartition());
+ PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(currentOffset, fetchBufferSize);
+
+ Map<TopicAndPartition, PartitionFetchInfo> fetchInfo = new HashMap<>();
+ fetchInfo.put(topicAndPartition, partitionFetchInfo);
+
+ FetchRequest fetchRequest = new FetchRequest(-1, "client", 1000, 1024, fetchInfo);
+
+ try {
+ FetchResponse fetchResponse = simpleConsumer.fetch(fetchRequest);
+ if (fetchResponse.hasError()) {
+ String message =
+ "Failed to fetch messages from Kafka with error code " + fetchResponse.errorCode(kafkaRequest.getTopic(), kafkaRequest.getPartition());
+ throw new RuntimeException(message);
+ }
+ return processFetchResponse(fetchResponse);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private boolean processFetchResponse(FetchResponse fetchResponse) {
+ ByteBufferMessageSet messageBuffer = fetchResponse.messageSet(kafkaRequest.getTopic(), kafkaRequest.getPartition());
+ messageIter = messageBuffer.iterator();
+ if (!messageIter.hasNext()) {
+ messageIter = null;
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Closes this reader
+ */
+ @Override
+ public void close() {
+ if (simpleConsumer != null) {
+ simpleConsumer.close();
+ }
+ }
+
+ /**
+ * Gets the leader broker for the given topic partition.
+ *
+ * @param brokers the set of brokers to query from
+ * @param topic the topic to query for
+ * @param partition the partition to query for
+ * @return the leader broker
+ * @throws LeaderNotAvailableException if the leader broker cannot be found
+ */
+ private static Broker getLeader(Map<String, Integer> brokers, String topic, int partition) {
+ TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
+
+ for (Map.Entry<String, Integer> entry : brokers.entrySet()) {
+ SimpleConsumer consumer = new SimpleConsumer(entry.getKey(), entry.getValue(),
+ 20 * 1000, fetchBufferSize, "client");
+ try {
+ for (TopicMetadata metadata : consumer.send(request).topicsMetadata()) {
+ // This shouldn't happen. In case it does, just skip the metadata not for the right topic
+ if (!topic.equals(metadata.topic())) {
+ continue;
+ }
+
+ // Find the leader broker for the given partition.
+ return metadata.partitionsMetadata().stream()
+ .filter(meta -> meta.partitionId() == partition)
+ .findFirst()
+ .map(PartitionMetadata::leader)
+ .orElseThrow(BrokerNotAvailableException::new);
+ }
+ } catch (Exception e) {
+ // No-op just query next broker
+ } finally {
+ consumer.close();
+ }
+ }
+
+ throw new LeaderNotAvailableException(String.format("Failed to get broker information for partition %d in topic %s",
+ partition, topic));
+ }
+}
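
The reader above implements a KafkaReader interface that lives in the shared kafka-plugins-common module and is not included in this diff. The sketch below is an editor's reconstruction inferred only from the calls made in this excerpt (hasNext, getNext, close, and the KafkaRecordReader(Kafka08Reader::new) factory reference); the real interface may declare additional methods, and extending AutoCloseable is an assumption. KafkaKey and KafkaMessage are the shared-module types referenced in this PR.

    /**
     * Reads messages for a single topic partition within the offset range of a {@link KafkaRequest}.
     */
    interface KafkaReader extends AutoCloseable {
      /** Returns true if more messages are available in the assigned offset range. */
      boolean hasNext();

      /** Returns the next message and updates the given key with the offset that was just consumed. */
      KafkaMessage getNext(KafkaKey kafkaKey);

      /** Releases any connection held by this reader. */
      @Override
      void close();
    }
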
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
index 75758fd..7bc223b 100644
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
+++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchSource.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2017 Cask Data, Inc.
+ * Copyright © 2017-2018 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -17,19 +17,14 @@
package co.cask.hydrator.plugin.batch.source;
import co.cask.cdap.api.annotation.Description;
-import co.cask.cdap.api.annotation.Macro;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
-import co.cask.cdap.api.annotation.Requirements;
-import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.batch.Input;
import co.cask.cdap.api.data.format.FormatSpecification;
import co.cask.cdap.api.data.format.RecordFormat;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
-import co.cask.cdap.api.dataset.lib.KeyValueTable;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.common.io.ByteBuffers;
import co.cask.cdap.etl.api.Emitter;
@@ -38,26 +33,21 @@
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.cdap.format.RecordFormats;
-import co.cask.hydrator.common.KeyValueListParser;
import co.cask.hydrator.common.LineageRecorder;
-import co.cask.hydrator.common.ReferencePluginConfig;
import co.cask.hydrator.common.SourceInputFormatProvider;
import co.cask.hydrator.common.batch.JobUtils;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
-import kafka.common.TopicAndPartition;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import java.io.IOException;
-import java.util.ArrayList;
+import java.net.URI;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
/**
* Kafka batch source.
@@ -65,309 +55,24 @@
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(KafkaBatchSource.NAME)
@Description("Kafka batch source.")
-@Requirements(datasetTypes = {KeyValueTable.TYPE})
public class KafkaBatchSource extends BatchSource<KafkaKey, KafkaMessage, StructuredRecord> {
+
public static final String NAME = "Kafka";
private final KafkaBatchConfig config;
- private KeyValueTable table;
+ private FileContext fileContext;
+ private Path offsetsFile;
private List<KafkaRequest> kafkaRequests;
private Schema schema;
private RecordFormat<StreamEvent, StructuredRecord> recordFormat;
private String messageField;
- /**
- * Config properties for the plugin.
- */
- public static class KafkaBatchConfig extends ReferencePluginConfig {
-
- @Description("Kafka topic to read from.")
- @Macro
- private String topic;
-
- @Description("List of Kafka brokers specified in host1:port1,host2:port2 form. For example, " +
- "host1.example.com:9092,host2.example.com:9092.")
- @Macro
- private String kafkaBrokers;
-
- @Description("Table name to track the latest offset we read from kafka. It is recommended to name it " +
- "same as the pipeline name to avoid conflict on table names.")
- @Macro
- @Nullable
- private String tableName;
-
- @Description("A comma separated list of topic partitions to read from. " +
- "If not specified, all partitions will be read.")
- @Macro
- @Nullable
- private String partitions;
-
- @Description("The initial offset for each topic partition in partition1:offset1,partition2:offset2 form. " +
- "These offsets will only be used for the first run of the pipeline. " +
- "Any subsequent run will read from the latest offset from previous run. " +
- "Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. " +
- "If not specified, the initial run will start reading from the latest message in Kafka.")
- @Nullable
- @Macro
- private String initialPartitionOffsets;
-
- @Description("Output schema of the source, including the timeField and keyField. " +
- "The fields excluding keyField are used in conjunction with the format " +
- "to parse Kafka payloads.")
- private String schema;
-
- @Description("Optional format of the Kafka event. Any format supported by CDAP is supported. " +
- "For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. " +
- "If no format is given, Kafka message payloads will be treated as bytes.")
- @Nullable
- private String format;
-
- @Description("Optional name of the field containing the message key. " +
- "If this is not set, no key field will be added to output records. " +
- "If set, this field must be present in the schema property and must be bytes.")
- @Nullable
- private String keyField;
-
- @Description("Optional name of the field containing the kafka partition that was read from. " +
- "If this is not set, no partition field will be added to output records. " +
- "If set, this field must be present in the schema property and must be an integer.")
- @Nullable
- private String partitionField;
-
- @Description("Optional name of the field containing the kafka offset that the message was read from. " +
- "If this is not set, no offset field will be added to output records. " +
- "If set, this field must be present in the schema property and must be a long.")
- @Nullable
- private String offsetField;
-
- public KafkaBatchConfig() {
- super("");
- }
-
- public KafkaBatchConfig(String brokers, String partitions, String topic, String initialPartitionOffsets,
- Long defaultOffset) {
- super(String.format("Kafka_%s", topic));
- this.kafkaBrokers = brokers;
- this.partitions = partitions;
- this.topic = topic;
- this.initialPartitionOffsets = initialPartitionOffsets;
- }
-
- // Accessors
- public String getTopic() {
- return topic;
- }
-
- public String getBrokers() {
- return kafkaBrokers;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public Set getPartitions() {
- Set partitionSet = new HashSet<>();
- if (partitions == null) {
- return partitionSet;
- }
- for (String partition : Splitter.on(',').trimResults().split(partitions)) {
- try {
- partitionSet.add(Integer.parseInt(partition));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(
- String.format("Invalid partition '%s'. Partitions must be integers.", partition));
- }
- }
- return partitionSet;
- }
-
- @Nullable
- public String getKeyField() {
- return Strings.isNullOrEmpty(keyField) ? null : keyField;
- }
-
- @Nullable
- public String getPartitionField() {
- return Strings.isNullOrEmpty(partitionField) ? null : partitionField;
- }
-
- @Nullable
- public String getOffsetField() {
- return Strings.isNullOrEmpty(offsetField) ? null : offsetField;
- }
-
- @Nullable
- public String getFormat() {
- return Strings.isNullOrEmpty(format) ? null : format;
- }
-
- public Schema getSchema() {
- try {
- return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
- } catch (IOException e) {
- throw new IllegalArgumentException("Unable to parse schema: " + e.getMessage());
- }
- }
-
- /**
- * Gets the message schema from the schema field. If the time, key, partition,
- * or offset fields are in the configured schema, they will be removed.
- */
- public Schema getMessageSchema() {
- Schema schema = getSchema();
- List messageFields = new ArrayList<>();
- boolean keyFieldExists = false;
- boolean partitionFieldExists = false;
- boolean offsetFieldExists = false;
-
- for (Schema.Field field : schema.getFields()) {
- String fieldName = field.getName();
- Schema fieldSchema = field.getSchema();
- Schema.Type fieldType = fieldSchema.isNullable()
- ? fieldSchema.getNonNullable().getType()
- : fieldSchema.getType();
- // if the field is not the time field and not the key field, it is a message field.
- if (fieldName.equals(keyField)) {
- if (fieldType != Schema.Type.BYTES) {
- throw new IllegalArgumentException("The key field must be of type bytes or nullable bytes.");
- }
- keyFieldExists = true;
- } else if (fieldName.equals(partitionField)) {
- if (fieldType != Schema.Type.INT) {
- throw new IllegalArgumentException("The partition field must be of type int.");
- }
- partitionFieldExists = true;
- } else if (fieldName.equals(offsetField)) {
- if (fieldType != Schema.Type.LONG) {
- throw new IllegalArgumentException("The offset field must be of type long.");
- }
- offsetFieldExists = true;
- } else {
- messageFields.add(field);
- }
- }
- if (messageFields.isEmpty()) {
- throw new IllegalArgumentException(
- "Schema must contain at least one other field besides the time and key fields.");
- }
- if (getKeyField() != null && !keyFieldExists) {
- throw new IllegalArgumentException(String.format(
- "keyField '%s' does not exist in the schema. Please add it to the schema.", keyField));
- }
- if (getPartitionField() != null && !partitionFieldExists) {
- throw new IllegalArgumentException(String.format(
- "partitionField '%s' does not exist in the schema. Please add it to the schema.", partitionField));
- }
- if (getOffsetField() != null && !offsetFieldExists) {
- throw new IllegalArgumentException(String.format(
- "offsetField '%s' does not exist in the schema. Please add it to the schema.", offsetField));
- }
- return Schema.recordOf("kafka.message", messageFields);
- }
-
- /**
- * @return broker host to broker port mapping.
- */
- public Map getBrokerMap() {
- Map brokerMap = new HashMap<>();
- for (KeyValue hostAndPort : KeyValueListParser.DEFAULT.parse(kafkaBrokers)) {
- String host = hostAndPort.getKey();
- String portStr = hostAndPort.getValue();
- try {
- brokerMap.put(host, Integer.parseInt(portStr));
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(String.format(
- "Invalid port '%s' for host '%s'.", portStr, host));
- }
- }
- if (brokerMap.isEmpty()) {
- throw new IllegalArgumentException("Must specify kafka brokers.");
- }
- return brokerMap;
- }
-
- public Map getInitialPartitionOffsets() {
- Map partitionOffsets = new HashMap<>();
-
- // if initial partition offsets are specified, overwrite the defaults.
- if (initialPartitionOffsets != null) {
- for (KeyValue partitionAndOffset : KeyValueListParser.DEFAULT.parse(initialPartitionOffsets)) {
- String partitionStr = partitionAndOffset.getKey();
- String offsetStr = partitionAndOffset.getValue();
- int partition;
- try {
- partition = Integer.parseInt(partitionStr);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(String.format(
- "Invalid partition '%s' in initialPartitionOffsets.", partitionStr));
- }
- long offset;
- try {
- offset = Long.parseLong(offsetStr);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(String.format(
- "Invalid offset '%s' in initialPartitionOffsets for partition %d.", partitionStr, partition));
- }
- partitionOffsets.put(new TopicAndPartition(topic, partition), offset);
- }
- }
-
- return partitionOffsets;
- }
-
- public void validate() {
- // brokers can be null since it is macro enabled.
- if (kafkaBrokers != null) {
- getBrokerMap();
- }
- getPartitions();
- getInitialPartitionOffsets();
-
- Schema messageSchema = getMessageSchema();
- // if format is empty, there must be just a single message field of type bytes or nullable types.
- if (Strings.isNullOrEmpty(format)) {
- List messageFields = messageSchema.getFields();
- if (messageFields.size() > 1) {
- String fieldNames = messageFields.stream().map(Schema.Field::getName).collect(Collectors.joining(","));
- throw new IllegalArgumentException(String.format(
- "Without a format, the schema must contain just a single message field of type bytes or nullable bytes. " +
- "Found %s message fields (%s).", messageFields.size(), fieldNames));
- }
-
- Schema.Field messageField = messageFields.get(0);
- Schema messageFieldSchema = messageField.getSchema();
- Schema.Type messageFieldType = messageFieldSchema.isNullable() ?
- messageFieldSchema.getNonNullable().getType() : messageFieldSchema.getType();
- if (messageFieldType != Schema.Type.BYTES) {
- throw new IllegalArgumentException(String.format(
- "Without a format, the message field must be of type bytes or nullable bytes, but field %s is of type %s.",
- messageField.getName(), messageField.getSchema()));
- }
- } else {
- // otherwise, if there is a format, make sure we can instantiate it.
- FormatSpecification formatSpec = new FormatSpecification(format, messageSchema, new HashMap<>());
-
- try {
- RecordFormats.createInitializedFormat(formatSpec);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format(
- "Unable to instantiate a message parser from format '%s' and message schema '%s': %s",
- format, messageSchema, e.getMessage()), e);
- }
- }
- }
- }
-
public KafkaBatchSource(KafkaBatchConfig config) {
this.config = config;
}
@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
- if (config.getTableName() != null) {
- pipelineConfigurer.createDataset(config.getTableName(), KeyValueTable.class, DatasetProperties.EMPTY);
- }
config.validate();
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
}
@@ -376,14 +81,27 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
public void prepareRun(BatchSourceContext context) throws Exception {
Job job = JobUtils.createInstance();
Configuration conf = job.getConfiguration();
- String tableName = config.getTableName() != null ? config.getTableName() : config.getTopic();
- if (!context.datasetExists(tableName)) {
- context.createDataset(tableName, KeyValueTable.class.getName(), DatasetProperties.EMPTY);
+
+ KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
+
+ // If the offset directory is provided, try to load the file
+ if (config.getOffsetDir() != null) {
+ Path offsetDir = new Path(URI.create(config.getOffsetDir()));
+ fileContext = FileContext.getFileContext(offsetDir.toUri(), conf);
+ try {
+ fileContext.mkdir(offsetDir, new FsPermission("700"), true);
+ } catch (FileAlreadyExistsException e) {
+ // It's ok if the directory already exists
+ }
+ offsetsFile = KafkaBatchConfig.getOffsetFilePath(offsetDir, context.getNamespace(), context.getPipelineName());
+
+ // Load the offset from the offset file
+ partitionOffsets = KafkaPartitionOffsets.load(fileContext, offsetsFile);
}
- table = context.getDataset(tableName);
+
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), config.getBrokerMap(),
- config.getPartitions(), config.getInitialPartitionOffsets(),
- table);
+ config.getPartitions(), config.getMaxNumberRecords(),
+ partitionOffsets);
LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
Schema schema = config.getSchema();
if (schema != null) {
@@ -399,11 +117,14 @@ public void prepareRun(BatchSourceContext context) throws Exception {
@Override
public void onRunFinish(boolean succeeded, BatchSourceContext context) {
- if (succeeded) {
- for (KafkaRequest kafkaRequest : kafkaRequests) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaRequest.getTopic(),
- kafkaRequest.getPartition());
- table.write(topicAndPartition.toString(), Bytes.toBytes(kafkaRequest.getLastOffset()));
+ if (succeeded && kafkaRequests != null && fileContext != null && offsetsFile != null) {
+ KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(
+ kafkaRequests.stream().collect(Collectors.toMap(KafkaRequest::getPartition, KafkaRequest::getEndOffset)));
+
+ try {
+ KafkaPartitionOffsets.save(fileContext, offsetsFile, partitionOffsets);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
}
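
KafkaPartitionOffsets, used above for load and save, also lives in the kafka-plugins-common module and is not shown in this diff. The following is an editor's sketch consistent only with the calls made in this change (the Map constructor, getPartitionOffset with a default, and the static load/save helpers); the field names and the JSON serialization format are assumptions.

    // Assumed imports: com.google.gson.Gson, org.apache.hadoop.fs.{CreateFlag, FileContext, Path},
    // java.io.*, java.nio.charset.StandardCharsets, java.util.{EnumSet, HashMap, Map}
    public class KafkaPartitionOffsets {
      private static final Gson GSON = new Gson();

      private Map<Integer, Long> partitionOffsets;

      public KafkaPartitionOffsets(Map<Integer, Long> partitionOffsets) {
        this.partitionOffsets = new HashMap<>(partitionOffsets);
      }

      /** Returns the stored offset for the partition, or the supplied default if none is known. */
      public long getPartitionOffset(int partition, long defaultValue) {
        return partitionOffsets.getOrDefault(partition, defaultValue);
      }

      /** Loads offsets from the given file, returning empty offsets if the file does not exist yet. */
      public static KafkaPartitionOffsets load(FileContext fc, Path file) throws IOException {
        try (Reader reader = new InputStreamReader(fc.open(file), StandardCharsets.UTF_8)) {
          KafkaPartitionOffsets offsets = GSON.fromJson(reader, KafkaPartitionOffsets.class);
          return offsets == null ? new KafkaPartitionOffsets(new HashMap<>()) : offsets;
        } catch (FileNotFoundException e) {
          return new KafkaPartitionOffsets(new HashMap<>());
        }
      }

      /** Persists the offsets to the given file, overwriting any previous content. */
      public static void save(FileContext fc, Path file, KafkaPartitionOffsets offsets) throws IOException {
        try (Writer writer = new OutputStreamWriter(
            fc.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)), StandardCharsets.UTF_8)) {
          GSON.toJson(offsets, writer);
        }
      }
    }
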
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaInputFormat.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaInputFormat.java
index b782ded..2ab38b2 100644
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaInputFormat.java
+++ b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaInputFormat.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2017 Cask Data, Inc.
+ * Copyright © 2017-2018 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -16,14 +16,13 @@
package co.cask.hydrator.plugin.batch.source;
-import co.cask.cdap.api.common.Bytes;
-import co.cask.cdap.api.dataset.lib.KeyValueTable;
-import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
@@ -40,14 +39,14 @@
import java.io.IOException;
import java.lang.reflect.Type;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
@@ -57,17 +56,17 @@ public class KafkaInputFormat extends InputFormat {
private static final Logger LOG = LoggerFactory.getLogger(KafkaInputFormat.class);
private static final String KAFKA_REQUEST = "kafka.request";
+ static final String KAFKA_BROKERS = "kafka.brokers";
+
private static final Type LIST_TYPE = new TypeToken<List<KafkaRequest>>() { }.getType();
@Override
- public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new KafkaRecordReader();
+ public RecordReader<KafkaKey, KafkaMessage> createRecordReader(InputSplit split, TaskAttemptContext context) {
+ return new KafkaRecordReader(Kafka08Reader::new);
}
-
@Override
- public List getSplits(JobContext context) throws IOException, InterruptedException {
+ public List<InputSplit> getSplits(JobContext context) {
Gson gson = new Gson();
List<KafkaRequest> finalRequests = gson.fromJson(context.getConfiguration().get(KAFKA_REQUEST), LIST_TYPE);
List<InputSplit> kafkaSplits = new ArrayList<>();
@@ -80,81 +79,77 @@ public List getSplits(JobContext context) throws IOException, Interr
return kafkaSplits;
}
- public static List saveKafkaRequests(Configuration conf, String topic, Map brokers,
- Set partitions,
- Map initOffsets,
- KeyValueTable table) throws Exception {
- ArrayList finalRequests;
- HashMap> offsetRequestInfo = new HashMap<>();
-
- // Get Metadata for all topics
- List topicMetadataList = getKafkaMetadata(brokers, topic);
-
- for (TopicMetadata topicMetadata : topicMetadataList) {
- for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
- LeaderInfo leader =
- new LeaderInfo(new URI("tcp://" + partitionMetadata.leader().connectionString()),
- partitionMetadata.leader().id());
- if (partitions.isEmpty() || partitions.contains(partitionMetadata.partitionId())) {
- if (offsetRequestInfo.containsKey(leader)) {
- ArrayList topicAndPartitions = offsetRequestInfo.get(leader);
- topicAndPartitions.add(new TopicAndPartition(topicMetadata.topic(), partitionMetadata.partitionId()));
- offsetRequestInfo.put(leader, topicAndPartitions);
- } else {
- ArrayList topicAndPartitions = new ArrayList<>();
- topicAndPartitions.add(new TopicAndPartition(topicMetadata.topic(), partitionMetadata.partitionId()));
- offsetRequestInfo.put(leader, topicAndPartitions);
- }
- }
- }
- }
-
- // Get the latest offsets and generate the KafkaRequests
- finalRequests = fetchLatestOffsetAndCreateKafkaRequests(offsetRequestInfo, initOffsets, table);
-
- Collections.sort(finalRequests, new Comparator() {
- @Override
- public int compare(KafkaRequest r1, KafkaRequest r2) {
- return r1.getTopic().compareTo(r2.getTopic());
- }
- });
-
- Map offsetKeys = new HashMap<>();
- for (KafkaRequest request : finalRequests) {
- KafkaKey key = offsetKeys.get(request);
-
- if (key != null) {
- request.setOffset(key.getOffset());
- request.setAvgMsgSize(key.getMessageSize());
- }
-
- if (request.getEarliestOffset() > request.getOffset() || request.getOffset() > request.getLastOffset()) {
-
- boolean offsetUnset = request.getOffset() == KafkaRequest.DEFAULT_OFFSET;
- // When the offset is unset, it means it's a new topic/partition, we also need to consume the earliest offset
- if (offsetUnset) {
- request.setOffset(request.getEarliestOffset());
- offsetKeys.put(
- request,
- new KafkaKey(request.getTopic(), request.getLeaderId(), request.getPartition(), 0, request.getOffset()));
- }
- }
- }
- conf.set(KAFKA_REQUEST, new Gson().toJson(finalRequests));
- return finalRequests;
+ /**
+ * Generates and serializes a list of requests for reading from Kafka to the given {@link Configuration}.
+ *
+ * @param conf the hadoop configuration to update
+ * @param topic the Kafka topic
+ * @param brokers a {@link Map} from broker host to port
+ * @param partitions the set of partitions to consume from.
+ * If it is empty, it means reading from all available partitions under the given topic.
+ * @param maxNumberRecords maximum number of records to read in one batch per partition
+ * @param partitionOffsets the {@link KafkaPartitionOffsets} containing the starting offset for each partition
+ * @return a {@link List} of {@link KafkaRequest} that get serialized in the hadoop configuration
+ * @throws IOException if failed to setup the {@link KafkaRequest}
+ */
+ static List<KafkaRequest> saveKafkaRequests(Configuration conf, String topic,
+ Map<String, Integer> brokers, Set<Integer> partitions,
+ long maxNumberRecords,
+ KafkaPartitionOffsets partitionOffsets) throws Exception {
+ // Find the leader for each requested partition
+ Map<Broker, Set<Integer>> brokerPartitions = getBrokerPartitions(brokers, topic, partitions);
+
+ // Create and save the KafkaRequest
+ List<KafkaRequest> requests = createKafkaRequests(topic, brokerPartitions, maxNumberRecords, partitionOffsets);
+
+ conf.set(KAFKA_REQUEST, new Gson().toJson(requests));
+ return requests;
}
- private static List getKafkaMetadata(Map brokers, String topic) {
- List topicMetadataList = new ArrayList<>();
+ /**
+ * Returns a {@link Map} from {@link Broker} to the set of partitions that the given broker is a leader of.
+ */
+ private static Map<Broker, Set<Integer>> getBrokerPartitions(Map<String, Integer> brokers,
+ String topic, Set<Integer> partitions) {
+ TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
+ Map<Broker, Set<Integer>> result = new HashMap<>();
+ Set<Integer> partitionsRemained = new HashSet<>(partitions);
+ // Goes through the list of brokers to fetch topic metadata, using the first broker that responds successfully
for (Map.Entry<String, Integer> entry : brokers.entrySet()) {
SimpleConsumer consumer = createSimpleConsumer(entry.getKey(), entry.getValue());
LOG.debug("Fetching metadata from broker {}: {} with client id {} for topic {}", entry.getKey(),
entry.getValue(), consumer.clientId(), topic);
try {
- topicMetadataList =
- consumer.send(new TopicMetadataRequest(ImmutableList.of(topic))).topicsMetadata();
- break;
+ boolean hasError = false;
+ for (TopicMetadata metadata : consumer.send(request).topicsMetadata()) {
+ // This shouldn't happen. In case it does, just skip the metadata not for the right topic
+ if (!topic.equals(metadata.topic())) {
+ continue;
+ }
+
+ // Associate partition to leader broker
+ for (PartitionMetadata partitionMetadata : metadata.partitionsMetadata()) {
+ int partitionId = partitionMetadata.partitionId();
+
+ // Skip error
+ if (partitionMetadata.errorCode() != ErrorMapping.NoError()) {
+ hasError = true;
+ continue;
+ }
+ // Add the partition if either the user wants all partitions or the user explicitly requested it.
+ // If the user wants all partitions, the partitions set is empty.
+ if (partitions.isEmpty() || partitionsRemained.remove(partitionId)) {
+ result.computeIfAbsent(partitionMetadata.leader(), k -> new HashSet<>()).add(partitionId);
+ }
+ }
+ }
+
+ // If there is no error and all partitions are needed, then we are done.
+ // Alternatively, if only a subset of partitions is needed and all of them have been fetched, then we are also done.
+ if ((!hasError && partitions.isEmpty()) || (!partitions.isEmpty() && partitionsRemained.isEmpty())) {
+ return result;
+ }
} catch (Exception e) {
// No-op just query next broker
} finally {
@@ -162,13 +157,9 @@ private static List getKafkaMetadata(Map brokers
}
}
- if (topicMetadataList.isEmpty()) {
- throw new IllegalArgumentException(
- String.format("Failed to get any information for topic: %s from the given brokers: %s", topic,
- brokers.toString()));
- }
-
- return topicMetadataList;
+ throw new IllegalArgumentException(
+ String.format("Failed to get broker information for partitions %s in topic %s from the given brokers: %s",
+ partitionsRemained, topic, brokers));
}
private static SimpleConsumer createSimpleConsumer(String host, int port) {
@@ -176,84 +167,95 @@ private static SimpleConsumer createSimpleConsumer(String host, int port) {
}
/**
- * Gets the latest offsets and create the requests as needed
+ * Creates a list of {@link KafkaRequest} by setting up the start and end offsets for each request. It may
+ * query Kafka using the given set of brokers for the earliest and latest offsets in the given set of partitions.
*/
- private static ArrayList fetchLatestOffsetAndCreateKafkaRequests(
- Map> offsetRequestInfo,
- Map offsets,
- KeyValueTable table) {
- ArrayList finalRequests = new ArrayList<>();
- for (LeaderInfo leader : offsetRequestInfo.keySet()) {
- Long latestTime = kafka.api.OffsetRequest.LatestTime();
- Long earliestTime = kafka.api.OffsetRequest.EarliestTime();
-
- SimpleConsumer consumer = createSimpleConsumer(leader.getUri().getHost(), leader.getUri().getPort());
- // Latest Offset
- PartitionOffsetRequestInfo partitionLatestOffsetRequestInfo = new PartitionOffsetRequestInfo(latestTime, 1);
- // Earliest Offset
- PartitionOffsetRequestInfo partitionEarliestOffsetRequestInfo = new PartitionOffsetRequestInfo(earliestTime, 1);
- Map latestOffsetInfo = new HashMap<>();
- Map earliestOffsetInfo = new HashMap<>();
- ArrayList topicAndPartitions = offsetRequestInfo.get(leader);
- for (TopicAndPartition topicAndPartition : topicAndPartitions) {
- latestOffsetInfo.put(topicAndPartition, partitionLatestOffsetRequestInfo);
- earliestOffsetInfo.put(topicAndPartition, partitionEarliestOffsetRequestInfo);
- }
+ private static List<KafkaRequest> createKafkaRequests(String topic, Map<Broker, Set<Integer>> brokerPartitions,
+ long maxNumberRecords,
+ KafkaPartitionOffsets partitionOffsets) throws IOException {
+ List<KafkaRequest> result = new ArrayList<>();
+ String brokerString = brokerPartitions.keySet().stream()
+ .map(b -> b.host() + ":" + b.port()).collect(Collectors.joining(","));
+
+ for (Map.Entry<Broker, Set<Integer>> entry : brokerPartitions.entrySet()) {
+ Broker broker = entry.getKey();
+ SimpleConsumer consumer = createSimpleConsumer(broker.host(), broker.port());
+ try {
+ // Get the latest offsets for partitions in this broker
+ Map<Integer, Long> latestOffsets = getOffsetsBefore(consumer, topic, entry.getValue(),
+ kafka.api.OffsetRequest.LatestTime());
+
+ // If there is no known offset for a given partition, we also need to query for the earliest offset
+ long earliestTime = kafka.api.OffsetRequest.EarliestTime();
+ Set<Integer> earliestTimePartitions = entry.getValue().stream()
+ .filter(p -> partitionOffsets.getPartitionOffset(p, earliestTime) == earliestTime)
+ .collect(Collectors.toSet());
+
+ Map<Integer, Long> earliestOffsets = getOffsetsBefore(consumer, topic, earliestTimePartitions, earliestTime);
+
+ // Add KafkaRequest objects for the partitions in this broker
+ for (int partition : entry.getValue()) {
+ long startOffset = partitionOffsets.getPartitionOffset(partition,
+ earliestOffsets.getOrDefault(partition, -1L));
+ long endOffset = latestOffsets.getOrDefault(partition, -1L);
+
+ // startOffset shouldn't be negative, as it should either be in partitionOffsets or in earliestOffsets
+ if (startOffset < 0) {
+ throw new IOException("Failed to find start offset for topic " + topic + " and partition " + partition);
+ }
+ // Also, end offset shouldn't be negative.
+ if (endOffset < 0) {
+ throw new IOException("Failed to find end offset for topic " + topic + " and partition " + partition);
+ }
- OffsetResponse latestOffsetResponse = getLatestOffsetResponse(consumer, latestOffsetInfo);
- OffsetResponse earliestOffsetResponse = null;
- if (latestOffsetResponse != null) {
- earliestOffsetResponse = getLatestOffsetResponse(consumer, earliestOffsetInfo);
- }
- consumer.close();
- if (earliestOffsetResponse == null) {
- continue;
- }
+ // Limit the number of records fetched
+ if (maxNumberRecords > 0) {
+ endOffset = Math.min(endOffset, startOffset + maxNumberRecords);
+ }
- for (TopicAndPartition topicAndPartition : topicAndPartitions) {
- long latestOffset = latestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0];
- Long start;
- byte[] tableStart = table.read(topicAndPartition.toString());
- if (tableStart != null) {
- start = Bytes.toLong(tableStart);
- } else {
- start = offsets.containsKey(topicAndPartition) ? offsets.get(topicAndPartition) - 1 : null;
- }
+ LOG.debug("Getting kafka messages from topic {}, partition {}, with start offset {}, end offset {}",
+ topic, partition, startOffset, endOffset);
- long earliestOffset = start == null || start == -2
- ? earliestOffsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition())[0] : start;
- if (earliestOffset == -1) {
- earliestOffset = latestOffset;
+ result.add(new KafkaRequest(topic, partition, Collections.singletonMap(KAFKA_BROKERS, brokerString),
+ startOffset, endOffset));
}
- LOG.debug("Getting kafka messages from topic {}, partition {}, with earlistOffset {}, latest offset {}",
- topicAndPartition.topic(), topicAndPartition.partition(), earliestOffset, latestOffset);
- KafkaRequest kafkaRequest =
- new KafkaRequest(topicAndPartition.topic(), Integer.toString(leader.getLeaderId()),
- topicAndPartition.partition(), leader.getUri());
- kafkaRequest.setLatestOffset(latestOffset);
- kafkaRequest.setEarliestOffset(earliestOffset);
- finalRequests.add(kafkaRequest);
+
+ } finally {
+ consumer.close();
}
}
- return finalRequests;
- }
- private static OffsetResponse getLatestOffsetResponse(SimpleConsumer consumer,
- Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo) {
+ return result;
+ }
- OffsetResponse offsetResponse =
- consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(),
- "client"));
- if (offsetResponse.hasError()) {
- for (TopicAndPartition key : offsetInfo.keySet()) {
- short errorCode = offsetResponse.errorCode(key.topic(), key.partition());
+ /**
+ * Queries Kafka for the offsets before the given time for the given set of partitions.
+ */
+ private static Map<Integer, Long> getOffsetsBefore(SimpleConsumer consumer, String topic,
+ Set<Integer> partitions, long time) throws IOException {
+ if (partitions.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> request =
+ partitions.stream().collect(Collectors.toMap(p -> new TopicAndPartition(topic, p),
+ p -> new PartitionOffsetRequestInfo(time, 1)));
+ OffsetResponse response = consumer.getOffsetsBefore(new OffsetRequest(request,
+ kafka.api.OffsetRequest.CurrentVersion(),
+ "client"));
+ if (response.hasError()) {
+ for (int partition : partitions) {
+ short errorCode = response.errorCode(topic, partition);
if (errorCode != ErrorMapping.NoError()) {
throw new RuntimeException(
- String.format("Error happens when getting the offset for topic %s and partition %d with error code %d",
- key.topic(), key.partition(), errorCode));
+ String.format("Failed to get the offset for topic %s and partition %d with error code %d",
+ topic, partition, errorCode));
}
}
+
+ // This shouldn't happen: the response reported an error, but none of the partitions returned an error code
+ throw new IOException("Failed to get offsets for topic " + topic + " and partitions " + partitions);
}
- return offsetResponse;
+
+ return partitions.stream().collect(Collectors.toMap(p -> p, p -> response.offsets(topic, p)[0]));
}
}
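
The per-partition offset logic introduced above reduces to a simple rule: start from the stored offset when one exists, otherwise from the earliest available offset, and read up to the latest offset, optionally capped by maxNumberRecords. A minimal standalone sketch of that rule follows; the class and method names are illustrative and not part of the plugin, and a negative savedOffset stands for "no stored offset".

    // Sketch of the per-partition offset-range rule used by createKafkaRequests above.
    final class OffsetRangeSketch {

      // savedOffset < 0 means no offset was stored for this partition by a previous run
      static long[] resolveRange(long savedOffset, long earliestOffset, long latestOffset, long maxNumberRecords) {
        long start = savedOffset >= 0 ? savedOffset : earliestOffset;
        long end = maxNumberRecords > 0 ? Math.min(latestOffset, start + maxNumberRecords) : latestOffset;
        return new long[] { start, end };
      }

      public static void main(String[] args) {
        // No stored offset, earliest=10, latest=1000, cap of 100 records -> read offsets [10, 110)
        long[] range = resolveRange(-1L, 10L, 1000L, 100L);
        System.out.println(range[0] + ".." + range[1]);
      }
    }
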
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
deleted file mode 100644
index 11ca365..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-
-/**
- * The key for the mapreduce job to pull kafka. Contains offsets and the
- * checksum.
- */
-public class KafkaKey implements WritableComparable<KafkaKey> {
-
- private String leaderId = "";
- private int partition = 0;
- private long beginOffset = 0;
- private long offset = 0;
- private long checksum = 0;
- private String topic = "";
- private MapWritable partitionMap = new MapWritable();
-
- /**
- * dummy empty constructor
- */
- public KafkaKey() {
- this("dummy", "0", 0, 0, 0, 0);
- }
-
- public KafkaKey(String topic, String leaderId, int partition, long beginOffset, long offset) {
- this(topic, leaderId, partition, beginOffset, offset, 0);
- }
-
- public KafkaKey(String topic, String leaderId, int partition, long beginOffset, long offset, long checksum) {
- this.set(topic, leaderId, partition, beginOffset, offset, checksum);
- }
-
- public void set(String topic, String leaderId, int partition, long beginOffset, long offset, long checksum) {
- this.leaderId = leaderId;
- this.partition = partition;
- this.beginOffset = beginOffset;
- this.offset = offset;
- this.checksum = checksum;
- this.topic = topic;
- }
-
- public void clear() {
- leaderId = "";
- partition = 0;
- beginOffset = 0;
- offset = 0;
- checksum = 0;
- topic = "";
- partitionMap = new MapWritable();
- }
-
- public String getTopic() {
- return topic;
- }
-
- public int getPartition() {
- return partition;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public long getMessageSize() {
- Text key = new Text("message.size");
- if (this.partitionMap.containsKey(key)) {
- return ((LongWritable) this.partitionMap.get(key)).get();
- } else {
- return 1024; //default estimated size
- }
- }
-
- public void setMessageSize(long messageSize) {
- Text key = new Text("message.size");
- put(key, new LongWritable(messageSize));
- }
-
- public void put(Writable key, Writable value) {
- this.partitionMap.put(key, value);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- this.leaderId = in.readUTF();
- this.partition = in.readInt();
- this.beginOffset = in.readLong();
- this.offset = in.readLong();
- this.checksum = in.readLong();
- this.topic = in.readUTF();
- this.partitionMap = new MapWritable();
- this.partitionMap.readFields(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, this.leaderId);
- out.writeInt(this.partition);
- out.writeLong(this.beginOffset);
- out.writeLong(this.offset);
- out.writeLong(this.checksum);
- out.writeUTF(this.topic);
- this.partitionMap.write(out);
- }
-
- @Override
- public int compareTo(KafkaKey o) {
- int comp = Integer.compare(partition, o.partition);
- if (comp != 0) {
- return comp;
- }
- comp = Long.compare(offset, o.offset);
- if (comp != 0) {
- return comp;
- }
- return Long.compare(checksum, o.checksum);
- }
-}
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java
deleted file mode 100644
index 3ea4d3f..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import java.nio.ByteBuffer;
-
-/**
- * Kafka Message.
- */
-public class KafkaMessage {
-
- private final ByteBuffer payload;
- private final ByteBuffer key;
-
- public KafkaMessage(ByteBuffer payload, ByteBuffer key) {
- this.payload = payload;
- this.key = key;
- }
-
- public ByteBuffer getPayload() {
- return payload;
- }
-
- public ByteBuffer getKey() {
- return key;
- }
-}
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java
deleted file mode 100644
index ceccee0..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import kafka.api.PartitionFetchInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchRequest;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-
-/**
- * A class which reads from the fetch results from kafka.
- */
-public class KafkaReader {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaReader.class);
-
- // index of context
- private static final int fetchBufferSize = 1024 * 1024;
- private final KafkaRequest kafkaRequest;
- private final SimpleConsumer simpleConsumer;
-
- private long currentOffset;
- private long lastOffset;
- private Iterator<MessageAndOffset> messageIter;
-
-
- /**
- * Construct using the json representation of the kafka request
- */
- public KafkaReader(KafkaRequest request) {
- kafkaRequest = request;
- currentOffset = request.getOffset();
- lastOffset = request.getLastOffset();
-
- // read data from queue
- URI uri = kafkaRequest.getURI();
- simpleConsumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 20 * 1000, fetchBufferSize, "client");
- fetch();
- }
-
- public boolean hasNext() throws IOException {
- if (currentOffset >= lastOffset) {
- return false;
- }
- if (messageIter != null && messageIter.hasNext()) {
- return true;
- } else {
- return fetch();
- }
- }
-
- /**
- * Fetches the next Kafka message and stuffs the results into the key and value.
- */
- public KafkaMessage getNext(KafkaKey kafkaKey) throws IOException {
- if (hasNext()) {
-
- MessageAndOffset msgAndOffset = messageIter.next();
- Message message = msgAndOffset.message();
-
- ByteBuffer payload = message.payload();
- ByteBuffer key = message.key();
-
- if (payload == null) {
- LOG.warn("Received message with null message.payload with topic {} and partition {}",
- kafkaKey.getTopic(), kafkaKey.getPartition());
-
- }
-
- kafkaKey.clear();
- kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getLeaderId(), kafkaRequest.getPartition(), currentOffset,
- msgAndOffset.offset() + 1, message.checksum());
- kafkaKey.setMessageSize(msgAndOffset.message().size());
- currentOffset = msgAndOffset.offset() + 1; // increase offset
- return new KafkaMessage(payload, key);
- } else {
- return null;
- }
- }
-
- /**
- * Creates a fetch request.
- */
- private boolean fetch() {
- if (currentOffset >= lastOffset) {
- return false;
- }
- TopicAndPartition topicAndPartition = new TopicAndPartition(kafkaRequest.getTopic(), kafkaRequest.getPartition());
- PartitionFetchInfo partitionFetchInfo = new PartitionFetchInfo(currentOffset, fetchBufferSize);
-
- Map<TopicAndPartition, PartitionFetchInfo> fetchInfo = new HashMap<>();
- fetchInfo.put(topicAndPartition, partitionFetchInfo);
-
- FetchRequest fetchRequest = new FetchRequest(-1, "client", 1000, 1024, fetchInfo);
-
- FetchResponse fetchResponse;
- try {
- fetchResponse = simpleConsumer.fetch(fetchRequest);
- if (fetchResponse.hasError()) {
- String message =
- "Error Code generated : " + fetchResponse.errorCode(kafkaRequest.getTopic(), kafkaRequest.getPartition());
- throw new RuntimeException(message);
- }
- return processFetchResponse(fetchResponse);
- } catch (Exception e) {
- return false;
- }
- }
-
- private boolean processFetchResponse(FetchResponse fetchResponse) {
- ByteBufferMessageSet messageBuffer = fetchResponse.messageSet(kafkaRequest.getTopic(), kafkaRequest.getPartition());
- messageIter = messageBuffer.iterator();
- if (!messageIter.hasNext()) {
- messageIter = null;
- return false;
- }
- return true;
- }
-
- /**
- * Closes this context
- */
- public void close() throws IOException {
- if (simpleConsumer != null) {
- simpleConsumer.close();
- }
- }
-}
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
deleted file mode 100644
index 851fc55..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Kafka Record Reader to be used by {@link KafkaInputFormat}.
- */
-public class KafkaRecordReader extends RecordReader<KafkaKey, KafkaMessage> {
-
- private KafkaSplit split;
- private long totalBytes;
- private KafkaReader reader;
- private long readBytes = 0;
- private final KafkaKey key = new KafkaKey();
- private KafkaMessage value;
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- this.split = (KafkaSplit) split;
- this.totalBytes = this.split.getLength();
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
-
- @Override
- public float getProgress() throws IOException {
- if (getPos() == 0) {
- return 0f;
- }
-
- if (getPos() >= totalBytes) {
- return 1f;
- }
- return (float) ((double) getPos() / totalBytes);
- }
-
- private long getPos() throws IOException {
- return readBytes;
- }
-
- @Override
- public KafkaKey getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
-
- @Override
- public KafkaMessage getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (reader == null || !reader.hasNext()) {
- KafkaRequest request = split.popRequest();
- if (request == null) {
- return false;
- }
-
- key.set(request.getTopic(), request.getLeaderId(), request.getPartition(), request.getOffset(),
- request.getOffset(), 0);
- value = null;
-
- if (reader != null) {
- closeReader();
- }
- reader = new KafkaReader(request);
- }
- KafkaMessage message;
- if ((message = reader.getNext(key)) != null) {
- readBytes += key.getMessageSize();
- value = message;
- return true;
- }
- return false;
- }
-
-
- private void closeReader() throws IOException {
- if (reader != null) {
- try {
- reader.close();
- } catch (Exception e) {
- // not much to do here but skip the task
- } finally {
- reader = null;
- }
- }
- }
-}
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java
deleted file mode 100644
index a1ef1b5..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * A class that represents the kafka pull request.
- *
- * The class is a container for topic, leaderId, partition, uri and offset. It is
- * used in reading and writing the sequence files used for the extraction job.
- *
- */
-public class KafkaRequest {
-
- public static final long DEFAULT_OFFSET = 0;
-
- private String topic = "";
- private String leaderId = "";
- private int partition = 0;
-
- private URI uri = null;
- private long offset = DEFAULT_OFFSET;
- private long latestOffset = -1;
- private long earliestOffset = -2;
- private long avgMsgSize = 1024;
-
- public KafkaRequest(String topic, String leaderId, int partition, URI brokerUri) {
- this(topic, leaderId, partition, brokerUri, DEFAULT_OFFSET, -1);
- }
-
- public KafkaRequest(String topic, String leaderId, int partition, URI brokerUri, long offset, long latestOffset) {
- this.topic = topic;
- this.leaderId = leaderId;
- this.uri = brokerUri;
- this.partition = partition;
- this.latestOffset = latestOffset;
- setOffset(offset);
- }
-
- public void setLatestOffset(long latestOffset) {
- this.latestOffset = latestOffset;
- }
-
- public void setEarliestOffset(long earliestOffset) {
- this.earliestOffset = earliestOffset;
- }
-
- public void setAvgMsgSize(long size) {
- this.avgMsgSize = size;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public String getLeaderId() {
- return this.leaderId;
- }
-
- public String getTopic() {
- return this.topic;
- }
-
- public URI getURI() {
- return this.uri;
- }
-
- public int getPartition() {
- return this.partition;
- }
-
- public long getOffset() {
- return this.offset;
- }
-
- public long getEarliestOffset() {
- if (this.earliestOffset == -2 && uri != null) {
- SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 20000, 1024 * 1024, "client");
- Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo = new HashMap<>();
- offsetInfo.put(new TopicAndPartition(topic, partition),
- new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
- OffsetResponse response =
- consumer.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "client"));
- long[] endOffset = response.offsets(topic, partition);
- if (endOffset.length == 0) {
- throw new RuntimeException("Could not find earliest offset for topic: " + topic +
- " and partition: " + partition);
- }
- consumer.close();
- this.earliestOffset = endOffset[0];
- return endOffset[0];
- } else {
- return this.earliestOffset;
- }
- }
-
- public long getLastOffset() {
- if (this.latestOffset == -1 && uri != null) {
- return getLastOffset(kafka.api.OffsetRequest.LatestTime());
- } else {
- return this.latestOffset;
- }
- }
-
- private long getLastOffset(long time) {
- SimpleConsumer consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 60000, 1024 * 1024, "client");
- Map<TopicAndPartition, PartitionOffsetRequestInfo> offsetInfo = new HashMap<>();
- offsetInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(time, 1));
- OffsetResponse response =
- consumer.getOffsetsBefore(new OffsetRequest(offsetInfo, kafka.api.OffsetRequest.CurrentVersion(), "client"));
- long[] endOffset = response.offsets(topic, partition);
- consumer.close();
- if (endOffset.length == 0) {
- throw new RuntimeException("Could not find latest offset for topic: " + topic +
- " and partition: " + partition);
- }
- this.latestOffset = endOffset[0];
- return endOffset[0];
- }
-
- public long estimateDataSize() {
- long endOffset = getLastOffset();
- return (endOffset - offset) * avgMsgSize;
- }
-}
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java
deleted file mode 100644
index d1b9435..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import javax.annotation.Nullable;
-
-/**
- * Kafka split
- */
-public class KafkaSplit extends InputSplit implements Writable {
- private KafkaRequest request;
- private long length = 0;
-
- public KafkaSplit() {
- }
-
- public KafkaSplit(KafkaRequest request) {
- this.request = request;
- length = request.estimateDataSize();
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- String topic = in.readUTF();
- String leaderId = in.readUTF();
- String str = in.readUTF();
- URI uri = null;
- if (!str.isEmpty()) {
- try {
- uri = new URI(str);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
- }
- int partition = in.readInt();
- long offset = in.readLong();
- long latestOffset = in.readLong();
- request = new KafkaRequest(topic, leaderId, partition, uri, offset, latestOffset);
- length = request.estimateDataSize();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(request.getTopic());
- out.writeUTF(request.getLeaderId());
- if (request.getURI() != null) {
- out.writeUTF(request.getURI().toString());
- } else {
- out.writeUTF("");
- }
- out.writeInt(request.getPartition());
- out.writeLong(request.getOffset());
- out.writeLong(request.getLastOffset());
- }
-
- @Override
- public long getLength() throws IOException {
- return length;
- }
-
- @Override
- public String[] getLocations() throws IOException {
- return new String[] {};
- }
-
- @Nullable
- public KafkaRequest popRequest() {
- KafkaRequest result = request;
- request = null;
- return result;
- }
-}
diff --git a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/LeaderInfo.java b/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/LeaderInfo.java
deleted file mode 100644
index e39c161..0000000
--- a/kafka-plugins-0.8/src/main/java/co/cask/hydrator/plugin/batch/source/LeaderInfo.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright © 2017 Cask Data, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package co.cask.hydrator.plugin.batch.source;
-
-import java.net.URI;
-import java.util.Objects;
-
-
-/**
- * Model class to store the leaderInformation
- */
-
-public class LeaderInfo {
-
- private final URI uri;
- private final int leaderId;
-
- public LeaderInfo(URI uri, int leaderId) {
- this.uri = uri;
- this.leaderId = leaderId;
- }
-
- public int getLeaderId() {
- return leaderId;
- }
-
- public URI getUri() {
- return uri;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- LeaderInfo that = (LeaderInfo) o;
- return leaderId == that.leaderId && Objects.equals(uri, that.uri);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(uri, leaderId);
- }
-}
diff --git a/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java
index a2042a3..455793a 100644
--- a/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java
+++ b/kafka-plugins-0.8/src/test/java/co/cask/hydrator/KafkaBatchSourceTest.java
@@ -16,230 +16,31 @@
package co.cask.hydrator;
-import co.cask.cdap.api.artifact.ArtifactSummary;
-import co.cask.cdap.api.common.Bytes;
-import co.cask.cdap.api.data.format.StructuredRecord;
-import co.cask.cdap.api.data.schema.Schema;
-import co.cask.cdap.api.dataset.lib.KeyValueTable;
-import co.cask.cdap.api.dataset.table.Table;
-import co.cask.cdap.datapipeline.DataPipelineApp;
-import co.cask.cdap.datapipeline.SmartWorkflow;
-import co.cask.cdap.etl.api.batch.BatchSource;
-import co.cask.cdap.etl.mock.batch.MockSink;
-import co.cask.cdap.etl.mock.test.HydratorTestBase;
-import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
-import co.cask.cdap.etl.proto.v2.ETLPlugin;
-import co.cask.cdap.etl.proto.v2.ETLStage;
-import co.cask.cdap.proto.ProgramRunStatus;
-import co.cask.cdap.proto.artifact.AppRequest;
-import co.cask.cdap.proto.id.ApplicationId;
-import co.cask.cdap.proto.id.ArtifactId;
-import co.cask.cdap.proto.id.NamespaceId;
-import co.cask.cdap.test.ApplicationManager;
-import co.cask.cdap.test.DataSetManager;
-import co.cask.cdap.test.TestConfiguration;
-import co.cask.cdap.test.WorkflowManager;
import co.cask.hydrator.plugin.batch.source.KafkaBatchSource;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
-import kafka.common.TopicAndPartition;
+import com.google.common.util.concurrent.Service;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
-import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.kafka.client.Compression;
-import org.apache.twill.kafka.client.KafkaClientService;
-import org.apache.twill.kafka.client.KafkaPublisher;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import java.io.File;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
/**
* Unit tests for our plugins.
*/
-public class KafkaBatchSourceTest extends HydratorTestBase {
- private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0");
- @ClassRule
- public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false);
+public class KafkaBatchSourceTest extends AbstractKafkaBatchSourceTest {
- private static ZKClientService zkClient;
- private static KafkaClientService kafkaClient;
- private static InMemoryZKServer zkServer;
- private static EmbeddedKafkaServer kafkaServer;
- private static int kafkaPort;
-
- @BeforeClass
- public static void setupTestClass() throws Exception {
- ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
-
- // add the data-pipeline artifact and mock plugins
- setupBatchArtifacts(parentArtifact, DataPipelineApp.class);
-
- // add our plugins artifact with the data-pipeline artifact as its parent.
- // this will make our plugins available to data-pipeline.
- addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"),
- parentArtifact,
- KafkaBatchSource.class);
-
- zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
- zkServer.startAndWait();
-
- kafkaPort = Networks.getRandomPort();
- kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(),
- kafkaPort, TMP_FOLDER.newFolder()));
- kafkaServer.startAndWait();
-
- zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
- zkClient.startAndWait();
-
- kafkaClient = new ZKKafkaClientService(zkClient);
- kafkaClient.startAndWait();
- }
-
- @AfterClass
- public static void cleanup() {
- kafkaClient.stopAndWait();
- kafkaServer.stopAndWait();
- zkClient.stopAndWait();
- zkServer.stopAndWait();
- }
-
- @Test
- public void testKafkaSource() throws Exception {
- Schema schema = Schema.recordOf(
- "user",
- Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
- Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
- Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
-
- // create the pipeline config
- String inputName = "sourceTestInput";
- String outputName = "sourceTestOutput";
-
- Map<String, String> sourceProperties = new HashMap<>();
- sourceProperties.put("kafkaBrokers", "localhost:" + kafkaPort);
- sourceProperties.put("referenceName", "kafkaTest");
- sourceProperties.put("tableName", "testKafkaSource");
- sourceProperties.put("topic", "users");
- sourceProperties.put("schema", schema.toString());
- sourceProperties.put("format", "csv");
- ETLStage source =
- new ETLStage("source", new ETLPlugin(KafkaBatchSource.NAME, BatchSource.PLUGIN_TYPE, sourceProperties, null));
- ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputName));
-
- ETLBatchConfig pipelineConfig = ETLBatchConfig.builder("* * * * *")
- .addStage(source)
- .addStage(sink)
- .addConnection(source.getName(), sink.getName())
- .build();
-
- // create the pipeline
- ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSource");
- ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
-
-
- Map<String, String> messages = new HashMap<>();
- messages.put("a", "1,samuel,jackson");
- messages.put("b", "2,dwayne,johnson");
- messages.put("c", "3,christopher,walken");
- sendKafkaMessage("users", messages);
-
-
- WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
- workflowManager.start();
- workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
-
- // check the pipeline output
- DataSetManager<Table> outputManager = getDataset(outputName);
- Set<StructuredRecord> outputRecords = new HashSet<>();
- outputRecords.addAll(MockSink.readOutput(outputManager));
-
- final Map<Long, String> expected = ImmutableMap.of(
- 1L, "samuel jackson",
- 2L, "dwayne johnson",
- 3L, "christopher walken"
- );
-
- Map<Long, String> actual = new HashMap<>();
- for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
- actual.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
- }
-
- Assert.assertEquals(3, outputRecords.size());
- Assert.assertEquals(expected, actual);
- DataSetManager<KeyValueTable> kvTable = getDataset("testKafkaSource");
- KeyValueTable table = kvTable.get();
- byte[] offset = table.read(new TopicAndPartition("users", 0).toString());
- Assert.assertNotNull(offset);
- Assert.assertEquals(3, Bytes.toLong(offset));
-
- messages = new HashMap<>();
- messages.put("d", "4,samuel,jackson");
- messages.put("e", "5,dwayne,johnson");
- sendKafkaMessage("users", messages);
- workflowManager.start();
- TimeUnit.SECONDS.sleep(10);
- workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
- final Map<Long, String> expected2 = ImmutableMap.of(
- 1L, "samuel jackson",
- 2L, "dwayne johnson",
- 3L, "christopher walken",
- 4L, "samuel jackson",
- 5L, "dwayne johnson"
- );
-
- outputRecords = new HashSet<>();
- outputRecords.addAll(MockSink.readOutput(outputManager));
- Map<Long, String> actual2 = new HashMap<>();
- for (StructuredRecord outputRecord : MockSink.readOutput(outputManager)) {
- actual2.put((Long) outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
- }
- Assert.assertEquals(5, outputRecords.size());
- Assert.assertEquals(expected2, actual2);
+ @Override
+ protected Service createKafkaServer(Properties kafkaConfig) {
+ return new EmbeddedKafkaServer(kafkaConfig);
}
- private static Properties generateKafkaConfig(String zkConnectStr, int port, File logDir) {
- Properties prop = new Properties();
- prop.setProperty("log.dir", logDir.getAbsolutePath());
- prop.setProperty("port", Integer.toString(port));
- prop.setProperty("broker.id", "1");
- prop.setProperty("num.partitions", "1");
- prop.setProperty("zookeeper.connect", zkConnectStr);
- prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
- prop.setProperty("default.replication.factor", "1");
- return prop;
+ @Override
+ protected List<Class<?>> getPluginClasses() {
+ return Collections.singletonList(KafkaBatchSource.class);
}
- private void sendKafkaMessage(String topic, Map<String, String> messages) {
- KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE);
-
- // If publish failed, retry up to 20 times, with 100ms delay between each retry
- // This is because leader election in Kafka 08 takes time when a topic is being created upon publish request.
- int count = 0;
- do {
- KafkaPublisher.Preparer preparer = publisher.prepare(topic);
- for (Map.Entry<String, String> entry : messages.entrySet()) {
- preparer.add(Charsets.UTF_8.encode(entry.getValue()), entry.getKey());
- }
- try {
- preparer.send().get();
- break;
- } catch (Exception e) {
- // Backoff if send failed.
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
- } while (count++ < 20);
+ @Override
+ protected String getKafkaBatchSourceName() {
+ return KafkaBatchSource.NAME;
}
}
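
With this change the 0.8 test only supplies the pieces that differ per Kafka version, while the shared pipeline-driving logic lives in AbstractKafkaBatchSourceTest in kafka-plugins-common (not part of this diff). A presumed skeleton of that base class, grounded only in the three hooks overridden above; the sketch name and anything beyond those hooks is an assumption.

    // Presumed skeleton of the shared test base; only the three abstract hooks are visible in this diff.
    import com.google.common.util.concurrent.Service;

    import java.util.List;
    import java.util.Properties;

    public abstract class AbstractKafkaBatchSourceTestSketch {

      // Starts an embedded Kafka broker appropriate for the Kafka version under test
      protected abstract Service createKafkaServer(Properties kafkaConfig);

      // Plugin classes to register with the test pipeline artifact
      protected abstract List<Class<?>> getPluginClasses();

      // Name of the batch source plugin, e.g. KafkaBatchSource.NAME
      protected abstract String getKafkaBatchSourceName();
    }
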
diff --git a/kafka-plugins-0.8/widgets/Kafka-batchsource.json b/kafka-plugins-0.8/widgets/Kafka-batchsource.json
index c9a70d7..0cf0145 100644
--- a/kafka-plugins-0.8/widgets/Kafka-batchsource.json
+++ b/kafka-plugins-0.8/widgets/Kafka-batchsource.json
@@ -27,8 +27,8 @@
},
{
"widget-type": "textbox",
- "label": "Offset Table Name",
- "name": "tableName"
+ "label": "Offset Directory",
+ "name": "offsetDir"
},
{
"widget-type": "csv",
diff --git a/kafka-plugins-common/pom.xml b/kafka-plugins-common/pom.xml
new file mode 100644
index 0000000..b778884
--- /dev/null
+++ b/kafka-plugins-common/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>kafka-plugins</artifactId>
+    <groupId>co.cask.hydrator</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <name>Apache Kafka plugins common module</name>
+  <artifactId>kafka-plugins-common</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>co.cask.cdap</groupId>
+      <artifactId>cdap-data-pipeline2_2.11</artifactId>
+      <version>${cdap.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>test-jar</id>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchConfig.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchConfig.java
new file mode 100644
index 0000000..79b8079
--- /dev/null
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaBatchConfig.java
@@ -0,0 +1,340 @@
+package co.cask.hydrator.plugin.batch.source;
+
+import co.cask.cdap.api.annotation.Description;
+import co.cask.cdap.api.annotation.Macro;
+import co.cask.cdap.api.data.format.FormatSpecification;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.api.dataset.lib.KeyValue;
+import co.cask.cdap.format.RecordFormats;
+import co.cask.hydrator.common.KeyValueListParser;
+import co.cask.hydrator.common.ReferencePluginConfig;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+/**
+ * Config properties for the plugin.
+ */
+public class KafkaBatchConfig extends ReferencePluginConfig {
+
+ @Description("Kafka topic to read from.")
+ @Macro
+ private String topic;
+
+ @Description("List of Kafka brokers specified in host1:port1,host2:port2 form. For example, " +
+ "host1.example.com:9092,host2.example.com:9092.")
+ @Macro
+ private String kafkaBrokers;
+
+ @Description("A directory path to store the latest Kafka offsets. " +
+ "A file named with the pipeline name will be created under the given directory.")
+ @Macro
+ @Nullable
+ private String offsetDir;
+
+ @Description("A comma separated list of topic partitions to read from. " +
+ "If not specified, all partitions will be read.")
+ @Macro
+ @Nullable
+ private String partitions;
+
+ @Description("The initial offset for each topic partition in partition1:offset1,partition2:offset2 form. " +
+ "These offsets will only be used for the first run of the pipeline. " +
+ "Any subsequent run will read from the latest offset from previous run." +
+ "Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. " +
+ "If not specified, the initial run will start reading from the latest message in Kafka.")
+ @Nullable
+ @Macro
+ private String initialPartitionOffsets;
+
+ @Description("The maximum of messages the source will read from each topic partition. If the current topic " +
+ "partition does not have this number of messages, the source will read to the latest offset. " +
+ "Note that this is an estimation, the actual number of messages the source read may be smaller than this number.")
+ @Nullable
+ @Macro
+ private Long maxNumberRecords;
+
+ @Description("Output schema of the source, including the timeField and keyField. " +
+ "The fields excluding keyField are used in conjunction with the format " +
+ "to parse Kafka payloads.")
+ private String schema;
+
+ @Description("Optional format of the Kafka event. Any format supported by CDAP is supported. " +
+ "For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values. " +
+ "If no format is given, Kafka message payloads will be treated as bytes.")
+ @Nullable
+ private String format;
+
+ @Description("Optional name of the field containing the message key. " +
+ "If this is not set, no key field will be added to output records. " +
+ "If set, this field must be present in the schema property and must be bytes.")
+ @Nullable
+ private String keyField;
+
+ @Description("Optional name of the field containing the kafka partition that was read from. " +
+ "If this is not set, no partition field will be added to output records. " +
+ "If set, this field must be present in the schema property and must be an integer.")
+ @Nullable
+ private String partitionField;
+
+ @Description("Optional name of the field containing the kafka offset that the message was read from. " +
+ "If this is not set, no offset field will be added to output records. " +
+ "If set, this field must be present in the schema property and must be a long.")
+ @Nullable
+ private String offsetField;
+
+ public KafkaBatchConfig() {
+ super("");
+ }
+
+ public KafkaBatchConfig(String brokers, String partitions, String topic, String initialPartitionOffsets) {
+ super(String.format("Kafka_%s", topic));
+ this.kafkaBrokers = brokers;
+ this.partitions = partitions;
+ this.topic = topic;
+ this.initialPartitionOffsets = initialPartitionOffsets;
+ }
+
+ // Accessors
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getBrokers() {
+ return kafkaBrokers;
+ }
+
+ @Nullable
+ public String getOffsetDir() {
+ return offsetDir;
+ }
+
+ public Set<Integer> getPartitions() {
+ Set<Integer> partitionSet = new HashSet<>();
+ if (partitions == null) {
+ return partitionSet;
+ }
+ for (String partition : Splitter.on(',').trimResults().split(partitions)) {
+ try {
+ partitionSet.add(Integer.parseInt(partition));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(
+ String.format("Invalid partition '%s'. Partitions must be integers.", partition));
+ }
+ }
+ return partitionSet;
+ }
+
+ @Nullable
+ public String getKeyField() {
+ return Strings.isNullOrEmpty(keyField) ? null : keyField;
+ }
+
+ @Nullable
+ public String getPartitionField() {
+ return Strings.isNullOrEmpty(partitionField) ? null : partitionField;
+ }
+
+ @Nullable
+ public String getOffsetField() {
+ return Strings.isNullOrEmpty(offsetField) ? null : offsetField;
+ }
+
+ public long getMaxNumberRecords() {
+ return maxNumberRecords == null ? -1 : maxNumberRecords;
+ }
+
+ @Nullable
+ public String getFormat() {
+ return Strings.isNullOrEmpty(format) ? null : format;
+ }
+
+ @Nullable
+ public Schema getSchema() {
+ try {
+ return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Unable to parse schema: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Gets the message schema from the schema field.
+ * If the time, key, partition, or offset fields are in the configured schema, they will be removed.
+ */
+ public Schema getMessageSchema() {
+ Schema schema = getSchema();
+ List<Schema.Field> messageFields = new ArrayList<>();
+ boolean keyFieldExists = false;
+ boolean partitionFieldExists = false;
+ boolean offsetFieldExists = false;
+
+ for (Schema.Field field : schema.getFields()) {
+ String fieldName = field.getName();
+ Schema fieldSchema = field.getSchema();
+ Schema.Type fieldType = fieldSchema.isNullable()
+ ? fieldSchema.getNonNullable().getType()
+ : fieldSchema.getType();
+ // if the field is not the key, partition, or offset field, it is a message field.
+ if (fieldName.equals(keyField)) {
+ if (fieldType != Schema.Type.BYTES) {
+ throw new IllegalArgumentException("The key field must be of type bytes or nullable bytes.");
+ }
+ keyFieldExists = true;
+ } else if (fieldName.equals(partitionField)) {
+ if (fieldType != Schema.Type.INT) {
+ throw new IllegalArgumentException("The partition field must be of type int.");
+ }
+ partitionFieldExists = true;
+ } else if (fieldName.equals(offsetField)) {
+ if (fieldType != Schema.Type.LONG) {
+ throw new IllegalArgumentException("The offset field must be of type long.");
+ }
+ offsetFieldExists = true;
+ } else {
+ messageFields.add(field);
+ }
+ }
+ if (messageFields.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Schema must contain at least one other field besides the time and key fields.");
+ }
+ if (getKeyField() != null && !keyFieldExists) {
+ throw new IllegalArgumentException(String.format(
+ "keyField '%s' does not exist in the schema. Please add it to the schema.", keyField));
+ }
+ if (getPartitionField() != null && !partitionFieldExists) {
+ throw new IllegalArgumentException(String.format(
+ "partitionField '%s' does not exist in the schema. Please add it to the schema.", partitionField));
+ }
+ if (getOffsetField() != null && !offsetFieldExists) {
+ throw new IllegalArgumentException(String.format(
+ "offsetField '%s' does not exist in the schema. Please add it to the schema.", offsetField));
+ }
+ return Schema.recordOf("kafka.message", messageFields);
+ }
+
+ /**
+ * @return broker host to broker port mapping.
+ */
+ public Map<String, Integer> getBrokerMap() {
+ return parseBrokerMap(kafkaBrokers);
+ }
+
+ /**
+ * Parses a given Kafka broker string, which is in comma separate host:port format, into a Map of host to port.
+ */
+ public static Map<String, Integer> parseBrokerMap(String kafkaBrokers) {
+ Map<String, Integer> brokerMap = new HashMap<>();
+ for (KeyValue<String, String> hostAndPort : KeyValueListParser.DEFAULT.parse(kafkaBrokers)) {
+ String host = hostAndPort.getKey();
+ String portStr = hostAndPort.getValue();
+ try {
+ brokerMap.put(host, Integer.parseInt(portStr));
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format(
+ "Invalid port '%s' for host '%s'.", portStr, host));
+ }
+ }
+ if (brokerMap.isEmpty()) {
+ throw new IllegalArgumentException("Must specify kafka brokers.");
+ }
+ return brokerMap;
+ }
+
+ /**
+ * Gets the partition offsets as specified by the {@link #initialPartitionOffsets} field.
+ */
+ public KafkaPartitionOffsets getInitialPartitionOffsets() {
+ KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(Collections.emptyMap());
+
+ if (initialPartitionOffsets == null) {
+ return partitionOffsets;
+ }
+
+ for (KeyValue<String, String> partitionAndOffset : KeyValueListParser.DEFAULT.parse(initialPartitionOffsets)) {
+ String partitionStr = partitionAndOffset.getKey();
+ String offsetStr = partitionAndOffset.getValue();
+
+ int partition;
+ try {
+ partition = Integer.parseInt(partitionStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format(
+ "Invalid partition '%s' in initialPartitionOffsets.", partitionStr));
+ }
+
+ long offset;
+ try {
+ offset = Long.parseLong(offsetStr);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format(
+ "Invalid offset '%s' in initialPartitionOffsets for partition %d.", partitionStr, partition));
+ }
+
+ partitionOffsets.setPartitionOffset(partition, offset);
+ }
+
+ return partitionOffsets;
+ }
+
+ public void validate() {
+ // brokers can be null since it is macro enabled.
+ if (kafkaBrokers != null) {
+ getBrokerMap();
+ }
+ getPartitions();
+ getInitialPartitionOffsets();
+
+ Schema messageSchema = getMessageSchema();
+ // if format is empty, there must be just a single message field of type bytes or nullable types.
+ if (Strings.isNullOrEmpty(format)) {
+ List<Schema.Field> messageFields = messageSchema.getFields();
+ if (messageFields.size() > 1) {
+ String fieldNames = messageFields.stream().map(Schema.Field::getName).collect(Collectors.joining(","));
+
+ throw new IllegalArgumentException(String.format(
+ "Without a format, the schema must contain just a single message field of type bytes or nullable bytes. " +
+ "Found %s message fields (%s).", messageFields.size(), fieldNames));
+ }
+
+ Schema.Field messageField = messageFields.get(0);
+ Schema messageFieldSchema = messageField.getSchema();
+ Schema.Type messageFieldType = messageFieldSchema.isNullable() ?
+ messageFieldSchema.getNonNullable().getType() : messageFieldSchema.getType();
+ if (messageFieldType != Schema.Type.BYTES) {
+ throw new IllegalArgumentException(String.format(
+ "Without a format, the message field must be of type bytes or nullable bytes, but field %s is of type %s.",
+ messageField.getName(), messageField.getSchema()));
+ }
+ } else {
+ // otherwise, if there is a format, make sure we can instantiate it.
+ FormatSpecification formatSpec = new FormatSpecification(format, messageSchema, new HashMap<>());
+
+ try {
+ RecordFormats.createInitializedFormat(formatSpec);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format(
+ "Unable to instantiate a message parser from format '%s' and message schema '%s': %s",
+ format, messageSchema, e.getMessage()), e);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public static Path getOffsetFilePath(Path dir, String namespace, String pipelineName) {
+ return new Path(dir, String.format("%s.%s.offsets.json", namespace, pipelineName));
+ }
+}
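
The parsing helpers above can be exercised on their own; a short usage sketch with made-up broker, topic, partition, and offset values:

    // Usage sketch for KafkaBatchConfig's parsing helpers; all values are made up.
    import co.cask.hydrator.plugin.batch.source.KafkaBatchConfig;

    import java.util.Map;
    import java.util.Set;

    class KafkaBatchConfigExample {
      public static void main(String[] args) {
        KafkaBatchConfig config = new KafkaBatchConfig(
            "host1.example.com:9092,host2.example.com:9092",   // kafkaBrokers
            "0,1",                                              // partitions
            "users",                                            // topic
            "0:100,1:200");                                     // initialPartitionOffsets

        Map<String, Integer> brokers = config.getBrokerMap();   // host -> port for both brokers
        Set<Integer> partitions = config.getPartitions();       // [0, 1]
        System.out.println(brokers + " " + partitions);
      }
    }
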
diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
similarity index 60%
rename from kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
rename to kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
index 9e918d6..31d9d97 100644
--- a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaKey.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2017 Cask Data, Inc.
+ * Copyright © 2017-2018 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -16,10 +16,6 @@
package co.cask.hydrator.plugin.batch.source;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
@@ -33,36 +29,31 @@
*/
public class KafkaKey implements WritableComparable<KafkaKey> {
- private int partition = 0;
- private long beginOffset = 0;
- private long offset = 0;
- private String topic = "";
- private MapWritable partitionMap = new MapWritable();
+ private String topic;
+ private int partition;
+ private long beginOffset;
+ private long offset;
+ private long messageSize;
+ private long checksum;
/**
* dummy empty constructor
*/
public KafkaKey() {
- this("dummy", 0, 0, 0);
+ this("dummy", 0, 0, 0, 0);
}
- public KafkaKey(String topic, int partition, long beginOffset, long offset) {
- this.set(topic, partition, beginOffset, offset);
+ public KafkaKey(String topic, int partition, long beginOffset, long offset, long checksum) {
+ this.topic = topic;
+ this.partition = partition;
+ set(beginOffset, offset, 1024L, checksum);
}
- public void set(String topic, int partition, long beginOffset, long offset) {
- this.partition = partition;
+ public void set(long beginOffset, long offset, long messageSize, long checksum) {
this.beginOffset = beginOffset;
this.offset = offset;
- this.topic = topic;
- }
-
- public void clear() {
- partition = 0;
- beginOffset = 0;
- offset = 0;
- topic = "";
- partitionMap = new MapWritable();
+ this.messageSize = messageSize;
+ this.checksum = checksum;
}
public String getTopic() {
@@ -78,40 +69,27 @@ public long getOffset() {
}
public long getMessageSize() {
- Text key = new Text("message.size");
- if (this.partitionMap.containsKey(key)) {
- return ((LongWritable) this.partitionMap.get(key)).get();
- } else {
- return 1024; //default estimated size
- }
- }
-
- public void setMessageSize(long messageSize) {
- Text key = new Text("message.size");
- put(key, new LongWritable(messageSize));
- }
-
- public void put(Writable key, Writable value) {
- this.partitionMap.put(key, value);
+ return messageSize;
}
@Override
public void readFields(DataInput in) throws IOException {
+ this.topic = in.readUTF();
this.partition = in.readInt();
this.beginOffset = in.readLong();
this.offset = in.readLong();
- this.topic = in.readUTF();
- this.partitionMap = new MapWritable();
- this.partitionMap.readFields(in);
+ this.messageSize = in.readLong();
+ this.checksum = in.readLong();
}
@Override
public void write(DataOutput out) throws IOException {
+ out.writeUTF(this.topic);
out.writeInt(this.partition);
out.writeLong(this.beginOffset);
out.writeLong(this.offset);
- out.writeUTF(this.topic);
- this.partitionMap.write(out);
+ out.writeLong(this.messageSize);
+ out.writeLong(this.checksum);
}
@Override
@@ -120,6 +98,10 @@ public int compareTo(KafkaKey o) {
if (comp != 0) {
return comp;
}
- return Long.compare(offset, o.offset);
+ comp = Long.compare(offset, o.offset);
+ if (comp != 0) {
+ return comp;
+ }
+ return Long.compare(checksum, o.checksum);
}
}
diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java
similarity index 100%
rename from kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java
rename to kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaMessage.java
diff --git a/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaPartitionOffsets.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaPartitionOffsets.java
new file mode 100644
index 0000000..c7aa2d5
--- /dev/null
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaPartitionOffsets.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright © 2017 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package co.cask.hydrator.plugin.batch.source;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A container class for holding the Kafka offsets for a set of partitions.
+ */
+public class KafkaPartitionOffsets {
+
+ private final Map<Integer, Long> partitionOffsets;
+
+ public KafkaPartitionOffsets(Map<Integer, Long> partitionOffsets) {
+ this.partitionOffsets = new HashMap<>(partitionOffsets);
+ }
+
+ public void setPartitionOffset(int partition, long offset) {
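+ // Note that computeIfPresent only updates offsets for partitions already present in the map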
+ partitionOffsets.computeIfPresent(partition, (k, v) -> offset);
+ }
+
+ public long getPartitionOffset(int partition, long defaultValue) {
+ return partitionOffsets.getOrDefault(partition, defaultValue);
+ }
+
+ /**
+ * Loads the {@link KafkaPartitionOffsets} from the given input file.
+ *
+ * @param fc the hadoop {@link FileContext} to read from the input file
+ * @param offsetFile the input file
+ * @return a {@link KafkaPartitionOffsets} object decoded from the file
+ * @throws IOException if failed to read the file
+ */
+ public static KafkaPartitionOffsets load(FileContext fc, Path offsetFile) throws IOException {
+ try (Reader reader = new InputStreamReader(fc.open(offsetFile), StandardCharsets.UTF_8)) {
+ return new Gson().fromJson(reader, KafkaPartitionOffsets.class);
+ } catch (FileNotFoundException e) {
+ // If the file does not exist, return an empty object
+ return new KafkaPartitionOffsets(Collections.emptyMap());
+ } catch (JsonSyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Saves the {@link KafkaPartitionOffsets} to the given file
+ *
+ * @param fc the hadoop {@link FileContext} used to write the offset file
+ * @param offsetFile the file to write to
+ * @param partitionOffsets the {@link KafkaPartitionOffsets} object to save
+ * @throws IOException if failed to write to the file
+ */
+ public static void save(FileContext fc, Path offsetFile,
+ KafkaPartitionOffsets partitionOffsets) throws IOException {
+ try (Writer writer = new OutputStreamWriter(fc.create(offsetFile,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
+ Options.CreateOpts.perms(new FsPermission("600"))))) {
+ new Gson().toJson(partitionOffsets, writer);
+ }
+ }
+}
+
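
A round-trip sketch for the offsets file managed by this class; the local file context and path are illustrative only:

    // Round-trip sketch for KafkaPartitionOffsets.load/save; the path is made up.
    import co.cask.hydrator.plugin.batch.source.KafkaPartitionOffsets;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.Collections;

    class KafkaPartitionOffsetsExample {
      public static void main(String[] args) throws IOException {
        FileContext fc = FileContext.getLocalFSFileContext();
        Path offsetFile = new Path("/tmp/offsets/default.myPipeline.offsets.json");

        // Loading a missing file yields an empty offsets object instead of failing
        KafkaPartitionOffsets offsets = KafkaPartitionOffsets.load(fc, offsetFile);
        long saved = offsets.getPartitionOffset(0, -1L);   // -1 when partition 0 has no saved offset

        // Persist an offset of 42 for partition 0
        KafkaPartitionOffsets.save(fc, offsetFile, new KafkaPartitionOffsets(Collections.singletonMap(0, 42L)));
        System.out.println(saved);
      }
    }
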
diff --git a/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java
new file mode 100644
index 0000000..bd8d307
--- /dev/null
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaReader.java
@@ -0,0 +1,19 @@
+package co.cask.hydrator.plugin.batch.source;
+
+import java.io.Closeable;
+
+/**
+ * Reads messages from a single Kafka topic partition, as described by a {@link KafkaRequest}.
+ */
+interface KafkaReader extends Closeable {
+
+ /**
+ * Returns true if there are more messages to read for the request.
+ */
+ boolean hasNext();
+
+ /**
+ * Fetches the next Kafka message and populates the given key with its metadata.
+ */
+ KafkaMessage getNext(KafkaKey kafkaKey);
+}
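
As a usage sketch only (the concrete reader implementations live in the version-specific modules), a caller in the same package would drain a reader like this; the null check is defensive rather than part of the documented contract.

package co.cask.hydrator.plugin.batch.source;

import java.io.IOException;

final class KafkaReaderUsageSketch {

  // Counts the messages a reader produces for its request and always closes it afterwards.
  static long countMessages(KafkaReader reader) throws IOException {
    long count = 0;
    KafkaKey key = new KafkaKey();
    try {
      while (reader.hasNext()) {
        KafkaMessage message = reader.getNext(key); // populates key with the message metadata
        if (message != null) {
          count++;
        }
      }
    } finally {
      reader.close();
    }
    return count;
  }
}
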
diff --git a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
similarity index 55%
rename from kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
rename to kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
index 9be0882..1fadec8 100644
--- a/kafka-plugins-0.10/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRecordReader.java
@@ -1,5 +1,5 @@
/*
- * Copyright © 2017 Cask Data, Inc.
+ * Copyright © 2017-2018 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
@@ -21,34 +21,39 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
+import java.util.function.Function;
/**
- * Kafka Record Reader to be used by {@link KafkaInputFormat}.
+ * Kafka Record Reader to be used by KafkaInputFormat.
*/
 public class KafkaRecordReader extends RecordReader<KafkaKey, KafkaMessage> {
- private KafkaSplit split;
+ private final Function<KafkaRequest, KafkaReader> readerFunction;
+ private final KafkaKey key;
+
private long totalBytes;
private KafkaReader reader;
private long readBytes = 0;
- private final KafkaKey key = new KafkaKey();
private KafkaMessage value;
+ public KafkaRecordReader(Function<KafkaRequest, KafkaReader> readerFunction) {
+ this.readerFunction = readerFunction;
+ this.key = new KafkaKey();
+ }
+
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- this.split = (KafkaSplit) split;
- this.totalBytes = this.split.getLength();
+ this.totalBytes = split.getLength();
+ this.reader = readerFunction.apply(((KafkaSplit) split).getRequest());
}
@Override
- public synchronized void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
+ public void close() {
+ closeReader();
}
@Override
- public float getProgress() throws IOException {
+ public float getProgress() {
if (getPos() == 0) {
return 0f;
}
@@ -59,48 +64,35 @@ public float getProgress() throws IOException {
return (float) ((double) getPos() / totalBytes);
}
- private long getPos() throws IOException {
+ private long getPos() {
return readBytes;
}
@Override
- public KafkaKey getCurrentKey() throws IOException, InterruptedException {
+ public KafkaKey getCurrentKey() {
return key;
}
@Override
- public KafkaMessage getCurrentValue() throws IOException, InterruptedException {
+ public KafkaMessage getCurrentValue() {
return value;
}
@Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (reader == null || !reader.hasNext()) {
- KafkaRequest request = split.popRequest();
- if (request == null) {
- return false;
- }
+ public boolean nextKeyValue() {
+ if (!reader.hasNext()) {
+ closeReader();
+ return false;
+ }
- key.set(request.getTopic(), request.getPartition(), request.getOffset(),
- request.getOffset());
- value = null;
+ KafkaMessage message = reader.getNext(key);
- if (reader != null) {
- closeReader();
- }
- reader = new KafkaReader(request);
- }
- KafkaMessage message;
- if ((message = reader.getNext(key)) != null) {
- readBytes += key.getMessageSize();
- value = message;
- return true;
- }
- return false;
+ readBytes += key.getMessageSize();
+ value = message;
+ return true;
}
-
- private void closeReader() throws IOException {
+ private synchronized void closeReader() {
if (reader != null) {
try {
reader.close();
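
The record reader now receives a KafkaRequest -> KafkaReader factory instead of creating readers itself. Below is a sketch of how a version-specific input format could wire it up; the class name is hypothetical and a do-nothing reader stands in for the module's real implementation, with split computation omitted.

package co.cask.hydrator.plugin.batch.source;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.util.Collections;
import java.util.List;

// Sketch only: the real input format in each module also computes splits from KafkaRequests;
// the focus here is just on handing the record reader a reader factory.
public class SketchKafkaInputFormat extends InputFormat<KafkaKey, KafkaMessage> {

  @Override
  public List<InputSplit> getSplits(JobContext context) {
    return Collections.emptyList(); // split computation omitted in this sketch
  }

  @Override
  public RecordReader<KafkaKey, KafkaMessage> createRecordReader(InputSplit split, TaskAttemptContext context) {
    // Each module would create its version-specific KafkaReader for the request;
    // an empty stub keeps this sketch self-contained.
    return new KafkaRecordReader(request -> new KafkaReader() {
      @Override
      public boolean hasNext() {
        return false;
      }

      @Override
      public KafkaMessage getNext(KafkaKey kafkaKey) {
        return null;
      }

      @Override
      public void close() {
      }
    });
  }
}
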
diff --git a/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java
new file mode 100644
index 0000000..f68331a
--- /dev/null
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaRequest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2017-2018 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.hydrator.plugin.batch.source;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A class that represents the kafka pull request.
+ *
+ * The class is a container for the topic, partition, consumer configuration and the
+ * start and end offsets that define the range of messages to pull for a single split.
+ *
+ */
+public class KafkaRequest {
+
+ private final String topic;
+ private final int partition;
+ private final Map<String, String> conf;
+
+ private final long startOffset;
+ private final long endOffset;
+
+ private final long averageMessageSize;
+
+ public KafkaRequest(String topic, int partition, Map<String, String> conf, long startOffset, long endOffset) {
+ this(topic, partition, conf, startOffset, endOffset, 1024);
+ }
+
+ public KafkaRequest(String topic, int partition, Map<String, String> conf,
+ long startOffset, long endOffset, long averageMessageSize) {
+ this.topic = topic;
+ this.partition = partition;
+ this.conf = Collections.unmodifiableMap(new HashMap<>(conf));
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ this.averageMessageSize = averageMessageSize;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public int getPartition() {
+ return partition;
+ }
+
+ public Map<String, String> getConf() {
+ return conf;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getEndOffset() {
+ return endOffset;
+ }
+
+ public long getAverageMessageSize() {
+ return averageMessageSize;
+ }
+
+ public long estimateDataSize() {
+ return (getEndOffset() - getStartOffset()) * averageMessageSize;
+ }
+}
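
Since estimateDataSize() is simply (endOffset - startOffset) * averageMessageSize, a request covering offsets 100 to 600 with the default average of 1024 bytes reports 500 * 1024 = 512000 bytes to the framework. A small illustration, with a made-up topic and broker setting:

import co.cask.hydrator.plugin.batch.source.KafkaRequest;

import java.util.Collections;

public class KafkaRequestSizeExample {

  public static void main(String[] args) {
    // Hypothetical topic, partition and consumer config, for illustration only.
    KafkaRequest request = new KafkaRequest("users", 0,
        Collections.singletonMap("bootstrap.servers", "localhost:9092"), 100L, 600L);

    // (600 - 100) * 1024 = 512000 bytes with the default average message size.
    System.out.println("Estimated split size: " + request.estimateDataSize() + " bytes");
  }
}
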
diff --git a/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java
new file mode 100644
index 0000000..9d26482
--- /dev/null
+++ b/kafka-plugins-common/src/main/java/co/cask/hydrator/plugin/batch/source/KafkaSplit.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright © 2017-2018 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.hydrator.plugin.batch.source;
+
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An {@link InputSplit} that carries a single {@link KafkaRequest} for one topic partition.
+ */
+public class KafkaSplit extends InputSplit implements Writable {
+
+ private KafkaRequest request;
+
+ @SuppressWarnings("unused")
+ public KafkaSplit() {
+ // For serialization
+ }
+
+ public KafkaSplit(KafkaRequest request) {
+ this.request = request;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ String topic = in.readUTF();
+ int partition = in.readInt();
+
+ // Read config map
+ int size = in.readInt();
+ Map<String, String> conf = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ conf.put(in.readUTF(), in.readUTF());
+ }
+
+ long startOffset = in.readLong();
+ long endOffset = in.readLong();
+ long averageMessageSize = in.readLong();
+ request = new KafkaRequest(topic, partition, conf, startOffset, endOffset, averageMessageSize);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(request.getTopic());
+ out.writeInt(request.getPartition());
+
+ // Write config map
+ Map<String, String> conf = request.getConf();
+ out.writeInt(conf.size());
+ for (Map.Entry<String, String> entry : conf.entrySet()) {
+ out.writeUTF(entry.getKey());
+ out.writeUTF(entry.getValue());
+ }
+
+ out.writeLong(request.getStartOffset());
+ out.writeLong(request.getEndOffset());
+ out.writeLong(request.getAverageMessageSize());
+ }
+
+ @Override
+ public long getLength() {
+ return request.estimateDataSize();
+ }
+
+ @Override
+ public String[] getLocations() {
+ return new String[0];
+ }
+
+ public KafkaRequest getRequest() {
+ return request;
+ }
+}
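
Because the split serializes the whole request field by field, a quick in-memory round trip (with illustrative values only) shows that write() and readFields() are symmetric:

import co.cask.hydrator.plugin.batch.source.KafkaRequest;
import co.cask.hydrator.plugin.batch.source.KafkaSplit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;

public class KafkaSplitRoundTripExample {

  public static void main(String[] args) throws IOException {
    KafkaRequest request = new KafkaRequest("users", 0,
        Collections.singletonMap("bootstrap.servers", "localhost:9092"), 0L, 1000L);

    // Serialize the split the same way Hadoop would when shipping it to a task.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    new KafkaSplit(request).write(new DataOutputStream(bytes));

    // Deserialize into a fresh split and confirm the request survived intact.
    KafkaSplit copy = new KafkaSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy.getRequest().getTopic() + " ends at offset " + copy.getRequest().getEndOffset());
  }
}
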
diff --git a/kafka-plugins-common/src/test/java/co/cask/hydrator/AbstractKafkaBatchSourceTest.java b/kafka-plugins-common/src/test/java/co/cask/hydrator/AbstractKafkaBatchSourceTest.java
new file mode 100644
index 0000000..74ef5bb
--- /dev/null
+++ b/kafka-plugins-common/src/test/java/co/cask/hydrator/AbstractKafkaBatchSourceTest.java
@@ -0,0 +1,298 @@
+/*
+ * Copyright © 2016 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package co.cask.hydrator;
+
+import co.cask.cdap.api.artifact.ArtifactSummary;
+import co.cask.cdap.api.data.format.StructuredRecord;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.api.dataset.table.Table;
+import co.cask.cdap.datapipeline.DataPipelineApp;
+import co.cask.cdap.datapipeline.SmartWorkflow;
+import co.cask.cdap.etl.api.batch.BatchSource;
+import co.cask.cdap.etl.mock.batch.MockSink;
+import co.cask.cdap.etl.mock.test.HydratorTestBase;
+import co.cask.cdap.etl.proto.v2.ETLBatchConfig;
+import co.cask.cdap.etl.proto.v2.ETLPlugin;
+import co.cask.cdap.etl.proto.v2.ETLStage;
+import co.cask.cdap.proto.ProgramRunStatus;
+import co.cask.cdap.proto.artifact.AppRequest;
+import co.cask.cdap.proto.id.ApplicationId;
+import co.cask.cdap.proto.id.ArtifactId;
+import co.cask.cdap.proto.id.NamespaceId;
+import co.cask.cdap.test.ApplicationManager;
+import co.cask.cdap.test.DataSetManager;
+import co.cask.cdap.test.TestConfiguration;
+import co.cask.cdap.test.WorkflowManager;
+import co.cask.hydrator.plugin.batch.source.KafkaBatchConfig;
+import co.cask.hydrator.plugin.batch.source.KafkaPartitionOffsets;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Service;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.twill.internal.kafka.client.ZKBrokerService;
+import org.apache.twill.internal.kafka.client.ZKKafkaClientService;
+import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.kafka.client.BrokerService;
+import org.apache.twill.kafka.client.Compression;
+import org.apache.twill.kafka.client.KafkaClientService;
+import org.apache.twill.kafka.client.KafkaPublisher;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for the Kafka batch source plugins.
+ */
+public abstract class AbstractKafkaBatchSourceTest extends HydratorTestBase {
+
+ private static final ArtifactSummary APP_ARTIFACT = new ArtifactSummary("data-pipeline", "1.0.0");
+ @ClassRule
+ public static final TestConfiguration CONFIG = new TestConfiguration("explore.enabled", false);
+
+ private ZKClientService zkClient;
+ private KafkaClientService kafkaClient;
+ private InMemoryZKServer zkServer;
+ private Service kafkaServer;
+ private String kafkaBroker;
+
+ /**
+ * Creates a {@link Service} for running an embedded Kafka server for testing.
+ */
+ protected abstract Service createKafkaServer(Properties kafkaConfig);
+
+ /**
+ * Returns a {@link List} of plugin classes to be deployed for testing.
+ */
+ protected abstract List<Class<?>> getPluginClasses();
+
+ /**
+ * Returns the name of the Kafka batch source plugin for testing.
+ */
+ protected abstract String getKafkaBatchSourceName();
+
+ @Before
+ public void setup() throws Exception {
+ ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
+
+ // add the data-pipeline artifact and mock plugins
+ setupBatchArtifacts(parentArtifact, DataPipelineApp.class);
+
+ // add our plugins artifact with the data-pipeline artifact as its parent.
+ // this will make our plugins available to data-pipeline.
+ for (Class<?> pluginClass : getPluginClasses()) {
+ addPluginArtifact(NamespaceId.DEFAULT.artifact("example-plugins", "1.0.0"), parentArtifact, pluginClass);
+ }
+
+ zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
+ zkServer.startAndWait();
+
+ zkClient = ZKClientService.Builder.of(zkServer.getConnectionStr()).build();
+ zkClient.startAndWait();
+
+ kafkaServer = createKafkaServer(generateKafkaConfig(zkServer.getConnectionStr(), TMP_FOLDER.newFolder()));
+ kafkaServer.startAndWait();
+
+ kafkaClient = new ZKKafkaClientService(zkClient);
+ kafkaClient.startAndWait();
+
+ BrokerService brokerService = new ZKBrokerService(zkClient);
+ brokerService.startAndWait();
+ kafkaBroker = brokerService.getBrokerList();
+ while (brokerService.getBrokerList().isEmpty()) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ kafkaBroker = brokerService.getBrokerList();
+ }
+ }
+
+ @After
+ public void cleanup() {
+ kafkaClient.stopAndWait();
+ kafkaServer.stopAndWait();
+ zkClient.stopAndWait();
+ zkServer.stopAndWait();
+ }
+
+ @Test
+ public void testKafkaSource() throws Exception {
+ File offsetDir = TMP_FOLDER.newFolder();
+
+ Schema schema = Schema.recordOf(
+ "user",
+ Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+ Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
+ Schema.Field.of("last", Schema.of(Schema.Type.STRING)));
+
+ // create the pipeline config
+ String outputName = "sourceTestOutput";
+
+ Map<String, String> sourceProperties = new HashMap<>();
+ sourceProperties.put("kafkaBrokers", kafkaBroker);
+ sourceProperties.put("referenceName", "kafkaTest");
+ sourceProperties.put("offsetDir", offsetDir.toURI().toString());
+ sourceProperties.put("topic", "users");
+ sourceProperties.put("schema", schema.toString());
+ sourceProperties.put("format", "csv");
+ sourceProperties.put("maxNumberRecords", "${maxNumberRecords}");
+ ETLStage source = new ETLStage("source", new ETLPlugin(getKafkaBatchSourceName(),
+ BatchSource.PLUGIN_TYPE, sourceProperties, null));
+ ETLStage sink = new ETLStage("sink", MockSink.getPlugin(outputName));
+
+ ETLBatchConfig pipelineConfig = ETLBatchConfig.builder()
+ .addStage(source)
+ .addStage(sink)
+ .addConnection(source.getName(), sink.getName())
+ .build();
+
+ // create the pipeline
+ ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSource");
+ ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
+
+
+ Map<String, String> messages = new HashMap<>();
+ messages.put("a", "1,samuel,jackson");
+ messages.put("b", "2,dwayne,johnson");
+ messages.put("c", "3,christopher,walken");
+ sendKafkaMessage("users", messages);
+
+
+ WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+ workflowManager.startAndWaitForRun(Collections.singletonMap("maxNumberRecords", "-1"),
+ ProgramRunStatus.COMPLETED, 2, TimeUnit.MINUTES);
+
+ // check the pipeline output
+ validate(outputName, offsetDir, pipelineId, Collections.singletonMap(0, 3L), ImmutableMap.of(
+ 1L, "samuel jackson",
+ 2L, "dwayne johnson",
+ 3L, "christopher walken"
+ ));
+
+ // Validates the Kafka offsets have been preserved
+ Path offsetFilePath = KafkaBatchConfig.getOffsetFilePath(new Path(offsetDir.toURI()),
+ pipelineId.getNamespace(), pipelineId.getApplication());
+ FileContext fc = FileContext.getFileContext(offsetFilePath.toUri());
+ KafkaPartitionOffsets partitionOffsets = KafkaPartitionOffsets.load(fc, offsetFilePath);
+ Assert.assertEquals(3L, partitionOffsets.getPartitionOffset(0, -1L));
+
+ // Clear the data in the MockSink
+ MockSink.clear(getDataset(outputName));
+
+ messages = new LinkedHashMap<>();
+ messages.put("d", "4,samuel,jackson");
+ messages.put("e", "5,dwayne,johnson");
+ messages.put("f", "6,michael,jackson");
+ sendKafkaMessage("users", messages);
+ workflowManager.startAndWaitForRun(Collections.singletonMap("maxNumberRecords", "2"),
+ ProgramRunStatus.COMPLETED, 2, TimeUnit.MINUTES);
+
+ validate(outputName, offsetDir, pipelineId, Collections.singletonMap(0, 5L), ImmutableMap.of(
+ 4L, "samuel jackson",
+ 5L, "dwayne johnson"
+ ));
+
+ // Clear the data in the MockSink again
+ MockSink.clear(getDataset(outputName));
+
+ // Fetch the remaining records
+ workflowManager.startAndWaitForRun(Collections.singletonMap("maxNumberRecords", "-1"),
+ ProgramRunStatus.COMPLETED, 2, TimeUnit.MINUTES);
+
+ validate(outputName, offsetDir, pipelineId, Collections.singletonMap(0, 6L), Collections.singletonMap(
+ 6L, "michael jackson"
+ ));
+ }
+
+ /**
+ * Validates the test result and offsets after the testing pipeline was executed.
+ *
+ * @param outputName the output mock dataset name
+ * @param offsetDir the directory containing the saved offset file
+ * @param pipelineId the pipeline Id for testing
+ * @param expectedOffsets the expected next offset for each partition
+ * @param expected the expected results in the output mock dataset
+ */
+ private void validate(String outputName, File offsetDir,
+ ApplicationId pipelineId,
+ Map<Integer, Long> expectedOffsets,
+ Map<Long, String> expected) throws Exception {
+ DataSetManager<Table> outputManager = getDataset(outputName);
+ Set<StructuredRecord> outputRecords = new HashSet<>(MockSink.readOutput(outputManager));
+ Map<Long, String> actual = new HashMap<>();
+ for (StructuredRecord outputRecord : outputRecords) {
+ actual.put(outputRecord.get("id"), outputRecord.get("first") + " " + outputRecord.get("last"));
+ }
+
+ Assert.assertEquals(expected, actual);
+
+ // Validates the offsets
+ Path offsetFilePath = KafkaBatchConfig.getOffsetFilePath(new Path(offsetDir.toURI()),
+ pipelineId.getNamespace(), pipelineId.getApplication());
+ FileContext fc = FileContext.getFileContext(offsetFilePath.toUri());
+ KafkaPartitionOffsets partitionOffsets = KafkaPartitionOffsets.load(fc, offsetFilePath);
+
+ expectedOffsets.forEach((partition, offset) ->
+ Assert.assertEquals(offset.longValue(),
+ partitionOffsets.getPartitionOffset(partition, -1L)));
+ }
+
+ private void sendKafkaMessage(String topic, Map<String, String> messages) {
+ KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, Compression.NONE);
+
+ // If publish failed, retry up to 20 times, with 100ms delay between each retry
+ // This is because leader election in Kafka 0.8 takes time when a topic is being created upon publish request.
+ int count = 0;
+ do {
+ KafkaPublisher.Preparer preparer = publisher.prepare(topic);
+ for (Map.Entry<String, String> entry : messages.entrySet()) {
+ preparer.add(Charsets.UTF_8.encode(entry.getValue()), entry.getKey());
+ }
+ try {
+ preparer.send().get();
+ break;
+ } catch (Exception e) {
+ // Backoff if send failed.
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ } while (count++ < 20);
+ }
+
+ private Properties generateKafkaConfig(String zkConnectStr, File logDir) {
+ Properties prop = new Properties();
+ prop.setProperty("log.dir", logDir.getAbsolutePath());
+ prop.setProperty("broker.id", "1");
+ prop.setProperty("num.partitions", "1");
+ prop.setProperty("zookeeper.connect", zkConnectStr);
+ prop.setProperty("zookeeper.connection.timeout.ms", "1000000");
+ prop.setProperty("default.replication.factor", "1");
+ return prop;
+ }
+}
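
A concrete module would subclass this test roughly as below. This is a sketch only, assuming Twill's EmbeddedKafkaServer for the embedded broker; KafkaBatchSource is a placeholder for whichever plugin class the module actually ships, and the returned plugin name must match the one that class registers.

package co.cask.hydrator;

import com.google.common.util.concurrent.Service;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Sketch of a concrete test for one Kafka version; class and plugin names are placeholders.
public class KafkaBatchSourceTestSketch extends AbstractKafkaBatchSourceTest {

  @Override
  protected Service createKafkaServer(Properties kafkaConfig) {
    // Assumes Twill's embedded Kafka broker, which the existing module tests rely on.
    return new EmbeddedKafkaServer(kafkaConfig);
  }

  @Override
  protected List<Class<?>> getPluginClasses() {
    return Collections.<Class<?>>singletonList(KafkaBatchSource.class); // placeholder plugin class
  }

  @Override
  protected String getKafkaBatchSourceName() {
    return "Kafka"; // must match the plugin name declared by the module under test
  }
}
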
diff --git a/pom.xml b/pom.xml
index acc81f9..8552897 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
         <module>kafka-plugins-0.8</module>
         <module>kafka-plugins-0.10</module>
+        <module>kafka-plugins-common</module>