Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storm-2203 Add a getAll method to KeyValueState interface #1798

Closed
wants to merge 10 commits into from
Expand Up @@ -17,20 +17,20 @@
*/
package org.apache.storm.redis.state;

import com.google.common.base.Optional;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.apache.storm.redis.utils.RedisEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import org.apache.commons.codec.binary.Base64;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
Expand All @@ -40,17 +40,16 @@
* A redis based implementation that persists the state in Redis.
*/
public class RedisKeyValueState<K, V> implements KeyValueState<K, V> {
public static final int ITERATOR_CHUNK_SIZE = 100;

private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
private static final String COMMIT_TXID_KEY = "commit";
private static final String PREPARE_TXID_KEY = "prepare";
private static final Serializer<Optional<byte[]>> internalValueSerializer = new DefaultStateSerializer<>();
private static final String TOMBSTONE = encode(internalValueSerializer.serialize(Optional.absent()));


private final String namespace;
private final String prepareNamespace;
private final String txidNamespace;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final RedisEncoder<K, V> encoder;

private final JedisCommandsInstanceContainer jedisContainer;
private Map<String, String> pendingPrepare;
Expand All @@ -74,8 +73,7 @@ public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedis
this.namespace = namespace;
this.prepareNamespace = namespace + "$prepare";
this.txidNamespace = namespace + "$txid";
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.encoder = new RedisEncoder<K, V>(keySerializer, valueSerializer);
this.jedisContainer = jedisContainer;
this.pendingPrepare = new ConcurrentHashMap<>();
initTxids();
Expand Down Expand Up @@ -116,15 +114,14 @@ private void initPendingCommit() {
@Override
public void put(K key, V value) {
LOG.debug("put key '{}', value '{}'", key, value);
String redisKey = encode(keySerializer.serialize(key));
pendingPrepare.put(redisKey,
encode(internalValueSerializer.serialize(Optional.of(valueSerializer.serialize(value)))));
String redisKey = encoder.encodeKey(key);
pendingPrepare.put(redisKey, encoder.encodeValue(value));
}

@Override
public V get(K key) {
LOG.debug("get key '{}'", key);
String redisKey = encode(keySerializer.serialize(key));
String redisKey = encoder.encodeKey(key);
String redisValue = null;
if (pendingPrepare.containsKey(redisKey)) {
redisValue = pendingPrepare.get(redisKey);
Expand All @@ -141,12 +138,7 @@ public V get(K key) {
}
V value = null;
if (redisValue != null) {
Optional<byte[]> internalValue = internalValueSerializer.deserialize(decode(redisValue));
if (internalValue.isPresent()) {
value = valueSerializer.deserialize(internalValue.get());
} else {
value = null;
}
value = encoder.decodeValue(redisValue);
}
LOG.debug("Value for key '{}' is '{}'", key, value);
return value;
Expand All @@ -161,12 +153,18 @@ public V get(K key, V defaultValue) {
@Override
public V delete(K key) {
LOG.debug("delete key '{}'", key);
String redisKey = encode(keySerializer.serialize(key));
String redisKey = encoder.encodeKey(key);
V curr = get(key);
pendingPrepare.put(redisKey, TOMBSTONE);
pendingPrepare.put(redisKey, RedisEncoder.TOMBSTONE);
return curr;
}

@Override
public Iterator<Map.Entry<K, V>> iterator() {
return new RedisKeyValueStateIterator<K, V>(namespace, jedisContainer, pendingPrepare.entrySet().iterator(), pendingCommit.entrySet().iterator(),
ITERATOR_CHUNK_SIZE, encoder.getKeySerializer(), encoder.getValueSerializer());
}

@Override
public void prepareCommit(long txid) {
LOG.debug("prepareCommit txid {}", txid);
Expand Down Expand Up @@ -208,7 +206,7 @@ public void commit(long txid) {
List<String> keysToDelete = new ArrayList<>();
Map<String, String> keysToAdd = new HashMap<>();
for(Map.Entry<String, String> entry: pendingCommit.entrySet()) {
if (TOMBSTONE.equals(entry.getValue())) {
if (RedisEncoder.TOMBSTONE.equals(entry.getValue())) {
keysToDelete.add(entry.getKey());
} else {
keysToAdd.put(entry.getKey(), entry.getValue());
Expand Down Expand Up @@ -325,12 +323,4 @@ private Long lastId(String key) {
}
return lastId;
}

private static String encode(byte[] bytes) {
return Base64.encodeBase64String(bytes);
}

private static byte[] decode(String s) {
return Base64.decodeBase64(s);
}
}
@@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.redis.state;

import com.google.common.base.Optional;

import java.util.AbstractMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.apache.storm.redis.utils.RedisEncoder;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.Serializer;

import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;

/**
* An iterator over {@link RedisKeyValueState}
*/
public class RedisKeyValueStateIterator<K, V> implements Iterator<Map.Entry<K, V>> {

private final String namespace;
private final Iterator<Map.Entry<String, String>> pendingPrepareIterator;
private final Iterator<Map.Entry<String, String>> pendingCommitIterator;
private final RedisEncoder<K, V> decoder;
private final JedisCommandsInstanceContainer jedisContainer;
private final ScanParams scanParams;
private Iterator<Map.Entry<String, String>> pendingIterator;
private String cursor;
private List<Map.Entry<String, String>> cachedResult;
private int readPosition;

public RedisKeyValueStateIterator(String namespace, JedisCommandsInstanceContainer jedisContainer, Iterator<Map.Entry<String, String>> pendingPrepareIterator, Iterator<Map.Entry<String, String>> pendingCommitIterator, int chunkSize, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this.namespace = namespace;
this.pendingPrepareIterator = pendingPrepareIterator;
this.pendingCommitIterator = pendingCommitIterator;
this.jedisContainer = jedisContainer;
this.decoder = new RedisEncoder<K, V>(keySerializer, valueSerializer);
this.scanParams = new ScanParams().count(chunkSize);
this.cursor = ScanParams.SCAN_POINTER_START;
}

@Override
public boolean hasNext() {
if (pendingPrepareIterator != null && pendingPrepareIterator.hasNext()) {
pendingIterator = pendingPrepareIterator;
return true;
} else if (pendingCommitIterator != null && pendingCommitIterator.hasNext()) {
pendingIterator = pendingCommitIterator;
return true;
} else {
pendingIterator = null;
return !cursor.equals("0");
}
}

@Override
public Map.Entry<K, V> next() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next should throw a NoSuchElementException if there are no more elements. https://docs.oracle.com/javase/7/docs/api/java/util/Iterator.html#next()

in next add a check in the beginning.

if (!hasNext()) {
 throw new NoSuchElementException();
}

if (!hasNext()) {
throw new NoSuchElementException();
}
Map.Entry<String, String> redisKeyValue = null;
if (pendingIterator != null) {
redisKeyValue = pendingIterator.next();
} else {
if (cachedResult == null || readPosition >= cachedResult.size()) {
JedisCommands commands = null;
try {
commands = jedisContainer.getInstance();
ScanResult<Map.Entry<String, String>> scanResult = commands.hscan(namespace, cursor, scanParams);
cachedResult = scanResult.getResult();
cursor = scanResult.getStringCursor();
readPosition = 0;
} finally {
jedisContainer.returnInstance(commands);
}
}
redisKeyValue = cachedResult.get(readPosition);
readPosition += 1;
}
K key = decoder.decodeKey(redisKeyValue.getKey());
V value = decoder.decodeValue(redisKeyValue.getValue());
return new AbstractMap.SimpleEntry(key, value);
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.redis.utils;

import com.google.common.base.Optional;

import org.apache.commons.codec.binary.Base64;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.Serializer;

/**
* Helper class for encoding/decoding redis key values.
*/
public class RedisEncoder<K, V> {

public static final Serializer<Optional<byte[]>> internalValueSerializer = new DefaultStateSerializer<>();

public static final String TOMBSTONE = encode(internalValueSerializer.serialize(Optional.absent()));

private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;

public RedisEncoder(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
}

public Serializer<K> getKeySerializer() {
return keySerializer;
}

public Serializer<V> getValueSerializer() {
return valueSerializer;
}

public String encodeKey(K key) {
return encode(keySerializer.serialize(key));
}

public String encodeValue(V value) {
return encode(internalValueSerializer.serialize(
Optional.of(valueSerializer.serialize(value))));
}

public K decodeKey(String redisKey) {
return keySerializer.deserialize(decode(redisKey));
}

public V decodeValue(String redisValue) {
Optional<byte[]> internalValue = internalValueSerializer.deserialize(decode(redisValue));
if (internalValue.isPresent()) {
return valueSerializer.deserialize(internalValue.get());
}
return null;
}

private static String encode(byte[] bytes) {
return Base64.encodeBase64String(bytes);
}

private static byte[] decode(String s) {
return Base64.decodeBase64(s);
}
}
Expand Up @@ -20,6 +20,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -72,6 +74,11 @@ public V delete(K key) {
return state.remove(key);
}

@Override
public Iterator<Map.Entry<K, V>> iterator() {
return state.entrySet().iterator();
}

@Override
public void commit() {
commitedState = new TxIdState<>(DEFAULT_TXID, new ConcurrentHashMap<>(state));
Expand Down
5 changes: 4 additions & 1 deletion storm-core/src/jvm/org/apache/storm/state/KeyValueState.java
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.storm.state;

import java.util.Iterator;
import java.util.Map;

/**
* A state that supports key-value mappings.
*/
public interface KeyValueState<K, V> extends State {
public interface KeyValueState<K, V> extends State, Iterable<Map.Entry<K, V>> {
/**
* Maps the value with the key
*
Expand Down