From 8f6b4d8ea820a55fe46f70f1780e6b3477652569 Mon Sep 17 00:00:00 2001 From: ariskk Date: Tue, 14 Mar 2017 10:13:32 +0000 Subject: [PATCH] [BAHIR-95] Add ZREM to Redis commands --- flink-connector-redis/README.md | 5 ++++- .../streaming/connectors/redis/RedisSink.java | 3 +++ .../common/container/RedisClusterContainer.java | 12 ++++++++++++ .../container/RedisCommandsContainer.java | 8 ++++++++ .../redis/common/container/RedisContainer.java | 17 +++++++++++++++++ .../redis/common/mapper/RedisCommand.java | 5 +++++ .../connectors/redis/RedisSinkITCase.java | 15 ++++++++++++--- 7 files changed, 61 insertions(+), 4 deletions(-) diff --git a/flink-connector-redis/README.md b/flink-connector-redis/README.md index 0748a92a..87da6d33 100644 --- a/flink-connector-redis/README.md +++ b/flink-connector-redis/README.md @@ -141,6 +141,9 @@ This section gives a description of all the available data types and what Redis SORTED_SETZADD - + + + SORTED_SETZREM + diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java index 688f94ae..9138862f 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java @@ -151,6 +151,9 @@ public void invoke(IN input) throws Exception { case ZADD: this.redisCommandsContainer.zadd(this.additionalKey, value, key); break; + case ZREM: + this.redisCommandsContainer.zrem(this.additionalKey, key); + break; case HSET: this.redisCommandsContainer.hset(this.additionalKey, key, value); break; diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java index cc1d626a..ba733f75 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisClusterContainer.java @@ -160,6 +160,18 @@ public void zadd(final String key, final String score, final String element) { } } + @Override + public void zrem(final String key, final String element) { + try { + jedisCluster.zrem(key, element); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.error("Cannot send Redis message with command ZREM to set {} error message {}", + key, e.getMessage()); + } + } + } + /** * Closes the {@link JedisCluster}. */ diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java index 78771f15..5d7993cf 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainer.java @@ -106,6 +106,14 @@ public interface RedisCommandsContainer extends Serializable { */ void zadd(String key, String score, String element); + /** + * Removes the specified member from the sorted set stored at key. + * + * @param key The name of the Sorted Set + * @param element element to be removed + */ + void zrem(String key, String element); + /** * Close the Jedis container. * diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java index fb73a27b..b862ea4c 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisContainer.java @@ -221,6 +221,23 @@ public void zadd(final String key, final String score, final String element) { } } + @Override + public void zrem(final String key, final String element) { + Jedis jedis = null; + try { + jedis = getInstance(); + jedis.zrem(key, element); + } catch (Exception e) { + if (LOG.isErrorEnabled()) { + LOG.error("Cannot send Redis message with command ZREM to set {} error message {}", + key, e.getMessage()); + } + throw e; + } finally { + releaseInstance(jedis); + } + } + /** * Returns Jedis instance from the pool. * diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java index cf9842c7..019ad463 100644 --- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java +++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/mapper/RedisCommand.java @@ -60,6 +60,11 @@ public enum RedisCommand { */ ZADD(RedisDataType.SORTED_SET), + /** + * Removes the specified members from the sorted set stored at key. + */ + ZREM(RedisDataType.SORTED_SET), + /** * Sets field in the hash stored at key to value. If key does not exist, * a new key holding a hash is created. If field already exists in the hash, it is overwritten. diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java index e0718946..47544f74 100644 --- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java +++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/RedisSinkITCase.java @@ -35,6 +35,7 @@ public class RedisSinkITCase extends RedisITCaseBase { private FlinkJedisPoolConfig jedisPoolConfig; private static final Long NUM_ELEMENTS = 20L; + private static final Long ZERO = 0L; private static final String REDIS_KEY = "TEST_KEY"; private static final String REDIS_ADDITIONAL_KEY = "TEST_ADDITIONAL_KEY"; @@ -97,14 +98,22 @@ public void testRedisHyperLogLogDataType() throws Exception { @Test public void testRedisSortedSetDataType() throws Exception { DataStreamSource> source = env.addSource(new TestSourceFunctionSortedSet()); - RedisSink> redisSink = new RedisSink<>(jedisPoolConfig, + RedisSink> redisZaddSink = new RedisSink<>(jedisPoolConfig, new RedisAdditionalDataMapper(RedisCommand.ZADD)); - source.addSink(redisSink); - env.execute("Test Redis Sorted Set Data Type"); + source.addSink(redisZaddSink); + env.execute("Test ZADD"); assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY)); + RedisSink> redisZremSink = new RedisSink<>(jedisPoolConfig, + new RedisAdditionalDataMapper(RedisCommand.ZREM)); + + source.addSink(redisZremSink); + env.execute("Test ZREM"); + + assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY)); + jedis.del(REDIS_ADDITIONAL_KEY); }