Skip to content

Commit

Permalink
more refactorings on RoutedStore
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongjiewu committed Jul 31, 2013
1 parent b603a60 commit 1520f50
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 103 deletions.
5 changes: 3 additions & 2 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -125,7 +125,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
this.routedStoreConfig = new RoutedStoreConfig(config);
this.routedStoreConfig.setJmxId(this.jmxId);

this.routedStoreFactory = new RoutedStoreFactory(this.routedStoreConfig);
this.routedStoreFactory = new RoutedStoreFactory();
this.routedStoreFactory.setThreadPool(this.threadPool);

this.clientSequencer = new AtomicInteger(0);
Expand Down Expand Up @@ -292,7 +292,8 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
nonblockingStores,
slopStores,
nonblockingSlopStores,
failureDetectorRef);
failureDetectorRef,
this.routedStoreConfig);

store = new LoggingStore(store);

Expand Down
32 changes: 16 additions & 16 deletions src/java/voldemort/client/TimeoutConfig.java
Expand Up @@ -24,36 +24,36 @@
*/
public class TimeoutConfig {

public static int DEFAULT_GLOBAL_TIMEOUT = 5000;
public static int DEFAULT_GLOBAL_TIMEOUT_MS = 5000;
public static boolean DEFAULT_ALLOW_PARTIAL_GETALLS = false;
private OpTimeMap timeoutMap;

private boolean partialGetAllAllowed;

public TimeoutConfig() {
this(DEFAULT_GLOBAL_TIMEOUT, DEFAULT_ALLOW_PARTIAL_GETALLS);
this(DEFAULT_GLOBAL_TIMEOUT_MS, DEFAULT_ALLOW_PARTIAL_GETALLS);
}

public TimeoutConfig(long globalTimeout) {
this(globalTimeout, DEFAULT_ALLOW_PARTIAL_GETALLS);
public TimeoutConfig(long globalTimeoutMs) {
this(globalTimeoutMs, DEFAULT_ALLOW_PARTIAL_GETALLS);
}

public TimeoutConfig(long globalTimeout, boolean allowPartialGetAlls) {
timeoutMap = new OpTimeMap(globalTimeout);
public TimeoutConfig(long globalTimeoutMs, boolean allowPartialGetAlls) {
timeoutMap = new OpTimeMap(globalTimeoutMs);
setPartialGetAllAllowed(allowPartialGetAlls);
}

public TimeoutConfig(long getTimeout,
long putTimeout,
long deleteTimeout,
long getAllTimeout,
long getVersionsTimeout,
public TimeoutConfig(long getTimeoutMs,
long putTimeoutMs,
long deleteTimeoutMs,
long getAllTimeoutMs,
long getVersionsTimeoutMs,
boolean allowPartialGetAlls) {
timeoutMap = new OpTimeMap(getTimeout,
putTimeout,
deleteTimeout,
getAllTimeout,
getVersionsTimeout);
timeoutMap = new OpTimeMap(getTimeoutMs,
putTimeoutMs,
deleteTimeoutMs,
getAllTimeoutMs,
getVersionsTimeoutMs);
setPartialGetAllAllowed(allowPartialGetAlls);
}

Expand Down
7 changes: 5 additions & 2 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -140,6 +140,7 @@ public class StorageService extends AbstractService {
private final FailureDetector failureDetector;
private final StoreStats storeStats;
private final RoutedStoreFactory routedStoreFactory;
private final RoutedStoreConfig routedStoreConfig;
private final ExecutorService proxyPutWorkerPool;
private final ProxyPutStats aggregatedProxyPutStats;

Expand Down Expand Up @@ -170,8 +171,9 @@ public StorageService(StoreRepository storeRepository,
config));
this.failureDetector = create(failureDetectorConfig, config.isJmxEnabled());
this.storeStats = new StoreStats();
this.routedStoreFactory = new RoutedStoreFactory(new RoutedStoreConfig(voldemortConfig));
this.routedStoreFactory = new RoutedStoreFactory();
this.routedStoreFactory.setThreadPool(this.clientThreadPool);
this.routedStoreConfig = new RoutedStoreConfig(voldemortConfig);

/*
* Initialize the dynamic throttle limit based on the per node limit
Expand Down Expand Up @@ -875,7 +877,8 @@ public void registerNodeStores(StoreDefinition def, Cluster cluster, int localNo
nonblockingStores,
null,
null,
failureDetector);
failureDetector,
routedStoreConfig);

store = new RebootstrappingStore(metadata,
storeRepository,
Expand Down
17 changes: 11 additions & 6 deletions src/java/voldemort/store/routed/PipelineRoutedStore.java
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;

import voldemort.VoldemortException;
import voldemort.client.TimeoutConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Zone;
import voldemort.cluster.failuredetector.FailureDetector;
Expand Down Expand Up @@ -111,17 +112,21 @@ public PipelineRoutedStore(Map<Integer, Store<ByteArray, byte[], byte[]>> innerS
Cluster cluster,
StoreDefinition storeDef,
FailureDetector failureDetector,
RoutedStoreConfig storeConfig) {
boolean repairReads,
TimeoutConfig timeoutConfig,
int clientZoneId,
boolean isJmxEnabled,
int jmxId) {
super(storeDef.getName(),
innerStores,
cluster,
storeDef,
storeConfig.getRepairReads(),
storeConfig.getTimeoutConfig(),
repairReads,
timeoutConfig,
failureDetector,
SystemTime.INSTANCE);
this.nonblockingSlopStores = nonblockingSlopStores;
this.clientZone = cluster.getZoneById(storeConfig.getClientZoneId());
this.clientZone = cluster.getZoneById(clientZoneId);
this.nonblockingStores = new ConcurrentHashMap<Integer, NonblockingStore>(nonblockingStores);
this.slopStores = slopStores;
if(storeDef.getRoutingStrategyType().compareTo(RoutingStrategyType.ZONE_STRATEGY) == 0) {
Expand All @@ -137,8 +142,8 @@ public PipelineRoutedStore(Map<Integer, Store<ByteArray, byte[], byte[]>> innerS
this.handoffStrategy = null;
}

this.jmxEnabled = storeConfig.isJmxEnabled();
this.jmxId = storeConfig.getJmxId();
this.jmxEnabled = isJmxEnabled;
this.jmxId = jmxId;
if(this.jmxEnabled) {
stats = new PipelineRoutedStats();
JmxUtils.registerMbean(stats,
Expand Down
15 changes: 15 additions & 0 deletions src/java/voldemort/store/routed/RoutedStoreConfig.java
@@ -1,3 +1,18 @@
/*
* Copyright 2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package voldemort.store.routed;

import voldemort.client.ClientConfig;
Expand Down
30 changes: 17 additions & 13 deletions src/java/voldemort/store/routed/RoutedStoreFactory.java
Expand Up @@ -7,7 +7,6 @@
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.client.TimeoutConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.Store;
Expand All @@ -23,20 +22,19 @@ public class RoutedStoreFactory {

private ExecutorService threadPool;

private final RoutedStoreConfig routedStoreConfig;

private final Logger logger = Logger.getLogger(getClass());

public RoutedStoreFactory(RoutedStoreConfig routedStoreConfig) {
this.routedStoreConfig = routedStoreConfig;
public RoutedStoreFactory() {
this.threadPool = null;
}

// TODO using threadPool to simulate nonblocking store isn't a good way and
// should be deprecated
// There are a few tests stilling using this path and should be cleaned up
// They can be identified by examining the call Hierarchy of this method
@Deprecated
public RoutedStoreFactory(ExecutorService threadPool, TimeoutConfig timeoutConfig) {
public RoutedStoreFactory(ExecutorService threadPool) {
this.threadPool = threadPool;
this.routedStoreConfig = new RoutedStoreConfig();
routedStoreConfig.setTimeoutConfig(timeoutConfig);
}

public NonblockingStore toNonblockingStore(Store<ByteArray, byte[], byte[]> store) {
Expand All @@ -60,7 +58,8 @@ public void setThreadPool(ExecutorService threadPool) {
public RoutedStore create(Cluster cluster,
StoreDefinition storeDefinition,
Map<Integer, Store<ByteArray, byte[], byte[]>> nodeStores,
FailureDetector failureDetector) {
FailureDetector failureDetector,
RoutedStoreConfig routedStoreConfig) {
Map<Integer, NonblockingStore> nonblockingStores = Maps.newHashMap();

for(Map.Entry<Integer, Store<ByteArray, byte[], byte[]>> entry: nodeStores.entrySet())
Expand All @@ -72,7 +71,8 @@ public RoutedStore create(Cluster cluster,
nonblockingStores,
null,
null,
failureDetector);
failureDetector,
routedStoreConfig);
}

public RoutedStore create(Cluster cluster,
Expand All @@ -81,15 +81,19 @@ public RoutedStore create(Cluster cluster,
Map<Integer, NonblockingStore> nonblockingStores,
Map<Integer, Store<ByteArray, Slop, byte[]>> slopStores,
Map<Integer, NonblockingStore> nonblockingSlopStores,
FailureDetector failureDetector) {

FailureDetector failureDetector,
RoutedStoreConfig routedStoreConfig) {
return new PipelineRoutedStore(nodeStores,
nonblockingStores,
slopStores,
nonblockingSlopStores,
cluster,
storeDefinition,
failureDetector,
routedStoreConfig);
routedStoreConfig.getRepairReads(),
routedStoreConfig.getTimeoutConfig(),
routedStoreConfig.getClientZoneId(),
routedStoreConfig.isJmxEnabled(),
routedStoreConfig.getJmxId());
}
}
Expand Up @@ -45,6 +45,7 @@
import voldemort.store.StoreDefinition;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.store.routed.RoutedStore;
import voldemort.store.routed.RoutedStoreConfig;
import voldemort.store.routed.RoutedStoreFactory;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
Expand Down Expand Up @@ -185,13 +186,14 @@ public static void main(String[] args) throws Throwable {
FailureDetector failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false);

ExecutorService routedStoreThreadPool = Executors.newFixedThreadPool(clientConfig.getMaxThreads());
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(routedStoreThreadPool,
clientConfig.getTimeoutConfig());
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(routedStoreThreadPool);
RoutedStoreConfig routedStoreConfig = new RoutedStoreConfig(clientConfig);

final RoutedStore routedStore = routedStoreFactory.create(cluster,
storeDefinition,
stores,
failureDetector);
failureDetector,
routedStoreConfig);

ExecutorService runner = Executors.newFixedThreadPool(numClients);
long start = System.nanoTime();
Expand Down
10 changes: 6 additions & 4 deletions test/unit/voldemort/store/routed/GetallNodeReachTest.java
Expand Up @@ -48,11 +48,13 @@ private void makeStore() {
Store<ByteArray, byte[], byte[]> subStore = new InMemoryStorageEngine<ByteArray, byte[], byte[]>("test");
subStores.put(n.getId(), subStore);
}
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(Executors.newFixedThreadPool(2),
new TimeoutConfig(1000L,
false));
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(Executors.newFixedThreadPool(2));

store = routedStoreFactory.create(cluster, storeDef, subStores, new NoopFailureDetector());
store = routedStoreFactory.create(cluster,
storeDef,
subStores,
new NoopFailureDetector(),
new RoutedStoreConfig().setTimeoutConfig(new TimeoutConfig(1000L)));
}

@Test
Expand Down
10 changes: 7 additions & 3 deletions test/unit/voldemort/store/routed/HintedHandoffFailureTest.java
Expand Up @@ -40,6 +40,7 @@
import voldemort.client.TimeoutConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.cluster.Zone;
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.FailureDetectorUtils;
Expand Down Expand Up @@ -240,8 +241,7 @@ public List<Integer> customSetup(ByteArray key, FAILURE_MODE failureMode) throws
setFailureDetector(subStores);

routedStoreThreadPool = Executors.newFixedThreadPool(NUM_THREADS);
routedStoreFactory = new RoutedStoreFactory(routedStoreThreadPool,
new TimeoutConfig(routingTimeoutInMs, false));
routedStoreFactory = new RoutedStoreFactory(routedStoreThreadPool);

Map<Integer, NonblockingStore> nonblockingSlopStores = Maps.newHashMap();
for(Node node: cluster.getNodes()) {
Expand Down Expand Up @@ -704,7 +704,11 @@ public DelayedPutPipelineRoutedStore(Map<Integer, Store<ByteArray, byte[], byte[
cluster,
storeDef,
failureDetector,
new RoutedStoreConfig().setTimeoutConfig(new TimeoutConfig(routingTimeoutInMs)));
true,
new TimeoutConfig(routingTimeoutInMs),
Zone.DEFAULT_ZONE_ID,
false,
0);

}

Expand Down
Expand Up @@ -278,14 +278,16 @@ public void setUp() throws Exception {
failureDetector = FailureDetectorUtils.create(failureDetectorConfig, false);

// make routedStore
RoutedStoreFactory factory = new RoutedStoreFactory(null, new TimeoutConfig(1500L, false));
RoutedStoreFactory factory = new RoutedStoreFactory();
routedStore = factory.create(cluster,
storeDef,
testStores,
socketTestStores,
slopStores,
socketSlopStores,
failureDetector);
failureDetector,
new RoutedStoreConfig().setTimeoutConfig(new TimeoutConfig(1500L,
false)));

// generate the keys
for(int i = 0; i < 5; i++) {
Expand Down
13 changes: 8 additions & 5 deletions test/unit/voldemort/store/routed/ReadRepairerTest.java
Expand Up @@ -152,11 +152,14 @@ public void testMissingKeysAreAddedToNodeWhenDoingReadRepair() throws Exception

routedStoreThreadPool = Executors.newFixedThreadPool(1);

RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(routedStoreThreadPool,
new TimeoutConfig(1000L,
false));

RoutedStore store = routedStoreFactory.create(cluster, storeDef, subStores, failureDetector);
RoutedStoreFactory routedStoreFactory = new RoutedStoreFactory(routedStoreThreadPool);

RoutedStore store = routedStoreFactory.create(cluster,
storeDef,
subStores,
failureDetector,
new RoutedStoreConfig().setTimeoutConfig(new TimeoutConfig(1000L,
false)));

recordException(failureDetector, Iterables.get(cluster.getNodes(), 0));
store.put(key, new Versioned<byte[]>(value), null);
Expand Down

0 comments on commit 1520f50

Please sign in to comment.