diff --git a/src/java/voldemort/VoldemortAdminTool.java b/src/java/voldemort/VoldemortAdminTool.java index bc22f8ab56..5629691e97 100644 --- a/src/java/voldemort/VoldemortAdminTool.java +++ b/src/java/voldemort/VoldemortAdminTool.java @@ -45,6 +45,7 @@ import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.map.ObjectMapper; +import voldemort.client.SystemStore; import voldemort.client.protocol.admin.AdminClient; import voldemort.client.protocol.admin.AdminClientConfig; import voldemort.cluster.Cluster; @@ -85,6 +86,7 @@ public class VoldemortAdminTool { private static final String ALL_METADATA = "all"; + private static SystemStore sysStoreVersion = null; @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { @@ -250,6 +252,13 @@ public static void main(String[] args) throws Exception { AdminClient adminClient = new AdminClient(url, new AdminClientConfig()); + // Initialize the system store for stores.xml version + String[] bootstrapUrls = new String[1]; + bootstrapUrls[0] = url; + sysStoreVersion = new SystemStore("voldsys$_metadata_version", + bootstrapUrls, + 0); + String ops = ""; if(options.has("delete-partitions")) { ops += "d"; @@ -433,6 +442,10 @@ public static void main(String[] args) throws Exception { adminClient, MetadataStore.STORES_KEY, mapper.writeStoreList(storeDefs)); + + // Update the store metadata version + updateStoreMetadataversion(); + } else if(metadataKey.compareTo(MetadataStore.REBALANCING_STEAL_INFO) == 0) { if(!Utils.isReadableFile(metadataValue)) throw new VoldemortException("Rebalancing steal info file path incorrect"); @@ -722,6 +735,24 @@ private static void executeCheckMetadata(AdminClient adminClient, String metadat } } + /* + * TODO: For now write one version for the entire stores.xml When we split + * the stores.xml, make this more granular + */ + private static void updateStoreMetadataversion() { + String versionKey = "stores.xml"; + Versioned storesVersion = 
sysStoreVersion.getSysStore(versionKey); + if(storesVersion == null) { + System.err.println("Current version is null. Assuming version 0."); + storesVersion = new Versioned((long) 1); + } else { + System.out.println("Version obtained = " + storesVersion.getValue()); + long newValue = storesVersion.getValue() + 1; + storesVersion.setObject(newValue); + } + sysStoreVersion.putSysStore(versionKey, storesVersion); + } + private static void executeSetMetadata(Integer nodeId, AdminClient adminClient, String key, diff --git a/src/java/voldemort/client/AbstractStoreClientFactory.java b/src/java/voldemort/client/AbstractStoreClientFactory.java index bfc36b2f2f..89ba43b3c2 100644 --- a/src/java/voldemort/client/AbstractStoreClientFactory.java +++ b/src/java/voldemort/client/AbstractStoreClientFactory.java @@ -151,7 +151,8 @@ public StoreClient getStoreClient(String storeName, this, 3, clientContextName, - sequencer.getAndIncrement()); + sequencer.getAndIncrement(), + config); } @SuppressWarnings("unchecked") diff --git a/src/java/voldemort/client/AsyncMetadataVersionManager.java b/src/java/voldemort/client/AsyncMetadataVersionManager.java new file mode 100644 index 0000000000..a0ce826e8b --- /dev/null +++ b/src/java/voldemort/client/AsyncMetadataVersionManager.java @@ -0,0 +1,125 @@ +package voldemort.client; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Random; +import java.util.concurrent.Callable; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import voldemort.versioning.Versioned; + +/* + * The AsyncMetadataVersionManager is used to track the Metadata version on the + * cluster and if necessary Re-bootstrap the client. + * + * During initialization, it will retrieve the current version of the store (or + * the entire stores.xml depending upon granularity) and then periodically check + * whether this has been updated. 
During init if the initial version turns out + * to be null, it means that no change has been done to that store since it was + * created. In this case, we assume version '0'. + */ + +public class AsyncMetadataVersionManager implements Runnable { + + private final Logger logger = Logger.getLogger(this.getClass()); + private Versioned currentVersion; + private final SystemStore sysStore; + private final String systemKey = "stores.xml"; + private volatile boolean isRunning; + private final Callable storeClientThunk; + private long asyncMetadataCheckInterval; + + // Random delta generator + final int DELTA_MAX = 1000; + Random randomGenerator = new Random(System.currentTimeMillis()); + + public AsyncMetadataVersionManager(SystemStore systemStore, + long asyncMetadataCheckInterval, + Callable storeClientThunk) { + this(null, systemStore, asyncMetadataCheckInterval, storeClientThunk); + } + + public AsyncMetadataVersionManager(Versioned initialVersion, + SystemStore systemStore, + long asyncMetadataCheckInterval, + Callable storeClientThunk) { + this.sysStore = systemStore; + if(initialVersion == null) { + this.currentVersion = sysStore.getSysStore("stores.xml"); + + // If the received store version is null, assume version 0 + if(currentVersion == null) + currentVersion = new Versioned((long) 0); + } else { + currentVersion = initialVersion; + } + + // Initialize and start the background check thread + isRunning = true; + + Thread checkVersionThread = new Thread(this, "AsyncVersionCheckThread"); + checkVersionThread.setDaemon(true); + checkVersionThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + + public void uncaughtException(Thread t, Throwable e) { + if(logger.isEnabledFor(Level.ERROR)) + logger.error("Uncaught exception in Metadata Version check thread:", e); + } + }); + + this.storeClientThunk = storeClientThunk; + this.asyncMetadataCheckInterval = asyncMetadataCheckInterval; + checkVersionThread.start(); + + } + + public void destroy() { + 
isRunning = false; + } + + public void run() { + while(!Thread.currentThread().isInterrupted() && isRunning) { + try { + Thread.sleep(asyncMetadataCheckInterval); + } catch(InterruptedException e) { + break; + } + + Versioned newVersion = this.sysStore.getSysStore(systemKey); + + // If version obtained is null, the store is untouched. Continue + if(newVersion == null) { + logger.info("Metadata unchanged after creation ..."); + continue; + } + + logger.info("MetadataVersion check => Obtained " + systemKey + " version : " + + newVersion); + + if(!newVersion.equals(currentVersion)) { + logger.info("Metadata version mismatch detected."); + + // Determine a random delta delay between 0 to 1000 (ms) + int delta = randomGenerator.nextInt(DELTA_MAX); + + try { + logger.info("Sleeping for delta : " + delta + " (ms) before re-bootstrapping."); + Thread.sleep(delta); + } catch(InterruptedException e) { + break; + } + + // Invoke callback for bootstrap + try { + this.storeClientThunk.call(); + } catch(Exception e) { + e.printStackTrace(); + } + + // Update the current version + currentVersion = newVersion; + } + } + } +} diff --git a/src/java/voldemort/client/ClientConfig.java b/src/java/voldemort/client/ClientConfig.java index 4fb0011e2b..4751f74be8 100644 --- a/src/java/voldemort/client/ClientConfig.java +++ b/src/java/voldemort/client/ClientConfig.java @@ -79,6 +79,7 @@ public class ClientConfig { private volatile int maxBootstrapRetries = 2; private volatile String clientContextName = "default"; + private volatile long asyncCheckMetadataInterval = 5000; public ClientConfig() {} @@ -120,6 +121,7 @@ public ClientConfig() {} public static final String FAILUREDETECTOR_REQUEST_LENGTH_THRESHOLD_PROPERTY = "failuredetector_request_length_threshold"; public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries"; public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context"; + public static final String ASYNC_CHECK_METADATA_INTERVAL = 
"check_metadata_interval"; /** * Instantiate the client config using a properties file @@ -282,6 +284,10 @@ private void setProperties(Properties properties) { if(props.containsKey(CLIENT_CONTEXT_NAME)) { this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME, null)); } + + if(props.containsKey(ASYNC_CHECK_METADATA_INTERVAL)) { + this.setAsyncCheckMetadataInterval(props.getLong(ASYNC_CHECK_METADATA_INTERVAL, 5000)); + } } public int getMaxConnectionsPerNode() { @@ -707,4 +713,12 @@ public ClientConfig setClientContextName(String clientContextName) { return this; } + public long getAsyncCheckMetadataInterval() { + return asyncCheckMetadataInterval; + } + + public void setAsyncCheckMetadataInterval(long asyncCheckMetadataInterval) { + this.asyncCheckMetadataInterval = asyncCheckMetadataInterval; + } + } diff --git a/src/java/voldemort/client/DefaultStoreClient.java b/src/java/voldemort/client/DefaultStoreClient.java index c056f5ea10..436fcfd292 100644 --- a/src/java/voldemort/client/DefaultStoreClient.java +++ b/src/java/voldemort/client/DefaultStoreClient.java @@ -16,10 +16,12 @@ package voldemort.client; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.log4j.Logger; @@ -60,26 +62,37 @@ public class DefaultStoreClient implements StoreClient { private final Logger logger = Logger.getLogger(DefaultStoreClient.class); private final StoreClientFactory storeFactory; + private final ClientConfig config; private final int metadataRefreshAttempts; private final String storeName; private final InconsistencyResolver> resolver; private volatile Store store; private final UUID clientId; - private SystemStore sysStore; + private final Map sysStoreMap; + private AsyncMetadataVersionManager asyncCheckMetadata; + + // Enumerate all the system stores + private final String METADATA_VERSION_STORE = "voldsys$_metadata_version"; + private final 
String CLIENT_REGISTRY_STORE = "voldsys$_client_registry"; + private final String STORE_DEFINITION_STORE = "voldsys$_client_store_definition"; + private final String[] systemStoreNames = { METADATA_VERSION_STORE, CLIENT_REGISTRY_STORE, + STORE_DEFINITION_STORE }; public DefaultStoreClient(String storeName, InconsistencyResolver> resolver, StoreClientFactory storeFactory, int maxMetadataRefreshAttempts) { - this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0); + this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0, null); } + @SuppressWarnings("unchecked") public DefaultStoreClient(String storeName, InconsistencyResolver> resolver, StoreClientFactory storeFactory, int maxMetadataRefreshAttempts, String clientContext, - int clientSequence) { + int clientSequence, + ClientConfig config) { this.storeName = Utils.notNull(storeName); this.resolver = resolver; @@ -88,6 +101,8 @@ public DefaultStoreClient(String storeName, this.clientId = AbstractStoreClientFactory.generateClientId(storeName, clientContext, clientSequence); + this.config = config; + // Registering self to be able to bootstrap client dynamically via JMX JmxUtils.registerMbean(this, JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()), @@ -96,21 +111,50 @@ public DefaultStoreClient(String storeName, + storeName + "." + clientId.toString())); bootStrap(); + + // Initialize all the system stores + sysStoreMap = new HashMap(); + initializeSystemStores(); + + // Initialize the background thread for checking metadata version + if(config != null) { + SystemStore versionStore = this.sysStoreMap.get(METADATA_VERSION_STORE); + if(versionStore == null) + logger.info("Metadata version system store not found. 
Cannot run Metadata version check thread."); + else { + Callable bootstrapCallback = new Callable() { + + public Void call() throws Exception { + bootStrap(); + return null; + } + }; + + asyncCheckMetadata = new AsyncMetadataVersionManager(versionStore, + config.getAsyncCheckMetadataInterval(), + bootstrapCallback); + logger.info("Metadata version check thread started. Frequency = Every " + + config.getAsyncCheckMetadataInterval() + " ms"); + } + } + logger.info("Voldemort client created: clientContext=" + clientContext + " clientSequence=" + clientSequence + " clientId=" + clientId.toString()); } + public void initializeSystemStores() { + for(String storeName: systemStoreNames) { + SystemStore sysStore = new SystemStore(storeName, + config.getBootstrapUrls(), + config.getClientZoneId()); + this.sysStoreMap.put(storeName, sysStore); + } + } + @JmxOperation(description = "bootstrap metadata from the cluster.") public void bootStrap() { logger.info("Bootstrapping metadata for store " + this.storeName); this.store = storeFactory.getRawStore(storeName, resolver, clientId); - - logger.info("Creating System store"); - String systemKey = storeName + "-client"; - this.sysStore = new SystemStore("voldsys$_client_registry", - this.storeFactory); - sysStore.putSysStore(systemKey, "Registered"); - logger.info("Getting value - " + sysStore.getSysStore(systemKey)); } public boolean delete(K key) { diff --git a/src/java/voldemort/client/SystemStore.java b/src/java/voldemort/client/SystemStore.java index 7617fc4b24..d4c901ebbe 100644 --- a/src/java/voldemort/client/SystemStore.java +++ b/src/java/voldemort/client/SystemStore.java @@ -19,14 +19,11 @@ public class SystemStore { private final String storeName; private volatile Store sysStore; - SystemStore(String storeName, StoreClientFactory factory) { + public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID) { String prefix = storeName.substring(0, SystemStoreConstants.NAME_PREFIX.length()); 
if(!SystemStoreConstants.NAME_PREFIX.equals(prefix)) throw new VoldemortException("Illegal system store : " + storeName); - if(!(factory instanceof SocketStoreClientFactory)) - throw new VoldemortException("System store cannot be created without a Socket store client factory"); - SocketStoreClientFactory clientFactory = (SocketStoreClientFactory) factory; ClientConfig config = new ClientConfig(); config.setSelectors(1) .setBootstrapUrls(config.getBootstrapUrls()) @@ -43,7 +40,7 @@ public class SystemStore { } public void putSysStore(K key, V value) throws VoldemortException { - logger.info("Invoking Put for key : " + key + " on store name : " + this.storeName); + logger.debug("Invoking Put for key : " + key + " on store name : " + this.storeName); Versioned versioned = getSysStore(key); if(versioned == null) versioned = Versioned.value(value, new VectorClock()); @@ -53,12 +50,12 @@ public void putSysStore(K key, V value) throws VoldemortException { } public void putSysStore(K key, Versioned value) throws VoldemortException { - logger.info("Invoking Put for key : " + key + " on store name : " + this.storeName); + logger.debug("Invoking Put for key : " + key + " on store name : " + this.storeName); this.sysStore.put(key, value, null); } public Versioned getSysStore(K key) throws VoldemortException { - logger.info("Invoking Get for key : " + key + " on store name : " + this.storeName); + logger.debug("Invoking Get for key : " + key + " on store name : " + this.storeName); Versioned versioned = null; List> items = this.sysStore.get(key, null); if(items.size() == 1) @@ -67,19 +64,19 @@ else if(items.size() > 1) throw new InconsistentDataException("Unresolved versions returned from get(" + key + ") = " + items, items); if(versioned != null) - logger.info("Value for key : " + key + " = " + versioned.getValue() - + " on store name : " + this.storeName); + logger.debug("Value for key : " + key + " = " + versioned.getValue() + + " on store name : " + this.storeName); else - 
logger.info("Got null value"); + logger.debug("Got null value"); return versioned; } public V getValueSysStore(K key) throws VoldemortException { - logger.info("Invoking Get for key : " + key + " on store name : " + this.storeName); + logger.debug("Invoking Get for key : " + key + " on store name : " + this.storeName); Versioned versioned = getSysStore(key); if(versioned != null) { - logger.info("Value for key : " + key + " = " + versioned.getValue() - + " on store name : " + this.storeName); + logger.debug("Value for key : " + key + " = " + versioned.getValue() + + " on store name : " + this.storeName); return versioned.getValue(); } return null; diff --git a/src/java/voldemort/routing/RouteToAllLocalPrefStrategy.java b/src/java/voldemort/routing/RouteToAllLocalPrefStrategy.java new file mode 100644 index 0000000000..240aff1d49 --- /dev/null +++ b/src/java/voldemort/routing/RouteToAllLocalPrefStrategy.java @@ -0,0 +1,17 @@ +package voldemort.routing; + +import java.util.Collection; + +import voldemort.cluster.Node; + +public class RouteToAllLocalPrefStrategy extends RouteToAllStrategy { + + public RouteToAllLocalPrefStrategy(Collection nodes) { + super(nodes); + } + + @Override + public String getType() { + return RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY; + } +} diff --git a/src/java/voldemort/routing/RoutingStrategyFactory.java b/src/java/voldemort/routing/RoutingStrategyFactory.java index c8b8fe0158..b4901ba40c 100644 --- a/src/java/voldemort/routing/RoutingStrategyFactory.java +++ b/src/java/voldemort/routing/RoutingStrategyFactory.java @@ -24,6 +24,8 @@ public RoutingStrategy updateRoutingStrategy(StoreDefinition storeDef, Cluster c return new ZoneRoutingStrategy(cluster.getNodes(), storeDef.getZoneReplicationFactor(), storeDef.getReplicationFactor()); + } else if(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY.equals(storeDef.getRoutingStrategyType())) { + return new RouteToAllLocalPrefStrategy(cluster.getNodes()); } else { throw new 
VoldemortException("RoutingStrategyType:" + storeDef.getRoutingStrategyType() + " not handled by " + this.getClass()); diff --git a/src/java/voldemort/routing/RoutingStrategyType.java b/src/java/voldemort/routing/RoutingStrategyType.java index 92ead1b1ae..60c1a49733 100644 --- a/src/java/voldemort/routing/RoutingStrategyType.java +++ b/src/java/voldemort/routing/RoutingStrategyType.java @@ -10,6 +10,7 @@ public class RoutingStrategyType { public final static String CONSISTENT_STRATEGY = "consistent-routing"; public final static String TO_ALL_STRATEGY = "all-routing"; public final static String ZONE_STRATEGY = "zone-routing"; + public final static String TO_ALL_LOCAL_PREF_STRATEGY = "local-pref-all-routing"; private final String name; diff --git a/src/java/voldemort/server/SystemStoreConstants.java b/src/java/voldemort/server/SystemStoreConstants.java index cb048c66d1..a7c546681c 100644 --- a/src/java/voldemort/server/SystemStoreConstants.java +++ b/src/java/voldemort/server/SystemStoreConstants.java @@ -34,6 +34,7 @@ public static enum SystemStoreName { + " " + " 7" + " " + + " " + " voldsys$_client_store_definition" + " zone-routing" @@ -50,7 +51,25 @@ public static enum SystemStoreName { + " string" + " " + " 7" - + " " + ""; + + " " + + + " " + + " voldsys$_metadata_version" + + " local-pref-all-routing" + + " proximity-handoff" + + " memory" + + " client" + + " 1" + + " 1" + + " 1" + + " " + + " string" + + " " + + " " + + " java-serialization" + + " " + " " + + + ""; public static boolean isSystemStore(String storeName) { return (null == storeName ? 
false : storeName.startsWith(NAME_PREFIX)); diff --git a/src/java/voldemort/store/routed/PipelineRoutedStore.java b/src/java/voldemort/store/routed/PipelineRoutedStore.java index 38ff90517d..3016a097a3 100644 --- a/src/java/voldemort/store/routed/PipelineRoutedStore.java +++ b/src/java/voldemort/store/routed/PipelineRoutedStore.java @@ -37,7 +37,12 @@ import voldemort.store.nonblockingstore.NonblockingStore; import voldemort.store.routed.Pipeline.Event; import voldemort.store.routed.Pipeline.Operation; +import voldemort.store.routed.action.AbstractConfigureNodes; import voldemort.store.routed.action.ConfigureNodes; +import voldemort.store.routed.action.ConfigureNodesByZone; +import voldemort.store.routed.action.ConfigureNodesDefault; +import voldemort.store.routed.action.ConfigureNodesLocalHost; +import voldemort.store.routed.action.ConfigureNodesLocalHostByZone; import voldemort.store.routed.action.GetAllConfigureNodes; import voldemort.store.routed.action.GetAllReadRepair; import voldemort.store.routed.action.IncrementClock; @@ -81,6 +86,13 @@ public class PipelineRoutedStore extends RoutedStore { private PipelineRoutedStats stats; private final int jmxId; + private enum ConfigureNodesType { + DEFAULT, + BYZONE, + DEFAULT_LOCAL, + BYZONE_LOCAL + } + /** * Create a PipelineRoutedStore * @@ -141,6 +153,66 @@ public PipelineRoutedStore(String name, } } + private ConfigureNodesType getNodeConfigurationType(BasicPipelineData>> pipelineData) { + // If Zone and local preference required + if(pipelineData.getZonesRequired() != null + && routingStrategy.getType().equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY)) + return ConfigureNodesType.BYZONE_LOCAL; + + // If only local preference required + else if(pipelineData.getZonesRequired() == null + && routingStrategy.getType().equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY)) + return ConfigureNodesType.DEFAULT_LOCAL; + + // If only Zone required + else if(pipelineData.getZonesRequired() != null + && 
!routingStrategy.getType() + .equals(RoutingStrategyType.TO_ALL_LOCAL_PREF_STRATEGY)) + return ConfigureNodesType.BYZONE; + + // Default case + return ConfigureNodesType.DEFAULT; + } + + private AbstractConfigureNodes>, BasicPipelineData>>> getNodeConfiguration(BasicPipelineData>> pipelineData, + ByteArray key) { + switch(getNodeConfigurationType(pipelineData)) { + case DEFAULT: + return new ConfigureNodesDefault>, BasicPipelineData>>>(pipelineData, + Event.CONFIGURED, + failureDetector, + storeDef.getRequiredReads(), + routingStrategy, + key); + case BYZONE: + return new ConfigureNodesByZone>, BasicPipelineData>>>(pipelineData, + Event.CONFIGURED, + failureDetector, + storeDef.getRequiredReads(), + routingStrategy, + key, + clientZone); + case DEFAULT_LOCAL: + return new ConfigureNodesLocalHost>, BasicPipelineData>>>(pipelineData, + Event.CONFIGURED, + failureDetector, + storeDef.getRequiredReads(), + routingStrategy, + key); + case BYZONE_LOCAL: + return new ConfigureNodesLocalHostByZone>, BasicPipelineData>>>(pipelineData, + Event.CONFIGURED, + failureDetector, + storeDef.getRequiredReads(), + routingStrategy, + key, + clientZone); + default: + return null; + } + + } + public List> get(final ByteArray key, final byte[] transforms) { StoreUtils.assertValidKey(key); @@ -164,14 +236,13 @@ public List> request(Store store) { }; - pipeline.addEventAction(Event.STARTED, - new ConfigureNodes>, BasicPipelineData>>>(pipelineData, - Event.CONFIGURED, - failureDetector, - storeDef.getRequiredReads(), - routingStrategy, - key, - clientZone)); + // Get the correct type of configure nodes action depending on the store + // requirements + AbstractConfigureNodes>, BasicPipelineData>>> configureNodes = getNodeConfiguration(pipelineData, + key); + + pipeline.addEventAction(Event.STARTED, configureNodes); + pipeline.addEventAction(Event.CONFIGURED, new PerformParallelRequests>, BasicPipelineData>>>(pipelineData, allowReadRepair ? 
Event.RESPONSES_RECEIVED diff --git a/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java b/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java new file mode 100644 index 0000000000..7a194f649b --- /dev/null +++ b/src/java/voldemort/store/routed/action/ConfigureNodesByZone.java @@ -0,0 +1,147 @@ +/* + * Copyright 2010 LinkedIn, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package voldemort.store.routed.action; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import voldemort.VoldemortException; +import voldemort.cluster.Node; +import voldemort.cluster.Zone; +import voldemort.cluster.failuredetector.FailureDetector; +import voldemort.routing.RoutingStrategy; +import voldemort.store.routed.BasicPipelineData; +import voldemort.store.routed.Pipeline; +import voldemort.store.routed.Pipeline.Event; +import voldemort.store.routed.Pipeline.Operation; +import voldemort.utils.ByteArray; +import voldemort.utils.ByteUtils; + +/* + * Configure the Nodes obtained via the routing strategy based on the zone + * information. Local zone nodes first, followed by the corresponding nodes from + * each of the other zones, ordered by proximity. 
+ */ +public class ConfigureNodesByZone> extends + AbstractConfigureNodes { + + private final ByteArray key; + + private final Zone clientZone; + + public ConfigureNodesByZone(PD pipelineData, + Event completeEvent, + FailureDetector failureDetector, + int required, + RoutingStrategy routingStrategy, + ByteArray key, + Zone clientZone) { + super(pipelineData, completeEvent, failureDetector, required, routingStrategy); + this.key = key; + this.clientZone = clientZone; + } + + public List getNodes(ByteArray key, Operation op) { + List nodes = null; + try { + nodes = super.getNodes(key); + } catch(VoldemortException e) { + pipelineData.setFatalError(e); + return null; + } + + if(logger.isDebugEnabled()) + logger.debug("Adding " + nodes.size() + " node(s) to preference list"); + + if(pipelineData.getZonesRequired() > this.clientZone.getProximityList().size()) { + throw new VoldemortException("Number of zones required should be less than the total number of zones"); + } + + if(pipelineData.getZonesRequired() > required) { + throw new VoldemortException("Number of zones required should be less than the required number of " + + op.getSimpleName() + "s"); + } + + // Create zone id to node mapping + Map> zoneIdToNode = new HashMap>(); + for(Node node: nodes) { + List nodesList = null; + if(zoneIdToNode.containsKey(node.getZoneId())) { + nodesList = zoneIdToNode.get(node.getZoneId()); + } else { + nodesList = new ArrayList(); + zoneIdToNode.put(node.getZoneId(), nodesList); + } + nodesList.add(node); + } + + nodes = new ArrayList(); + LinkedList zoneProximityList = this.clientZone.getProximityList(); + if(op != Operation.PUT) { + // GET, GET_VERSIONS, DELETE + + // Add a node from every zone, upto a max of + // zoneCountReads/zoneCountWrites. 
+ for(int index = 0; index < pipelineData.getZonesRequired(); index++) { + List zoneNodes = zoneIdToNode.get(zoneProximityList.get(index)); + if(zoneNodes != null && zoneNodes.size() > 0) { + nodes.add(zoneNodes.remove(0)); + } + } + + } + + // Add the rest, starting with client zone... + List clientZoneNodes = zoneIdToNode.get(clientZone.getId()); + if(clientZoneNodes != null && clientZoneNodes.size() > 0) + nodes.addAll(clientZoneNodes); + // ...followed by other zones sorted by proximity list + for(int index = 0; index < zoneProximityList.size(); index++) { + List zoneNodes = zoneIdToNode.get(zoneProximityList.get(index)); + if(zoneNodes != null && zoneNodes.size() > 0) { + nodes.addAll(zoneNodes); + } + } + + return nodes; + } + + public void execute(Pipeline pipeline) { + List nodes = null; + + nodes = getNodes(key, pipeline.getOperation()); + if(nodes == null) { + pipeline.abort(); + return; + } + + if(logger.isDebugEnabled()) { + StringBuilder nodeStr = new StringBuilder(); + for(Node node: nodes) { + nodeStr.append(node.getId() + ","); + } + logger.debug("Key " + ByteUtils.toHexString(key.get()) + + " final preference list to contact " + nodeStr); + } + pipelineData.setNodes(nodes); + pipeline.addEvent(completeEvent); + } + +} diff --git a/src/java/voldemort/store/routed/action/ConfigureNodesDefault.java b/src/java/voldemort/store/routed/action/ConfigureNodesDefault.java new file mode 100644 index 0000000000..b05379d74f --- /dev/null +++ b/src/java/voldemort/store/routed/action/ConfigureNodesDefault.java @@ -0,0 +1,71 @@ +package voldemort.store.routed.action; + +import java.util.List; + +import voldemort.VoldemortException; +import voldemort.cluster.Node; +import voldemort.cluster.failuredetector.FailureDetector; +import voldemort.routing.RoutingStrategy; +import voldemort.store.routed.BasicPipelineData; +import voldemort.store.routed.Pipeline; +import voldemort.store.routed.Pipeline.Event; +import voldemort.utils.ByteArray; +import 
voldemort.utils.ByteUtils; + +/* + * Default Configure Nodes that does not reorder the list of nodes obtained via + * the routing strategy + */ +public class ConfigureNodesDefault> extends + AbstractConfigureNodes { + + private final ByteArray key; + + public ConfigureNodesDefault(PD pipelineData, + Event completeEvent, + FailureDetector failureDetector, + int required, + RoutingStrategy routingStrategy, + ByteArray key) { + super(pipelineData, completeEvent, failureDetector, required, routingStrategy); + this.key = key; + } + + @Override + public List getNodes(ByteArray key) { + List nodes = null; + + try { + nodes = super.getNodes(key); + } catch(VoldemortException e) { + pipelineData.setFatalError(e); + return null; + } + return nodes; + } + + public void execute(Pipeline pipeline) { + List nodes = null; + + nodes = getNodes(key); + if(nodes == null) { + pipeline.abort(); + return; + } + + if(logger.isDebugEnabled()) + logger.debug("Adding " + nodes.size() + " node(s) to preference list"); + + if(logger.isDebugEnabled()) { + StringBuilder nodeStr = new StringBuilder(); + for(Node node: nodes) { + nodeStr.append(node.getId() + ","); + } + logger.debug("Key " + ByteUtils.toHexString(key.get()) + + " final preference list to contact " + nodeStr); + } + pipelineData.setNodes(nodes); + pipeline.addEvent(completeEvent); + } + +} diff --git a/src/java/voldemort/store/routed/action/ConfigureNodesLocalHost.java b/src/java/voldemort/store/routed/action/ConfigureNodesLocalHost.java new file mode 100644 index 0000000000..4eb2237380 --- /dev/null +++ b/src/java/voldemort/store/routed/action/ConfigureNodesLocalHost.java @@ -0,0 +1,71 @@ +package voldemort.store.routed.action; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.Logger; + +import voldemort.VoldemortException; +import voldemort.cluster.Node; +import voldemort.cluster.failuredetector.FailureDetector; +import 
voldemort.routing.RoutingStrategy; +import voldemort.store.routed.BasicPipelineData; +import voldemort.store.routed.Pipeline.Event; +import voldemort.utils.ByteArray; + +/* + * Use the default node list returned via the routing strategy. However give + * preference to the current node, if it is part of the preflist returned from + * the routing strategy. + */ + +public class ConfigureNodesLocalHost> extends + ConfigureNodesDefault { + + @SuppressWarnings("hiding") + private final Logger logger = Logger.getLogger(this.getClass()); + + public ConfigureNodesLocalHost(PD pipelineData, + Event completeEvent, + FailureDetector failureDetector, + int required, + RoutingStrategy routingStrategy, + ByteArray key) { + super(pipelineData, completeEvent, failureDetector, required, routingStrategy, key); + } + + /* + * If the current node exists in the nodes list, bring it to the front + */ + @Override + public List getNodes(ByteArray key) { + logger.debug("Giving pref to localhost ! "); + List nodes = null; + List reorderedNodes = new ArrayList(); + + try { + nodes = super.getNodes(key); + String currentHost = InetAddress.getLocalHost().getHostName(); + for(Node n: nodes) { + if(currentHost.contains(n.getHost()) || n.getHost().contains(currentHost)) { + logger.debug("Found localhost ! 
"); + reorderedNodes.add(n); + nodes.remove(n); + break; + } + } + reorderedNodes.addAll(nodes); + nodes = reorderedNodes; + } catch(VoldemortException e) { + pipelineData.setFatalError(e); + return null; + } catch(UnknownHostException e) { + e.printStackTrace(); + return null; + } + return nodes; + } + +} diff --git a/src/java/voldemort/store/routed/action/ConfigureNodesLocalHostByZone.java b/src/java/voldemort/store/routed/action/ConfigureNodesLocalHostByZone.java new file mode 100644 index 0000000000..ceb5e688eb --- /dev/null +++ b/src/java/voldemort/store/routed/action/ConfigureNodesLocalHostByZone.java @@ -0,0 +1,72 @@ +package voldemort.store.routed.action; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +import voldemort.VoldemortException; +import voldemort.cluster.Node; +import voldemort.cluster.Zone; +import voldemort.cluster.failuredetector.FailureDetector; +import voldemort.routing.RoutingStrategy; +import voldemort.store.routed.BasicPipelineData; +import voldemort.store.routed.Pipeline.Event; +import voldemort.store.routed.Pipeline.Operation; +import voldemort.utils.ByteArray; + +/* + * Use the zone aware node list returned via the routing strategy. However give + * preference to the current node, if it is part of the preflist returned from + * the routing strategy. 
+ */
+
+/**
+ * Zone-aware node configuration that additionally moves the node running on
+ * the local host, if present, to the front of the preference list.
+ *
+ * NOTE(review): the generic parameter list below was restored from the way the
+ * class is instantiated in PipelineRoutedStore (two type arguments, the second
+ * a BasicPipelineData of the first) -- confirm against AbstractConfigureNodes.
+ */
+public class ConfigureNodesLocalHostByZone<V, PD extends BasicPipelineData<V>> extends
+        ConfigureNodesByZone<V, PD> {
+
+    /**
+     * Passes every argument straight through to {@link ConfigureNodesByZone}.
+     */
+    public ConfigureNodesLocalHostByZone(PD pipelineData,
+                                         Event completeEvent,
+                                         FailureDetector failureDetector,
+                                         int required,
+                                         RoutingStrategy routingStrategy,
+                                         ByteArray key,
+                                         Zone clientZone) {
+        super(pipelineData,
+              completeEvent,
+              failureDetector,
+              required,
+              routingStrategy,
+              key,
+              clientZone);
+    }
+
+    /**
+     * Returns the zone-ordered node list with the first node whose host name
+     * matches the local host name moved to the front.
+     *
+     * @param key Key being routed
+     * @param op Pipeline operation the list is built for
+     * @return The reordered list; the zone-ordered list unchanged when no node
+     *         matches or the local host name cannot be resolved; {@code null}
+     *         when the zone-based lookup failed (a fatal error has then been
+     *         recorded on the pipeline data and the caller aborts)
+     */
+    @Override
+    public List<Node> getNodes(ByteArray key, Operation op) {
+        List<Node> nodes;
+        try {
+            nodes = super.getNodes(key, op);
+        } catch(VoldemortException e) {
+            // Zone-count validation in the parent throws instead of returning
+            pipelineData.setFatalError(e);
+            return null;
+        }
+
+        // The parent returns null after recording a routing failure; pass it
+        // on instead of dereferencing it below.
+        if(nodes == null)
+            return null;
+
+        String currentHost;
+        try {
+            currentHost = InetAddress.getLocalHost().getHostName();
+        } catch(UnknownHostException e) {
+            // Local preference is only an ordering hint: keep serving the
+            // request with the zone-based order rather than aborting it.
+            logger.warn("Unable to resolve local host name; keeping zone-based node order", e);
+            return nodes;
+        }
+
+        List<Node> reorderedNodes = new ArrayList<Node>(nodes.size());
+        Node localNode = null;
+        for(Node n: nodes) {
+            // NOTE(review): substring matching tolerates short vs. fully
+            // qualified names but also matches e.g. "host1" against "host10".
+            boolean isLocal = currentHost.contains(n.getHost())
+                              || n.getHost().contains(currentHost);
+            if(localNode == null && isLocal)
+                localNode = n;
+            else
+                reorderedNodes.add(n);
+        }
+
+        // Only the first match is promoted; everything else keeps its order
+        if(localNode != null)
+            reorderedNodes.add(0, localNode);
+
+        return reorderedNodes;
+    }
+}