4 changes: 2 additions & 2 deletions kafka-plugins-0.10/docs/Kafka-batchsource.md
@@ -23,8 +23,8 @@ Properties

**topic:** The Kafka topic to read from. (Macro-enabled)

-**tableName:** Optional table name to track the latest offset we read from kafka. It is recommended to name it same as the
-pipeline name to avoid conflict on table names. By default it will be the topic name. (Macro-enabled)
+**offsetDir:** Optional directory path used to track the latest offset read from Kafka. This is useful for incrementally
+processing data from Kafka across subsequent runs. (Macro-enabled)

**partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled)

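As context for the new `offsetDir` property, here is a minimal sketch of what directory-based offset tracking can look like. `OffsetStore` and its file-per-partition layout are illustrative assumptions, not the plugin's actual implementation:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: persists the last offset consumed for each topic
// partition as a small file under offsetDir so the next run can resume there.
public class OffsetStore {
  private final Path offsetDir;

  public OffsetStore(String offsetDir) {
    this.offsetDir = Paths.get(offsetDir);
  }

  // Returns the offset saved by a previous run, or 0 to start from the beginning.
  public long loadOffset(String topic, int partition) throws IOException {
    Path file = offsetDir.resolve(topic + "-" + partition + ".offset");
    return Files.exists(file)
      ? Long.parseLong(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim())
      : 0L;
  }

  // Records the offset the next run should start from.
  public void saveOffset(String topic, int partition, long nextOffset) throws IOException {
    Files.createDirectories(offsetDir);
    Path file = offsetDir.resolve(topic + "-" + partition + ".offset");
    Files.write(file, Long.toString(nextOffset).getBytes(StandardCharsets.UTF_8));
  }
}
```

On a subsequent run, the pipeline would read the saved offset back and use it as the starting offset for each partition.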
12 changes: 12 additions & 0 deletions kafka-plugins-0.10/pom.xml
@@ -14,6 +14,11 @@
<version>${project.parent.version}-0.10.2.0</version>

<dependencies>
+    <dependency>
+      <groupId>co.cask.hydrator</groupId>
+      <artifactId>kafka-plugins-common</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
@@ -128,6 +133,13 @@
<version>${cdap.version}</version>
<scope>test</scope>
</dependency>
+    <dependency>
+      <groupId>co.cask.hydrator</groupId>
+      <artifactId>kafka-plugins-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
@@ -26,18 +26,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;


/**
 * A class that reads the fetch results from Kafka.
 */
-public class KafkaReader {
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaReader.class);
+final class Kafka10Reader implements KafkaReader {
+  private static final Logger LOG = LoggerFactory.getLogger(Kafka10Reader.class);
  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

  // index of context
@@ -50,61 +50,58 @@ public class KafkaReader


  /**
-   * Construct using the json representation of the kafka request
+   * Construct a reader based on the given {@link KafkaRequest}.
   */
-  public KafkaReader(KafkaRequest request) {
+  Kafka10Reader(KafkaRequest request) {
    kafkaRequest = request;
-    currentOffset = request.getOffset();
-    lastOffset = request.getLastOffset();
+    currentOffset = request.getStartOffset();
+    lastOffset = request.getEndOffset();

    // read data from queue
    Properties properties = new Properties();
    properties.putAll(request.getConf());
    consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    fetch();
  }

-  public boolean hasNext() throws IOException {
+  @Override
+  public boolean hasNext() {
    if (currentOffset >= lastOffset) {
      return false;
    }
    if (messageIter != null && messageIter.hasNext()) {
      return true;
-    } else {
-      return fetch();
    }
+    return fetch();
  }

  /**
-   * Fetches the next Kafka message and stuffs the results into the key and value.
+   * Fetches the next Kafka message. The message key will be set into the given {@link KafkaKey} object, and the message
+   * payload will be returned.
   */
-  public KafkaMessage getNext(KafkaKey kafkaKey) throws IOException {
-    if (hasNext()) {
-      ConsumerRecord<byte[], byte[]> consumerRecord = messageIter.next();
-
-      byte[] keyBytes = consumerRecord.key();
-      byte[] value = consumerRecord.value();
-      if (value == null) {
-        LOG.warn("Received message with null message.payload with topic {} and partition {}",
-                 kafkaKey.getTopic(), kafkaKey.getPartition());
-      }
-
-      ByteBuffer payload = value == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(value);
-      ByteBuffer key = keyBytes == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(keyBytes);
-
-      kafkaKey.clear();
-      kafkaKey.set(kafkaRequest.getTopic(), kafkaRequest.getPartition(), currentOffset,
-                   consumerRecord.offset() + 1);
-      kafkaKey.setMessageSize(value == null ? -1 : value.length);
-      currentOffset = consumerRecord.offset() + 1; // increase offset
-      return new KafkaMessage(payload, key);
-    } else {
-      return null;
+  @Override
+  public KafkaMessage getNext(KafkaKey kafkaKey) {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No message is available");
    }
+
+    ConsumerRecord<byte[], byte[]> consumerRecord = messageIter.next();
+
+    byte[] keyBytes = consumerRecord.key();
+    byte[] value = consumerRecord.value();
+
+    ByteBuffer key = keyBytes == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(keyBytes);
+    ByteBuffer payload = value == null ? ByteBuffer.wrap(EMPTY_BYTE_ARRAY) : ByteBuffer.wrap(value);
+
+    kafkaKey.set(currentOffset, consumerRecord.offset() + 1,
+                 consumerRecord.serializedKeySize() + consumerRecord.serializedValueSize(),
+                 consumerRecord.checksum());
+    currentOffset = consumerRecord.offset() + 1; // increase offset
+    return new KafkaMessage(payload, key);
  }

  /**
-   * Creates a fetch request.
+   * Fetch messages from Kafka.
+   *
+   * @return {@code true} if there are messages available, {@code false} otherwise
   */
  private boolean fetch() {
    if (currentOffset >= lastOffset) {
@@ -124,9 +121,10 @@ private boolean fetch() {
    }

  /**
-   * Closes this context
+   * Closes this reader.
   */
-  public void close() throws IOException {
+  @Override
+  public void close() {
    if (consumer != null) {
      consumer.close();
    }
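To make the new reader contract concrete, here is a minimal usage sketch, assuming a `KafkaRequest` has already been built elsewhere and that `KafkaKey` has a no-arg constructor; `process` is a hypothetical downstream handler, not part of the plugin:

```java
// Minimal driver sketch; KafkaRequest construction is omitted, and
// process(...) is a hypothetical handler.
void readAll(KafkaRequest request) {
  KafkaReader reader = new Kafka10Reader(request);
  try {
    KafkaKey key = new KafkaKey();  // assumed no-arg constructor
    while (reader.hasNext()) {                     // refetches from Kafka when the buffered batch drains
      KafkaMessage message = reader.getNext(key);  // fills `key` with offset bookkeeping, returns the payload
      process(key, message);
    }
    // Once hasNext() returns false, getNext() throws NoSuchElementException
    // instead of returning null, matching java.util.Iterator semantics.
  } finally {
    reader.close();  // closes the underlying KafkaConsumer
  }
}
```

Switching from a null return to NoSuchElementException makes misuse fail fast at the call site instead of surfacing later as a NullPointerException.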