Permalink
Browse files

Extend env_default.h and env_facebook.h to respect implementation dif…

…ferences

between facebook and the open-source community.

Summary:
1. put configuration classes in env_*.h in scribe namespace;
2. add scribe::concurrency for concurrency mechanisms;
3. make scribe_server.cpp: scribeHandler::scribeHandlerLock configurable;
4. add scribe::clock for time utilities;

Test Plan:
just compile it

DiffCamp Revision: 118366
Reviewed By: groys
Commenters: jsong
CC: agiardullo, zshao, jsong, groys, yliang, datafreeway-dev@lists
Revert Plan:
OK

git-svn-id: svn+ssh://tubbs/svnapps/fbomb/branches/scribe-os/fbcode/scribe@28661 2248de34-8caa-4a3c-bc55-5e52d9d7b73a
  • Loading branch information...
1 parent b07d15e commit 2123b731afecb2643c75a3ed9b3b2c03488d8c49 yliang committed with groys May 27, 2010
Showing with 98 additions and 58 deletions.
  1. +35 −34 src/dynamic_bucket_updater.cpp
  2. +40 −0 src/env_default.h
  3. +9 −8 src/scribe_server.cpp
  4. +2 −1 src/scribe_server.h
  5. +12 −15 src/store.cpp
@@ -50,30 +50,30 @@ bool DynamicBucketUpdater::getHost(const string& category,
pconf->getInt("timeout", timeout);
if (!service.empty()) {
- success = DynamicBucketUpdater::getHost(g_Handler.get(),
- category,
- ttl,
- (uint32_t)bid,
- host,
- port,
- service,
- serviceOptions,
- timeout, timeout, timeout);
+ success = DynamicBucketUpdater::getHost(g_Handler.get(),
+ category,
+ ttl,
+ (uint32_t)bid,
+ host,
+ port,
+ service,
+ serviceOptions,
+ timeout, timeout, timeout);
} else {
success = DynamicBucketUpdater::getHost(g_Handler.get(),
- category,
- ttl,
- (uint32_t)bid,
- host,
- port,
- updaterHost,
- atoi(updaterPort.c_str()),
- timeout, timeout, timeout);
+ category,
+ ttl,
+ (uint32_t)bid,
+ host,
+ port,
+ updaterHost,
+ atoi(updaterPort.c_str()),
+ timeout, timeout, timeout);
}
if (!success) {
LOG_OPER("[%s] dynamic bucket updater failed: bid=%ld",
- category.c_str(), bid);
+ category.c_str(), bid);
}
return success;
}
@@ -171,22 +171,23 @@ bool DynamicBucketUpdater::getHost(facebook::fb303::FacebookBase *fbBase,
uint32_t sendTimeout,
uint32_t recvTimeout) {
server_vector_t servers;
- bool success = network_config::getService(serviceName,
- serviceOptions,
- servers);
-
- // Cannot open if we couldn't find any servers
- if (!success || servers.empty()) {
- LOG_OPER("[%s] Failed to get servers from Service [%s] for dynamic bucket updater",
- category.c_str(), serviceName.c_str());
-
- return false;
- }
-
- // randomly pick one from the service
- int which = rand() % servers.size();
- string updateHost = servers[which].first;
- uint32_t updatePort = servers[which].second;
+ bool success = scribe::network_config::getService(serviceName,
+ serviceOptions,
+ servers);
+
+ // Cannot open if we couldn't find any servers
+ if (!success || servers.empty()) {
+ LOG_OPER("[%s] Failed to get servers from Service [%s] "
+ "for dynamic bucket updater",
+ category.c_str(), serviceName.c_str());
+
+ return false;
+ }
+
+ // randomly pick one from the service
+ int which = rand() % servers.size();
+ string updateHost = servers[which].first;
+ uint32_t updatePort = servers[which].second;
return DynamicBucketUpdater::getHost(fbBase, category, ttl, bid,
host, port,
updateHost, updatePort,
View
@@ -42,6 +42,8 @@
fprintf(stderr,"[%s] " #format_string " \n", dbgtime,##__VA_ARGS__); \
}
+namespace scribe {
+
/*
* Network based configuration and directory service
*/
@@ -57,6 +59,42 @@ class network_config {
}
};
+/*
+ * Concurrency mechanisms
+ */
+
+class concurrency {
+public:
+ // returns a new instance of read/write mutex.
+ // you can choose different implementations based on your needs.
+ static boost::shared_ptr<apache::thrift::concurrency::ReadWriteMutex>
+ createReadWriteMutex() {
+ using apache::thrift::concurrency::ReadWriteMutex;
+
+ return boost::shared_ptr<ReadWriteMutex>(new ReadWriteMutex());
+ }
+};
+
+/*
+ * Time functions
+ */
+
+class clock {
+public:
+ static unsigned long nowInMsec() {
+ // There is a minor race condition between the 2 calls below,
+ // but the chance is really small.
+
+ // Get current time in timeval
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+
+ // Get current time in sec
+ time_t sec = time(NULL);
+
+ return ((unsigned long)sec) * 1000 + (tv.tv_usec / 1000);
+ }
+};
/*
* Hash functions
@@ -86,4 +124,6 @@ class strhash {
}
};
+} // !namespace scribe
+
#endif // SCRIBE_ENV
View
@@ -169,6 +169,7 @@ scribeHandler::scribeHandler(unsigned long int server_port, const std::string& c
maxQueueSize(DEFAULT_MAX_QUEUE_SIZE),
newThreadPerCategory(true) {
time(&lastMsgTime);
+ scribeHandlerLock = scribe::concurrency::createReadWriteMutex();
}
scribeHandler::~scribeHandler() {
@@ -179,7 +180,7 @@ scribeHandler::~scribeHandler() {
// Returns the handler status, but overwrites it with WARNING if it's
// ALIVE and at least one store has a nonempty status.
fb_status scribeHandler::getStatus() {
- RWGuard monitor(scribeHandlerLock);
+ RWGuard monitor(*scribeHandlerLock);
Guard status_monitor(statusLock);
fb_status return_status(status);
@@ -209,7 +210,7 @@ void scribeHandler::setStatus(fb_status new_status) {
// Returns the handler status details if non-empty,
// otherwise the first non-empty store status found
void scribeHandler::getStatusDetails(std::string& _return) {
- RWGuard monitor(scribeHandlerLock);
+ RWGuard monitor(*scribeHandlerLock);
Guard status_monitor(statusLock);
_return = statusDetails;
@@ -427,7 +428,7 @@ void scribeHandler::addMessage(
ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
ResultCode result = TRY_LATER;
- scribeHandlerLock.acquireRead();
+ scribeHandlerLock->acquireRead();
if(status == STOPPING) {
result = TRY_LATER;
goto end;
@@ -460,8 +461,8 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
// Try creating a new store for this category if we didn't find one
if (store_list == NULL) {
// Need write lock to create a new category
- scribeHandlerLock.release();
- scribeHandlerLock.acquireWrite();
+ scribeHandlerLock->release();
+ scribeHandlerLock->acquireWrite();
// This may cause some duplicate messages if some messages in this batch
// were already added to queues
@@ -492,7 +493,7 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
result = OK;
end:
- scribeHandlerLock.release();
+ scribeHandlerLock->release();
return result;
}
@@ -542,14 +543,14 @@ void scribeHandler::stopStores() {
}
void scribeHandler::shutdown() {
- RWGuard monitor(scribeHandlerLock, true);
+ RWGuard monitor(*scribeHandlerLock, true);
stopStores();
// calling stop to allow thrift to clean up client states and exit
server->stop();
}
void scribeHandler::reinitialize() {
- RWGuard monitor(scribeHandlerLock, true);
+ RWGuard monitor(*scribeHandlerLock, true);
// reinitialize() will re-read the config file and re-configure the stores.
// This is done without shutting down the Thrift server, so this will not
View
@@ -105,7 +105,8 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
* A single mutex is fine since it only needs to be locked in write mode
* during start/stop/reinitialize or when we need to create a new category.
*/
- apache::thrift::concurrency::ReadWriteMutex scribeHandlerLock;
+ boost::shared_ptr<apache::thrift::concurrency::ReadWriteMutex>
+ scribeHandlerLock;
// disallow empty construction, copy, and assignment
scribeHandler();
View
@@ -1771,8 +1771,8 @@ void NetworkStore::configure(pStoreConf configuration, pStoreConf parent) {
configmod = NULL;
} else {
// set remote host port
- string host;
- uint32_t port;
+ string host;
+ uint32_t port;
if (configmod->getHostFunc(categoryHandled, storeConf.get(), host, port)) {
remoteHost = host;
remotePort = port;
@@ -1797,8 +1797,8 @@ void NetworkStore::periodicCheck() {
// if it is different from the current configuration
// then close and open again
LOG_OPER("[%s] dynamic configred network store destination changed. old value:<%s:%lu>, new value:<%s:%lu>",
- categoryHandled.c_str(), remoteHost.c_str(), remotePort,
- host.c_str(), (long unsigned)port);
+ categoryHandled.c_str(), remoteHost.c_str(), remotePort,
+ host.c_str(), (long unsigned)port);
remoteHost = host;
remotePort = port;
close();
@@ -1821,8 +1821,8 @@ bool NetworkStore::open() {
if (lastServiceCheck <= (time_t) (now - serviceCacheTimeout)) {
lastServiceCheck = now;
- success =
- network_config::getService(serviceName, serviceOptions, servers);
+ success = scribe::network_config::getService(serviceName, serviceOptions,
+ servers);
}
// Cannot open if we couldn't find any servers
@@ -2088,21 +2088,18 @@ void BucketStore::createBuckets(pStoreConf configuration) {
unsigned long i;
if (configuration->getString("bucket_subdir", tmp_string)) {
- error_msg =
- "cannot have bucket_subdir when defining multiple buckets";
- goto handle_error;
+ error_msg = "cannot have bucket_subdir when defining multiple buckets";
+ goto handle_error;
}
if (configuration->getString("bucket_offset", tmp_string)) {
- error_msg =
- "cannot have bucket_offset when defining multiple buckets";
- goto handle_error;
+ error_msg = "cannot have bucket_offset when defining multiple buckets";
+ goto handle_error;
}
if (configuration->getString("failure_bucket", tmp_string)) {
- error_msg =
- "cannot have failure_bucket when defining multiple buckets";
- goto handle_error;
+ error_msg = "cannot have failure_bucket when defining multiple buckets";
+ goto handle_error;
}
// Configure stores named 'bucket0, bucket1, bucket2, ... bucket{numBuckets}

0 comments on commit 2123b73

Please sign in to comment.