Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,37 +30,30 @@

import java.util.List;

public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, String> {
public class RedisClusterStateQuerier extends BaseQueryFunction<RedisClusterState, List<Values>> {
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);

private final String redisKeyPrefix;
private final TupleMapper tupleMapper;
private final RedisLookupMapper lookupMapper;

public RedisClusterStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
public RedisClusterStateQuerier(RedisLookupMapper lookupMapper) {
this.lookupMapper = lookupMapper;
}

@Override
public List<String> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
List<String> ret = Lists.newArrayList();

List<String> keys = Lists.newArrayList();
public List<List<Values>> batchRetrieve(RedisClusterState redisClusterState, List<TridentTuple> inputs) {
List<List<Values>> ret = Lists.newArrayList();

JedisCluster jedisCluster = null;
try {
jedisCluster = redisClusterState.getJedisCluster();

for (int i = 0 ; i < inputs.size() ; i++) {
TridentTuple input = inputs.get(i);

for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
String key = lookupMapper.getKeyFromTuple(input);
String value = jedisCluster.get(key);
ret.add(value);

logger.debug("redis get key[" + key + "] count[" + value + "]");
ret.add(lookupMapper.toTuple(input, value));
logger.debug("redis get key[" + key + "] value [" + value + "]");
}
} finally {
if (jedisCluster != null) {
Expand All @@ -71,8 +65,9 @@ public List<String> batchRetrieve(RedisClusterState redisClusterState, List<Trid
}

@Override
public void execute(TridentTuple tuple, String s, TridentCollector collector) {
String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
for (Values value : values) {
collector.emit(value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.storm.redis.trident.state;

import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,13 +32,13 @@
public class RedisClusterStateUpdater extends BaseStateUpdater<RedisClusterState> {
private static final Logger logger = LoggerFactory.getLogger(RedisClusterState.class);

private final String redisKeyPrefix;
private final TupleMapper tupleMapper;
private final RedisStoreMapper storeMapper;
private final int expireIntervalSec;

public RedisClusterStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
public RedisClusterStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
this.storeMapper = storeMapper;
assertDataType(storeMapper.getDataTypeDescription());

if (expireIntervalSec > 0) {
this.expireIntervalSec = expireIntervalSec;
} else {
Expand All @@ -52,19 +54,15 @@ public void updateState(RedisClusterState redisClusterState, List<TridentTuple>
try {
jedisCluster = redisClusterState.getJedisCluster();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
String value = this.tupleMapper.getValueFromTuple(input);
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);

logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
logger.debug("update key[" + key + "] redisKey[" + key + "] value[" + value + "]");

if (this.expireIntervalSec > 0) {
jedisCluster.setex(redisKey, expireIntervalSec, value);
jedisCluster.setex(key, expireIntervalSec, value);
} else {
jedisCluster.set(redisKey, value);
jedisCluster.set(key, value);
}
}
} finally {
Expand All @@ -73,4 +71,11 @@ public void updateState(RedisClusterState redisClusterState, List<TridentTuple>
}
}
}

private void assertDataType(RedisDataTypeDescription storeMapper) {
if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
throw new IllegalArgumentException("State should be STRING type");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,43 @@

import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import redis.clients.jedis.Jedis;
import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseQueryFunction;
import storm.trident.tuple.TridentTuple;

import java.util.ArrayList;
import java.util.List;

public class RedisStateQuerier extends BaseQueryFunction<RedisState, String> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);
public class RedisStateQuerier extends BaseQueryFunction<RedisState, List<Values>> {
private final RedisLookupMapper lookupMapper;

private final String redisKeyPrefix;
private final TupleMapper tupleMapper;

public RedisStateQuerier(String redisKeyPrefix, TupleMapper tupleMapper) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
public RedisStateQuerier(RedisLookupMapper lookupMapper) {
this.lookupMapper = lookupMapper;
assertDataType(lookupMapper.getDataTypeDescription());
}

@Override
public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
public List<List<Values>> batchRetrieve(RedisState redisState, List<TridentTuple> inputs) {
List<List<Values>> values = new ArrayList<List<Values>>();

List<String> keys = Lists.newArrayList();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTuple(input);
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
key = redisKeyPrefix + key;
}
keys.add(key);
keys.add(lookupMapper.getKeyFromTuple(input));
}

Jedis jedis = null;
try {
jedis = redisState.getJedis();
return jedis.mget(keys.toArray(new String[keys.size()]));
List<String> redisVals = jedis.mget(keys.toArray(new String[keys.size()]));

for (int i = 0 ; i < redisVals.size() ; i++) {
values.add(lookupMapper.toTuple(inputs.get(i), redisVals.get(i)));
}

return values;
} finally {
if (jedis != null) {
redisState.returnJedis(jedis);
Expand All @@ -63,8 +64,15 @@ public List<String> batchRetrieve(RedisState redisState, List<TridentTuple> inpu
}

@Override
public void execute(TridentTuple tuple, String s, TridentCollector collector) {
String key = this.tupleMapper.getKeyFromTuple(tuple);
collector.emit(new Values(key, s));
public void execute(TridentTuple tuple, List<Values> values, TridentCollector collector) {
for (Values value : values) {
collector.emit(value);
}
}

private void assertDataType(RedisDataTypeDescription lookupMapper) {
if (lookupMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
throw new IllegalArgumentException("State should be STRING type");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.storm.redis.trident.state;

import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,13 +32,13 @@
public class RedisStateUpdater extends BaseStateUpdater<RedisState> {
private static final Logger logger = LoggerFactory.getLogger(RedisState.class);

private final String redisKeyPrefix;
private final TupleMapper tupleMapper;
private final RedisStoreMapper storeMapper;
private final int expireIntervalSec;

public RedisStateUpdater(String redisKeyPrefix, TupleMapper tupleMapper, int expireIntervalSec) {
this.redisKeyPrefix = redisKeyPrefix;
this.tupleMapper = tupleMapper;
public RedisStateUpdater(RedisStoreMapper storeMapper, int expireIntervalSec) {
this.storeMapper = storeMapper;
assertDataType(storeMapper.getDataTypeDescription());

if (expireIntervalSec > 0) {
this.expireIntervalSec = expireIntervalSec;
} else {
Expand All @@ -51,19 +53,15 @@ public void updateState(RedisState redisState, List<TridentTuple> inputs,
try {
jedis = redisState.getJedis();
for (TridentTuple input : inputs) {
String key = this.tupleMapper.getKeyFromTuple(input);
String redisKey = key;
if (redisKeyPrefix != null && redisKeyPrefix.length() > 0) {
redisKey = redisKeyPrefix + redisKey;
}
String value = this.tupleMapper.getValueFromTuple(input);
String key = storeMapper.getKeyFromTuple(input);
String value = storeMapper.getValueFromTuple(input);

logger.debug("update key[" + key + "] redisKey[" + redisKey + "] value[" + value + "]");
logger.debug("update key[" + key + "] redisKey[" + key+ "] value[" + value + "]");

if (this.expireIntervalSec > 0) {
jedis.setex(redisKey, expireIntervalSec, value);
jedis.setex(key, expireIntervalSec, value);
} else {
jedis.set(redisKey, value);
jedis.set(key, value);
}
}
} finally {
Expand All @@ -72,4 +70,10 @@ public void updateState(RedisState redisState, List<TridentTuple> inputs,
}
}
}

private void assertDataType(RedisDataTypeDescription storeMapper) {
if (storeMapper.getDataType() != RedisDataTypeDescription.RedisDataType.STRING) {
throw new IllegalArgumentException("State should be STRING type");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.apache.storm.redis.trident;

import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;

import java.util.ArrayList;
import java.util.List;

public class WordCountLookupMapper implements RedisLookupMapper {
@Override
public List<Values> toTuple(ITuple input, Object value) {
List<Values> values = new ArrayList<Values>();
values.add(new Values(getKeyFromTuple(input), value));
return values;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "value"));
}

@Override
public RedisDataTypeDescription getDataTypeDescription() {
return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
}

@Override
public String getKeyFromTuple(ITuple tuple) {
return "test_" + tuple.getString(0);
}

@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getInteger(1).toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.storm.redis.trident;

import backtype.storm.tuple.ITuple;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;

public class WordCountStoreMapper implements RedisStoreMapper {
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.STRING);
}

@Override
public String getKeyFromTuple(ITuple tuple) {
return "test_" + tuple.getString(0);
}

@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getInteger(1).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.apache.storm.redis.common.mapper.TupleMapper;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
Expand All @@ -47,20 +48,22 @@ public static StormTopology buildTopology(String redisHost, Integer redisPort){
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(redisHost).setPort(redisPort)
.build();
TupleMapper tupleMapper = new WordCountTupleMapper();

RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory,
fields,
new RedisStateUpdater("test_", tupleMapper, 86400000),
new RedisStateUpdater(storeMapper, 86400000),
new Fields());

TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
new RedisStateQuerier("test_", tupleMapper),
new RedisStateQuerier(lookupMapper),
new Fields("columnName","columnValue"));
stream.each(new Fields("word","columnValue"), new PrintFunction(), new Fields());
return topology.build();
Expand Down Expand Up @@ -92,5 +95,4 @@ public static void main(String[] args) throws Exception {
System.out.println("Usage: WordCountTrident 0(storm-local)|1(storm-cluster) redis-host redis-port");
}
}

}
Loading