Skip to content

Commit

Permalink
Add RetentionEnforcingStore, with support for online retention on reads
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Oct 19, 2012
1 parent 08e5258 commit cd16456
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 32 deletions.
24 changes: 24 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ public class VoldemortConfig implements Serializable {
private int retentionCleanupScheduledPeriodInHour;
private int retentionCleanupFirstStartDayOfWeek;
private boolean retentionCleanupPinStartTime;
private boolean enforceRetentionPolicyOnRead;
private boolean deleteExpiredValuesOnRead;

private int maxRebalancingAttempt;
private long rebalancingTimeoutSec;
Expand Down Expand Up @@ -407,6 +409,12 @@ public VoldemortConfig(Props props) {
// should the retention job always start at the 'start time' specified
this.retentionCleanupPinStartTime = props.getBoolean("retention.cleanup.pin.start.time",
true);
// should the online reads filter out stale values when reading them ?
this.enforceRetentionPolicyOnRead = props.getBoolean("enforce.retention.policy.on.read",
false);
// should the online reads issue deletes to clear out stale values when
// reading them?
this.deleteExpiredValuesOnRead = props.getBoolean("delete.expired.values.on.read", false);

// save props for access from plugins
this.allProps = props;
Expand Down Expand Up @@ -1756,6 +1764,22 @@ public void setRetentionCleanupPinStartTime(boolean retentionCleanupFixStartTime
this.retentionCleanupPinStartTime = retentionCleanupFixStartTime;
}

public boolean isEnforceRetentionPolicyOnRead() {
return enforceRetentionPolicyOnRead;
}

public void setEnforceRetentionPolicyOnRead(boolean enforceRetentionPolicyOnRead) {
this.enforceRetentionPolicyOnRead = enforceRetentionPolicyOnRead;
}

public boolean isDeleteExpiredValuesOnRead() {
return deleteExpiredValuesOnRead;
}

public void setDeleteExpiredValuesOnRead(boolean deleteExpiredValuesOnRead) {
this.deleteExpiredValuesOnRead = deleteExpiredValuesOnRead;
}

public int getAdminSocketTimeout() {
return adminSocketTimeout;
}
Expand Down
63 changes: 41 additions & 22 deletions src/java/voldemort/server/storage/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.rebalancing.RebootstrappingStore;
import voldemort.store.rebalancing.RedirectingStore;
import voldemort.store.retention.RetentionEnforcingStore;
import voldemort.store.routed.RoutedStore;
import voldemort.store.routed.RoutedStoreFactory;
import voldemort.store.slop.SlopStorageEngine;
Expand Down Expand Up @@ -600,6 +601,10 @@ public void openStore(StoreDefinition storeDef) {
public void updateRoutingStrategy(RoutingStrategy updatedRoutingStrategy) {
((ReadOnlyStorageEngine) engine).setRoutingStrategy(updatedRoutingStrategy);
}

public void updateStoreDefinition(StoreDefinition storeDef) {
return;
}
});
}

Expand Down Expand Up @@ -715,31 +720,45 @@ public void registerEngine(StorageEngine<ByteArray, byte[], byte[]> engine,
cluster.getName(),
SystemTime.INSTANCE);
if(!isSlop) {
if(voldemortConfig.isEnableRebalanceService() && !isReadOnly && !isMetadata && !isView) {
store = new RedirectingStore(store,
metadata,
storeRepository,
failureDetector,
storeFactory);
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(cluster.getName()
+ "."
+ JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
if(!isReadOnly && !isMetadata && !isView) {
// wrap store to enforce retention policy
if(voldemortConfig.isEnforceRetentionPolicyOnRead()) {
RetentionEnforcingStore retentionEnforcingStore = new RetentionEnforcingStore(store,
metadata.getStoreDef(store.getName()),
voldemortConfig.isDeleteExpiredValuesOnRead(),
SystemTime.INSTANCE);
metadata.addMetadataStoreListener(store.getName(), retentionEnforcingStore);
store = retentionEnforcingStore;
}

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);
if(voldemortConfig.isEnableRebalanceService()) {
store = new RedirectingStore(store,
metadata,
storeRepository,
failureDetector,
storeFactory);
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
if(this.voldemortConfig.isEnableJmxClusterName())
name = JmxUtils.createObjectName(cluster.getName()
+ "."
+ JmxUtils.getPackageName(RedirectingStore.class),
store.getName());
else
name = JmxUtils.createObjectName(JmxUtils.getPackageName(RedirectingStore.class),
store.getName());

synchronized(mbeanServer) {
if(mbeanServer.isRegistered(name))
JmxUtils.unregisterMbean(mbeanServer, name);

JmxUtils.registerMbean(mbeanServer,
JmxUtils.createModelMBean(store),
name);
}

JmxUtils.registerMbean(mbeanServer, JmxUtils.createModelMBean(store), name);
}

}
}

Expand Down
38 changes: 28 additions & 10 deletions src/java/voldemort/store/metadata/MetadataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ public static enum VoldemortState {
public final Lock readLock = lock.readLock();
public final Lock writeLock = lock.writeLock();

private final ConcurrentHashMap<String, MetadataStoreListener> storeNameTolisteners;
private final ConcurrentHashMap<String, List<MetadataStoreListener>> storeNameTolisteners;

private static final Logger logger = Logger.getLogger(MetadataStore.class);

public MetadataStore(Store<String, String, String> innerStore, int nodeId) {
this.innerStore = innerStore;
this.metadataCache = new HashMap<String, Versioned<Object>>();
this.storeNameTolisteners = new ConcurrentHashMap<String, MetadataStoreListener>();
this.storeNameTolisteners = new ConcurrentHashMap<String, List<MetadataStoreListener>>();

init(nodeId);
}
Expand All @@ -130,10 +130,12 @@ public void addMetadataStoreListener(String storeName, MetadataStoreListener lis
if(this.storeNameTolisteners == null)
throw new VoldemortException("MetadataStoreListener must be non-null");

this.storeNameTolisteners.put(storeName, listener);
if(!this.storeNameTolisteners.containsKey(storeName))
this.storeNameTolisteners.put(storeName, new ArrayList<MetadataStoreListener>(2));
this.storeNameTolisteners.get(storeName).add(listener);
}

public void remoteMetadataStoreListener(String storeName) {
public void removeMetadataStoreListener(String storeName) {
if(this.storeNameTolisteners == null)
throw new VoldemortException("MetadataStoreListener must be non-null");

Expand Down Expand Up @@ -351,6 +353,19 @@ public RoutingStrategy getRoutingStrategy(String storeName) {
return strategy;
}

/**
* Returns the list of store defs as a map
*
* @param storeDefs
* @return
*/
private HashMap<String, StoreDefinition> makeStoreDefinitionMap(List<StoreDefinition> storeDefs) {
HashMap<String, StoreDefinition> storeDefMap = new HashMap<String, StoreDefinition>();
for(StoreDefinition storeDef: storeDefs)
storeDefMap.put(storeDef.getName(), storeDef);
return storeDefMap;
}

/**
* Changes to cluster OR store definition metadata results in routing
* strategies changing. These changes need to be propagated to all the
Expand All @@ -365,8 +380,9 @@ private void updateRoutingStrategies(Cluster cluster, List<StoreDefinition> stor
clock = (VectorClock) metadataCache.get(ROUTING_STRATEGY_KEY).getVersion();

logger.info("Updating routing strategy for all stores");
HashMap<String, StoreDefinition> storeDefMap = makeStoreDefinitionMap(storeDefs);
HashMap<String, RoutingStrategy> routingStrategyMap = createRoutingStrategyMap(cluster,
storeDefs);
storeDefMap);
this.metadataCache.put(ROUTING_STRATEGY_KEY,
new Versioned<Object>(routingStrategyMap,
clock.incremented(getNodeId(),
Expand All @@ -376,8 +392,10 @@ private void updateRoutingStrategies(Cluster cluster, List<StoreDefinition> stor
RoutingStrategy updatedRoutingStrategy = routingStrategyMap.get(storeName);
if(updatedRoutingStrategy != null) {
try {
storeNameTolisteners.get(storeName)
.updateRoutingStrategy(updatedRoutingStrategy);
for(MetadataStoreListener listener: storeNameTolisteners.get(storeName)) {
listener.updateRoutingStrategy(updatedRoutingStrategy);
listener.updateStoreDefinition(storeDefMap.get(storeName));
}
} catch(Exception e) {
if(logger.isEnabledFor(Level.WARN))
logger.warn(e, e);
Expand All @@ -393,7 +411,7 @@ private void updateRoutingStrategies(Cluster cluster, List<StoreDefinition> stor
*/
private void initSystemRoutingStrategies(Cluster cluster) {
HashMap<String, RoutingStrategy> routingStrategyMap = createRoutingStrategyMap(cluster,
getSystemStoreDefList());
makeStoreDefinitionMap(getSystemStoreDefList()));
this.metadataCache.put(SYSTEM_ROUTING_STRATEGY_KEY,
new Versioned<Object>(routingStrategyMap));
}
Expand Down Expand Up @@ -537,10 +555,10 @@ private void initCache(String key, Object defaultValue) {
}

private HashMap<String, RoutingStrategy> createRoutingStrategyMap(Cluster cluster,
List<StoreDefinition> storeDefs) {
HashMap<String, StoreDefinition> storeDefs) {
HashMap<String, RoutingStrategy> map = new HashMap<String, RoutingStrategy>();

for(StoreDefinition store: storeDefs) {
for(StoreDefinition store: storeDefs.values()) {
map.put(store.getName(), routingFactory.updateRoutingStrategy(store, cluster));
}

Expand Down
3 changes: 3 additions & 0 deletions src/java/voldemort/store/metadata/MetadataStoreListener.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package voldemort.store.metadata;

import voldemort.routing.RoutingStrategy;
import voldemort.store.StoreDefinition;

public interface MetadataStoreListener {

void updateRoutingStrategy(RoutingStrategy routingStrategyMap);

void updateStoreDefinition(StoreDefinition storeDef);
}
110 changes: 110 additions & 0 deletions src/java/voldemort/store/retention/RetentionEnforcingStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package voldemort.store.retention;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import voldemort.VoldemortException;
import voldemort.routing.RoutingStrategy;
import voldemort.store.DelegatingStore;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreUtils;
import voldemort.store.metadata.MetadataStoreListener;
import voldemort.utils.ByteArray;
import voldemort.utils.Time;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;

/**
* Wraps the storage layer and ensures we don't return any values that are
* stale. Optionally, deletes the expired versions.
*
*/
public class RetentionEnforcingStore extends DelegatingStore<ByteArray, byte[], byte[]> implements
MetadataStoreListener {

private volatile StoreDefinition storeDef;
private boolean deleteExpiredEntries;
private volatile long retentionTimeMs;
private Time time;

public RetentionEnforcingStore(Store<ByteArray, byte[], byte[]> innerStore,
StoreDefinition storeDef,
boolean deleteExpiredEntries,
Time time) {
super(innerStore);
updateStoreDefinition(storeDef);
this.deleteExpiredEntries = deleteExpiredEntries;
this.time = time;
}

public void updateRoutingStrategy(RoutingStrategy routingStrategyMap) {
return; // no-op
}

/**
* Updates the store definition object and the retention time based on the
* updated store definition
*/
public void updateStoreDefinition(StoreDefinition storeDef) {
this.storeDef = storeDef;
if(storeDef.hasRetentionPeriod())
this.retentionTimeMs = storeDef.getRetentionDays() * Time.MS_PER_DAY;
}

/**
* Performs the filtering of the expired entries based on retention time.
* Optionally, deletes them also
*
* @param key the key whose value is to be deleted if needed
* @param vals set of values to be filtered out
* @return filtered list of values which are currently valid
*/
private List<Versioned<byte[]>> filterExpiredEntries(ByteArray key, List<Versioned<byte[]>> vals) {
Iterator<Versioned<byte[]>> valsIterator = vals.iterator();
while(valsIterator.hasNext()) {
Versioned<byte[]> val = valsIterator.next();
VectorClock clock = (VectorClock) val.getVersion();
// omit if expired
if(clock.getTimestamp() < (time.getMilliseconds() - this.retentionTimeMs)) {
valsIterator.remove();
// delete stale value if configured
if(deleteExpiredEntries) {
getInnerStore().delete(key, clock);
}
}
}
return vals;
}

@Override
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
StoreUtils.assertValidKeys(keys);
Map<ByteArray, List<Versioned<byte[]>>> results = getInnerStore().getAll(keys, transforms);
if(!storeDef.hasRetentionPeriod())
return results;

for(ByteArray key: results.keySet()) {
List<Versioned<byte[]>> filteredVals = filterExpiredEntries(key, results.get(key));
// remove/update the entry for the key, depending on how much is
// filtered
if(!filteredVals.isEmpty())
results.put(key, filteredVals);
else
results.remove(key);
}
return results;
}

@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
StoreUtils.assertValidKey(key);
List<Versioned<byte[]>> vals = getInnerStore().get(key, transforms);
if(!storeDef.hasRetentionPeriod())
return vals;
return filterExpiredEntries(key, vals);
}
}
Loading

0 comments on commit cd16456

Please sign in to comment.