Skip to content

Commit

Permalink
add clientId for voldemort client
Browse files Browse the repository at this point in the history
  • Loading branch information
Lei Gao committed Jun 27, 2012
1 parent cd8b942 commit b59973e
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 25 deletions.
75 changes: 71 additions & 4 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -16,12 +16,17 @@

package voldemort.client;

import java.io.File;
import java.io.IOException;
import java.io.StringReader;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -98,6 +103,8 @@ public abstract class AbstractStoreClientFactory implements StoreClientFactory {
private final ClientConfig config;
private final RoutedStoreFactory routedStoreFactory;
private final int clientZoneId;
private final String clientContextName;
private final AtomicInteger sequencer;

public AbstractStoreClientFactory(ClientConfig config) {
this.config = config;
Expand All @@ -112,18 +119,24 @@ public AbstractStoreClientFactory(ClientConfig config) {
this.maxBootstrapRetries = config.getMaxBootstrapRetries();
this.stats = new StoreStats();
this.clientZoneId = config.getClientZoneId();
this.clientContextName = (null == config.getClientContextName() ? ""
: config.getClientContextName());
this.routedStoreFactory = new RoutedStoreFactory(config.isPipelineRoutedStoreEnabled(),
threadPool,
config.getTimeoutConfig());
this.sequencer = new AtomicInteger(0);

if(this.isJmxEnabled) {
JmxUtils.registerMbean(threadPool,
JmxUtils.createObjectName(JmxUtils.getPackageName(threadPool.getClass()),
JmxUtils.getClassName(threadPool.getClass())
+ "."
+ clientContextName
+ jmxId()));
JmxUtils.registerMbean(new StoreStatsJmx(stats),
JmxUtils.createObjectName("voldemort.store.stats.aggregate",
"aggregate-perf" + jmxId()));
clientContextName + ".aggregate-perf"
+ jmxId()));
}
}

Expand All @@ -133,12 +146,18 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName) {

public <K, V> StoreClient<K, V> getStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
return new DefaultStoreClient<K, V>(storeName, resolver, this, 3);
return new DefaultStoreClient<K, V>(storeName,
resolver,
this,
3,
clientContextName,
sequencer.getAndIncrement());
}

@SuppressWarnings("unchecked")
public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId) {

logger.info("Client zone-id [" + clientZoneId
+ "] Attempting to obtain metadata for store [" + storeName + "] ");
Expand Down Expand Up @@ -223,7 +242,13 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
store = statStore;
JmxUtils.registerMbean(new StoreStatsJmx(statStore.getStats()),
JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
store.getName() + jmxId()));
clientContextName
+ "."
+ store.getName()
+ jmxId()
+ (null == clientId ? ""
: "."
+ clientId.toString())));
}

if(storeDef.getKeySerializer().hasCompression()
Expand Down Expand Up @@ -257,6 +282,11 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
return serializedStore;
}

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
return getRawStore(storeName, resolver, null);
}

protected ClientConfig getConfig() {
return config;
}
Expand Down Expand Up @@ -409,4 +439,41 @@ public String jmxId() {
return jmxId == 0 ? "" : Integer.toString(jmxId);
}

/**
* Generate a unique client ID based on: 0. clientContext, if specified; 1.
* storeName 2. run path 3. client sequence
*
* @param storeName the name of the store the client is created for
* @param contextName the name of the client context
* @param clientSequence the client sequence number
* @return unique client ID
*/
public static UUID generateClientId(String storeName, String contextName, int clientSequence) {
String newLine = System.getProperty("line.separator");
StringBuilder context = new StringBuilder(contextName == null ? "" : contextName);
context.append(0 == clientSequence ? "" : ("." + clientSequence));
context.append(".").append(storeName);

try {
InetAddress host = InetAddress.getLocalHost();
context.append("@").append(host.getHostName()).append(":");
} catch(UnknownHostException e) {
logger.info("Unable to obtain client hostname.");
logger.info(e.getMessage());
}

try {
String currentPath = new File(".").getCanonicalPath();
context.append(currentPath).append(newLine);
} catch(IOException e) {
logger.info("Unable to obtain client run path.");
logger.info(e.getMessage());
}

if(logger.isDebugEnabled()) {
logger.debug(context.toString());
}

return UUID.nameUUIDFromBytes(context.toString().getBytes());
}
}
32 changes: 20 additions & 12 deletions src/java/voldemort/client/CachingStoreClientFactory.java
@@ -1,12 +1,12 @@
/*
* Copyright 2008-2010 LinkedIn, Inc
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -16,8 +16,13 @@

package voldemort.client;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.log4j.Logger;

import voldemort.annotations.jmx.JmxManaged;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.cluster.failuredetector.FailureDetector;
Expand All @@ -26,14 +31,12 @@
import voldemort.versioning.InconsistencyResolver;
import voldemort.versioning.Versioned;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.google.common.collect.ImmutableList;

/**
* A wrapper for a store {@link StoreClientFactory} which caches requests
* to <code>getStoreClient</code>
*
* A wrapper for a store {@link StoreClientFactory} which caches requests to
* <code>getStoreClient</code>
*
*/
@JmxManaged(description = "A StoreClientFactory which caches clients")
public class CachingStoreClientFactory implements StoreClientFactory {
Expand All @@ -48,7 +51,6 @@ public CachingStoreClientFactory(StoreClientFactory inner) {
this.cache = new ConcurrentHashMap<Pair<String, Object>, StoreClient<?, ?>>();
}


@SuppressWarnings("unchecked")
public <K, V> StoreClient<K, V> getStoreClient(String storeName) {
Pair<String, Object> key = Pair.create(storeName, null);
Expand All @@ -74,7 +76,13 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
return inner.getRawStore(storeName, resolver);
return getRawStore(storeName, resolver, null);
}

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId) {
return inner.getRawStore(storeName, resolver, clientId);
}

public void close() {
Expand Down
15 changes: 15 additions & 0 deletions src/java/voldemort/client/ClientConfig.java
Expand Up @@ -78,6 +78,7 @@ public class ClientConfig {
private long failureDetectorRequestLengthThreshold = socketTimeoutMs;

private volatile int maxBootstrapRetries = 2;
private volatile String clientContextName = "default";

public ClientConfig() {}

Expand Down Expand Up @@ -118,6 +119,7 @@ public ClientConfig() {}
public static final String FAILUREDETECTOR_CATASTROPHIC_ERROR_TYPES_PROPERTY = "failuredetector_catastrophic_error_types";
public static final String FAILUREDETECTOR_REQUEST_LENGTH_THRESHOLD_PROPERTY = "failuredetector_request_length_threshold";
public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries";
public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context";

/**
* Instantiate the client config using a properties file
Expand Down Expand Up @@ -276,6 +278,10 @@ private void setProperties(Properties properties) {

if(props.containsKey(MAX_BOOTSTRAP_RETRIES))
this.setMaxBootstrapRetries(props.getInt(MAX_BOOTSTRAP_RETRIES));

if(props.containsKey(CLIENT_CONTEXT_NAME)) {
this.setClientContextName(props.getString(CLIENT_CONTEXT_NAME, null));
}
}

public int getMaxConnectionsPerNode() {
Expand Down Expand Up @@ -692,4 +698,13 @@ public ClientConfig setMaxBootstrapRetries(int maxBootstrapRetries) {
return this;
}

public String getClientContextName() {
return clientContextName;
}

public ClientConfig setClientContextName(String clientContextName) {
this.clientContextName = clientContextName;
return this;
}

}
29 changes: 25 additions & 4 deletions src/java/voldemort/client/DefaultStoreClient.java
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Map.Entry;

import org.apache.log4j.Logger;
Expand Down Expand Up @@ -63,29 +64,45 @@ public class DefaultStoreClient<K, V> implements StoreClient<K, V> {
private final String storeName;
private final InconsistencyResolver<Versioned<V>> resolver;
private volatile Store<K, V, Object> store;
private final UUID clientId;

public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
StoreClientFactory storeFactory,
int maxMetadataRefreshAttempts) {
this(storeName, resolver, storeFactory, maxMetadataRefreshAttempts, null, 0);
}

public DefaultStoreClient(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
StoreClientFactory storeFactory,
int maxMetadataRefreshAttempts,
String clientContext,
int clientSequence) {

this.storeName = Utils.notNull(storeName);
this.resolver = resolver;
this.storeFactory = Utils.notNull(storeFactory);
this.metadataRefreshAttempts = maxMetadataRefreshAttempts;

this.clientId = AbstractStoreClientFactory.generateClientId(storeName,
clientContext,
clientSequence);
// Registering self to be able to bootstrap client dynamically via JMX
JmxUtils.registerMbean(this,
JmxUtils.createObjectName(JmxUtils.getPackageName(this.getClass()),
JmxUtils.getClassName(this.getClass())
+ "." + storeName));

+ "." + clientContext + "."
+ storeName + "."
+ clientId.toString()));
bootStrap();
logger.info("Voldemort client created: clientContext=" + clientContext + " clientSequence="
+ clientSequence + " clientId=" + clientId.toString());
}

@JmxOperation(description = "bootstrap metadata from the cluster.")
public void bootStrap() {
logger.info("Bootstrapping metadata for store " + this.storeName);
this.store = storeFactory.getRawStore(storeName, resolver);
this.store = storeFactory.getRawStore(storeName, resolver, clientId);
}

public boolean delete(K key) {
Expand Down Expand Up @@ -355,4 +372,8 @@ else if(versions.size() == 1)
return put(key, versioned, transforms);

}

public UUID getClientId() {
return clientId;
}
}
9 changes: 8 additions & 1 deletion src/java/voldemort/client/MockStoreClientFactory.java
Expand Up @@ -18,6 +18,7 @@

import java.io.StringReader;
import java.util.List;
import java.util.UUID;

import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.NoopFailureDetector;
Expand Down Expand Up @@ -106,7 +107,8 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
}

public <K1, V1, T1> Store<K1, V1, T1> getRawStore(String storeName,
InconsistencyResolver<Versioned<V1>> resolver) {
InconsistencyResolver<Versioned<V1>> resolver,
UUID clientId) {
if(this.storesXml != null)
return getRawStore(storeName);

Expand All @@ -131,6 +133,11 @@ public <K1, V1, T1> Store<K1, V1, T1> getRawStore(String storeName,
return consistentStore;
}

public <K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
return getRawStore(storeName, resolver, null);
}

private <K1, V1, T1> Store<K1, V1, T1> getRawStore(String storeName) {
List<StoreDefinition> storeDefs = storeMapper.readStoreList(new StringReader(storesXml));
StoreDefinition storeDef = null;
Expand Down
17 changes: 14 additions & 3 deletions src/java/voldemort/client/StoreClientFactory.java
Expand Up @@ -16,6 +16,8 @@

package voldemort.client;

import java.util.UUID;

import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.Store;
import voldemort.versioning.InconsistencyResolver;
Expand Down Expand Up @@ -65,16 +67,25 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
/**
* Get the underlying store, not the public StoreClient interface
*
* @param <K> The key type
* @param <V> The value type
* @param <T> The transform type
* @param storeName The name of the store
* @param resolver The inconsistency resolver
* @return The appropriate store
*/
<K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver);

/**
* Get the underlying store, not the public StoreClient interface
*
* @param storeName The name of the store
* @param resolver The inconsistency resolver
* @param clientId The unique id of the client
* @return The appropriate store
*/
<K, V, T> Store<K, V, T> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver,
UUID clientId);

/**
* Close the store client
*/
Expand Down

0 comments on commit b59973e

Please sign in to comment.