From 7f63a73b7718fc27165bd7b80ed625235a07cec6 Mon Sep 17 00:00:00 2001
From: Hugo Louro
Date: Fri, 23 Dec 2016 17:09:06 -0800
Subject: [PATCH] STORM-2320: DRPC client printer class reusable for local and remote DRPC

 - Client necessary to check for DRPC results while running in distributed mode
---
Review notes (not part of the commit message):
 - DrpcResultsPrinter keeps the TTransportException as the cause of the connection
   failure and restores the interrupt flag when the sleep is interrupted.
 - The hunk that only changed "}" to "};" after JustValueFunc was dropped.
 - The blob ids on the "index" lines predate these edits and are stale.

 ...ridentKafkaClientWordCountNamedTopics.java |  13 ++-
 .../kafka/trident/DrpcResultsPrinter.java     | 100 ++++++++++++++++++
 .../trident/TridentKafkaConsumerTopology.java |  13 ++-
 .../kafka/trident/TridentKafkaWordCount.java  |   6 +-
 4 files changed, 128 insertions(+), 4 deletions(-)
 create mode 100644 examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java

diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 5861363c9cd..83d6884d6d1 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -33,6 +33,12 @@
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -82,7 +88,7 @@ public static void main(String[] args) throws Exception {
         new TridentKafkaClientWordCountNamedTopics().run(args);
     }
 
-    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
         if (args.length > 0 && Arrays.binarySearch(args, "-h") >= 0) {
             System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", getClass().getName(),
                     "broker_host:broker_port", "topic1", "topic2", "topology_name");
@@ -101,6 +107,11 @@ protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyE
                 StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
                 // Consumer
                 StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
+
+                // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+                Thread.sleep(2000);
+                DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+
             } else {
                 //Submit Local
                 final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
new file mode 100644
index 00000000000..f71e2dfa40a
--- /dev/null
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Queries the "words" DRPC function and logs each result. Accepts any {@link DistributedRPC.Iface},
+ * so the same printer serves both a {@link LocalDRPC} and a remote {@link DRPCClient}.
+ */
+public class DrpcResultsPrinter {
+    private static final Logger LOG = LoggerFactory.getLogger(DrpcResultsPrinter.class);
+
+    private final DistributedRPC.Iface drpcClient;
+
+    public DrpcResultsPrinter(DistributedRPC.Iface drpcClient) {
+        this.drpcClient = drpcClient;
+    }
+
+    /**
+     * @return local DRPC client running on the same JVM
+     */
+    public static DrpcResultsPrinter localClient() {
+        return new DrpcResultsPrinter(new LocalDRPC());
+    }
+
+    /**
+     * @return remote DRPC client running on local host, on port 3772, with defaults.yaml config
+     */
+    public static DrpcResultsPrinter remoteClient() {
+        return remoteClient(Utils.readDefaultConfig(), "localhost", 3772);
+    }
+
+    /**
+     * @return remote DRPC client running on the specified host, port, with the provided config
+     * @throws RuntimeException if the connection to the DRPC server cannot be established
+     */
+    public static DrpcResultsPrinter remoteClient(Map config, String host, int port) {
+        try {
+            return new DrpcResultsPrinter(new DRPCClient(config, host, port));
+        } catch (TTransportException e) {
+            // Keep the transport failure as the cause so the underlying connection error is not lost
+            throw new RuntimeException(String.format("DRPC Client failed to connect to DRPC server. " +
+                    "[host = %s], [port = %s], [config = %s]", host, port, config), e);
+        }
+    }
+
+    /**
+     * Prints the DRPC results for the number of times specified, sleeping the specified time in between prints
+     *
+     * @param num number of times the DRPC function is queried
+     * @param sleepTime time to sleep after each query
+     * @param sleepUnit unit of {@code sleepTime}
+     * @throws RuntimeException if the DRPC call fails or the thread is interrupted while sleeping
+     */
+    public void printResults(int num, int sleepTime, TimeUnit sleepUnit) {
+        for (int i = 0; i < num; i++) {
+            try {
+                LOG.info("--- DRPC RESULT: {}", drpcClient.execute("words", "the and apple snow jumped"));
+                System.out.println();
+                Thread.sleep(sleepUnit.toMillis(sleepTime));
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up can still observe the interruption
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+    }
+}
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index 1e7914ea09a..a39eba1a5d3 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -24,6 +24,7 @@
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.operation.builtin.Count;
 import org.apache.storm.trident.operation.builtin.Debug;
 import org.apache.storm.trident.operation.builtin.FilterNull;
@@ -31,6 +32,7 @@
 import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.testing.Split;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,13 +58,20 @@ public static StormTopology newTopology(LocalDRPC drpc, ITridentDataSource tride
         return tridentTopology.build();
     }
 
-    private static Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
+    private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state, LocalDRPC drpc) {
         return tridentTopology.newDRPCStream("words", drpc)
                 .each(new Fields("args"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
                 .each(new Fields("count"), new FilterNull())
-                .project(new Fields("word", "count"));
+                .project(new Fields("word", "count"))
+                .filter(new BaseFilter() {
+                    @Override
+                    public boolean isKeep(TridentTuple tuple) {
+                        LOG.debug("DRPC RESULT: {}", tuple);  // Used to show the DRPC results in the worker log. Useful for debugging
+                        return true;
+                    }
+                });
     }
 
     private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource tridentSpout) {
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
index d1de3672ac0..84dc380ce70 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
@@ -85,6 +85,10 @@ public static void main(String[] args) throws Exception {
             // Consumer
             StormSubmitter.submitTopology(args[2] + "-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(
                     new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
+
+            // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+            Thread.sleep(2000);
+            DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
         } else { //Submit Local
             final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
             final String prodTpName = "kafkaBolt";
@@ -98,7 +102,7 @@ public static void main(String[] args) throws Exception {
                         new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
 
                 // print
-                localSubmitter.printResults(60, 1, TimeUnit.SECONDS);
+                new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
             } finally {
                 // kill
                 localSubmitter.kill(prodTpName);