Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Scribe dynamic bucket id to network store configuration

Summary:
this diff added feature such that scribe will be able to work against a
zookeeper managed network cluster.  Instead of embedding zookeeper into scribe,
we added a bucketupdater thrift interface that all zookeeper managed hosts
should support that return a bid to host:port mapping.

added bucketupdate test to testsuites.php.

tested with testsuite.

Test Plan:
bucketupdater test case added to testsuites.php.
"
// testing bucket store updater.
// Setup:
// 1. two scribe servers.  one running on port 1465 and using conf file
//    scribe.conf.bucketupdater.server1, and the other running on port 1466
//    and using conf file scribe.conf.bucketupdater.server2.
//    scribe server that configured by scribe.bucketupdater.server1
//    writes scribe messages to /tmp/scribetest_/bucketupdater/server1
//    and scribe server that configured by scribe.bucketupdater.server2
//    writes scribe messages to /tmp/scribetest_/bucketupdater/server2
// 2. bidupdater server which implements bucket updater interface.
//    It reads mappings from a local file bidmap which is symbolic
//    linked to bidmap.1 or bidmap.2.  bidmap.1 maps bucket 1 to
//    scribe server that writes to /tmp/scribetest_/bucketupdater/server1
//    and bucket 2 to scribe server that writes to
/tmp/scribetest_/bucketupdater/server2.
//    bidmap.2 does the opposite.
//
// Test setup:
// 1. launch scribe -p 1465 scribe.conf.bucketupdater.srever1
// 2. launch scribe -p 1466 scribe.conf.bucketupdater.server2
// 3. ln -sf bidmap.1 bidmap.
// 4. launch bidupdater: bidupdater bidmap
// 5. scribe -p 1463 scribe.conf.bucketupdater which use bidupdater
//    to dynamically configure bucket store: bucketupdater.
// 6. send two message, one with bucket id 1 and the other with bucket id 2
//    to scribe running on 1463.  Check that the messages are
//    in /tmp/scribetest_/bucketupdater/server1/bucketupdater/bucket1
//    and /tmp/scribetest_/bucketupdater/server2/bucketupdater/bucket2
//    respectively.
// 7. ln -sf bidmap.2 bid
// 8. wait for 15 seconds
// 9. send another two messages, one with bucket 1 and the other bucket 2
//    Check that messages are in the reverse of 6.
"

DiffCamp Revision: 108360
Reviewed By: pkhemani
Commenters: groys, zshao
CC: agiardullo, jsong, pkhemani, groys, scribe-dev@lists
Revert Plan:
OK

git-svn-id: svn+ssh://tubbs/svnapps/fbomb/branches/scribe-os/fbcode/scribe@28165 2248de34-8caa-4a3c-bc55-5e52d9d7b73a
  • Loading branch information...
commit 9c231ed2f820bdc64a0c1e604e034cfd87d9ce59 1 parent 17fe373
groys authored groys committed
View
37 if/bucketupdater.thrift
@@ -0,0 +1,37 @@
+#!/usr/local/bin/thrift --cpp --php
+
+## Copyright (c) 2009- Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+namespace cpp scribe.thrift
+namespace java com.facebook.infrastructure.service
+
+// BucketStoreMapping service exception
+exception BucketStoreMappingException {
+ 1: string message;
+ 2: i32 code;
+}
+
+struct HostPort {
+ 2: string host,
+ 3: i32 port
+}
+
+service BucketStoreMapping {
+ // given a category, return a list of HashCodeToNetworkStore mappings
+ map<i32, HostPort> getMapping(1: string category) throws (1: BucketStoreMappingException e);
+}
View
433 src/dynamic_bucket_updater.cpp
@@ -0,0 +1,433 @@
+#include <strstream>
+#include <iostream>
+#include "thrift/lib/cpp/transport/TBufferTransports.h"
+#include "scribe/src/dynamic_bucket_updater.h"
+#include "scribe/src/scribe_server.h"
+
+using namespace std;
+using namespace apache::thrift::concurrency;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace facebook;
+using namespace facebook::fb303;
+using namespace scribe::thrift;
+
+extern shared_ptr<scribeHandler> g_Handler;
+
+DynamicBucketUpdater* DynamicBucketUpdater::instance_ = NULL;
+Mutex DynamicBucketUpdater::instanceLock_;
+
+// bucket updater connection error
+const char* DynamicBucketUpdater::FB303_ERR_CONNECT = "bucketupdater.err.update_connect";
+// error calling bucketupdater.thrift
+const char* DynamicBucketUpdater::FB303_ERR_THRIFTCALL = "bucketupdater.err.thrift_call";
+// bucket updater return empty result
+const char* DynamicBucketUpdater::FB303_ERR_EMPTYRESULT = "bucketupdater.err.empty_result";
+// number of times a remote updater has been called
+const char* DynamicBucketUpdater::FB303_REMOTEUPDATE = "bucketupdater.remove_updater";
+// missing a bid mapping
+const char* DynamicBucketUpdater::FB303_ERR_NOMAPPING = "bucketupdater.err.nobidmapping";
+// number of buckets that have been updated
+const char* DynamicBucketUpdater::FB303_BUCKETSUPDATED = "bucketupdater.bucket_updated";
+
+bool DynamicBucketUpdater::getHost(const string& category,
+ const StoreConf* pconf,
+ string& host,
+ uint32_t& port) {
+ bool success = false;
+ // getHost to check what the current category/bid to host:port
+ // mapping is
+ string service, serviceOptions, updaterHost, updaterPort;
+ long int timeout = 1000; // 1 second default timeout
+ long int ttl = 60; // update every minute
+ long int bid;
+ pconf->getString("bucket_updater_service_options", serviceOptions);
+ pconf->getInt("bucket_updater_ttl", ttl);
+ pconf->getInt("bucket_id", bid);
+ pconf->getString("bucket_updater_service", service);
+ pconf->getString("bucket_updater_host", updaterHost);
+ pconf->getString("bucket_updater_port", updaterPort);
+ pconf->getInt("timeout", timeout);
+
+ if (!service.empty()) {
+ success = DynamicBucketUpdater::getHost(g_Handler.get(),
+ category,
+ ttl,
+ (uint32_t)bid,
+ host,
+ port,
+ service,
+ serviceOptions,
+ timeout, timeout, timeout);
+ } else {
+ success = DynamicBucketUpdater::getHost(g_Handler.get(),
+ category,
+ ttl,
+ (uint32_t)bid,
+ host,
+ port,
+ updaterHost,
+ atoi(updaterPort.c_str()),
+ timeout, timeout, timeout);
+ }
+
+ if (!success) {
+ LOG_OPER("[%s] dynamic bucket updater failed: bid=%ld",
+ category.c_str(), bid);
+ }
+ return success;
+}
+
+bool DynamicBucketUpdater::isConfigValid(const string& category,
+ const StoreConf* pconf) {
+ // we need dynamic_updater_bid paramter
+ string bid;
+ if (!pconf->getString("bucket_id", bid)) {
+ LOG_OPER("[%s] dynamic bucket updater configuration invalid. Missing bucket_id. Is the network a descendant of a bucket store?", category.c_str());
+ return false;
+ }
+
+ // requires either dynamic_updater_host and networ_updater_port, or
+ // dynamic_updater_service
+ string host, port, service;
+ if (!pconf->getString("bucket_updater_service", service) &&
+ (!pconf->getString("bucket_updater_host", host) ||
+ !pconf->getString("bucket_updater_port", port))) {
+ LOG_OPER("[%s] dynamic bucket updater configuration invalid. Either bucket_updater_service or bucket_updater_host and bucket_updater_port is needed. Current values are: bucket_updater_service=<%s>, bucket_updater_host=<%s>, bucket_updater_port=<%s>",
+ category.c_str(), service.c_str(), host.c_str(), port.c_str());
+ return false;
+ }
+ return true;
+}
+
+/**
+ * Return host, port given a key and bucket id, bid, combination.
+ * If a mapping is found, the result will be returned in out parameter and
+ * function returns true. Otherwise, function returns false and output
+ * parameters are not modified.
+ *
+ * @param fbBase ponter to FacebookBase
+ * @param category the category name, or any identifier that uniquely
+ * identifies a bucket store.
+ * @param ttl ttl in seconds
+ * @param bid bucket id
+ * @param host the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param port the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param service service name
+ * @param connTimeout connection timeout
+ * @param sendTimeout send timeout
+ * @param recvTimeout receive timeout
+ */
+bool DynamicBucketUpdater::getHost(facebook::fb303::FacebookBase *fbBase,
+ const string &category,
+ uint32_t ttl,
+ uint64_t bid,
+ string &host,
+ uint32_t &port,
+ string updateHost,
+ uint32_t updatePort,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout) {
+ DynamicBucketUpdater *instance = DynamicBucketUpdater::getInstance(fbBase);
+
+ instance->lock_.lock();
+
+ bool ret = instance->getHostInternal(category, ttl, bid,
+ host, port, updateHost,
+ updatePort, connTimeout,
+ sendTimeout, recvTimeout);
+ instance->lock_.unlock();
+ return ret;
+}
+
+/**
+ * Return host, port given a key and bucket id, bid, combination.
+ * If a mapping is found, the result will be returned in out parameter and
+ * function returns true. Otherwise, function returns false and output
+ * parameters are not modified.
+ *
+ * @param fbBase ponter to FacebookBase
+ * @param category the category name, or any identifier that uniquely
+ * identifies a bucket store.
+ * @param ttl ttl in seconds
+ * @param bid bucket id
+ * @param host the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param port the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ */
+bool DynamicBucketUpdater::getHost(facebook::fb303::FacebookBase *fbBase,
+ const string &category,
+ uint32_t ttl,
+ uint64_t bid,
+ string &host,
+ uint32_t &port,
+ string serviceName,
+ string serviceOptions,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout) {
+ server_vector_t servers;
+ bool success = network_config::getService(serviceName,
+ serviceOptions,
+ servers);
+
+ // Cannot open if we couldn't find any servers
+ if (!success || servers.empty()) {
+ LOG_OPER("[%s] Failed to get servers from Service [%s] for dynamic bucket updater",
+ category.c_str(), serviceName.c_str());
+
+ return false;
+ }
+
+ // randomly pick one from the service
+ int which = rand() % servers.size();
+ string updateHost = servers[which].first;
+ uint32_t updatePort = servers[which].second;
+ return DynamicBucketUpdater::getHost(fbBase, category, ttl, bid,
+ host, port,
+ updateHost, updatePort,
+ connTimeout, sendTimeout,
+ recvTimeout);
+}
+
+/**
+ * actual implementation of getHost.
+ *
+ * @param category the category name, or any identifier that uniquely
+ * identifies a bucket store.
+ * @param ttl ttl in seconds
+ * @param bid bucket id
+ * @param host the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param port the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ */
+bool DynamicBucketUpdater::getHostInternal(const string &category,
+ uint32_t ttl,
+ uint64_t bid,
+ string &host,
+ uint32_t &port,
+ string updateHost,
+ uint32_t updatePort,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout) {
+ time_t now = time(NULL);
+
+ // periodic check whether we need to fetch the mapping, or need to
+ // update
+ CatBidToHostMap::const_iterator iter = catMap_.find(category);
+ if (iter == catMap_.end() || iter->second.lastUpdated_ + ttl < now) {
+ periodicCheck(category,
+ ttl,
+ updateHost,
+ updatePort,
+ connTimeout, sendTimeout, recvTimeout);
+ iter = catMap_.find(category);
+ // check again.
+ if (iter == catMap_.end()) {
+ return false;
+ }
+ }
+
+ const CategoryEntry &catEnt = iter->second;
+ map<uint64_t, HostEntry>::const_iterator bidIter = catEnt.bidMap_.find(bid);
+
+ bool ret = false;
+ if (bidIter != catEnt.bidMap_.end()) {
+ const HostEntry &entry = bidIter->second;
+ host = entry.host_;
+ port = entry.port_;
+ ret = true;
+ } else {
+ ostringstream oss;
+ oss << "Missing mapping for category " << category << ", bid: " << bid
+ << ", updateHost: " << updateHost << ", updatePort: " << updatePort;
+ LOG_OPER(oss.str());
+ addStatValue(DynamicBucketUpdater::FB303_ERR_NOMAPPING, 1);
+ }
+
+ return ret;
+}
+
+/**
+ * Given a category name, remote host:port, current time, and category
+ * mapping time to live (ttl), check whether we need to update the
+ * category mapping. If so query bucket mapping
+ * using bucketupdater thrift interface and update internal category,
+ * bucket id to host mappings.
+ *
+ * This function takes care of try/catch and locking. The bulk of the
+ * update logic is delegated to updateInternal.
+ *
+ * @param category category or key that uniquely identifies this updater.
+ * @param ttl ttl in seconds
+ * @param host remote host that will be used to retrieve bucket mapping
+ * @param port remote port that will be used to retrieve bucket mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send time out
+ * @param recvTimeout receive time out
+ *
+ * @return true if successful. false otherwise.
+ */
+bool DynamicBucketUpdater::periodicCheck(string category,
+ uint32_t ttl,
+ string host,
+ uint32_t port,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout) {
+ bool ret = false;
+ try {
+ ret = updateInternal(category,
+ ttl,
+ host,
+ port,
+ connTimeout,
+ sendTimeout,
+ recvTimeout);
+ } catch (const TTransportException& ttx) {
+ LOG_OPER("periodicCheck(%s, %s, %u, %d, %d, %d) TTransportException: %s",
+ category.c_str(), host.c_str(), port,
+ connTimeout, sendTimeout, recvTimeout,
+ ttx.what());
+ addStatValue(DynamicBucketUpdater::FB303_ERR_THRIFTCALL, 1);
+ ret = false;
+ } catch (const BucketStoreMappingException& bex) {
+ ostringstream oss;
+ LOG_OPER("periodicCheck(%s, %s, %u, %d, %d, %d) TTransportException: %s",
+ category.c_str(), host.c_str(), port,
+ connTimeout, sendTimeout, recvTimeout,
+ bex.message.c_str());
+ addStatValue(DynamicBucketUpdater::FB303_ERR_THRIFTCALL, 1);
+ ret = false;
+ }
+
+ return ret;
+}
+
+/**
+ * Given a category name, remote host and port, query bucket mapping
+ * using bucketupdater thrift interface and update internal category,
+ * bucket id to host mappings.
+ *
+ * @param category category or other uniquely identifiable key
+ * @param ttl ttl in seconds
+ * @param remoteHost remote host that will be used to retrieve bucket mapping
+ * @param remotePort remote port that will be used to retrieve bucket mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send time out
+ * @param recvTimeout receive time out
+ *
+ * @return true if successful. false otherwise.
+ */
+bool DynamicBucketUpdater::updateInternal(
+ string category,
+ uint32_t ttl,
+ string remoteHost,
+ uint32_t remotePort,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout) {
+ addStatValue(DynamicBucketUpdater::FB303_REMOTEUPDATE, 1);
+
+ // remove the current mapping
+ CatBidToHostMap::iterator catIter = catMap_.find(category);
+ if (catIter != catMap_.end()) {
+ catMap_.erase(catIter);
+ }
+
+ shared_ptr<TSocket> socket = shared_ptr<TSocket>(
+ new TSocket(remoteHost, remotePort));
+
+ if (!socket) {
+ addStatValue(DynamicBucketUpdater::FB303_ERR_CONNECT, 1);
+ ostringstream oss;
+ oss << "Failed to create socket in bucket updater("
+ << category
+ << ", "
+ << remoteHost
+ << ", "
+ << remotePort
+ << ")";
+ LOG_OPER(oss.str());
+ return false;
+ }
+
+ socket->setConnTimeout(connTimeout);
+ socket->setRecvTimeout(recvTimeout);
+ socket->setSendTimeout(sendTimeout);
+
+ shared_ptr<TFramedTransport> framedTransport = shared_ptr<TFramedTransport>(
+ new TFramedTransport(socket));
+ framedTransport->open();
+ shared_ptr<TBinaryProtocol> protocol = shared_ptr<TBinaryProtocol>(
+ new TBinaryProtocol(framedTransport));
+
+ // no strict version checking
+ protocol->setStrict(false, false);
+ BucketStoreMappingClient client(protocol);
+ map<int32_t, HostPort> mapping;
+ client.getMapping(mapping, category);
+
+ if (mapping.size() == 0) {
+ addStatValue(DynamicBucketUpdater::FB303_ERR_EMPTYRESULT, 1);
+
+ return false;
+ }
+
+ CategoryEntry catEntry(category, ttl);
+ catEntry.lastUpdated_ = time(NULL);
+ // update bucket id host mappings
+ for (map<int32_t, HostPort>::const_iterator iter = mapping.begin();
+ iter != mapping.end(); ++iter) {
+ uint32_t bid = iter->first;
+ const HostPort &hp = iter->second;
+ HostEntry hentry;
+ hentry.host_ = hp.host;
+ hentry.port_ = hp.port;
+ catEntry.bidMap_[bid] = hentry;
+ }
+ catMap_[category] = catEntry;
+
+ // increment the counter for number of buckets updated
+ addStatValue(DynamicBucketUpdater::FB303_BUCKETSUPDATED, mapping.size());
+
+ return true;
+}
+
+DynamicBucketUpdater* DynamicBucketUpdater::getInstance(
+ FacebookBase *fbBase) {
+ if (DynamicBucketUpdater::instance_) {
+ return DynamicBucketUpdater::instance_;
+ }
+
+ Guard g(DynamicBucketUpdater::instanceLock_);
+ if (!DynamicBucketUpdater::instance_) {
+ DynamicBucketUpdater::instance_ = new DynamicBucketUpdater(fbBase);
+ }
+ return DynamicBucketUpdater::instance_;
+}
+
+/**
+ * Setup fb303 counters.
+ */
+void DynamicBucketUpdater::initFb303Counters() {
+ if (fbBase_) {
+ // initialize fb303 time series based counters
+ fbBase_->addStatExportType(
+ DynamicBucketUpdater::FB303_ERR_CONNECT, stats::SUM);
+ fbBase_->addStatExportType(
+ DynamicBucketUpdater::FB303_ERR_EMPTYRESULT, stats::SUM);
+ fbBase_->addStatExportType(
+ DynamicBucketUpdater::FB303_REMOTEUPDATE, stats::SUM);
+ fbBase_->addStatExportType(
+ DynamicBucketUpdater::FB303_BUCKETSUPDATED, stats::SUM);
+ fbBase_->addStatExportType(
+ DynamicBucketUpdater::FB303_ERR_NOMAPPING, stats::SUM);
+ }
+}
View
237 src/dynamic_bucket_updater.h
@@ -0,0 +1,237 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author John Song
+
+#ifndef SCRIBE_DYNAMIC_BUCKET_UPDATER_H
+#define SCRIBE_DYNAMIC_BUCKET_UPDATER_H
+
+#include <map>
+#include "common.h"
+#include "common/fb303/cpp/FacebookBase.h"
+#include "scribe/if/gen-cpp/BucketStoreMapping.h"
+#include "scribe/src/conf.h"
+
+/**
+ * DynamicBucketUpdater updates a bucket store's bucket id to host:port
+ * mapping periodically using the bucketupdater.thrift interface.
+ */
+class DynamicBucketUpdater {
+ public:
+ // bucket updater connection error
+ static const char *FB303_ERR_CONNECT;
+ // error calling bucketupdater.thrift
+ static const char *FB303_ERR_THRIFTCALL;
+ // bucket updater return empty result
+ static const char *FB303_ERR_EMPTYRESULT;
+ // number of times a remote updater has been called
+ static const char *FB303_REMOTEUPDATE;
+ // number of buckets that have been updated
+ static const char *FB303_BUCKETSUPDATED;
+ // missing a bid mapping
+ static const char *FB303_ERR_NOMAPPING;
+
+ static bool getHost(const string& category, const StoreConf* pconf, std::string& host, uint32_t& port);
+
+ static bool isConfigValid(const string& category, const StoreConf* pconf);
+
+ /**
+ * Return host, port given a key and bucket id, bid, combination.
+ * If a mapping is found, the result will be returned in out parameter and
+ * function returns true. Otherwise, function returns false and output
+ * parameters are not modified.
+ *
+ * @param fbBase ponter to FacebookBase
+ * @param category the category name, or any identifier that uniquely
+ * identifies a bucket store.
+ * @param ttl ttl in seconds
+ * @param bid bucket id
+ * @param host the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param port the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param service service name
+ * @param connTimeout connection timeout
+ * @param sendTimeout send timeout
+ * @param recvTimeout receive timeout
+ */
+ static bool getHost(facebook::fb303::FacebookBase *fbBase,
+ const string &category,
+ uint32_t ttl,
+ uint64_t bid,
+ string &host,
+ uint32_t &port,
+ string service,
+ string serviceOption,
+ uint32_t connTimeout = 150,
+ uint32_t sendTimeout = 150,
+ uint32_t recvTimeout = 150);
+
+ /**
+ * Return host, port given a key and bucket id, bid, combination.
+ * If a mapping is found, the result will be returned in out parameter and
+ * function returns true. Otherwise, function returns false and output
+ * parameters are not modified.
+ *
+ * @param fbBase ponter to FacebookBase
+ * @param category the category name, or any identifier that uniquely
+ * identifies a bucket store.
+ * @param ttl ttl in seconds
+ * @param bid bucket id
+ * @param host the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param port the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param updateHost remote host to fetch mapping
+ * @param updatePort remote port to fetch mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send timeout
+ * @param recvTimeout receive timeout
+ */
+ static bool getHost(facebook::fb303::FacebookBase *fbBase,
+ const string &category,
+ uint32_t ttl,
+ uint64_t bid,
+ string &host,
+ uint32_t &port,
+ string updateHost,
+ uint32_t updatePort,
+ uint32_t connTimeout = 150,
+ uint32_t sendTimeout = 150,
+ uint32_t recvTimeout = 150);
+
+ protected:
+ /**
+ * actual implementation of getHost.
+ *
+ * @param category the category name, or any identifier that uniquely
+ * identifies a bucket store.
+ * @param ttl ttl in seconds
+ * @param bid bucket id
+ * @param host the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ * @param port the output parameter that receives the host output.
+ * If no mapping is found, this variable is not modified.
+ */
+ bool getHostInternal(const string &category,
+ uint32_t ttl,
+ uint64_t bid,
+ string &host,
+ uint32_t &port,
+ string updateHost,
+ uint32_t updatePort,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout);
+ /**
+ * Given a category name, remote host:port, current time, and category
+ * mapping, performs a periodic update. The current mapping will be
+ * removed first before update is performed.
+ *
+ * This function takes care of try/catch and locking. The bulk of the
+ * update logic is delegated to updateInternal.
+ *
+ * @param category category or key that uniquely identifies this updater.
+ * @param ttl ttl in seconds
+ * @param host remote host that will be used to retrieve bucket mapping
+ * @param port remote port that will be used to retrieve bucket mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send time out
+ * @param recvTimeout receive time out
+ *
+ * @return true if successful. false otherwise.
+ */
+ bool periodicCheck(string category,
+ uint32_t ttl,
+ string host,
+ uint32_t port,
+ uint32_t connTimeout = 150,
+ uint32_t sendTimeout = 150,
+ uint32_t recvTimeout = 150);
+
+ struct HostEntry {
+ string host_;
+ uint32_t port_;
+ };
+
+ struct CategoryEntry {
+ CategoryEntry() {}
+ CategoryEntry(string category, uint32_t ttl) : category_(category), ttl_(ttl),
+ lastUpdated_(0) {
+ }
+
+ string category_;
+ uint32_t ttl_;
+ time_t lastUpdated_;
+ map<uint64_t, HostEntry> bidMap_;
+ };
+
+ // category and bid to HostEntry map
+ typedef map<string, CategoryEntry> CatBidToHostMap;
+
+ /**
+ * Given a category name, remote host and port, query bucket mapping
+ * using bucketupdater thrift interface and update internal category,
+ * bucket id to host mappings.
+ *
+ * @param category category or other uniquely identifiable key
+ * @param ttl ttl in seconds
+ * @param remoteHost remote host that will be used to retrieve bucket mapping
+ * @param remotePort remote port that will be used to retrieve bucket mapping
+ * @param connTimeout connection timeout
+ * @param sendTimeout send time out
+ * @param recvTimeout receive time out
+ *
+ * @return true if successful. false otherwise.
+ */
+ bool updateInternal(string category,
+ uint32_t ttl,
+ string remoteHost,
+ uint32_t remotePort,
+ uint32_t connTimeout,
+ uint32_t sendTimeout,
+ uint32_t recvTimeout);
+
+ static DynamicBucketUpdater *getInstance(facebook::fb303::FacebookBase *fbBase);
+
+ /**
+ * Setup fb303 counters.
+ */
+ void initFb303Counters();
+
+ static DynamicBucketUpdater *instance_;
+ static apache::thrift::concurrency::Mutex instanceLock_;
+ facebook::fb303::FacebookBase *fbBase_;
+ apache::thrift::concurrency::Mutex lock_;
+ CatBidToHostMap catMap_;
+
+ void addStatValue(string name, uint64_t value) {
+ if (fbBase_) {
+ fbBase_->addStatValue(name, value);
+ }
+ }
+
+ // make singleton
+ DynamicBucketUpdater(facebook::fb303::FacebookBase *fbBase)
+ : fbBase_(fbBase) {
+ initFb303Counters();
+ }
+
+ DynamicBucketUpdater(const DynamicBucketUpdater& other) {}
+};
+
+#endif // SCRIBE_DYNAMIC_BUCKET_UPDATER_H
View
47 src/network_dynamic_config.cpp
@@ -0,0 +1,47 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author John Song
+
+#include "scribe/src/network_dynamic_config.h"
+#include "scribe/src/dynamic_bucket_updater.h"
+
+static NetworkDynamicConfigMod netConfigMods[] = {
+ {
+ "thrift_bucket",
+ DynamicBucketUpdater::isConfigValid,
+ DynamicBucketUpdater::getHost,
+ },
+ {
+ "",
+ NULL,
+ NULL,
+ },
+};
+
+NetworkDynamicConfigMod* getNetworkDynamicConfigMod(const char* name) {
+ for (NetworkDynamicConfigMod *pconf = netConfigMods;
+ pconf->isConfigValidFunc && pconf->getHostFunc;
+ ++pconf) {
+ if (strcmp(name, pconf->name) == 0) {
+ return pconf;
+ }
+ }
+
+ return NULL;
+}
+
View
37 src/network_dynamic_config.h
@@ -0,0 +1,37 @@
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+// @author John Song
+
+#ifndef SCRIBE_NETWORK_UPDATER_H
+#define SCRIBE_NETWORK_UPDATER_H
+
+#include "scribe/src/conf.h"
+
+// functional types for network dynamic updater validation and getHost calls
+typedef bool (*NetworkIsConfigValidFunc)(const std::string& category, const StoreConf* pconf);
+typedef bool (*NetworkGetHost)(const std::string& category, const StoreConf* pconf, std::string& host, uint32_t& port);
+
+struct NetworkDynamicConfigMod {
+ const char* name;
+ NetworkIsConfigValidFunc isConfigValidFunc;
+ NetworkGetHost getHostFunc;
+};
+
+NetworkDynamicConfigMod* getNetworkDynamicConfigMod(const char* name);
+
+#endif //SCRIBE_NETWORK_UPDATER_H
View
73 src/store.cpp
@@ -27,7 +27,8 @@
#include <algorithm>
#include "common.h"
#include "scribe_server.h"
-#include "thrift/transport/TSimpleFileTransport.h"
+#include "thrift/lib/cpp/transport/TSimpleFileTransport.h"
+#include "network_dynamic_config.h"
using namespace std;
using namespace boost;
@@ -60,8 +61,6 @@ const double MULT_INC_FACTOR = 1.414; //sqrt(2)
#define ADD_DEC_FACTOR 2
#define CONT_SUCCESS_THRESHOLD 1
-
-
ConnPool g_connPool;
const string meta_logfile_prefix = "scribe_meta<new_logfile>: ";
@@ -1718,8 +1717,10 @@ NetworkStore::NetworkStore(StoreQueue* storeq,
serviceBased(false),
remotePort(0),
serviceCacheTimeout(DEFAULT_NETWORKSTORE_CACHE_TIMEOUT),
- lastServiceCheck(0),
- opened(false) {
+ ignoreNetworkError(false),
+ configmod(NULL),
+ opened(false),
+ lastServiceCheck(0) {
// we can't open the connection until we get configured
// the bool for opened ensures that we don't make duplicate
@@ -1757,6 +1758,58 @@ void NetworkStore::configure(pStoreConf configuration, pStoreConf parent) {
useConnPool = true;
}
}
+ if (configuration->getString("ignore_network_error", temp)) {
+ if (0 == temp.compare("yes")) {
+ ignoreNetworkError = true;
+ }
+ }
+
+ // if this network store dynamic configured?
+ // get network dynamic updater parameters
+ string dynamicType;
+ if (configuration->getString("dynamic_config_type", dynamicType)) {
+ // get dynamic config module
+ configmod = getNetworkDynamicConfigMod(dynamicType.c_str());
+ if (configmod) {
+ if (!configmod->isConfigValidFunc(categoryHandled, configuration.get())) {
+ LOG_OPER("[%s] dynamic network configuration is not valid.",
+ categoryHandled.c_str());
+ configmod = NULL;
+ } else {
+ // set remote host port
+ string host;
+ uint32_t port;
+ if (configmod->getHostFunc(categoryHandled, storeConf.get(), host, port)) {
+ remoteHost = host;
+ remotePort = port;
+ LOG_OPER("[%s] dynamic configred network store destination configured:<%s:%lu>",
+ categoryHandled.c_str(), remoteHost.c_str(), remotePort);
+ }
+ }
+ } else {
+ LOG_OPER("[%s] dynamic network configuration is not valid. Unable to find network dynamic configuration module with name <%s>",
+ categoryHandled.c_str(), dynamicType.c_str());
+ }
+ }
+}
+
+void NetworkStore::periodicCheck() {
+ if (configmod) {
+ // get the network updater type
+ string host;
+ uint32_t port;
+ bool success = configmod->getHostFunc(categoryHandled, storeConf.get(), host, port);
+ if (success && (host != remoteHost || port != remotePort)) {
+ // if it is different from the current configuration
+ // then close and open again
+ LOG_OPER("[%s] dynamic configred network store destination changed. old value:<%s:%lu>, new value:<%s:%lu>",
+ categoryHandled.c_str(), remoteHost.c_str(), remotePort,
+ host.c_str(), (long unsigned)port);
+ remoteHost = host;
+ remotePort = port;
+ close();
+ }
+ }
}
bool NetworkStore::open() {
@@ -1824,8 +1877,8 @@ bool NetworkStore::open() {
}
}
-
- if (opened) {
+ if (opened || ignoreNetworkError) {
+ // clear status on success or if we should not signal error here
setStatus("");
} else {
setStatus("Failed to connect");
@@ -2083,6 +2136,12 @@ void BucketStore::createBuckets(pStoreConf configuration) {
createStore(storeQueue, type, categoryHandled, false, multiCategory);
buckets.push_back(bucket);
+ //add bucket id configuration
+ bucket_conf->setUnsigned("bucket_id", i);
+ bucket_conf->setUnsigned("network::bucket_id", i);
+ bucket_conf->setUnsigned("file::bucket_id", i);
+ bucket_conf->setUnsigned("thriftfile::bucket_id", i);
+ bucket_conf->setUnsigned("buffer::bucket_id", i);
bucket->configure(bucket_conf, storeConf);
}
View
3  src/store.h
@@ -32,6 +32,7 @@
#include "file.h"
#include "conn_pool.h"
#include "store_queue.h"
+#include "network_dynamic_config.h"
class StoreQueue;
@@ -381,6 +382,7 @@ class NetworkStore : public Store {
void configure(pStoreConf configuration, pStoreConf parent);
void close();
void flush();
+ void periodicCheck();
protected:
static const long int DEFAULT_SOCKET_TIMEOUT_MS = 5000; // 5 sec timeout
@@ -398,6 +400,7 @@ class NetworkStore : public Store {
time_t lastServiceCheck;
// if true do not update status to reflect failure to connect
bool ignoreNetworkError;
+ NetworkDynamicConfigMod* configmod;
// state
bool opened;
View
3  test/bidmap.1
@@ -0,0 +1,3 @@
+1,localhost,1465
+2,localhost,1466
+3,localhost,1467
View
3  test/bidmap.2
@@ -0,0 +1,3 @@
+1,localhost,1466
+2,localhost,1465
+3,localhost,1465
View
130 test/bucketupdater.php
@@ -0,0 +1,130 @@
+<?php
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+
+require_once 'tests.php';
+require_once 'testutil.php';
+
+// testing bucket store updater.
+// Setup:
+// 1. 3 scribe servers. one running on port 1465 and using conf file
+// scribe.conf.bucketupdater.server1, one running on port 1466
+// and using conf file scribe.conf.bucketupdater.server2, and the
+// last one running on port 1477 and using scribe.conf.bucketupdater.server.3.
+// Scribe server that configured by scribe.bucketupdater.server1
+// writes scribe messages to /tmp/scribetest_/bucketupdater/server1
+// Scribe server that configured by scribe.bucketupdater.server2
+// writes scribe messages to /tmp/scribetest_/bucketupdater/server2
+// Scribe server that configured by scribe.bucketupdater.server.3
+// writes scribe messages to /tmp/scribetest_/bucketupdater/server3
+// 2. bidupdater server which implements bucket updater interface.
+// It reads mappings from a local file bidmap which is symbolic
+// linked to bidmap.1 or bidmap.2. bidmap.1 maps bucket 1 to
+// scribe server that writes to /tmp/scribetest_/bucketupdater/server1,
+// bucket 2 to scribe server that writes to /tmp/scribetest_/bucketupdater/server2,
+// and bucket 3 to scribe server that writes to
+// /tmp/scribetest_bucketupdater/server3.
+// 3. bidmap.2 maps bucket1 to server2 and bucket 2 and 3 to server2.
+// 4. central server use connection pool.
+//
+// Test setup:
+// 1. launch scribe -p 1465 scribe.conf.bucketupdater.srever1
+// 2. launch scribe -p 1466 scribe.conf.bucketupdater.server2
+// 3. launch scribe -p 1467 scribe.conf.bucketupdater.server3
+// 3. ln -sf bidmap.1 bidmap.
+// 4. launch bidupdater: bidupdater -p 9999 -f bidmap
+// 5. scribe -p 1463 scribe.conf.bucketupdater which use bidupdater
+// to dynamically configure bucket store: bucketupdater.
+// 6. send two message, one with bucket id 1 and the other with bucket id 2
+// to scribe running on 1463. Check that the messages are
+// in /tmp/scribetest_/bucketupdater/server1/bucketupdater/bucket1
+// and /tmp/scribetest_/bucketupdater/server2/bucketupdater/bucket2
+// respectively.
+// 7. ln -sf bidmap.2 bid
+// 8. wait for 5 seconds
+// 9. send another two messages with bucket id 1 and 2.
+// Check that message with bucket id 1 goes to server2, and bucket
+// 2 and 3 ends in server1.
+// 10. check conn pool works correctly by verifying that there is no
+// network connection from 1463 to 1467, i.e. server3 using
+// netstat -n | grep ESTABLISH | grep -q 9999
+// and check the return code is 1.
+
+echo "Starting scribe server1: 1465, scribe.conf.bucketupdater.server1\n";
+// start scribe server1
+$pidScribe1 = scribe_start("bucketupdater.server1",
+ $GLOBALS['SCRIBE_BIN'], 1465,
+ "scribe.conf.bucketupdater.server1");
+
+echo "Starting scribe server2: 1466, scribe.conf.bucketupdater.server2\n";
+// start scribe server2
+$pidScribe2 = scribe_start("bucketupdater.server2",
+ $GLOBALS['SCRIBE_BIN'], 1466,
+ "scribe.conf.bucketupdater.server2");
+
+echo "Starting scribe server2: 1467, scribe.conf.bucketupdater.server2\n";
+// start scribe server3
+$pidScribe3 = scribe_start("bucketupdater.server3",
+ $GLOBALS['SCRIBE_BIN'], 1467,
+ "scribe.conf.bucketupdater.server3");
+
+$cmd = "ln -sf bidmap.1 bidmap";
+echo "$cmd\n";
+// symlink
+system($cmd);
+
+// start bidupdaer
+$cmd = "../../_bin/scribe/test/bucketupdater/bidupdater -p 9999 -f ./bidmap > bidupdater.out 2>&1 & echo $!";
+echo "$cmd\n";
+$pidUpdater = system($cmd);
+
+echo "Starting scribe central server: 1463,scribe.conf.bucketupdater.central.\n";
+// start scribe central server
+$pidScribeCentral = scribe_start("bucketupdater.central",
+ $GLOBALS['SCRIBE_BIN'], 1463,
+ "scribe.conf.bucketupdater.central");
+
+// test
+$success1 = bucketupdater_test("server1/bucket001/content_current",
+ "server2/bucket002/content_current",
+ "server3/bucket002/content_current");
+
+// change symlink
+$cmd = "ln -sf bidmap.2 bidmap";
+echo "$cmd\n";
+system($cmd);
+
+echo "sleep(15) to wait for bucket updater to take effect.\n";
+sleep(15);
+
+// test again
+$success2 = bucketupdater_test("server2/bucket001/content_current",
+ "server1/bucket002/content_current",
+ "server1/bucket002/content_current");
+
+// stop scribe server
+scribe_stop($GLOBALS['SCRIBE_CTRL'], 1465, $pidScribe1);
+scribe_stop($GLOBALS['SCRIBE_CTRL'], 1466, $pidScribe2);
+scribe_stop($GLOBALS['SCRIBE_CTRL'], 1467, $pidScribe3);
+scribe_stop($GLOBALS['SCRIBE_CTRL'], 1463, $pidScribeCentral);
+
+// kill -9
+$cmd = "kill -9 $pidUpdater\n";
+echo "$cmd\n";
+system($cmd);
+
+return ($success1 && $success2);
View
9 test/bucketupdater/TARGETS
@@ -0,0 +1,9 @@
+cpp_binary (
+ name = 'bidupdater',
+ srcs = [
+ 'main.cpp',
+ ],
+ deps = [
+ '@/scribe/if:bucket',
+ ],
+)
View
104 test/bucketupdater/main.cpp
@@ -0,0 +1,104 @@
+// This autogenerated skeleton file illustrates how to build a server.
+// You should copy it to another filename to avoid overwriting it.
+
+#include "scribe/if/gen-cpp/BucketStoreMapping.h"
+#include <protocol/TBinaryProtocol.h>
+#include <server/TSimpleServer.h>
+#include <transport/TServerSocket.h>
+#include <transport/TBufferTransports.h>
+#include <iostream>
+#include <fstream>
+#include <unistd.h>
+#include <boost/algorithm/string.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::server;
+
+using namespace ::scribe::thrift;
+
+class BucketStoreMappingHandler : virtual public BucketStoreMappingIf {
+ public:
+ BucketStoreMappingHandler(const string& file) : file_(file) {
+ }
+
+ void getMapping(std::map<int32_t, HostPort> & _return, const std::string& category) {
+ // get current time
+ char timeBuf[1024];
+ time_t now = time(NULL);
+ ctime_r(&now, timeBuf);
+ // chop the last "\n";
+ timeBuf[strlen(timeBuf) - 1] = 0;
+
+ // open the file "file", it should be in the form of
+ // i1,host,port
+ // i2
+ // where n is number of bucket and i1, i2 are bucket number
+ ifstream ifs(file_.c_str());
+ if (ifs.good()) {
+ string line;
+ // read in one line
+ while (!ifs.eof()) {
+ getline(ifs, line);
+ if (line.empty()) {
+ continue;
+ }
+ vector<string> parts;
+ boost::split(parts, line, boost::is_any_of(","));
+ if (parts.size() < 3) {
+ cerr << "ignorig line: " << line << endl;
+ continue;
+ }
+ int32_t bid = atoi(parts[0].c_str());
+ HostPort hp;
+ hp.host = parts[1];
+ hp.port = atoi(parts[2].c_str());
+ _return[bid] = hp;
+ cout << "[" << timeBuf << "] bucket " << bid << " => "
+ << hp.host << ":" << hp.port << endl;
+ }
+ } else {
+ cerr << "Can't read configure file: " << file_ << endl;
+ }
+ }
+
+ protected:
+ string file_;
+};
+
+void help() {
+ cout << "Usage: -f bucket_map_file [-p port]" << endl;
+}
+
+int main(int argc, char **argv) {
+ int port = 9090;
+ string file;
+
+ int c;
+ while ((c = getopt(argc, argv, "p:f:")) != -1) {
+ switch (c) {
+ case 'p':
+ port = atoi(optarg);
+ break;
+ case 'f':
+ file = optarg;
+ break;
+ default:
+ abort();
+ }
+ }
+ shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
+ shared_ptr<BucketStoreMappingHandler> handler(new BucketStoreMappingHandler(file));
+ shared_ptr<BucketStoreMappingProcessor> processor(new BucketStoreMappingProcessor(handler));
+ shared_ptr<TServerSocket> serverSocket(new TServerSocket(port));
+ //shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
+ shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
+
+ TSimpleServer server(processor, serverSocket, transportFactory, protocolFactory);
+ server.serve();
+ return 0;
+}
+
View
2  test/paramtest.php
@@ -28,7 +28,7 @@
print("running param inheritance test\n");
param_test();
-sleep(5);
+sleep(2);
// check results
$file = "/tmp/scribetest_/paramtest/primary_current";
View
113 test/scribe.conf.bucketupdater.central
@@ -0,0 +1,113 @@
+## Copyright (c) 2007-2008 Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+
+##
+## Test configuration listens on a different port and writes data to
+## /tmp/scribetest
+##
+
+
+# scribe configuration
+#
+# This file specifies global key-value pairs as well as store
+# objects, which are surrounded by xml-like tags <store></store>
+#
+# Each store has a category and a type. The category must match the
+# category string used by the client code, and the type must be one of:
+# file, network, bucket, buffer. The remainder of the store
+# configuration depends on the type.
+#
+# Some types of stores include other stores, which are specified by
+# nested xml-like tags. These have specific names that depend on type.
+# For example a buffer store has a <primary> store and a <secondary>
+# store, which can be of any type, and are configured the same way
+# they would be in a top-level <store>. Note that nested stores don't
+# have a configured category, it is inherited from the top-level store.
+#
+# The category "default" is a special case. Any category not configured
+# here will be handled using the default configuration, except with
+# filenames overwritten with the category name.
+#
+# The parser isn't great, so add whitespace at your own risk.
+
+max_msg_per_second=2000000
+check_interval=1
+# global inheritance
+file::fs_type=std
+
+# DEFAULT
+<store>
+category=default
+type=buffer
+
+target_write_size=20480
+max_write_interval=1
+buffer_send_rate=2
+retry_interval=3
+retry_interval_range=1
+
+
+<primary>
+type=file
+fs_type=std
+file_path=/tmp/scribetest_
+base_filename=thisisoverwritten
+max_size=2000000
+rotate_period=daily
+rotate_hour=0
+rotate_minute=5
+add_newlines=1
+</primary>
+
+<secondary>
+type=file
+fs_type=std
+file_path=/tmp/scribe_test_
+base_filename=thisisoverwritten
+max_size=3000000
+</secondary>
+</store>
+
+<store>
+category=bucketupdater
+target_write_size=1
+type=bucket
+num_buckets=3
+delimiter=59
+bucket_type=key_modulo
+network::dynamic_config_type=thrift_bucket
+network::bucket_updater_ttl=3
+network::bucket_updater_host=localhost
+network::bucket_updater_port=9999
+network::use_conn_pool=yes
+ <bucket0>
+ type=file
+ fs_type=std
+ file_path=/tmp/scribetest_
+ base_filename=thisshouldnothappen
+ </bucket0>
+ <bucket1>
+ type=network
+ </bucket1>
+ <bucket2>
+ type=network
+ </bucket2>
+ <bucket3>
+ type=network
+ </bucket2>
+</store>
View
102 test/scribe.conf.bucketupdater.server1
@@ -0,0 +1,102 @@
+## Copyright (c) 2007-2008 Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+
+##
+## Test configuration listens on a different port and writes data to
+## /tmp/scribetest
+##
+
+
+# scribe configuration
+#
+# This file specifies global key-value pairs as well as store
+# objects, which are surrounded by xml-like tags <store></store>
+#
+# Each store has a category and a type. The category must match the
+# category string used by the client code, and the type must be one of:
+# file, network, bucket, buffer. The remainder of the store
+# configuration depends on the type.
+#
+# Some types of stores include other stores, which are specified by
+# nested xml-like tags. These have specific names that depend on type.
+# For example a buffer store has a <primary> store and a <secondary>
+# store, which can be of any type, and are configured the same way
+# they would be in a top-level <store>. Note that nested stores don't
+# have a configured category, it is inherited from the top-level store.
+#
+# The category "default" is a special case. Any category not configured
+# here will be handled using the default configuration, except with
+# filenames overwritten with the category name.
+#
+# The parser isn't great, so add whitespace at your own risk.
+
+max_msg_per_second=2000000
+check_interval=1
+# global inheritance
+file::fs_type=std
+
+# DEFAULT
+<store>
+category=default
+type=buffer
+
+target_write_size=20480
+max_write_interval=1
+buffer_send_rate=2
+retry_interval=3
+retry_interval_range=1
+
+
+<primary>
+type=file
+fs_type=std
+file_path=/tmp/scribetest_
+base_filename=thisisoverwritten
+max_size=2000000
+rotate_period=daily
+rotate_hour=0
+rotate_minute=5
+add_newlines=1
+</primary>
+
+<secondary>
+type=file
+fs_type=std
+file_path=/tmp/scribe_test_
+base_filename=thisisoverwritten
+max_size=3000000
+</secondary>
+</store>
+
+<store>
+target_write_size=1
+category=bucketupdater
+type=bucket
+num_buckets=3
+bucket_subdir=bucket
+bucket_type=key_modulo
+delimiter=59
+
+<bucket>
+type=file
+fs_type=std
+file_path=/tmp/scribetest_/bucketupdater/server1
+base_filename=content
+max_size=3000
+</bucket>
+</store>
View
102 test/scribe.conf.bucketupdater.server2
@@ -0,0 +1,102 @@
+## Copyright (c) 2007-2008 Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+
+##
+## Test configuration listens on a different port and writes data to
+## /tmp/scribetest
+##
+
+
+# scribe configuration
+#
+# This file specifies global key-value pairs as well as store
+# objects, which are surrounded by xml-like tags <store></store>
+#
+# Each store has a category and a type. The category must match the
+# category string used by the client code, and the type must be one of:
+# file, network, bucket, buffer. The remainder of the store
+# configuration depends on the type.
+#
+# Some types of stores include other stores, which are specified by
+# nested xml-like tags. These have specific names that depend on type.
+# For example a buffer store has a <primary> store and a <secondary>
+# store, which can be of any type, and are configured the same way
+# they would be in a top-level <store>. Note that nested stores don't
+# have a configured category, it is inherited from the top-level store.
+#
+# The category "default" is a special case. Any category not configured
+# here will be handled using the default configuration, except with
+# filenames overwritten with the category name.
+#
+# The parser isn't great, so add whitespace at your own risk.
+
+max_msg_per_second=2000000
+check_interval=1
+# global inheritance
+file::fs_type=std
+
+# DEFAULT
+<store>
+category=default
+type=buffer
+
+target_write_size=20480
+max_write_interval=1
+buffer_send_rate=2
+retry_interval=3
+retry_interval_range=1
+
+
+<primary>
+type=file
+fs_type=std
+file_path=/tmp/scribetest_
+base_filename=thisisoverwritten
+max_size=2000000
+rotate_period=daily
+rotate_hour=0
+rotate_minute=5
+add_newlines=1
+</primary>
+
+<secondary>
+type=file
+fs_type=std
+file_path=/tmp/scribe_test_
+base_filename=thisisoverwritten
+max_size=3000000
+</secondary>
+</store>
+
+<store>
+category=bucketupdater
+target_write_size=1
+type=bucket
+num_buckets=3
+bucket_subdir=bucket
+bucket_type=key_modulo
+delimiter=59
+
+<bucket>
+type=file
+fs_type=std
+file_path=/tmp/scribetest_/bucketupdater/server2
+base_filename=content
+max_size=3000
+</bucket>
+</store>
View
2  test/scribe.conf.paramtest
@@ -87,7 +87,7 @@ max_size=3000000
category=paramtest
type=buffer
-target_write_size=20480
+target_write_size=1
max_write_interval=1
buffer_send_rate=2
retry_interval=3
View
28 test/tests.php
@@ -420,6 +420,34 @@ function make_message($client_name, $avg_size, $sequence, $random) {
return $message;
}
+function create_bucketupdater_client($host, $port) {
+ try {
+ // Set up the socket connections
+ print "creating socket pool\n";
+ $sock = new TSocketPool(array($host), array($port));
+ $sock->setDebug(0);
+ $sock->setSendTimeout(1000);
+ $sock->setRecvTimeout(2500);
+ $sock->setNumRetries(1);
+ $sock->setRandomize(false);
+ $sock->setAlwaysTryLast(true);
+ $trans = new TFramedTransport($sock);
+ $prot = new TBinaryProtocol($trans);
+
+ // Create the client
+ print "creating bucketupdater client\n";
+ $updater_client = new BucketStoreMappingClient($prot);
+
+ // Open the transport (we rely on PHP to close it at script termination)
+ print "opening transport\n";
+ $trans->open();
+ } catch (Exception $x) {
+ print "Unable to create bucket updater client, received exception: $x \n";
+ return null;
+ }
+ return $updater_client;
+}
+
function create_scribe_client() {
try {
// Set up the socket connections
View
2  test/testsuite.php
@@ -43,7 +43,9 @@
'buffertest',
'buffertest2',
// 'categoriestest',
+ 'bucketupdater',
'paramtest',
+ 'reloadtest',
);
$output_file = null;
View
11 test/testutil.php
@@ -89,6 +89,17 @@ function scribe_stop($scribe_ctrl_path, $port, $pid = 0) {
return true;
}
+/**
+ * reload scribe
+ */
+function scribe_reload($scribe_ctrl_path, $port) {
+ $cmd = "$scribe_ctrl_path/scribe_ctrl reload $port";
+ system($cmd, $ret);
+
+ // return true if scribe is alive
+ return $ret == 0;
+}
+
function check_alive($scribe_ctrl_path, $port) {
$scribe_ctrl = "$scribe_ctrl_path/scribe_ctrl";
$command = "$scribe_ctrl status $port ";
Please sign in to comment.
Something went wrong with that request. Please try again.