From 52a7431ef35f54601c3ca80da3a3c5a787ec1f61 Mon Sep 17 00:00:00 2001 From: eric-mulvaney Date: Thu, 2 Apr 2015 16:18:41 -0400 Subject: [PATCH 1/2] Made the keyFactory option public. It can be set from the public API in other ways. Having it public means we can set it along with other values like hkey. --- .../org/apache/storm/redis/trident/state/RedisMapState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java index f934cea2a8b..b1d4547aeb8 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java @@ -72,7 +72,7 @@ public String build(List key) { public static class Options implements Serializable { public int localCacheSize = 1000; public String globalKey = "$REDIS-MAP-STATE-GLOBAL"; - KeyFactory keyFactory = null; + public KeyFactory keyFactory = null; public Serializer serializer = null; public String hkey = null; } From 4a2ba19bd3577e081f7d007d7e42350a8d7c30ff Mon Sep 17 00:00:00 2001 From: eric-mulvaney Date: Thu, 2 Apr 2015 16:20:28 -0400 Subject: [PATCH 2/2] Added expireIntervalSec to RedisMapState.Options This matches the option available to users of RedisStateUpdater. Bit heavy when setting top-level keys, since Redis doesn't have MSETEX to match its SETEX command where we could include an expiry timeout. Pipelined to mitigate the overhead of needing to issue a separate command per key. --- .../redis/trident/state/RedisMapState.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java index b1d4547aeb8..9376f5dda4d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/trident/state/RedisMapState.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +import redis.clients.jedis.Pipeline; import storm.trident.state.JSONNonTransactionalSerializer; import storm.trident.state.JSONOpaqueSerializer; import storm.trident.state.JSONTransactionalSerializer; @@ -75,6 +76,7 @@ public static class Options implements Serializable { public KeyFactory keyFactory = null; public Serializer serializer = null; public String hkey = null; + public int expireIntervalSec = 0; } public static interface KeyFactory extends Serializable { @@ -285,30 +287,31 @@ public void multiPut(List> keys, List vals) { return; } - if (Strings.isNullOrEmpty(this.options.hkey)) { - Jedis jedis = null; - try { - jedis = jedisPool.getResource(); + Jedis jedis = jedisPool.getResource(); + try { + if (Strings.isNullOrEmpty(this.options.hkey)) { String[] keyValue = buildKeyValuesList(keys, vals); jedis.mset(keyValue); - } finally { - if (jedis != null) { - jedisPool.returnResource(jedis); + if (this.options.expireIntervalSec > 0) { + Pipeline pipe = jedis.pipelined(); + for(int i = 0; i < keyValue.length; i += 2) { + pipe.expire(keyValue[i], this.options.expireIntervalSec); + } + pipe.sync(); } - } - } else { - Jedis jedis = jedisPool.getResource(); - try { + } else { Map keyValues = new HashMap(); for (int i = 0; i < keys.size(); i++) { String val = new String(serializer.serialize(vals.get(i))); keyValues.put(keyFactory.build(keys.get(i)), val); - } + } jedis.hmset(this.options.hkey, keyValues); - - } finally { - jedisPool.returnResource(jedis); + if (this.options.expireIntervalSec > 0) { + jedis.expire(this.options.hkey, this.options.expireIntervalSec); + } } + } finally { + jedisPool.returnResource(jedis); } }