Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperato… #320

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<version>0.9.0.1</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this version change?

<optional>true</optional>
<exclusions>
<exclusion>
Expand All @@ -222,7 +222,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
<version>0.9.0.1</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class KafkaConsumerWrapper implements Closeable

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

private boolean isAlive = false;
private AtomicBoolean isAlive = new AtomicBoolean(false);

private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new HashMap<>();

Expand Down Expand Up @@ -129,6 +130,11 @@ public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Lon
if (meta.getTopicPartition().equals(tp)) {
kc.resume(tp);
} else {
try {
kc.position(tp);
} catch (NoOffsetForPartitionException e) {
kc.seekToBeginning(tp);
}
kc.pause(tp);
}
}
Expand Down Expand Up @@ -188,7 +194,7 @@ public void run()
try {


while (wrapper.isAlive) {
while (wrapper.isAlive.get()) {
if (wrapper.waitForReplay) {
Thread.sleep(100);
continue;
Expand Down Expand Up @@ -255,7 +261,7 @@ public void create(AbstractKafkaInputOperator ownerOperator)
public void start(boolean waitForReplay)
{
this.waitForReplay = waitForReplay;
isAlive = true;
isAlive.set(true);

// thread to consume the kafka data
// create thread pool for consumer threads
Expand Down Expand Up @@ -330,11 +336,11 @@ public void start(boolean waitForReplay)
*/
public void stop()
{
isAlive.set(false);
for (KafkaConsumer<byte[], byte[]> c : consumers.values()) {
c.wakeup();
}
kafkaConsumerExecutor.shutdownNow();
isAlive = false;
holdingBuffer.clear();
IOUtils.closeQuietly(this);
}
Expand All @@ -347,16 +353,6 @@ public void teardown()
holdingBuffer.clear();
}

public boolean isAlive()
{
return isAlive;
}

public void setAlive(boolean isAlive)
{
this.isAlive = isAlive;
}

public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
{
return holdingBuffer.poll();
Expand Down