Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/voldemort/voldemort
Browse files Browse the repository at this point in the history
  • Loading branch information
jkreps committed Oct 30, 2009
2 parents 47ff456 + ebfba93 commit b2daa52
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 54 deletions.
2 changes: 1 addition & 1 deletion build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ tomcat.manager.password=tomcat
tomcat.context=/voldemort

## Release
curr.release=0.55
curr.release=0.56.1
42 changes: 23 additions & 19 deletions clients/cpp/src/Connection.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/* -*- C++ -*-; c-basic-offset: 4; indent-tabs-mode: nil */
/*
* Implementation for SocketStore class.
*
*
* Copyright (c) 2009 Webroot Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
Expand All @@ -28,6 +28,10 @@
#include <sys/socket.h>
#include <cerrno>

#if !defined(MSG_NOSIGNAL) && defined(__APPLE__)
#define MSG_NOSIGNAL MSG_HAVEMORE
#endif

namespace Voldemort {

using namespace std;
Expand All @@ -39,7 +43,7 @@ Connection::Connection(const string& hostName,
const string& negString,
shared_ptr<ClientConfig>& conf)
: config(conf), host(hostName), port(portNum), negotiationString(negString),
io_service(), resolver(io_service), timer(io_service),
io_service(), resolver(io_service), timer(io_service),
socket(io_service), connbuf(NULL), connstream(NULL), active(false) {
}

Expand All @@ -57,15 +61,15 @@ void Connection::connect() {
boost::bind(&Connection::handle_resolve, this,
asio::placeholders::error,
asio::placeholders::iterator));

wait_for_operation(config->getConnectionTimeoutMs());
}

void Connection::check_error(const system::error_code& err) {
void Connection::check_error(const boost::system::error_code& err) {
if (err) {
active = false;
throw UnreachableStoreException(err.message());
}
}
}

void Connection::wait_for_operation(long millis)
Expand All @@ -91,22 +95,22 @@ void Connection::timeout() {
op_timeout = true;
}

void Connection::handle_resolve(const system::error_code& err,
void Connection::handle_resolve(const boost::system::error_code& err,
tcp::resolver::iterator endpoint_iterator) {
check_error(err);

// Attempt a connection to the first endpoint in the list. Each endpoint
// will be tried until we successfully establish a connection.
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint,
boost::bind(&Connection::handle_connect,
boost::bind(&Connection::handle_connect,
this,
asio::placeholders::error,
asio::placeholders::error,
++endpoint_iterator));

}

void Connection::handle_connect(const system::error_code& err,
void Connection::handle_connect(const boost::system::error_code& err,
tcp::resolver::iterator endpoint_iterator) {
if (!err) {
/* We're done with ASIO now, since it performs really poorly
Expand Down Expand Up @@ -141,9 +145,9 @@ void Connection::handle_connect(const system::error_code& err,
socket.close();
tcp::endpoint endpoint = *endpoint_iterator;
socket.async_connect(endpoint,
boost::bind(&Connection::handle_connect,
boost::bind(&Connection::handle_connect,
this,
asio::placeholders::error,
asio::placeholders::error,
++endpoint_iterator));
} else {
check_error(err);
Expand Down Expand Up @@ -175,7 +179,7 @@ size_t Connection::read_some(char* buffer, size_t bufferLen) {
}

#if 0
void Connection::handle_data_op(const system::error_code& err,
void Connection::handle_data_op(const boost::system::error_code& err,
size_t transferred) {
check_error(err);
bytesTransferred = transferred;
Expand Down Expand Up @@ -212,7 +216,7 @@ void Connection::close() {
resolver.cancel();
}

ConnectionBuffer::ConnectionBuffer(Connection& con)
ConnectionBuffer::ConnectionBuffer(Connection& con)
: conn(con), unbuffered_(false) {
init_buffers();
};
Expand All @@ -234,8 +238,8 @@ void ConnectionBuffer::init_buffers() {

int ConnectionBuffer::underflow() {
if (gptr() == egptr()) {
size_t bytes_transferred =
conn.read_some(get_buffer_.begin() + putback_max,
size_t bytes_transferred =
conn.read_some(get_buffer_.begin() + putback_max,
get_buffer_.size());
if (bytes_transferred < 0)
return traits_type::eof();
Expand Down Expand Up @@ -263,7 +267,7 @@ int ConnectionBuffer::overflow(int c) {
}
} else {
// Send all data in the output buffer.
size_t bytes_transferred =
size_t bytes_transferred =
conn.write(pbase(), pptr() - pbase());
if (bytes_transferred <= 0)
return traits_type::eof();
Expand Down
8 changes: 4 additions & 4 deletions clients/cpp/src/include/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,16 @@ class Connection {
private:
void wait_for_operation(long millis);
void timeout();
void handle_connect(const system::error_code& err,
void handle_connect(const boost::system::error_code& err,
tcp::resolver::iterator endpoint_iterator);

void handle_resolve(const system::error_code& err,
void handle_resolve(const boost::system::error_code& err,
tcp::resolver::iterator endpoint_iterator);
#if 0
void handle_data_op(const system::error_code& err,
void handle_data_op(const boost::system::error_code& err,
size_t transferred);
#endif
void check_error(const system::error_code& err);
void check_error(const boost::system::error_code& err);

shared_ptr<ClientConfig> config;

Expand Down
1 change: 1 addition & 0 deletions clients/cpp/utils/stress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <vector>
#include <sstream>
#include <iostream>

#include <voldemort/voldemort.h>
#include <boost/thread/thread.hpp>
Expand Down
25 changes: 23 additions & 2 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public abstract class AbstractStoreClientFactory implements StoreClientFactory {
private final RequestFormatType requestFormatType;
private final MBeanServer mbeanServer;
private final int jmxId;
private final int maxBootstrapRetries;

public AbstractStoreClientFactory(ClientConfig config) {
this.threadPool = new ClientThreadPool(config.getMaxThreads(),
Expand All @@ -105,6 +106,7 @@ public AbstractStoreClientFactory(ClientConfig config) {
else
this.mbeanServer = null;
this.jmxId = jmxIdCounter.getAndIncrement();
this.maxBootstrapRetries = config.getMaxBootstrapRetries();
registerThreadPoolJmx(threadPool);
}

Expand Down Expand Up @@ -132,9 +134,9 @@ public <K, V> StoreClient<K, V> getStoreClient(String storeName,
public <K, V> Store<K, V> getRawStore(String storeName,
InconsistencyResolver<Versioned<V>> resolver) {
// Get cluster and store metadata
String clusterXml = bootstrapMetadata(MetadataStore.CLUSTER_KEY, bootstrapUrls);
String clusterXml = bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY, bootstrapUrls);
Cluster cluster = clusterMapper.readCluster(new StringReader(clusterXml));
String storesXml = bootstrapMetadata(MetadataStore.STORES_KEY, bootstrapUrls);
String storesXml = bootstrapMetadataWithRetries(MetadataStore.STORES_KEY, bootstrapUrls);
List<StoreDefinition> storeDefs = storeMapper.readStoreList(new StringReader(storesXml));
StoreDefinition storeDef = null;
for(StoreDefinition d: storeDefs)
Expand Down Expand Up @@ -201,6 +203,25 @@ private CompressionStrategy getCompressionStrategy(SerializerDefinition serializ
return new CompressionStrategyFactory().get(serializerDef.getCompression());
}

private String bootstrapMetadataWithRetries(String key, URI[] urls) {
int nTries = 0;
while(nTries++ < this.maxBootstrapRetries) {
try {
return bootstrapMetadata(key, urls);
} catch(BootstrapFailureException e) {
int backOffTime = 5 * nTries;
logger.warn("Failed to bootstrap will try again after" + backOffTime);
try {
Thread.sleep(backOffTime * 1000);
} catch(InterruptedException e1) {
throw new RuntimeException(e1);
}
}
}

throw new BootstrapFailureException("No available boostrap servers found!");
}

private String bootstrapMetadata(String key, URI[] urls) {
for(URI url: urls) {
try {
Expand Down
16 changes: 16 additions & 0 deletions src/java/voldemort/client/ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ClientConfig {
private volatile RequestFormatType requestFormatType = RequestFormatType.VOLDEMORT_V1;
private volatile RoutingTier routingTier = RoutingTier.CLIENT;
private volatile boolean enableJmx = true;
private volatile int maxBootstrapRetries = 1;

public ClientConfig() {}

Expand All @@ -70,6 +71,7 @@ public ClientConfig() {}
public static final String BOOTSTRAP_URLS_PROPERTY = "bootstrap_urls";
public static final String REQUEST_FORMAT_PROPERTY = "request_format";
public static final String ENABLE_JMX_PROPERTY = "enable_jmx";
public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries";

/**
* Initiate the client config from a set of properties. This is useful for
Expand Down Expand Up @@ -127,6 +129,9 @@ public ClientConfig(Properties properties) {
if(props.containsKey(ENABLE_JMX_PROPERTY))
this.setEnableJmx(props.getBoolean(ENABLE_JMX_PROPERTY));

if(props.containsKey(MAX_BOOTSTRAP_RETRIES))
this.setMaxBootstrapRetries(props.getInt(MAX_BOOTSTRAP_RETRIES));

}

public int getMaxConnectionsPerNode() {
Expand Down Expand Up @@ -374,4 +379,15 @@ public ClientConfig setEnableJmx(boolean enableJmx) {
this.enableJmx = enableJmx;
return this;
}

public int getMaxBootstrapRetries() {
return maxBootstrapRetries;
}

public void setMaxBootstrapRetries(int maxBootstrapRetries) {
if(maxBootstrapRetries < 1)
throw new IllegalArgumentException("maxBootstrapRetries should be >= 1");

this.maxBootstrapRetries = maxBootstrapRetries;
}
}
4 changes: 4 additions & 0 deletions src/java/voldemort/server/storage/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ protected void stopInner() {
try {
store.close();
} catch(Exception e) {
logger.error(e);
lastException = e;
}
}
Expand All @@ -303,6 +304,7 @@ protected void stopInner() {
try {
store.close();
} catch(Exception e) {
logger.error(e);
lastException = e;
}
}
Expand All @@ -313,6 +315,7 @@ protected void stopInner() {
try {
this.storeRepository.getSlopStore().close();
} catch(Exception e) {
logger.error(e);
lastException = e;
}
}
Expand All @@ -324,6 +327,7 @@ protected void stopInner() {
try {
config.close();
} catch(Exception e) {
logger.error(e);
lastException = e;
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/java/voldemort/store/bdb/BdbStorageConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public String getType() {

public EnvironmentStats getStats(String storeName) {
StatsConfig config = new StatsConfig();
config.setFast(true);
config.setFast(false);
try {
Environment env = getEnvironment(storeName);
return env.getStats(config);
Expand All @@ -183,7 +183,9 @@ public EnvironmentStats getStats(String storeName) {

@JmxOperation(description = "A variety of stats about one BDB environment.")
public String getEnvStatsAsString(String storeName) throws Exception {
return getStats(storeName).toString();
String envStats = getStats(storeName).toString();
logger.debug("Bdb Environment stats:\n" + envStats);
return envStats;
}

public void close() {
Expand Down
16 changes: 9 additions & 7 deletions src/java/voldemort/store/bdb/BdbStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.serialization.IdentitySerializer;
import voldemort.serialization.Serializer;
import voldemort.serialization.VersionedSerializer;
Expand Down Expand Up @@ -167,7 +167,7 @@ private static <T> List<T> get(Cursor cursor,

for(OperationStatus status = cursor.getSearchKey(keyEntry, valueEntry, lockMode); status == OperationStatus.SUCCESS; status = cursor.getNextDup(keyEntry,
valueEntry,
LockMode.RMW)) {
lockMode)) {
results.add(serializer.toObject(valueEntry.getData()));
}
return results;
Expand Down Expand Up @@ -314,20 +314,22 @@ private static void attemptClose(Cursor cursor) {
}
}

public DatabaseStats getStats() {
public DatabaseStats getStats(boolean setFast) {
try {
StatsConfig config = new StatsConfig();
config.setFast(true);
config.setFast(setFast);
return this.bdbDatabase.getStats(config);
} catch(DatabaseException e) {
logger.error(e);
throw new VoldemortException(e);
}
}

@JmxGetter(name = "stats", description = "A variety of stats about the BDB for this store.")
public String getStatsAsString() {
return getStats().toString();
@JmxOperation(description = "A variety of stats about the BDB for this store.")
public String getBdbStats() {
String stats = getStats(false).toString();
logger.debug("Bdb store" + getName() + " stats:\n" + stats + "\n");
return stats;
}

private static class BdbStoreIterator implements
Expand Down
14 changes: 8 additions & 6 deletions src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,15 @@ public void open() {
public void close() throws VoldemortException {
logger.debug("Close called for read-only store.");
this.fileModificationLock.writeLock().lock();
if(!isOpen) {
fileModificationLock.writeLock().unlock();
throw new IllegalStateException("Attempt to close non-open store.");
}

try {
this.isOpen = false;
fileSet.close();
if(isOpen) {
this.isOpen = false;
fileSet.close();
}
else {
logger.debug("Attempt to close already closed store " + getName());
}
} finally {
this.fileModificationLock.writeLock().unlock();
}
Expand Down
6 changes: 5 additions & 1 deletion src/java/voldemort/store/routed/RoutedStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,11 @@ public void run() {
} catch(UnreachableStoreException e) {
markUnavailable(node, e);
failures.add(e);
} catch(Exception e) {
} catch (ObsoleteVersionException e) {
// Do not log ObsoleteVersionException
failures.add(e);
}
catch(Exception e) {
logger.warn("Error in PUT on node " + node.getId() + "(" + node.getHost()
+ ")", e);
failures.add(e);
Expand Down
Loading

0 comments on commit b2daa52

Please sign in to comment.