From 54cc1c3eb2ed5e69f622e863a7db87a3105a5abb Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Sat, 2 Jun 2018 19:10:14 -0700 Subject: [PATCH] Simplify PushSource (#1898) ### Motivation Currently implementing push source requires users implement an additional interface called setConsumer and then using that consumer in their loop. This pr just exposes a consume method from the pushSource abstract class that derived functions can make use of. --- .../org/apache/pulsar/io/core/PushSource.java | 18 +++++++----------- .../apache/pulsar/io/kafka/KafkaSource.java | 9 +-------- .../pulsar/io/rabbitmq/RabbitMQSource.java | 16 +++++----------- .../pulsar/io/twitter/TwitterFireHose.java | 8 +------- 4 files changed, 14 insertions(+), 37 deletions(-) diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java index 011f8ab386c10..af304b937e216 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java @@ -41,16 +41,6 @@ public abstract class PushSource implements Source { public PushSource() { this.queue = new LinkedBlockingQueue<>(this.getQueueLength()); - this.setConsumer(new Consumer>() { - @Override - public void accept(Record record) { - try { - queue.put(record); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - }); } @Override @@ -71,7 +61,13 @@ public Record read() throws Exception { * to pass messages whenever there is data to be pushed to Pulsar. * @param consumer */ - abstract public void setConsumer(Consumer> consumer); + public void consume(Record record) { + try { + queue.put(record); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } /** * Get length of the queue that records are push onto diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java index ad967be2cbdb7..9fa43f7ca349c 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java @@ -48,8 +48,6 @@ public abstract class KafkaSource extends PushSource { private KafkaSourceConfig kafkaSourceConfig; Thread runnerThread; - private java.util.function.Consumer> consumeFunction; - @Override public void open(Map config) throws Exception { kafkaSourceConfig = KafkaSourceConfig.load(config); @@ -78,11 +76,6 @@ public void open(Map config) throws Exception { } - @Override - public void setConsumer(java.util.function.Consumer> consumerFunction) { - this.consumeFunction = consumerFunction; - } - @Override public void close() throws InterruptedException { LOG.info("Stopping kafka source"); @@ -112,7 +105,7 @@ public void start() { for (ConsumerRecord consumerRecord : consumerRecords) { LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value()); KafkaRecord record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord)); - consumeFunction.accept(record); + consume(record); futures[index] = record.getCompletableFuture(); index++; } diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java index 967c00530e927..e17ab5f8d7cc1 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java @@ -41,7 +41,6 @@ public class RabbitMQSource extends PushSource { private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class); - private Consumer> consumer; private Connection rabbitMQConnection; private Channel rabbitMQChannel; private RabbitMQConfig rabbitMQConfig; @@ -62,16 +61,11 @@ public void open(Map config) throws Exception { ); rabbitMQChannel = rabbitMQConnection.createChannel(); rabbitMQChannel.queueDeclare(rabbitMQConfig.getQueueName(), false, false, false, null); - com.rabbitmq.client.Consumer consumer = new RabbitMQConsumer(this.consumer, rabbitMQChannel); + com.rabbitmq.client.Consumer consumer = new RabbitMQConsumer(this, rabbitMQChannel); rabbitMQChannel.basicConsume(rabbitMQConfig.getQueueName(), consumer); logger.info("A consumer for queue {} has been successfully started.", rabbitMQConfig.getQueueName()); } - @Override - public void setConsumer(Consumer> consumer) { - this.consumer = consumer; - } - @Override public void close() throws Exception { rabbitMQChannel.close(); @@ -79,16 +73,16 @@ public void close() throws Exception { } private class RabbitMQConsumer extends DefaultConsumer { - private Consumer> consumeFunction; + private RabbitMQSource source; - public RabbitMQConsumer(Consumer> consumeFunction, Channel channel) { + public RabbitMQConsumer(RabbitMQSource source, Channel channel) { super(channel); - this.consumeFunction = consumeFunction; + this.source = source; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - consumeFunction.accept(new RabbitMQRecord(body)); + source.consume(new RabbitMQRecord(body)); } } diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index 8331e7d3d89d6..3ecfb090bfc9b 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java @@ -50,7 +50,6 @@ public class TwitterFireHose extends PushSource { // ----- Runtime fields private Object waitObject; - private Consumer> consumeFunction; @Override public void open(Map config) throws IOException { @@ -65,11 +64,6 @@ public void open(Map config) throws IOException { startThread(hoseConfig); } - @Override - public void setConsumer(Consumer> consumeFunction) { - this.consumeFunction = consumeFunction; - } - @Override public void close() throws Exception { stopThread(); @@ -125,7 +119,7 @@ public boolean process() throws IOException, InterruptedException { // We don't really care if the record succeeds or not. // However might be in the future to count failures // TODO:- Figure out the metrics story for connectors - consumeFunction.accept(new TwitterRecord(line)); + consume(new TwitterRecord(line)); } catch (Exception e) { LOG.error("Exception thrown"); }