From 356e7474c9ede3c45232b39e895d554349a55328 Mon Sep 17 00:00:00 2001 From: ddebree Date: Thu, 13 Aug 2015 13:41:36 +0200 Subject: [PATCH 1/2] Refactored TimeCacheMap to extend RotatingMap. This should allow current users of the TimeCacheMap to progressively migrate off the deprecated TimeCacheMap to the simpler RotatingMap. This also improves code reuse since many of the methods were shared. --- .../jvm/backtype/storm/utils/RotatingMap.java | 12 ++- .../backtype/storm/utils/TimeCacheMap.java | 79 +++++-------------- 2 files changed, 26 insertions(+), 65 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java index aca8db6943a..2b8d66bba56 100644 --- a/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java +++ b/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java @@ -32,18 +32,22 @@ * * The advantage of this design is that the expiration thread only locks the object * for O(1) time, meaning the object is essentially always available for gets/puts. + * + * Note: This class is not thread-safe since it does not protect against changes to + * _buckets while it is being read + * */ public class RotatingMap { //this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; - public static interface ExpiredCallback { - public void expire(K key, V val); + public interface ExpiredCallback { + void expire(K key, V val); } - private LinkedList> _buckets; + private final LinkedList> _buckets; - private ExpiredCallback _callback; + private final ExpiredCallback _callback; public RotatingMap(int numBuckets, ExpiredCallback callback) { if(numBuckets<2) { diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java index 36d1baeb264..ba9594ca2cd 100644 --- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java +++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java @@ -17,11 +17,7 @@ */ package backtype.storm.utils; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; import java.util.Map; -import java.util.Map.Entry; /** * Expires keys that have not been updated in the configured number of seconds. @@ -35,48 +31,28 @@ */ //deprecated in favor of non-threaded RotatingMap @Deprecated -public class TimeCacheMap { +public class TimeCacheMap extends RotatingMap { //this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; - public static interface ExpiredCallback { - public void expire(K key, V val); + public interface ExpiredCallback extends RotatingMap.ExpiredCallback { + void expire(K key, V val); } - private LinkedList> _buckets; - private final Object _lock = new Object(); - private Thread _cleaner; - private ExpiredCallback _callback; + private final Thread _cleaner; public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback callback) { - if(numBuckets<2) { - throw new IllegalArgumentException("numBuckets must be >= 2"); - } - _buckets = new LinkedList>(); - for(int i=0; i()); - } - + super(numBuckets, callback); - _callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1); _cleaner = new Thread(new Runnable() { public void run() { try { while(true) { - Map dead = null; Time.sleep(sleepTime); - synchronized(_lock) { - dead = _buckets.removeLast(); - _buckets.addFirst(new HashMap()); - } - if(_callback!=null) { - for(Entry entry: dead.entrySet()) { - _callback.expire(entry.getKey(), entry.getValue()); - } - } + rotate(); } } catch (InterruptedException ex) { @@ -99,59 +75,40 @@ public TimeCacheMap(int expirationSecs, int numBuckets) { this(expirationSecs, numBuckets, null); } - public boolean containsKey(K key) { synchronized(_lock) { - for(HashMap bucket: _buckets) { - if(bucket.containsKey(key)) { - return true; - } - } - return false; + return super.containsKey(key); } } public V get(K key) { synchronized(_lock) { - for(HashMap bucket: _buckets) { - if(bucket.containsKey(key)) { - return bucket.get(key); - } - } - return null; + return super.get(key); } } public void put(K key, V value) { synchronized(_lock) { - Iterator> it = _buckets.iterator(); - HashMap bucket = it.next(); - bucket.put(key, value); - while(it.hasNext()) { - bucket = it.next(); - bucket.remove(key); - } + super.put(key, value); } } public Object remove(K key) { synchronized(_lock) { - for(HashMap bucket: _buckets) { - if(bucket.containsKey(key)) { - return bucket.remove(key); - } - } - return null; + return super.remove(key); } } public int size() { synchronized(_lock) { - int size = 0; - for(HashMap bucket: _buckets) { - size+=bucket.size(); - } - return size; + return super.size(); + } + } + + @Override + public Map rotate() { + synchronized (_lock) { + return super.rotate(); } } From 0f283f662e98a2163d20c4fd8bf719d1885edfce Mon Sep 17 00:00:00 2001 From: ddebree Date: Fri, 14 Aug 2015 23:02:01 +0200 Subject: [PATCH 2/2] Changed to use RotatingMap as a member variable instead of a super-class. --- .../backtype/storm/utils/TimeCacheMap.java | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java index ba9594ca2cd..dd926960c73 100644 --- a/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java +++ b/storm-core/src/jvm/backtype/storm/utils/TimeCacheMap.java @@ -18,6 +18,7 @@ package backtype.storm.utils; import java.util.Map; +import java.util.Map.Entry; /** * Expires keys that have not been updated in the configured number of seconds. @@ -31,28 +32,41 @@ */ //deprecated in favor of non-threaded RotatingMap @Deprecated -public class TimeCacheMap extends RotatingMap { +public class TimeCacheMap { //this default ensures things expire at most 50% past the expiration time private static final int DEFAULT_NUM_BUCKETS = 3; - public interface ExpiredCallback extends RotatingMap.ExpiredCallback { + public interface ExpiredCallback { void expire(K key, V val); } + private final RotatingMap _rotatingMap; + private final Object _lock = new Object(); private final Thread _cleaner; + private ExpiredCallback _callback; public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback callback) { - super(numBuckets, callback); + _rotatingMap = new RotatingMap<>(numBuckets); + + _callback = callback; final long expirationMillis = expirationSecs * 1000L; final long sleepTime = expirationMillis / (numBuckets-1); _cleaner = new Thread(new Runnable() { public void run() { try { while(true) { + Map dead = null; Time.sleep(sleepTime); - rotate(); + synchronized(_lock) { + dead = _rotatingMap.rotate(); + } + if(_callback!=null) { + for(Entry entry: dead.entrySet()) { + _callback.expire(entry.getKey(), entry.getValue()); + } + } } } catch (InterruptedException ex) { @@ -77,38 +91,31 @@ public TimeCacheMap(int expirationSecs, int numBuckets) { public boolean containsKey(K key) { synchronized(_lock) { - return super.containsKey(key); + return _rotatingMap.containsKey(key); } } public V get(K key) { synchronized(_lock) { - return super.get(key); + return _rotatingMap.get(key); } } public void put(K key, V value) { synchronized(_lock) { - super.put(key, value); + _rotatingMap.put(key, value); } } public Object remove(K key) { synchronized(_lock) { - return super.remove(key); + return _rotatingMap.remove(key); } } public int size() { synchronized(_lock) { - return super.size(); - } - } - - @Override - public Map rotate() { - synchronized (_lock) { - return super.rotate(); + return _rotatingMap.size(); } }