From 80ab3625e9802fbd8bff16c706fc788cf7aed17c Mon Sep 17 00:00:00 2001 From: "P. Taylor Goetz" Date: Wed, 26 Feb 2014 11:30:15 -0500 Subject: [PATCH] STORM-245: implement Stream.localOrShuffle() for trident --- storm-core/src/jvm/storm/trident/Stream.java | 5 ++++- storm-core/test/clj/storm/trident/integration_test.clj | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/storm/trident/Stream.java b/storm-core/src/jvm/storm/trident/Stream.java index e847eee8735..c3087450d1d 100644 --- a/storm-core/src/jvm/storm/trident/Stream.java +++ b/storm-core/src/jvm/storm/trident/Stream.java @@ -100,7 +100,10 @@ public Stream partition(CustomStreamGrouping partitioner) { public Stream shuffle() { return partition(Grouping.shuffle(new NullStruct())); } - + + public Stream localOrShuffle() { + return partition(Grouping.local_or_shuffle(new NullStruct())); + } public Stream global() { // use this instead of storm's built in one so that we can specify a singleemitbatchtopartition // without knowledge of storm's internals diff --git a/storm-core/test/clj/storm/trident/integration_test.clj b/storm-core/test/clj/storm/trident/integration_test.clj index ec8d49f45a9..ac3bbea1746 100644 --- a/storm-core/test/clj/storm/trident/integration_test.clj +++ b/storm-core/test/clj/storm/trident/integration_test.clj @@ -192,7 +192,7 @@ (bind topo (TridentTopology.)) (bind drpc-stream (-> topo (.newDRPCStream "tester" drpc) (.each (fields "args") (Split.) (fields "word")) - (.shuffle) + (.localOrShuffle) (.shuffle) (.aggregate (CountAsAggregator.) (fields "count")) ))