Browse files

Ability to double log default and prefix categories

Summary:
Allow default and prefix categories to have multiple stores.

In the process (1) removed temporary data strucutres that were used only
during config time (2) had to fix locking to allow multiple stores to be
created for a category at run time

Test Plan:
test multiple file stores for default
test multiple file stores for test*
test multiple file stores for "default test*"
test multiple file stores for "xyz test*"
test multiple file stores for "test*"

DiffCamp Revision: 103186
Reviewed By: groys
Commenters: arivin, agiardullo
CC: agiardullo, arivin, pkhemani, groys, scribe-dev@lists
Tasks:
#179680

Revert Plan:
OK

git-svn-id: svn+ssh://tubbs/svnapps/fbomb/trunk/fbcode/scribe@25744 2248de34-8caa-4a3c-bc55-5e52d9d7b73a
  • Loading branch information...
1 parent 186eef2 commit d409aba93f10194cf595d3bb4c9a068de1426a5e pkhemani committed with groys Apr 12, 2010
Showing with 345 additions and 142 deletions.
  1. +112 −132 src/scribe_server.cpp
  2. +3 −9 src/scribe_server.h
  3. +76 −0 test/scribe.conf.twodefaulttest
  4. +6 −1 test/testutil.php
  5. +148 −0 test/twodefaulttest.php
View
244 src/scribe_server.cpp
@@ -174,10 +174,9 @@ scribeHandler::scribeHandler(unsigned long int server_port, const std::string& c
scribeHandler::~scribeHandler() {
deleteCategoryMap(pcategories);
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- pcategory_prefixes = NULL;
- }
+ pcategories = NULL;
+ deleteCategoryMap(pcategory_prefixes);
+ pcategory_prefixes = NULL;
}
// Returns the handler status, but overwrites it with WARNING if it's
@@ -187,16 +186,16 @@ fb_status scribeHandler::getStatus() {
Guard status_monitor(statusLock);
fb_status return_status(status);
- if (status == ALIVE) {
+ if (pcategories == NULL) {
+ return_status = WARNING;
+ } else if (status == ALIVE) {
for (category_map_t::iterator cat_iter = pcategories->begin();
cat_iter != pcategories->end();
++cat_iter) {
for (store_list_t::iterator store_iter = cat_iter->second->begin();
store_iter != cat_iter->second->end();
- ++store_iter)
- {
- if (!(*store_iter)->getStatus().empty())
- {
+ ++store_iter) {
+ if (!(*store_iter)->getStatus().empty()) {
return_status = WARNING;
return return_status;
}
@@ -268,8 +267,7 @@ const char* scribeHandler::statusAsString(fb_status status) {
bool scribeHandler::createCategoryFromModel(
const string &category, const boost::shared_ptr<StoreQueue> &model) {
- if ((pcategories == NULL) ||
- (pcategories->find(category) != pcategories->end())) {
+ if (pcategories == NULL) {
return false;
}
@@ -302,10 +300,14 @@ bool scribeHandler::createCategoryFromModel(
pstore = model;
}
- shared_ptr<store_list_t> pstores =
- shared_ptr<store_list_t>(new store_list_t);
-
- (*pcategories)[category] = pstores;
+ shared_ptr<store_list_t> pstores;
+ category_map_t::iterator cat_iter = pcategories->find(category);
+ if (cat_iter == pcategories->end()) {
+ pstores = shared_ptr<store_list_t>(new store_list_t);
+ (*pcategories)[category] = pstores;
+ } else {
+ pstores = cat_iter->second;
+ }
pstores->push_back(pstore);
return true;
@@ -368,13 +370,17 @@ shared_ptr<store_list_t> scribeHandler::createNewCategory(
shared_ptr<store_list_t> store_list;
// First, check the list of category prefixes for a model
- category_prefix_map_t::iterator cat_prefix_iter = pcategory_prefixes->begin();
+ category_map_t::iterator cat_prefix_iter = pcategory_prefixes->begin();
while (cat_prefix_iter != pcategory_prefixes->end()) {
string::size_type len = cat_prefix_iter->first.size();
if (cat_prefix_iter->first.compare(0, len-1, category, 0, len-1) == 0) {
// Found a matching prefix model
- createCategoryFromModel(category, cat_prefix_iter->second);
+ shared_ptr<store_list_t> pstores = cat_prefix_iter->second;
+ for (store_list_t::iterator store_iter = pstores->begin();
+ store_iter != pstores->end(); ++store_iter) {
+ createCategoryFromModel(category, *store_iter);
+ }
category_map_t::iterator cat_iter = pcategories->find(category);
if (cat_iter != pcategories->end()) {
@@ -389,19 +395,19 @@ shared_ptr<store_list_t> scribeHandler::createNewCategory(
cat_prefix_iter++;
}
- // Then try creating a store if we have a default store defined
- if (store_list == NULL) {
- if (defaultStore != NULL) {
- createCategoryFromModel(category, defaultStore);
- category_map_t::iterator cat_iter = pcategories->find(category);
-
- if (cat_iter != pcategories->end()) {
- store_list = cat_iter->second;
- } else {
- LOG_OPER("failed to create new default store for category <%s>",
- category.c_str());
- }
+ // Then try creating a store if we have a default store defined
+ if (store_list == NULL && !defaultStores.empty()) {
+ for (store_list_t::iterator store_iter = defaultStores.begin();
+ store_iter != defaultStores.end(); ++store_iter) {
+ createCategoryFromModel(category, *store_iter);
+ }
+ category_map_t::iterator cat_iter = pcategories->find(category);
+ if (cat_iter != pcategories->end()) {
+ store_list = cat_iter->second;
+ } else {
+ LOG_OPER("failed to create new default store for category <%s>",
+ category.c_str());
}
}
@@ -458,12 +464,11 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
shared_ptr<store_list_t> store_list;
string category = (*msg_iter).category;
+ category_map_t::iterator cat_iter;
// First look for an exact match of the category
- if (pcategories) {
- category_map_t::iterator cat_iter = pcategories->find(category);
- if (cat_iter != pcategories->end()) {
- store_list = cat_iter->second;
- }
+ if (pcategories && ((cat_iter = pcategories->find(category)) !=
+ pcategories->end())) {
+ store_list = cat_iter->second;
}
// Try creating a new store for this category if we didn't find one
@@ -472,7 +477,12 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
scribeHandlerLock.release();
scribeHandlerLock.acquireWrite();
- store_list = createNewCategory(category);
+ if (pcategories && ((cat_iter = pcategories->find(category)) !=
+ pcategories->end())) {
+ store_list = cat_iter->second;
+ } else {
+ store_list = createNewCategory(category);
+ }
scribeHandlerLock.release();
scribeHandlerLock.acquireRead();
@@ -533,10 +543,8 @@ void scribeHandler::stopStores() {
// so this could leave clients in weird states.
deleteCategoryMap(pcategories);
pcategories = NULL;
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- pcategory_prefixes = NULL;
- }
+ deleteCategoryMap(pcategory_prefixes);
+ pcategory_prefixes = NULL;
}
void scribeHandler::shutdown() {
@@ -567,9 +575,6 @@ void scribeHandler::initialize() {
bool enough_config_to_run = true;
int numstores = 0;
- pnew_categories = new category_map_t;
- pnew_category_prefixes = new category_prefix_map_t;
- tmpDefault.reset();
try {
// Get the config data and parse it.
@@ -625,6 +630,22 @@ void scribeHandler::initialize() {
}
}
+ defaultStores.clear();
+ if (pcategories) {
+ deleteCategoryMap(pcategories);
+ }
+ pcategories = new category_map_t;
+ if (pcategories == NULL) {
+ throw runtime_error("Could not alloc pcategories");
+ }
+ if (pcategory_prefixes) {
+ deleteCategoryMap(pcategory_prefixes);
+ }
+ pcategory_prefixes = new category_map_t;
+ if (pcategory_prefixes == NULL) {
+ throw runtime_error("Could not alloc pcategory_prefixes");
+ }
+
// Build a new map of stores, and move stores from the old map as
// we find them in the config file. Any stores left in the old map
// at the end will be deleted.
@@ -657,33 +678,18 @@ void scribeHandler::initialize() {
enough_config_to_run = false;
}
- // clean up existing stores
- deleteCategoryMap(pcategories);
- pcategories = NULL;
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- pcategory_prefixes = NULL;
- }
- defaultStore.reset();
-
- if (enough_config_to_run) {
- pcategories = pnew_categories;
- pcategory_prefixes = pnew_category_prefixes;
- defaultStore = tmpDefault;
- } else {
+ if (!enough_config_to_run) {
// If the new configuration failed we'll run with
// nothing configured and status set to WARNING
- deleteCategoryMap(pnew_categories);
- if (pnew_category_prefixes) {
- delete pnew_category_prefixes;
- }
+ deleteCategoryMap(pcategories);
+ pcategories = NULL;
+ deleteCategoryMap(pcategory_prefixes);
+ pcategory_prefixes = NULL;
}
- pnew_categories = NULL;
- pnew_category_prefixes = NULL;
- tmpDefault.reset();
- if (!perfect_config || !enough_config_to_run) { // perfect should be a subset of enough, but just in case
+ if (!perfect_config || !enough_config_to_run) {
+ // perfect should be a subset of enough, but just in case
setStatus(WARNING); // status details should have been set above
} else {
setStatusDetails("");
@@ -793,10 +799,6 @@ shared_ptr<StoreQueue> scribeHandler::configureStoreCategory(
LOG_OPER("CATEGORY : %s", category.c_str());
if (0 == category.compare("default")) {
- if (tmpDefault != NULL) {
- setStatusDetails("Bad config - multiple default stores specified");
- return shared_ptr<StoreQueue>();
- }
is_default = true;
}
@@ -815,55 +817,38 @@ shared_ptr<StoreQueue> scribeHandler::configureStoreCategory(
// look for the store in the current list
shared_ptr<StoreQueue> pstore;
- if (!is_prefix_category && pcategories) {
- category_map_t::iterator category_iter = pcategories->find(category);
- if (category_iter != pcategories->end()) {
- shared_ptr<store_list_t> pstores = category_iter->second;
-
- for ( store_list_t::iterator it = pstores->begin(); it != pstores->end(); ++it ) {
- if ( (*it)->getBaseType() == type &&
- pstores->size() <= 1) { // no good way to match them up if there's more than one
- pstore = (*it);
- pstores->erase(it);
- }
- }
- }
- }
try {
- // create a new store if it doesn't already exist
- if (!pstore) {
- if (model != NULL) {
- // Create a copy of the model if we want a new thread per category
- if (newThreadPerCategory && !is_default && !is_prefix_category) {
- pstore = shared_ptr<StoreQueue>(new StoreQueue(model, category));
- } else {
- pstore = model;
- already_created = true;
- }
+ if (model != NULL) {
+ // Create a copy of the model if we want a new thread per category
+ if (newThreadPerCategory && !is_default && !is_prefix_category) {
+ pstore = shared_ptr<StoreQueue>(new StoreQueue(model, category));
} else {
- string store_name;
- bool is_model, multi_category, categories;
+ pstore = model;
+ already_created = true;
+ }
+ } else {
+ string store_name;
+ bool is_model, multi_category, categories;
- /* remove any *'s from category name */
- if (is_prefix_category)
- store_name = category.substr(0, category.size() - 1);
- else
- store_name = category;
+ /* remove any *'s from category name */
+ if (is_prefix_category)
+ store_name = category.substr(0, category.size() - 1);
+ else
+ store_name = category;
- // Does this store define multiple categories
- categories = (is_default || is_prefix_category || category_list);
+ // Does this store define multiple categories
+ categories = (is_default || is_prefix_category || category_list);
- // Determine if this store will actually handle multiple categories
- multi_category = !newThreadPerCategory && categories;
+ // Determine if this store will actually handle multiple categories
+ multi_category = !newThreadPerCategory && categories;
- // Determine if this store is just a model for later stores
- is_model = newThreadPerCategory && categories;
+ // Determine if this store is just a model for later stores
+ is_model = newThreadPerCategory && categories;
- pstore =
- shared_ptr<StoreQueue>(new StoreQueue(type, store_name, checkPeriod,
- is_model, multi_category));
- }
+ pstore =
+ shared_ptr<StoreQueue>(new StoreQueue(type, store_name, checkPeriod,
+ is_model, multi_category));
}
} catch (...) {
pstore.reset();
@@ -885,35 +870,28 @@ shared_ptr<StoreQueue> scribeHandler::configureStoreCategory(
if (is_default) {
LOG_OPER("Creating default store");
- tmpDefault = pstore;
- }
- else if (is_prefix_category) {
- category_prefix_map_t::iterator category_iter =
- pnew_category_prefixes->find(category);
-
- if (category_iter == pnew_category_prefixes->end()) {
- (*pnew_category_prefixes)[category] = pstore;
+ defaultStores.push_back(pstore);
+ } else if (is_prefix_category) {
+ shared_ptr<store_list_t> pstores;
+ category_map_t::iterator category_iter = pcategory_prefixes->find(category);
+ if (category_iter != pcategory_prefixes->end()) {
+ pstores = category_iter->second;
} else {
- string errormsg =
- "Bad config - multiple prefix stores specified for category: ";
-
- errormsg += category;
- setStatusDetails(errormsg);
- return shared_ptr<StoreQueue>();
+ pstores = shared_ptr<store_list_t>(new store_list_t);
+ (*pcategory_prefixes)[category] = pstores;
}
- }
-
- // push the new store onto the new map if it's not just a model
- if (!pstore->isModelStore() && !category_list) {
+ pstores->push_back(pstore);
+ } else if (!pstore->isModelStore() && !category_list) {
+ // push the new store onto the new map if it's not just a model
shared_ptr<store_list_t> pstores;
- category_map_t::iterator category_iter = pnew_categories->find(category);
- if (category_iter != pnew_categories->end()) {
+ category_map_t::iterator category_iter = pcategories->find(category);
+ if (category_iter != pcategories->end()) {
pstores = category_iter->second;
} else {
pstores = shared_ptr<store_list_t>(new store_list_t);
- (*pnew_categories)[category] = pstores;
+ (*pcategories)[category] = pstores;
}
- pstores->push_back(pstore);
+ pstores->push_back(pstore);
}
return pstore;
@@ -939,7 +917,9 @@ void scribeHandler::deleteCategoryMap(category_map_t *pcats) {
throw std::logic_error("deleteCategoryMap: iterator in store map holds null pointer");
}
- (*store_iter)->stop();
+ if (!(*store_iter)->isModelStore()) {
+ (*store_iter)->stop();
+ }
} // for each store
pstores->clear();
} // for each category
View
12 src/scribe_server.h
@@ -29,7 +29,6 @@
typedef std::vector<boost::shared_ptr<StoreQueue> > store_list_t;
typedef std::map<std::string, boost::shared_ptr<store_list_t> > category_map_t;
-typedef std::map<std::string, boost::shared_ptr<StoreQueue> > category_prefix_map_t;
class scribeHandler : virtual public scribe::thrift::scribeIf,
public facebook::fb303::FacebookBase {
@@ -71,15 +70,10 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
// Each of these entries is a map of type->StoreQueue.
// The StoreQueue contains a store, which could contain additional stores.
category_map_t* pcategories;
- category_prefix_map_t* pcategory_prefixes;
+ category_map_t* pcategory_prefixes;
- // the default store
- boost::shared_ptr<StoreQueue> defaultStore;
-
- // temp versions of the above 3 pointers to use during initialization
- category_map_t* pnew_categories;
- category_prefix_map_t* pnew_category_prefixes;
- boost::shared_ptr<StoreQueue> tmpDefault;
+ // the default stores
+ store_list_t defaultStores;
std::string configFilename;
facebook::fb303::fb_status status;
View
76 test/scribe.conf.twodefaulttest
@@ -0,0 +1,76 @@
+## Copyright (c) 2007-2008 Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+
+##
+## Test configuration listens on a different port and writes data to
+## /tmp/scribetest
+##
+
+
+# scribe configuration
+#
+# This file specifies global key-value pairs as well as store
+# objects, which are surrounded by xml-like tags <store></store>
+#
+# Each store has a category and a type. The category must match the
+# category string used by the client code, and the type must be one of:
+# file, network, bucket, buffer. The remainder of the store
+# configuration depends on the type.
+#
+# Some types of stores include other stores, which are specified by
+# nested xml-like tags. These have specific names that depend on type.
+# For example a buffer store has a <primary> store and a <secondary>
+# store, which can be of any type, and are configured the same way
+# they would be in a top-level <store>. Note that nested stores don't
+# have a configured category, it is inherited from the top-level store.
+#
+# The category "default" is a special case. Any category not configured
+# here will be handled using the default configuration, except with
+# filenames overwritten with the category name.
+#
+# The parser isn't great, so add whitespace at your own risk.
+
+max_msg_per_second=2000000
+check_interval=1
+
+# DEFAULT
+<store>
+category=default
+type=file
+fs_type=std
+file_path=/tmp/scribetest_
+base_filename=thisisoverwritten
+max_size=2000000
+rotate_period=daily
+rotate_hour=0
+rotate_minute=5
+add_newlines=1
+</store>
+
+<store>
+category=default
+type=file
+fs_type=std
+file_path=/tmp/scribe_test_
+base_filename=thisisoverwritten
+max_size=3000000
+rotate_period=daily
+rotate_hour=0
+rotate_minute=5
+add_newlines=1
+</store>
View
7 test/testutil.php
@@ -143,14 +143,19 @@ function resultChecker($path, $fileprefix, $clientname) {
// For each file that matches the prefix, call resultFileChecker
// to count the matching lines
+ $files = array();
if ($dir = opendir($path)) {
while (false !== ($file = readdir($dir))) {
if (0 === strncmp($file, $fileprefix, strlen($fileprefix))) {
+ $files[] = $file;
+ }
+ }
+ sort($files);
+ foreach ($files as $file) {
$filename = "$path/$file";
$tmp_results = resultFileChecker($filename, $clientname, $last_entry);
$results["count"] += $tmp_results["count"];
$results["out_of_order"] += $tmp_results["out_of_order"];
- }
}
} else {
print("ERROR: could not open directory: $path \n");
View
148 test/twodefaulttest.php
@@ -0,0 +1,148 @@
+<?php
+// Copyright (c) 2007-2008 Facebook
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+
+
+// very similar to basictestst. Infact it does every test that basictest does
+// but does them twice - once for each place where default category gets logged.
+// The scribe.conf.twodefaulttest will show that the default category is being
+// logged into two filestores simultaneously
+
+require_once 'tests.php';
+require_once 'testutil.php';
+
+$success = true;
+
+$pid = scribe_start('twodefaulttest', $GLOBALS['SCRIBE_BIN'],
+ $GLOBALS['SCRIBE_PORT'],
+ 'scribe.conf.twodefaulttest');
+
+// write 10k messages to category test (handled by default store)
+print("test writing 10k messages to category test\n");
+stress_test('test', 'client1', 1000, 10000, 20, 100, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/test', 'test-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+ print_r($results);
+}
+$results = resultChecker('/tmp/scribe_test_/test', 'test-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+ print_r($results);
+}
+
+
+
+// write another 10k messages to category test (should see 1 out of order)
+print("test writing another 10k messages (will see 1 out of order)\n");
+stress_test('test', 'client1', 1000, 10000, 20, 100, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/test', 'test-', 'client1');
+
+if ($results["count"] != 20000 || $results["out_of_order"] != 1) {
+ $success = false;
+}
+$results = resultChecker('/tmp/scribe_test_/test', 'test-', 'client1');
+
+if ($results["count"] != 20000 || $results["out_of_order"] != 1) {
+ $success = false;
+}
+
+// write 200k messages to category test using different client name
+print("test writing 200k more messages to category test\n");
+stress_test('test', 'client2', 10000, 200000, 50, 100, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/test', 'test-', 'client2');
+
+if ($results["count"] != 200000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+$results = resultChecker('/tmp/scribe_test_/test', 'test-', 'client2');
+
+if ($results["count"] != 200000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+
+// write 10k messages to category tps (handled by named store)
+print("test writing 10k messages to category tps\n");
+stress_test('tps', 'client1', 1000, 10000, 200, 100, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/tps', 'tps-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+$results = resultChecker('/tmp/scribe_test_/tps', 'tps-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+
+// write 10k messages to category foodoo (handled by prefix store)
+print("test writing 10k messages to category foodoo\n");
+stress_test('foodoo', 'client1', 10000, 10000, 20, 100, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/foodoo', 'foodoo-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+$results = resultChecker('/tmp/scribe_test_/foodoo', 'foodoo-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+
+// write 10k messages to category rock (handled by categories prefix store)
+print("test writing 100k messages to category rock\n");
+stress_test('rock', 'client1', 100, 10000, 20, 100, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/rock', 'rock-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+$results = resultChecker('/tmp/scribe_test_/rock', 'rock-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+
+// write 10k messages to category paper (handled by categories store)
+print("test writing 10k messages to category paper\n");
+stress_test('paper', 'client1', 1000, 10000, 20, 500, 1);
+sleep(25);
+$results = resultChecker('/tmp/scribetest_/paper', 'paper-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+$results = resultChecker('/tmp/scribe_test_/paper', 'paper-', 'client1');
+
+if ($results["count"] != 10000 || $results["out_of_order"] != 0) {
+ $success = false;
+}
+
+if (!scribe_stop($GLOBALS['SCRIBE_CTRL'], $GLOBALS['SCRIBE_PORT'], $pid)) {
+ print("ERROR: could not stop scribe\n");
+ $success = false;
+}
+
+return $success;

0 comments on commit d409aba

Please sign in to comment.