lianghq edited this page Jul 8, 2014 · 5 revisions

Welcome to the streaming-app wiki!

Test case

XML configuration:

com.asiainfo.ocdc.streaming.impl.KafkaSource
oc47:2183,oc48:2183,oc49:2183
test_in
test-consumer-group
3
<stream_columns>product_no,imsi,calltype_id,time,fee,city_id,product_id</stream_columns>

com.asiainfo.ocdc.streaming.impl.StreamFilter
user_acct
band_id
product_no
product_no,imsi,user_acct.band_id,product_id,time,fee
user_acct.band_id==0

com.asiainfo.ocdc.streaming.impl.KafkaOut
test_out
oc47:9093,oc48:9094,oc49:9095
product_no,imsi

Start the topic producer:
bin/kafka-console-producer.sh --broker-list oc47:9093,oc48:9094,oc49:9095 --topic test_in

Start the topic consumer:
bin/kafka-console-consumer.sh --zookeeper oc47:2183,oc48:2183,oc49:2183 --topic test_out

Test data:

product_no   imsi             calltype_id  time  fee  city_id  product_id
13681587777  MS0908078012918  4            0451  80   29       807
15911111189  MS1118088110102  3            0451  28   13       808
15810737999  MS0625888113592  5            0456  25   35       588
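As a rough illustration of how a test record lines up with the stream_columns list from the configuration, here is a minimal Python sketch. The function name `parse_record` is hypothetical, and the whitespace delimiter is an assumption based on how the test data is shown on this page; the delimiter actually used in the Kafka messages is not stated here.

```python
from typing import Dict, Optional

# Column names copied from the <stream_columns> entry in the configuration.
STREAM_COLUMNS = "product_no,imsi,calltype_id,time,fee,city_id,product_id".split(",")


def parse_record(line: str, delim: Optional[str] = None) -> Dict[str, str]:
    """Map one input line onto the configured stream columns.

    delim=None splits on any whitespace (an assumption, see above).
    """
    values = line.strip().split(delim)
    if len(values) != len(STREAM_COLUMNS):
        raise ValueError(
            f"expected {len(STREAM_COLUMNS)} fields, got {len(values)}: {line!r}"
        )
    # All values stay strings, so leading zeros such as "0451" are preserved.
    return dict(zip(STREAM_COLUMNS, values))


record = parse_record("13681587777 MS0908078012918 4 0451 80 29 807")
print(record["product_no"], record["time"], record["fee"])
```

Keeping every field as a string avoids silently turning a value like 0451 into the integer 451.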

Log:

Time: 1403604854000 ms

====================== KafkaSource output data =======================
product_no####15810737999
imsi####MS0625888113592
calltype_id####5
time####0456
fee####25
city_id####35
product_id####588
=============hbase query result================
user_acct.band_id###0
================StreamFilter filter condition: user_acct.band_id==0==========evaluation result: true
================StreamFilter output value: =====================
product_no####15810737999
================StreamFilter output value: =====================
imsi####MS0625888113592
================StreamFilter output value: =====================
user_acct.band_id####0
================StreamFilter output value: =====================
product_id####588
================StreamFilter output value: =====================
time####0456
================StreamFilter output value: =====================
fee####25
======================Kafka output info========================
kafkaout==>15810737999,MS0625888113592,
brokers==>oc47:9093,oc48:9094,oc49:9095
topic==>test_out
delim==>,


Time: 1403604902000 ms

====================== KafkaSource output data =======================
product_no####15911111189
imsi####MS1118088110102
calltype_id####3
time####0451
fee####28
city_id####13
product_id####808
=============hbase query result================
user_acct.band_id###1
================StreamFilter filter condition: user_acct.band_id==0==========evaluation result: false
14/06/24 18:15:04 INFO spark.SparkContext: Job finished: take at DStream.scala:586, took 0.01998062 s


Time: 1403604914000 ms

====================== KafkaSource output data =======================
product_no####15810737999
imsi####MS0625888113592
calltype_id####5
time####0456
fee####25
city_id####35
product_id####588
=============hbase query result================
user_acct.band_id###0
================StreamFilter filter condition: user_acct.band_id==0==========evaluation result: true
================StreamFilter output value: =====================
product_no####15810737999
================StreamFilter output value: =====================
imsi####MS0625888113592
================StreamFilter output value: =====================
user_acct.band_id####0
================StreamFilter output value: =====================
product_id####588
================StreamFilter output value: =====================
time####0456
================StreamFilter output value: =====================
fee####25
======================Kafka output info========================
kafkaout==>15810737999,MS0625888113592,
brokers==>oc47:9093,oc48:9094,oc49:9095
topic==>test_out
delim==>,
14/06/24 18:15:16 INFO spark.SparkContext: Job finished: take at DStream.scala:586, took 0.017004735 s
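
The flow visible in the log (source record, HBase lookup, filter condition, filter output, Kafka output) can be approximated with a small Python sketch. This is not the streaming-app implementation. It assumes that the StreamFilter values user_acct, band_id and product_no mean "HBase table, column, row key", which the page does not state explicitly. A plain dict stands in for HBase, holding only the two band_id values that appear in the log, and the names `stream_filter` and `kafka_out` are hypothetical.

```python
from typing import Dict, Optional

# Output column lists taken from the configuration on this page.
FILTER_OUTPUT = ["product_no", "imsi", "user_acct.band_id", "product_id", "time", "fee"]
KAFKA_OUT_COLUMNS = ["product_no", "imsi"]

# Stand-in for the HBase table user_acct, keyed by product_no (assumption).
# Only the band_id values that appear in the log are included.
USER_ACCT = {
    "15810737999": {"band_id": "0"},
    "15911111189": {"band_id": "1"},
}


def stream_filter(record: Dict[str, str]) -> Optional[Dict[str, str]]:
    """Enrich a record with user_acct.band_id and apply user_acct.band_id==0."""
    row = USER_ACCT.get(record["product_no"])
    if row is None:
        return None  # no lookup result; dropping the record is an assumption
    enriched = dict(record)
    enriched["user_acct.band_id"] = row["band_id"]
    if enriched["user_acct.band_id"] != "0":
        return None  # condition evaluates to false, record is filtered out
    return {col: enriched[col] for col in FILTER_OUTPUT}


def kafka_out(record: Dict[str, str], delim: str = ",") -> str:
    """Join the configured output columns with the delimiter."""
    return delim.join(record[col] for col in KAFKA_OUT_COLUMNS)


passing = {
    "product_no": "15810737999", "imsi": "MS0625888113592", "calltype_id": "5",
    "time": "0456", "fee": "25", "city_id": "35", "product_id": "588",
}
rejected = dict(passing, product_no="15911111189", imsi="MS1118088110102")

print(kafka_out(stream_filter(passing)))  # 15810737999,MS0625888113592
print(stream_filter(rejected))            # None
```

The message in the log ends with a trailing delimiter (`15810737999,MS0625888113592,`); the sketch does not reproduce that detail.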
