Skip to content

Commit

Permalink
Adding basic support for abortable rebalances, more to follow
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Apr 18, 2013
1 parent db6ac44 commit fb8f1a8
Show file tree
Hide file tree
Showing 21 changed files with 1,941 additions and 1,340 deletions.
14 changes: 6 additions & 8 deletions contrib/ec2-testing/test/voldemort/utils/Ec2RebalanceTest.java
Expand Up @@ -44,7 +44,7 @@
import voldemort.client.protocol.RequestFormatType;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.client.rebalance.AbstractRebalanceTest;
import voldemort.client.rebalance.AbstractNonZonedRebalanceTest;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.server.RequestRoutingType;
Expand All @@ -55,7 +55,7 @@

/**
*/
public class Ec2RebalanceTest extends AbstractRebalanceTest {
public class Ec2RebalanceTest extends AbstractNonZonedRebalanceTest {

private static int NUM_KEYS;

Expand All @@ -66,7 +66,10 @@ public class Ec2RebalanceTest extends AbstractRebalanceTest {

private Map<Integer, String> nodeIdsInv = new HashMap<Integer, String>();
private List<String> activeHostNames = new ArrayList<String>();
private boolean useDonorBased = true;

public Ec2RebalanceTest() {
super(true, true);
}

@BeforeClass
public static void ec2Setup() throws Exception {
Expand Down Expand Up @@ -209,11 +212,6 @@ protected void stopServer(List<Integer> nodesToStop) throws Exception {
stopCluster(hostsToStop, ec2RebalanceTestConfig);
}

@Override
protected boolean useDonorBased() {
return this.useDonorBased;
}

private static class Ec2RebalanceTestConfig extends Ec2RemoteTestConfig {

private String configDirName;
Expand Down
5 changes: 2 additions & 3 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -73,7 +73,6 @@
import voldemort.store.StoreUtils;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.metadata.MetadataStore.VoldemortState;
import voldemort.store.mysql.MysqlStorageConfiguration;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.store.readonly.ReadOnlyStorageFormat;
import voldemort.store.readonly.ReadOnlyStorageMetadata;
Expand Down Expand Up @@ -2118,7 +2117,8 @@ public void updateEntries(int nodeId,
}

/**
* Fetch key/value tuples belonging to a node with given key values
* Fetch key/value tuples from a given server, directly from storage
* engine
*
* <p>
* Entries are being queried synchronously
Expand All @@ -2139,7 +2139,6 @@ public Iterator<QueryKeyResult> queryKeys(int nodeId,

try {
store = adminStoreClient.getSocketStore(nodeId, storeName);

} catch(Exception e) {
throw new VoldemortException(e);
}
Expand Down
52 changes: 37 additions & 15 deletions src/java/voldemort/server/protocol/admin/AsyncOperationService.java
@@ -1,12 +1,12 @@
/*
* Copyright 2008-2010 LinkedIn, Inc
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -16,10 +16,13 @@

package voldemort.server.protocol.admin;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableSet;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
Expand All @@ -29,11 +32,14 @@
import voldemort.common.service.SchedulerService;
import voldemort.common.service.ServiceType;

import com.google.common.collect.ImmutableSet;

/**
* Asynchronous job scheduler for admin service operations.
*
* TODO: requesting a unique id, then creating an operation with that id seems like a bad API design.
*
*
* TODO: requesting a unique id, then creating an operation with that id seems
* like a bad API design.
*
*/
@JmxManaged(description = "Asynchronous operation execution")
public class AsyncOperationService extends AbstractService {
Expand Down Expand Up @@ -96,6 +102,19 @@ public String getStatus(int id) {
}
}

public List<Integer> getMatchingAsyncOperationList(String jobDescPattern, boolean showCompleted) {
List<Integer> operationIds = getAsyncOperationList(showCompleted);
List<Integer> matchingOperationIds = new ArrayList<Integer>(operationIds.size());
for(Integer operationId: operationIds) {
AsyncOperation operation = operations.get(operationId);
String operationDescription = operation.getStatus().getDescription();
if(operationDescription != null && operationDescription.indexOf(jobDescPattern) != -1) {
matchingOperationIds.add(operationId);
}
}
return matchingOperationIds;
}

@JmxOperation(description = "Retrieve all operations")
public String getAllAsyncOperations() {
String result;
Expand All @@ -108,23 +127,25 @@ public String getAllAsyncOperations() {
}

/**
* Get list of asynchronous operations on this node. By default, only the pending
* operations are returned.
* Get list of asynchronous operations on this node. By default, only the
* pending operations are returned.
*
* @param showCompleted Show completed operations
* @return A list of operation ids.
*/
public List<Integer> getAsyncOperationList(boolean showCompleted) {
/**
* Create a copy using an immutable set to avoid a {@link java.util.ConcurrentModificationException}
* Create a copy using an immutable set to avoid a
* {@link java.util.ConcurrentModificationException}
*/
Set<Integer> keySet = ImmutableSet.copyOf(operations.keySet());

if (showCompleted)
if(showCompleted)
return new ArrayList<Integer>(keySet);

List<Integer> keyList = new ArrayList<Integer>();
for (int key: keySet) {
if (!operations.get(key).getStatus().isComplete())
for(int key: keySet) {
if(!operations.get(key).getStatus().isComplete())
keyList.add(key);
}
return keyList;
Expand All @@ -142,7 +163,7 @@ public AsyncOperationStatus getOperationStatus(int requestId) {
public String stopAsyncOperation(int requestId) {
try {
stopOperation(requestId);
} catch (VoldemortException e) {
} catch(VoldemortException e) {
return e.getMessage();
}

Expand All @@ -158,6 +179,7 @@ public void stopOperation(int requestId) {

/**
* Generate a unique request id
*
* @return A new, guaranteed unique, request id
*/
public int getUniqueRequestId() {
Expand Down
6 changes: 3 additions & 3 deletions src/java/voldemort/server/storage/RepairJob.java
Expand Up @@ -97,8 +97,7 @@ public void run() {
long repairSlops = 0L;
long numDeletedKeys = 0;
while(iterator.hasNext()) {
Pair<ByteArray, Versioned<byte[]>> keyAndVal;
keyAndVal = iterator.next();
Pair<ByteArray, Versioned<byte[]>> keyAndVal = iterator.next();
List<Node> nodes = routingStrategy.routeRequest(keyAndVal.getFirst().get());

if(!hasDestination(nodes)) {
Expand All @@ -111,7 +110,8 @@ public void run() {
}
closeIterator(iterator);
localStats.put(storeDef.getName(), repairSlops);
logger.info("Completed store " + storeDef.getName());
logger.info("Completed store " + storeDef.getName() + " #Scanned:"
+ progress.get() + " #Deleted:" + numDeletedKeys);
}
}
} catch(Exception e) {
Expand Down
72 changes: 55 additions & 17 deletions src/java/voldemort/store/rebalancing/RedirectingStore.java
Expand Up @@ -91,22 +91,6 @@ public boolean getIsRedirectingStoreEnabled() {
return this.isRedirectingStoreEnabled.get();
}

@Override
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {
RebalancePartitionsInfo stealInfo = redirectingKey(key);

/**
* If I am rebalancing for this key, try to do remote get() , put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
if(stealInfo != null)
proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);

getInnerStore().put(key, value, transforms);
}

private RebalancePartitionsInfo redirectingKey(ByteArray key) {
if(VoldemortState.REBALANCING_MASTER_SERVER.equals(metadata.getServerState())
&& isRedirectingStoreEnabled.get()) {
Expand All @@ -129,6 +113,9 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
// TODO this is correct per se. But, had a heavy performance hit for
// cross zone moves. Check locally first, if you cannot find, then go
// remote and store it locally.
if(stealInfo != null) {
proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);
}
Expand All @@ -145,6 +132,9 @@ public List<Version> getVersions(ByteArray key) {
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}.
*/
// TODO this is correct per se. But, had a heavy performance hit for
// cross zone moves. Check locally first, if you cannot find, then go
// remote and store it locally.
if(stealInfo != null) {
proxyGetAndLocalPut(key, stealInfo.getDonorId(), null);
}
Expand All @@ -163,14 +153,37 @@ public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
rebalancePartitionsInfoPerKey.put(key, info);
}
}

// TODO this is correct per se. But, had a heavy performance hit for
// cross zone moves. Check locally first, if you cannot find, then go
// remote and store it locally.
if(!rebalancePartitionsInfoPerKey.isEmpty()) {
proxyGetAllAndLocalPut(rebalancePartitionsInfoPerKey, transforms);
}

return getInnerStore().getAll(keys, transforms);
}

@Override
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {
RebalancePartitionsInfo stealInfo = redirectingKey(key);

/**
* If I am rebalancing for this key, try to do remote get() , put it
* locally first to get the correct version ignoring any
* {@link ObsoleteVersionException}
*/
if(stealInfo != null)
proxyGetAndLocalPut(key, stealInfo.getDonorId(), transforms);

// Just sychronous replication,if remote fails, I fail.
if(stealInfo != null)
proxyPut(key, value, transforms, stealInfo.getDonorId());
// TODO if I fail though for some reason, the aborting rebalance can
// surface phantom writes
getInnerStore().put(key, value, transforms);
}

/**
* TODO : Handle delete correctly.
* <p>
Expand Down Expand Up @@ -219,6 +232,31 @@ private List<Versioned<byte[]>> proxyGet(ByteArray key, int donorNodeId, byte[]
}
}

/**
* Replay the put to a remote proxy node so we will have the data available
* at the proxy host, in case the rebalancing fails
*
* @param key
* @param value
* @param transforms
* @param donorNodeId
* @throws ProxyUnreachableException if donor node can't be reached
*/
private void proxyPut(ByteArray key, Versioned<byte[]> value, byte[] transforms, int donorNodeId) {
Node donorNode = metadata.getCluster().getNodeById(donorNodeId);
checkNodeAvailable(donorNode);
long startNs = System.nanoTime();
try {
Store<ByteArray, byte[], byte[]> redirectingStore = getRedirectingSocketStore(getName(),
donorNodeId);
redirectingStore.put(key, value, transforms);
recordSuccess(donorNode, startNs);
} catch(UnreachableStoreException e) {
recordException(donorNode, startNs, e);
throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode, e);
}
}

private void checkNodeAvailable(Node donorNode) {
if(!failureDetector.isAvailable(donorNode))
throw new ProxyUnreachableException("Failed to reach proxy node " + donorNode
Expand Down
Expand Up @@ -131,6 +131,17 @@ public ClientRequestExecutorFactory getFactory() {
return factory;
}

/***
* Create a new socket store to talk to a given server for a specific store
*
* Note: IGNORE_CHECKS will only be honored for Protobuf request format
*
* @param storeName
* @param hostName
* @param port
* @param requestFormatType protocol to use
* @param requestRoutingType routed/ignore checks/normal
*/
@Override
public SocketStore create(String storeName,
String hostName,
Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/utils/ByteArray.java
Expand Up @@ -41,7 +41,7 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return Arrays.toString(underlying);
return ByteUtils.toHexString(underlying);
}

/**
Expand Down

0 comments on commit fb8f1a8

Please sign in to comment.