From 4e2df4fa929ef5491acef7577c9603cbf41a3afb Mon Sep 17 00:00:00 2001 From: Balazs Kossovics Date: Thu, 2 Jun 2016 14:32:06 +0200 Subject: [PATCH 1/3] STORM-1886 Extend KeyValueState iface with delete The patch also provides implementation for delete in RedisKeyValueState and InMemoryKeyValueState. --- .../storm/redis/state/RedisKeyValueState.java | 66 +++++++++++++++---- .../redis/state/RedisKeyValueStateTest.java | 62 +++++++++++++++-- .../storm/state/InMemoryKeyValueState.java | 5 ++ .../org/apache/storm/state/KeyValueState.java | 7 ++ 4 files changed, 121 insertions(+), 19 deletions(-) diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java index 8769cb02434..c686941069d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java @@ -31,6 +31,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; /** @@ -48,7 +50,9 @@ public class RedisKeyValueState implements KeyValueState { private final Serializer valueSerializer; private final JedisCommandsInstanceContainer jedisContainer; private Map pendingPrepare; + private Set pendingDeletePrepare; private Map pendingCommit; + private Set pendingDeleteCommit; private Map txIds; public RedisKeyValueState(String namespace) { @@ -71,7 +75,8 @@ public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedis this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; this.jedisContainer = jedisContainer; - this.pendingPrepare = new ConcurrentHashMap<>(); + this.pendingPrepare = new HashMap<>(); + this.pendingDeletePrepare = new HashSet<>(); initTxids(); initPendingCommit(); } @@ -98,9 +103,11 @@ private void initPendingCommit() { if (commands.exists(prepareNamespace)) { LOG.debug("Loading previously prepared commit from {}", prepareNamespace); pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace)); + pendingDeleteCommit = Collections.emptySet(); } else { LOG.debug("No previously prepared commits."); pendingCommit = Collections.emptyMap(); + pendingDeleteCommit = Collections.emptySet(); } } finally { jedisContainer.returnInstance(commands); @@ -110,8 +117,12 @@ private void initPendingCommit() { @Override public void put(K key, V value) { LOG.debug("put key '{}', value '{}'", key, value); - pendingPrepare.put(encode(keySerializer.serialize(key)), - encode(valueSerializer.serialize(value))); + String redisKey = encode(keySerializer.serialize(key)); + synchronized (this) { + pendingPrepare.put(redisKey, + encode(valueSerializer.serialize(value))); + pendingDeletePrepare.remove(redisKey); + } } @Override @@ -119,11 +130,17 @@ public V get(K key) { LOG.debug("get key '{}'", key); String redisKey = encode(keySerializer.serialize(key)); String redisValue = null; - if (pendingPrepare.containsKey(redisKey)) { - redisValue = pendingPrepare.get(redisKey); - } else if (pendingCommit.containsKey(redisKey)) { - redisValue = pendingCommit.get(redisKey); - } else { + boolean found = false; + synchronized (this) { + if (pendingPrepare.containsKey(redisKey) || pendingDeletePrepare.contains(redisKey)) { + redisValue = pendingPrepare.get(redisKey); + found = true; + } else if (pendingCommit.containsKey(redisKey) || pendingDeleteCommit.contains(redisKey)) { + redisValue = pendingCommit.get(redisKey); + found = true; + } + } + if (!found) { JedisCommands commands = null; try { commands = jedisContainer.getInstance(); @@ -147,7 +164,17 @@ public V get(K key, V defaultValue) { } @Override - public void prepareCommit(long txid) { + public void delete(K key) { + LOG.debug("delete key '{}'", key); + String redisKey = encode(keySerializer.serialize(key)); + synchronized (this) { + pendingDeletePrepare.add(redisKey); + pendingPrepare.remove(redisKey); + } + } + + @Override + public void prepareCommit(long txid){ LOG.debug("prepareCommit txid {}", txid); validatePrepareTxid(txid); JedisCommands commands = null; @@ -161,15 +188,20 @@ public void prepareCommit(long txid) { } } } - if (!pendingPrepare.isEmpty()) { + if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) { commands.hmset(prepareNamespace, pendingPrepare); + commands.hdel(prepareNamespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()])); } else { LOG.debug("Nothing to save for prepareCommit, txid {}.", txid); } txIds.put(PREPARE_TXID_KEY, String.valueOf(txid)); commands.hmset(txidNamespace, txIds); - pendingCommit = Collections.unmodifiableMap(pendingPrepare); + synchronized (this) { + pendingCommit = Collections.unmodifiableMap(pendingPrepare); + pendingDeleteCommit = Collections.unmodifiableSet(pendingDeletePrepare); + } pendingPrepare = new ConcurrentHashMap<>(); + pendingDeletePrepare = Collections.synchronizedSet(new HashSet()); } finally { jedisContainer.returnInstance(commands); } @@ -182,8 +214,9 @@ public void commit(long txid) { JedisCommands commands = null; try { commands = jedisContainer.getInstance(); - if (!pendingCommit.isEmpty()) { + if (!pendingCommit.isEmpty() || !pendingDeleteCommit.isEmpty()) { commands.hmset(namespace, pendingCommit); + commands.hdel(namespace, pendingDeleteCommit.toArray(new String[pendingDeleteCommit.size()])); } else { LOG.debug("Nothing to save for commit, txid {}.", txid); } @@ -191,6 +224,7 @@ public void commit(long txid) { commands.hmset(txidNamespace, txIds); commands.del(prepareNamespace); pendingCommit = Collections.emptyMap(); + pendingDeleteCommit = Collections.emptySet(); } finally { jedisContainer.returnInstance(commands); } @@ -201,12 +235,14 @@ public void commit() { JedisCommands commands = null; try { commands = jedisContainer.getInstance(); - if (!pendingPrepare.isEmpty()) { + if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) { commands.hmset(namespace, pendingPrepare); + commands.hdel(namespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()])); } else { LOG.debug("Nothing to save for commit"); } - pendingPrepare = new ConcurrentHashMap<>(); + pendingPrepare = new HashMap<>(); + pendingDeletePrepare = new HashSet<>(); } finally { jedisContainer.returnInstance(commands); } @@ -234,7 +270,9 @@ public void rollback() { commands.hmset(txidNamespace, txIds); } pendingCommit = Collections.emptyMap(); + pendingDeleteCommit = Collections.emptySet(); pendingPrepare = new ConcurrentHashMap<>(); + pendingDeletePrepare = Collections.synchronizedSet(new HashSet()); } finally { jedisContainer.returnInstance(commands); } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java index ea8cc157bd3..f5525d0f74f 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java @@ -32,10 +32,9 @@ import redis.clients.jedis.Tuple; import java.util.HashMap; -import java.util.List; +import java.util.Arrays; import java.util.Map; import java.util.Set; - import static org.junit.Assert.*; /** @@ -93,6 +92,17 @@ public String answer(InvocationOnMock invocation) throws Throwable { } }); + Mockito.when(mockCommands.hdel(Mockito.anyString(), Mockito.anyVararg())) + .thenAnswer(new Answer() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + int argsSize = args.length; + String[] fields = Arrays.asList(args).subList(1, argsSize).toArray(new String[argsSize - 1]); + return hdel(mockMap, (String) args[0], fields); + } + }); + keyValueState = new RedisKeyValueState("test", mockContainer, new DefaultStateSerializer(), new DefaultStateSerializer()); } @@ -107,6 +117,19 @@ public void testPutAndGet() throws Exception { assertEquals(null, keyValueState.get("c")); } + @Test + public void testPutAndDelete() throws Exception { + keyValueState.put("a", "1"); + keyValueState.put("b", "2"); + assertEquals("1", keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + keyValueState.delete("a"); + assertEquals(null, keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + } + @Test public void testPrepareCommitRollback() throws Exception { keyValueState.put("a", "1"); @@ -124,6 +147,20 @@ public void testPrepareCommitRollback() throws Exception { assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); keyValueState.rollback(); assertArrayEquals(new String[]{"1", "2", null}, getValues()); + keyValueState.put("c", "3"); + keyValueState.delete("b"); + keyValueState.delete("c"); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.prepareCommit(2); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.commit(2); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.put("b", "2"); + keyValueState.prepareCommit(3); + keyValueState.put("c", "3"); + assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + keyValueState.rollback(); + assertArrayEquals(new String[]{"1", null, null}, getValues()); } private String[] getValues() { @@ -135,13 +172,20 @@ private String[] getValues() { } private String hmset(Map> mockMap, String key, Map value) { - mockMap.put(key, value); + Map currentValue = mockMap.get(key); + if (currentValue == null) { + currentValue = new HashMap<>(); + } + currentValue.putAll(value); + mockMap.put(key, currentValue); return ""; } private Long del(Map> mockMap, String key) { - mockMap.remove(key); - return 0L; + if (mockMap.remove(key) == null) + return 0L; + else + return 1L; } private String hget(Map> mockMap, String namespace, String key) { @@ -151,4 +195,12 @@ private String hget(Map> mockMap, String namespace, return null; } + private Long hdel(Map> mockMap, String namespace, String ... keys) { + Long count = 0L; + for (String key: keys) { + if (mockMap.get(namespace).remove(key) != null) count++; + } + return count; + } + } \ No newline at end of file diff --git a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java index 4b116ba67d0..4774d721a4e 100644 --- a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java +++ b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java @@ -67,6 +67,11 @@ public V get(K key, V defaultValue) { return val != null ? val : defaultValue; } + @Override + public void delete(K key) { + state.remove(key); + } + @Override public void commit() { commitedState = new TxIdState<>(DEFAULT_TXID, new ConcurrentHashMap<>(state)); diff --git a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java index 3ab60f1b09b..0e1facbf98b 100644 --- a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java +++ b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java @@ -45,4 +45,11 @@ public interface KeyValueState extends State { * @return the value or defaultValue if no mapping is found */ V get(K key, V defaultValue); + + /** + * Deletes the value mapped to the key, if there is any + * + * @param key the key + */ + void delete(K key); } From 0089f3998a07f4980ac52a2db637d13e570eefa3 Mon Sep 17 00:00:00 2001 From: Balazs Kossovics Date: Fri, 24 Jun 2016 17:51:32 +0200 Subject: [PATCH 2/3] Use a Map with tombstones to represent deletions --- .../storm/redis/state/RedisKeyValueState.java | 95 +++++++++---------- .../redis/state/RedisKeyValueStateTest.java | 9 +- .../storm/state/DefaultStateSerializer.java | 2 + .../storm/state/InMemoryKeyValueState.java | 4 +- .../org/apache/storm/state/KeyValueState.java | 2 +- .../state/InMemoryKeyValueStateTest.java | 27 ++++++ 6 files changed, 80 insertions(+), 59 deletions(-) diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java index c686941069d..db1a0614543 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java @@ -17,6 +17,7 @@ */ package org.apache.storm.redis.state; +import com.google.common.base.Optional; import org.apache.storm.state.DefaultStateSerializer; import org.apache.storm.state.KeyValueState; import org.apache.storm.state.Serializer; @@ -31,8 +32,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; -import java.util.HashSet; +import java.util.List; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; /** @@ -46,13 +47,12 @@ public class RedisKeyValueState implements KeyValueState { private final String namespace; private final String prepareNamespace; private final String txidNamespace; + private final String tombstone; private final Serializer keySerializer; - private final Serializer valueSerializer; + private final Serializer> valueSerializer; private final JedisCommandsInstanceContainer jedisContainer; private Map pendingPrepare; - private Set pendingDeletePrepare; private Map pendingCommit; - private Set pendingDeleteCommit; private Map txIds; public RedisKeyValueState(String namespace) { @@ -60,23 +60,23 @@ public RedisKeyValueState(String namespace) { } public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) { - this(namespace, poolConfig, new DefaultStateSerializer(), new DefaultStateSerializer()); + this(namespace, poolConfig, new DefaultStateSerializer(), new DefaultStateSerializer>()); } - public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer keySerializer, Serializer valueSerializer) { + public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer keySerializer, Serializer> valueSerializer) { this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer); } public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer, - Serializer keySerializer, Serializer valueSerializer) { + Serializer keySerializer, Serializer> valueSerializer) { this.namespace = namespace; this.prepareNamespace = namespace + "$prepare"; this.txidNamespace = namespace + "$txid"; this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; + this.tombstone = encode(valueSerializer.serialize(Optional.absent())); this.jedisContainer = jedisContainer; - this.pendingPrepare = new HashMap<>(); - this.pendingDeletePrepare = new HashSet<>(); + this.pendingPrepare = new ConcurrentHashMap<>(); initTxids(); initPendingCommit(); } @@ -103,11 +103,9 @@ private void initPendingCommit() { if (commands.exists(prepareNamespace)) { LOG.debug("Loading previously prepared commit from {}", prepareNamespace); pendingCommit = Collections.unmodifiableMap(commands.hgetAll(prepareNamespace)); - pendingDeleteCommit = Collections.emptySet(); } else { LOG.debug("No previously prepared commits."); pendingCommit = Collections.emptyMap(); - pendingDeleteCommit = Collections.emptySet(); } } finally { jedisContainer.returnInstance(commands); @@ -118,11 +116,8 @@ private void initPendingCommit() { public void put(K key, V value) { LOG.debug("put key '{}', value '{}'", key, value); String redisKey = encode(keySerializer.serialize(key)); - synchronized (this) { - pendingPrepare.put(redisKey, - encode(valueSerializer.serialize(value))); - pendingDeletePrepare.remove(redisKey); - } + pendingPrepare.put(redisKey, + encode(valueSerializer.serialize(Optional.of(value)))); } @Override @@ -130,17 +125,11 @@ public V get(K key) { LOG.debug("get key '{}'", key); String redisKey = encode(keySerializer.serialize(key)); String redisValue = null; - boolean found = false; - synchronized (this) { - if (pendingPrepare.containsKey(redisKey) || pendingDeletePrepare.contains(redisKey)) { - redisValue = pendingPrepare.get(redisKey); - found = true; - } else if (pendingCommit.containsKey(redisKey) || pendingDeleteCommit.contains(redisKey)) { - redisValue = pendingCommit.get(redisKey); - found = true; - } - } - if (!found) { + if (pendingPrepare.containsKey(redisKey)) { + redisValue = pendingPrepare.get(redisKey); + } else if (pendingCommit.containsKey(redisKey)) { + redisValue = pendingCommit.get(redisKey); + } else { JedisCommands commands = null; try { commands = jedisContainer.getInstance(); @@ -151,7 +140,7 @@ public V get(K key) { } V value = null; if (redisValue != null) { - value = valueSerializer.deserialize(decode(redisValue)); + value = valueSerializer.deserialize(decode(redisValue)).orNull(); } LOG.debug("Value for key '{}' is '{}'", key, value); return value; @@ -164,17 +153,16 @@ public V get(K key, V defaultValue) { } @Override - public void delete(K key) { + public V delete(K key) { LOG.debug("delete key '{}'", key); String redisKey = encode(keySerializer.serialize(key)); - synchronized (this) { - pendingDeletePrepare.add(redisKey); - pendingPrepare.remove(redisKey); - } + V curr = get(key); + pendingPrepare.put(redisKey, tombstone); + return curr; } @Override - public void prepareCommit(long txid){ + public void prepareCommit(long txid) { LOG.debug("prepareCommit txid {}", txid); validatePrepareTxid(txid); JedisCommands commands = null; @@ -188,20 +176,15 @@ public void prepareCommit(long txid){ } } } - if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) { + if (!pendingPrepare.isEmpty()) { commands.hmset(prepareNamespace, pendingPrepare); - commands.hdel(prepareNamespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()])); } else { LOG.debug("Nothing to save for prepareCommit, txid {}.", txid); } txIds.put(PREPARE_TXID_KEY, String.valueOf(txid)); commands.hmset(txidNamespace, txIds); - synchronized (this) { - pendingCommit = Collections.unmodifiableMap(pendingPrepare); - pendingDeleteCommit = Collections.unmodifiableSet(pendingDeletePrepare); - } + pendingCommit = Collections.unmodifiableMap(pendingPrepare); pendingPrepare = new ConcurrentHashMap<>(); - pendingDeletePrepare = Collections.synchronizedSet(new HashSet()); } finally { jedisContainer.returnInstance(commands); } @@ -214,9 +197,22 @@ public void commit(long txid) { JedisCommands commands = null; try { commands = jedisContainer.getInstance(); - if (!pendingCommit.isEmpty() || !pendingDeleteCommit.isEmpty()) { - commands.hmset(namespace, pendingCommit); - commands.hdel(namespace, pendingDeleteCommit.toArray(new String[pendingDeleteCommit.size()])); + if (!pendingCommit.isEmpty()) { + List keysToDelete = new ArrayList<>(); + Map keysToAdd = new HashMap<>(); + for(Map.Entry entry: pendingCommit.entrySet()) { + if (tombstone.equals(entry.getValue())) { + keysToDelete.add(entry.getKey()); + } else { + keysToAdd.put(entry.getKey(), entry.getValue()); + } + } + if (!keysToAdd.isEmpty()) { + commands.hmset(namespace, keysToAdd); + } + if (!keysToDelete.isEmpty()) { + commands.hdel(namespace, keysToDelete.toArray(new String[0])); + } } else { LOG.debug("Nothing to save for commit, txid {}.", txid); } @@ -224,7 +220,6 @@ public void commit(long txid) { commands.hmset(txidNamespace, txIds); commands.del(prepareNamespace); pendingCommit = Collections.emptyMap(); - pendingDeleteCommit = Collections.emptySet(); } finally { jedisContainer.returnInstance(commands); } @@ -235,14 +230,12 @@ public void commit() { JedisCommands commands = null; try { commands = jedisContainer.getInstance(); - if (!pendingPrepare.isEmpty() || !pendingDeletePrepare.isEmpty()) { + if (!pendingPrepare.isEmpty()) { commands.hmset(namespace, pendingPrepare); - commands.hdel(namespace, pendingDeletePrepare.toArray(new String[pendingDeletePrepare.size()])); } else { LOG.debug("Nothing to save for commit"); } - pendingPrepare = new HashMap<>(); - pendingDeletePrepare = new HashSet<>(); + pendingPrepare = new ConcurrentHashMap<>(); } finally { jedisContainer.returnInstance(commands); } @@ -270,9 +263,7 @@ public void rollback() { commands.hmset(txidNamespace, txIds); } pendingCommit = Collections.emptyMap(); - pendingDeleteCommit = Collections.emptySet(); pendingPrepare = new ConcurrentHashMap<>(); - pendingDeletePrepare = Collections.synchronizedSet(new HashSet()); } finally { jedisContainer.returnInstance(commands); } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java index f5525d0f74f..1bb72a75e08 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java @@ -17,6 +17,7 @@ */ package org.apache.storm.redis.state; +import com.google.common.base.Optional; import org.apache.storm.state.DefaultStateSerializer; import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer; import org.junit.Before; @@ -104,7 +105,7 @@ public Long answer(InvocationOnMock invocation) throws Throwable { }); keyValueState = new RedisKeyValueState("test", mockContainer, new DefaultStateSerializer(), - new DefaultStateSerializer()); + new DefaultStateSerializer>()); } @@ -124,7 +125,7 @@ public void testPutAndDelete() throws Exception { assertEquals("1", keyValueState.get("a")); assertEquals("2", keyValueState.get("b")); assertEquals(null, keyValueState.get("c")); - keyValueState.delete("a"); + assertEquals("1", keyValueState.delete("a")); assertEquals(null, keyValueState.get("a")); assertEquals("2", keyValueState.get("b")); assertEquals(null, keyValueState.get("c")); @@ -148,8 +149,8 @@ public void testPrepareCommitRollback() throws Exception { keyValueState.rollback(); assertArrayEquals(new String[]{"1", "2", null}, getValues()); keyValueState.put("c", "3"); - keyValueState.delete("b"); - keyValueState.delete("c"); + assertEquals("2", keyValueState.delete("b")); + assertEquals("3", keyValueState.delete("c")); assertArrayEquals(new String[]{"1", null, null}, getValues()); keyValueState.prepareCommit(2); assertArrayEquals(new String[]{"1", null, null}, getValues()); diff --git a/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java b/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java index 55e934e7bb8..d37101d5d0d 100644 --- a/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java +++ b/storm-core/src/jvm/org/apache/storm/state/DefaultStateSerializer.java @@ -20,6 +20,7 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import org.objenesis.strategy.StdInstantiatorStrategy; import java.util.Collections; import java.util.List; @@ -40,6 +41,7 @@ public class DefaultStateSerializer implements Serializer { */ public DefaultStateSerializer(List> classesToRegister) { kryo = new Kryo(); + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); output = new Output(2000, 2000000000); for (Class klazz : classesToRegister) { kryo.register(klazz); diff --git a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java index 4774d721a4e..72c1523d6fe 100644 --- a/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java +++ b/storm-core/src/jvm/org/apache/storm/state/InMemoryKeyValueState.java @@ -68,8 +68,8 @@ public V get(K key, V defaultValue) { } @Override - public void delete(K key) { - state.remove(key); + public V delete(K key) { + return state.remove(key); } @Override diff --git a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java index 0e1facbf98b..70863313d8e 100644 --- a/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java +++ b/storm-core/src/jvm/org/apache/storm/state/KeyValueState.java @@ -51,5 +51,5 @@ public interface KeyValueState extends State { * * @param key the key */ - void delete(K key); + V delete(K key); } diff --git a/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java b/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java index 361865bb258..d215627aae8 100644 --- a/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java +++ b/storm-core/test/jvm/org/apache/storm/state/InMemoryKeyValueStateTest.java @@ -43,6 +43,19 @@ public void testPutAndGet() throws Exception { assertArrayEquals(new String[]{"1", "2", null}, getValues()); } + @Test + public void testPutAndDelete() throws Exception { + keyValueState.put("a", "1"); + keyValueState.put("b", "2"); + assertEquals("1", keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + assertEquals("1", keyValueState.delete("a")); + assertEquals(null, keyValueState.get("a")); + assertEquals("2", keyValueState.get("b")); + assertEquals(null, keyValueState.get("c")); + } + @Test public void testPrepareCommitRollback() throws Exception { keyValueState.put("a", "1"); @@ -60,6 +73,20 @@ public void testPrepareCommitRollback() throws Exception { assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); keyValueState.rollback(); assertArrayEquals(new String[]{"1", "2", null}, getValues()); + keyValueState.put("c", "3"); + assertEquals("2", keyValueState.delete("b")); + assertEquals("3", keyValueState.delete("c")); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.prepareCommit(2); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.commit(2); + assertArrayEquals(new String[]{"1", null, null}, getValues()); + keyValueState.put("b", "2"); + keyValueState.prepareCommit(3); + keyValueState.put("c", "3"); + assertArrayEquals(new String[]{"1", "2", "3"}, getValues()); + keyValueState.rollback(); + assertArrayEquals(new String[]{"1", null, null}, getValues()); } private String[] getValues() { From d34717c27d45f7bc8def82339a547e349c2c7975 Mon Sep 17 00:00:00 2001 From: Balazs Kossovics Date: Mon, 28 Nov 2016 17:24:16 +0100 Subject: [PATCH 3/3] STORM-1886 Address last comments * make tombstone static * hide the use of Optional from the users * fix race condition * make encode and decode methods static --- .../storm/redis/state/RedisKeyValueState.java | 43 +++++++++++-------- .../redis/state/RedisKeyValueStateTest.java | 2 +- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java index db1a0614543..c2ca4dfda7d 100644 --- a/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java +++ b/external/storm-redis/src/main/java/org/apache/storm/redis/state/RedisKeyValueState.java @@ -43,13 +43,15 @@ public class RedisKeyValueState implements KeyValueState { private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class); private static final String COMMIT_TXID_KEY = "commit"; private static final String PREPARE_TXID_KEY = "prepare"; + private static final Serializer> internalValueSerializer = new DefaultStateSerializer<>(); + private static final String TOMBSTONE = encode(internalValueSerializer.serialize(Optional.absent())); private final String namespace; private final String prepareNamespace; private final String txidNamespace; - private final String tombstone; private final Serializer keySerializer; - private final Serializer> valueSerializer; + private final Serializer valueSerializer; + private final JedisCommandsInstanceContainer jedisContainer; private Map pendingPrepare; private Map pendingCommit; @@ -60,21 +62,20 @@ public RedisKeyValueState(String namespace) { } public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) { - this(namespace, poolConfig, new DefaultStateSerializer(), new DefaultStateSerializer>()); + this(namespace, poolConfig, new DefaultStateSerializer(), new DefaultStateSerializer()); } - public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer keySerializer, Serializer> valueSerializer) { + public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer keySerializer, Serializer valueSerializer) { this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer); } public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer, - Serializer keySerializer, Serializer> valueSerializer) { + Serializer keySerializer, Serializer valueSerializer) { this.namespace = namespace; this.prepareNamespace = namespace + "$prepare"; this.txidNamespace = namespace + "$txid"; this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; - this.tombstone = encode(valueSerializer.serialize(Optional.absent())); this.jedisContainer = jedisContainer; this.pendingPrepare = new ConcurrentHashMap<>(); initTxids(); @@ -117,7 +118,7 @@ public void put(K key, V value) { LOG.debug("put key '{}', value '{}'", key, value); String redisKey = encode(keySerializer.serialize(key)); pendingPrepare.put(redisKey, - encode(valueSerializer.serialize(Optional.of(value)))); + encode(internalValueSerializer.serialize(Optional.of(valueSerializer.serialize(value))))); } @Override @@ -140,7 +141,12 @@ public V get(K key) { } V value = null; if (redisValue != null) { - value = valueSerializer.deserialize(decode(redisValue)).orNull(); + Optional internalValue = internalValueSerializer.deserialize(decode(redisValue)); + if (internalValue.isPresent()) { + value = valueSerializer.deserialize(internalValue.get()); + } else { + value = null; + } } LOG.debug("Value for key '{}' is '{}'", key, value); return value; @@ -157,7 +163,7 @@ public V delete(K key) { LOG.debug("delete key '{}'", key); String redisKey = encode(keySerializer.serialize(key)); V curr = get(key); - pendingPrepare.put(redisKey, tombstone); + pendingPrepare.put(redisKey, TOMBSTONE); return curr; } @@ -167,24 +173,25 @@ public void prepareCommit(long txid) { validatePrepareTxid(txid); JedisCommands commands = null; try { + Map currentPending = pendingPrepare; + pendingPrepare = new ConcurrentHashMap<>(); commands = jedisContainer.getInstance(); if (commands.exists(prepareNamespace)) { LOG.debug("Prepared txn already exists, will merge", txid); for (Map.Entry e: pendingCommit.entrySet()) { - if (!pendingPrepare.containsKey(e.getKey())) { - pendingPrepare.put(e.getKey(), e.getValue()); + if (!currentPending.containsKey(e.getKey())) { + currentPending.put(e.getKey(), e.getValue()); } } } - if (!pendingPrepare.isEmpty()) { - commands.hmset(prepareNamespace, pendingPrepare); + if (!currentPending.isEmpty()) { + commands.hmset(prepareNamespace, currentPending); } else { LOG.debug("Nothing to save for prepareCommit, txid {}.", txid); } txIds.put(PREPARE_TXID_KEY, String.valueOf(txid)); commands.hmset(txidNamespace, txIds); - pendingCommit = Collections.unmodifiableMap(pendingPrepare); - pendingPrepare = new ConcurrentHashMap<>(); + pendingCommit = Collections.unmodifiableMap(currentPending); } finally { jedisContainer.returnInstance(commands); } @@ -201,7 +208,7 @@ public void commit(long txid) { List keysToDelete = new ArrayList<>(); Map keysToAdd = new HashMap<>(); for(Map.Entry entry: pendingCommit.entrySet()) { - if (tombstone.equals(entry.getValue())) { + if (TOMBSTONE.equals(entry.getValue())) { keysToDelete.add(entry.getKey()); } else { keysToAdd.put(entry.getKey(), entry.getValue()); @@ -319,11 +326,11 @@ private Long lastId(String key) { return lastId; } - private String encode(byte[] bytes) { + private static String encode(byte[] bytes) { return Base64.encodeBase64String(bytes); } - private byte[] decode(String s) { + private static byte[] decode(String s) { return Base64.decodeBase64(s); } } diff --git a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java index 1bb72a75e08..e40885daec2 100644 --- a/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java +++ b/external/storm-redis/src/test/java/org/apache/storm/redis/state/RedisKeyValueStateTest.java @@ -105,7 +105,7 @@ public Long answer(InvocationOnMock invocation) throws Throwable { }); keyValueState = new RedisKeyValueState("test", mockContainer, new DefaultStateSerializer(), - new DefaultStateSerializer>()); + new DefaultStateSerializer()); }