Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/storm-redis-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>54</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@

import redis.clients.util.SafeEncoder;


public class Base64ToBinaryStateMigrationUtil {
private static final Logger LOG = LoggerFactory.getLogger(Base64ToBinaryStateMigrationUtil.class);
private static final String OPTION_REDIS_HOST_SHORT = "h";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.topology;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -36,11 +39,10 @@
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class LookupWordCount {
private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
Expand All @@ -65,7 +67,7 @@ public void execute(Tuple input) {
String countStr = input.getStringByField("count");

// print lookup result with low probability
if(RANDOM.nextInt(1000) > 995) {
if (RANDOM.nextInt(1000) > 995) {
int count = 0;
if (countStr != null) {
count = Integer.parseInt(countStr);
Expand All @@ -82,8 +84,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

public static void main(String[] args) throws Exception {
Config config = new Config();

String host = TEST_REDIS_HOST;
int port = TEST_REDIS_PORT;

Expand Down Expand Up @@ -114,6 +114,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Usage: LookupWordCount <redis host> <redis port> (topology name)");
return;
}
Config config = new Config();
StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.topology;

import org.apache.storm.Config;
Expand All @@ -36,8 +37,6 @@ public class PersistentWordCount {
private static final int TEST_REDIS_PORT = 6379;

public static void main(String[] args) throws Exception {
Config config = new Config();

String host = TEST_REDIS_HOST;
int port = TEST_REDIS_PORT;

Expand Down Expand Up @@ -68,6 +67,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Usage: PersistentWordCount <redis host> <redis port> (topology name)");
return;
}
Config config = new Config();
StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.topology;

import java.util.Map;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void execute(Tuple input) {
String countStr = input.getStringByField("count");

// print lookup result with low probability
if(RANDOM.nextInt(1000) > 995) {
if (RANDOM.nextInt(1000) > 995) {
int count = 0;
if (countStr != null) {
count = Integer.parseInt(countStr);
Expand All @@ -79,8 +80,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

public static void main(String[] args) throws Exception {
Config config = new Config();

String host = TEST_REDIS_HOST;
int port = TEST_REDIS_PORT;

Expand All @@ -96,12 +95,12 @@ public static void main(String[] args) throws Exception {
RedisFilterMapper filterMapper = setupWhitelistMapper();
RedisFilterBolt whitelistBolt = new RedisFilterBolt(poolConfig, filterMapper);
WordCounter wordCounterBolt = new WordCounter();
PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD_SPOUT, spout, 1);
builder.setBolt(WHITELIST_BOLT, whitelistBolt, 1).shuffleGrouping(WORD_SPOUT);
builder.setBolt(COUNT_BOLT, wordCounterBolt, 1).fieldsGrouping(WHITELIST_BOLT, new Fields("word"));
PrintWordTotalCountBolt printBolt = new PrintWordTotalCountBolt();
builder.setBolt(PRINT_BOLT, printBolt, 1).shuffleGrouping(COUNT_BOLT);

String topoName = "test";
Expand All @@ -111,6 +110,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Usage: WhitelistWordCount <redis host> <redis port> [topology name]");
return;
}
Config config = new Config();
StormSubmitter.submitTopology(topoName, config, builder.createTopology());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.topology;

import com.google.common.collect.Maps;

import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import com.google.common.collect.Maps;

import java.util.Map;

public class WordCounter implements IBasicBolt {
private Map<String, Integer> wordCounter = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.topology;

import java.util.Map;
import java.util.Random;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;
import java.util.UUID;

public class WordSpout implements IRichSpout {
boolean isDistributed;
SpoutOutputCollector collector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintFunction extends BaseFunction {

Expand All @@ -33,7 +35,7 @@ public class PrintFunction extends BaseFunction {

@Override
public void execute(TridentTuple tuple, TridentCollector tridentCollector) {
if(RANDOM.nextInt(1000) > 995) {
if (RANDOM.nextInt(1000) > 995) {
LOG.info(tuple.toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import java.util.ArrayList;
import java.util.List;

import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;

import java.util.ArrayList;
import java.util.List;

public class WordCountLookupMapper implements RedisLookupMapper {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import org.apache.storm.tuple.ITuple;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.tuple.ITuple;

public class WordCountStoreMapper implements RedisStoreMapper {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import org.apache.storm.Config;
Expand All @@ -34,7 +35,8 @@
import org.apache.storm.tuple.Values;

public class WordCountTridentRedis {
public static StormTopology buildTopology(String redisHost, Integer redisPort){

public static StormTopology buildTopology(String redisHost, Integer redisPort) {
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import java.net.InetSocketAddress;
Expand All @@ -38,7 +39,8 @@
import org.apache.storm.tuple.Values;

public class WordCountTridentRedisCluster {
public static StormTopology buildTopology(String redisHostPort){

public static StormTopology buildTopology(String redisHostPort) {
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", 1),
Expand All @@ -50,8 +52,8 @@ public static StormTopology buildTopology(String redisHostPort){

Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
for (String hostPort : redisHostPort.split(",")) {
String[] host_port = hostPort.split(":");
nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
String[] hostPortSplit = hostPort.split(":");
nodes.add(new InetSocketAddress(hostPortSplit[0], Integer.valueOf(hostPortSplit[1])));
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import java.net.InetSocketAddress;
Expand All @@ -38,7 +39,8 @@
import org.apache.storm.tuple.Values;

public class WordCountTridentRedisClusterMap {
public static StormTopology buildTopology(String redisHostPort){

public static StormTopology buildTopology(String redisHostPort) {
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", 1),
Expand All @@ -50,8 +52,8 @@ public static StormTopology buildTopology(String redisHostPort){

Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
for (String hostPort : redisHostPort.split(",")) {
String[] host_port = hostPort.split(":");
nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
String[] hostPortSplit = hostPort.split(":");
nodes.add(new InetSocketAddress(hostPortSplit[0], Integer.valueOf(hostPortSplit[1])));
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.storm.redis.trident;

import org.apache.storm.Config;
Expand All @@ -34,7 +35,8 @@
import org.apache.storm.tuple.Values;

public class WordCountTridentRedisMap {
public static StormTopology buildTopology(String redisHost, Integer redisPort){

public static StormTopology buildTopology(String redisHost, Integer redisPort) {
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", 1),
Expand Down