Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'opensource_develop_os' into opensource_master

  • Loading branch information...
commit 9c48ec2cfea796d2e0ee5b8638dfb30e127e35ea 2 parents 427751d + 612366b
groys authored
Showing with 5,589 additions and 705 deletions.
  1. +9 −1 examples/scribe_ctrl
  2. +37 −0 if/bucketupdater.thrift
  3. +13 −5 src/HdfsFile.cpp
  4. +3 −2 src/HdfsFile.h
  5. +4 −1 src/Makefile.am
  6. +117 −18 src/conf.cpp
  7. +12 −7 src/conf.h
  8. +87 −53 src/conn_pool.cpp
  9. +16 −9 src/conn_pool.h
  10. +434 −0 src/dynamic_bucket_updater.cpp
  11. +237 −0 src/dynamic_bucket_updater.h
  12. +139 −0 src/env_default.cpp
  13. +38 −13 src/env_default.h
  14. +85 −40 src/file.cpp
  15. +2 −2 src/file.h
  16. +47 −0 src/network_dynamic_config.cpp
  17. +37 −0 src/network_dynamic_config.h
  18. +170 −229 src/scribe_server.cpp
  19. +23 −12 src/scribe_server.h
  20. +328 −198 src/store.cpp
  21. +21 −17 src/store.h
  22. +12 −28 src/store_queue.cpp
  23. +3 −1 src/store_queue.h
  24. +102 −0 test/basictest.php
  25. +106 −0 test/basictest2.php
  26. +3 −0  test/bidmap.1
  27. +3 −0  test/bidmap.2
  28. +23 −0 test/bucket_test.php
  29. +210 −0 test/buckettest.conf
  30. +130 −0 test/bucketupdater.php
  31. +9 −0 test/bucketupdater/TARGETS
  32. +104 −0 test/bucketupdater/main.cpp
  33. +99 −0 test/buffertest.php
  34. +122 −0 test/buffertest2.php
  35. +57 −0 test/categoriestest.php
  36. +47 −0 test/filecat.php
  37. +48 −0 test/paramtest.php
  38. +23 −0 test/reinit.php
  39. +179 −0 test/scribe.conf.basictest
  40. +181 −0 test/scribe.conf.basictest2
  41. +113 −0 test/scribe.conf.bucketupdater.central
  42. +102 −0 test/scribe.conf.bucketupdater.server1
  43. +102 −0 test/scribe.conf.bucketupdater.server2
  44. +179 −0 test/scribe.conf.buffertest
  45. +181 −0 test/scribe.conf.buffertest2
  46. +211 −0 test/scribe.conf.categoriestest
  47. +114 −0 test/scribe.conf.paramtest
  48. +84 −0 test/scribe.conf.scribehtest
  49. +127 −0 test/scribe.conf.simpletest
  50. +63 −65 test/scribe.conf.test
  51. +84 −0 test/scribe.conf.test.hdfs
  52. +76 −0 test/scribe.conf.twodefaulttest
  53. +81 −0 test/simpletest.php
  54. +43 −0 test/superstress.php
  55. +168 −4 test/tests.php
  56. +161 −0 test/testsuite.php
  57. +232 −0 test/testutil.php
  58. +148 −0 test/twodefaulttest.php
10 examples/scribe_ctrl
View
@@ -20,7 +20,15 @@
'''scribe_ctrl: A simple script for running and monitoring scribe.'''
import sys
-from fb303_scripts import *
+
+#make this work for facebook environment too
+isFacebook = 1
+if (isFacebook == 1):
+ # put your own path here!
+ sys.path.insert(0, '/mytrunk/fbcode-test/common/fb303/scripts')
+ import fb303_simple_mgmt
+else:
+ from fb303_scripts import *
# thrift python packages need to be installed
import thrift
37 if/bucketupdater.thrift
View
@@ -0,0 +1,37 @@
+#!/usr/local/bin/thrift --cpp --php
+
+## Copyright (c) 2009- Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+namespace cpp scribe.thrift
+namespace java com.facebook.infrastructure.service
+
+// BucketStoreMapping service exception
+exception BucketStoreMappingException {
+ 1: string message;
+ 2: i32 code;
+}
+
+struct HostPort {
+ 2: string host,
+ 3: i32 port
+}
+
+service BucketStoreMapping {
+ // given a category, return a list of HashCodeToNetworkStore mappings
+ map<i32, HostPort> getMapping(1: string category) throws (1: BucketStoreMappingException e);
+}
18 src/HdfsFile.cpp
View
@@ -29,8 +29,9 @@ HdfsFile::HdfsFile(const std::string& name) : FileInterface(name, false), inputB
HdfsFile::~HdfsFile() {
if (fileSys) {
- LOG_OPER("[hdfs] disconnected fileSys for %s", filename.c_str());
+ LOG_OPER("[hdfs] disconnecting fileSys for %s", filename.c_str());
hdfsDisconnect(fileSys);
+ LOG_OPER("[hdfs] disconnected fileSys for %s", filename.c_str());
}
fileSys = 0;
hfile = 0;
@@ -95,14 +96,15 @@ bool HdfsFile::isOpen() {
void HdfsFile::close() {
if (fileSys) {
if (hfile) {
+ LOG_OPER("[hdfs] closing %s", filename.c_str());
hdfsCloseFile(fileSys, hfile );
- LOG_OPER("[hdfs] closed %s", filename.c_str());
}
hfile = 0;
// Close the file system
- LOG_OPER("[hdfs] disconnected fileSys for %s", filename.c_str());
+ LOG_OPER("[hdfs] disconnecting fileSys for %s", filename.c_str());
hdfsDisconnect(fileSys);
+ LOG_OPER("[hdfs] disconnected fileSys for %s", filename.c_str());
fileSys = 0;
}
}
@@ -167,12 +169,18 @@ void HdfsFile::listImpl(const std::string& path,
}
}
hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
+ // A NULL indicates error
+ } else {
+ throw std::runtime_error("hdfsListDirectory call failed");
}
+ } else if (value == -1) {
+ throw std::runtime_error("hdfsExists call failed");
}
}
-bool HdfsFile::readNext(std::string& _return) {
- return false; // frames not yet supported
+long HdfsFile::readNext(std::string& _return) {
+ /* choose a reasonable value for loss */
+ return (-1000 * 1000 * 1000);
}
string HdfsFile::getFrame(unsigned data_length) {
5 src/HdfsFile.h
View
@@ -24,7 +24,7 @@ class HdfsFile : public FileInterface {
bool write(const std::string& data);
void flush();
unsigned long fileSize();
- bool readNext(std::string& _return);
+ long readNext(std::string& _return);
void deleteFile();
void listImpl(const std::string& path, std::vector<std::string>& _return);
std::string getFrame(unsigned data_size);
@@ -75,8 +75,9 @@ class HdfsFile : public FileInterface {
void close() {};
bool write(const std::string& data) { return false; };
void flush() {};
+ void sync() {};
unsigned long fileSize() { return 0; };
- bool readNext(std::string& _return) { return false; };
+ long readNext(std::string& _return) { return false; };
void deleteFile() {};
void listImpl(const std::string& path, std::vector<std::string>& _return) {};
std::string getFrame(unsigned data_size) { return 0; };
5 src/Makefile.am
View
@@ -40,6 +40,9 @@ THRIFT = $(thrift_home)/bin/thrift
if FACEBOOK
FB_SOURCES = gen-cpp/ServiceManager_types.cpp gen-cpp/ServiceManager.cpp
FB_CPPFLAGS = -I$(fb_home)
+ ENV_SOURCES = env_facebook.cpp
+else
+ ENV_SOURCES = env_default.cpp
endif
if DEBUG
DEBUG_CPPFLAGS = -DDEBUG_TIMING
@@ -94,7 +97,7 @@ endif
# Binaries -- multiple progs can be defined.
bin_PROGRAMS = scribed
-scribed_SOURCES = store.cpp store_queue.cpp conf.cpp file.cpp conn_pool.cpp scribe_server.cpp $(FB_SOURCES)
+scribed_SOURCES = store.cpp store_queue.cpp conf.cpp file.cpp HdfsFile.cpp conn_pool.cpp scribe_server.cpp $(FB_SOURCES) $(ENV_SOURCES)
if USE_SCRIBE_HDFS
scribed_SOURCES += HdfsFile.cpp
endif
135 src/conf.cpp
View
@@ -17,20 +17,25 @@
//
// @author Bobby Johnson
// @author Jason Sobel
+// @author John Song
+#include <boost/algorithm/string.hpp>
#include "common.h"
#include "conf.h"
+#include "scribe_server.h"
using namespace boost;
using namespace std;
+extern shared_ptr<scribeHandler> g_Handler;
+
StoreConf::StoreConf() {
}
StoreConf::~StoreConf() {
}
-bool StoreConf::getStore(const std::string& storeName, pStoreConf& _return) {
+bool StoreConf::getStore(const string& storeName, pStoreConf& _return) {
store_conf_map_t::iterator iter = stores.find(storeName);
if (iter != stores.end()) {
_return = iter->second;
@@ -40,13 +45,17 @@ bool StoreConf::getStore(const std::string& storeName, pStoreConf& _return) {
}
}
-void StoreConf::getAllStores(std::vector<pStoreConf>& _return) {
+void StoreConf::setParent(pStoreConf pParent) {
+ parent = pParent;
+}
+
+void StoreConf::getAllStores(vector<pStoreConf>& _return) {
for (store_conf_map_t::iterator iter = stores.begin(); iter != stores.end(); ++iter) {
_return.push_back(iter->second);
}
}
-bool StoreConf::getInt(const std::string& intName, long int& _return) {
+bool StoreConf::getInt(const string& intName, long int& _return) const {
string str;
if (getString(intName, str)) {
_return = strtol(str.c_str(), NULL, 0);
@@ -56,7 +65,18 @@ bool StoreConf::getInt(const std::string& intName, long int& _return) {
}
}
-bool StoreConf::getUnsigned(const std::string& intName, unsigned long int& _return) {
+bool StoreConf::getFloat(const std::string& floatName, float & _return) const {
+ string str;
+ if (getString(floatName, str)) {
+ _return = strtof(str.c_str(), NULL);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool StoreConf::getUnsigned(const string& intName,
+ unsigned long int& _return) const {
string str;
if (getString(intName, str)) {
_return = strtoul(str.c_str(), NULL, 0);
@@ -66,7 +86,8 @@ bool StoreConf::getUnsigned(const std::string& intName, unsigned long int& _retu
}
}
-bool StoreConf::getUnsignedLongLong(const std::string& llName, unsigned long long& _return) {
+bool StoreConf::getUnsignedLongLong(const string& llName,
+ unsigned long long& _return) const {
string str;
if (getString(llName, str)) {
_return = strtoull(str.c_str(), NULL, 10);
@@ -76,43 +97,88 @@ bool StoreConf::getUnsignedLongLong(const std::string& llName, unsigned long lon
}
}
-bool StoreConf::getString(const std::string& stringName, std::string& _return) {
- string_map_t::iterator iter = values.find(stringName);
+bool StoreConf::getString(const string& stringName,
+ string& _return) const {
+ // allow parameter inheritance, i.e. if a named value is not found in the
+ // current store's configuration, we keep looking up the current StoreConf's
+ // ancestors until it is either found or we hit the root store.
+ // To avoid ambiguity, when searching for a named parameter in an ancestor
+ // store we look for $type::$stringName, where $type is the store type.
+ // check the current store conf
+
+ // first check the current store
+ string_map_t::const_iterator iter = values.find(stringName);
if (iter != values.end()) {
_return = iter->second;
return true;
- } else {
+ }
+
+ // "category", "categories", "type" parameters can't be inherited
+ string_map_t::const_iterator typeIter = values.find("type");
+ string storeType = typeIter == values.end() ? "" : typeIter->second;
+ if (storeType.empty()
+ || stringName == "type"
+ || stringName == "category"
+ || stringName == "categories") {
return false;
}
+
+ // not found
+ bool found = false;
+ string inheritedName = storeType + "::" + stringName;
+ // searching for type::stringName start with the current configuration
+ // this allows a parameter to be used by the current configuration
+ // and descendant stores. E.g.
+ // file::fs_type = std
+ // can be used by this file store and all descendant file stores.
+ for (const StoreConf* pconf = this; pconf;
+ pconf = const_cast<StoreConf*>(pconf->parent.get())) {
+ string_map_t::const_iterator iter = pconf->values.find(inheritedName);
+ if (iter != pconf->values.end()) {
+ _return = iter->second;
+ found = true;
+ break;
+ }
+ }
+ // if we didn't find it in any ancestor, then try g_Handler's config
+ if (!found) {
+ const StoreConf& gconf = g_Handler->getConfig();
+ string_map_t::const_iterator iter = gconf.values.find(inheritedName);
+ if (iter != gconf.values.end()) {
+ _return = iter->second;
+ found = true;
+ }
+ }
+ return found;
}
-void StoreConf::setString(const std::string& stringName, const std::string& value) {
+void StoreConf::setString(const string& stringName, const string& value) {
values[stringName] = value;
}
-void StoreConf::setUnsigned(const std::string& stringName, unsigned long value) {
+void StoreConf::setUnsigned(const string& stringName, unsigned long value) {
ostringstream oss;
oss << value;
setString(stringName, oss.str());
}
-void StoreConf::setUnsignedLongLong(const std::string& stringName, unsigned long long value) {
+void StoreConf::setUnsignedLongLong(const string& stringName, unsigned long long value) {
ostringstream oss;
oss << value;
setString(stringName, oss.str());
}
// reads and parses the config data
-void StoreConf::parseConfig(const std::string& filename) {
+void StoreConf::parseConfig(const string& filename) {
queue<string> config_strings;
if (readConfFile(filename, config_strings)) {
LOG_OPER("got configuration data from file <%s>", filename.c_str());
} else {
- std::ostringstream msg;
+ ostringstream msg;
msg << "Failed to open config file <" << filename << ">";
- throw std::runtime_error(msg.str());
+ throw runtime_error(msg.str());
}
parseStore(config_strings, this);
@@ -164,7 +230,7 @@ bool StoreConf::parseStore(queue<string>& raw_config, /*out*/ StoreConf* parsed_
if (0 == store_name.compare("store")) {
// This is a special case for the top-level stores. They share
// the same name, so we append an index to put them in the map
- std::ostringstream oss;
+ ostringstream oss;
oss << store_index;
store_name += oss.str();
++store_index;
@@ -212,18 +278,51 @@ string StoreConf::trimString(const string& str) {
// reads every line from the file and pushes them onto _return
// returns false on error
bool StoreConf::readConfFile(const string& filename, queue<string>& _return) {
- std::string line;
- std::ifstream config_file;
+ string line;
+ ifstream config_file;
config_file.open(filename.c_str());
if (!config_file.good()) {
return false;
}
- while (std::getline(config_file, line)) {
+ while (getline(config_file, line)) {
_return.push(line);
}
config_file.close();
return true;
}
+
+// serialize StoreConf
+ostream& operator<<(ostream& os, const StoreConf& sconf) {
+ return sconf.print(os, 0);
+}
+
+static string indent(uint32_t depth, bool useSpace, uint32_t tabw) {
+ int len = useSpace ? depth * tabw : depth;
+ return string(len, useSpace ? ' ' : '\t');
+}
+
+ostream& StoreConf::print(ostream& os, uint32_t depth,
+ bool useSpace, uint32_t tabw) const {
+ // we only need to iterate over the keys: std::map keeps its keys in
+ // sorted order, so we will get consistent output.
+ for (string_map_t::const_iterator iter = values.begin();
+ iter != values.end(); iter++) {
+ int len = useSpace ? depth * tabw : depth;
+ os << indent(depth, useSpace, tabw) << iter->first
+ << "=" << iter->second << endl;
+ }
+ // print out sub stores
+ for (store_conf_map_t::const_iterator iter = stores.begin();
+ iter != stores.end(); iter++) {
+ os << indent(depth, useSpace, tabw) << "<" << iter->first << ">"
+ << endl;
+ iter->second->print(os, depth + 1, useSpace, tabw);
+ os << indent(depth, useSpace, tabw) << "</" << iter->first << ">"
+ << endl;
+ }
+
+ return os;
+}
19 src/conf.h
View
@@ -42,8 +42,10 @@ typedef boost::shared_ptr<StoreConf> pStoreConf;
typedef std::map<std::string, std::string> string_map_t;
typedef std::map<std::string, pStoreConf> store_conf_map_t;
-class StoreConf {
+std::ostream& operator<<(std::ostream& os, const StoreConf& storeConf);
+class StoreConf {
+ friend std::ostream& operator<<(std::ostream& os, const StoreConf& storeConf);
public:
StoreConf();
virtual ~StoreConf();
@@ -53,10 +55,11 @@ class StoreConf {
// The return parameter is untouched if the key isn't found.
void getAllStores(std::vector<pStoreConf>& _return);
bool getStore(const std::string& storeName, pStoreConf& _return);
- bool getInt(const std::string& intName, long int& _return);
- bool getUnsigned(const std::string& intName, unsigned long int& _return);
- bool getUnsignedLongLong(const std::string& intName, unsigned long long& _return);
- bool getString(const std::string& stringName, std::string& _return);
+ bool getInt(const std::string& intName, long int& _return) const;
+ bool getUnsigned(const std::string& intName, unsigned long int& _return) const;
+ bool getUnsignedLongLong(const std::string& intName, unsigned long long& _return) const;
+ bool getFloat(const std::string& floatName, float & _return) const;
+ bool getString(const std::string& stringName, std::string& _return) const;
void setString(const std::string& stringName, const std::string& value);
void setUnsigned(const std::string& intName, unsigned long value);
@@ -64,15 +67,17 @@ class StoreConf {
// Reads configuration from a file and throws an exception if it fails.
void parseConfig(const std::string& filename);
-
+ void setParent(pStoreConf parent);
private:
string_map_t values;
store_conf_map_t stores;
-
+ pStoreConf parent;
static bool parseStore(/*in,out*/ std::queue<std::string>& raw_config,
/*out*/ StoreConf* parsed_config);
static std::string trimString(const std::string& str);
bool readConfFile(const std::string& filename, std::queue<std::string>& _return);
+ std::ostream& print(std::ostream& os, uint32_t depth, bool useSpace = true,
+ uint32_t tabw = 2) const;
};
#endif //!defined SCRIBE_CONF_H
140 src/conn_pool.cpp
View
@@ -55,12 +55,12 @@ string ConnPool::makeKey(const string& hostname, unsigned long port) {
}
bool ConnPool::open(const string& hostname, unsigned long port, int timeout) {
- return openCommon(makeKey(hostname, port),
+ return openCommon(makeKey(hostname, port),
shared_ptr<scribeConn>(new scribeConn(hostname, port, timeout)));
}
bool ConnPool::open(const string &service, const server_vector_t &servers, int timeout) {
- return openCommon(service,
+ return openCommon(service,
shared_ptr<scribeConn>(new scribeConn(service, servers, timeout)));
}
@@ -72,18 +72,20 @@ void ConnPool::close(const string &service) {
closeCommon(service);
}
-bool ConnPool::send(const string& hostname, unsigned long port,
+int ConnPool::send(const string& hostname, unsigned long port,
shared_ptr<logentry_vector_t> messages) {
return sendCommon(makeKey(hostname, port), messages);
}
-bool ConnPool::send(const string &service,
+int ConnPool::send(const string &service,
shared_ptr<logentry_vector_t> messages) {
return sendCommon(service, messages);
}
bool ConnPool::openCommon(const string &key, shared_ptr<scribeConn> conn) {
+#define RETURN(x) {pthread_mutex_unlock(&mapMutex); return(x);}
+
// note on locking:
// The mapMutex locks all reads and writes to the connMap.
// The locks on each connection serialize writes and deletion.
@@ -94,23 +96,31 @@ bool ConnPool::openCommon(const string &key, shared_ptr<scribeConn> conn) {
pthread_mutex_lock(&mapMutex);
conn_map_t::iterator iter = connMap.find(key);
if (iter != connMap.end()) {
- (*iter).second->addRef();
- pthread_mutex_unlock(&mapMutex);
- return true;
- } else {
- // don't need to lock the conn yet, because no one know about
- // it until we release the mapMutex
+ shared_ptr<scribeConn> old_conn = (*iter).second;
+ if (old_conn->isOpen()) {
+ old_conn->addRef();
+ RETURN(true);
+ }
if (conn->open()) {
- // ref count starts at one, so don't addRef here
+ LOG_OPER("CONN_POOL: switching to a new connection <%s>", key.c_str());
+ conn->setRef(old_conn->getRef());
+ conn->addRef();
+ // old connection will be magically deleted by shared_ptr
connMap[key] = conn;
- pthread_mutex_unlock(&mapMutex);
- return true;
- } else {
- // conn object that failed to open is deleted
- pthread_mutex_unlock(&mapMutex);
- return false;
+ RETURN(true);
}
+ RETURN(false);
+ }
+ // don't need to lock the conn yet, because no one knows about
+ // it until we release the mapMutex
+ if (conn->open()) {
+ // ref count starts at one, so don't addRef here
+ connMap[key] = conn;
+ RETURN(true);
}
+ // conn object that failed to open is deleted
+ RETURN(false);
+#undef RETURN
}
void ConnPool::closeCommon(const string &key) {
@@ -131,26 +141,26 @@ void ConnPool::closeCommon(const string &key) {
pthread_mutex_unlock(&mapMutex);
}
-bool ConnPool::sendCommon(const string &key,
+int ConnPool::sendCommon(const string &key,
shared_ptr<logentry_vector_t> messages) {
pthread_mutex_lock(&mapMutex);
conn_map_t::iterator iter = connMap.find(key);
if (iter != connMap.end()) {
(*iter).second->lock();
pthread_mutex_unlock(&mapMutex);
- bool result = (*iter).second->send(messages);
+ int result = (*iter).second->send(messages);
(*iter).second->unlock();
return result;
} else {
LOG_OPER("send failed. No connection pool entry for <%s>", key.c_str());
pthread_mutex_unlock(&mapMutex);
- return false;
+ return (CONN_FATAL);
}
}
scribeConn::scribeConn(const string& hostname, unsigned long port, int timeout_)
: refCount(1),
- smcBased(false),
+ serviceBased(false),
remoteHost(hostname),
remotePort(port),
timeout(timeout_) {
@@ -159,8 +169,8 @@ scribeConn::scribeConn(const string& hostname, unsigned long port, int timeout_)
scribeConn::scribeConn(const string& service, const server_vector_t &servers, int timeout_)
: refCount(1),
- smcBased(true),
- smcService(service),
+ serviceBased(true),
+ serviceName(service),
serverList(servers),
timeout(timeout_) {
pthread_mutex_init(&mutex, NULL);
@@ -182,6 +192,10 @@ unsigned scribeConn::getRef() {
return refCount;
}
+void scribeConn::setRef(unsigned r) {
+ refCount = r;
+}
+
void scribeConn::lock() {
pthread_mutex_lock(&mutex);
}
@@ -197,7 +211,7 @@ bool scribeConn::isOpen() {
bool scribeConn::open() {
try {
- socket = smcBased ?
+ socket = serviceBased ?
shared_ptr<TSocket>(new TSocketPool(serverList)) :
shared_ptr<TSocket>(new TSocket(remoteHost, remotePort));
@@ -208,6 +222,17 @@ bool scribeConn::open() {
socket->setConnTimeout(timeout);
socket->setRecvTimeout(timeout);
socket->setSendTimeout(timeout);
+ /*
+ * We don't want to send resets to close the connection. Among
+ * other badness it also reduces data reliability. On getting a
+ * reset, the receiving socket will throw away any data the receiving
+ * process has not yet read.
+ *
+ * echo 5 > /proc/sys/net/ipv4/tcp_fin_timeout to set the TIME_WAIT
+ * timeout on a system.
+ * sysctl -a | grep tcp
+ */
+ socket->setLinger(0, 0);
framedTransport = shared_ptr<TFramedTransport>(new TFramedTransport(socket));
if (!framedTransport) {
@@ -224,14 +249,14 @@ bool scribeConn::open() {
}
framedTransport->open();
- if (smcBased) {
+ if (serviceBased) {
remoteHost = socket->getPeerHost();
}
- } catch (TTransportException& ttx) {
+ } catch (const TTransportException& ttx) {
LOG_OPER("failed to open connection to remote scribe server %s thrift error <%s>",
connectionString().c_str(), ttx.what());
return false;
- } catch (std::exception& stx) {
+ } catch (const std::exception& stx) {
LOG_OPER("failed to open connection to remote scribe server %s std error <%s>",
connectionString().c_str(), stx.what());
return false;
@@ -244,20 +269,19 @@ bool scribeConn::open() {
void scribeConn::close() {
try {
framedTransport->close();
- } catch (TTransportException& ttx) {
+ } catch (const TTransportException& ttx) {
LOG_OPER("error <%s> while closing connection to remote scribe server %s",
ttx.what(), connectionString().c_str());
}
}
-bool scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
+int
+scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
+ bool fatal;
int size = messages->size();
- if (size <= 0) {
- return true;
- }
if (!isOpen()) {
if (!open()) {
- return false;
+ return (CONN_FATAL);
}
}
@@ -279,30 +303,40 @@ bool scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
g_Handler->incCounter("sent", size);
LOG_OPER("Successfully sent <%d> messages to remote scribe server %s",
size, connectionString().c_str());
- return true;
- } else {
- LOG_OPER("Failed to send <%d> messages, remote scribe server %s returned error code <%d>",
- size, connectionString().c_str(), (int) result);
+ return (CONN_OK);
}
- } catch (TTransportException& ttx) {
- LOG_OPER("Failed to send <%d> messages to remote scribe server %s error <%s>",
- size, connectionString().c_str(),
- ttx.what());
+ fatal = false;
+ LOG_OPER("Failed to send <%d> messages, remote scribe server %s "
+ "returned error code <%d>", size, connectionString().c_str(),
+ (int) result);
+ } catch (const TTransportException& ttx) {
+ fatal = true;
+ LOG_OPER("Failed to send <%d> messages to remote scribe server %s "
+ "error <%s>", size, connectionString().c_str(), ttx.what());
} catch (...) {
- LOG_OPER("Unknown exception sending <%d> messages to remote scribe server %s",
- size, connectionString().c_str());
+ fatal = true;
+ LOG_OPER("Unknown exception sending <%d> messages to remote scribe "
+ "server %s", size, connectionString().c_str());
+ }
+ /*
+ * If this is a serviceBased connection then close it. We might
+ * be lucky and get another service when we reopen this connection.
+ * If the IP:port of the remote is fixed then no point closing this
+ * connection ... we are going to get the same connection back.
+ */
+ if (serviceBased || fatal) {
+ close();
+ return (CONN_FATAL);
}
- // we only get here if sending failed
- close();
- return false;
+ return (CONN_TRANSIENT);
}
std::string scribeConn::connectionString() {
- if (smcBased) {
- return "<" + remoteHost + " SMC service: " + smcService + ">";
- } else {
- char port[10];
- snprintf(port, 10, "%lu", remotePort);
- return "<" + remoteHost + ":" + string(port) + ">";
- }
+ if (serviceBased) {
+ return "<" + remoteHost + " Service: " + serviceName + ">";
+ } else {
+ char port[10];
+ snprintf(port, 10, "%lu", remotePort);
+ return "<" + remoteHost + ":" + string(port) + ">";
+ }
}
25 src/conn_pool.h
View
@@ -24,6 +24,12 @@
#include "common.h"
+/* return codes for ScribeConn and ConnPool */
+#define CONN_FATAL (-1) /* fatal error. close everything */
+#define CONN_OK (0) /* success */
+#define CONN_TRANSIENT (1) /* transient error */
+
+// Basic scribe class to manage network connections. Used by network store
class scribeConn {
public:
scribeConn(const std::string& host, unsigned long port, int timeout);
@@ -33,6 +39,7 @@ class scribeConn {
void addRef();
void releaseRef();
unsigned getRef();
+ void setRef(unsigned);
void lock();
void unlock();
@@ -40,7 +47,7 @@ class scribeConn {
bool isOpen();
bool open();
void close();
- bool send(boost::shared_ptr<logentry_vector_t> messages);
+ int send(boost::shared_ptr<logentry_vector_t> messages);
private:
std::string connectionString();
@@ -53,8 +60,8 @@ class scribeConn {
unsigned refCount;
- bool smcBased;
- std::string smcService;
+ bool serviceBased;
+ std::string serviceName;
server_vector_t serverList;
std::string remoteHost;
unsigned long remotePort;
@@ -62,13 +69,13 @@ class scribeConn {
pthread_mutex_t mutex;
};
-// key is hostname:port or the smc_service
+// key is hostname:port or the service
typedef std::map<std::string, boost::shared_ptr<scribeConn> > conn_map_t;
// Scribe class to manage connection pooling
-// Maintains a map of (<host,port> or smc_service) to scribeConn class.
+// Maintains a map of (<host,port> or service) to scribeConn class.
// used to ensure that there is only one connection from one particular
-// scribe server to any host,port or smc_service.
+// scribe server to any host,port or service.
// see the global g_connPool in store.cpp
class ConnPool {
public:
@@ -81,15 +88,15 @@ class ConnPool {
void close(const std::string& host, unsigned long port);
void close(const std::string &service);
- bool send(const std::string& host, unsigned long port,
+ int send(const std::string& host, unsigned long port,
boost::shared_ptr<logentry_vector_t> messages);
- bool send(const std::string &service,
+ int send(const std::string &service,
boost::shared_ptr<logentry_vector_t> messages);
private:
bool openCommon(const std::string &key, boost::shared_ptr<scribeConn> conn);
void closeCommon(const std::string &key);
- bool sendCommon(const std::string &key,
+ int sendCommon(const std::string &key,
boost::shared_ptr<logentry_vector_t> messages);
protected:
434 src/dynamic_bucket_updater.cpp
View
@@ -0,0 +1,434 @@
+#include <exception>
+#include <iostream>
+#include <sstream>
+#include <strstream>
+#include "thrift/lib/cpp/transport/TBufferTransports.h"
+#include "scribe/src/dynamic_bucket_updater.h"
+#include "scribe/src/scribe_server.h"
+
+using namespace std;
+using namespace apache::thrift::concurrency;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace facebook;
+using namespace facebook::fb303;
+using namespace scribe::thrift;
+
+// Global scribe handler, declared here and defined in another translation
+// unit. The config-driven getHost() below passes it to the updater as the
+// fb303 object that receives the counters.
+extern shared_ptr<scribeHandler> g_Handler;
+
+// Singleton state: the lazily created instance and the mutex taken by
+// getInstance() while creating it.
+DynamicBucketUpdater* DynamicBucketUpdater::instance_ = NULL;
+Mutex DynamicBucketUpdater::instanceLock_;
+
+// Names of the fb303 counters reported by the bucket updater.
+
+// bucket updater connection error
+const char* DynamicBucketUpdater::FB303_ERR_CONNECT = "bucketupdater.err.update_connect";
+// error calling bucketupdater.thrift
+const char* DynamicBucketUpdater::FB303_ERR_THRIFTCALL = "bucketupdater.err.thrift_call";
+// bucket updater returned an empty result
+const char* DynamicBucketUpdater::FB303_ERR_EMPTYRESULT = "bucketupdater.err.empty_result";
+// number of times a remote updater has been called
+// NOTE(review): the key reads "remove_updater" although the counter tracks
+// remote updates; probably a typo for "remote_updater". Confirm before
+// renaming, since this string is the published counter name.
+const char* DynamicBucketUpdater::FB303_REMOTEUPDATE = "bucketupdater.remove_updater";
+// missing a bid mapping
+const char* DynamicBucketUpdater::FB303_ERR_NOMAPPING = "bucketupdater.err.nobidmapping";
+// number of buckets that have been updated
+const char* DynamicBucketUpdater::FB303_BUCKETSUPDATED = "bucketupdater.bucket_updated";
+
+/**
+ * Resolve the host:port that the configured bucket currently maps to.
+ *
+ * Reads the updater settings from the store configuration and delegates to
+ * the service-based overload when bucket_updater_service is set, or to the
+ * host/port overload (bucket_updater_host / bucket_updater_port) otherwise.
+ * The single "timeout" setting is used as connect, send and receive timeout.
+ *
+ * @param category category name of the store; also the mapping lookup key
+ * @param pconf    store configuration holding bucket_id, bucket_updater_*
+ *                 and timeout settings
+ * @param host     output: receives the mapped host on success
+ * @param port     output: receives the mapped port on success
+ *
+ * @return true if a mapping was found, false otherwise.
+ */
+bool DynamicBucketUpdater::getHost(const string& category,
+                                   const StoreConf* pconf,
+                                   string& host,
+                                   uint32_t& port) {
+  string service, serviceOptions, updaterHost, updaterPort;
+  long int timeout = 1000;  // 1 second default timeout
+  long int ttl = 60;        // update every minute
+  // Sentinel: stays negative when bucket_id is missing or not an integer, so
+  // the value is never read uninitialized (same "default, then getInt"
+  // pattern as ttl and timeout).
+  long int bid = -1;
+  pconf->getString("bucket_updater_service_options", serviceOptions);
+  pconf->getInt("bucket_updater_ttl", ttl);
+  pconf->getInt("bucket_id", bid);
+  pconf->getString("bucket_updater_service", service);
+  pconf->getString("bucket_updater_host", updaterHost);
+  pconf->getString("bucket_updater_port", updaterPort);
+  pconf->getInt("timeout", timeout);
+
+  if (bid < 0) {
+    LOG_OPER("[%s] dynamic bucket updater failed: missing or invalid bucket_id",
+             category.c_str());
+    return false;
+  }
+
+  bool success = false;
+  if (!service.empty()) {
+    // a service name takes precedence over an explicit host/port
+    success = DynamicBucketUpdater::getHost(g_Handler.get(),
+                                            category,
+                                            ttl,
+                                            static_cast<uint64_t>(bid),
+                                            host,
+                                            port,
+                                            service,
+                                            serviceOptions,
+                                            timeout, timeout, timeout);
+  } else {
+    success = DynamicBucketUpdater::getHost(g_Handler.get(),
+                                            category,
+                                            ttl,
+                                            static_cast<uint64_t>(bid),
+                                            host,
+                                            port,
+                                            updaterHost,
+                                            atoi(updaterPort.c_str()),
+                                            timeout, timeout, timeout);
+  }
+
+  if (!success) {
+    LOG_OPER("[%s] dynamic bucket updater failed: bid=%ld",
+             category.c_str(), bid);
+  }
+  return success;
+}
+
+/**
+ * Check that a store configuration carries what the dynamic bucket updater
+ * needs: a bucket_id, plus either bucket_updater_service or both
+ * bucket_updater_host and bucket_updater_port.
+ *
+ * @param category category name, used only to prefix log messages
+ * @param pconf    store configuration to inspect
+ *
+ * @return true if the configuration is usable, false (after logging the
+ *         reason) otherwise.
+ */
+bool DynamicBucketUpdater::isConfigValid(const string& category,
+                                         const StoreConf* pconf) {
+  // the bucket id is mandatory
+  string bucketId;
+  const bool hasBucketId = pconf->getString("bucket_id", bucketId);
+  if (!hasBucketId) {
+    LOG_OPER("[%s] dynamic bucket updater configuration invalid. Missing bucket_id. Is the network a descendant of a bucket store?", category.c_str());
+    return false;
+  }
+
+  string host, port, service;
+
+  // a service name alone is enough
+  if (pconf->getString("bucket_updater_service", service)) {
+    return true;
+  }
+
+  // otherwise both an explicit host and port are required
+  if (pconf->getString("bucket_updater_host", host) &&
+      pconf->getString("bucket_updater_port", port)) {
+    return true;
+  }
+
+  LOG_OPER("[%s] dynamic bucket updater configuration invalid. Either bucket_updater_service or bucket_updater_host and bucket_updater_port is needed. Current values are: bucket_updater_service=<%s>, bucket_updater_host=<%s>, bucket_updater_port=<%s>",
+           category.c_str(), service.c_str(), host.c_str(), port.c_str());
+  return false;
+}
+
+/**
+ * Return host, port given a key and bucket id, bid, combination, fetching
+ * the mapping from an explicitly named updater host:port.
+ * If a mapping is found, the result will be returned in the out parameters
+ * and the function returns true. Otherwise, the function returns false and
+ * the output parameters are not modified.
+ *
+ * @param fbBase      pointer to FacebookBase, used for fb303 counters
+ * @param category    the category name, or any identifier that uniquely
+ *                    identifies a bucket store.
+ * @param ttl         ttl in seconds
+ * @param bid         bucket id
+ * @param host        the output parameter that receives the host output.
+ *                    If no mapping is found, this variable is not modified.
+ * @param port        the output parameter that receives the port output.
+ *                    If no mapping is found, this variable is not modified.
+ * @param updateHost  remote host to fetch the mapping from
+ * @param updatePort  remote port to fetch the mapping from
+ * @param connTimeout connection timeout
+ * @param sendTimeout send timeout
+ * @param recvTimeout receive timeout
+ */
+bool DynamicBucketUpdater::getHost(facebook::fb303::FacebookBase *fbBase,
+                                   const string &category,
+                                   uint32_t ttl,
+                                   uint64_t bid,
+                                   string &host,
+                                   uint32_t &port,
+                                   string updateHost,
+                                   uint32_t updatePort,
+                                   uint32_t connTimeout,
+                                   uint32_t sendTimeout,
+                                   uint32_t recvTimeout) {
+  DynamicBucketUpdater *instance = DynamicBucketUpdater::getInstance(fbBase);
+
+  // Scoped guard instead of manual lock()/unlock(): the mutex is released
+  // even if the lookup throws, so a failed update cannot leave it held and
+  // block every later caller.
+  Guard g(instance->lock_);
+  return instance->getHostInternal(category, ttl, bid,
+                                   host, port, updateHost,
+                                   updatePort, connTimeout,
+                                   sendTimeout, recvTimeout);
+}
+
+/**
+ * Return host, port for a key and bucket id by asking one of the servers
+ * behind a named updater service.
+ *
+ * The service is resolved to a server list, one server is picked at random,
+ * and the lookup is forwarded to the host/port overload of getHost. Returns
+ * false without touching the output parameters when the service resolves
+ * to no servers or no mapping is found.
+ *
+ * @param fbBase         pointer to FacebookBase
+ * @param category       the category name, or any identifier that uniquely
+ *                       identifies a bucket store.
+ * @param ttl            ttl in seconds
+ * @param bid            bucket id
+ * @param host           output parameter that receives the host.
+ * @param port           output parameter that receives the port.
+ * @param serviceName    name of the updater service
+ * @param serviceOptions options passed to the service lookup
+ * @param connTimeout    connection timeout
+ * @param sendTimeout    send timeout
+ * @param recvTimeout    receive timeout
+ */
+bool DynamicBucketUpdater::getHost(facebook::fb303::FacebookBase *fbBase,
+                                   const string &category,
+                                   uint32_t ttl,
+                                   uint64_t bid,
+                                   string &host,
+                                   uint32_t &port,
+                                   string serviceName,
+                                   string serviceOptions,
+                                   uint32_t connTimeout,
+                                   uint32_t sendTimeout,
+                                   uint32_t recvTimeout) {
+  server_vector_t candidates;
+  const bool resolved = scribe::network_config::getService(serviceName,
+                                                           serviceOptions,
+                                                           candidates);
+
+  // nothing to query unless the lookup produced at least one server
+  if (!(resolved && !candidates.empty())) {
+    LOG_OPER("[%s] Failed to get servers from Service [%s] "
+             "for dynamic bucket updater",
+             category.c_str(), serviceName.c_str());
+
+    return false;
+  }
+
+  // choose one of the servers at random
+  const int pick = rand() % candidates.size();
+  const string chosenHost = candidates[pick].first;
+  const uint32_t chosenPort = candidates[pick].second;
+
+  return DynamicBucketUpdater::getHost(fbBase, category, ttl, bid,
+                                       host, port,
+                                       chosenHost, chosenPort,
+                                       connTimeout, sendTimeout,
+                                       recvTimeout);
+}
+
+/**
+ * Actual implementation of getHost. The static getHost wrapper calls this
+ * while holding lock_.
+ *
+ * Refreshes the category's mapping from updateHost:updatePort when it has
+ * never been fetched or is older than ttl seconds, then looks up bid.
+ *
+ * @param category    the category name, or any identifier that uniquely
+ *                    identifies a bucket store.
+ * @param ttl         ttl in seconds
+ * @param bid         bucket id
+ * @param host        the output parameter that receives the host output.
+ *                    If no mapping is found, this variable is not modified.
+ * @param port        the output parameter that receives the port output.
+ *                    If no mapping is found, this variable is not modified.
+ * @param updateHost  remote host to fetch the mapping from
+ * @param updatePort  remote port to fetch the mapping from
+ * @param connTimeout connection timeout
+ * @param sendTimeout send timeout
+ * @param recvTimeout receive timeout
+ *
+ * @return true if a mapping for bid was found, false otherwise.
+ */
+bool DynamicBucketUpdater::getHostInternal(const string &category,
+                                           uint32_t ttl,
+                                           uint64_t bid,
+                                           string &host,
+                                           uint32_t &port,
+                                           string updateHost,
+                                           uint32_t updatePort,
+                                           uint32_t connTimeout,
+                                           uint32_t sendTimeout,
+                                           uint32_t recvTimeout) {
+  time_t now = time(NULL);
+
+  // periodic check whether we need to fetch the mapping, or need to
+  // update
+  CatBidToHostMap::const_iterator iter = catMap_.find(category);
+  if (iter == catMap_.end() || iter->second.lastUpdated_ + ttl < now) {
+    periodicCheck(category,
+                  ttl,
+                  updateHost,
+                  updatePort,
+                  connTimeout, sendTimeout, recvTimeout);
+    // The update may have replaced or removed the entry, so the old
+    // iterator is stale; look the category up again.
+    iter = catMap_.find(category);
+    if (iter == catMap_.end()) {
+      return false;
+    }
+  }
+
+  const CategoryEntry &catEnt = iter->second;
+  map<uint64_t, HostEntry>::const_iterator bidIter = catEnt.bidMap_.find(bid);
+  if (bidIter == catEnt.bidMap_.end()) {
+    // LOG_OPER stringifies its first argument into the format string, so it
+    // must be given a literal format plus arguments; passing oss.str() would
+    // log the text "oss.str()" instead of the message.
+    LOG_OPER("Missing mapping for category %s, bid: %llu, "
+             "updateHost: %s, updatePort: %u",
+             category.c_str(), static_cast<unsigned long long>(bid),
+             updateHost.c_str(), updatePort);
+    addStatValue(DynamicBucketUpdater::FB303_ERR_NOMAPPING, 1);
+    return false;
+  }
+
+  const HostEntry &entry = bidIter->second;
+  host = entry.host_;
+  port = entry.port_;
+  return true;
+}
+
+/**
+ * Fetch the bucket mapping for a category from a remote bucket updater and
+ * store it in the internal category, bucket id to host mappings.
+ *
+ * This function only adds the try/catch around the update; the bulk of the
+ * update logic is delegated to updateInternal. Every failure is logged and
+ * counted under FB303_ERR_THRIFTCALL. No lock is taken here.
+ *
+ * @param category category or key that uniquely identifies this updater.
+ * @param ttl ttl in seconds
+ * @param host remote host that will be used to retrieve bucket mapping
+ * @param port remote port that will be used to retrieve bucket mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send time out
+ * @param recvTimeout receive time out
+ *
+ * @return true if successful. false otherwise.
+ */
+bool DynamicBucketUpdater::periodicCheck(string category,
+                                         uint32_t ttl,
+                                         string host,
+                                         uint32_t port,
+                                         uint32_t connTimeout,
+                                         uint32_t sendTimeout,
+                                         uint32_t recvTimeout) {
+  try {
+    return updateInternal(category,
+                          ttl,
+                          host,
+                          port,
+                          connTimeout,
+                          sendTimeout,
+                          recvTimeout);
+  } catch (const TTransportException& ttx) {
+    LOG_OPER("periodicCheck(%s, %s, %u, %u, %u, %u) TTransportException: %s",
+             category.c_str(), host.c_str(), port,
+             connTimeout, sendTimeout, recvTimeout,
+             ttx.what());
+  } catch (const BucketStoreMappingException& bex) {
+    LOG_OPER("periodicCheck(%s, %s, %u, %u, %u, %u) "
+             "BucketStoreMappingException: %s",
+             category.c_str(), host.c_str(), port,
+             connTimeout, sendTimeout, recvTimeout,
+             bex.message.c_str());
+  } catch (const std::exception& ex) {
+    // Any other failure of the remote call (for example a protocol or
+    // application level error) is contained here as well, so one bad
+    // update cannot unwind through the callers.
+    LOG_OPER("periodicCheck(%s, %s, %u, %u, %u, %u) exception: %s",
+             category.c_str(), host.c_str(), port,
+             connTimeout, sendTimeout, recvTimeout,
+             ex.what());
+  }
+
+  // only reached when one of the handlers above ran
+  addStatValue(DynamicBucketUpdater::FB303_ERR_THRIFTCALL, 1);
+  return false;
+}
+
+/**
+ * Given a category name, remote host and port, query bucket mapping
+ * using bucketupdater thrift interface and update internal category,
+ * bucket id to host mappings.
+ *
+ * The existing mapping for the category is removed before the remote call,
+ * so after a failed update the category has no mapping at all. Exceptions
+ * raised while opening the transport or calling the service are not caught
+ * here.
+ *
+ * @param category category or other uniquely identifiable key
+ * @param ttl ttl in seconds
+ * @param remoteHost remote host that will be used to retrieve bucket mapping
+ * @param remotePort remote port that will be used to retrieve bucket mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send time out
+ * @param recvTimeout receive time out
+ *
+ * @return true if successful. false otherwise.
+ */
+bool DynamicBucketUpdater::updateInternal(
+    string category,
+    uint32_t ttl,
+    string remoteHost,
+    uint32_t remotePort,
+    uint32_t connTimeout,
+    uint32_t sendTimeout,
+    uint32_t recvTimeout) {
+  addStatValue(DynamicBucketUpdater::FB303_REMOTEUPDATE, 1);
+
+  // remove the current mapping (no-op when the category is unknown)
+  catMap_.erase(category);
+
+  shared_ptr<TSocket> socket(new TSocket(remoteHost, remotePort));
+
+  if (!socket) {
+    addStatValue(DynamicBucketUpdater::FB303_ERR_CONNECT, 1);
+    // LOG_OPER stringifies its first argument into the format string, so it
+    // needs a literal format plus arguments rather than a built string.
+    LOG_OPER("Failed to create socket in bucket updater(%s, %s, %u)",
+             category.c_str(), remoteHost.c_str(), remotePort);
+    return false;
+  }
+
+  socket->setConnTimeout(connTimeout);
+  socket->setRecvTimeout(recvTimeout);
+  socket->setSendTimeout(sendTimeout);
+
+  shared_ptr<TFramedTransport> framedTransport(new TFramedTransport(socket));
+  framedTransport->open();
+  shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(framedTransport));
+
+  // no strict version checking
+  protocol->setStrict(false, false);
+  BucketStoreMappingClient client(protocol);
+  map<int32_t, HostPort> mapping;
+  client.getMapping(mapping, category);
+
+  if (mapping.size() == 0) {
+    addStatValue(DynamicBucketUpdater::FB303_ERR_EMPTYRESULT, 1);
+
+    return false;
+  }
+
+  CategoryEntry catEntry(category, ttl);
+  catEntry.lastUpdated_ = time(NULL);
+  // update bucket id host mappings
+  for (map<int32_t, HostPort>::const_iterator iter = mapping.begin();
+       iter != mapping.end(); ++iter) {
+    // the thrift key is a signed 32-bit id; it is stored as unsigned
+    uint32_t bid = iter->first;
+    const HostPort &hp = iter->second;
+    HostEntry hentry;
+    hentry.host_ = hp.host;
+    hentry.port_ = hp.port;
+    catEntry.bidMap_[bid] = hentry;
+  }
+  catMap_[category] = catEntry;
+
+  // increment the counter for number of buckets updated
+  addStatValue(DynamicBucketUpdater::FB303_BUCKETSUPDATED, mapping.size());
+
+  return true;
+}
+
+/**
+ * Return the process-wide updater, creating it on first use.
+ *
+ * @param fbBase FacebookBase handed to the updater when it is created;
+ *               ignored on later calls because the instance already exists.
+ */
+DynamicBucketUpdater* DynamicBucketUpdater::getInstance(
+    FacebookBase *fbBase) {
+  // Always take the lock: reading instance_ without synchronization while
+  // another thread is creating the object is a data race. Callers take
+  // lock_ right after this call anyway, so the extra acquisition is cheap.
+  Guard g(DynamicBucketUpdater::instanceLock_);
+  if (!DynamicBucketUpdater::instance_) {
+    DynamicBucketUpdater::instance_ = new DynamicBucketUpdater(fbBase);
+  }
+  return DynamicBucketUpdater::instance_;
+}
+
+/**
+ * Setup fb303 counters.
+ *
+ * Registers every counter this class reports as a SUM time series. Does
+ * nothing when no FacebookBase was supplied.
+ */
+void DynamicBucketUpdater::initFb303Counters() {
+  if (fbBase_) {
+    // initialize fb303 time series based counters
+    fbBase_->addStatExportType(
+        DynamicBucketUpdater::FB303_ERR_CONNECT, stats::SUM);
+    // reported by periodicCheck on failed updates, so register it like the
+    // other counters
+    fbBase_->addStatExportType(
+        DynamicBucketUpdater::FB303_ERR_THRIFTCALL, stats::SUM);
+    fbBase_->addStatExportType(
+        DynamicBucketUpdater::FB303_ERR_EMPTYRESULT, stats::SUM);
+    fbBase_->addStatExportType(
+        DynamicBucketUpdater::FB303_REMOTEUPDATE, stats::SUM);
+    fbBase_->addStatExportType(
+        DynamicBucketUpdater::FB303_BUCKETSUPDATED, stats::SUM);
+    fbBase_->addStatExportType(
+        DynamicBucketUpdater::FB303_ERR_NOMAPPING, stats::SUM);
+  }
+}
237 src/dynamic_bucket_updater.h
View
@@ -0,0 +1,237 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author John Song
+
+#ifndef SCRIBE_DYNAMIC_BUCKET_UPDATER_H
+#define SCRIBE_DYNAMIC_BUCKET_UPDATER_H
+
+#include <map>
+#include "common.h"
+#include "common/fb303/cpp/FacebookBase.h"
+#include "scribe/if/gen-cpp/BucketStoreMapping.h"
+#include "scribe/src/conf.h"
+
+/**
+ * DynamicBucketUpdater updates a bucket store's bucket id to host:port
+ * mapping periodically using the bucketupdater.thrift interface.
+ *
+ * It is a singleton: callers use the static getHost() functions, which
+ * obtain the shared instance through getInstance().
+ */
+class DynamicBucketUpdater {
+ public:
+  // bucket updater connection error
+  static const char *FB303_ERR_CONNECT;
+  // error calling bucketupdater.thrift
+  static const char *FB303_ERR_THRIFTCALL;
+  // bucket updater returned an empty result
+  static const char *FB303_ERR_EMPTYRESULT;
+  // number of times a remote updater has been called
+  static const char *FB303_REMOTEUPDATE;
+  // number of buckets that have been updated
+  static const char *FB303_BUCKETSUPDATED;
+  // missing a bid mapping
+  static const char *FB303_ERR_NOMAPPING;
+
+  /**
+   * Resolve host, port for a store from its configuration.
+   *
+   * @param category category name of the store
+   * @param pconf    store configuration
+   * @param host     output parameter that receives the host
+   * @param port     output parameter that receives the port
+   *
+   * @return true if a mapping was found, false otherwise.
+   */
+  static bool getHost(const string& category, const StoreConf* pconf, std::string& host, uint32_t& port);
+
+  /**
+   * Check whether a store configuration is usable by the updater.
+   *
+   * @param category category name of the store
+   * @param pconf    store configuration
+   *
+   * @return true if the configuration is valid, false otherwise.
+   */
+  static bool isConfigValid(const string& category, const StoreConf* pconf);
+
+  /**
+   * Return host, port given a key and bucket id, bid, combination, using a
+   * named updater service to fetch the mapping.
+   * If a mapping is found, the result will be returned in out parameter and
+   * function returns true. Otherwise, function returns false and output
+   * parameters are not modified.
+   *
+   * @param fbBase        pointer to FacebookBase
+   * @param category      the category name, or any identifier that uniquely
+   *                      identifies a bucket store.
+   * @param ttl           ttl in seconds
+   * @param bid           bucket id
+   * @param host          the output parameter that receives the host output.
+   *                      If no mapping is found, this variable is not modified.
+   * @param port          the output parameter that receives the port output.
+   *                      If no mapping is found, this variable is not modified.
+   * @param service       service name
+   * @param serviceOption options for the service lookup
+   * @param connTimeout   connection timeout
+   * @param sendTimeout   send timeout
+   * @param recvTimeout   receive timeout
+   */
+  static bool getHost(facebook::fb303::FacebookBase *fbBase,
+                      const string &category,
+                      uint32_t ttl,
+                      uint64_t bid,
+                      string &host,
+                      uint32_t &port,
+                      string service,
+                      string serviceOption,
+                      uint32_t connTimeout = 150,
+                      uint32_t sendTimeout = 150,
+                      uint32_t recvTimeout = 150);
+
+  /**
+   * Return host, port given a key and bucket id, bid, combination, using an
+   * explicit updater host and port to fetch the mapping.
+   * If a mapping is found, the result will be returned in out parameter and
+   * function returns true. Otherwise, function returns false and output
+   * parameters are not modified.
+   *
+   * @param fbBase      pointer to FacebookBase
+   * @param category    the category name, or any identifier that uniquely
+   *                    identifies a bucket store.
+   * @param ttl         ttl in seconds
+   * @param bid         bucket id
+   * @param host        the output parameter that receives the host output.
+   *                    If no mapping is found, this variable is not modified.
+   * @param port        the output parameter that receives the port output.
+   *                    If no mapping is found, this variable is not modified.
+   * @param updateHost  remote host to fetch mapping
+   * @param updatePort  remote port to fetch mapping
+   * @param connTimeout connection timeout
+   * @param sendTimeout send timeout
+   * @param recvTimeout receive timeout
+   */
+  static bool getHost(facebook::fb303::FacebookBase *fbBase,
+                      const string &category,
+                      uint32_t ttl,
+                      uint64_t bid,
+                      string &host,
+                      uint32_t &port,
+                      string updateHost,
+                      uint32_t updatePort,
+                      uint32_t connTimeout = 150,
+                      uint32_t sendTimeout = 150,
+                      uint32_t recvTimeout = 150);
+
+ protected:
+  /**
+   * actual implementation of getHost.
+   *
+   * @param category    the category name, or any identifier that uniquely
+   *                    identifies a bucket store.
+   * @param ttl         ttl in seconds
+   * @param bid         bucket id
+   * @param host        the output parameter that receives the host output.
+   *                    If no mapping is found, this variable is not modified.
+   * @param port        the output parameter that receives the port output.
+   *                    If no mapping is found, this variable is not modified.
+   * @param updateHost  remote host to fetch mapping
+   * @param updatePort  remote port to fetch mapping
+   * @param connTimeout connection timeout
+   * @param sendTimeout send timeout
+   * @param recvTimeout receive timeout
+   */
+  bool getHostInternal(const string &category,
+                       uint32_t ttl,
+                       uint64_t bid,
+                       string &host,
+                       uint32_t &port,
+                       string updateHost,
+                       uint32_t updatePort,
+                       uint32_t connTimeout,
+                       uint32_t sendTimeout,
+                       uint32_t recvTimeout);
+  /**
+   * Given a category name and remote host:port, performs an update of the
+   * category mapping. The current mapping will be removed first before the
+   * update is performed.
+   *
+   * This function takes care of try/catch. The bulk of the update logic is
+   * delegated to updateInternal.
+   *
+   * @param category category or key that uniquely identifies this updater.
+   * @param ttl ttl in seconds
+   * @param host remote host that will be used to retrieve bucket mapping
+   * @param port remote port that will be used to retrieve bucket mapping
+   * @param connTimeout connection timeout
+   * @param sendTimeout send time out
+   * @param recvTimeout receive time out
+   *
+   * @return true if successful. false otherwise.
+   */
+  bool periodicCheck(string category,
+                     uint32_t ttl,
+                     string host,
+                     uint32_t port,
+                     uint32_t connTimeout = 150,
+                     uint32_t sendTimeout = 150,
+                     uint32_t recvTimeout = 150);
+
+  // destination of one bucket
+  struct HostEntry {
+    string host_;
+    uint32_t port_;
+  };
+
+  // mapping of one category: bucket id to host, plus when it was fetched
+  struct CategoryEntry {
+    // zero the scalar members so a default-constructed entry (as created by
+    // map::operator[]) never holds indeterminate values
+    CategoryEntry() : ttl_(0), lastUpdated_(0) {}
+    CategoryEntry(string category, uint32_t ttl) : category_(category), ttl_(ttl),
+      lastUpdated_(0) {
+    }
+
+    string category_;
+    uint32_t ttl_;
+    time_t lastUpdated_;
+    map<uint64_t, HostEntry> bidMap_;
+  };
+
+  // category and bid to HostEntry map
+  typedef map<string, CategoryEntry> CatBidToHostMap;
+
+  /**
+   * Given a category name, remote host and port, query bucket mapping
+   * using bucketupdater thrift interface and update internal category,
+   * bucket id to host mappings.
+   *
+   * @param category category or other uniquely identifiable key
+   * @param ttl ttl in seconds
+   * @param remoteHost remote host that will be used to retrieve bucket mapping
+   * @param remotePort remote port that will be used to retrieve bucket mapping
+   * @param connTimeout connection timeout
+   * @param sendTimeout send time out
+   * @param recvTimeout receive time out
+   *
+   * @return true if successful. false otherwise.
+   */
+  bool updateInternal(string category,
+                      uint32_t ttl,
+                      string remoteHost,
+                      uint32_t remotePort,
+                      uint32_t connTimeout,
+                      uint32_t sendTimeout,
+                      uint32_t recvTimeout);
+
+  // returns the shared instance, creating it with fbBase on first use
+  static DynamicBucketUpdater *getInstance(facebook::fb303::FacebookBase *fbBase);
+
+  /**
+   * Setup fb303 counters.
+   */
+  void initFb303Counters();
+
+  // the singleton and the mutex used while creating it
+  static DynamicBucketUpdater *instance_;
+  static apache::thrift::concurrency::Mutex instanceLock_;
+  // receiver of the fb303 counters; may be NULL, in which case nothing is
+  // reported
+  facebook::fb303::FacebookBase *fbBase_;
+  // taken by the static getHost() around getHostInternal()
+  apache::thrift::concurrency::Mutex lock_;
+  CatBidToHostMap catMap_;
+
+  // adds value to the named fb303 counter if a FacebookBase is attached
+  void addStatValue(string name, uint64_t value) {
+    if (fbBase_) {
+      fbBase_->addStatValue(name, value);
+    }
+  }
+
+  // make singleton
+  DynamicBucketUpdater(facebook::fb303::FacebookBase *fbBase)
+    : fbBase_(fbBase) {
+    initFb303Counters();
+  }
+
+  // not meant to be used; still initializes fbBase_ so that a copy never
+  // carries an indeterminate pointer
+  DynamicBucketUpdater(const DynamicBucketUpdater& other)
+    : fbBase_(other.fbBase_) {}
+};
+
+#endif // SCRIBE_DYNAMIC_BUCKET_UPDATER_H
139 src/env_default.cpp
View
@@ -0,0 +1,139 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author Bobby Johnson
+// @author James Wang
+// @author Jason Sobel
+// @author Avinash Lakshman
+// @author Anthony Giardullo
+
+#include "scribe/src/common.h"
+#include "scribe/src/scribe_server.h"
+
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::server;
+using namespace apache::thrift::concurrency;
+
+using namespace scribe::thrift;
+using namespace scribe::concurrency;
+
+using boost::shared_ptr;
+
+/*
+ * Network configuration and directory services
+ */
+
+// Default environment: no directory service is available, so every lookup
+// reports failure and _return is left untouched.
+bool scribe::network_config::getService(const std::string& serviceName,
+                                        const std::string& options,
+                                        server_vector_t& _return) {
+  const bool found = false;
+  return found;
+}
+
+/*
+ * Concurrency mechanisms
+ */
+
+// Hands out a freshly allocated ReadWriteMutex owned by a shared_ptr.
+shared_ptr<ReadWriteMutex> scribe::concurrency::createReadWriteMutex() {
+  shared_ptr<ReadWriteMutex> rwMutex(new ReadWriteMutex());
+  return rwMutex;
+}
+
+/*
+ * Time functions
+ */
+
+// Returns the current wall-clock time in milliseconds.
+unsigned long scribe::clock::nowInMsec() {
+  // Take seconds and microseconds from the same gettimeofday() sample.
+  // Combining tv_usec with a separate time() call can produce a value that
+  // is off by up to a second when the two calls straddle a second boundary.
+  struct timeval tv;
+  gettimeofday(&tv, NULL);
+
+  return ((unsigned long)tv.tv_sec) * 1000 + (tv.tv_usec / 1000);
+}
+
+/*
+ * Hash functions
+ */
+
+// Identity hash: the key is used as its own hash value.
+uint32_t scribe::integerhash::hash32(uint32_t key) {
+  const uint32_t hashed = key;
+  return hashed;
+}
+
+// djb2 string hash (http://www.cse.yorku.ca/~oz/hash.html).
+// A NULL string hashes to 0; an empty string hashes to the seed 5381.
+uint32_t scribe::strhash::hash32(const char *s) {
+  if (s == NULL) {
+    return 0;
+  }
+
+  uint32_t digest = 5381;
+  for (const char *cursor = s; *cursor != '\0'; ++cursor) {
+    const int ch = *cursor;
+    digest = digest * 33 + ch;  // same as (digest << 5) + digest + ch
+  }
+  return digest;
+}
+
+/*
+ * Starting a scribe server.
+ *
+ * Builds a TNonblockingServer around the global g_Handler, listening on
+ * g_Handler->port, registers the server with the handler, applies the
+ * connection limit if one is configured, and then serves requests.
+ */
+// note: this function uses global g_Handler.
+void scribe::startServer() {
+ boost::shared_ptr<TProcessor> processor(new scribeProcessor(g_Handler));
+ /* This factory is for binary compatibility. */
+ boost::shared_ptr<TProtocolFactory> protocol_factory(
+ new TBinaryProtocolFactory(0, 0, false, false)
+ );
+ // stays empty unless more than one server thread is configured
+ boost::shared_ptr<ThreadManager> thread_manager;
+
+ if (g_Handler->numThriftServerThreads > 1) {
+ // create a ThreadManager to process incoming calls
+ thread_manager = ThreadManager::newSimpleThreadManager(
+ g_Handler->numThriftServerThreads
+ );
+
+ shared_ptr<PosixThreadFactory> thread_factory(new PosixThreadFactory());
+ thread_manager->threadFactory(thread_factory);
+ thread_manager->start();
+ }
+
+ shared_ptr<TNonblockingServer> server(new TNonblockingServer(
+ processor,
+ protocol_factory,
+ g_Handler->port,
+ thread_manager
+ ));
+ // hand the server to the handler before serving starts
+ g_Handler->setServer(server);
+
+ LOG_OPER("Starting scribe server on port %lu", g_Handler->port);
+ fflush(stderr);
+
+ // throttle concurrent connections
+ // a limit of 0 leaves the server's connection settings untouched
+ unsigned long mconn = g_Handler->getMaxConn();
+ if (mconn > 0) {
+ LOG_OPER("Throttle max_conn to %lu", mconn);
+ server->setMaxConnections(mconn);
+ server->setOverloadAction(T_OVERLOAD_CLOSE_ON_ACCEPT);
+ }
+
+ server->serve();
+ // this function never returns
+}
51 src/env_default.h
View
@@ -42,21 +42,43 @@
fprintf(stderr,"[%s] " #format_string " \n", dbgtime,##__VA_ARGS__); \
}
+
+namespace scribe {
+
/*
* Network based configuration and directory service
*/
-class network_config {
- public:
+namespace network_config {
// gets a vector of machine/port pairs for a named service
// returns true on success
- static bool getService(const std::string& serviceName,
+ bool getService(const std::string& serviceName,
const std::string& options,
- server_vector_t& _return) {
- return false;
- }
-};
+ server_vector_t& _return);
+
+} // !namespace scribe::network_config
+
+/*
+ * Concurrency mechanisms
+ */
+
+namespace concurrency {
+ using apache::thrift::concurrency::ReadWriteMutex;
+
+ // returns a new instance of read/write mutex.
+ // you can choose different implementations based on your needs.
+ boost::shared_ptr<ReadWriteMutex> createReadWriteMutex();
+} // !namespace scribe::concurrency
+
+/*
+ * Time functions
+ */
+
+namespace clock {
+ unsigned long nowInMsec();
+
+} // !namespace scribe::clock
/*
* Hash functions
@@ -65,16 +87,19 @@ class network_config {
// You can probably find better hash functions than these
class integerhash {
public:
- static uint32_t hash32(uint32_t key) {
- return key;
- }
+ static uint32_t hash32(uint32_t key);
};
class strhash {
public:
- static uint32_t hash32(const char *s) {
- return 0;
- }
+ static uint32_t hash32(const char *s);
};
+/*
+ * Starting a scribe server.
+ */
+void startServer();
+
+} // !namespace scribe
+
#endif // SCRIBE_ENV
125 src/file.cpp
View
@@ -23,8 +23,8 @@
#include "file.h"
#include "HdfsFile.h"
-// INITIAL_BUFFER_SIZE must always be >= UINT_SIZE
-#define INITIAL_BUFFER_SIZE 4096
+#define INITIAL_BUFFER_SIZE (64 * 1024)
+#define LARGE_BUFFER_SIZE (16 * INITIAL_BUFFER_SIZE) /* arbitrarily chosen */
#define UINT_SIZE 4
using namespace std;
@@ -137,57 +137,102 @@ void StdFile::flush() {
}
}
-bool StdFile::readNext(std::string& _return) {
+/*
+ * read the next frame in the file that is currently open. returns the
+ * body of the frame in _return.
+ *
+ * returns a negative number if it
+ * encounters any problem when reading from the file. The negative
+ * number is the number of bytes in the file that will not be read
+ * because of this problem (most likely corruption of file).
+ *
+ * returns 0 on end of file or when it encounters a frame of size 0
+ *
+ * On success it returns the number of bytes in the frame's body
+ *
+ * This function assumes that the file it is reading is framed.
+ */
+long
+StdFile::readNext(std::string& _return) {
+ long size;
+
+#define CALC_LOSS() do { \
+ int offset = file.tellg(); \
+ if (offset != -1) { \
+ size = -(fileSize() - offset); \
+ } else { \
+ size = -fileSize(); \
+ } \
+ if (size > 0) { \
+ /* loss size can't be positive \
+ * choose a arbitrary but reasonable
+ * value for loss
+ */ \
+ size = -(1000 * 1000 * 1000); \
+ } \
+ /* loss size can be 0 */ \
+} while (0)
if (!inputBuffer) {
bufferSize = INITIAL_BUFFER_SIZE;
- inputBuffer = new char[bufferSize];
+ inputBuffer = (char *) malloc(bufferSize);
+ if (inputBuffer == NULL) {
+ CALC_LOSS();
+ LOG_OPER("WARNING: nomem Data Loss loss %ld bytes in %s", size,
+ filename.c_str());
+ return (size);
+ }
}
- if (framed) {
- unsigned size;
- file.read(inputBuffer, UINT_SIZE); // assumes INITIAL_BUFFER_SIZE > UINT_SIZE
- if (file.good() && (size = unserializeUInt(inputBuffer))) {
-
- // check if size is larger than half the max uint size
- if (size >= (((unsigned)1) << (UINT_SIZE*8 - 1))) {
- LOG_OPER("WARNING: attempting to read message of size %d bytes", size);
-
- // Do not try to make bufferSize any larger than this or you might overflow
- bufferSize = size;
- }
+ file.read(inputBuffer, UINT_SIZE);
+ if (!file.good() || (size = unserializeUInt(inputBuffer)) == 0) {
+ /* end of file */
+ return (0);
+ }
+ // check if most significant bit set - should never be set
+ if (size >= INT_MAX) {
+ /* Definitely corrupted. Stop reading any further */
+ CALC_LOSS();
+ LOG_OPER("WARNING: Corruption Data Loss %ld bytes in %s", size,
+ filename.c_str());
+ return (size);
+ }
- while (size > bufferSize) {
- bufferSize = 2 * bufferSize;
- delete[] inputBuffer;
- inputBuffer = new char[bufferSize];
- }
- file.read(inputBuffer, size);
- if (file.good()) {
- _return.assign(inputBuffer, size);
- return true;
- } else {
- int offset = file.tellg();
- LOG_OPER("ERROR: Failed to read file %s at offset %d",
- filename.c_str(), offset);
- return false;
- }
+ if (size > bufferSize) {
+ bufferSize = ((size + INITIAL_BUFFER_SIZE - 1) / INITIAL_BUFFER_SIZE) *
+ INITIAL_BUFFER_SIZE;
+ free(inputBuffer);
+ inputBuffer = (char *) malloc(bufferSize);
+ if (bufferSize > LARGE_BUFFER_SIZE) {
+ LOG_OPER("WARNING: allocating large buffer Corruption? %d", bufferSize);
}
+ }
+ if (inputBuffer == NULL) {
+ CALC_LOSS();
+ LOG_OPER("WARNING: nomem Corruption? Data Loss %ld bytes in %s", size,
+ filename.c_str());
+ return (size);
+ }
+ file.read(inputBuffer, size);
+ if (file.good()) {
+ _return.assign(inputBuffer, size);
} else {
- file.getline(inputBuffer, bufferSize);
- if (file.good()) {
- _return = inputBuffer;
- return true;
- }
+ CALC_LOSS();
+ LOG_OPER("WARNING: Data Loss %ld bytes in %s", size, filename.c_str());
}
- return false;
+ if (bufferSize > LARGE_BUFFER_SIZE) {
+ free(inputBuffer);
+ inputBuffer = NULL;
+ }
+ return (size);
+#undef CALC_LOSS
}
unsigned long StdFile::fileSize() {
unsigned long size = 0;
try {
size = boost::filesystem::file_size(filename.c_str());
- } catch(std::exception const& e) {
+ } catch(const std::exception& e) {
LOG_OPER("Failed to get size for file <%s> error <%s>", filename.c_str(), e.what());
size = 0;
}
@@ -203,7 +248,7 @@ void StdFile::listImpl(const std::string& path, std::vector<std::string>& _retur
_return.push_back(dir_iter->filename());
}
}
- } catch (std::exception const& e) {
+ } catch (const std::exception& e) {
LOG_OPER("exception <%s> listing files in <%s>",
e.what(), path.c_str());
}
@@ -216,7 +261,7 @@ void StdFile::deleteFile() {
bool StdFile::createDirectory(std::string path) {
try {
boost::filesystem::create_directories(path);
- } catch(std::exception const& e) {
+ } catch(const std::exception& e) {
LOG_OPER("Exception < %s > in StdFile::createDirectory for path %s ",
e.what(),path.c_str());
return false;
4 src/file.h
View
@@ -42,7 +42,7 @@ class FileInterface {
virtual bool write(const std::string& data) = 0;
virtual void flush() = 0;
virtual unsigned long fileSize() = 0;
- virtual bool readNext(std::string& _return) = 0; // returns a line if unframed or a record if framed
+ virtual long readNext(std::string& _return) = 0;
virtual void deleteFile() = 0;
virtual void listImpl(const std::string& path, std::vector<std::string>& _return) = 0;
virtual std::string getFrame(unsigned data_size) {return std::string();};
@@ -70,7 +70,7 @@ class StdFile : public FileInterface {
bool write(const std::string& data);
void flush();
unsigned long fileSize();
- bool readNext(std::string& _return);
+ long readNext(std::string& _return);
void deleteFile();
void listImpl(const std::string& path, std::vector<std::string>& _return);
std::string getFrame(unsigned data_size);
47 src/network_dynamic_config.cpp
View
@@ -0,0 +1,47 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author John Song
+
+#include "scribe/src/network_dynamic_config.h"
+#include "scribe/src/dynamic_bucket_updater.h"
+
+static NetworkDynamicConfigMod netConfigMods[] = {
+ {
+ "thrift_bucket",
+ DynamicBucketUpdater::isConfigValid,
+ DynamicBucketUpdater::getHost,
+ },
+ {
+ "",
+ NULL,
+ NULL,
+ },
+};
+
+NetworkDynamicConfigMod* getNetworkDynamicConfigMod(const char* name) {
+ for (NetworkDynamicConfigMod *pconf = netConfigMods;
+ pconf->isConfigValidFunc && pconf->getHostFunc;
+ ++pconf) {
+ if (strcmp(name, pconf->name) == 0) {
+ return pconf;
+ }
+ }
+
+ return NULL;
+}
+
37 src/network_dynamic_config.h
View
@@ -0,0 +1,37 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author John Song
+
+#ifndef SCRIBE_NETWORK_UPDATER_H
+#define SCRIBE_NETWORK_UPDATER_H
+
+#include "scribe/src/conf.h"
+
+// functional types for network dynamic updater validation and getHost calls
+typedef bool (*NetworkIsConfigValidFunc)(const std::string& category, const StoreConf* pconf);
+typedef bool (*NetworkGetHost)(const std::string& category, const StoreConf* pconf, std::string& host, uint32_t& port);
+
+struct NetworkDynamicConfigMod {
+ const char* name;
+ NetworkIsConfigValidFunc isConfigValidFunc;
+ NetworkGetHost getHostFunc;
+};
+
+NetworkDynamicConfigMod* getNetworkDynamicConfigMod(const char* name);
+
+#endif //SCRIBE_NETWORK_UPDATER_H
399 src/scribe_server.cpp
View
@@ -24,16 +24,14 @@
#include "common.h"
#include "scribe_server.h"
-using namespace apache::thrift;
-using namespace apache::thrift::protocol;
-using namespace apache::thrift::transport;
-using namespace apache::thrift::server;
using namespace apache::thrift::concurrency;
using namespace facebook::fb303;
+using namespace facebook;
using namespace scribe::thrift;
using namespace std;
+
using boost::shared_ptr;
shared_ptr<scribeHandler> g_Handler;
@@ -42,6 +40,7 @@ shared_ptr<scribeHandler> g_Handler;
#define DEFAULT_MAX_MSG_PER_SECOND 0
#define DEFAULT_MAX_QUEUE_SIZE 5000000LL
#define DEFAULT_SERVER_THREADS 3
+#define DEFAULT_MAX_CONN 0
static string overall_category = "scribe_overall";
static string log_separator = ":";
@@ -113,31 +112,9 @@ int main(int argc, char **argv) {
g_Handler = shared_ptr<scribeHandler>(new scribeHandler(port, config_file));
g_Handler->initialize();
- shared_ptr<TProcessor> processor(new scribeProcessor(g_Handler));
- /* This factory is for binary compatibility. */
- shared_ptr<TProtocolFactory>
- binaryProtocolFactory(new TBinaryProtocolFactory(0, 0, false, false));
- shared_ptr<ThreadManager> thread_manager;
-
- if (g_Handler->numThriftServerThreads > 1) {
- // create a ThreadManager to process incoming calls
- thread_manager = ThreadManager::
- newSimpleThreadManager(g_Handler->numThriftServerThreads);
-
- shared_ptr<PosixThreadFactory> thread_factory(new PosixThreadFactory());
- thread_manager->threadFactory(thread_factory);
- thread_manager->start();
- }
-
- TNonblockingServer server(processor, binaryProtocolFactory,
- g_Handler->port, thread_manager);
+ scribe::startServer(); // never returns
- LOG_OPER("Starting scribe server on port %lu", g_Handler->port);
- fflush(stderr);
-
- server.serve();
-
- } catch(std::exception const& e) {
+ } catch(const std::exception& e) {
LOG_OPER("Exception in main: %s", e.what());
}
@@ -150,43 +127,38 @@ scribeHandler::scribeHandler(unsigned long int server_port, const std::string& c
port(server_port),
numThriftServerThreads(DEFAULT_SERVER_THREADS),
checkPeriod(DEFAULT_CHECK_PERIOD),
- pcategories(NULL),
- pcategory_prefixes(NULL),
configFilename(config_file),
status(STARTING),
statusDetails("initial state"),
numMsgLastSecond(0),
maxMsgPerSecond(DEFAULT_MAX_MSG_PER_SECOND),
+ maxConn(DEFAULT_MAX_CONN),
maxQueueSize(DEFAULT_MAX_QUEUE_SIZE),
newThreadPerCategory(true) {
time(&lastMsgTime);
+ scribeHandlerLock = scribe::concurrency::createReadWriteMutex();
}
scribeHandler::~scribeHandler() {
- deleteCategoryMap(pcategories);
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- pcategory_prefixes = NULL;
- }
+ deleteCategoryMap(categories);
+ deleteCategoryMap(category_prefixes);
}
// Returns the handler status, but overwrites it with WARNING if it's
// ALIVE and at least one store has a nonempty status.
fb_status scribeHandler::getStatus() {
- RWGuard monitor(scribeHandlerLock);
+ RWGuard monitor(*scribeHandlerLock);
Guard status_monitor(statusLock);
fb_status return_status(status);
if (status == ALIVE) {
- for (category_map_t::iterator cat_iter = pcategories->begin();
- cat_iter != pcategories->end();
+ for (category_map_t::iterator cat_iter = categories.begin();
+ cat_iter != categories.end();
++cat_iter) {
for (store_list_t::iterator store_iter = cat_iter->second->begin();
store_iter != cat_iter->second->end();
- ++store_iter)
- {
- if (!(*store_iter)->getStatus().empty())
- {
+ ++store_iter) {
+ if (!(*store_iter)->getStatus().empty()) {
return_status = WARNING;
return return_status;
}
@@ -205,25 +177,23 @@ void scribeHandler::setStatus(fb_status new_status) {
// Returns the handler status details if non-empty,
// otherwise the first non-empty store status found
void scribeHandler::getStatusDetails(std::string& _return) {
- RWGuard monitor(scribeHandlerLock);
+ RWGuard monitor(*scribeHandlerLock);
Guard status_monitor(statusLock);
_return = statusDetails;
if (_return.empty()) {
- if (pcategories) {
- for (category_map_t::iterator cat_iter = pcategories->begin();
- cat_iter != pcategories->end();
- ++cat_iter) {
- for (store_list_t::iterator store_iter = cat_iter->second->begin();
- store_iter != cat_iter->second->end();
- ++store_iter) {
-
- if (!(_return = (*store_iter)->getStatus()).empty()) {
- return;
- }
- } // for each store
- } // for each category
- }
+ for (category_map_t::iterator cat_iter = categories.begin();
+ cat_iter != categories.end();
+ ++cat_iter) {
+ for (store_list_t::iterator store_iter = cat_iter->second->begin();
+ store_iter != cat_iter->second->end();
+ ++store_iter) {
+
+ if (!(_return = (*store_iter)->getStatus()).empty()) {
+ return;
+ }
+ } // for each store
+ } // for each category
} // if we don't have an interesting top level status
return;
}
@@ -258,14 +228,6 @@ const char* scribeHandler::statusAsString(fb_status status) {
bool scribeHandler::createCategoryFromModel(
const string &category, const boost::shared_ptr<StoreQueue> &model) {
- if ((pcategories == NULL) ||
- (pcategories->find(category) != pcategories->end())) {
- return false;
- }
-
- LOG_OPER("[%s] Creating new category from model %s", category.c_str(),
- model->getCategoryHandled().c_str());
-
// Make sure the category name is sane.
try {
string clean_path = boost::filesystem::path(category).string();
@@ -275,7 +237,7 @@ bool scribeHandler::createCategoryFromModel(
return false;
}
- } catch(std::exception const& e) {
+ } catch(const std::exception& e) {
LOG_OPER("Category not a valid boost filename. Boost exception:%s", e.what());
return false;
}
@@ -284,18 +246,26 @@ bool scribeHandler::createCategoryFromModel(
if (newThreadPerCategory) {
// Create a new thread/StoreQueue for this category
pstore = shared_ptr<StoreQueue>(new StoreQueue(model, category));
+ LOG_OPER("[%s] Creating new category store from model %s",
+ category.c_str(), model->getCategoryHandled().c_str());
// queue a command to the store to open it
pstore->open();
} else {
// Use existing StoreQueue
pstore = model;
+ LOG_OPER("[%s] Using existing store for the config categories %s",
+ category.c_str(), model->getCategoryHandled().c_str());
}
- shared_ptr<store_list_t> pstores =
- shared_ptr<store_list_t>(new store_list_t);
-
- (*pcategories)[category] = pstores;
+ shared_ptr<store_list_t> pstores;
+ category_map_t::iterator cat_iter = categories.find(category);
+ if (cat_iter == categories.end()) {
+ pstores = shared_ptr<store_list_t>(new store_list_t);
+ categories[category] = pstores;
+ } else {
+ pstores = cat_iter->second;
+ }
pstores->push_back(pstore);
return true;
@@ -310,13 +280,6 @@ bool scribeHandler::throttleRequest(const vector<LogEntry>& messages) {
return true;
}
- if (!pcategories || !pcategory_prefixes) {
- // don't bother to spam anything for this, our status should already
- // be showing up as WARNING in the monitoring tools.
- incCounter("invalid requests");
- return true;
- }
-
// Throttle based on store queues getting too long.
// Note that there's one decision for all categories, because the whole array passed to us
// must either succeed or fail together. Checking before we've queued anything also has
@@ -326,8 +289,8 @@ bool scribeHandler::throttleRequest(const vector<LogEntry>& messages) {
// This is a simplification based on the assumption that most Log() calls contain most
// categories.
unsigned long long max_count = 0;
- for (category_map_t::iterator cat_iter = pcategories->begin();
- cat_iter != pcategories->end();
+ for (category_map_t::iterator cat_iter = categories.begin();
+ cat_iter != categories.end();
++cat_iter) {
shared_ptr<store_list_t> pstores = cat_iter->second;
if (!pstores) {
@@ -341,7 +304,6 @@ bool scribeHandler::throttleRequest(const vector<LogEntry>& messages) {
} else {
unsigned long long size = (*store_iter)->getSize();
if (size > maxQueueSize) {
- incCounter("denied for queue size");
incCounter((*store_iter)->getCategoryHandled(), "denied for queue size");
return true;
}
@@ -359,16 +321,20 @@ shared_ptr<store_list_t> scribeHandler::createNewCategory(
shared_ptr<store_list_t> store_list;
// First, check the list of category prefixes for a model
- category_prefix_map_t::iterator cat_prefix_iter = pcategory_prefixes->begin();
- while (cat_prefix_iter != pcategory_prefixes->end()) {
+ category_map_t::iterator cat_prefix_iter = category_prefixes.begin();
+ while (cat_prefix_iter != category_prefixes.end()) {
string::size_type len = cat_prefix_iter->first.size();
if (cat_prefix_iter->first.compare(0, len-1, category, 0, len-1) == 0) {
// Found a matching prefix model
- createCategoryFromModel(category, cat_prefix_iter->second);
- category_map_t::iterator cat_iter = pcategories->find(category);
+ shared_ptr<store_list_t> pstores = cat_prefix_iter->second;
+ for (store_list_t::iterator store_iter = pstores->begin();
+ store_iter != pstores->end(); ++store_iter) {
+ createCategoryFromModel(category, *store_iter);
+ }
+ category_map_t::iterator cat_iter = categories.find(category);
- if (cat_iter != pcategories->end()) {
+ if (cat_iter != categories.end()) {
store_list = cat_iter->second;
} else {
LOG_OPER("failed to create new prefix store for category <%s>",
@@ -380,19 +346,19 @@ shared_ptr<store_list_t> scribeHandler::createNewCategory(
cat_prefix_iter++;
}
- // Then try creating a store if we have a default store defined
- if (store_list == NULL) {
- if (defaultStore != NULL) {
-
- createCategoryFromModel(category, defaultStore);
- category_map_t::iterator cat_iter = pcategories->find(category);
- if (cat_iter != pcategories->end()) {
- store_list = cat_iter->second;
- } else {
- LOG_OPER("failed to create new default store for category <%s>",
- category.c_str());
- }
+ // Then try creating a store if we have a default store defined
+ if (store_list == NULL && !defaultStores.empty()) {
+ for (store_list_t::iterator store_iter = defaultStores.begin();
+ store_iter != defaultStores.end(); ++store_iter) {
+ createCategoryFromModel(category, *store_iter);
+ }
+ category_map_t::iterator cat_iter = categories.find(category);
+ if (cat_iter != categories.end()) {
+ store_list = cat_iter->second;
+ } else {
+ LOG_OPER("failed to create new default store for category <%s>",
+ category.c_str());
}
}
@@ -427,9 +393,13 @@ void scribeHandler::addMessage(
ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
- ResultCode result;
+ ResultCode result = TRY_LATER;
- scribeHandlerLock.acquireRead();
+ scribeHandlerLock->acquireRead();
+ if(status == STOPPING) {
+ result = TRY_LATER;
+ goto end;
+ }
if (throttleRequest(messages)) {
result = TRY_LATER;
@@ -449,24 +419,31 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
shared_ptr<store_list_t> store_list;
string category = (*msg_iter).category;
+ category_map_t::iterator cat_iter;
// First look for an exact match of the category
- if (pcategories) {
- category_map_t::iterator cat_iter = pcategories->find(category);
- if (cat_iter != pcategories->end()) {
- store_list = cat_iter->second;
- }
+ if ((cat_iter = categories.find(category)) != categories.end()) {
+ store_list = cat_iter->second;
}
// Try creating a new store for this category if we didn't find one
if (store_list == NULL) {
// Need write lock to create a new category
- scribeHandlerLock.release();
- scribeHandlerLock.acquireWrite();
+ scribeHandlerLock->release();
+ scribeHandlerLock->acquireWrite();
+
+ // This may cause some duplicate messages if some messages in this batch
+ // were already added to queues
+ if(status == STOPPING) {
+ result = TRY_LATER;
+ goto end;
+ }
- store_list = createNewCategory(category);
+ if ((cat_iter = categories.find(category)) != categories.end()) {
+ store_list = cat_iter->second;
+ } else {
+ store_list = createNewCategory(category);
+ }
- scribeHandlerLock.release();
- scribeHandlerLock.acquireRead();
}
if (store_list == NULL) {
@@ -483,7 +460,7 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
result = OK;
end:
- scribeHandlerLock.release();
+ scribeHandlerLock->release();
return result;
}
@@ -519,26 +496,28 @@ bool scribeHandler::throttleDeny(int num_messages) {
void scribeHandler::stopStores() {
setStatus(STOPPING);
-
- // Thrift doesn't currently support stopping the server from the handler,
- // so this could leave clients in weird states.
- deleteCategoryMap(pcategories);
- pcategories = NULL;
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- pcategory_prefixes = NULL;
+ shared_ptr<store_list_t> store_list;
+ for (store_list_t::iterator store_iter = defaultStores.begin();
+ store_iter != defaultStores.end(); ++store_iter) {
+ if (!(*store_iter)->isModelStore()) {
+ (*store_iter)->stop();
+ }
}
+ defaultStores.clear();
+ deleteCategoryMap(categories);
+ deleteCategoryMap(category_prefixes);
+
}
void scribeHandler::shutdown() {
- RWGuard monitor(scribeHandlerLock, true);
-
+ RWGuard monitor(*scribeHandlerLock, true);
stopStores();
- exit(0);
+ // calling stop to allow thrift to clean up client states and exit
+ server->stop();
}