Proxy to donor implementation + tests
vinothchandar committed Apr 18, 2013
1 parent 927b02c commit d24d0b1
Showing 13 changed files with 1,568 additions and 1,315 deletions.
4 changes: 2 additions & 2 deletions META-INF/MANIFEST.MF
@@ -1,8 +1,8 @@
Manifest-Version: 1.0
Ant-Version: Apache Ant 1.7.1
Created-By: 20.2-b06 (Sun Microsystems Inc.)
Voldemort-Implementation-Version: 1.3.0
Voldemort-Implementation-Version: 1.3.1
Implementation-Title: Voldemort
Implementation-Version: 1.3.0
Implementation-Version: 1.3.1
Implementation-Vendor: LinkedIn

4 changes: 3 additions & 1 deletion src/java/voldemort/client/protocol/admin/AdminClient.java
@@ -1960,10 +1960,12 @@ public SocketStore getSocketStore(int nodeId, String storeName) {

SocketStore newSocketStore = null;
try {
// request format is always protobuf since IGNORE_CHECKS
// does not work otherwise
newSocketStore = clientPool.create(storeName,
node.getHost(),
node.getSocketPort(),
clientConfig.getRequestFormatType(),
RequestFormatType.PROTOCOL_BUFFERS,
RequestRoutingType.IGNORE_CHECKS);
} catch(Exception e) {
clientPool.close();
@@ -131,13 +131,13 @@ public static RebalancePartitionsInfo create(Map<?, ?> map) {
List<Integer> partitionList = Utils.uncheckedCast(map.get(unbalancedStore
+ "replicaToAddPartitionList"
+ Integer.toString(replicaNo)));
if(partitionList.size() > 0)
if(partitionList != null && partitionList.size() > 0)
replicaToAddPartition.put(replicaNo, partitionList);

List<Integer> deletePartitionList = Utils.uncheckedCast(map.get(unbalancedStore
+ "replicaToDeletePartitionList"
+ Integer.toString(replicaNo)));
if(deletePartitionList.size() > 0)
if(deletePartitionList != null && deletePartitionList.size() > 0)
replicaToDeletePartitionList.put(replicaNo, deletePartitionList);
}

20 changes: 20 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
@@ -223,6 +223,7 @@ public class VoldemortConfig implements Serializable {
private int maxParallelStoresRebalancing;
private boolean rebalancingOptimization;
private boolean usePartitionScanForRebalance;
private boolean proxyPutsDuringRebalance;

public VoldemortConfig(Properties props) {
this(new Props(props));
@@ -462,6 +463,7 @@ public VoldemortConfig(Props props) {
this.rebalancingOptimization = props.getBoolean("rebalancing.optimization", true);
this.usePartitionScanForRebalance = props.getBoolean("use.partition.scan.for.rebalance",
true);
this.proxyPutsDuringRebalance = props.getBoolean("proxy.puts.during.rebalance", false);

this.failureDetectorImplementation = props.getString("failuredetector.implementation",
FailureDetectorConfig.DEFAULT_IMPLEMENTATION_CLASS_NAME);
@@ -2664,6 +2666,24 @@ public boolean usePartitionScanForRebalance() {
return usePartitionScanForRebalance;
}

/**
* If set to true, puts to the new replicas will be relayed back to the
* original donor nodes, so that the data still exists there if the
* rebalance aborts midway for some reason.
*
* <ul>
* <li>Property :"proxy.puts.during.rebalance"</li>
* <li>Default :false</li>
* </ul>
*/
public void setProxyPutsDuringRebalance(boolean proxyPutsDuringRebalance) {
this.proxyPutsDuringRebalance = proxyPutsDuringRebalance;
}

public boolean getProxyPutsDuringRebalance() {
return this.proxyPutsDuringRebalance;
}

/**
* Enables fast, efficient range scans to be used for rebalancing
*
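As a side note, enabling the new flag is just a matter of setting the property the constructor above reads. Below is a minimal, hypothetical sketch (not part of this commit); it assumes node.id and voldemort.home are the only bootstrap properties VoldemortConfig requires, and the values shown are placeholders:

import java.util.Properties;

import voldemort.server.VoldemortConfig;

public class ProxyPutConfigSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap settings assumed to be required by VoldemortConfig;
        // the values are placeholders for illustration only.
        props.setProperty("node.id", "0");
        props.setProperty("voldemort.home", "/tmp/voldemort");
        // Opt in to relaying puts back to the donor during a rebalance
        // (defaults to false, per the javadoc above).
        props.setProperty("proxy.puts.during.rebalance", "true");

        VoldemortConfig config = new VoldemortConfig(props);
        System.out.println(config.getProxyPutsDuringRebalance()); // prints true
    }
}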
8 changes: 7 additions & 1 deletion src/java/voldemort/server/rebalance/Rebalancer.java
@@ -127,6 +127,12 @@ public synchronized void releaseRebalancingPermit(int nodeId) {
* In general we need to do [ cluster change -> swap -> rebalance state
* change ]
*
* NOTE: The update of the cluster metadata and the rebalancer state is not
* "atomic". Hence, there could theoretically be a race where a client picks
* up the new cluster metadata and sends a request based on it, but the
* proxy bridges have not been set up yet, and we either miss a proxy put
* or return null for a get/getAll.
*
* @param cluster Cluster metadata to change
* @param rebalancePartitionsInfo List of rebalance partitions info
* @param swapRO Boolean to indicate swapping of RO store
@@ -144,7 +150,7 @@ public void rebalanceStateChange(Cluster cluster,
boolean rollback) {
Cluster currentCluster = metadataStore.getCluster();

logger.info("Doing rebalance state change with options [ cluster metadata change - "
logger.info("Server doing rebalance state change with options [ cluster metadata change - "
+ changeClusterMetadata + " ], [ changing rebalancing state - "
+ changeRebalanceState + " ], [ changing swapping RO - " + swapRO
+ " ], [ rollback - " + rollback + " ]");
3 changes: 2 additions & 1 deletion src/java/voldemort/server/storage/StorageService.java
@@ -756,7 +756,8 @@ public void registerEngine(StorageEngine<ByteArray, byte[], byte[]> engine,
metadata,
storeRepository,
failureDetector,
storeFactory);
storeFactory,
voldemortConfig.getProxyPutsDuringRebalance());
if(voldemortConfig.isJmxEnabled()) {
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = null;
146 changes: 102 additions & 44 deletions src/java/voldemort/store/rebalancing/RedirectingStore.java
@@ -29,6 +29,8 @@
import voldemort.client.rebalance.RebalancePartitionsInfo;
import voldemort.cluster.Node;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.server.RequestRoutingType;
import voldemort.server.StoreRepository;
import voldemort.store.DelegatingStore;
@@ -66,18 +68,21 @@ public class RedirectingStore extends DelegatingStore<ByteArray, byte[], byte[]>
private final SocketStoreFactory storeFactory;
private FailureDetector failureDetector;
private AtomicBoolean isRedirectingStoreEnabled;
private boolean isProxyPutEnabled;

public RedirectingStore(Store<ByteArray, byte[], byte[]> innerStore,
MetadataStore metadata,
StoreRepository storeRepository,
FailureDetector detector,
SocketStoreFactory storeFactory) {
SocketStoreFactory storeFactory,
boolean isProxyPutEnabled) {
super(innerStore);
this.metadata = metadata;
this.storeRepository = storeRepository;
this.storeFactory = storeFactory;
this.failureDetector = detector;
this.isRedirectingStoreEnabled = new AtomicBoolean(true);
this.isProxyPutEnabled = isProxyPutEnabled;
}

@JmxSetter(name = "setRedirectingStoreEnabled", description = "Enable the redirecting store for this store")
@@ -108,42 +113,43 @@ private RebalancePartitionsInfo redirectingKey(ByteArray key) {
@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
RebalancePartitionsInfo stealInfo = redirectingKey(key);
logger.info("BEGIN GET from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
// TODO this is correct per se, but had a heavy performance hit for
// cross zone moves. Check locally first; if you cannot find the key, then
// go remote and store it locally.
// TODO There is some unnecessary performance hit here. Keys already
// moved over will always result in an OVE, and the time spent waiting on
// this (esp. for cross zone moves) would be a total waste. Need to rework
// this logic to incur the hit only when necessary.
if(stealInfo != null) {
if(logger.isTraceEnabled()) {
logger.trace("Proxying GET on stealer:" + metadata.getNodeId() + " for key "
+ ByteUtils.toHexString(key.get()) + " to donor:"
+ stealInfo.getDonorId());
}
proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);
}
logger.info("END GET from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
return getInnerStore().get(key, transforms);
}

@Override
public List<Version> getVersions(ByteArray key) {
RebalancePartitionsInfo stealInfo = redirectingKey(key);
logger.info("BEGIN GETVERSIONS from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
/**
* If I am rebalancing for this key, try to do remote get(), put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}.
*/
// TODO this is correct per se, but had a heavy performance hit for
// cross zone moves. Check locally first; if you cannot find the key, then
// go remote and store it locally.
// TODO same fixes apply here as in get(..) above
if(stealInfo != null) {
if(logger.isTraceEnabled()) {
logger.trace("Proxying GETVERSIONS on stealer:" + metadata.getNodeId()
+ " for key " + ByteUtils.toHexString(key.get()) + " to donor:"
+ stealInfo.getDonorId());
}
proxyGetAndLocalPut(key, stealInfo.getDonorId(), null);
}
logger.info("END GETVERSIONS from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()));
return getInnerStore().getVersions(key);
}

@@ -158,10 +164,16 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
rebalancePartitionsInfoPerKey.put(key, info);
}
}
// TODO this is correct per se, but had a heavy performance hit for
// cross zone moves. Check locally first; if you cannot find the key, then
// go remote and store it locally.
// TODO Same optimizations apply. Go to the proxy only for keys that this
// node does not have.
if(!rebalancePartitionsInfoPerKey.isEmpty()) {
if(logger.isTraceEnabled()) {
String keyStr = "";
for(ByteArray key: keys)
keyStr += key + " ";
logger.trace("Proxying GETALL on stealer:" + metadata.getNodeId() + " for keys "
+ keyStr);
}
proxyGetAllAndLocalPut(rebalancePartitionsInfoPerKey, transforms);
}

@@ -172,24 +184,31 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {
RebalancePartitionsInfo stealInfo = redirectingKey(key);
logger.info("BEGIN PUT from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()) + " value " + value);
/**
* If I am rebalancing for this key, try to do remote get() , put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
if(stealInfo != null)
// TODO same optimizations apply here. If the key already exists, skip
// this step.
if(stealInfo != null) {
if(logger.isTraceEnabled()) {
logger.trace("Proxying GET (before PUT) on stealer:" + metadata.getNodeId()
+ " for key " + ByteUtils.toHexString(key.get()) + " to donor:"
+ stealInfo.getDonorId());
}
proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);
}

// Just synchronous replication; if the remote put fails, I fail.
if(stealInfo != null)
proxyPut(key, value, transforms, stealInfo.getDonorId());
// TODO if this put fails for some reason, though, the aborting rebalance
// can surface phantom writes
logger.info("END PUT from stealer:" + metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()) + " value " + value);
// Put the data locally; if this step fails, there will be no proxy puts
getInnerStore().put(key, value, transforms);

// TODO make this best-effort async replication. Failures will be
// logged and the server log will be post-processed in case the
// rebalancing fails and we move back to the old topology
if(isProxyPutEnabled && stealInfo != null) {
proxyPut(key, value, transforms, stealInfo.getDonorId());
}
}

/**
@@ -241,8 +260,15 @@ private List<Versioned<byte[]>> proxyGet(ByteArray key, int donorNodeId, byte[]
}

/**
* Replay the put to a remote proxy node so we will have the data available
* at the proxy host, in case the rebalancing fails
* Replay the put to the donor proxy node so we will have the data available
* at the proxy host, in case the rebalancing fails.
*
* NOTE: This logic depends on the assumption that all the replicas for this
* partition in the old topology are now either donors during rebalancing or
* still active replicas. Otherwise, some old replica might not have any
* incoming proxy puts. As a result, if the rebalancing fails, the updates
* during the rebalancing window would not have made it to all the old
* replicas.
*
* @param key
* @param value
@@ -252,9 +278,27 @@ private List<Versioned<byte[]>> proxyGet(ByteArray key, int donorNodeId, byte[]
*/
private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms, int donorNodeId) {
Node donorNode = metadata.getCluster().getNodeById(donorNodeId);

// Check if the donor is still a replica for the key. If we send proxy
// puts there, we could have a situation where the online replicated
// write loses out to the proxy put and hence fails the client operation
// with an OVE
// TODO not sure constructing this object every time is a good idea. But
// the current design lacks the ability to clearly detect when the server
// goes back to NORMAL mode
RoutingStrategy routingStrategy = new RoutingStrategyFactory().updateRoutingStrategy(metadata.getStoreDef(getName()),
metadata.getCluster());
if(routingStrategy.routeRequest(key.get()).contains(donorNode)) {
if(logger.isTraceEnabled()) {
logger.trace("Donor " + donorNode.getId()
+ " still a replica in the updated cluster for the key "
+ ByteUtils.toHexString(key.get()) + ". Skipping proxy put");
}
return;
}

checkNodeAvailable(donorNode);
logger.info("BEGIN PROXY PUT for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));

long startNs = System.nanoTime();
try {
Store<ByteArray, byte[], byte[]> redirectingStore = getRedirectingSocketStore(getName(),
@@ -263,15 +307,26 @@ private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms,
recordSuccess(donorNode, startNs);
} catch(UnreachableStoreException e) {
recordException(donorNode, startNs, e);
throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode, e);
logger.error("Failed to reach proxy node " + donorNode, e);
} catch(ObsoleteVersionException ove) {
logger.error("OVE in proxy put for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()),
ove);
// Proxy puts can get an OVE if somehow there are two stealers for
// the same donor and the other stealer's proxy put already got to
// the donor. This will not result from an online put winning, since
// we don't issue proxy puts if the donor is still a replica
if(logger.isTraceEnabled()) {
logger.trace("OVE in proxy put for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key "
+ ByteUtils.toHexString(key.get()),
ove);
}
} catch(Exception e) {
// Just log the key. Not sure having values in the log is a good
// idea.
logger.error("Unexpected exception in proxy put for key:"
+ ByteUtils.toHexString(key.get()) + " against donor node "
+ donorNodeId,
e);
}
logger.info("END PROXY PUT for donor: " + donorNodeId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
}

private void checkNodeAvailable(Node donorNode) {
@@ -345,18 +400,21 @@ private List<Versioned<byte[]>> proxyGetAndLocalPut(ByteArray key,
int donorId,
byte[] transforms)
throws VoldemortException {
logger.info("BEGIN PROXY GET LOCAL PUT for donor: " + donorId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
List<Versioned<byte[]>> proxyValues = proxyGet(key, donorId, transforms);
for(Versioned<byte[]> proxyValue: proxyValues) {
try {
getInnerStore().put(key, proxyValue, null);
} catch(ObsoleteVersionException e) {
logger.info("OVE in proxy get local put", e);
// TODO this is at TRACE because an OVE is expected here, for keys that
// have already moved over or been proxy-fetched. This will become
// ERROR later, post redesign
if(logger.isTraceEnabled())
logger.trace("OVE in proxy get local put for key "
+ ByteUtils.toHexString(key.get()) + " Stealer:"
+ metadata.getNodeId() + " Donor:" + donorId,
e);
}
}
logger.info("END PROXY GET LOCAL PUT for donor: " + donorId + " from stealer:"
+ metadata.getNodeId() + " key " + ByteUtils.toHexString(key.get()));
return proxyValues;
}

