Skip to content

Commit

Permalink
Merge commit 'db6ac447895255c84ef0e0cbd0303ffa6b45e05b'
Browse files Browse the repository at this point in the history
  • Loading branch information
ctasada committed Apr 17, 2013
2 parents 90c0b5f + db6ac44 commit 266221c
Show file tree
Hide file tree
Showing 12 changed files with 2,153 additions and 283 deletions.
10 changes: 8 additions & 2 deletions bin/run-class.sh
Expand Up @@ -21,8 +21,14 @@ if [ $# -lt 1 ]; then
exit 1
fi

script_path=$(readlink -f "$0")
script_dir=`dirname "$script_path"`
if [ $(uname) == 'Darwin' ]; then
pushd `dirname $0` > /dev/null
script_dir=$(pwd)
popd > /dev/null
else
script_path=$(readlink -f "$0")
script_dir=`dirname "$script_path"`
fi

base_dir=`dirname "$script_dir"`

Expand Down
11 changes: 7 additions & 4 deletions src/java/voldemort/client/protocol/admin/StreamingClient.java
Expand Up @@ -105,8 +105,8 @@ public class StreamingClient {

protected EventThrottler throttler;

AdminClient adminClient;
AdminClientConfig adminClientConfig;
private AdminClient adminClient;
private AdminClientConfig adminClientConfig;

String bootstrapURL;

Expand Down Expand Up @@ -135,7 +135,12 @@ public StreamingClient(StreamingClientConfig config) {
this.bootstrapURL = config.getBootstrapURL();
CHECKPOINT_COMMIT_SIZE = config.getBatchSize();
THROTTLE_QPS = config.getThrottleQPS();
adminClientConfig = new AdminClientConfig();
adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig());
}

public AdminClient getAdminClient() {
return adminClient;
}

public synchronized void updateThrottleLimit(int throttleQPS) {
Expand Down Expand Up @@ -297,8 +302,6 @@ public synchronized void initStreamingSessions(List<String> stores,
List<Integer> blackListedNodes) {

logger.info("Initializing a streaming session");
adminClientConfig = new AdminClientConfig();
adminClient = new AdminClient(bootstrapURL, adminClientConfig, new ClientConfig());
this.checkpointCallback = checkpointCallback;
this.recoveryCallback = recoveryCallback;
this.allowMerge = allowMerge;
Expand Down
10 changes: 10 additions & 0 deletions src/java/voldemort/store/StoreUtils.java
Expand Up @@ -215,4 +215,14 @@ public static List<String> getStoreNames(List<StoreDefinition> list, boolean ign
storeNameSet.add(def.getName());
return storeNameSet;
}

public static HashMap<String, StoreDefinition> getStoreDefsAsMap(List<StoreDefinition> storeDefs) {
if(storeDefs == null)
return null;

HashMap<String, StoreDefinition> storeDefMap = new HashMap<String, StoreDefinition>();
for(StoreDefinition def: storeDefs)
storeDefMap.put(def.getName(), def);
return storeDefMap;
}
}
10 changes: 5 additions & 5 deletions src/java/voldemort/store/routed/PipelineRoutedStore.java
Expand Up @@ -29,11 +29,11 @@
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.common.VoldemortOpCode;
import voldemort.routing.RoutingStrategyType;
import voldemort.store.CompositeVoldemortRequest;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.StoreRequest;
import voldemort.store.StoreUtils;
import voldemort.store.CompositeVoldemortRequest;
import voldemort.store.nonblockingstore.NonblockingStore;
import voldemort.store.routed.Pipeline.Event;
import voldemort.store.routed.Pipeline.Operation;
Expand Down Expand Up @@ -75,10 +75,10 @@
*/
public class PipelineRoutedStore extends RoutedStore {

private final Map<Integer, NonblockingStore> nonblockingStores;
private final Map<Integer, Store<ByteArray, Slop, byte[]>> slopStores;
private final Map<Integer, NonblockingStore> nonblockingSlopStores;
private final HintedHandoffStrategy handoffStrategy;
protected final Map<Integer, NonblockingStore> nonblockingStores;
protected final Map<Integer, Store<ByteArray, Slop, byte[]>> slopStores;
protected final Map<Integer, NonblockingStore> nonblockingSlopStores;
protected final HintedHandoffStrategy handoffStrategy;
private Zone clientZone;
private boolean zoneRoutingEnabled;
private PipelineRoutedStats stats;
Expand Down

0 comments on commit 266221c

Please sign in to comment.