From 7fec9a1f4b9887897c9c7d0f3f80433fa34f18c0 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Sun, 5 Apr 2015 09:38:54 +0900 Subject: [PATCH] Improve RedisStateQuerier to convert List from Redis value * Reuse RedisStoreMapper / RedisLookupMapper ** RedisState*Querier / RedisState*Updater --- .../state/RedisClusterStateQuerier.java | 37 ++++++-------- .../state/RedisClusterStateUpdater.java | 33 ++++++------ .../trident/state/RedisStateQuerier.java | 50 +++++++++++-------- .../trident/state/RedisStateUpdater.java | 32 ++++++------ .../redis/trident/WordCountLookupMapper.java | 40 +++++++++++++++ .../redis/trident/WordCountStoreMapper.java | 22 ++++++++ .../redis/trident/WordCountTridentRedis.java | 12 +++-- .../trident/WordCountTridentRedisCluster.java | 11 ++-- .../WordCountTridentRedisClusterMap.java | 1 - .../trident/WordCountTridentRedisMap.java | 1 - .../redis/trident/WordCountTupleMapper.java | 16 ------ 11 files changed, 158 insertions(+), 97 deletions(-) create mode 100644 external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java create mode 100644 external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java delete mode 100644 external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java index 17614a15415..4382fe3c663 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateQuerier.java @@ -19,6 +19,7 @@ import backtype.storm.tuple.Values; import com.google.common.collect.Lists; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,37 +30,30 @@ import java.util.List; -public class RedisClusterStateQuerier extends BaseQueryFunction { +public class RedisClusterStateQuerier extends BaseQueryFunction> { private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class); - private final String redisKeyPrefix; - private final TupleMapper tupleMapper; + private final RedisLookupMapper lookupMapper; - public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) { - this.redisKeyPrefix = redisKeyPrefix; - this.tupleMapper = tupleMapper; + public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) { + this.lookupMapper = lookupMapper; } @Override - public List batchRetrieve(RedisClusterState redisClusterState, List inputs) { - List ret = Lists.newArrayList(); - - List keys = Lists.newArrayList(); + public List> batchRetrieve(RedisClusterState redisClusterState, List inputs) { + List> ret = Lists.newArrayList(); JedisCluster jedisCluster = null; try { jedisCluster = redisClusterState.getJedisCluster(); + for (int i = 0 ; i < inputs.size() ; i++) { + TridentTuple input = inputs.get(i); - for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTuple(input); - if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { - key = redisKeyPrefix + key; - } + String key = lookupMapper.getKeyFromTuple(input); String value = jedisCluster.get(key); - ret.add(value); - - logger.debug("redis get key[" + key + "] count[" + value + "]"); + ret.add(lookupMapper.toTuple(input, value)); + logger.debug("redis get key[" + key + "] value [" + value + "]"); } } finally { if (jedisCluster != null) { @@ -71,8 +65,9 @@ public List batchRetrieve(RedisClusterState redisClusterState, List values, TridentCollector collector) { + for (Values value : values) { + collector.emit(value); + } } } \ No newline at end of file diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java index 023b527ed0e..35fb48e5eaa 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisClusterStateUpdater.java @@ -17,6 +17,8 @@ */ package org.apache.storm.redis.trident.state; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,13 +32,13 @@ public class RedisClusterStateUpdater extends BaseStateUpdater { private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class); - private final String redisKeyPrefix; - private final TupleMapper tupleMapper; + private final RedisStoreMapper storeMapper; private final int expireIntervalSec; - public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) { - this.redisKeyPrefix = redisKeyPrefix; - this.tupleMapper = tupleMapper; + public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) { + this.storeMapper = storeMapper; + assertDataType(storeMapper.getDataTypeDescription()); + if (expireIntervalSec > 0) { this.expireIntervalSec = expireIntervalSec; } else { @@ -52,19 +54,15 @@ public void updateState(RedisClusterState redisClusterState, List try { jedisCluster = redisClusterState.getJedisCluster(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTuple(input); - String redisKey = key; - if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { - redisKey = redisKeyPrefix + redisKey; - } - String value = this.tupleMapper.getValueFromTuple(input); + String key = storeMapper.getKeyFromTuple(input); + String value = storeMapper.getValueFromTuple(input); - logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]"); + logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]"); if (this.expireIntervalSec > 0) { - jedisCluster.setex(redisKey, expireIntervalSec, value); + jedisCluster.setex(key, expireIntervalSec, value); } else { - jedisCluster.set(redisKey, value); + jedisCluster.set(key, value); } } } finally { @@ -73,4 +71,11 @@ public void updateState(RedisClusterState redisClusterState, List } } } + + private void assertDataType(RedisDataTypeDescription storeMapper) { + if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) { + throw new IllegalArgumentException("State should be STRING type"); + } + } + } diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java index 294e83b9d91..a2157414696 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateQuerier.java @@ -19,42 +19,43 @@ import backtype.storm.tuple.Values; import com.google.common.collect.Lists; -import org.apache.storm.redis.common.mapper.TupleMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; import redis.clients.jedis.Jedis; import storm.trident.operation.TridentCollector; import storm.trident.state.BaseQueryFunction; import storm.trident.tuple.TridentTuple; +import java.util.ArrayList; import java.util.List; -public class RedisStateQuerier extends BaseQueryFunction { - private static final Logger logger = LoggerFactory.getLogger(RedisState.class); +public class RedisStateQuerier extends BaseQueryFunction> { + private final RedisLookupMapper lookupMapper; - private final String redisKeyPrefix; - private final TupleMapper tupleMapper; - - public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) { - this.redisKeyPrefix = redisKeyPrefix; - this.tupleMapper = tupleMapper; + public RedisStateQuerier(RedisLookupMapper lookupMapper) { + this.lookupMapper = lookupMapper; + assertDataType(lookupMapper.getDataTypeDescription()); } @Override - public List batchRetrieve(RedisState redisState, List inputs) { + public List> batchRetrieve(RedisState redisState, List inputs) { + List> values = new ArrayList>(); + List keys = Lists.newArrayList(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTuple(input); - if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { - key = redisKeyPrefix + key; - } - keys.add(key); + keys.add(lookupMapper.getKeyFromTuple(input)); } Jedis jedis = null; try { jedis = redisState.getJedis(); - return jedis.mget(keys.toArray(new String[keys.size()])); + List redisVals = jedis.mget(keys.toArray(new String[keys.size()])); + + for (int i = 0 ; i < redisVals.size() ; i++) { + values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i))); + } + + return values; } finally { if (jedis != null) { redisState.returnJedis(jedis); @@ -63,8 +64,15 @@ public List batchRetrieve(RedisState redisState, List inpu } @Override - public void execute(TridentTuple tuple, String s, TridentCollector collector) { - String key = this.tupleMapper.getKeyFromTuple(tuple); - collector.emit(new Values(key, s)); + public void execute(TridentTuple tuple, List values, TridentCollector collector) { + for (Values value : values) { + collector.emit(value); + } + } + + private void assertDataType(RedisDataTypeDescription lookupMapper) { + if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) { + throw new IllegalArgumentException("State should be STRING type"); + } } } \ No newline at end of file diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java index 664a2220f15..384a1201ff9 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisStateUpdater.java @@ -17,6 +17,8 @@ */ package org.apache.storm.redis.trident.state; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.redis.common.mapper.TupleMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,13 +32,13 @@ public class RedisStateUpdater extends BaseStateUpdater { private static final Logger logger = LoggerFactory.getLogger(RedisState.class); - private final String redisKeyPrefix; - private final TupleMapper tupleMapper; + private final RedisStoreMapper storeMapper; private final int expireIntervalSec; - public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) { - this.redisKeyPrefix = redisKeyPrefix; - this.tupleMapper = tupleMapper; + public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) { + this.storeMapper = storeMapper; + assertDataType(storeMapper.getDataTypeDescription()); + if (expireIntervalSec > 0) { this.expireIntervalSec = expireIntervalSec; } else { @@ -51,19 +53,15 @@ public void updateState(RedisState redisState, List inputs, try { jedis = redisState.getJedis(); for (TridentTuple input : inputs) { - String key = this.tupleMapper.getKeyFromTuple(input); - String redisKey = key; - if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) { - redisKey = redisKeyPrefix + redisKey; - } - String value = this.tupleMapper.getValueFromTuple(input); + String key = storeMapper.getKeyFromTuple(input); + String value = storeMapper.getValueFromTuple(input); - logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]"); + logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]"); if (this.expireIntervalSec > 0) { - jedis.setex(redisKey, expireIntervalSec, value); + jedis.setex(key, expireIntervalSec, value); } else { - jedis.set(redisKey, value); + jedis.set(key, value); } } } finally { @@ -72,4 +70,10 @@ public void updateState(RedisState redisState, List inputs, } } } + + private void assertDataType(RedisDataTypeDescription storeMapper) { + if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) { + throw new IllegalArgumentException("State should be STRING type"); + } + } } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java new file mode 100644 index 00000000000..891a1af7877 --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountLookupMapper.java @@ -0,0 +1,40 @@ +package org.apache.storm.redis.trident; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.ITuple; +import backtype.storm.tuple.Values; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; + +import java.util.ArrayList; +import java.util.List; + +public class WordCountLookupMapper implements RedisLookupMapper { + @Override + public List toTuple(ITuple input, Object value) { + List values = new ArrayList(); + values.add(new Values(getKeyFromTuple(input), value)); + return values; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "value")); + } + + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return "test_" + tuple.getString(0); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return tuple.getInteger(1).toString(); + } +} \ No newline at end of file diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java new file mode 100644 index 00000000000..aa03ead00fc --- /dev/null +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountStoreMapper.java @@ -0,0 +1,22 @@ +package org.apache.storm.redis.trident; + +import backtype.storm.tuple.ITuple; +import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; + +public class WordCountStoreMapper implements RedisStoreMapper { + @Override + public RedisDataTypeDescription getDataTypeDescription() { + return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING); + } + + @Override + public String getKeyFromTuple(ITuple tuple) { + return "test_" + tuple.getString(0); + } + + @Override + public String getValueFromTuple(ITuple tuple) { + return tuple.getInteger(1).toString(); + } +} \ No newline at end of file diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java index 8b6ebc5156d..79bab5a21be 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedis.java @@ -23,7 +23,8 @@ import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.common.mapper.TupleMapper; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.redis.trident.state.RedisState; import org.apache.storm.redis.trident.state.RedisStateQuerier; import org.apache.storm.redis.trident.state.RedisStateUpdater; @@ -47,7 +48,9 @@ public static StormTopology buildTopology(String redisHost, Integer redisPort){ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); - TupleMapper tupleMapper = new WordCountTupleMapper(); + + RedisStoreMapper storeMapper = new WordCountStoreMapper(); + RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisState.Factory factory = new RedisState.Factory(poolConfig); TridentTopology topology = new TridentTopology(); @@ -55,12 +58,12 @@ public static StormTopology buildTopology(String redisHost, Integer redisPort){ stream.partitionPersist(factory, fields, - new RedisStateUpdater("test_", tupleMapper, 86400000), + new RedisStateUpdater(storeMapper, 86400000), new Fields()); TridentState state = topology.newStaticState(factory); stream = stream.stateQuery(state, new Fields("word"), - new RedisStateQuerier("test_", tupleMapper), + new RedisStateQuerier(lookupMapper), new Fields("columnName","columnValue")); stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields()); return topology.build(); @@ -92,5 +95,4 @@ public static void main(String[] args) throws Exception { System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port"); } } - } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java index ddb6939f039..280b2737ece 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisCluster.java @@ -23,7 +23,8 @@ import backtype.storm.generated.StormTopology; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; -import org.apache.storm.redis.common.mapper.TupleMapper; +import org.apache.storm.redis.common.mapper.RedisLookupMapper; +import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.redis.trident.state.RedisClusterState; import org.apache.storm.redis.trident.state.RedisClusterStateQuerier; import org.apache.storm.redis.trident.state.RedisClusterStateUpdater; @@ -55,7 +56,9 @@ public static StormTopology buildTopology(String redisHostPort){ } JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes) .build(); - TupleMapper tupleMapper = new WordCountTupleMapper(); + + RedisStoreMapper storeMapper = new WordCountStoreMapper(); + RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig); TridentTopology topology = new TridentTopology(); @@ -63,12 +66,12 @@ public static StormTopology buildTopology(String redisHostPort){ stream.partitionPersist(factory, fields, - new RedisClusterStateUpdater("test_", tupleMapper, 86400000), + new RedisClusterStateUpdater(storeMapper, 86400000), new Fields()); TridentState state = topology.newStaticState(factory); stream = stream.stateQuery(state, new Fields("word"), - new RedisClusterStateQuerier("test_", tupleMapper), + new RedisClusterStateQuerier(lookupMapper), new Fields("columnName","columnValue")); stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields()); return topology.build(); diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java index de1f252169e..027c785f8da 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisClusterMap.java @@ -56,7 +56,6 @@ public static StormTopology buildTopology(String redisHostPort){ } JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes) .build(); - TupleMapper tupleMapper = new WordCountTupleMapper(); StateFactory factory = RedisClusterMapState.transactional(clusterConfig); TridentTopology topology = new TridentTopology(); diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java index 4d4afe8a244..8ecf9ad044d 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTridentRedisMap.java @@ -48,7 +48,6 @@ public static StormTopology buildTopology(String redisHost, Integer redisPort){ JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); - TupleMapper tupleMapper = new WordCountTupleMapper(); StateFactory factory = RedisMapState.transactional(poolConfig); TridentTopology topology = new TridentTopology(); diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java b/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java deleted file mode 100644 index 1e601c926b0..00000000000 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/trident/WordCountTupleMapper.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.apache.storm.redis.trident; - -import backtype.storm.tuple.ITuple; -import org.apache.storm.redis.common.mapper.TupleMapper; - -public class WordCountTupleMapper implements TupleMapper { - @Override - public String getKeyFromTuple(ITuple tuple) { - return tuple.getString(0); - } - - @Override - public String getValueFromTuple(ITuple tuple) { - return tuple.getInteger(1).toString(); - } -}