From 897b3155c43300453ed019c3210d9cfc8be17ffd Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 30 Jun 2017 17:30:30 +0900 Subject: [PATCH 01/12] STORM-2601: CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61a01513506..493fdc5b5c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.1.1 + * STORM-2601: add the timeout parameter to the method of getting the nimbus client * STORM-2369: [storm-redis] Use binary type for State management * STORM-2564: We should provide a template for storm-cluster-auth.yaml * STORM-2599: Fix BasicContainer wildcard classpath on Windows From 789110ee005fdc8ca65b7dc1bb745cdab0e8ee5e Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 3 Jul 2017 23:51:00 +0900 Subject: [PATCH 02/12] Update target version of 1.x-branch to 1.2.0-SNAPSHOT --- examples/storm-elasticsearch-examples/pom.xml | 2 +- examples/storm-hbase-examples/pom.xml | 2 +- examples/storm-hdfs-examples/pom.xml | 2 +- examples/storm-hive-examples/pom.xml | 2 +- examples/storm-jdbc-examples/pom.xml | 2 +- examples/storm-jms-examples/pom.xml | 2 +- examples/storm-kafka-client-examples/pom.xml | 2 +- examples/storm-kafka-examples/pom.xml | 2 +- examples/storm-mongodb-examples/pom.xml | 2 +- examples/storm-mqtt-examples/pom.xml | 2 +- examples/storm-opentsdb-examples/pom.xml | 2 +- examples/storm-perf/pom.xml | 2 +- examples/storm-pmml-examples/pom.xml | 2 +- examples/storm-redis-examples/pom.xml | 2 +- examples/storm-solr-examples/pom.xml | 2 +- examples/storm-starter/pom.xml | 2 +- external/flux/flux-core/pom.xml | 2 +- external/flux/flux-wrappers/pom.xml | 2 +- external/flux/pom.xml | 2 +- external/sql/pom.xml | 2 +- external/sql/storm-sql-core/pom.xml | 2 +- external/sql/storm-sql-external/storm-sql-hdfs/pom.xml | 2 +- external/sql/storm-sql-external/storm-sql-kafka/pom.xml | 2 +- external/sql/storm-sql-external/storm-sql-mongodb/pom.xml | 2 +- external/sql/storm-sql-external/storm-sql-redis/pom.xml | 2 +- external/sql/storm-sql-runtime/pom.xml | 2 +- external/storm-autocreds/pom.xml | 2 +- external/storm-cassandra/pom.xml | 2 +- external/storm-druid/pom.xml | 2 +- external/storm-elasticsearch/pom.xml | 2 +- external/storm-eventhubs/pom.xml | 2 +- external/storm-hbase/pom.xml | 2 +- external/storm-hdfs/pom.xml | 2 +- external/storm-hive/pom.xml | 2 +- external/storm-jdbc/pom.xml | 2 +- external/storm-jms/pom.xml | 2 +- external/storm-kafka-client/pom.xml | 2 +- external/storm-kafka-monitor/pom.xml | 2 +- external/storm-kafka/pom.xml | 2 +- external/storm-kinesis/pom.xml | 2 +- external/storm-metrics/pom.xml | 2 +- external/storm-mongodb/pom.xml | 2 +- external/storm-mqtt/pom.xml | 2 +- external/storm-opentsdb/pom.xml | 2 +- external/storm-pmml/pom.xml | 2 +- external/storm-redis/pom.xml | 2 +- external/storm-solr/pom.xml | 2 +- external/storm-submit-tools/pom.xml | 2 +- pom.xml | 2 +- storm-buildtools/maven-shade-clojure-transformer/pom.xml | 2 +- storm-buildtools/storm-maven-plugins/pom.xml | 2 +- storm-core/pom.xml | 2 +- storm-dist/binary/pom.xml | 2 +- storm-dist/source/pom.xml | 2 +- storm-multilang/javascript/pom.xml | 2 +- storm-multilang/python/pom.xml | 2 +- storm-multilang/ruby/pom.xml | 2 +- storm-rename-hack/pom.xml | 2 +- 58 files changed, 58 insertions(+), 58 deletions(-) diff --git a/examples/storm-elasticsearch-examples/pom.xml b/examples/storm-elasticsearch-examples/pom.xml index b3b5aaa0521..e695c212975 100644 --- a/examples/storm-elasticsearch-examples/pom.xml +++ 
b/examples/storm-elasticsearch-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-hbase-examples/pom.xml b/examples/storm-hbase-examples/pom.xml index 209f5b4677d..0bdbabef8fb 100644 --- a/examples/storm-hbase-examples/pom.xml +++ b/examples/storm-hbase-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-hdfs-examples/pom.xml b/examples/storm-hdfs-examples/pom.xml index f4520ec8702..da4c25743e4 100644 --- a/examples/storm-hdfs-examples/pom.xml +++ b/examples/storm-hdfs-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-hive-examples/pom.xml b/examples/storm-hive-examples/pom.xml index a65a117099b..4ef51dc907d 100644 --- a/examples/storm-hive-examples/pom.xml +++ b/examples/storm-hive-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-jdbc-examples/pom.xml b/examples/storm-jdbc-examples/pom.xml index 712e9b9331b..d790020cb99 100644 --- a/examples/storm-jdbc-examples/pom.xml +++ b/examples/storm-jdbc-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-jms-examples/pom.xml b/examples/storm-jms-examples/pom.xml index 651a91bee4d..0d8926c236b 100644 --- a/examples/storm-jms-examples/pom.xml +++ b/examples/storm-jms-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-kafka-client-examples/pom.xml b/examples/storm-kafka-client-examples/pom.xml index 65a7b9514c0..9be33cad06d 100644 --- a/examples/storm-kafka-client-examples/pom.xml +++ b/examples/storm-kafka-client-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-kafka-examples/pom.xml b/examples/storm-kafka-examples/pom.xml index 9a0d495a114..21d72f29dbf 100644 --- a/examples/storm-kafka-examples/pom.xml +++ b/examples/storm-kafka-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-mongodb-examples/pom.xml b/examples/storm-mongodb-examples/pom.xml index c85780d69e8..49b547987ee 100644 --- a/examples/storm-mongodb-examples/pom.xml +++ b/examples/storm-mongodb-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-mqtt-examples/pom.xml b/examples/storm-mqtt-examples/pom.xml index 8548cc9c33e..a9617991277 100644 --- a/examples/storm-mqtt-examples/pom.xml +++ b/examples/storm-mqtt-examples/pom.xml @@ -26,7 +26,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-opentsdb-examples/pom.xml b/examples/storm-opentsdb-examples/pom.xml index b93148d2a2e..117998f0404 100644 --- a/examples/storm-opentsdb-examples/pom.xml +++ b/examples/storm-opentsdb-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-perf/pom.xml b/examples/storm-perf/pom.xml index 71c7b00d1cd..afedb1f9da2 100644 --- a/examples/storm-perf/pom.xml +++ b/examples/storm-perf/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-pmml-examples/pom.xml 
b/examples/storm-pmml-examples/pom.xml index d37dd3775e6..83077524703 100644 --- a/examples/storm-pmml-examples/pom.xml +++ b/examples/storm-pmml-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/examples/storm-redis-examples/pom.xml b/examples/storm-redis-examples/pom.xml index 9ba04ccf9af..fa0a6d1d76b 100644 --- a/examples/storm-redis-examples/pom.xml +++ b/examples/storm-redis-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-solr-examples/pom.xml b/examples/storm-solr-examples/pom.xml index c4f81832c0d..b4864c77c73 100644 --- a/examples/storm-solr-examples/pom.xml +++ b/examples/storm-solr-examples/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 26991cbdaec..fa1ec05ef01 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/flux/flux-core/pom.xml b/external/flux/flux-core/pom.xml index e5ea55d9bbd..3f7c9d2d416 100644 --- a/external/flux/flux-core/pom.xml +++ b/external/flux/flux-core/pom.xml @@ -21,7 +21,7 @@ org.apache.storm flux - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/external/flux/flux-wrappers/pom.xml b/external/flux/flux-wrappers/pom.xml index 39823814c40..847d84a167f 100644 --- a/external/flux/flux-wrappers/pom.xml +++ b/external/flux/flux-wrappers/pom.xml @@ -21,7 +21,7 @@ org.apache.storm flux - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml diff --git a/external/flux/pom.xml b/external/flux/pom.xml index f1eb82e5451..f44cce0434e 100644 --- a/external/flux/pom.xml +++ b/external/flux/pom.xml @@ -26,7 +26,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/sql/pom.xml b/external/sql/pom.xml index 9a7729a1fcc..9dea3fc2e86 100644 --- a/external/sql/pom.xml +++ b/external/sql/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/sql/storm-sql-core/pom.xml b/external/sql/storm-sql-core/pom.xml index 0db0e8dc4f5..a30e7de5ca9 100644 --- a/external/sql/storm-sql-core/pom.xml +++ b/external/sql/storm-sql-core/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../../pom.xml diff --git a/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml b/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml index 7d29dfc3e64..46180b475c8 100644 --- a/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml +++ b/external/sql/storm-sql-external/storm-sql-hdfs/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../../../pom.xml diff --git a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml index 67dcfee1c27..39887c23b4f 100644 --- a/external/sql/storm-sql-external/storm-sql-kafka/pom.xml +++ b/external/sql/storm-sql-external/storm-sql-kafka/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../../../pom.xml diff --git a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml b/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml index c31c70818d4..1083a947257 100644 --- a/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml +++ 
b/external/sql/storm-sql-external/storm-sql-mongodb/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../../../pom.xml diff --git a/external/sql/storm-sql-external/storm-sql-redis/pom.xml b/external/sql/storm-sql-external/storm-sql-redis/pom.xml index 6dae0e5fb72..3535fb16e0e 100644 --- a/external/sql/storm-sql-external/storm-sql-redis/pom.xml +++ b/external/sql/storm-sql-external/storm-sql-redis/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../../../pom.xml diff --git a/external/sql/storm-sql-runtime/pom.xml b/external/sql/storm-sql-runtime/pom.xml index 452df6ef171..c83682fd35f 100644 --- a/external/sql/storm-sql-runtime/pom.xml +++ b/external/sql/storm-sql-runtime/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../../pom.xml diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml index a285452e62d..ab654a33591 100644 --- a/external/storm-autocreds/pom.xml +++ b/external/storm-autocreds/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/external/storm-cassandra/pom.xml b/external/storm-cassandra/pom.xml index 98741acb26d..fbbf70842dc 100644 --- a/external/storm-cassandra/pom.xml +++ b/external/storm-cassandra/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml index d8810cd548b..b670a1d47b4 100644 --- a/external/storm-druid/pom.xml +++ b/external/storm-druid/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/external/storm-elasticsearch/pom.xml b/external/storm-elasticsearch/pom.xml index c39b97a0ca1..c9815dde94d 100644 --- a/external/storm-elasticsearch/pom.xml +++ b/external/storm-elasticsearch/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 6d7ee7b6ebb..0809b6eaa39 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml index 9e979cdd3fb..a6e738be82b 100644 --- a/external/storm-hbase/pom.xml +++ b/external/storm-hbase/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index d71613a5d34..28149960238 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml index a7d271fb4c2..78751d76900 100644 --- a/external/storm-hive/pom.xml +++ b/external/storm-hive/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml index 455222d8c25..cbf2bb0db64 100644 --- a/external/storm-jdbc/pom.xml +++ b/external/storm-jdbc/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-jms/pom.xml b/external/storm-jms/pom.xml index a7749532c8b..774b1188088 100644 --- a/external/storm-jms/pom.xml +++ b/external/storm-jms/pom.xml @@ -20,7 +20,7 @@ 
storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index b6dafab3d7b..182567752a7 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -22,7 +22,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-kafka-monitor/pom.xml b/external/storm-kafka-monitor/pom.xml index f1a509b7c18..c66e701c94e 100644 --- a/external/storm-kafka-monitor/pom.xml +++ b/external/storm-kafka-monitor/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index de830a71c81..f9cd57e6aca 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-kinesis/pom.xml b/external/storm-kinesis/pom.xml index e2ec0b71760..f97a3a372f9 100644 --- a/external/storm-kinesis/pom.xml +++ b/external/storm-kinesis/pom.xml @@ -17,7 +17,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/external/storm-metrics/pom.xml b/external/storm-metrics/pom.xml index 3e729eae826..06bd710241e 100644 --- a/external/storm-metrics/pom.xml +++ b/external/storm-metrics/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-mongodb/pom.xml b/external/storm-mongodb/pom.xml index 7822344b8b8..822b9485698 100644 --- a/external/storm-mongodb/pom.xml +++ b/external/storm-mongodb/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-mqtt/pom.xml b/external/storm-mqtt/pom.xml index 61fa29bea47..5546a97a6e9 100644 --- a/external/storm-mqtt/pom.xml +++ b/external/storm-mqtt/pom.xml @@ -25,7 +25,7 @@ org.apache.storm storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-opentsdb/pom.xml b/external/storm-opentsdb/pom.xml index 0f172628a75..45e133ff39b 100644 --- a/external/storm-opentsdb/pom.xml +++ b/external/storm-opentsdb/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-pmml/pom.xml b/external/storm-pmml/pom.xml index 3e12e9de5d7..24af7cfdc3d 100644 --- a/external/storm-pmml/pom.xml +++ b/external/storm-pmml/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/external/storm-redis/pom.xml b/external/storm-redis/pom.xml index 73dd32438e6..3facb1587e2 100644 --- a/external/storm-redis/pom.xml +++ b/external/storm-redis/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml index 4a172eac42e..684cbc580a5 100644 --- a/external/storm-solr/pom.xml +++ b/external/storm-solr/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/external/storm-submit-tools/pom.xml b/external/storm-submit-tools/pom.xml index a755a42fae7..48bc32fb311 100644 --- a/external/storm-submit-tools/pom.xml +++ b/external/storm-submit-tools/pom.xml @@ -19,7 +19,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index c6d3387d759..f53171b74b4 100644 --- a/pom.xml +++ b/pom.xml @@ 
-27,7 +27,7 @@ org.apache.storm storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT pom Storm Distributed and fault-tolerant realtime computation diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml index 416faf59e2a..acedaf73852 100644 --- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml +++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml diff --git a/storm-buildtools/storm-maven-plugins/pom.xml b/storm-buildtools/storm-maven-plugins/pom.xml index 2ca62f3ec72..503a37305d1 100644 --- a/storm-buildtools/storm-maven-plugins/pom.xml +++ b/storm-buildtools/storm-maven-plugins/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml org.apache.storm diff --git a/storm-core/pom.xml b/storm-core/pom.xml index a85f58c467c..0497bdc4615 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT .. org.apache.storm diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml index fb0b6b1bb80..3945f78684f 100644 --- a/storm-dist/binary/pom.xml +++ b/storm-dist/binary/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml org.apache.storm diff --git a/storm-dist/source/pom.xml b/storm-dist/source/pom.xml index 1b8b2bbe1f2..fbf7b2883a4 100644 --- a/storm-dist/source/pom.xml +++ b/storm-dist/source/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml org.apache.storm diff --git a/storm-multilang/javascript/pom.xml b/storm-multilang/javascript/pom.xml index 2e9f484a6bd..d96d9fb8826 100644 --- a/storm-multilang/javascript/pom.xml +++ b/storm-multilang/javascript/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml org.apache.storm diff --git a/storm-multilang/python/pom.xml b/storm-multilang/python/pom.xml index bef4024d526..8d56edf0981 100644 --- a/storm-multilang/python/pom.xml +++ b/storm-multilang/python/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml org.apache.storm diff --git a/storm-multilang/ruby/pom.xml b/storm-multilang/ruby/pom.xml index 9cfcbfa93af..84155577287 100644 --- a/storm-multilang/ruby/pom.xml +++ b/storm-multilang/ruby/pom.xml @@ -21,7 +21,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../../pom.xml org.apache.storm diff --git a/storm-rename-hack/pom.xml b/storm-rename-hack/pom.xml index 73168080148..dbbc8ef5a84 100644 --- a/storm-rename-hack/pom.xml +++ b/storm-rename-hack/pom.xml @@ -20,7 +20,7 @@ storm org.apache.storm - 1.1.1-SNAPSHOT + 1.2.0-SNAPSHOT ../pom.xml From 48675195234f31d71999b59d8142c76c4135938d Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Tue, 4 Jul 2017 21:22:25 +0900 Subject: [PATCH 03/12] Reflect version 1.2.0 and 1.1.1 to CHANGELOG --- CHANGELOG.md | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 493fdc5b5c3..bea1becc1a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,38 +1,36 @@ -## 1.1.1 +## 1.2.0 * STORM-2601: add the timeout parameter to the method of getting the nimbus client * STORM-2369: [storm-redis] Use binary type for State management - * STORM-2564: We should provide a template for storm-cluster-auth.yaml * STORM-2599: Fix BasicContainer wildcard classpath on Windows * 
STORM-2598: Add proxy server option for dependency resolver - * STORM-2568: Fix getTopicsString * STORM-2555: handle impersonation properly for HBase delegation token * STORM-2551: Thrift client socket timeout - * STORM-2563: Remove the workaround to handle missing UGI.loginUserFromSubject * STORM-2553: Add support for password in JedisCluster - * STORM-2562: Use stronger key size for blow fish key generator and remove printStackTrace * STORM-2484: Add Flux support for bolt+spout memory configuration + * STORM-2536: excludes jersey 1.x from storm-autocreds + * STORM-2501: Auto populate Hive Credentials + * STORM-2512: Make constructor public and add one more builder constructor + * STORM-2482: Refactor the Storm auto credential plugins to be more usable + +## 1.1.1 + * STORM-2597: Don't parse passed in class paths + * STORM-2564: We should provide a template for storm-cluster-auth.yaml + * STORM-2568: Fix getTopicsString + * STORM-2563: Remove the workaround to handle missing UGI.loginUserFromSubject * STORM-2552: KafkaSpoutMessageId should be serializable + * STORM-2562: Use stronger key size than default for blow fish key generator and get rid of stack trace * STORM-2557: A bug in DisruptorQueue causing severe underestimation of queue arrival rates * STORM-2449: Ensure same key appears only once in State iterator - * STORM-2536: excludes jersey 1.x from storm-autocreds * STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder * STORM-2489: Overlap and data loss on WindowedBolt based on Duration * STORM-2528: Bump log4j version to 2.8.2 * STORM-2527: Initialize java.sql.DriverManager earlier to avoid deadlock - * STORM-2448: Add in Storm and JDK versions when submitting a topology * STORM-2413: Make new Kafka spout respect tuple retry limits - * STORM-2518: Handles empty name for "USER type" ACL when normalizing - * STORM-2501: Auto populate Hive Credentials - * STORM-2520: AutoHDFS should prefer cluster-wise hdfs kerberos principal - * STORM-2519: Modify AbstractAutoCreds to look for configKeys in both nimbus and topology configs - * STORM-2493: update documents to reflect the changes + * STORM-2518: Handles empty name for "USER type" ACL when normalizing ACLs * STORM-2511: Submitting a topology with name containing unicode getting failed * STORM-2496: Dependency artifacts should be uploaded to blobstore with READ permission for all - * STORM-2512: Make constructor public and add one more builder constructor * STORM-2505: Spout to support topic compaction * STORM-2498: Fix Download Full File link - * STORM-2191: shorten classpaths by using wildcards - * STORM-2482: Refactor the Storm auto credential plugins to be more usable * STORM-2343: New Kafka spout can stop emitting tuples if more than maxUncommittedOffsets tuples fail at once. * STORM-2486: Prevent cd from printing target directory to avoid breaking classpath * STORM-2488: The UI user Must be HTTP. 
@@ -41,9 +39,14 @@ * STORM-2315: New kafka spout can't commit offset when ack is disabled * STORM-2467: Use explicit charset when decoding from array backed buffer * STORM-1114: Race condition in trident zookeeper zk-node create/delete + * STORM-2448: Add in Storm and JDK versions when submitting a topology + * STORM-2343: Fix new Kafka spout stopping processing if more than maxUncommittedOffsets tuples fail at once + * STORM-2431: the default blobstore.dir is storm.local.dir/blobs which is different from distcache-blobstore.md * STORM-2429: Properly validate supervisor.scheduler.meta - * STORM-2194: Stop ignoring socket timeout error from executor * STORM-2451: windows storm.cmd does not set log4j2 config file correctly by default + * STORM-2450: Write resources into correct local director + * STORM-2440: Kill process if executor catches `java.net.SocketTimeoutException` + * STORM-2432: Storm-Kafka-Client Trident Spout Seeks Incorrect Offset With UNCOMMITTED_LATEST Strategy ## 1.1.0 * STORM-2432: Storm-Kafka-Client Trident Spout Seeks Incorrect Offset With UNCOMMITTED_LATEST Strategy From 0582f502835d3459ecc09aa722e9d308336f7dd9 Mon Sep 17 00:00:00 2001 From: liuzhaokun Date: Mon, 26 Jun 2017 19:53:03 +0800 Subject: [PATCH 04/12] [STORM-2602] storm.zookeeper.topology.auth.payload doesn't work even you set it --- storm-core/src/jvm/org/apache/storm/StormSubmitter.java | 9 +++------ storm-core/test/clj/org/apache/storm/submitter_test.clj | 4 ++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java index eeb90c559da..86ec6c2cab4 100644 --- a/storm-core/src/jvm/org/apache/storm/StormSubmitter.java +++ b/storm-core/src/jvm/org/apache/storm/StormSubmitter.java @@ -77,21 +77,18 @@ public static boolean validateZKDigestPayload(String payload) { @SuppressWarnings("unchecked") public static Map prepareZookeeperAuthentication(Map conf) { Map toRet = new HashMap(); - + String secretPayload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD); // Is the topology ZooKeeper authentication configuration unset? if (! conf.containsKey(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) || conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD) == null || ! validateZKDigestPayload((String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD))) { - - String secretPayload = generateZookeeperDigestSecretPayload(); - toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload); + secretPayload = generateZookeeperDigestSecretPayload(); LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload); } - + toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD, secretPayload); // This should always be set to digest. toRet.put(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME, "digest"); - return toRet; } diff --git a/storm-core/test/clj/org/apache/storm/submitter_test.clj b/storm-core/test/clj/org/apache/storm/submitter_test.clj index 3d858e04e6d..e8664379d3b 100644 --- a/storm-core/test/clj/org/apache/storm/submitter_test.clj +++ b/storm-core/test/clj/org/apache/storm/submitter_test.clj @@ -26,7 +26,7 @@ result (StormSubmitter/prepareZookeeperAuthentication conf) actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD) actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)] - (is (nil? actual-payload)) + (is (= "foobar:12345" actual-payload)) (is (= "digest" actual-scheme)))) (testing "Scheme is set to digest if not already." 
@@ -34,7 +34,7 @@ result (StormSubmitter/prepareZookeeperAuthentication conf) actual-payload (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD) actual-scheme (.get result STORM-ZOOKEEPER-TOPOLOGY-AUTH-SCHEME)] - (is (nil? actual-payload)) + (is (= "foobar:12345" actual-payload)) (is (= "digest" actual-scheme)))) (testing "A payload is generated when no payload is present." From ce947770bb1872744b4be06ffdb42912b207ea00 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Thu, 6 Jul 2017 12:24:59 +0900 Subject: [PATCH 05/12] STORM-2602: CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bea1becc1a4..98a2320d748 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * STORM-2482: Refactor the Storm auto credential plugins to be more usable ## 1.1.1 + * STORM-2602: storm.zookeeper.topology.auth.payload doesn't work even you set it * STORM-2597: Don't parse passed in class paths * STORM-2564: We should provide a template for storm-cluster-auth.yaml * STORM-2568: Fix getTopicsString From 0e0828323414b8f1def6bbcb9035107e7c009e4e Mon Sep 17 00:00:00 2001 From: Srishty Agrawal Date: Wed, 29 Mar 2017 12:35:57 -0700 Subject: [PATCH 06/12] STORM-2506: Print mapping between Task ID and Kafka Partitions --- .../apache/storm/kafka/spout/KafkaSpout.java | 4 ++-- .../org/apache/storm/kafka/KafkaSpout.java | 4 ++-- .../org/apache/storm/kafka/KafkaUtils.java | 17 ++++++++-------- .../apache/storm/kafka/StaticCoordinator.java | 4 ++-- .../org/apache/storm/kafka/ZkCoordinator.java | 20 ++++++++++--------- .../apache/storm/kafka/KafkaUtilsTest.java | 8 ++++---- .../apache/storm/kafka/ZkCoordinatorTest.java | 2 +- 7 files changed, 31 insertions(+), 28 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 58347e334f5..fcef7aca256 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -139,8 +139,8 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { - LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + LOG.info("Partitions reassignment. 
[task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", + context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); initialize(partitions); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index 01cc9b71878..3abadf2e12d 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -82,11 +82,11 @@ public void open(Map conf, final TopologyContext context, final SpoutOutputColle if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + totalTasks, context.getThisTaskId(), topologyInstanceId); } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + totalTasks, context.getThisTaskId(), topologyInstanceId); } context.registerMetric("kafkaOffset", new IMetric() { diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index f23c873ddf4..73e86e97b6f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -254,7 +254,8 @@ public static Iterable> generateTuples(MessageMetadataSchemeAsMulti } - public static List calculatePartitionsForTask(List partitons, int totalTasks, int taskIndex) { + public static List calculatePartitionsForTask(List partitons, + int totalTasks, int taskIndex, int taskId) { Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); List taskPartitions = new ArrayList(); List partitions = new ArrayList(); @@ -269,20 +270,20 @@ public static List calculatePartitionsForTask(List taskPartitions) { - String taskPrefix = taskId(taskIndex, totalTasks); + private static void logPartitionMapping(int totalTasks, int taskIndex, List taskPartitions, int taskId) { + String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId); if (taskPartitions.isEmpty()) { - LOG.warn(taskPrefix + "no partitions assigned"); + LOG.warn(taskPrefix + " no partitions assigned"); } else { - LOG.info(taskPrefix + "assigned " + taskPartitions); + LOG.info(taskPrefix + " assigned " + taskPartitions); } } - public static String taskId(int taskIndex, int totalTasks) { - return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] "; + public static String taskPrefix(int taskIndex, int totalTasks, int taskId) { + return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId; } } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java index 628bfc0ac59..c3c5e97990b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java @@ -26,11 +26,11 @@ public class StaticCoordinator implements PartitionCoordinator { Map _managers = new HashMap(); List _allManagers = new ArrayList<>(); - public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { + public 
StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { StaticHosts hosts = (StaticHosts) config.hosts; List partitions = new ArrayList(); partitions.add(hosts.getPartitionInformation()); - List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex); + List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId); for (Partition myPartition : myPartitions) { _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition)); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index 14be5845007..d9dbfb34f3e 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -23,7 +23,7 @@ import java.util.*; -import static org.apache.storm.kafka.KafkaUtils.taskId; +import static org.apache.storm.kafka.KafkaUtils.taskPrefix; public class ZkCoordinator implements PartitionCoordinator { private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); @@ -31,6 +31,7 @@ public class ZkCoordinator implements PartitionCoordinator { SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; + int _taskId; String _topologyInstanceId; Map _managers = new HashMap(); List _cachedList = new ArrayList(); @@ -41,15 +42,16 @@ public class ZkCoordinator implements PartitionCoordinator { ZkState _state; Map _stormConf; - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); + public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { + this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(stormConf, spoutConfig)); } - public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { + public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; + _taskId = taskId; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; @@ -75,9 +77,9 @@ public List getMyManagedPartitions() { @Override public void refresh() { try { - LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections"); List brokerInfo = _reader.getBrokerInfo(); - List mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); + List mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId); Set curr = _managers.keySet(); Set newPartitions = new 
HashSet(mine); @@ -86,7 +88,7 @@ public void refresh() { Set deletedPartitions = new HashSet(curr); deletedPartitions.removeAll(mine); - LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString()); Map deletedManagers = new HashMap<>(); for (Partition id : deletedPartitions) { @@ -95,7 +97,7 @@ public void refresh() { for (PartitionManager manager : deletedManagers.values()) { if (manager != null) manager.close(); } - LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager( @@ -113,7 +115,7 @@ public void refresh() { throw new RuntimeException(e); } _cachedList = new ArrayList(_managers.values()); - LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing"); } @Override diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java index 9da6c0a5187..9362f91e1ae 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java @@ -271,7 +271,7 @@ public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTa partitions.add(globalPartitionInformation); int numTasks = numPartitions / partitionsPerTask; for (int i = 0 ; i < numTasks ; i++) { - assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size()); + assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size()); } } @@ -281,8 +281,8 @@ public void moreTasksThanPartitions() { List partitions = new ArrayList(); partitions.add(globalPartitionInformation); int numTasks = 2; - assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size()); - assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size()); + assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size()); + assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size()); } @Test (expected = IllegalArgumentException.class ) @@ -290,6 +290,6 @@ public void assignInvalidTask() { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); List partitions = new ArrayList(); partitions.add(globalPartitionInformation); - KafkaUtils.calculatePartitionsForTask(partitions, 1, 1); + KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1); } } diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index b23d5bcea2b..0b8684511e0 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -174,7 +174,7 @@ private void waitForRefresh() throws InterruptedException { private List buildCoordinators(int totalTasks) { List coordinatorList = new ArrayList(); for (int i = 0; i < totalTasks; i++) { - ZkCoordinator coordinator = new 
ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, "test-id", reader); + ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, stormConf, spoutConfig, state, i, totalTasks, i, "test-id", reader); coordinatorList.add(coordinator); } return coordinatorList; From 25f969c33d04986eb18eef74127f2f0c33e6af28 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 7 Jul 2017 12:17:34 +0900 Subject: [PATCH 07/12] STORM-2506: CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98a2320d748..db173399faa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.2.0 + * STORM-2506: Print mapping between Task ID and Kafka Partitions * STORM-2601: add the timeout parameter to the method of getting the nimbus client * STORM-2369: [storm-redis] Use binary type for State management * STORM-2599: Fix BasicContainer wildcard classpath on Windows From 7f6080fb68a2a58eb02d523e4485b0145b452632 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Sun, 16 Apr 2017 18:02:07 +0200 Subject: [PATCH 08/12] STORM-2478: Fix BlobStoreTest.testDeleteAfterFailedCreate on Windows --- .../apache/storm/blobstore/FileBlobStoreImpl.java | 6 ++++-- .../apache/storm/blobstore/LocalFsBlobStore.java | 7 +++++++ .../org/apache/storm/blobstore/BlobStoreTest.java | 14 ++++++++++---- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/FileBlobStoreImpl.java b/storm-core/src/jvm/org/apache/storm/blobstore/FileBlobStoreImpl.java index 6c6877f3ead..a866505246d 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/FileBlobStoreImpl.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/FileBlobStoreImpl.java @@ -17,6 +17,7 @@ */ package org.apache.storm.blobstore; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -182,7 +183,8 @@ public void deleteKey(String key) throws IOException { delete(keyDir); } - private File getKeyDir(String key) { + @VisibleForTesting + File getKeyDir(String key) { String hash = String.valueOf(Math.abs((long)key.hashCode()) % BUCKETS); File ret = new File(new File(fullPath, hash), key); LOG.debug("{} Looking for {} in {}", new Object[]{fullPath, key, hash}); @@ -240,7 +242,7 @@ protected Iterator listKeys(File path) throws IOException { } protected void delete(File path) throws IOException { - if (Files.exists(path.toPath())){ + if (Files.exists(path.toPath())) { Files.walkFileTree(path.toPath(), new SimpleFileVisitor() { diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java index ca75c75ad40..50c455c6d92 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -46,6 +46,8 @@ import static org.apache.storm.blobstore.BlobStoreAclHandler.READ; import static org.apache.storm.blobstore.BlobStoreAclHandler.WRITE; +import com.google.common.annotations.VisibleForTesting; + /** * Provides a local file system backed blob store implementation for Nimbus. 
* @@ -341,4 +343,9 @@ public synchronized void checkForBlobUpdate(String key) { public void fullCleanup(long age) throws IOException { fbs.fullCleanup(age); } + + @VisibleForTesting + File getKeyDataDir(String key) { + return fbs.getKeyDir(DATA_PREFIX + key); + } } diff --git a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java index 1d68326c98f..fcff7c433a8 100644 --- a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java +++ b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java @@ -52,6 +52,8 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.spy; +import java.nio.file.Files; + public class BlobStoreTest { private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class); URI base; @@ -169,14 +171,18 @@ public void testMultipleLocalFs() throws Exception { @Test public void testDeleteAfterFailedCreate() throws Exception{ + //Check that a blob can be deleted when a temporary file exists in the blob directory LocalFsBlobStore store = initLocalFs(); + String key = "test"; SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler .WORLD_EVERYTHING); - AtomicOutputStream out = store.createBlob("test", metadata, null); - out.write(1); - - + try(AtomicOutputStream out = store.createBlob(key, metadata, null)) { + out.write(1); + File blobDir = store.getKeyDataDir(key); + Files.createFile(blobDir.toPath().resolve("tempFile.tmp")); + } + store.deleteBlob("test",null); } From b6eb9997ce6969592210027e719406b19ae9b835 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 7 Jul 2017 13:50:12 +0900 Subject: [PATCH 09/12] STORM-2478: CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index db173399faa..57b2d183d50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * STORM-2482: Refactor the Storm auto credential plugins to be more usable ## 1.1.1 + * STORM-2478: Fix BlobStoreTest.testDeleteAfterFailedCreate on Windows * STORM-2602: storm.zookeeper.topology.auth.payload doesn't work even you set it * STORM-2597: Don't parse passed in class paths * STORM-2564: We should provide a template for storm-cluster-auth.yaml From a5fb685fb18b7ad2bd6c454ed9634bd91126098c Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Mon, 27 Feb 2017 21:01:13 +0900 Subject: [PATCH 10/12] STORM-2383 [storm-hbase] Support HBase as state backend * Implement HBase state backend * picked Redis state backend implementation * also implemented state iterator * Added 'how to set up' to State-checkpointing doc * address @arunmahadevan's review comment --- docs/State-checkpointing.md | 87 +++- .../jvm/storm/starter/StatefulTopology.java | 2 +- external/storm-hbase/pom.xml | 20 + .../bolt/mapper/HBaseProjectionCriteria.java | 16 + .../apache/storm/hbase/common/ColumnList.java | 37 ++ .../storm/hbase/common/HBaseClient.java | 40 ++ .../storm/hbase/state/HBaseKeyValueState.java | 421 ++++++++++++++++++ .../state/HBaseKeyValueStateIterator.java | 155 +++++++ .../state/HBaseKeyValueStateProvider.java | 165 +++++++ .../hbase/state/HBaseClientTestUtil.java | 377 ++++++++++++++++ .../state/HBaseKeyValueStateIteratorTest.java | 212 +++++++++ .../state/HBaseKeyValueStateProviderTest.java | 96 ++++ .../hbase/state/HBaseKeyValueStateTest.java | 117 +++++ .../state/DefaultStateSerializerTest.java | 4 +- 14 files changed, 1743 insertions(+), 6 deletions(-) create mode 100644 
external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java create mode 100644 external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java create mode 100644 external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java create mode 100644 external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java create mode 100644 external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java create mode 100644 external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java create mode 100644 external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java rename {external/storm-redis/src/test/java/org/apache/storm/redis => storm-core/test/jvm/org/apache/storm}/state/DefaultStateSerializerTest.java (98%) diff --git a/docs/State-checkpointing.md b/docs/State-checkpointing.md index 7015ce08da7..4be6e237f40 100644 --- a/docs/State-checkpointing.md +++ b/docs/State-checkpointing.md @@ -50,8 +50,8 @@ last committed by the framework during the previous run. can be changed by setting the storm config `topology.state.checkpoint.interval.ms` 5. For state persistence, use a state provider that supports persistence by setting the `topology.state.provider` in the storm config. E.g. for using Redis based key-value state implementation set `topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider` -in storm.yaml. The provider implementation jar should be in the class path, which in this case means putting the `storm-redis-*.jar` -in the extlib directory. +in storm.yaml. The provider implementation jar should be in the class path, which in this case means adding `storm-redis` +as a dependency of your topology, or adding `--artifacts "org.apache.storm:storm-redis:"` when submitting your topology with `storm jar`. 6. The state provider properties can be overridden by setting `topology.state.provider.config`. For Redis state this is a json config with the following properties. @@ -160,7 +160,10 @@ duplicate state updates during recovery. The state abstraction does not eliminate duplicate evaluations and currently provides only at-least once guarantee. -In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples while emitting and ack the input tuples once its processed. For non-stateful bolts, the anchoring/acking can be automatically managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple after processing like in the `WordCountBolt` example in the State management section above. +In order to provide the at-least once guarantee, all bolts in a stateful topology are expected to anchor the tuples +while emitting and ack the input tuples once they are processed. For non-stateful bolts, the anchoring/acking can be automatically +managed by extending the `BaseBasicBolt`. Stateful bolts are expected to anchor tuples while emitting and ack the tuple +after processing like in the `WordCountBolt` example in the State management section above. ### IStateful bolt hooks IStateful bolt interface provides hook methods where in the stateful bolts could implement some custom actions. @@ -204,3 +207,81 @@ The framework instantiates the state via the corresponding `StateProvider` imple a `StateProvider` implementation which can load and return the state based on the namespace.
Each state belongs to a unique namespace. The namespace is typically unique per task so that each task can have its own state. The StateProvider and the corresponding State implementation should be available in the class path of Storm (by placing them in the extlib directory). + + +### Supported State Backends + +#### Redis + +* State provider class name (`topology.state.provider`) + +`org.apache.storm.redis.state.RedisKeyValueStateProvider` + +* Provider config (`topology.state.provider.config`) + +``` + { + "keyClass": "Optional fully qualified class name of the Key type.", + "valueClass": "Optional fully qualified class name of the Value type.", + "keySerializerClass": "Optional Key serializer implementation class.", + "valueSerializerClass": "Optional Value Serializer implementation class.", + "jedisPoolConfig": { + "host": "localhost", + "port": 6379, + "timeout": 2000, + "database": 0, + "password": "xyz" + } + } + ``` + +* Artifacts to add (`--artifacts`) + +`org.apache.storm:storm-redis:` + +#### HBase + +In order to make state scalable, HBaseKeyValueState stores state KV to a row. This introduces a `non-atomic` commit phase and guarantees +eventual consistency on the HBase side. This doesn't matter from the state's point of view because HBaseKeyValueState can still provide the not-yet-committed value. +Even if a worker crashes during the commit phase, after restart it will read the pending-commit states (stored atomically) from HBase, so the states will be stored eventually. + +NOTE: The HBase state provider uses a pre-created table and column family, so users need to create them and provide them in the provider config. + +You can simply create the table via `create 'state', 'cf'` in `hbase shell`, but in production you may want to set some more properties. + +* State provider class name (`topology.state.provider`) + +`org.apache.storm.hbase.state.HBaseKeyValueStateProvider` + +* Provider config (`topology.state.provider.config`) + +``` + { + "keyClass": "Optional fully qualified class name of the Key type.", + "valueClass": "Optional fully qualified class name of the Value type.", + "keySerializerClass": "Optional Key serializer implementation class.", + "valueSerializerClass": "Optional Value Serializer implementation class.", + "hbaseConfigKey": "config key to load hbase configuration from storm root configuration. (similar to storm-hbase)", + "tableName": "Pre-created table name for state.", + "columnFamily": "Pre-created column family for state."
+ } + ``` + +If you want to initialize HBase state provider from codebase, please see below example: + +``` +Config conf = new Config(); + Map hbConf = new HashMap(); + hbConf.put("hbase.rootdir", "file:///tmp/hbase"); + conf.put("hbase.conf", hbConf); + conf.put("topology.state.provider", "org.apache.storm.hbase.state.HBaseKeyValueStateProvider"); + conf.put("topology.state.provider.config", "{" + + " \"hbaseConfigKey\": \"hbase.conf\"," + + " \"tableName\": \"state\"," + + " \"columnFamily\": \"cf\"" + + " }"); +``` + +* Artifacts to add (`--artifacts`) + +`org.apache.storm:storm-hbase:` \ No newline at end of file diff --git a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java index ba513dd0a2e..b5224b118e6 100644 --- a/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/StatefulTopology.java @@ -142,4 +142,4 @@ public static void main(String[] args) throws Exception { cluster.shutdown(); } } -} +} \ No newline at end of file diff --git a/external/storm-hbase/pom.xml b/external/storm-hbase/pom.xml index a6e738be82b..dd8cd0b2136 100644 --- a/external/storm-hbase/pom.xml +++ b/external/storm-hbase/pom.xml @@ -96,5 +96,25 @@ storm-autocreds ${project.version} + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + junit + junit + test + + + org.mockito + mockito-all + test + diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java index 81e94b4e054..7325c62c1a2 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/mapper/HBaseProjectionCriteria.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import java.io.Serializable; +import java.util.Arrays; import java.util.List; /** @@ -41,6 +42,11 @@ public ColumnMetaData(String columnFamily, String qualifier) { this.qualifier = qualifier.getBytes(); } + public ColumnMetaData(byte[] columnFamily, byte[] qualifier) { + this.columnFamily = Arrays.copyOf(columnFamily, columnFamily.length); + this.qualifier = Arrays.copyOf(qualifier, qualifier.length); + } + public byte[] getColumnFamily() { return columnFamily; } @@ -65,6 +71,16 @@ public HBaseProjectionCriteria addColumnFamily(String columnFamily) { return this; } + /** + * all columns from this family will be included as result of HBase lookup. + * @param columnFamily + * @return + */ + public HBaseProjectionCriteria addColumnFamily(byte[] columnFamily) { + this.columnFamilies.add(Arrays.copyOf(columnFamily, columnFamily.length)); + return this; + } + /** * Only this column from the the columnFamily will be included as result of HBase lookup. 
* @param column diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java index 73703dcdbbc..0abe1ad4e77 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/ColumnList.java @@ -87,6 +87,7 @@ public long getIncrement() { private ArrayList columns; + private ArrayList columnsToDelete; private ArrayList counters; @@ -97,6 +98,13 @@ private ArrayList columns(){ return this.columns; } + private ArrayList columnsToDelete(){ + if(this.columnsToDelete == null){ + this.columnsToDelete = new ArrayList(); + } + return this.columnsToDelete; + } + private ArrayList counters(){ if(this.counters == null){ this.counters = new ArrayList(); @@ -163,6 +171,17 @@ public ColumnList addCounter(ICounter counter){ return this.addCounter(counter.family(), counter.qualifier(), counter.increment()); } + /** + * Remove a standard HBase column + * + * @param family + * @param qualifier + * @return + */ + public ColumnList deleteColumn(byte[] family, byte[] qualifier) { + columnsToDelete().add(new Column(family, qualifier, -1, null)); + return this; + } /** * Query to determine if we have column definitions. @@ -173,6 +192,15 @@ public boolean hasColumns(){ return this.columns != null; } + /** + * Query to determine if we have column delete definitions. + * + * @return + */ + public boolean hasColumnsToDelete(){ + return this.columnsToDelete != null; + } + /** * Query to determine if we have counter definitions. * @@ -191,6 +219,15 @@ public List getColumns(){ return this.columns; } + /** + * Get the list of 'column to delete' definitions. + * + * @return + */ + public ArrayList getColumnsToDelete() { + return this.columnsToDelete; + } + /** * Get the list of counter definitions. 
* @return diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java index c73bc41382a..f44d3981b93 100644 --- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/common/HBaseClient.java @@ -19,6 +19,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; @@ -83,6 +84,19 @@ public List constructMutationReq(byte[] rowKey, ColumnList cols, Durab mutations.add(inc); } + if (cols.hasColumnsToDelete()) { + Delete delete = new Delete(rowKey); + delete.setDurability(durability); + for (ColumnList.Column col : cols.getColumnsToDelete()) { + if (col.getTs() > 0) { + delete.addColumn(col.getFamily(), col.getQualifier(), col.getTs()); + } else { + delete.addColumn(col.getFamily(), col.getQualifier()); + } + } + mutations.add(delete); + } + if (mutations.isEmpty()) { mutations.add(new Put(rowKey)); } @@ -128,6 +142,32 @@ public Result[] batchGet(List gets) throws Exception { } } + public ResultScanner scan(byte[] startRow, byte[] stopRow) throws Exception { + try { + if (startRow == null) { + startRow = HConstants.EMPTY_START_ROW; + } + if (stopRow == null) { + stopRow = HConstants.EMPTY_END_ROW; + } + + Scan scan = new Scan(startRow, stopRow); + return table.getScanner(scan); + } catch (Exception e) { + LOG.warn("Could not open HBASE scanner.", e); + throw e; + } + } + + public boolean exists(Get get) throws Exception { + try { + return table.exists(get); + } catch (Exception e) { + LOG.warn("Could not perform HBASE existence check.", e); + throw e; + } + } + @Override public void close() throws IOException { table.close(); diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java new file mode 100644 index 00000000000..54a38a2a15b --- /dev/null +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueState.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hbase.state; + +import com.google.common.collect.Maps; +import com.google.common.primitives.UnsignedBytes; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; +import org.apache.storm.hbase.common.ColumnList; +import org.apache.storm.hbase.common.HBaseClient; +import org.apache.storm.state.DefaultStateEncoder; +import org.apache.storm.state.DefaultStateSerializer; +import org.apache.storm.state.KeyValueState; +import org.apache.storm.state.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * A Hbase based implementation that persists the state in HBase. + */ +public class HBaseKeyValueState implements KeyValueState { + private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueState.class); + + public static byte[] STATE_QUALIFIER = "s".getBytes(); + public static final int ITERATOR_CHUNK_SIZE = 1000; + + public static final NavigableMap EMPTY_PENDING_COMMIT_MAP = Maps.unmodifiableNavigableMap( + new TreeMap(UnsignedBytes.lexicographicalComparator())); + + private static byte[] COMMIT_TXID_KEY = "commit".getBytes(); + private static byte[] PREPARE_TXID_KEY = "prepare".getBytes(); + + private final byte[] keyNamespace; + private final byte[] prepareNamespace; + private final byte[] txidNamespace; + private final String namespace; + private final byte[] columnFamily; + private final DefaultStateEncoder encoder; + private final HBaseClient hBaseClient; + + private ConcurrentNavigableMap pendingPrepare; + private NavigableMap pendingCommit; + + // the key and value of txIds are guaranteed to be converted to UTF-8 encoded String + private NavigableMap txIds; + + public HBaseKeyValueState(HBaseClient hbaseClient, String columnFamily, String namespace) { + this(hbaseClient, columnFamily, namespace, new DefaultStateSerializer(), + new DefaultStateSerializer()); + } + + public HBaseKeyValueState(HBaseClient hBaseClient, String columnFamily, String namespace, + Serializer keySerializer, Serializer valueSerializer) { + + this.hBaseClient = hBaseClient; + this.columnFamily = columnFamily.getBytes(); + this.namespace = namespace; + this.keyNamespace = (namespace + "$key:").getBytes(); + this.prepareNamespace = (namespace + "$prepare").getBytes(); + this.txidNamespace = (namespace + "$txid").getBytes(); + this.encoder = new DefaultStateEncoder(keySerializer, valueSerializer); + this.pendingPrepare = createPendingPrepareMap(); + initTxids(); + initPendingCommit(); + } + + private void initTxids() { + HBaseProjectionCriteria criteria = new HBaseProjectionCriteria(); + criteria.addColumnFamily(columnFamily); + Get get = hBaseClient.constructGetRequests(txidNamespace, criteria); + try { + Result[] results = hBaseClient.batchGet(Collections.singletonList(get)); + Result result = results[0]; + if (!result.isEmpty()) { + NavigableMap familyMap = result.getFamilyMap(columnFamily); + txIds = new TreeMap<>(familyMap); + } else { + txIds = new 
TreeMap<>(UnsignedBytes.lexicographicalComparator()); + } + + LOG.debug("initTxids, txIds {}", txIds); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void initPendingCommit() { + HBaseProjectionCriteria criteria = new HBaseProjectionCriteria(); + criteria.addColumnFamily(columnFamily); + Get get = hBaseClient.constructGetRequests(prepareNamespace, criteria); + try { + Result[] results = hBaseClient.batchGet(Collections.singletonList(get)); + Result result = results[0]; + if (!result.isEmpty()) { + LOG.debug("Loading previously prepared commit from {}", prepareNamespace); + NavigableMap familyMap = result.getFamilyMap(columnFamily); + pendingCommit = Maps.unmodifiableNavigableMap(familyMap); + } else { + LOG.debug("No previously prepared commits."); + pendingCommit = EMPTY_PENDING_COMMIT_MAP; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void put(K key, V value) { + LOG.debug("put key '{}', value '{}'", key, value); + byte[] columnKey = encoder.encodeKey(key); + byte[] columnValue = encoder.encodeValue(value); + pendingPrepare.put(columnKey, columnValue); + } + + @Override + public V get(K key) { + LOG.debug("get key '{}'", key); + byte[] columnKey = encoder.encodeKey(key); + byte[] columnValue = null; + + if (pendingPrepare.containsKey(columnKey)) { + columnValue = pendingPrepare.get(columnKey); + } else if (pendingCommit.containsKey(columnKey)) { + columnValue = pendingCommit.get(columnKey); + } else { + HBaseProjectionCriteria criteria = new HBaseProjectionCriteria(); + HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, + STATE_QUALIFIER); + criteria.addColumn(column); + Get get = hBaseClient.constructGetRequests(getRowKeyForStateKey(columnKey), criteria); + try { + Result[] results = hBaseClient.batchGet(Collections.singletonList(get)); + Result result = results[0]; + columnValue = result.getValue(column.getColumnFamily(), column.getQualifier()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + V value = null; + if (columnValue != null) { + value = encoder.decodeValue(columnValue); + } + LOG.debug("Value for key '{}' is '{}'", key, value); + return value; + } + + @Override + public V get(K key, V defaultValue) { + V val = get(key); + return val != null ? 
val : defaultValue; + } + + @Override + public V delete(K key) { + LOG.debug("delete key '{}'", key); + byte[] columnKey = encoder.encodeKey(key); + V curr = get(key); + pendingPrepare.put(columnKey, encoder.getTombstoneValue()); + return curr; + } + + @Override + public Iterator> iterator() { + return new HBaseKeyValueStateIterator<>(namespace, columnFamily, hBaseClient, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), ITERATOR_CHUNK_SIZE, encoder.getKeySerializer(), encoder.getValueSerializer()); + } + + @Override + public void prepareCommit(long txid) { + LOG.debug("prepareCommit txid {}", txid); + validatePrepareTxid(txid); + + try { + ConcurrentNavigableMap currentPending = pendingPrepare; + pendingPrepare = createPendingPrepareMap(); + + Result result = getColumnFamily(prepareNamespace, columnFamily); + if (!result.isEmpty()) { + LOG.debug("Prepared txn already exists, will merge", txid); + for (Map.Entry e : pendingCommit.entrySet()) { + if (!currentPending.containsKey(e.getKey())) { + currentPending.put(e.getKey(), e.getValue()); + } + } + } else { + LOG.debug("Nothing to save for prepareCommit, txid {}.", txid); + } + + if (!currentPending.isEmpty()) { + mutateRow(prepareNamespace, columnFamily, currentPending); + } else { + LOG.debug("Nothing to save for prepareCommit, txid {}.", txid); + } + + txIds.put(PREPARE_TXID_KEY, String.valueOf(txid).getBytes()); + mutateRow(txidNamespace, columnFamily, txIds); + pendingCommit = Maps.unmodifiableNavigableMap(currentPending); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void commit(long txid) { + LOG.debug("commit txid {}", txid); + validateCommitTxid(txid); + try { + if (!pendingCommit.isEmpty()) { + applyPendingStateToHBase(pendingCommit); + } else { + LOG.debug("Nothing to save for commit, txid {}.", txid); + } + txIds.put(COMMIT_TXID_KEY, String.valueOf(txid).getBytes()); + mutateRow(txidNamespace, columnFamily, txIds); + deleteRow(prepareNamespace); + pendingCommit = EMPTY_PENDING_COMMIT_MAP; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void commit() { + if (!pendingPrepare.isEmpty()) { + try { + applyPendingStateToHBase(pendingPrepare); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + LOG.debug("Nothing to save for commit"); + } + pendingPrepare = createPendingPrepareMap(); + } + + @Override + public void rollback() { + LOG.debug("rollback"); + try { + if (existsRow(prepareNamespace)) { + deleteRow(prepareNamespace); + } else { + LOG.debug("Nothing to rollback, prepared data is empty"); + } + Long lastCommittedId = lastCommittedTxid(); + if (lastCommittedId != null) { + txIds.put(PREPARE_TXID_KEY, String.valueOf(lastCommittedId).getBytes()); + } else { + txIds.remove(PREPARE_TXID_KEY); + } + if (!txIds.isEmpty()) { + LOG.debug("put txidNamespace {}, txIds {}", txidNamespace, txIds); + mutateRow(txidNamespace, columnFamily, txIds); + } + pendingCommit = EMPTY_PENDING_COMMIT_MAP; + pendingPrepare = createPendingPrepareMap(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /* + * Same txid can be prepared again, but the next txid cannot be prepared + * when previous one is not committed yet. + */ + private void validatePrepareTxid(long txid) { + Long committedTxid = lastCommittedTxid(); + if (committedTxid != null) { + if (txid <= committedTxid) { + throw new RuntimeException("Invalid txid '" + txid + "' for prepare. 
Txid '" + committedTxid + + "' is already committed"); + } + } + } + + /* + * Same txid can be committed again but the + * txid to be committed must be the last prepared one. + */ + private void validateCommitTxid(long txid) { + Long committedTxid = lastCommittedTxid(); + if (committedTxid != null) { + if (txid < committedTxid) { + throw new RuntimeException("Invalid txid '" + txid + "' txid '" + committedTxid + "' is already committed"); + } + } + Long preparedTxid = lastPreparedTxid(); + if (preparedTxid != null) { + if (txid != preparedTxid) { + throw new RuntimeException("Invalid txid '" + txid + "' not same as prepared txid '" + preparedTxid + "'"); + } + } + } + + + private Long lastCommittedTxid() { + return lastId(COMMIT_TXID_KEY); + } + + private Long lastPreparedTxid() { + return lastId(PREPARE_TXID_KEY); + } + + private Long lastId(byte[] key) { + Long lastId = null; + byte[] txId = txIds.get(key); + if (txId != null) { + lastId = Long.valueOf(new String(txId)); + } + return lastId; + } + + private byte[] getRowKeyForStateKey(byte[] columnKey) { + byte[] rowKey = new byte[keyNamespace.length + columnKey.length]; + System.arraycopy(keyNamespace, 0, rowKey, 0, keyNamespace.length); + System.arraycopy(columnKey, 0, rowKey, keyNamespace.length, columnKey.length); + return rowKey; + } + + private void applyPendingStateToHBase(NavigableMap pendingMap) throws Exception { + List mutations = new ArrayList<>(); + for (Map.Entry entry : pendingMap.entrySet()) { + byte[] rowKey = entry.getKey(); + byte[] value = entry.getValue(); + + if (Arrays.equals(value, encoder.getTombstoneValue())) { + mutations.add(new Delete(getRowKeyForStateKey(rowKey))); + } else { + List mutationsForRow = prepareMutateRow(getRowKeyForStateKey(rowKey), columnFamily, + Collections.singletonMap(STATE_QUALIFIER, value)); + mutations.addAll(mutationsForRow); + } + } + + hBaseClient.batchMutate(mutations); + } + + private Result getColumnFamily(byte[] rowKey, byte[] columnFamily) throws Exception { + HBaseProjectionCriteria criteria = new HBaseProjectionCriteria(); + criteria.addColumnFamily(columnFamily); + Get get = hBaseClient.constructGetRequests(rowKey, criteria); + Result[] results = hBaseClient.batchGet(Collections.singletonList(get)); + return results[0]; + } + + private List prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map map) { + return prepareMutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT); + } + + private List prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map map, + Durability durability) { + ColumnList columnList = buildColumnList(columnFamily, map); + return hBaseClient.constructMutationReq(rowKey, columnList, durability); + } + + private void mutateRow(byte[] rowKey, byte[] columnFamily, Map map) + throws Exception { + mutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT); + } + + private void mutateRow(byte[] rowKey, byte[] columnFamily, Map map, + Durability durability) throws Exception { + hBaseClient.batchMutate(prepareMutateRow(rowKey, columnFamily, map, durability)); + } + + private boolean existsRow(byte[] rowKey) throws Exception { + Get get = new Get(rowKey); + return hBaseClient.exists(get); + } + + private void deleteRow(byte[] rowKey) throws Exception { + Delete delete = new Delete(rowKey); + hBaseClient.batchMutate(Collections.singletonList(delete)); + } + + private ColumnList buildColumnList(byte[] columnFamily, Map map) { + ColumnList columnList = new ColumnList(); + for (Map.Entry entry : map.entrySet()) { + columnList.addColumn(columnFamily, 
entry.getKey(), entry.getValue()); + } + return columnList; + } + + /** + * Intended to extract this to separate method since only pendingPrepare uses ConcurrentNavigableMap. + */ + private ConcurrentNavigableMap createPendingPrepareMap() { + return new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator()); + } +} diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java new file mode 100644 index 00000000000..dd9c20e0b76 --- /dev/null +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateIterator.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hbase.state; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import com.google.common.primitives.UnsignedBytes; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.storm.hbase.common.HBaseClient; +import org.apache.storm.state.BaseBinaryStateIterator; +import org.apache.storm.state.DefaultStateEncoder; +import org.apache.storm.state.Serializer; + +import org.apache.storm.state.StateEncoder; + +import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER; + +/** + * An iterator over {@link HBaseKeyValueState}. + */ +public class HBaseKeyValueStateIterator extends BaseBinaryStateIterator { + + private final byte[] keyNamespace; + private byte[] cursorKey; + private final byte[] endScanKey; + private final byte[] columnFamily; + private final HBaseClient hBaseClient; + private final int chunkSize; + private final StateEncoder encoder; + + private Iterator> cachedResultIterator; + + /** + * Constructor. 
+ * + * @param namespace The namespace of State + * @param columnFamily The column family of state + * @param hBaseClient The instance of HBaseClient + * @param pendingPrepareIterator The iterator of pendingPrepare + * @param pendingCommitIterator The iterator of pendingCommit + * @param chunkSize The size of chunk to get entries from HBase + * @param keySerializer The serializer of key + * @param valueSerializer The serializer of value + */ + public HBaseKeyValueStateIterator(String namespace, byte[] columnFamily, HBaseClient hBaseClient, + Iterator> pendingPrepareIterator, + Iterator> pendingCommitIterator, + int chunkSize, Serializer keySerializer, + Serializer valueSerializer) { + super(pendingPrepareIterator, pendingCommitIterator); + this.columnFamily = columnFamily; + this.keyNamespace = (namespace + "$key:").getBytes(); + this.cursorKey = (namespace + "$key:").getBytes(); + + // this is the end key for whole scan + this.endScanKey = advanceRow(this.cursorKey); + this.hBaseClient = hBaseClient; + this.chunkSize = chunkSize; + this.encoder = new DefaultStateEncoder(keySerializer, valueSerializer); + } + + @Override + protected Iterator> loadChunkFromStateStorage() { + loadChunkFromHBase(); + return cachedResultIterator; + } + + @Override + protected boolean isEndOfDataFromStorage() { + if (cachedResultIterator != null && cachedResultIterator.hasNext()) { + return false; + } + + try { + ResultScanner resultScanner = hBaseClient.scan(cursorKey, endScanKey); + return !(resultScanner.iterator().hasNext()); + } catch (Exception e) { + throw new RuntimeException("Fail to scan from HBase state storage."); + } + } + + @Override + protected K decodeKey(byte[] key) { + return encoder.decodeKey(key); + } + + @Override + protected V decodeValue(byte[] value) { + return encoder.decodeValue(value); + } + + @Override + protected boolean isTombstoneValue(byte[] value) { + return Arrays.equals(value, encoder.getTombstoneValue()); + } + + private void loadChunkFromHBase() { + Map chunk = new TreeMap<>(UnsignedBytes.lexicographicalComparator()); + try { + ResultScanner resultScanner = hBaseClient.scan(cursorKey, endScanKey); + + Result[] results = resultScanner.next(chunkSize); + + for (Result result : results) { + byte[] columnKey = extractStateKeyFromRowKey(result.getRow()); + byte[] columnValue = result.getValue(columnFamily, STATE_QUALIFIER); + + chunk.put(columnKey, columnValue); + } + + if (results.length > 0) { + byte[] lastRow = results[results.length - 1].getRow(); + cursorKey = advanceRow(lastRow); + } + + cachedResultIterator = chunk.entrySet().iterator(); + } catch (Exception e) { + throw new RuntimeException("Fail to scan from HBase state storage.", e); + } + } + + private byte[] advanceRow(byte[] row) { + byte[] advancedRow = new byte[row.length]; + System.arraycopy(row, 0, advancedRow, 0, row.length); + advancedRow[row.length - 1]++; + return advancedRow; + } + + private byte[] extractStateKeyFromRowKey(byte[] row) { + byte[] stateKey = new byte[row.length - keyNamespace.length]; + System.arraycopy(row, keyNamespace.length, stateKey, 0, row.length - keyNamespace.length); + return stateKey; + } + +} diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java new file mode 100644 index 00000000000..001400a1394 --- /dev/null +++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/state/HBaseKeyValueStateProvider.java @@ -0,0 +1,165 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hbase.state; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.hbase.common.HBaseClient; +import org.apache.storm.state.DefaultStateSerializer; +import org.apache.storm.state.Serializer; +import org.apache.storm.state.State; +import org.apache.storm.state.StateProvider; +import org.apache.storm.task.TopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Provides {@link HBaseKeyValueState}. + */ +public class HBaseKeyValueStateProvider implements StateProvider { + private static final Logger LOG = LoggerFactory.getLogger(HBaseKeyValueStateProvider.class); + + @Override + public State newState(String namespace, Map stormConf, TopologyContext context) { + try { + return getHBaseKeyValueState(namespace, stormConf, getStateConfig(stormConf)); + } catch (Exception ex) { + LOG.error("Error loading config from storm conf {}", stormConf); + throw new RuntimeException(ex); + } + } + + StateConfig getStateConfig(Map stormConf) throws Exception { + StateConfig stateConfig = null; + String providerConfig = null; + ObjectMapper mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + if (stormConf.containsKey(org.apache.storm.Config.TOPOLOGY_STATE_PROVIDER_CONFIG)) { + providerConfig = (String) stormConf.get(org.apache.storm.Config.TOPOLOGY_STATE_PROVIDER_CONFIG); + stateConfig = mapper.readValue(providerConfig, StateConfig.class); + } else { + stateConfig = new StateConfig(); + } + + // assertion + assertMandatoryParameterNotEmpty(stateConfig.hbaseConfigKey, "hbaseConfigKey"); + assertMandatoryParameterNotEmpty(stateConfig.tableName, "tableName"); + assertMandatoryParameterNotEmpty(stateConfig.columnFamily, "columnFamily"); + + return stateConfig; + } + + private HBaseKeyValueState getHBaseKeyValueState(String namespace, Map stormConf, StateConfig config) throws Exception { + Map conf = getHBaseConfigMap(stormConf, config.hbaseConfigKey); + final Configuration hbConfig = getHBaseConfigurationInstance(conf); + + //heck for backward compatibility, we need to pass TOPOLOGY_AUTO_CREDENTIALS to hbase conf + //the conf instance is instance of persistentMap so making a copy. 
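+ // HBaseClient is constructed from this copied map below, so it also sees the topology's auto-credential settings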
+ Map hbaseConfMap = new HashMap(conf); + hbaseConfMap.put(Config.TOPOLOGY_AUTO_CREDENTIALS, stormConf.get(Config.TOPOLOGY_AUTO_CREDENTIALS)); + HBaseClient hbaseClient = new HBaseClient(hbaseConfMap, hbConfig, config.tableName); + + return new HBaseKeyValueState(hbaseClient, config.columnFamily, namespace, + getKeySerializer(config), getValueSerializer(config)); + } + + private Configuration getHBaseConfigurationInstance(Map conf) { + final Configuration hbConfig = HBaseConfiguration.create(); + for(String key : conf.keySet()) { + hbConfig.set(key, String.valueOf(conf.get(key))); + } + return hbConfig; + } + + private Map getHBaseConfigMap(Map stormConfMap, String hbaseConfigKey) { + Map conf = (Map) stormConfMap.get(hbaseConfigKey); + if(conf == null) { + throw new IllegalArgumentException("HBase configuration not found using key '" + hbaseConfigKey + "'"); + } + + if(conf.get("hbase.rootdir") == null) { + LOG.warn("No 'hbase.rootdir' value found in configuration! Using HBase defaults."); + } + return conf; + } + + private void assertMandatoryParameterNotEmpty(String paramValue, String fieldName) { + if (StringUtils.isEmpty(paramValue)) { + throw new IllegalArgumentException(fieldName + " should be provided."); + } + } + + private Serializer getKeySerializer(StateConfig config) throws Exception { + Serializer serializer = null; + if (config.keySerializerClass != null) { + Class klass = (Class) Class.forName(config.keySerializerClass); + serializer = (Serializer) klass.newInstance(); + } else if (config.keyClass != null) { + serializer = new DefaultStateSerializer(Collections.singletonList(Class.forName(config.keyClass))); + } else { + serializer = new DefaultStateSerializer(); + } + return serializer; + } + + private Serializer getValueSerializer(StateConfig config) throws Exception { + Serializer serializer = null; + if (config.valueSerializerClass != null) { + Class klass = (Class) Class.forName(config.valueSerializerClass); + serializer = (Serializer) klass.newInstance(); + } else if (config.valueClass != null) { + serializer = new DefaultStateSerializer(Collections.singletonList(Class.forName(config.valueClass))); + } else { + serializer = new DefaultStateSerializer(); + } + return serializer; + } + + static class StateConfig { + String keyClass; + String valueClass; + String keySerializerClass; + String valueSerializerClass; + String hbaseConfigKey; + String tableName; + String columnFamily; + + @Override + public String toString() { + return "StateConfig{" + + "keyClass='" + keyClass + '\'' + + ", valueClass='" + valueClass + '\'' + + ", keySerializerClass='" + keySerializerClass + '\'' + + ", valueSerializerClass='" + valueSerializerClass + '\'' + + ", hbaseConfigKey='" + hbaseConfigKey + '\'' + + ", tableName='" + tableName + '\'' + + ", columnFamily='" + columnFamily + '\'' + + '}'; + } + } +} diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java new file mode 100644 index 00000000000..634620e66cf --- /dev/null +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseClientTestUtil.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hbase.state; + +import com.google.common.primitives.UnsignedBytes; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.storm.hbase.bolt.mapper.HBaseProjectionCriteria; +import org.apache.storm.hbase.common.ColumnList; +import org.apache.storm.hbase.common.HBaseClient; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; + +public class HBaseClientTestUtil { + private HBaseClientTestUtil() { + } + + public static HBaseClient mockedHBaseClient() throws Exception { + return mockedHBaseClient(new ConcurrentSkipListMap>>( + UnsignedBytes.lexicographicalComparator())); + } + + public static HBaseClient mockedHBaseClient( + ConcurrentNavigableMap>> internalMap) + throws Exception { + HBaseClient mockClient = mock(HBaseClient.class); + + Mockito.doNothing().when(mockClient).close(); + + Mockito.when(mockClient.constructGetRequests(any(byte[].class), any(HBaseProjectionCriteria.class))) + .thenCallRealMethod(); + + Mockito.when(mockClient.constructMutationReq(any(byte[].class), any(ColumnList.class), any(Durability.class))) + .thenCallRealMethod(); + + Mockito.when(mockClient.exists(any(Get.class))).thenAnswer(new ExistsAnswer(internalMap)); + Mockito.when(mockClient.batchGet(any(List.class))).thenAnswer(new BatchGetAnswer(internalMap)); + Mockito.doAnswer(new BatchMutateAnswer(internalMap)).when(mockClient).batchMutate(any(List.class)); + Mockito.when(mockClient.scan(any(byte[].class), any(byte[].class))).thenAnswer(new ScanAnswer(internalMap)); + + return mockClient; + } + + static class BuildCellsHelper { + public static void addMatchingColumnFamilies(byte[] rowKey, Map> familyMap, + NavigableMap> cfToQualifierToValueMap, + List cells) { + for (Map.Entry> entry : familyMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + + NavigableMap qualifierToValueMap = cfToQualifierToValueMap.get(columnFamily); + if (qualifierToValueMap != null) { + if (entry.getValue() == null || entry.getValue().size() == 0) { + addAllQualifiers(rowKey, columnFamily, qualifierToValueMap, 
cells); + } else { + addMatchingQualifiers(rowKey, columnFamily, entry, qualifierToValueMap, cells); + } + } + } + } + + public static void addMatchingQualifiers(byte[] rowKey, byte[] columnFamily, + Map.Entry> qualifierSet, + NavigableMap qualifierToValueMap, + List cells) { + for (byte[] qualifier : qualifierSet.getValue()) { + byte[] value = qualifierToValueMap.get(qualifier); + if (value != null) { + cells.add(new KeyValue(rowKey, columnFamily, qualifier, value)); + } + } + } + + public static void addAllColumnFamilies(byte[] rowKey, NavigableMap> cfToQualifierToValueMap, + List cells) { + for (Map.Entry> entry : cfToQualifierToValueMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + addAllQualifiers(rowKey, columnFamily, entry.getValue(), cells); + } + } + + public static void addAllQualifiers(byte[] rowKey, byte[] columnFamily, + NavigableMap qualifierToValueMap, List cells) { + for (Map.Entry entry2 : qualifierToValueMap.entrySet()) { + byte[] qualifier = entry2.getKey(); + byte[] value = entry2.getValue(); + cells.add(new KeyValue(rowKey, columnFamily, qualifier, value)); + } + } + + } + + static class BatchGetAnswer implements Answer { + private final ConcurrentNavigableMap>> mockMap; + + public BatchGetAnswer(ConcurrentNavigableMap>> mockMap) { + this.mockMap = mockMap; + } + + @Override + public Result[] answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + List param = (List) args[0]; + + List results = new ArrayList<>(param.size()); + + for (Get get : param) { + byte[] rowKey = get.getRow(); + + NavigableMap> cfToQualifierToValueMap = + mockMap.get(rowKey); + + if (cfToQualifierToValueMap != null) { + Map> familyMap = get.getFamilyMap(); + + List cells = new ArrayList<>(); + if (familyMap == null || familyMap.size() == 0) { + // all column families + BuildCellsHelper.addAllColumnFamilies(rowKey, cfToQualifierToValueMap, cells); + } else { + // one or more column families + BuildCellsHelper.addMatchingColumnFamilies(rowKey, familyMap, cfToQualifierToValueMap, cells); + } + + // Result.create() states that "You must ensure that the keyvalues are already sorted." 
+ Collections.sort(cells, new KeyValue.KVComparator()); + results.add(Result.create(cells)); + } else { + results.add(Result.EMPTY_RESULT); + } + } + + return results.toArray(new Result[0]); + } + } + + static class BatchMutateAnswer implements Answer { + private final ConcurrentNavigableMap>> mockMap; + + public BatchMutateAnswer(ConcurrentNavigableMap>> mockMap) { + this.mockMap = mockMap; + } + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + List param = (List) args[0]; + + // assumption: there're no put and delete for same target in parameter list + for (Mutation mutation : param) { + byte[] rowKey = mutation.getRow(); + + NavigableMap> familyCellMap = mutation.getFamilyCellMap(); + if (familyCellMap == null || familyCellMap.size() == 0) { + if (mutation instanceof Delete) { + deleteRow(mockMap, rowKey); + } else { + throw new IllegalStateException("Not supported in mocked mutate."); + } + } + + for (Map.Entry> entry : familyCellMap.entrySet()) { + byte[] columnFamily = entry.getKey(); + List cells = entry.getValue(); + + if (cells == null || cells.size() == 0) { + if (mutation instanceof Delete) { + deleteColumnFamily(mockMap, rowKey, columnFamily); + } else { + throw new IllegalStateException("Not supported in mocked mutate."); + } + } else { + for (Cell cell : cells) { + byte[] qualifier = CellUtil.cloneQualifier(cell); + + if (mutation instanceof Put) { + byte[] value = CellUtil.cloneValue(cell); + + putCell(mockMap, rowKey, columnFamily, qualifier, value); + } else if (mutation instanceof Delete) { + deleteCell(mockMap, rowKey, columnFamily, qualifier); + } else { + throw new IllegalStateException("Not supported in mocked mutate."); + } + } + } + } + } + + return null; + } + + private void putCell(ConcurrentNavigableMap>> mockMap, + byte[] rowKey, byte[] columnFamily, byte[] qualifier, byte[] value) { + NavigableMap> cfToQualifierToValue = mockMap.get(rowKey); + if (cfToQualifierToValue == null) { + cfToQualifierToValue = new TreeMap<>(UnsignedBytes.lexicographicalComparator()); + mockMap.put(rowKey, cfToQualifierToValue); + } + + NavigableMap qualifierToValue = cfToQualifierToValue.get(columnFamily); + if (qualifierToValue == null) { + qualifierToValue = new TreeMap<>(UnsignedBytes.lexicographicalComparator()); + cfToQualifierToValue.put(columnFamily, qualifierToValue); + } + + qualifierToValue.put(qualifier, value); + } + + private void deleteRow(ConcurrentNavigableMap>> mockMap, + byte[] rowKey) { + mockMap.remove(rowKey); + } + + private void deleteColumnFamily(ConcurrentNavigableMap>> mockMap, + byte[] rowKey, byte[] columnFamily) { + NavigableMap> cfToQualifierToValue = mockMap.get(rowKey); + if (cfToQualifierToValue != null) { + cfToQualifierToValue.remove(columnFamily); + } + } + + private void deleteCell(ConcurrentNavigableMap>> mockMap, + byte[] rowKey, byte[] columnFamily, byte[] qualifier) { + NavigableMap> cfToQualifierToValue = mockMap.get(rowKey); + if (cfToQualifierToValue != null) { + NavigableMap qualifierToValue = cfToQualifierToValue.get(columnFamily); + if (qualifierToValue != null) { + qualifierToValue.remove(qualifier); + } + } + } + } + + static class ExistsAnswer implements Answer { + private final ConcurrentNavigableMap>> mockMap; + + public ExistsAnswer(ConcurrentNavigableMap>> mockMap) { + this.mockMap = mockMap; + } + + @Override + public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + 
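+ // a single Get argument is expected here: mockedHBaseClient() stubs exists(any(Get.class)) with this answer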
Get param = (Get) args[0]; + + // assume that Get doesn't have any families defined. this is for not digging deeply... + byte[] rowKey = param.getRow(); + Map> familyMap = param.getFamilyMap(); + if (familyMap.size() > 0) { + throw new IllegalStateException("Not supported in mocked exists."); + } + + return mockMap.containsKey(rowKey); + } + } + + static class ScanAnswer implements Answer { + private final ConcurrentNavigableMap>> mockMap; + + public ScanAnswer(ConcurrentNavigableMap>> internalMap) { + this.mockMap = internalMap; + } + + @Override + public ResultScanner answer(InvocationOnMock invocationOnMock) throws Throwable { + Object[] args = invocationOnMock.getArguments(); + byte[] startKey = (byte[]) args[0]; + byte[] endKey = (byte[]) args[1]; + + final ConcurrentNavigableMap>> subMap = + mockMap.subMap(startKey, true, endKey, false); + + final List results = buildResults(subMap); + + return new MockedResultScanner(results); + } + + private List buildResults(ConcurrentNavigableMap>> subMap) { + final List results = new ArrayList<>(); + for (Map.Entry>> entry : subMap.entrySet()) { + byte[] rowKey = entry.getKey(); + NavigableMap> cfToQualifierToValueMap = entry.getValue(); + List cells = new ArrayList<>(); + // all column families + BuildCellsHelper.addAllColumnFamilies(rowKey, cfToQualifierToValueMap, cells); + + // Result.create() states that "You must ensure that the keyvalues are already sorted." + Collections.sort(cells, new KeyValue.KVComparator()); + results.add(Result.create(cells)); + } + return results; + } + + static class MockedResultScanner implements ResultScanner { + + private final List results; + private int position = 0; + + MockedResultScanner(List results) { + this.results = results; + } + + @Override + public Result next() throws IOException { + if (results.size() <= position) { + return null; + } + return results.get(position++); + } + + @Override + public Result[] next(int nbRows) throws IOException { + List bulkResult = new ArrayList<>(); + for (int i = 0 ; i < nbRows ; i++) { + Result result = next(); + if (result == null) { + break; + } + + bulkResult.add(result); + } + return bulkResult.toArray(new Result[0]); + } + + @Override + public void close() { + + } + + @Override + public Iterator iterator() { + return results.iterator(); + } + } + } +} diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java new file mode 100644 index 00000000000..d9a2ed85733 --- /dev/null +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateIteratorTest.java @@ -0,0 +1,212 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hbase.state; + +import com.google.common.primitives.UnsignedBytes; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.storm.hbase.common.ColumnList; +import org.apache.storm.hbase.common.HBaseClient; +import org.apache.storm.state.DefaultStateEncoder; +import org.apache.storm.state.DefaultStateSerializer; +import org.apache.storm.state.Serializer; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import static org.apache.storm.hbase.state.HBaseKeyValueState.STATE_QUALIFIER; +import static org.junit.Assert.*; + +/** + * Test for HBaseKeyValueStateIterator. + */ +public class HBaseKeyValueStateIteratorTest { + + private String namespace; + private byte[] keyNamespace; + private byte[] columnFamily; + private HBaseClient mockHBaseClient; + private int chunkSize = 1000; + private Serializer keySerializer = new DefaultStateSerializer<>(); + private Serializer valueSerializer = new DefaultStateSerializer<>(); + private DefaultStateEncoder encoder; + + @Before + public void setUp() throws Exception { + namespace = "namespace"; + keyNamespace = (namespace + "$key:").getBytes(); + columnFamily = "cf".getBytes(); + mockHBaseClient = HBaseClientTestUtil.mockedHBaseClient(); + encoder = new DefaultStateEncoder<>(keySerializer, valueSerializer); + } + + @Test + public void testGetEntriesInHBase() throws Exception { + // pendingPrepare has no entries + NavigableMap pendingPrepare = getBinaryTreeMap(); + + // pendingCommit has no entries + NavigableMap pendingCommit = getBinaryTreeMap(); + + // HBase has some entries + NavigableMap chunkMap = getBinaryTreeMap(); + putEncodedKeyValueToMap(chunkMap, "key0".getBytes(), "value0".getBytes()); + putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), "value2".getBytes()); + + applyPendingStateToHBase(chunkMap); + + HBaseKeyValueStateIterator kvIterator = + new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + + assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes()); + + // key1 shouldn't in iterator + + assertNextEntry(kvIterator, "key2".getBytes(), "value2".getBytes()); + + assertFalse(kvIterator.hasNext()); + } + + @Test + public void testGetEntriesRemovingDuplicationKeys() throws Exception { + NavigableMap pendingPrepare = getBinaryTreeMap(); + putEncodedKeyValueToMap(pendingPrepare, "key0".getBytes(), "value0".getBytes()); + putTombstoneToMap(pendingPrepare, "key1".getBytes()); + + NavigableMap pendingCommit = getBinaryTreeMap(); + putEncodedKeyValueToMap(pendingCommit, "key1".getBytes(), "value1".getBytes()); + putEncodedKeyValueToMap(pendingCommit, "key2".getBytes(), "value2".getBytes()); + + NavigableMap chunkMap = getBinaryTreeMap(); + putEncodedKeyValueToMap(chunkMap, "key2".getBytes(), "value2".getBytes()); + putEncodedKeyValueToMap(chunkMap, "key3".getBytes(), "value3".getBytes()); + putEncodedKeyValueToMap(chunkMap, "key4".getBytes(), "value4".getBytes()); + + applyPendingStateToHBase(chunkMap); + + HBaseKeyValueStateIterator kvIterator = + new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), + 
pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + + // keys shouldn't appear twice + + assertNextEntry(kvIterator, "key0".getBytes(), "value0".getBytes()); + + // key1 shouldn't be in iterator since it's marked as deleted + + assertNextEntry(kvIterator, "key2".getBytes(), "value2".getBytes()); + assertNextEntry(kvIterator, "key3".getBytes(), "value3".getBytes()); + assertNextEntry(kvIterator, "key4".getBytes(), "value4".getBytes()); + + assertFalse(kvIterator.hasNext()); + } + + @Test + public void testGetEntryNotAvailable() { + NavigableMap pendingPrepare = getBinaryTreeMap(); + + NavigableMap pendingCommit = getBinaryTreeMap(); + + HBaseKeyValueStateIterator kvIterator = + new HBaseKeyValueStateIterator<>(namespace, columnFamily, mockHBaseClient, pendingPrepare.entrySet().iterator(), + pendingCommit.entrySet().iterator(), chunkSize, keySerializer, valueSerializer); + + assertFalse(kvIterator.hasNext()); + } + + private void assertNextEntry(HBaseKeyValueStateIterator kvIterator, byte[] expectedKey, + byte[] expectedValue) { + assertTrue(kvIterator.hasNext()); + Map.Entry entry = kvIterator.next(); + assertArrayEquals(expectedKey, entry.getKey()); + assertArrayEquals(expectedValue, entry.getValue()); + } + + private void putEncodedKeyValueToMap(NavigableMap map, byte[] key, byte[] value) { + map.put(encoder.encodeKey(key), encoder.encodeValue(value)); + } + + private void putTombstoneToMap(NavigableMap map, byte[] key) { + map.put(encoder.encodeKey(key), encoder.getTombstoneValue()); + } + + private TreeMap getBinaryTreeMap() { + return new TreeMap<>(UnsignedBytes.lexicographicalComparator()); + } + + private void applyPendingStateToHBase(NavigableMap pendingMap) throws Exception { + List mutations = new ArrayList<>(); + for (Map.Entry entry : pendingMap.entrySet()) { + byte[] rowKey = entry.getKey(); + byte[] value = entry.getValue(); + + if (Arrays.equals(value, encoder.getTombstoneValue())) { + mutations.add(new Delete(getRowKeyForStateKey(rowKey))); + } else { + List mutationsForRow = prepareMutateRow(getRowKeyForStateKey(rowKey), columnFamily, + Collections.singletonMap(STATE_QUALIFIER, value)); + mutations.addAll(mutationsForRow); + } + } + + mockHBaseClient.batchMutate(mutations); + } + + private byte[] getRowKeyForStateKey(byte[] columnKey) { + byte[] rowKey = new byte[keyNamespace.length + columnKey.length]; + System.arraycopy(keyNamespace, 0, rowKey, 0, keyNamespace.length); + System.arraycopy(columnKey, 0, rowKey, keyNamespace.length, columnKey.length); + return rowKey; + } + + private ColumnList buildColumnList(byte[] columnFamily, Map map) { + ColumnList columnList = new ColumnList(); + for (Map.Entry entry : map.entrySet()) { + columnList.addColumn(columnFamily, entry.getKey(), entry.getValue()); + } + return columnList; + } + + private List prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map map) { + return prepareMutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT); + } + + private List prepareMutateRow(byte[] rowKey, byte[] columnFamily, Map map, + Durability durability) { + ColumnList columnList = buildColumnList(columnFamily, map); + return mockHBaseClient.constructMutationReq(rowKey, columnList, durability); + } + + private void mutateRow(byte[] rowKey, byte[] columnFamily, Map map) + throws Exception { + mutateRow(rowKey, columnFamily, map, Durability.USE_DEFAULT); + } + + private void mutateRow(byte[] rowKey, byte[] columnFamily, Map map, + Durability durability) throws Exception { + 
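+ // route the write through the mocked client so BatchMutateAnswer records the cells in its in-memory store, which scan() reads back later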
mockHBaseClient.batchMutate(prepareMutateRow(rowKey, columnFamily, map, durability)); + } +} \ No newline at end of file diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java new file mode 100644 index 00000000000..bd38e57a9f5 --- /dev/null +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateProviderTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.hbase.state; + +import org.apache.storm.Config; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Unit tests for {@link HBaseKeyValueStateProvider} + */ +public class HBaseKeyValueStateProviderTest { + + @Test + public void testConfigHBaseConfigKeyIsEmpty() throws Exception { + HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); + Map stormConf = new HashMap<>(); + stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + + " \"tableName\": \"table\", \"columnFamily\": \"cf\"}"); + + try { + HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); + fail("IllegalArgumentException is expected here."); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("hbaseConfigKey")); + } + } + + @Test + public void testConfigTableNameIsEmpty() throws Exception { + HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); + Map stormConf = new HashMap<>(); + stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + + " \"hbaseConfigKey\": \"hbaseConfKey\", \"columnFamily\": \"cf\"}"); + + try { + HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); + fail("IllegalArgumentException is expected here."); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("tableName")); + } + } + + @Test + public void testConfigColumnFamilyIsEmpty() throws Exception { + HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); + Map stormConf = new HashMap<>(); + stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + + " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"}"); + + try { + HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); + fail("IllegalArgumentException is expected here."); + } catch (IllegalArgumentException e) { + 
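+ // the provider is expected to name the missing mandatory field ("columnFamily") in the error message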
assertTrue(e.getMessage().contains("columnFamily")); + } + } + + @Test + public void testValidProviderConfig() throws Exception { + HBaseKeyValueStateProvider provider = new HBaseKeyValueStateProvider(); + Map stormConf = new HashMap<>(); + stormConf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG, "{\"keyClass\":\"String\", \"valueClass\":\"String\"," + + " \"hbaseConfigKey\": \"hbaseConfKey\", \"tableName\": \"table\"," + + " \"columnFamily\": \"columnFamily\"}"); + + HBaseKeyValueStateProvider.StateConfig config = provider.getStateConfig(stormConf); + assertEquals("String", config.keyClass); + assertEquals("String", config.valueClass); + assertEquals("hbaseConfKey", config.hbaseConfigKey); + assertEquals("table", config.tableName); + assertEquals("columnFamily", config.columnFamily); + } +} \ No newline at end of file diff --git a/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java new file mode 100644 index 00000000000..7827283951c --- /dev/null +++ b/external/storm-hbase/src/test/java/org/apache/storm/hbase/state/HBaseKeyValueStateTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.hbase.state; + +import com.google.common.primitives.UnsignedBytes; +import org.apache.storm.hbase.common.HBaseClient; +import org.apache.storm.state.DefaultStateSerializer; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link HBaseKeyValueState} + */ +public class HBaseKeyValueStateTest { + private static final String COLUMN_FAMILY = "cf"; + private static final String NAMESPACE = "namespace"; + + HBaseClient mockClient; + HBaseKeyValueState keyValueState; + ConcurrentNavigableMap>> mockMap; + + @Before + public void setUp() throws Exception { + mockMap = new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator()); + mockClient = HBaseClientTestUtil.mockedHBaseClient(mockMap); + keyValueState = new HBaseKeyValueState<>(mockClient, COLUMN_FAMILY, NAMESPACE, + new DefaultStateSerializer(), new DefaultStateSerializer()); + } + + @Test + public void testPutAndGet() throws Exception { + keyValueState.put("a", "1"); + keyValueState.put("b", "2"); + assertEquals("1", keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + } + + @Test + public void testPutAndDelete() throws Exception { + keyValueState.put("a", "1"); + keyValueState.put("b", "2"); + assertEquals("1", keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + assertEquals("1", keyValueState.delete("a")); + assertEquals(null, keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + } + + @Test + public void testPrepareCommitRollback() throws Exception { + keyValueState.put("a", "1"); + keyValueState.put("b", "2"); + keyValueState.prepareCommit(1); + keyValueState.put("c", "3"); + assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + keyValueState.rollback(); + assertArrayEquals(new String[]{null, null, null}, getValues()); + keyValueState.put("a", "1"); + keyValueState.put("b", "2"); + keyValueState.prepareCommit(1); + keyValueState.commit(1); + keyValueState.put("c", "3"); + assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + keyValueState.rollback(); + assertArrayEquals(new String[]{"1", "2", null}, getValues()); + keyValueState.put("c", "3"); + assertEquals("2", keyValueState.delete("b")); + assertEquals("3", keyValueState.delete("c")); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.prepareCommit(2); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.commit(2); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.put("b", "2"); + keyValueState.prepareCommit(3); + keyValueState.put("c", "3"); + assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + keyValueState.rollback(); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + } + + private String[] getValues() { + return new String[]{ + keyValueState.get("a"), + keyValueState.get("b"), + keyValueState.get("c") + }; + } +} \ No newline at end of file diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java 
b/storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java similarity index 98% rename from external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java rename to storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java index 734698974e2..9462634f4fa 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/DefaultStateSerializerTest.java +++ b/storm-core/test/jvm/org/apache/storm/state/DefaultStateSerializerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.redis.state; +package org.apache.storm.state; import org.apache.storm.spout.CheckPointState; import org.apache.storm.state.DefaultStateSerializer; From 774d6bb687282e00ac040a8591515fb2d0336885 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 7 Jul 2017 14:15:18 +0900 Subject: [PATCH 11/12] STORM-2383: CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57b2d183d50..d5d63cc4834 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## 1.2.0 + * STORM-2383: [storm-hbase] Support HBase as state backend * STORM-2506: Print mapping between Task ID and Kafka Partitions * STORM-2601: add the timeout parameter to the method of getting the nimbus client * STORM-2369: [storm-redis] Use binary type for State management From fe4fce3bd6107f517103581a8964e4474d6b552b Mon Sep 17 00:00:00 2001 From: Kevin Conaway Date: Thu, 29 Jun 2017 16:53:46 -0400 Subject: [PATCH 12/12] STORM-2608 Remove any pending offsets that are no longer valid STORM-2608 Add test case STORM-2608 Add missing license header STORM-2608 Ensure that the topic is created and available in ZK before returning from #createTopic STORM-2608 Update to latest from 1.x branch STORM-2608 Fix IT pom version STORM-2608 Ensure that partitions have leaders before returning from createTopic STORM-2608 Remove any pending offsets that are no longer valid --- .../apache/storm/kafka/PartitionManager.java | 4 +- .../apache/storm/kafka/KafkaTestBroker.java | 101 +++++++- .../storm/kafka/PartitionManagerTest.java | 243 ++++++++++++++++++ integration-test/pom.xml | 2 +- 4 files changed, 340 insertions(+), 10 deletions(-) create mode 100644 external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 928a5630e1d..5420887fd27 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -253,6 +253,8 @@ private void fill() { _lostMessageCount.incrBy(omitted.size()); } + _pending.headMap(offset).clear(); + LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted); } @@ -356,7 +358,7 @@ public void commit() { } } - private String committedPath() { + protected String committedPath() { return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId(); } diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java 
b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java index fed615554fa..0952764206b 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaTestBroker.java @@ -17,6 +17,14 @@ */ package org.apache.storm.kafka; +import kafka.admin.AdminUtils; +import kafka.api.PartitionMetadata; +import kafka.api.TopicMetadata; +import kafka.common.ErrorMapping; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; +import kafka.utils.ZKStringSerializer$; +import org.I0Itec.zkclient.ZkClient; import org.apache.commons.io.FileUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -24,13 +32,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; +import scala.collection.JavaConversions; import java.io.File; import java.io.IOException; import java.util.Properties; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; +import java.util.concurrent.TimeUnit; /** * Date: 11/01/2014 @@ -38,6 +45,9 @@ */ public class KafkaTestBroker { + // Bind services to the loopback address for environments where _localhost_ may resolve to an unreachable host + private static final String LOCALHOST = "127.0.0.1"; + private int port; private KafkaServerStartable kafka; private TestingServer server; @@ -45,15 +55,31 @@ public class KafkaTestBroker { private File logDir; public KafkaTestBroker() { + this(new Properties()); + } + + public KafkaTestBroker(Properties brokerProps) { try { - server = new TestingServer(); + InstanceSpec spec = new InstanceSpec( + null, + -1, + -1, + -1, + true, + -1, + -1, + -1, + null, + LOCALHOST + ); + server = new TestingServer(spec, true); String zookeeperConnectionString = server.getConnectString(); ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); zookeeper = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); zookeeper.start(); port = InstanceSpec.getRandomPort(); logDir = new File(System.getProperty("java.io.tmpdir"), "kafka/logs/kafka-test-" + port); - KafkaConfig config = buildKafkaConfig(zookeeperConnectionString); + KafkaConfig config = buildKafkaConfig(brokerProps, zookeeperConnectionString); kafka = new KafkaServerStartable(config); kafka.startup(); } catch (Exception ex) { @@ -61,17 +87,75 @@ public KafkaTestBroker() { } } - private kafka.server.KafkaConfig buildKafkaConfig(String zookeeperConnectionString) { - Properties p = new Properties(); + public void createTopic(String topicName, int numPartitions, Properties properties) { + ZkClient zkClient = new ZkClient(getZookeeperConnectionString()); + zkClient.setZkSerializer(ZKStringSerializer$.MODULE$); + + try { + AdminUtils.createTopic(zkClient, topicName, numPartitions, 1, properties); + + ensureTopicCreated(zkClient, topicName); + } finally { + zkClient.close(); + } + } + + + /** + * Wait for up to 30 seconds for the topic to be created and leader assignments for all partitions + */ + private void ensureTopicCreated(ZkClient zkClient, String topicName) { + long maxWaitTime = TimeUnit.SECONDS.toNanos(30); + long waitTime = 0; + boolean partitionsHaveLeaders = false; + + while (!partitionsHaveLeaders && waitTime < maxWaitTime) { + partitionsHaveLeaders = true; + TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient); + for 
(PartitionMetadata partitionMetadata : JavaConversions.seqAsJavaList(topicMetadata.partitionsMetadata())) { + if (partitionMetadata.leader().isEmpty() || partitionMetadata.errorCode() != ErrorMapping.NoError()) { + partitionsHaveLeaders = false; + } + } + + if (!partitionsHaveLeaders) { + long start = System.nanoTime(); + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for topic to be available"); + } + + waitTime += (System.nanoTime() - start); + } + } + + if (!partitionsHaveLeaders) { + throw new RuntimeException("Could not create topic: " + topicName); + } + } + + private kafka.server.KafkaConfig buildKafkaConfig(Properties brokerProps, String zookeeperConnectionString) { + Properties p = new Properties(brokerProps); p.setProperty("zookeeper.connect", zookeeperConnectionString); p.setProperty("broker.id", "0"); p.setProperty("port", "" + port); + p.setProperty("host.name", LOCALHOST); p.setProperty("log.dirs", logDir.getAbsolutePath()); return new KafkaConfig(p); } public String getBrokerConnectionString() { - return "localhost:" + port; + return LOCALHOST + ":" + port; + } + + public String getZookeeperConnectionString() { + return server.getConnectString(); + } + + public int getZookeeperPort() { + return server.getPort(); } public int getPort() { @@ -95,3 +179,4 @@ public void shutdown() { FileUtils.deleteQuietly(logDir); } } + diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java new file mode 100644 index 00000000000..888ecde4a9f --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/PartitionManagerTest.java @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.storm.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Config;
+import org.apache.storm.kafka.KafkaSpout.EmitState;
+import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
+import org.apache.storm.kafka.trident.ZkBrokerReader;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class PartitionManagerTest {
+
+    private static final String TOPIC_NAME = "testTopic";
+
+    private KafkaTestBroker broker;
+    private TestingSpoutOutputCollector outputCollector;
+    private ZkState zkState;
+    private ZkCoordinator coordinator;
+    private KafkaProducer<String, String> producer;
+
+    @Before
+    public void setup() {
+        outputCollector = new TestingSpoutOutputCollector();
+
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("log.retention.check.interval.ms", "1000");
+
+        broker = new KafkaTestBroker(brokerProps);
+
+        // Configure Kafka to remove messages after 2 seconds
+        Properties topicProperties = new Properties();
+        topicProperties.put("delete.retention.ms", "2000");
+        topicProperties.put("retention.ms", "2000");
+
+        broker.createTopic(TOPIC_NAME, 1, topicProperties);
+
+        Map conf = new HashMap<>();
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, broker.getZookeeperPort());
+        conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Collections.singletonList("127.0.0.1"));
+        conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
+        conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);
+        conf.put(Config.TOPOLOGY_NAME, "test");
+
+        zkState = new ZkState(conf);
+
+        ZkHosts zkHosts = new ZkHosts(broker.getZookeeperConnectionString());
+
+        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC_NAME, "/test", "id");
+
+        coordinator = new ZkCoordinator(
+                new DynamicPartitionConnections(spoutConfig, new ZkBrokerReader(conf, TOPIC_NAME, zkHosts)),
+                conf,
+                spoutConfig,
+                zkState,
+                0,
+                1,
+                1,
+                "topo"
+        );
+
+        Properties producerProps = new Properties();
+        producerProps.put("acks", "1");
+        producerProps.put("bootstrap.servers", broker.getBrokerConnectionString());
+        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put("metadata.fetch.timeout.ms", 1000);
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    @After
+    public void shutdown() {
+        producer.close();
+        broker.shutdown();
+    }
+
+    /**
+     * Test for STORM-2608
+     *
+     * - Send a few messages to topic
+     * - Emit those messages from the partition manager
+     * - Fail those tuples so that they are added to the failedMsgRetryManager
+     * - Commit partition info to Zookeeper
+     * - Wait for kafka to roll logs and remove those messages
+     * - Send a new message to the topic
+     * - On the next fetch request, a TopicOffsetOutOfRangeException is thrown and the new offset is after
+     *   the offset that is currently sitting in both the pending tree and the failedMsgRetryManager
+     * - Ack latest message to partition manager
+     * - Commit partition info to zookeeper
+     * - The committed offset should be the next offset _after_ the last one that was committed
+     *
+     */
+    @Test
+    public void test2608() throws Exception {
+        SpoutOutputCollector spoutOutputCollector = new SpoutOutputCollector(outputCollector);
+        List<PartitionManager> partitionManagers = coordinator.getMyManagedPartitions();
+        Assert.assertEquals(1, partitionManagers.size());
+
+        PartitionManager partitionManager = partitionManagers.get(0);
+
+        for (int i=0; i < 5; i++) {
+            sendMessage("message-" + i);
+        }
+
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_MORE_LEFT);
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_END);
+
+        partitionManager.commit();
+
+        Map<KafkaMessageId, List<Object>> emitted = outputCollector.getEmitted();
+
+        Assert.assertEquals(5, emitted.size());
+
+        for (KafkaMessageId messageId : emitted.keySet()) {
+            partitionManager.fail(messageId.offset);
+        }
+
+        // Kafka log roller task has an initial delay of 30 seconds so we need to wait for it
+        Thread.sleep(TimeUnit.SECONDS.toMillis(35));
+
+        outputCollector.clearEmittedMessages();
+
+        sendMessage("new message");
+
+        // First request will fail due to offset out of range
+        Assert.assertEquals(EmitState.NO_EMITTED, partitionManager.next(spoutOutputCollector));
+        waitForEmitState(partitionManager, spoutOutputCollector, EmitState.EMITTED_END);
+
+        emitted = outputCollector.getEmitted();
+
+        Assert.assertEquals(1, emitted.size());
+        KafkaMessageId messageId = emitted.keySet().iterator().next();
+
+        partitionManager.ack(messageId.offset);
+        partitionManager.commit();
+
+        Map<Object, Object> json = zkState.readJSON(partitionManager.committedPath());
+        Assert.assertNotNull(json);
+        long committedOffset = (long) json.get("offset");
+
+        Assert.assertEquals(messageId.offset + 1, committedOffset);
+    }
+
+    private void waitForEmitState(PartitionManager partitionManager, SpoutOutputCollector outputCollector, EmitState expectedState) {
+        int maxRetries = 5;
+        EmitState state = null;
+
+        for (int retryCount = 0; retryCount < maxRetries; retryCount++) {
+            state = partitionManager.next(outputCollector);
+
+            if (state == EmitState.NO_EMITTED) {
+                retryCount++;
+                try {
+                    Thread.sleep(TimeUnit.SECONDS.toMillis(1));
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted while waiting for message");
+                }
+            } else {
+                break;
+            }
+        }
+
+        Assert.assertEquals(expectedState, state);
+    }
+
+    private void sendMessage(String value) {
+        try {
+            producer.send(new ProducerRecord<>(TOPIC_NAME, (String) null, value)).get();
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    private static class TestingSpoutOutputCollector implements ISpoutOutputCollector {
+
+        private final Map<KafkaMessageId, List<Object>> emitted = new HashMap<>();
+
+        Map<KafkaMessageId, List<Object>> getEmitted() {
+            return emitted;
+        }
+
+        void clearEmittedMessages() {
+            emitted.clear();
+        }
+
+        @Override
+        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+            emitted.put((KafkaMessageId) messageId, tuple);
+            return Collections.emptyList();
+        }
+
+        @Override
+        public void reportError(Throwable error) {
+            throw new RuntimeException("Spout error", error);
+        }
+
+
+        @Override
+        public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getPendingCount() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/integration-test/pom.xml
index 50e9bb76d1d..03512353f5b 100755
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -22,7 +22,7 @@ storm org.apache.storm - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT ..
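
For reference: the one-line PartitionManager change in the last patch (`_pending.headMap(offset).clear()`) drops every tracked offset below the new fetch offset once Kafka reports the old position as out of range, since those messages can no longer be re-fetched. A standalone sketch of that pruning pattern on a plain sorted map, with made-up offsets (illustrative only, not code from the patch):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class PendingOffsetPruning {
        public static void main(String[] args) {
            // Offsets emitted but not yet acked, keyed by offset (stands in for PartitionManager's pending tree).
            SortedMap<Long, String> pending = new TreeMap<>();
            pending.put(5L, "message-5");
            pending.put(6L, "message-6");
            pending.put(42L, "message-42");

            // Kafka deleted the old log segments; the earliest offset that can still be fetched is now 40.
            long newValidOffset = 40L;

            // headMap(newValidOffset) is a live view of every entry strictly below 40;
            // clearing it removes those entries from the backing map, so they are never retried.
            pending.headMap(newValidOffset).clear();

            System.out.println(pending); // prints {42=message-42}
        }
    }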
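
For reference: the HBaseKeyValueStateProviderTest changes earlier in this series parse the provider settings out of a JSON string stored under Config.TOPOLOGY_STATE_PROVIDER_CONFIG. A minimal sketch of how a topology might select the HBase state backend, assuming the provider class from the storm-hbase module and placeholder table, column-family, and config-key values (hypothetical, not taken from the patches):

    import org.apache.storm.Config;

    public class HBaseStateConfigSketch {
        // Builds a topology Config that points stateful bolts at the HBase key-value state provider.
        public static Config hbaseStateConf() {
            Config conf = new Config();
            // Provider class added by STORM-2383 in the storm-hbase module.
            conf.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.hbase.state.HBaseKeyValueStateProvider");
            // Provider settings are a JSON string; the keys mirror those asserted in HBaseKeyValueStateProviderTest.
            // "hbaseConfKey", "state_table" and "cf" are placeholder values.
            conf.put(Config.TOPOLOGY_STATE_PROVIDER_CONFIG,
                    "{\"keyClass\":\"String\", \"valueClass\":\"String\","
                            + " \"hbaseConfigKey\": \"hbaseConfKey\","
                            + " \"tableName\": \"state_table\", \"columnFamily\": \"cf\"}");
            return conf;
        }
    }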