Skip to content

Commit

Permalink
convert ConsistencyChecker to use an executor as in 0.7 to ease mergi…
Browse files Browse the repository at this point in the history
…ng of CASSANDRA-1905

patch by jbellis

git-svn-id: https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.6@1053105 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jbellis committed Dec 27, 2010
1 parent f36473b commit 56e8fd8
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 130 deletions.
24 changes: 0 additions & 24 deletions src/java/org/apache/cassandra/cache/ICacheExpungeHook.java

This file was deleted.

43 changes: 21 additions & 22 deletions src/java/org/apache/cassandra/service/ConsistencyChecker.java
Expand Up @@ -26,12 +26,15 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.cache.ICacheExpungeHook;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
Expand All @@ -41,7 +44,7 @@
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.WrappedRunnable;

/**
* ConsistencyChecker does the following:
Expand All @@ -59,8 +62,9 @@
*/
class ConsistencyChecker implements Runnable
{
private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class);
private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
private static Logger logger_ = LoggerFactory.getLogger(ConsistencyChecker.class);

private static ScheduledExecutorService executor_ = new ScheduledThreadPoolExecutor(1); // TODO add JMX

private final Row row_;
protected final List<InetAddress> replicas_;
Expand Down Expand Up @@ -126,7 +130,7 @@ public synchronized void response(Message response)
ReadCommand readCommand = constructReadMessage(false);
Message message = readCommand.makeReadMessage();
if (logger_.isDebugEnabled())
logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
for (InetAddress endpoint : replicas_)
{
Expand All @@ -144,12 +148,12 @@ public synchronized void response(Message response)
}
}

class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
class DataRepairHandler implements IAsyncCallback
{
private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
private final ReadResponseResolver readResponseResolver_;
private final int majority_;

public DataRepairHandler() throws IOException
{
readResponseResolver_ = new ReadResponseResolver(readCommand_.table, readCommand_.key, replicas_.size());
Expand All @@ -170,20 +174,15 @@ public synchronized void response(Message message)
readResponseResolver_.preprocess(message);
if (responses_.size() == majority_)
{
String messageId = message.getMessageId();
readRepairTable_.put(messageId, messageId, this);
}
}

public void callMe(String key, String value)
{
try
{
readResponseResolver_.resolve(responses_);
}
catch (Exception ex)
{
throw new RuntimeException(ex);
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException, DigestMismatchException
{
readResponseResolver_.resolve(responses_);
}
};
// give remaining replicas until timeout to reply and get added to responses_
executor_.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
}
}
}
Expand Down
126 changes: 42 additions & 84 deletions src/java/org/apache/cassandra/utils/ExpiringMap.java
Expand Up @@ -18,41 +18,34 @@

package org.apache.cassandra.utils;

import java.util.*;
import java.util.Map.Entry;
import java.util.Enumeration;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.log4j.Logger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.cache.ICacheExpungeHook;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

public class ExpiringMap<K, V>
{
private class CacheableObject
private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);

private static class CacheableObject<T>
{
private V value_;
public long age;
private final T value;
private final long age;

CacheableObject(V o)
CacheableObject(T o)
{
value_ = o;
value = o;
age = System.currentTimeMillis();
}

@Override
public boolean equals(Object o)
{
return value_.equals(o);
}

@Override
public int hashCode()
T getValue()
{
return value_.hashCode();
}

V getValue()
{
return value_;
return value;
}

boolean isReadyToDie(long expiration)
Expand All @@ -63,100 +56,65 @@ boolean isReadyToDie(long expiration)

private class CacheMonitor extends TimerTask
{
private long expiration_;
private final long expiration;

CacheMonitor(long expiration)
{
expiration_ = expiration;
this.expiration = expiration;
}

@Override
public void run()
{
Map<K, V> expungedValues = new HashMap<K, V>();
synchronized (cache_)
synchronized (cache)
{
Enumeration<K> e = cache_.keys();
Enumeration<K> e = cache.keys();
while (e.hasMoreElements())
{
K key = e.nextElement();
CacheableObject co = cache_.get(key);
if (co != null && co.isReadyToDie(expiration_))
CacheableObject co = cache.get(key);
if (co != null && co.isReadyToDie(expiration))
{
V v = co.getValue();
if (null != v)
{
expungedValues.put(key, v);
}
cache_.remove(key);
cache.remove(key);
}
}
}

/* Calling the hooks on the keys that have been expunged */
for (Entry<K, V> entry : expungedValues.entrySet())
{
K key = entry.getKey();
V value = entry.getValue();

ICacheExpungeHook<K, V> hook = hooks_.remove(key);
if (hook != null)
{
hook.callMe(key, value);
}
}
expungedValues.clear();
}
}

private Hashtable<K, CacheableObject> cache_;
private Map<K, ICacheExpungeHook<K, V>> hooks_;
private Timer timer_;
private static int counter_ = 0;
private static final Logger LOGGER = Logger.getLogger(ExpiringMap.class);

private void init(long expiration)
{
if (expiration <= 0)
{
throw new IllegalArgumentException("Argument specified must be a positive number");
}

cache_ = new Hashtable<K, CacheableObject>();
hooks_ = new Hashtable<K, ICacheExpungeHook<K, V>>();
timer_ = new Timer("CACHETABLE-TIMER-" + (++counter_), true);
timer_.schedule(new CacheMonitor(expiration), expiration, expiration);
}
private final NonBlockingHashMap<K, CacheableObject> cache = new NonBlockingHashMap<K, CacheableObject>();
private final Timer timer;
private static int counter = 0;

/*
* Specify the TTL for objects in the cache
* in milliseconds.
*/
public ExpiringMap(long expiration)
{
init(expiration);
if (expiration <= 0)
{
throw new IllegalArgumentException("Argument specified must be a positive number");
}

timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
timer.schedule(new CacheMonitor(expiration), expiration / 2, expiration / 2);
}

public void shutdown()
{
timer_.cancel();
timer.cancel();
}

public void put(K key, V value)
{
cache_.put(key, new CacheableObject(value));
}

public void put(K key, V value, ICacheExpungeHook<K, V> hook)
{
put(key, value);
hooks_.put(key, hook);
cache.put(key, new CacheableObject<V>(value));
}

public V get(K key)
{
V result = null;
CacheableObject co = cache_.get(key);
CacheableObject<V> co = cache.get(key);
if (co != null)
{
result = co.getValue();
Expand All @@ -166,7 +124,7 @@ public V get(K key)

public V remove(K key)
{
CacheableObject co = cache_.remove(key);
CacheableObject<V> co = cache.remove(key);
V result = null;
if (co != null)
{
Expand All @@ -178,7 +136,7 @@ public V remove(K key)
public long getAge(K key)
{
long age = 0;
CacheableObject co = cache_.get(key);
CacheableObject<V> co = cache.get(key);
if (co != null)
{
age = co.age;
Expand All @@ -188,21 +146,21 @@ public long getAge(K key)

public int size()
{
return cache_.size();
return cache.size();
}

public boolean containsKey(K key)
{
return cache_.containsKey(key);
return cache.containsKey(key);
}

public boolean isEmpty()
{
return cache_.isEmpty();
return cache.isEmpty();
}

public Set<K> keySet()
{
return cache_.keySet();
return cache.keySet();
}
}

0 comments on commit 56e8fd8

Please sign in to comment.