From dbefc0da83cc7587766b39b215c7f2a21a5ef573 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 30 Jan 2015 15:40:42 -0600 Subject: [PATCH] STORM-637: Integrate PartialKeyGrouping into storm API --- docs/documentation/Common-patterns.md | 14 +- docs/documentation/Concepts.md | 13 +- .../storm/starter/SkewedRollingTopWords.java | 134 ++++++++++++++++++ .../starter/bolt/RollingCountAggBolt.java | 78 ++++++++++ .../coordination/BatchSubtopologyBuilder.java | 11 ++ .../storm/drpc/LinearDRPCInputDeclarer.java | 5 +- .../storm/drpc/LinearDRPCTopologyBuilder.java | 13 +- .../storm/grouping/PartialKeyGrouping.java | 31 +++- .../storm/topology/InputDeclarer.java | 3 + .../storm/topology/TopologyBuilder.java | 11 ++ .../TransactionalTopologyBuilder.java | 13 +- .../topology/TridentTopologyBuilder.java | 13 +- .../grouping/PartialKeyGroupingTest.java | 26 +++- 13 files changed, 348 insertions(+), 17 deletions(-) create mode 100644 examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java create mode 100644 examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java diff --git a/docs/documentation/Common-patterns.md b/docs/documentation/Common-patterns.md index 3d274f6838f..c11f5e94da6 100644 --- a/docs/documentation/Common-patterns.md +++ b/docs/documentation/Common-patterns.md @@ -64,7 +64,7 @@ A common continuous computation done on Storm is a "streaming top N" of some sor This approach obviously doesn't scale to large streams since the entire stream has to go through one task. A better way to do the computation is to do many top N's in parallel across partitions of the stream, and then merge those top N's together to get the global top N. 
The pattern looks like this: ```java -builder.setBolt("rank", new RankObjects(), parallellism) +builder.setBolt("rank", new RankObjects(), parallelism) .fieldsGrouping("objects", new Fields("value")); builder.setBolt("merge", new MergeObjects()) .globalGrouping("rank"); @@ -72,6 +72,18 @@ builder.setBolt("merge", new MergeObjects()) This pattern works because of the fields grouping done by the first bolt which gives the partitioning you need for this to be semantically correct. You can see an example of this pattern in storm-starter [here](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java). +If, however, you have a known skew in the data being processed, it can be advantageous to use partialKeyGrouping instead of fieldsGrouping. This will distribute the load for each key between two downstream bolts instead of a single one. + +```java +builder.setBolt("count", new CountObjects(), parallelism) + .partialKeyGrouping("objects", new Fields("value")); +builder.setBolt("rank", new AggregateCountsAndRank(), parallelism) + .fieldsGrouping("count", new Fields("key")); +builder.setBolt("merge", new MergeRanksObjects()) + .globalGrouping("rank"); +``` + +The topology needs an extra layer of processing to aggregate the partial counts from the upstream bolts, but this layer only processes aggregated values, so it is not subject to the load caused by the skewed data. You can see an example of this pattern in storm-starter [here](https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java). 
### TimeCacheMap for efficiently keeping a cache of things that have been recently updated diff --git a/docs/documentation/Concepts.md b/docs/documentation/Concepts.md index 7ce15dafe07..827bb3ac2a3 100644 --- a/docs/documentation/Concepts.md +++ b/docs/documentation/Concepts.md @@ -84,11 +84,12 @@ There are seven built-in stream groupings in Storm, and you can implement a cust 1. **Shuffle grouping**: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples. 2. **Fields grouping**: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks. -3. **All grouping**: The stream is replicated across all the bolt's tasks. Use this grouping with care. -4. **Global grouping**: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id. -5. **None grouping**: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible). -6. **Direct grouping**: This is a special kind of grouping. A stream grouped this way means that the __producer__ of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](/apidocs/backtype/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. 
A bolt can get the task ids of its consumers by either using the provided [TopologyContext](/apidocs/backtype/storm/task/TopologyContext.html) or by keeping track of the output of the `emit` method in [OutputCollector](/apidocs/backtype/storm/task/OutputCollector.html) (which returns the task ids that the tuple was sent to). -7. **Local or shuffle grouping**: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping. +3. **Partial Key grouping**: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but is load balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. [This paper](https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf) provides a good explanation of how it works and the advantages it provides. +4. **All grouping**: The stream is replicated across all the bolt's tasks. Use this grouping with care. +5. **Global grouping**: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id. +6. **None grouping**: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible). +7. **Direct grouping**: This is a special kind of grouping. A stream grouped this way means that the __producer__ of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. 
Tuples emitted to a direct stream must be emitted using one of the [emitDirect](/apidocs/backtype/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) methods. A bolt can get the task ids of its consumers by either using the provided [TopologyContext](/apidocs/backtype/storm/task/TopologyContext.html) or by keeping track of the output of the `emit` method in [OutputCollector](/apidocs/backtype/storm/task/OutputCollector.html) (which returns the task ids that the tuple was sent to). +8. **Local or shuffle grouping**: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping. **Resources:** @@ -114,4 +115,4 @@ Topologies execute across one or more worker processes. Each worker process is a **Resources:** -* [Config.TOPOLOGY_WORKERS](/apidocs/backtype/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology \ No newline at end of file +* [Config.TOPOLOGY_WORKERS](/apidocs/backtype/storm/Config.html#TOPOLOGY_WORKERS): this config sets the number of workers to allocate for executing the topology diff --git a/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java new file mode 100644 index 00000000000..0ad8d60947c --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/SkewedRollingTopWords.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.testing.TestWordSpout; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import org.apache.log4j.Logger; +import storm.starter.bolt.IntermediateRankingsBolt; +import storm.starter.bolt.RollingCountBolt; +import storm.starter.bolt.RollingCountAggBolt; +import storm.starter.bolt.TotalRankingsBolt; +import storm.starter.util.StormRunner; + +/** + * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality. + * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things + * like trending topics or trending images on Twitter. It takes an approach that assumes that some words will be much + * more common than other words, and uses partialKeyGrouping to better balance the skewed load. 
+ */ +public class SkewedRollingTopWords { + private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class); + private static final int DEFAULT_RUNTIME_IN_SECONDS = 60; + private static final int TOP_N = 5; + + private final TopologyBuilder builder; + private final String topologyName; + private final Config topologyConfig; + private final int runtimeInSeconds; + + public SkewedRollingTopWords(String topologyName) throws InterruptedException { + builder = new TopologyBuilder(); + this.topologyName = topologyName; + topologyConfig = createTopologyConfiguration(); + runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS; + + wireTopology(); + } + + private static Config createTopologyConfiguration() { + Config conf = new Config(); + conf.setDebug(true); + return conf; + } + + private void wireTopology() throws InterruptedException { + String spoutId = "wordGenerator"; + String counterId = "counter"; + String aggId = "aggregator"; + String intermediateRankerId = "intermediateRanker"; + String totalRankerId = "finalRanker"; + builder.setSpout(spoutId, new TestWordSpout(), 5); + builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId, new Fields("word")); + builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj")); + builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId, new Fields("obj")); + builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId); + } + + public void runLocally() throws InterruptedException { + StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds); + } + + public void runRemotely() throws Exception { + StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig); + } + + /** + * Submits (runs) the topology. 
+ * + * Usage: "SkewedRollingTopWords [topology-name] [local|remote]" + * + * By default, the topology is run locally under the name "slidingWindowCounts". + * + * Examples: + * + *
+   * {@code
+   *
+   * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords
+   *
+   * # Runs in local mode (LocalCluster), with topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords foobar
+   *
+   * # Runs in local mode (LocalCluster), with topology name "foobar"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords foobar local
+   *
+   * # Runs in remote/cluster mode, with topology name "production-topology"
+   * $ storm jar storm-starter-jar-with-dependencies.jar storm.starter.SkewedRollingTopWords production-topology remote
+   * }
+   * 
+ * + * @param args First positional argument (optional) is topology name, second positional argument (optional) defines + * whether to run the topology locally ("local") or remotely, i.e. on a real cluster ("remote"). + * @throws Exception + */ + public static void main(String[] args) throws Exception { + String topologyName = "slidingWindowCounts"; + if (args.length >= 1) { + topologyName = args[0]; + } + boolean runLocally = true; + if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) { + runLocally = false; + } + + LOG.info("Topology name: " + topologyName); + SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName); + if (runLocally) { + LOG.info("Running in local mode"); + rtw.runLocally(); + } + else { + LOG.info("Running in remote (cluster) mode"); + rtw.runRemotely(); + } + } +} diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java new file mode 100644 index 00000000000..e513b09a65f --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package storm.starter.bolt; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.base.BaseRichBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import org.apache.log4j.Logger; +import storm.starter.tools.NthLastModifiedTimeTracker; +import storm.starter.tools.SlidingWindowCounter; +import storm.starter.util.TupleHelpers; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * This bolt aggregates counts from multiple upstream bolts. + */ +public class RollingCountAggBolt extends BaseRichBolt { + private static final long serialVersionUID = 5537727428628598519L; + private static final Logger LOG = Logger.getLogger(RollingCountAggBolt.class); + //Mapping of key->upstreamBolt->count + private Map<Object, Map<Integer, Long>> counts = new HashMap<Object, Map<Integer, Long>>(); + private OutputCollector collector; + + + @SuppressWarnings("rawtypes") + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(Tuple tuple) { + Object obj = tuple.getValue(0); + long count = tuple.getLong(1); + int source = tuple.getSourceTask(); + Map<Integer, Long> subCounts = counts.get(obj); + if (subCounts == null) { + subCounts = new HashMap<Integer, Long>(); + counts.put(obj, subCounts); + } + //Update the current count for this object + subCounts.put(source, count); + //Output the sum of all the known counts so far for this key + long sum = 0; + for (Long val: subCounts.values()) { + sum += val; + } + collector.emit(new Values(obj, sum)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("obj", "count")); + } +} diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java 
b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java index 32258ed6606..f94c2845c7e 100644 --- a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java +++ b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java @@ -22,6 +22,7 @@ import backtype.storm.generated.GlobalStreamId; import backtype.storm.generated.Grouping; import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.grouping.PartialKeyGrouping; import backtype.storm.topology.BaseConfigurationDeclarer; import backtype.storm.topology.BasicBoltExecutor; import backtype.storm.topology.BoltDeclarer; @@ -374,6 +375,16 @@ public String getComponent() { }); return this; } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + return customGrouping(componentId, new PartialKeyGrouping(fields)); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); + } @Override public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java index eeafc997ff3..d03075e6bc7 100644 --- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java +++ b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCInputDeclarer.java @@ -42,7 +42,10 @@ public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer< public LinearDRPCInputDeclarer directGrouping(); public LinearDRPCInputDeclarer directGrouping(String streamId); - + + public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields); + public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields); + public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping); public 
LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping); diff --git a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java index 75d75f3136b..a171a2ce70b 100644 --- a/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java +++ b/storm-core/src/jvm/backtype/storm/drpc/LinearDRPCTopologyBuilder.java @@ -28,6 +28,7 @@ import backtype.storm.generated.StormTopology; import backtype.storm.generated.StreamInfo; import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.grouping.PartialKeyGrouping; import backtype.storm.topology.BaseConfigurationDeclarer; import backtype.storm.topology.BasicBoltExecutor; import backtype.storm.topology.BoltDeclarer; @@ -347,7 +348,17 @@ public void declare(String prevComponent, InputDeclarer declarer) { }); return this; } - + + @Override + public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields) { + return customGrouping(new PartialKeyGrouping(fields)); + } + + @Override + public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields) { + return customGrouping(streamId, new PartialKeyGrouping(fields)); + } + @Override public LinearDRPCInputDeclarer customGrouping(final CustomStreamGrouping grouping) { addDeclaration(new InputDeclaration() { diff --git a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java index f36f4c6da0c..d1f534b3fd5 100644 --- a/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java +++ b/storm-core/src/jvm/backtype/storm/grouping/PartialKeyGrouping.java @@ -18,12 +18,14 @@ package backtype.storm.grouping; import java.io.Serializable; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import backtype.storm.generated.GlobalStreamId; import backtype.storm.grouping.CustomStreamGrouping; import 
backtype.storm.task.WorkerTopologyContext; +import backtype.storm.tuple.Fields; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -34,20 +36,43 @@ public class PartialKeyGrouping implements CustomStreamGrouping, Serializable { private long[] targetTaskStats; private HashFunction h1 = Hashing.murmur3_128(13); private HashFunction h2 = Hashing.murmur3_128(17); + private Fields fields = null; + private Fields outFields = null; + + public PartialKeyGrouping() { + //Empty + } + + public PartialKeyGrouping(Fields fields) { + this.fields = fields; + } @Override public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) { this.targetTasks = targetTasks; targetTaskStats = new long[this.targetTasks.size()]; + if (this.fields != null) { + this.outFields = context.getComponentOutputFields(stream); + } } @Override public List chooseTasks(int taskId, List values) { List boltIds = new ArrayList(1); if (values.size() > 0) { - String str = values.get(0).toString(); // assume key is the first field - int firstChoice = (int) Math.abs(h1.hashBytes(str.getBytes()).asLong()) % this.targetTasks.size(); - int secondChoice = (int) Math.abs(h2.hashBytes(str.getBytes()).asLong()) % this.targetTasks.size(); + byte[] raw = null; + if (fields != null) { + List selectedFields = outFields.select(fields, values); + ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4); + for (Object o: selectedFields) { + out.putInt(o.hashCode()); + } + raw = out.array(); + } else { + raw = values.get(0).toString().getBytes(); // assume key is the first field + } + int firstChoice = (int) (Math.abs(h1.hashBytes(raw).asLong()) % this.targetTasks.size()); + int secondChoice = (int) (Math.abs(h2.hashBytes(raw).asLong()) % this.targetTasks.size()); int selected = targetTaskStats[firstChoice] > targetTaskStats[secondChoice] ? 
secondChoice : firstChoice; boltIds.add(targetTasks.get(selected)); targetTaskStats[selected]++; diff --git a/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java b/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java index 457fa35f103..ac0848f15f9 100644 --- a/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java +++ b/storm-core/src/jvm/backtype/storm/topology/InputDeclarer.java @@ -45,6 +45,9 @@ public interface InputDeclarer { public T directGrouping(String componentId); public T directGrouping(String componentId, String streamId); + public T partialKeyGrouping(String componentId, Fields fields); + public T partialKeyGrouping(String componentId, String streamId, Fields fields); + public T customGrouping(String componentId, CustomStreamGrouping grouping); public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping); diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java index aebf995f647..0a4762645e3 100644 --- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java +++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java @@ -28,6 +28,7 @@ import backtype.storm.generated.StateSpoutSpec; import backtype.storm.generated.StormTopology; import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.grouping.PartialKeyGrouping; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; import java.util.ArrayList; @@ -330,6 +331,16 @@ private BoltDeclarer grouping(String componentId, String streamId, Grouping grou return this; } + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + return customGrouping(componentId, new PartialKeyGrouping(fields)); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); + } 
+ @Override public BoltDeclarer customGrouping(String componentId, CustomStreamGrouping grouping) { return customGrouping(componentId, Utils.DEFAULT_STREAM_ID, grouping); diff --git a/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java b/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java index 570522d4411..6619e07b94a 100644 --- a/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java +++ b/storm-core/src/jvm/backtype/storm/transactional/TransactionalTopologyBuilder.java @@ -28,6 +28,7 @@ import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; import backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.grouping.PartialKeyGrouping; import backtype.storm.topology.BaseConfigurationDeclarer; import backtype.storm.topology.BasicBoltExecutor; import backtype.storm.topology.BoltDeclarer; @@ -448,7 +449,17 @@ public String getComponent() { }); return this; } - + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + return customGrouping(componentId, new PartialKeyGrouping(fields)); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); + } + @Override public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { addDeclaration(new InputDeclaration() { diff --git a/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java b/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java index 7b81ed9ea2f..498503bb375 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentTopologyBuilder.java @@ -21,6 +21,7 @@ import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; import 
backtype.storm.grouping.CustomStreamGrouping; +import backtype.storm.grouping.PartialKeyGrouping; import backtype.storm.topology.BaseConfigurationDeclarer; import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; @@ -645,7 +646,17 @@ public String getStream() { }); return this; } - + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + return customGrouping(componentId, new PartialKeyGrouping(fields)); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); + } + @Override public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { addDeclaration(new InputDeclaration() { diff --git a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java index ad43869a42b..4809b450edd 100644 --- a/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java +++ b/storm-core/test/jvm/backtype/storm/grouping/PartialKeyGroupingTest.java @@ -17,19 +17,23 @@ */ package backtype.storm.grouping; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.assertThat; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import java.util.List; import org.junit.Test; +import backtype.storm.generated.GlobalStreamId; +import backtype.storm.task.WorkerTopologyContext; +import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import com.google.common.collect.Lists; public class PartialKeyGroupingTest { - @Test public void testChooseTasks() { PartialKeyGrouping pkg = new PartialKeyGrouping(); @@ -43,4 +47,20 @@ public void testChooseTasks() { assertThat(choice3, is(not(choice2))); assertThat(choice3, is(choice1)); } + 
+ @Test + public void testChooseTasksFields() { + PartialKeyGrouping pkg = new PartialKeyGrouping(new Fields("test")); + WorkerTopologyContext context = mock(WorkerTopologyContext.class); + when(context.getComponentOutputFields(any(GlobalStreamId.class))).thenReturn(new Fields("test")); + pkg.prepare(context, null, Lists.newArrayList(0, 1, 2, 3, 4, 5)); + Values message = new Values("key1"); + List choice1 = pkg.chooseTasks(0, message); + assertThat(choice1.size(), is(1)); + List choice2 = pkg.chooseTasks(0, message); + assertThat(choice2, is(not(choice1))); + List choice3 = pkg.chooseTasks(0, message); + assertThat(choice3, is(not(choice2))); + assertThat(choice3, is(choice1)); + } }