Navigation Menu

Skip to content

Commit

Permalink
Eliminate unnecessary proxy fetches
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed May 8, 2013
1 parent 82f97c4 commit 63beb58
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 55 deletions.
165 changes: 127 additions & 38 deletions src/java/voldemort/store/rebalancing/RedirectingStore.java
Expand Up @@ -107,19 +107,27 @@ public boolean getIsRedirectingStoreEnabled() {
return this.isRedirectingStoreEnabled.get();
}

@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
Integer redirectNode = getProxyNode(key.get());
public List<Versioned<byte[]>> redirectingGet(ByteArray key, byte[] transforms)
throws VoldemortException {
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
// FIXME AR There is some unneccessary performance hit here. keys
// already moved over will always result in OVE and time spent waiting
// on this (esp for cross zone moves) would be a total waste. Need to
// rework to this logic to incur this when necessary
Integer redirectNode = getProxyNode(key.get());
if(redirectNode != null) {
// First, attempt a local get
List<Versioned<byte[]>> vals = getInnerStore().get(key, transforms);
// If found, return
if(!vals.isEmpty()) {
// FIXME AR there is a subtle race here. This can return some
// versions of the key without all of it being transferred over
// by the background fetch. This needs to be fixed by adding
// support for bulk atomic writes of multiple versions of the
// same key into storage
return vals;
}

if(logger.isTraceEnabled()) {
logger.trace("Proxying GET on stealer:" + metadata.getNodeId() + " for key "
+ ByteUtils.toHexString(key.get()) + " to node:" + redirectNode);
Expand All @@ -129,16 +137,22 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
return getInnerStore().get(key, transforms);
}

@Override
public List<Version> getVersions(ByteArray key) {
Integer redirectNode = getProxyNode(key.get());
public List<Version> redirectingGetVersions(ByteArray key) {
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}.
*/
// TODO same fixes apply here as in get(..) above
Integer redirectNode = getProxyNode(key.get());
if(redirectNode != null) {
// First, attempt a local getVersions()
List<Version> versions = getInnerStore().getVersions(key);
// If found some versions, return
if(!versions.isEmpty()) {
// FIXME AR same subtle race here
return versions;
}

if(logger.isTraceEnabled()) {
logger.trace("Proxying GETVERSIONS on stealer:" + metadata.getNodeId()
+ " for key " + ByteUtils.toHexString(key.get()) + " to node:"
Expand All @@ -149,19 +163,38 @@ public List<Version> getVersions(ByteArray key) {
return getInnerStore().getVersions(key);
}

@Override
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
public Map<ByteArray, List<Versioned<byte[]>>> redirectingGetAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {

// first determine how many keys are already present locally.
Map<ByteArray, List<Versioned<byte[]>>> localVals = getInnerStore().getAll(keys, transforms);
Map<ByteArray, Integer> keyToProxyNodeMap = Maps.newHashMapWithExpectedSize(Iterables.size(keys));
for(ByteArray key: keys) {
// Relies on inner getAll() to not return an entry for the key in
// the result hashmap, in case the key does not exist on storage
if(localVals.containsKey(key)) {
// if you have it locally, move to next key
continue;
}
Integer redirectNode = getProxyNode(key.get());
/*
* Else check if we are rebalancing for the key.. Intuitively, if we
* don't have the key, then we must be rebalancing for that key,
* right? Otherwise the key should have been here? Wrong, what if
* this is a non-existent key. We can't really confirm key does not
* exist, without going to the proxy node..
*/
if(redirectNode != null) {
/*
* If we are indeed rebalancing for the key, then a proxy fetch
* will make things certain.
*/
keyToProxyNodeMap.put(key, redirectNode);
}
}
// FIXME AR Same optimizations. Go to the proxy only for keys that this
// node does not have

// If all keys were present locally, return. If not, do proxy fetch
if(!keyToProxyNodeMap.isEmpty()) {
if(logger.isTraceEnabled()) {
String keyStr = "";
Expand All @@ -170,43 +203,58 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
logger.trace("Proxying GETALL on stealer:" + metadata.getNodeId() + " for keys "
+ keyStr);
}
// Issue proxy fetches for non-rebalancing keys that did not exist
// locally
proxyGetAllAndLocalPut(keyToProxyNodeMap, transforms);
// Now, issue a getAll for those keys alone
Map<ByteArray, List<Versioned<byte[]>>> proxyFetchedVals = getInnerStore().getAll(keyToProxyNodeMap.keySet(),
transforms);
// Merge the results
for(Map.Entry<ByteArray, List<Versioned<byte[]>>> entry: proxyFetchedVals.entrySet()) {
localVals.put(entry.getKey(), entry.getValue());
}
}

return getInnerStore().getAll(keys, transforms);
return localVals;
}

@Override
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
public void redirectingPut(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {

Cluster currentCluster = metadata.getCluster();
// FIXME AR O(n) linear lookup of storedef
// TODO O(n) linear lookup of storedef here. But not critical since this
// has been eliminated from the "fast path". This hit is only during
// rebalancing
StoreDefinition storeDef = metadata.getStoreDef(getName());

// defensively, error out if this is a read-only store and someone is
// doing puts against it. We don't to do extra work and fill the log
// with errors in that case.
/*
* defensively, error out if this is a read-only store and someone is
* doing puts against it. We don't to do extra work and fill the log
* with errors in that case.
*/
if(storeDef.getType().compareTo(ReadOnlyStorageConfiguration.TYPE_NAME) == 0) {
throw new UnsupportedOperationException("put() not supported on read-only store");
}
StoreRoutingPlan currentRoutingPlan = new StoreRoutingPlan(currentCluster, storeDef);
Integer redirectNode = getProxyNode(currentRoutingPlan, storeDef, key.get());

/**
* If I am rebalancing for this key, try to do remote get() , put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
// FIXME AR same optimizations apply here.. If the key already exists
// skip this
if(redirectNode != null) {
if(logger.isTraceEnabled()) {
logger.trace("Proxying GET (before PUT) on stealer:" + metadata.getNodeId()
+ " for key " + ByteUtils.toHexString(key.get()) + " to node:"
+ redirectNode);
/*
* first check if the key exists locally. If so, it means, it has
* been moved over (either by a proxy fetch or background fetch) and
* we are good simply issuing the put on top of that.
*/
List<Versioned<byte[]>> vals = getInnerStore().get(key, transforms);
if(vals.isEmpty()) {
// if not, then go proxy fetch it
if(logger.isTraceEnabled()) {
logger.trace("Proxying GET (before PUT) on stealer:" + metadata.getNodeId()
+ " for key " + ByteUtils.toHexString(key.get()) + " to node:"
+ redirectNode);
}
proxyGetAndLocalPut(key, redirectNode, transforms);
}
proxyGetAndLocalPut(key, redirectNode, transforms);
}

// put the data locally, if this step fails, there will be no proxy puts
Expand All @@ -230,6 +278,45 @@ public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
}
}

@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
if(isServerRebalancing()) {
return redirectingGet(key, transforms);
} else {
return getInnerStore().get(key, transforms);
}
}

@Override
public List<Version> getVersions(ByteArray key) {
if(isServerRebalancing()) {
return redirectingGetVersions(key);
} else {
return getInnerStore().getVersions(key);
}
}

@Override
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
if(isServerRebalancing()) {
return redirectingGetAll(keys, transforms);
} else {
return getInnerStore().getAll(keys, transforms);
}
}

@Override
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {
if(isServerRebalancing()) {
redirectingPut(key, value, transforms);
} else {
getInnerStore().put(key, value, transforms);
}
}

/**
* TODO : Handle delete correctly.
* <p>
Expand All @@ -253,6 +340,10 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
return getInnerStore().delete(key, version);
}

public boolean isServerRebalancing() {
return VoldemortState.REBALANCING_MASTER_SERVER.equals(metadata.getServerState());
}

/**
* Checks if the server has to do any proxying of gets/puts to another
* server, as a part of an ongoing rebalance operation.
Expand Down Expand Up @@ -281,9 +372,8 @@ public boolean delete(ByteArray key, Version version) throws VoldemortException
private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan,
StoreDefinition storeDef,
byte[] key) {
// get out if not rebalancing or if redirecting is disabled.
if(!VoldemortState.REBALANCING_MASTER_SERVER.equals(metadata.getServerState())
|| !isRedirectingStoreEnabled.get()) {
// get out if redirecting is disabled.
if(!isRedirectingStoreEnabled.get()) {
return null;
}

Expand Down Expand Up @@ -336,7 +426,6 @@ private Integer getProxyNode(StoreRoutingPlan currentRoutingPlan,
*/
private Integer getProxyNode(byte[] key) {
Cluster currentCluster = metadata.getCluster();
// FIXME AR O(n) linear lookup of storedef
StoreDefinition storeDef = metadata.getStoreDef(getName());
StoreRoutingPlan currentRoutingPlan = new StoreRoutingPlan(currentCluster, storeDef);
return getProxyNode(currentRoutingPlan, storeDef, key);
Expand Down

0 comments on commit 63beb58

Please sign in to comment.