From df5c3cf2c1d7b7f6989e2a9d4c2fa1c52b09833c Mon Sep 17 00:00:00 2001 From: zenfenan Date: Sun, 20 May 2018 19:22:26 +0530 Subject: [PATCH 1/3] NIFI-4987: Added TTL to RedisDistributedMapCacheClientService --- ...RedisDistributedMapCacheClientService.java | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index 94b195c2133c..1bf22d5c53b4 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -29,6 +29,7 @@ import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisType; import org.apache.nifi.redis.util.RedisAction; @@ -36,6 +37,7 @@ import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.types.Expiration; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -44,6 +46,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; @Tags({ "redis", "distributed", "cache", "map" }) @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " + @@ -59,14 +62,25 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer .required(true) .build(); + public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder() + .name("redis-cache-ttl") + .displayName("TTL") + .description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .required(true) + .defaultValue("0 secs") + .build(); + static final List PROPERTY_DESCRIPTORS; static { final List props = new ArrayList<>(); props.add(REDIS_CONNECTION_POOL); + props.add(TTL); PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); } private volatile RedisConnectionPool redisConnectionPool; + private Long ttl; @Override protected List getSupportedPropertyDescriptors() { @@ -96,6 +110,11 @@ protected Collection customValidate(ValidationContext validati @OnEnabled public void onEnabled(final ConfigurationContext context) { this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); + this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS); + + if (ttl == 0) { + this.ttl = -1L; + } } @OnDisabled @@ -115,38 +134,16 @@ public boolean putIfAbsent(final K key, final V value, final Serializer V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { return withConnection(redisConnection -> { final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - do { - // start a watch on the key and retrieve the current value - redisConnection.watch(kv.getKey()); - final byte[] existingValue = redisConnection.get(kv.getKey()); - - // start a transaction and perform the put-if-absent - redisConnection.multi(); - redisConnection.setNX(kv.getKey(), kv.getValue()); - - // execute the transaction - final List results = redisConnection.exec(); - - // if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry - // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation - if (results.size() > 0) { - final Object firstResult = results.get(0); - if (firstResult instanceof Boolean) { - final Boolean absent = (Boolean) firstResult; - return absent ? null : valueDeserializer.deserialize(existingValue); - } else { - // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop - throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " - + firstResult.getClass().getName() + " with value " + firstResult.toString()); - } - } - } while (isEnabled()); + final byte[] existingValue = redisConnection.get(kv.getKey()); - return null; + if (!redisConnection.exists(kv.getKey())) { + redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), null); + } + + return (existingValue == null) ? null : valueDeserializer.deserialize(existingValue); }); } - @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { return withConnection(redisConnection -> { @@ -159,7 +156,7 @@ public boolean containsKey(final K key, final Serializer keySerializer) t public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { withConnection(redisConnection -> { final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - redisConnection.set(kv.getKey(), kv.getValue()); + redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), null); return null; }); } From e95a742949da3ccbade9ea564f4dffe4fa2e0dd1 Mon Sep 17 00:00:00 2001 From: zenfenan Date: Tue, 22 May 2018 22:17:25 +0530 Subject: [PATCH 2/3] NIFI-4987: PR Review Fixes - Reverted getAndPutIfAbsent and added TTL setting with a different approach --- ...RedisDistributedMapCacheClientService.java | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index 1bf22d5c53b4..d09f726d28b0 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -134,13 +134,39 @@ public boolean putIfAbsent(final K key, final V value, final Serializer V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { return withConnection(redisConnection -> { final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - final byte[] existingValue = redisConnection.get(kv.getKey()); + do { + // start a watch on the key and retrieve the current value + redisConnection.watch(kv.getKey()); + final byte[] existingValue = redisConnection.get(kv.getKey()); + + // start a transaction and perform the put-if-absent + redisConnection.multi(); + redisConnection.setNX(kv.getKey(), kv.getValue()); + + // Set the TTL only if the key doesn't exist already + if (ttl != -1L && existingValue == null) { + redisConnection.expire(kv.getKey(), ttl); + } - if (!redisConnection.exists(kv.getKey())) { - redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), null); - } + // execute the transaction + final List results = redisConnection.exec(); + + // if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry + // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation + if (results.size() > 0) { + final Object firstResult = results.get(0); + if (firstResult instanceof Boolean) { + final Boolean absent = (Boolean) firstResult; + return absent ? null : valueDeserializer.deserialize(existingValue); + } else { + // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop + throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " + + firstResult.getClass().getName() + " with value " + firstResult.toString()); + } + } + } while (isEnabled()); - return (existingValue == null) ? null : valueDeserializer.deserialize(existingValue); + return null; }); } From 5153c03c62d009b8482cd41ed320647aca52c859 Mon Sep 17 00:00:00 2001 From: zenfenan Date: Wed, 23 May 2018 20:55:44 +0530 Subject: [PATCH 3/3] NIFI-4987: PR Review Fixes - Added TTL to putIfAbsent() --- .../service/RedisDistributedMapCacheClientService.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index d09f726d28b0..604c5ef9c833 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -126,7 +126,13 @@ public void onDisabled() { public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { return withConnection(redisConnection -> { final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - return redisConnection.setNX(kv.getKey(), kv.getValue()); + boolean set = redisConnection.setNX(kv.getKey(), kv.getValue()); + + if (ttl != -1L && set) { + redisConnection.expire(kv.getKey(), ttl); + } + + return set; }); }