Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

hadoop support and bug fixes

commit dbc7df4872ef73b81f30d49997f9a69a365d2902 1 parent e4c7ff9
Anthony Giardullo authored
View
8 README.BUILD
@@ -58,6 +58,7 @@ Requirements
[thrift] Thrift framework
[fb303] Facebook Bassline (included in thrift/contrib/fb303/)
fb303 r697294 or later is required.
+[hadoop] optional. version 0.19.1 or higher (http://hadoop.apache.org)
These libraries are open source and may be freely obtained, but they are not
provided as a part of this distribution.
@@ -122,6 +123,9 @@ Examples:
# To disable static libraries and enable shared libraries. [ default has been set to static]
./configure --disable-static
+# To build scribe with Hadoop support
+./configure --enable-hdfs
+
# To set thrift home to a non-default location
./configure --with-thriftpath=/myhome/local/thrift
@@ -147,5 +151,5 @@ Acknowledgements
================
The build process for Scribe uses autoconf macros to compile/link with Boost.
These macros were written by Thomas Porschberg, Michael Tindal, and
-Daniel Casimiro. See ax_boost_base.m4, ax_boost_filesystem.m4, and
-ax_boost_system.m4 in the aclocal subdirectory for more information.
+Daniel Casimiro. See the m4 files in the aclocal subdirectory for more
+information.
View
3  configure.ac
@@ -46,16 +46,17 @@ FB_ENABLE_DEFAULT_STATIC
# Example: Macro supplies -DFACEBOOK at compile time and "if FACEBOOK endif" capabilities.
FB_ENABLE_FEATURE([FACEBOOK], [facebook])
+FB_ENABLE_FEATURE([USE_SCRIBE_HDFS], [hdfs])
# Personalized path generator Sets default paths. Provides --with-xx=DIR options.
# FB_WITH_PATH([<var>_home], [<var>path], [<default location>]
# Example: sets $(thrift_home) variable with default path set to /usr/local.
FB_WITH_PATH([thrift_home], [thriftpath], [/usr/local])
-FB_WITH_PATH([jvm_lib], [jvmpath], [/usr/local/java/jre/lib/amd64/server])
FB_WITH_PATH([fb303_home], [fb303path], [/usr/local])
FB_WITH_PATH([smc_home], [smcpath], [${EXTERNAL_PATH}/services/trunk/src])
FB_WITH_PATH([fb_home], [fbpath], [${EXTERNAL_PATH}/libfacebook])
+FB_WITH_PATH([hadoop_home], [hadooppath], [/usr/local])
# Require boost 1.36 with system and filesystem libraries
AX_BOOST_BASE([1.36])
View
67 examples/hdfs_example.conf
@@ -0,0 +1,67 @@
+## Copyright (c) 2007-2009 Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+
+##
+## Sample Scribe configuration
+##
+
+# This file configures Scribe to listen for messages on port 1463 and write
+# all messages to Hadoop. If unable to write to Hadoop, Scribe will buffer to
+# /tmp and keep retrying.
+
+port=1463
+max_msg_per_second=2000000
+check_interval=1
+max_queue_size=100000000
+num_thrift_server_threads=2
+
+
+# DEFAULT - write all messages to hadoop
+<store>
+category=default
+type=buffer
+
+target_write_size=20480
+max_write_interval=1
+buffer_send_rate=1
+retry_interval=30
+retry_interval_range=10
+
+<primary>
+type=file
+fs_type=hdfs
+file_path=hdfs://myhadoopserver:9000/scribedata
+create_symlink=no
+use_hostname_sub_directory=yes
+base_filename=thisisoverwritten
+max_size=1000000000
+rotate_period=daily
+rotate_hour=0
+rotate_minute=5
+add_newlines=1
+</primary>
+
+
+<secondary>
+type=file
+fs_type=std
+file_path=/tmp/scribetest
+base_filename=thisisoverwritten
+max_size=3000000
+</secondary>
+</store>
View
98 examples/hdfs_example2.conf
@@ -0,0 +1,98 @@
+## Copyright (c) 2007-2009 Facebook
+##
+## Licensed under the Apache License, Version 2.0 (the "License");
+## you may not use this file except in compliance with the License.
+## You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+##
+## See accompanying file LICENSE or visit the Scribe site at:
+## http://developers.facebook.com/scribe/
+
+
+##
+## Sample Scribe configuration
+##
+
+# This file configures Scribe to first attempt to write to a hadoop instance.
+# If this fails, scribe will then attempt to write to a backup hadoop
+# instance. If this fails, scribe will buffer files to local disk. This is
+# accomplished by nesting a buffer store inside another buffer store.
+
+# Note that since replay_buffer=no in the inner buffer store, messages written
+# to the backup hadoop instance will remain on the backup hadoop instance even
+# if the primary hadoop instance comes back online. But since replay_buffer is
+# not turned off in the outer buffer store, messages logged to /tmp will
+# eventually get logged to hadoop when either the primary or backup hadoop
+# instance comes back online.
+
+port=1463
+max_msg_per_second=1000000
+check_interval=1
+max_queue_size=100000000
+num_thrift_server_threads=3
+
+# DEFAULT
+<store>
+ category=default
+ type=buffer
+
+ target_write_size=20480
+ max_write_interval=1
+ retry_interval=120
+ retry_interval_range=60
+ buffer_send_rate=5
+
+ <primary>
+ type=buffer
+
+ target_write_size=20480
+ max_write_interval=1
+ retry_interval=600
+ retry_interval_range=60
+ replay_buffer=no
+
+ <primary>
+ type=file
+ fs_type=hdfs
+ file_path=hdfs://hadoopserver:9000/scribedata
+ create_symlink=no
+ use_hostname_sub_directory=yes
+ base_filename=thisisoverwritten
+ max_size=1000000000
+ rotate_period=daily
+ rotate_hour=0
+ rotate_minute=5
+ add_newlines=1
+ </primary>
+
+ <secondary>
+ type=file
+ fs_type=hdfs
+ file_path=hdfs://backuphadoopserver:9000/scribedata
+ create_symlink=no
+ use_hostname_sub_directory=yes
+ base_filename=thisisoverwritten
+ max_size=1000000000
+ rotate_period=daily
+ rotate_hour=0
+ rotate_minute=5
+ add_newlines=1
+ </secondary>
+ </primary>
+
+ <secondary>
+ type=file
+ fs_type=std
+ file_path=/tmp
+ base_filename=thisisoverwritten
+ max_size=4000000
+ </secondary>
+</store>
+
View
1  if/scribe.thrift
@@ -20,7 +20,6 @@
include "fb303/if/fb303.thrift"
namespace cpp scribe.thrift
-namespace perl Scribe.Thrift
enum ResultCode
{
View
246 src/HdfsFile.cpp
@@ -0,0 +1,246 @@
+// Copyright (c) 2009- Facebook
+// Distributed under the Scribe Software License
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+
+#include "common.h"
+#include "file.h"
+#include "HdfsFile.h"
+
+#ifdef USE_SCRIBE_HDFS
+
+using namespace std;
+
+// Builds an HdfsFile for `name` and connects to an HDFS instance: the one
+// encoded in the path when it parses as hdfs://host:port/..., otherwise the
+// "default" cluster. A failed connection is only logged; fileSys stays 0 and
+// later operations check for that.
+HdfsFile::HdfsFile(const std::string& name)
+  : FileInterface(name, false),
+    inputBuffer_(NULL),
+    bufferSize_(0) {
+  LOG_OPER("[hdfs] Connecting to HDFS");
+
+  // Prefer the cluster named in the path; fall back to the default cluster.
+  fileSys = connectToPath(name.c_str());
+  if (fileSys == 0) {
+    LOG_OPER("[hdfs] Connected to default HDFS");
+    fileSys = hdfsConnectNewInstance("default", 0);
+  }
+
+  if (!fileSys) {
+    // Ideally this would throw, but the scribe store code does not handle
+    // that elegantly at the moment, so the failure is just logged.
+    LOG_OPER("[hdfs] ERROR: HDFS is not configured for file: %s", name.c_str());
+  }
+  hfile = 0;
+}
+
+// Releases the HDFS resources held by this object. A file handle that is
+// still open is closed first so it is not leaked when the filesystem
+// connection is dropped.
+HdfsFile::~HdfsFile() {
+  if (fileSys) {
+    if (hfile) {
+      // Called directly (not via close()) to avoid virtual dispatch and
+      // extra logging during destruction.
+      hdfsCloseFile(fileSys, hfile);
+    }
+    hdfsDisconnect(fileSys);
+  }
+  fileSys = 0;
+  hfile = 0;
+}
+
+// Opens the file read-only. Returns true when a file handle is available
+// afterwards, false otherwise (including when there is no HDFS connection
+// and no handle was already held).
+bool HdfsFile::openRead() {
+  if (fileSys) {
+    hfile = hdfsOpenFile(fileSys, filename.c_str(), O_RDONLY, 0, 0, 0);
+  }
+
+  if (!hfile) {
+    return false;
+  }
+
+  LOG_OPER("[hdfs] opened for read %s", filename.c_str());
+  return true;
+}
+
+// Opens the file for writing, appending when it already exists on HDFS.
+// Returns false when there is no connection, when a handle is already held,
+// or when the open itself fails.
+bool HdfsFile::openWrite() {
+  if (!fileSys) {
+    return false;
+  }
+  if (hfile) {
+    LOG_OPER("[hdfs] already opened for write %s", filename.c_str());
+    return false;
+  }
+
+  // An existing file is appended to; a missing one is created.
+  int flags = O_WRONLY;
+  if (hdfsExists(fileSys, filename.c_str()) == 0) {
+    flags |= O_APPEND;
+  }
+
+  hfile = hdfsOpenFile(fileSys, filename.c_str(), flags, 0, 0, 0);
+  if (!hfile) {
+    return false;
+  }
+
+  if (flags & O_APPEND) {
+    LOG_OPER("[hdfs] opened for append %s", filename.c_str());
+  } else {
+    LOG_OPER("[hdfs] opened for write %s", filename.c_str());
+  }
+  return true;
+}
+
+// Emulates truncate-and-open: the file is deleted and then opened for
+// writing. Returns the result of openWrite().
+bool HdfsFile::openTruncate() {
+  LOG_OPER("[hdfs] truncate %s", filename.c_str());
+
+  deleteFile();
+  const bool opened = openWrite();
+  return opened;
+}
+
+// Reports whether a file handle is currently held.
+bool HdfsFile::isOpen() {
+  if (hfile) {
+    return true;
+  }
+  return false;
+}
+
+// Closes the file handle if one is held. Does nothing (and leaves hfile
+// untouched) when there is no HDFS connection.
+void HdfsFile::close() {
+  if (!fileSys) {
+    return;
+  }
+
+  if (hfile) {
+    hdfsCloseFile(fileSys, hfile );
+    LOG_OPER("[hdfs] closed %s", filename.c_str());
+  }
+  hfile = 0;
+}
+
+// Writes `data` to the file, opening it for write first when necessary.
+// Returns true only when every byte was accepted by hdfsWrite().
+bool HdfsFile::write(const std::string& data) {
+  // Lazily open the file; give up if that is not possible.
+  if (!isOpen() && !openWrite()) {
+    return false;
+  }
+
+  const tSize expected = (tSize) data.length();
+  const tSize bytesWritten = hdfsWrite(fileSys, hfile, data.data(), expected);
+  return bytesWritten == expected;
+}
+
+// Flushes the open file handle via hdfsFlush(); does nothing when no
+// handle is held.
+void HdfsFile::flush() {
+  if (!hfile) {
+    return;
+  }
+  hdfsFlush(fileSys, hfile);
+}
+
+// Returns the size reported by hdfsGetPathInfo() for this file, or 0 when
+// there is no connection or the path information cannot be obtained.
+unsigned long HdfsFile::fileSize() {
+  long size = 0L;
+
+  if (!fileSys) {
+    return size;
+  }
+
+  hdfsFileInfo* info = hdfsGetPathInfo(fileSys, filename.c_str());
+  if (info != NULL) {
+    size = info->mSize;
+    // One entry was returned, so one entry is freed.
+    hdfsFreeFileInfo(info, 1);
+  }
+  return size;
+}
+
+// Deletes the file from HDFS when connected. The log line is emitted
+// whether or not a delete was actually attempted.
+void HdfsFile::deleteFile() {
+  if (fileSys != 0) {
+    hdfsDelete(fileSys, filename.c_str());
+  }
+
+  LOG_OPER("[hdfs] deleteFile %s", filename.c_str());
+}
+
+// Appends to `_return` the final path component of every entry found in
+// the HDFS directory `path`. Nothing is appended when there is no
+// connection, the path does not exist, or the listing fails.
+void HdfsFile::listImpl(const std::string& path,
+                        std::vector<std::string>& _return) {
+  if (!fileSys) {
+    return;
+  }
+  if (hdfsExists(fileSys, path.c_str()) != 0) {
+    return;
+  }
+
+  int entryCount = 0;
+  hdfsFileInfo* entries = hdfsListDirectory(fileSys, path.c_str(), &entryCount);
+  if (!entries) {
+    return;
+  }
+
+  for (int idx = 0; idx < entryCount; idx++) {
+    // Keep only the text after the last '/'; entries without one are skipped.
+    char* lastSlash = rindex(entries[idx].mName, '/');
+    if (lastSlash != NULL) {
+      _return.push_back(lastSlash + 1);
+    }
+  }
+  hdfsFreeFileInfo(entries, entryCount);
+}
+
+// Reading framed records is not supported yet for HDFS files, so this
+// always reports failure and leaves `_return` untouched.
+bool HdfsFile::readNext(std::string& _return) {
+  const bool framesSupported = false;
+  return framesSupported;
+}
+
+// Framing is not supported for HDFS files: the frame header is always the
+// empty string, regardless of `data_length`.
+string HdfsFile::getFrame(unsigned data_length) {
+  std::string emptyFrame;
+  return emptyFrame;
+}
+
+// Nothing to do here: opening a file on HDFS creates the directories it
+// needs, so this always reports success.
+bool HdfsFile::createDirectory(std::string path) {
+  const bool created = true;
+  return created;
+}
+
+/**
+ * HDFS currently does not support symlinks. So we create a
+ * normal file at newpath and write the symlink target (oldpath) into it.
+ *
+ * Returns true when the link file was opened and fully written, false
+ * otherwise.
+ */
+bool HdfsFile::createSymlink(std::string oldpath, std::string newpath) {
+  LOG_OPER("[hdfs] Creating symlink oldpath %s newpath %s",
+           oldpath.c_str(), newpath.c_str());
+
+  // Stack-allocated so the HDFS connection is released on every return
+  // path (the previous heap allocation was never deleted).
+  HdfsFile link(newpath);
+  if (link.openWrite() == false) {
+    LOG_OPER("[hdfs] Creating symlink failed because %s already exists.",
+             newpath.c_str());
+    return false;
+  }
+
+  const bool written = link.write(oldpath);
+  if (!written) {
+    LOG_OPER("[hdfs] Writing symlink %s failed", newpath.c_str());
+  }
+
+  // Close explicitly on both outcomes so the file handle is never leaked.
+  link.close();
+  return written;
+}
+
+/**
+ * If the URI is specified of the form
+ * hdfs://server:port/path, then connect to the
+ * specified cluster.
+ *
+ * Returns the new connection, or 0 when the URI is NULL/empty, does not
+ * have the expected form, or the connection attempt fails.
+ */
+hdfsFS HdfsFile::connectToPath(const char* uri) {
+  // Validate before touching the string; strlen(NULL) is undefined.
+  if (uri == NULL || strlen(uri) == 0) {
+    return 0;
+  }
+
+  // The scanned "host:port/path" part can never be longer than the URI.
+  char* hostport = (char *)malloc(strlen(uri)+1);
+  if (hostport == NULL) {
+    return 0;
+  }
+
+  hdfsFS fs = NULL;
+  if (sscanf(uri, "hdfs://%s", hostport) == 1) {
+    // strtok_r overwrites its save pointer, so it must be a plain local
+    // and must never be passed to free().
+    char* savePtr = NULL;
+    char* host = strtok_r(hostport, ":", &savePtr);
+    char* portStr = strtok_r(NULL, "/", &savePtr);
+
+    int port = -1;
+    if (host != NULL && portStr != NULL &&
+        sscanf(portStr, "%d", &port) == 1) {
+      fs = hdfsConnectNewInstance(host, port);
+    }
+  }
+
+  free(hostport);
+  return fs;
+}
+
+#endif // USE_SCRIBE_HDFS
+
View
88 src/HdfsFile.h
@@ -0,0 +1,88 @@
+// Copyright (c) 2009- Facebook
+// Distributed under the Scribe Software License
+//
+// See accompanying file LICENSE or visit the Scribe site at:
+// http://developers.facebook.com/scribe/
+//
+#ifndef HDFS_FILE_H
+#define HDFS_FILE_H
+
+#ifdef USE_SCRIBE_HDFS
+#include "hdfs.h"
+
+// FileInterface implementation that stores data on HDFS through libhdfs.
+// Framed reads are not supported (readNext/getFrame are stubs), and
+// symlinks are emulated with a regular file holding the target path.
+class HdfsFile : public FileInterface {
+ public:
+  HdfsFile(const std::string& name);
+  virtual ~HdfsFile();
+
+  static void init(); // initialize hdfs subsystem
+  bool openRead(); // open for reading file
+  bool openWrite(); // open for appending to file
+  bool openTruncate(); // truncate and open for write
+  bool isOpen(); // is file open?
+  void close();
+  bool write(const std::string& data);
+  void flush();
+  unsigned long fileSize();
+  bool readNext(std::string& _return);
+  void deleteFile();
+  void listImpl(const std::string& path, std::vector<std::string>& _return);
+  std::string getFrame(unsigned data_size);
+  bool createDirectory(std::string path);
+  // Parameter order matches FileInterface and the definition:
+  // oldpath is the link target, newpath is the link to create.
+  bool createSymlink(std::string oldpath, std::string newpath);
+
+ private:
+  char* inputBuffer_;
+  unsigned bufferSize_;
+  hdfsFS fileSys; // connection to the HDFS instance; 0 when not connected
+  hdfsFile hfile; // handle of the currently open file; 0 when closed
+  hdfsFS connectToPath(const char* uri);
+
+  // disallow copy, assignment, and empty construction
+  HdfsFile();
+  HdfsFile(HdfsFile& rhs);
+  HdfsFile& operator=(HdfsFile& rhs);
+};
+
+/**
+ * A static lock: exposes a single class-wide pthread mutex together with
+ * a helper that initializes it.
+ */
+class HdfsLock {
+ private:
+ // NOTE(review): not referenced anywhere in the code visible here;
+ // presumably set from the result of initLock() -- confirm.
+ static bool lockInitialized;
+
+ public:
+ // The shared mutex itself.
+ static pthread_mutex_t lock;
+ // Initializes `lock` with default attributes. Always returns true; the
+ // result of pthread_mutex_init() is not checked.
+ static bool initLock() {
+ pthread_mutex_init(&lock, NULL);
+ return true;
+ }
+};
+
+#else
+
+// Placeholder used when scribe is built without HDFS support
+// (USE_SCRIBE_HDFS undefined). Construction logs an error and every
+// operation is a no-op that reports failure or an empty result.
+class HdfsFile : public FileInterface {
+ public:
+  HdfsFile(const std::string& name) : FileInterface(name, false) {
+    LOG_OPER("[hdfs] ERROR: HDFS is not supported. file: %s", name.c_str());
+    LOG_OPER("[hdfs] If you want HDFS Support, please recompile scribe with HDFS support");
+  }
+  static void init() {}
+  bool openRead() { return false; } // open for reading file
+  bool openWrite() { return false; } // open for appending to file
+  bool openTruncate() { return false; } // open for write and truncate
+  bool isOpen() { return false; } // is file open?
+  void close() {}
+  bool write(const std::string& data) { return false; }
+  void flush() {}
+  unsigned long fileSize() { return 0; }
+  bool readNext(std::string& _return) { return false; }
+  void deleteFile() {}
+  void listImpl(const std::string& path, std::vector<std::string>& _return) {}
+  // Returns an empty frame. (Returning 0 here would build a std::string
+  // from a null pointer, which is undefined behaviour.)
+  std::string getFrame(unsigned data_size) { return std::string(); }
+  bool createDirectory(std::string path) { return false; }
+  bool createSymlink(std::string oldpath, std::string newpath) { return false; }
+};
+#endif // USE_SCRIBE_HDFS
+
+#endif // HDFS_FILE_H
View
6 src/Makefile.am
@@ -30,6 +30,7 @@ all:
# smc_home
# fb_home
# fb303_home
+# hadoop_home
#
THRIFT = $(thrift_home)/bin/thrift
@@ -46,7 +47,7 @@ endif
# Set libraries external to this component.
-EXTERNAL_LIBS = -L$(thrift_home)/lib -L$(fb303_home)/lib -lfb303 -lthrift -lthriftnb
+EXTERNAL_LIBS = -L$(thrift_home)/lib -L$(fb303_home)/lib -L$(hadoop_home)/lib -lfb303 -lthrift -lthriftnb
EXTERNAL_LIBS += -levent -lpthread
# Section 2 ############################################################################
@@ -59,6 +60,7 @@ AM_CPPFLAGS += -I$(thrift_home)/include
AM_CPPFLAGS += -I$(thrift_home)/include/thrift
AM_CPPFLAGS += -I$(fb303_home)/include/thrift
AM_CPPFLAGS += -I$(fb303_home)/include/thrift/fb303
+AM_CPPFLAGS += -I$(hadoop_home)/include
AM_CPPFLAGS += $(BOOST_CPPFLAGS)
AM_CPPFLAGS += $(FB_CPPFLAGS) $(DEBUG_CPPFLAGS)
@@ -89,7 +91,7 @@ endif
# Binaries -- multiple progs can be defined.
bin_PROGRAMS = scribed
-scribed_SOURCES = store.cpp store_queue.cpp conf.cpp file.cpp conn_pool.cpp scribe_server.cpp $(FB_SOURCES)
+scribed_SOURCES = store.cpp store_queue.cpp conf.cpp file.cpp HdfsFile.cpp conn_pool.cpp scribe_server.cpp $(FB_SOURCES)
scribed_LDADD = $(EXTERNAL_LIBS) $(INTERNAL_LIBS)
if SHARED
View
3  src/common.h
@@ -47,6 +47,9 @@
#include "thrift/protocol/TBinaryProtocol.h"
#include "thrift/server/TNonblockingServer.h"
+#include "thrift/concurrency/ThreadManager.h"
+#include "thrift/concurrency/PosixThreadFactory.h"
+#include "thrift/concurrency/Mutex.h"
#include "thrift/transport/TSocket.h"
#include "thrift/transport/TSocketPool.h"
#include "thrift/transport/TServerSocket.h"
View
31 src/conf.cpp
@@ -13,7 +13,7 @@
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
-// http://developers.facebook.com/scribe/
+// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author Jason Sobel
@@ -115,8 +115,17 @@ bool StoreConf::parseStore(queue<string>& raw_config, /*out*/ StoreConf* parsed_
line = raw_config.front();
raw_config.pop();
+ // remove leading and trailing whitespace
+ line = trimString(line);
+
+ // remove comment
+ size_t comment = line.find_first_of('#');
+ if (comment != string::npos) {
+ line.erase(comment);
+ }
+
int length = line.size();
- if (0 >= length || line[0] == '#') {
+ if (0 >= length) {
continue;
}
if (line[0] == '<') {
@@ -156,6 +165,11 @@ bool StoreConf::parseStore(queue<string>& raw_config, /*out*/ StoreConf* parsed_
} else {
string arg = line.substr(0, eq);
string val = line.substr(eq + 1, string::npos);
+
+ // remove leading and trailing whitespace
+ arg = trimString(arg);
+ val = trimString(val);
+
if (parsed_config->values.find(arg) != parsed_config->values.end()) {
LOG_OPER("Bad config - duplicate key %s", arg.c_str());
}
@@ -166,6 +180,19 @@ bool StoreConf::parseStore(queue<string>& raw_config, /*out*/ StoreConf* parsed_
return true;
}
+// Returns a copy of `str` without leading and trailing spaces and tabs.
+// A string made up only of spaces/tabs (or an empty one) yields "".
+string StoreConf::trimString(const string& str) {
+  const string blanks = " \t";
+
+  const size_t first = str.find_first_not_of(blanks);
+  if (first == string::npos) {
+    // nothing but whitespace
+    return "";
+  }
+
+  const size_t last = str.find_last_not_of(blanks);
+  return str.substr(first, last - first + 1);
+}
+
// reads every line from the file and pushes them onto _return
// returns false on error
bool StoreConf::readConfFile(const string& filename, queue<string>& _return) {
View
8 src/conf.h
@@ -13,7 +13,7 @@
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
-// http://developers.facebook.com/scribe/
+// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author Jason Sobel
@@ -30,7 +30,7 @@
#include "src/gen-cpp/scribe.h"
/*
- * This class reads and parses a configuration
+ * This class reads and parses a configuration
* describing a hierarchy of store objects.
*
* It reads a conf file with a proprietary format, although it could
@@ -67,7 +67,9 @@ class StoreConf {
string_map_t values;
store_conf_map_t stores;
- static bool parseStore(/*in,out*/ std::queue<std::string>& raw_config, /*out*/ StoreConf* parsed_config);
+ static bool parseStore(/*in,out*/ std::queue<std::string>& raw_config,
+ /*out*/ StoreConf* parsed_config);
+ static std::string trimString(const std::string& str);
bool readConfFile(const std::string& filename, std::queue<std::string>& _return);
};
View
6 src/env_default.h
@@ -13,7 +13,7 @@
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
-// http://developers.facebook.com/scribe/
+// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
@@ -50,7 +50,9 @@ class network_config {
public:
// gets a vector of machine/port pairs for a named service
// returns true on success
- static bool getService(const std::string& serviceName, server_vector_t& _return) {
+ static bool getService(const std::string& serviceName,
+ const std::string& options,
+ server_vector_t& _return) {
return false;
}
};
View
48 src/file.cpp
@@ -21,6 +21,7 @@
#include "common.h"
#include "file.h"
+#include "HdfsFile.h"
// INITIAL_BUFFER_SIZE must always be >= UINT_SIZE
#define INITIAL_BUFFER_SIZE 4096
@@ -34,6 +35,8 @@ boost::shared_ptr<FileInterface> FileInterface::createFileInterface(const std::s
bool framed) {
if (0 == type.compare("std")) {
return shared_ptr<FileInterface>(new StdFile(name, framed));
+ } else if (0 == type.compare("hdfs")) {
+ return shared_ptr<FileInterface>(new HdfsFile(name));
} else {
return shared_ptr<FileInterface>();
}
@@ -41,14 +44,14 @@ boost::shared_ptr<FileInterface> FileInterface::createFileInterface(const std::s
std::vector<std::string> FileInterface::list(const std::string& path, const std::string &fsType) {
std::vector<std::string> files;
- shared_ptr<FileInterface> concrete_file = createFileInterface(fsType, "");
+ shared_ptr<FileInterface> concrete_file = createFileInterface(fsType, path);
if (concrete_file) {
concrete_file->listImpl(path, files);
}
return files;
}
-FileInterface::FileInterface(const std::string& name, bool frame)
+FileInterface::FileInterface(const std::string& name, bool frame)
: framed(frame), filename(name) {
}
@@ -71,19 +74,7 @@ bool StdFile::openRead() {
}
bool StdFile::openWrite() {
- /* try to create the directory containing the file */
- string::size_type slash;
- if (!filename.empty() &&
- (filename.find_first_of("/") != string::npos) &&
- (filename.find_first_of("/") != (slash = filename.find_last_of("/")))) {
- try {
- boost::filesystem::create_directory(filename.substr(0, slash));
- } catch(std::exception const& e) {
- LOG_OPER("Exception < %s > trying to create directory", e.what());
- return false;
- }
- }
-
+ // open file for write in append mode
ios_base::openmode mode = fstream::out | fstream::app;
return open(mode);
}
@@ -119,7 +110,7 @@ string StdFile::getFrame(unsigned data_length) {
if (framed) {
char buf[UINT_SIZE];
- serializeUInt(data_length, buf);
+ serializeUInt(data_length, buf);
return string(buf, UINT_SIZE);
} else {
@@ -132,7 +123,7 @@ bool StdFile::write(const std::string& data) {
if (!file.is_open()) {
return false;
}
-
+
file << data;
if (file.bad()) {
return false;
@@ -177,7 +168,7 @@ bool StdFile::readNext(std::string& _return) {
return true;
} else {
int offset = file.tellg();
- LOG_OPER("ERROR: Failed to read file %s at offset %d",
+ LOG_OPER("ERROR: Failed to read file %s at offset %d",
filename.c_str(), offset);
return false;
}
@@ -203,7 +194,7 @@ unsigned long StdFile::fileSize() {
return size;
}
-void StdFile::listImpl(const std::string& path, std::vector<std::string>& _return) {
+void StdFile::listImpl(const std::string& path, std::vector<std::string>& _return) {
try {
if (boost::filesystem::exists(path)) {
boost::filesystem::directory_iterator dir_iter(path), end_iter;
@@ -222,6 +213,25 @@ void StdFile::deleteFile() {
boost::filesystem::remove(filename);
}
+// Creates the directory `path` on the local filesystem. Returns false
+// only when boost::filesystem throws a std::exception; the boolean result
+// of create_directory() itself is not inspected.
+bool StdFile::createDirectory(std::string path) {
+  try {
+    boost::filesystem::create_directory(path);
+    return true;
+  } catch(std::exception const& e) {
+    LOG_OPER("Exception < %s > trying to create directory", e.what());
+  }
+
+  return false;
+}
+
+// Creates a symbolic link at `newpath` pointing to `oldpath`. Returns true
+// when symlink() reports success (0), false otherwise.
+bool StdFile::createSymlink(std::string oldpath, std::string newpath) {
+  const int rc = symlink(oldpath.c_str(), newpath.c_str());
+  return rc == 0;
+}
+
// Buffer had better be at least UINT_SIZE long!
unsigned FileInterface::unserializeUInt(const char* buffer) {
unsigned retval = 0;
View
8 src/file.h
@@ -29,8 +29,8 @@ class FileInterface {
FileInterface(const std::string& name, bool framed);
virtual ~FileInterface();
- static boost::shared_ptr<FileInterface> createFileInterface(const std::string& type,
- const std::string& name,
+ static boost::shared_ptr<FileInterface> createFileInterface(const std::string& type,
+ const std::string& name,
bool framed = false);
static std::vector<std::string> list(const std::string& path, const std::string& fsType);
@@ -46,6 +46,8 @@ class FileInterface {
virtual void deleteFile() = 0;
virtual void listImpl(const std::string& path, std::vector<std::string>& _return) = 0;
virtual std::string getFrame(unsigned data_size) {return std::string();};
+ virtual bool createDirectory(std::string path) = 0;
+ virtual bool createSymlink(std::string oldpath, std::string newpath) = 0;
protected:
bool framed;
@@ -72,6 +74,8 @@ class StdFile : public FileInterface {
void deleteFile();
void listImpl(const std::string& path, std::vector<std::string>& _return);
std::string getFrame(unsigned data_size);
+ bool createDirectory(std::string path);
+ bool createSymlink(std::string newpath, std::string oldpath);
private:
bool open(std::ios_base::openmode mode);
View
640 src/scribe_server.cpp
@@ -13,7 +13,7 @@
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
-// http://developers.facebook.com/scribe/
+// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
@@ -41,6 +41,7 @@ shared_ptr<scribeHandler> g_Handler;
#define DEFAULT_CHECK_PERIOD 5
#define DEFAULT_MAX_MSG_PER_SECOND 100000
#define DEFAULT_MAX_QUEUE_SIZE 5000000
+#define DEFAULT_SERVER_THREADS 1
void print_usage(const char* program_name) {
cout << "Usage: " << program_name << " [-p port] [-c config_file]";
@@ -86,13 +87,30 @@ int main(int argc, char **argv) {
config_file = argv[optind];
}
+ // seed random number generation with something reasonably unique
+ srand(time(NULL) ^ getpid());
+
g_Handler = shared_ptr<scribeHandler>(new scribeHandler(port, config_file));
g_Handler->initialize();
shared_ptr<TProcessor> processor(new scribeProcessor(g_Handler));
/* This factory is for binary compatibility. */
- shared_ptr<TProtocolFactory> binaryProtocolFactory( new TBinaryProtocolFactory(0, 0, false, false) );
- TNonblockingServer server(processor, binaryProtocolFactory, g_Handler->port);
+ shared_ptr<TProtocolFactory>
+ binaryProtocolFactory(new TBinaryProtocolFactory(0, 0, false, false));
+ shared_ptr<ThreadManager> thread_manager;
+
+ if (g_Handler->numThriftServerThreads > 1) {
+ // create a ThreadManager to process incoming calls
+ thread_manager = ThreadManager::
+ newSimpleThreadManager(g_Handler->numThriftServerThreads);
+
+ shared_ptr<PosixThreadFactory> thread_factory(new PosixThreadFactory());
+ thread_manager->threadFactory(thread_factory);
+ thread_manager->start();
+ }
+
+ TNonblockingServer server(processor, binaryProtocolFactory,
+ g_Handler->port, thread_manager);
LOG_OPER("Starting scribe server on port %lu", g_Handler->port);
fflush(stderr);
@@ -110,6 +128,7 @@ int main(int argc, char **argv) {
scribeHandler::scribeHandler(unsigned long int server_port, const std::string& config_file)
: FacebookBase("Scribe"),
port(server_port),
+ numThriftServerThreads(DEFAULT_SERVER_THREADS),
checkPeriod(DEFAULT_CHECK_PERIOD),
pcategories(NULL),
pcategory_prefixes(NULL),
@@ -134,7 +153,9 @@ scribeHandler::~scribeHandler() {
// Returns the handler status, but overwrites it with WARNING if it's
// ALIVE and at least one store has a nonempty status.
fb_status scribeHandler::getStatus() {
- Guard monitor(statusLock);
+ RWGuard monitor(scribeHandlerLock);
+ Guard status_monitor(statusLock);
+
fb_status return_status(status);
if (status == ALIVE) {
for (category_map_t::iterator cat_iter = pcategories->begin();
@@ -157,14 +178,16 @@ fb_status scribeHandler::getStatus() {
void scribeHandler::setStatus(fb_status new_status) {
LOG_OPER("STATUS: %s", statusAsString(new_status));
- Guard monitor(statusLock);
+ Guard status_monitor(statusLock);
status = new_status;
}
// Returns the handler status details if non-empty,
// otherwise the first non-empty store status found
void scribeHandler::getStatusDetails(std::string& _return) {
- Guard monitor(statusLock);
+ RWGuard monitor(scribeHandlerLock);
+ Guard status_monitor(statusLock);
+
_return = statusDetails;
if (_return.empty()) {
if (pcategories) {
@@ -174,7 +197,7 @@ void scribeHandler::getStatusDetails(std::string& _return) {
for (store_list_t::iterator store_iter = cat_iter->second->begin();
store_iter != cat_iter->second->end();
++store_iter) {
-
+
if (!(_return = (*store_iter)->getStatus()).empty()) {
return;
}
@@ -187,7 +210,7 @@ void scribeHandler::getStatusDetails(std::string& _return) {
void scribeHandler::setStatusDetails(const string& new_status_details) {
LOG_OPER("STATUS: %s", new_status_details.c_str());
- Guard monitor(statusLock);
+ Guard status_monitor(statusLock);
statusDetails = new_status_details;
}
@@ -211,6 +234,7 @@ const char* scribeHandler::statusAsString(fb_status status) {
}
+// Should be called while holding a writeLock on scribeHandlerLock
bool scribeHandler::createCategoryFromModel(
const string &category, const boost::shared_ptr<StoreQueue> &model) {
@@ -257,20 +281,20 @@ bool scribeHandler::createCategoryFromModel(
return true;
}
-ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
-
- //LOG_OPER("received Log with <%d> messages", (int)messages.size());
+// Check if we need to deny this request due to throttling
+bool scribeHandler::throttleRequest(const vector<LogEntry>& messages) {
+ // Check if we need to rate limit
if (throttleDeny(messages.size())) {
incrementCounter("denied for rate");
- return TRY_LATER;
+ return true;
}
if (!pcategories || !pcategory_prefixes) {
// don't bother to spam anything for this, our status should already
// be showing up as WARNING in the monitoring tools.
incrementCounter("invalid requests");
- return TRY_LATER;
+ return true;
}
// Throttle based on store queues getting too long.
@@ -279,7 +303,7 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
// the nice property that any size array will succeed if we're unloaded before attempting
// it, so we won't hit a case where there's a client request that will never succeed.
// Also note that we always check all categories, not just the ones in this request.
- // This is a simplification based on the assumption that most Log() calls contain most
+ // This is a simplification based on the assumption that most Log() calls contain most
// categories.
unsigned long max_count = 0;
for (category_map_t::iterator cat_iter = pcategories->begin();
@@ -302,10 +326,97 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
}
}
}
-
+
if (max_count > maxQueueSize) {
incrementCounter("denied for queue size");
- return TRY_LATER;
+ return true;
+ }
+
+ return false;
+}
+
+// Creates the stores for a category that has none yet and returns its
+// store list (a null pointer when nothing could be created). The first
+// matching prefix model is used; if that yields no store list, the default
+// store model is tried when one is defined.
+// Should be called while holding a writeLock on scribeHandlerLock
+shared_ptr<store_list_t> scribeHandler::createNewCategory(
+    const string& category) {
+
+  shared_ptr<store_list_t> store_list;
+
+  // Step 1: look for a prefix model matching this category.
+  for (category_prefix_map_t::iterator prefix_iter =
+           pcategory_prefixes->begin();
+       prefix_iter != pcategory_prefixes->end();
+       prefix_iter++) {
+    const string::size_type len = prefix_iter->first.size();
+    if (prefix_iter->first.compare(0, len-1, category, 0, len-1) != 0) {
+      continue;
+    }
+
+    // Found a matching prefix model; only the first match is used.
+    createCategoryFromModel(category, prefix_iter->second);
+    category_map_t::iterator created = pcategories->find(category);
+
+    if (created == pcategories->end()) {
+      LOG_OPER("failed to create new prefix store for category <%s>",
+               category.c_str());
+    } else {
+      store_list = created->second;
+    }
+    break;
+  }
+
+  // Step 2: fall back to the default store model, if there is one.
+  if (store_list == NULL && defaultStore != NULL) {
+    createCategoryFromModel(category, defaultStore);
+    category_map_t::iterator created = pcategories->find(category);
+
+    if (created == pcategories->end()) {
+      LOG_OPER("failed to create new default store for category <%s>",
+               category.c_str());
+    } else {
+      store_list = created->second;
+    }
+  }
+
+  return store_list;
+}
+
+// Hands a fresh copy of `entry` (category and message) to every store in
+// `store_list`, then bumps the "received good" counter when at least one
+// store took it and "received bad" when the list was empty.
+void scribeHandler::addMessage(
+    const LogEntry& entry,
+    const shared_ptr<store_list_t>& store_list) {
+
+  int stores_reached = 0;
+
+  store_list_t::iterator store_iter = store_list->begin();
+  while (store_iter != store_list->end()) {
+    // Each store gets its own LogEntry instance.
+    boost::shared_ptr<LogEntry> copy(new LogEntry);
+    copy->category = entry.category;
+    copy->message = entry.message;
+
+    ++stores_reached;
+    (*store_iter)->addMessage(copy);
+    ++store_iter;
+  }
+
+  if (stores_reached == 0) {
+    incrementCounter("received bad");
+  } else {
+    incrementCounter("received good");
+  }
+}
+
+
+ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
+ ResultCode result;
+
+ scribeHandlerLock.acquireRead();
+
+ if (throttleRequest(messages)) {
+ result = TRY_LATER;
+ goto end;
}
for (vector<LogEntry>::const_iterator msg_iter = messages.begin();
@@ -318,8 +429,7 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
continue;
}
- boost::shared_ptr<store_list_t> store_list;
-
+ shared_ptr<store_list_t> store_list;
string category = (*msg_iter).category;
// First look for an exact match of the category
@@ -330,76 +440,34 @@ ResultCode scribeHandler::Log(const vector<LogEntry>& messages) {
}
}
- // Then check the list of category prefixes for a model
- if (store_list == NULL) {
- category_prefix_map_t::iterator cat_prefix_iter = pcategory_prefixes->begin();
- while (cat_prefix_iter != pcategory_prefixes->end()) {
- string::size_type len = cat_prefix_iter->first.size();
- if (cat_prefix_iter->first.compare(0, len-1, category, 0, len-1) == 0) {
-
- // Found a matching prefix model
- if (createCategoryFromModel(category, cat_prefix_iter->second)) {
- category_map_t::iterator cat_iter = pcategories->find(category);
-
- if (cat_iter != pcategories->end()) {
- store_list = cat_iter->second;
- } else {
- LOG_OPER("failed to create new prefix store for category <%s>",
- category.c_str());
- }
- }
-
- break;
- }
- cat_prefix_iter++;
- }
- }
-
- // Then try creating a store if we have a default store defined
+ // Try creating a new store for this category if we didn't find one
if (store_list == NULL) {
- if (defaultStore != NULL) {
+ // Need write lock to create a new category
+ scribeHandlerLock.release();
+ scribeHandlerLock.acquireWrite();
- if (createCategoryFromModel(category, defaultStore)) {
- category_map_t::iterator cat_iter = pcategories->find(category);
+ store_list = createNewCategory(category);
- if (cat_iter != pcategories->end()) {
- store_list = cat_iter->second;
- } else {
- LOG_OPER("failed to create new default store for category <%s>",
- category.c_str());
- }
- }
- }
+ scribeHandlerLock.release();
+ scribeHandlerLock.acquireRead();
}
if (store_list == NULL) {
- LOG_OPER("log entry has invalid category <%s>", (*msg_iter).category.c_str());
+ LOG_OPER("log entry has invalid category <%s>",
+ (*msg_iter).category.c_str());
incrementCounter("received bad");
continue;
}
- int numstores = 0;
-
- // Add message to store_list
- for (store_list_t::iterator store_iter = store_list->begin();
- store_iter != store_list->end();
- ++store_iter) {
- ++numstores;
- boost::shared_ptr<LogEntry> ptr(new LogEntry);
- ptr->category = (*msg_iter).category;
- ptr->message = (*msg_iter).message;
-
- (*store_iter)->addMessage(ptr);
- }
-
- if (numstores) {
- incrementCounter("received good");
- } else {
- incrementCounter("received bad");
- }
+ // Log this message
+ addMessage(*msg_iter, store_list);
}
- return OK;
+ result = OK;
+
+ end:
+ scribeHandlerLock.release();
+ return result;
}
// Returns true if overloaded.
@@ -420,7 +488,7 @@ bool scribeHandler::throttleDeny(int num_messages) {
}
if (numMsgLastSecond + num_messages > maxMsgPerSecond) {
- LOG_OPER("throttle denying request with <%d> messages. It would exceed max of <%lu> messages this second",
+ LOG_OPER("throttle denying request with <%d> messages. It would exceed max of <%lu> messages this second",
num_messages, maxMsgPerSecond);
return true;
} else {
@@ -429,8 +497,7 @@ bool scribeHandler::throttleDeny(int num_messages) {
}
}
-void scribeHandler::shutdown() {
-
+void scribeHandler::stopStores() {
setStatus(STOPPING);
// Thrift doesn't currently support stopping the server from the handler,
@@ -441,11 +508,24 @@ void scribeHandler::shutdown() {
delete pcategory_prefixes;
pcategory_prefixes = NULL;
}
+}
+
+void scribeHandler::shutdown() {
+ RWGuard monitor(scribeHandlerLock, true);
+
+ stopStores();
exit(0);
}
void scribeHandler::reinitialize() {
- LOG_OPER("reload not supported");
+ RWGuard monitor(scribeHandlerLock, true);
+
+ // reinitialize() will re-read the config file and re-configure the stores.
+ // This is done without shutting down the Thrift server, so this will not
+ // reconfigure any server settings such as port number.
+ LOG_OPER("reinitializing");
+ stopStores();
+ initialize();
}
void scribeHandler::initialize() {
@@ -457,9 +537,10 @@ void scribeHandler::initialize() {
bool perfect_config = true;
bool enough_config_to_run = true;
int numstores = 0;
- category_map_t *pnew_categories = new category_map_t;
- category_prefix_map_t *pnew_category_prefixes = new category_prefix_map_t;
- shared_ptr<StoreQueue> tmpDefault;
+
+ pnew_categories = new category_map_t;
+ pnew_category_prefixes = new category_prefix_map_t;
+ tmpDefault.reset();
try {
// Get the config data and parse it.
@@ -500,6 +581,18 @@ void scribeHandler::initialize() {
throw runtime_error("No port number configured");
}
+ // check if config sets the size to use for the ThreadManager
+ unsigned long int num_threads;
+ if (config.getUnsigned("num_thrift_server_threads", num_threads)) {
+ numThriftServerThreads = (size_t) num_threads;
+
+ if (numThriftServerThreads <= 0) {
+ LOG_OPER("invalid value for num_thrift_server_threads: %lu",
+ num_threads);
+ throw runtime_error("invalid value for num_thrift_server_threads");
+ }
+ }
+
// Build a new map of stores, and move stores from the old map as
// we find them in the config file. Any stores left in the old map
// at the end will be deleted.
@@ -508,131 +601,14 @@ void scribeHandler::initialize() {
for (std::vector<pStoreConf>::iterator iter = store_confs.begin();
iter != store_confs.end();
++iter) {
+ pStoreConf store_conf = (*iter);
- bool is_default = false;
- pStoreConf store_conf = (*iter);
- std::string category;
- if (!store_conf->getString("category", category) ||
- category.empty()) {
- setStatusDetails("Bad config - store with no category");
- perfect_config = false;
- continue;
- }
-
- LOG_OPER("CATEGORY : %s", category.c_str());
- if (0 == category.compare("default")) {
- if (tmpDefault != NULL) {
- setStatusDetails("Bad config - multiple default stores specified");
- perfect_config = false;
- continue;
- }
- is_default = true;
- }
-
- bool is_prefix_category = (!category.empty() && category[category.size() - 1] == '*');
-
- std::string type;
- if (!store_conf->getString("type", type) ||
- type.empty()) {
- string errormsg("Bad config - no type for store with category: ");
- errormsg += category;
- setStatusDetails(errormsg);
- perfect_config = false;
- continue;
- }
-
-
- // look for the store in the current list
- shared_ptr<StoreQueue> pstore;
- if (!is_prefix_category && pcategories) {
- category_map_t::iterator category_iter = pcategories->find(category);
- if (category_iter != pcategories->end()) {
- shared_ptr<store_list_t> pstores = category_iter->second;
-
- for ( store_list_t::iterator it = pstores->begin(); it != pstores->end(); ++it ) {
- if ( (*it)->getBaseType() == type &&
- pstores->size() <= 1) { // no good way to match them up if there's more than one
- pstore = (*it);
- pstores->erase(it);
- }
- }
- }
- }
-
- // create a new store if it doesn't already exist
- if (!pstore) {
- try {
- string store_name;
- bool is_model, multi_category;
-
- /* remove any *'s from category name */
- if (is_prefix_category)
- store_name = category.substr(0, category.size() - 1);
- else
- store_name = category;
-
- // Determine if this store will handle multiple categories
- multi_category =
- !newThreadPerCategory && (is_default || is_prefix_category);
-
- // Determine if this store is just a model for later stores
- is_model = newThreadPerCategory && (is_default || is_prefix_category);
-
- pstore =
- shared_ptr<StoreQueue>(new StoreQueue(type, store_name, checkPeriod,
- is_model, multi_category));
- } catch (...) {
- pstore.reset();
- }
- }
-
- if (!pstore) {
- string errormsg("Bad config - can't create a store of type: ");
- errormsg += type;
- setStatusDetails(errormsg);
- perfect_config = false;
- continue;
- }
-
- // configure and reopen the new store
- pstore->configureAndOpen(store_conf);
-
- ++numstores;
+ bool success = configureStore(store_conf, &numstores);
- if (is_default) {
- LOG_OPER("Creating default store");
- tmpDefault = pstore;
- }
- else if (is_prefix_category) {
- category_prefix_map_t::iterator category_iter =
- pnew_category_prefixes->find(category);
-
- if (category_iter == pnew_category_prefixes->end()) {
- (*pnew_category_prefixes)[category] = pstore;
- } else {
- string errormsg =
- "Bad config - multiple prefix stores specified for category: ";
-
- errormsg += category;
- setStatusDetails(errormsg);
+ if (!success) {
perfect_config = false;
}
- }
-
- // push the new store onto the new map if it's not just a model
- if (!pstore->isModelStore()) {
- shared_ptr<store_list_t> pstores;
- category_map_t::iterator category_iter = pnew_categories->find(category);
- if (category_iter != pnew_categories->end()) {
- pstores = category_iter->second;
- } else {
- pstores = shared_ptr<store_list_t>(new store_list_t);
- (*pnew_categories)[category] = pstores;
- }
- pstores->push_back(pstore);
- }
- } // for each store in the conf file
-
+ }
} catch(std::exception const& e) {
string errormsg("Bad config - exception: ");
errormsg += e.what();
@@ -649,31 +625,32 @@ void scribeHandler::initialize() {
enough_config_to_run = false;
}
+ // clean up existing stores
+ deleteCategoryMap(pcategories);
+ pcategories = NULL;
+ if (pcategory_prefixes) {
+ delete pcategory_prefixes;
+ pcategory_prefixes = NULL;
+ }
+ defaultStore.reset();
+
if (enough_config_to_run) {
- deleteCategoryMap(pcategories);
pcategories = pnew_categories;
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- }
pcategory_prefixes = pnew_category_prefixes;
defaultStore = tmpDefault;
} else {
- // If the new configuration failed we'll run with
+ // If the new configuration failed we'll run with
// nothing configured and status set to WARNING
deleteCategoryMap(pnew_categories);
- deleteCategoryMap(pcategories);
- pcategories = NULL;
- if (pcategory_prefixes) {
- delete pcategory_prefixes;
- pcategory_prefixes = NULL;
- }
if (pnew_category_prefixes) {
delete pnew_category_prefixes;
- pnew_category_prefixes = NULL;
}
- defaultStore.reset();
}
+ pnew_categories = NULL;
+ pnew_category_prefixes = NULL;
+ tmpDefault.reset();
+
if (!perfect_config || !enough_config_to_run) { // perfect should be a subset of enough, but just in case
setStatus(WARNING); // status details should have been set above
} else {
@@ -682,6 +659,235 @@ void scribeHandler::initialize() {
}
}
+
+// Configures the store(s) described by a single store configuration.
+//
+// The configuration may name one category via "category" and/or a
+// whitespace-separated list via "categories". When "categories" is present
+// (even with a single name) a model store is built first and one store is
+// then configured per listed name. *numstores is incremented once for every
+// store that was configured successfully.
+//
+// Returns false if failed.
+bool scribeHandler::configureStore(pStoreConf store_conf, int *numstores) {
+  vector<string> names;            // every category named by this config
+  shared_ptr<StoreQueue> model;    // stays empty unless "categories" is used
+
+  // Check if a single category is specified
+  string name;
+  if (store_conf->getString("category", name)) {
+    names.push_back(name);
+  }
+
+  // Check if multiple categories are specified. The presence of the
+  // 'categories' keyword alone selects the multi-category path so that
+  // configuration is consistent for that keyword.
+  string categories;
+  const bool uses_list = store_conf->getString("categories", categories);
+  if (uses_list) {
+    // Parse category names, separated by whitespace
+    stringstream ss(categories);
+    while (ss >> name) {
+      names.push_back(name);
+    }
+  }
+
+  if (names.empty()) {
+    setStatusDetails("Bad config - store with no category");
+    return false;
+  }
+
+  if (!uses_list) {
+    // configure single store
+    shared_ptr<StoreQueue> single =
+      configureStoreCategory(store_conf, names[0], model);
+    if (!single) {
+      return false;
+    }
+
+    ++(*numstores);
+    return true;
+  }
+
+  // configure multiple stores: a type is mandatory
+  string type;
+  const bool has_type = store_conf->getString("type", type);
+  if (!has_type || type.empty()) {
+    string errormsg("Bad config - no type for store with category: ");
+    errormsg += categories;
+    setStatusDetails(errormsg);
+    return false;
+  }
+
+  // create model so that we can create stores as copies of this model
+  model = configureStoreCategory(store_conf, categories, model, true);
+  if (!model) {
+    string errormsg("Bad config - could not create store for category: ");
+    errormsg += categories;
+    setStatusDetails(errormsg);
+    return false;
+  }
+
+  // create a store for each category; stop at the first failure
+  for (vector<string>::size_type i = 0; i < names.size(); ++i) {
+    shared_ptr<StoreQueue> created =
+      configureStoreCategory(store_conf, names[i], model);
+    if (!created) {
+      return false;
+    }
+
+    ++(*numstores);
+  }
+
+  return true;
+}
+
+
+// Configures the store specified by the store configuration and category.
+//
+// The resulting store is recorded in the structures that initialize() is
+// building:
+//   - if category is "default", it is remembered in tmpDefault;
+//   - else if category ends in '*' (and category_list is false), it is
+//     recorded in pnew_category_prefixes;
+//   - in every case, if the store is not a model store and category_list is
+//     false, it is appended to pnew_categories[category].
+//
+// store_conf     configuration for the store
+// category       category name (or, when category_list is true, the raw
+//                "categories" string used only to name the model)
+// model          optional model; when set, the store is either a copy of the
+//                model or the model itself, and store_conf is not re-applied
+// category_list  true when building the model for a list of categories
+//
+// Returns the store, or an empty pointer on failure; every failure path sets
+// the status details first.
+shared_ptr<StoreQueue> scribeHandler::configureStoreCategory(
+  pStoreConf store_conf,                       //configuration for store
+  const string &category,                      //category name
+  const boost::shared_ptr<StoreQueue> &model,  //model to use (optional)
+  bool category_list) {                        //is a list of stores?
+
+  bool is_default = false;
+  bool already_created = false;
+
+  if (category.empty()) {
+    setStatusDetails("Bad config - store with blank category");
+    return shared_ptr<StoreQueue>();
+  }
+
+  LOG_OPER("CATEGORY : %s", category.c_str());
+  if (0 == category.compare("default")) {
+    if (tmpDefault != NULL) {
+      setStatusDetails("Bad config - multiple default stores specified");
+      return shared_ptr<StoreQueue>();
+    }
+    is_default = true;
+  }
+
+  // A trailing '*' marks a prefix category, except when the name is really
+  // the whole "categories" list string.
+  bool is_prefix_category = (!category.empty() &&
+                             category[category.size() - 1] == '*' &&
+                             !category_list);
+
+  std::string type;
+  if (!store_conf->getString("type", type) ||
+      type.empty()) {
+    string errormsg("Bad config - no type for store with category: ");
+    errormsg += category;
+    setStatusDetails(errormsg);
+    return shared_ptr<StoreQueue>();
+  }
+
+  // look for the store in the current list
+  shared_ptr<StoreQueue> pstore;
+  if (!is_prefix_category && pcategories) {
+    category_map_t::iterator category_iter = pcategories->find(category);
+    if (category_iter != pcategories->end()) {
+      shared_ptr<store_list_t> pstores = category_iter->second;
+
+      for ( store_list_t::iterator it = pstores->begin(); it != pstores->end(); ++it ) {
+        if ( (*it)->getBaseType() == type &&
+             pstores->size() <= 1) { // no good way to match them up if there's more than one
+          pstore = (*it);
+          pstores->erase(it);
+          // erase() invalidates 'it'; incrementing or dereferencing it again
+          // is undefined behavior, so leave the loop right away.
+          break;
+        }
+      }
+    }
+  }
+
+  try {
+    // create a new store if it doesn't already exist
+    if (!pstore) {
+      if (model != NULL) {
+        // Create a copy of the model if we want a new thread per category
+        if (newThreadPerCategory && !is_default && !is_prefix_category) {
+          pstore = shared_ptr<StoreQueue>(new StoreQueue(model, category));
+        } else {
+          // share the model itself; it must not be opened a second time
+          pstore = model;
+          already_created = true;
+        }
+      } else {
+        string store_name;
+        bool is_model, multi_category, categories;
+
+        /* remove any *'s from category name */
+        if (is_prefix_category)
+          store_name = category.substr(0, category.size() - 1);
+        else
+          store_name = category;
+
+        // Does this store define multiple categories
+        categories = (is_default || is_prefix_category || category_list);
+
+        // Determine if this store will actually handle multiple categories
+        multi_category = !newThreadPerCategory && categories;
+
+        // Determine if this store is just a model for later stores
+        is_model = newThreadPerCategory && categories;
+
+        pstore =
+          shared_ptr<StoreQueue>(new StoreQueue(type, store_name, checkPeriod,
+                                                is_model, multi_category));
+      }
+    }
+  } catch (...) {
+    // any construction failure is reported below as "can't create a store"
+    pstore.reset();
+  }
+
+  if (!pstore) {
+    string errormsg("Bad config - can't create a store of type: ");
+    errormsg += type;
+    setStatusDetails(errormsg);
+    return shared_ptr<StoreQueue>();
+  }
+
+  // open store. and configure it if not copied from a model
+  if (model == NULL) {
+    pstore->configureAndOpen(store_conf);
+  } else if (!already_created) {
+    pstore->open();
+  }
+
+  if (is_default) {
+    LOG_OPER("Creating default store");
+    tmpDefault = pstore;
+  }
+  else if (is_prefix_category) {
+    category_prefix_map_t::iterator category_iter =
+      pnew_category_prefixes->find(category);
+
+    if (category_iter == pnew_category_prefixes->end()) {
+      (*pnew_category_prefixes)[category] = pstore;
+    } else {
+      string errormsg =
+        "Bad config - multiple prefix stores specified for category: ";
+
+      errormsg += category;
+      setStatusDetails(errormsg);
+      return shared_ptr<StoreQueue>();
+    }
+  }
+
+  // push the new store onto the new map if it's not just a model
+  if (!pstore->isModelStore() && !category_list) {
+    shared_ptr<store_list_t> pstores;
+    category_map_t::iterator category_iter = pnew_categories->find(category);
+    if (category_iter != pnew_categories->end()) {
+      pstores = category_iter->second;
+    } else {
+      pstores = shared_ptr<store_list_t>(new store_list_t);
+      (*pnew_categories)[category] = pstores;
+    }
+    pstores->push_back(pstore);
+  }
+
+  return pstore;
+}
+
+
// delete pcats and everything it contains
void scribeHandler::deleteCategoryMap(category_map_t *pcats) {
if (!pcats) {
View
36 src/scribe_server.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2008 Facebook
+// Copyright (c) 2007-2009 Facebook
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
-// http://developers.facebook.com/scribe/
+// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
@@ -31,7 +31,7 @@ typedef std::vector<boost::shared_ptr<StoreQueue> > store_list_t;
typedef std::map<std::string, boost::shared_ptr<store_list_t> > category_map_t;
typedef std::map<std::string, boost::shared_ptr<StoreQueue> > category_prefix_map_t;
-class scribeHandler : virtual public scribe::thrift::scribeIf,
+class scribeHandler : virtual public scribe::thrift::scribeIf,
public facebook::fb303::FacebookBase {
public:
@@ -44,7 +44,7 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
scribe::thrift::ResultCode Log(const std::vector<scribe::thrift::LogEntry>& messages);
- void getVersion(std::string& _return) {_return = "2.01";}
+ void getVersion(std::string& _return) {_return = "2.1";}
facebook::fb303::fb_status getStatus();
void getStatusDetails(std::string& _return);
void setStatus(facebook::fb303::fb_status new_status);
@@ -52,6 +52,9 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
unsigned long int port; // it's long because that's all I implemented in the conf class
+ // number of threads processing new Thrift connections
+ size_t numThriftServerThreads;
+
private:
unsigned long checkPeriod; // periodic check interval for all contained stores
@@ -64,6 +67,11 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
// the default store
boost::shared_ptr<StoreQueue> defaultStore;
+ // temp versions of the above 3 pointers to use during initialization
+ category_map_t* pnew_categories;
+ category_prefix_map_t* pnew_category_prefixes;
+ boost::shared_ptr<StoreQueue> tmpDefault;
+
std::string configFilename;
facebook::fb303::fb_status status;
std::string statusDetails;
@@ -74,6 +82,12 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
unsigned long maxQueueSize;
bool newThreadPerCategory;
+ /* mutex to synchronize access to scribeHandler.
+ * A single mutex is fine since it only needs to be locked in write mode
+ * during start/stop/reinitialize or when we need to create a new category.
+ */
+ apache::thrift::concurrency::ReadWriteMutex scribeHandlerLock;
+
// disallow empty construction, copy, and assignment
scribeHandler();
scribeHandler(const scribeHandler& rhs);
@@ -83,8 +97,20 @@ class scribeHandler : virtual public scribe::thrift::scribeIf,
bool throttleDeny(int num_messages); // returns true if overloaded
void deleteCategoryMap(category_map_t *pcats);
const char* statusAsString(facebook::fb303::fb_status new_status);
- bool createCategoryFromModel(const std::string &category,
+ bool createCategoryFromModel(const std::string &category,
const boost::shared_ptr<StoreQueue> &model);
+ boost::shared_ptr<StoreQueue>
+ configureStoreCategory(pStoreConf store_conf,
+ const std::string &category,
+ const boost::shared_ptr<StoreQueue> &model,
+ bool category_list=false);
+ bool configureStore(pStoreConf store_conf, int* num_stores);
+ void stopStores();
+ bool throttleRequest(const std::vector<scribe::thrift::LogEntry>& messages);
+ boost::shared_ptr<store_list_t>
+ createNewCategory(const std::string& category);
+ void addMessage(const scribe::thrift::LogEntry& entry,
+ const boost::shared_ptr<store_list_t>& store_list);
};
extern boost::shared_ptr<scribeHandler> g_Handler;
View
440 src/store.cpp
@@ -13,17 +13,18 @@
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
-// http://developers.facebook.com/scribe/
+// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
// @author Jason Sobel
// @author Alex Moskalyuk
-// @author Avinash Lakshman
+// @author Avinash Lakshman
// @author Anthony Giardullo
#include "common.h"
#include "scribe_server.h"
+#include "thrift/transport/TSimpleFileTransport.h"
using namespace std;
using namespace boost;
@@ -43,13 +44,14 @@ using namespace scribe::thrift;
#define DEFAULT_BUFFERSTORE_AVG_RETRY_INTERVAL 300
#define DEFAULT_BUFFERSTORE_RETRY_INTERVAL_RANGE 60
#define DEFAULT_BUCKETSTORE_DELIMITER ':'
+#define DEFAULT_NETWORKSTORE_CACHE_TIMEOUT 300
ConnPool g_connPool;
const string meta_logfile_prefix = "scribe_meta<new_logfile>: ";
-boost::shared_ptr<Store>
-Store::createStore(const string& type, const string& category,
+boost::shared_ptr<Store>
+Store::createStore(const string& type, const string& category,
bool readable, bool multi_category) {
if (0 == type.compare("file")) {
return shared_ptr<Store>(new FileStore(category, multi_category, readable));
@@ -76,7 +78,7 @@ Store::createStore(const string& type, const string& category,
}
}
-Store::Store(const string& category, const string &type, bool multi_category)
+Store::Store(const string& category, const string &type, bool multi_category)
: categoryHandled(category),
multiCategory(multi_category),
storeType(type) {
@@ -125,11 +127,14 @@ const std::string& Store::getType() {
return storeType;
}
-FileStoreBase::FileStoreBase(const string& category, const string &type,
+FileStoreBase::FileStoreBase(const string& category, const string &type,
bool multi_category)
: Store(category, type, multi_category),
+ baseFilePath("/tmp"),
+ subDirectory(""),
filePath("/tmp"),
baseFileName(category),
+ baseSymlinkName(""),
maxSize(DEFAULT_FILESTORE_MAX_SIZE),
maxWriteSize(DEFAULT_FILESTORE_MAX_WRITE_SIZE),
rollPeriod(ROLL_NEVER),
@@ -153,10 +158,27 @@ void FileStoreBase::configure(pStoreConf configuration) {
// We can run using defaults for all of these, but there are
// a couple of suspicious things we warn about.
std::string tmp;
- configuration->getString("file_path", filePath);
+ configuration->getString("file_path", baseFilePath);
+ configuration->getString("sub_directory", subDirectory);
+ configuration->getString("use_hostname_sub_directory", tmp);
+
+ if (0 == tmp.compare("yes")) {
+ setHostNameSubDir();
+ }
+
+ filePath = baseFilePath;
+ if (!subDirectory.empty()) {
+ filePath += "/" + subDirectory;
+ }
+
+
if (!configuration->getString("base_filename", baseFileName)) {
LOG_OPER("[%s] WARNING: Bad config - no base_filename specified for file store", categoryHandled.c_str());
}
+
+ // check if symlink name is optionally specified
+ configuration->getString("base_symlink_name", baseSymlinkName);
+
if (configuration->getString("rotate_period", tmp)) {
if (0 == tmp.compare("hourly")) {
rollPeriod = ROLL_HOURLY;
@@ -176,6 +198,7 @@ void FileStoreBase::configure(pStoreConf configuration) {
writeCategory = true;
}
}
+
if (configuration->getString("create_symlink", tmp)) {
if (0 == tmp.compare("yes")) {
createSymlink = true;
@@ -184,15 +207,17 @@ void FileStoreBase::configure(pStoreConf configuration) {
}
}
+ configuration->getString("fs_type", fsType);
+
configuration->getUnsigned("max_size", maxSize);
configuration->getUnsigned("max_write_size", maxWriteSize);
- configuration->getString("fs_type", fsType);
configuration->getUnsigned("rotate_hour", rollHour);
configuration->getUnsigned("rotate_minute", rollMinute);
configuration->getUnsigned("chunk_size", chunkSize);
}
void FileStoreBase::copyCommon(const FileStoreBase *base) {
+ subDirectory = base->subDirectory;
chunkSize = base->chunkSize;
maxSize = base->maxSize;
maxWriteSize = base->maxWriteSize;
@@ -203,13 +228,19 @@ void FileStoreBase::copyCommon(const FileStoreBase *base) {
writeMeta = base->writeMeta;
writeCategory = base->writeCategory;
createSymlink = base->createSymlink;
+ baseSymlinkName = base->baseSymlinkName;
/*
- * append the category name as a subdir of the file path and change the
+ * append the category name to the base file path and change the
* baseFileName to the category name. these are arbitrary, could be anything
* unique
*/
- filePath = base->filePath + std::string("/") + categoryHandled;
+ baseFilePath = base->baseFilePath + std::string("/") + categoryHandled;
+ filePath = baseFilePath;
+ if (!subDirectory.empty()) {
+ filePath += "/" + subDirectory;
+ }
+
baseFileName = categoryHandled;
}
@@ -232,9 +263,9 @@ void FileStoreBase::periodicCheck() {
static_cast<uint>(timeinfo->tm_hour) >= rollHour &&
static_cast<uint>(timeinfo->tm_min) >= rollMinute;
} else {
- rotate = timeinfo->tm_hour != lastRollTime &&
+ rotate = timeinfo->tm_hour != lastRollTime &&
static_cast<uint>(timeinfo->tm_min) >= rollMinute;
- }
+ }
}
if (rotate) {
rotateFile(timeinfo);
@@ -253,8 +284,8 @@ void FileStoreBase::rotateFile(struct tm *timeinfo) {
string FileStoreBase::makeFullFilename(int suffix, struct tm* creation_time) {
ostringstream filename;
-
- filename << filePath << '/';
+
+ filename << filePath << '/';
filename << makeBaseFilename(creation_time);
filename << '_' << setw(5) << setfill('0') << suffix;
@@ -263,7 +294,11 @@ string FileStoreBase::makeFullFilename(int suffix, struct tm* creation_time) {
string FileStoreBase::makeBaseSymlink() {
ostringstream base;
- base << categoryHandled << "_current";
+ if (!baseSymlinkName.empty()) {
+ base << baseSymlinkName << "_current";
+ } else {
+ base << baseFileName << "_current";
+ }
return base.str();
}
@@ -282,13 +317,13 @@ string FileStoreBase::makeBaseFilename(struct tm* creation_time) {
}
ostringstream filename;
-
+
filename << baseFileName;
if (rollPeriod != ROLL_NEVER) {
filename << '-' << creation_time->tm_year + 1900 << '-'
<< setw(2) << setfill('0') << creation_time->tm_mon + 1 << '-'
<< setw(2) << setfill('0') << creation_time->tm_mday;
-
+
}
return filename.str();
}
@@ -309,7 +344,7 @@ int FileStoreBase::findNewestFile(const string& base_filename) {
max_suffix = suffix;
}
}
- return max_suffix;
+ return max_suffix;
}
int FileStoreBase::findOldestFile(const string& base_filename) {
@@ -321,7 +356,7 @@ int FileStoreBase::findOldestFile(const string& base_filename) {
for (std::vector<std::string>::iterator iter = files.begin();
iter != files.end();
++iter) {
-
+
int suffix = getFileSuffix(*iter, base_filename);
if (suffix >= 0 &&
(min_suffix == -1 || suffix < min_suffix)) {
@@ -353,31 +388,33 @@ void FileStoreBase::printStats() {
filename += "/scribe_stats";
boost::shared_ptr<FileInterface> stats_file = FileInterface::createFileInterface(fsType, filename);
- if (!stats_file || !stats_file->openWrite()) {
- LOG_OPER("[%s] Failed to open stats file <%s> of type <%s> for writing",
+ if (!stats_file ||
+ !stats_file->createDirectory(filePath) ||
+ !stats_file->openWrite()) {
+ LOG_OPER("[%s] Failed to open stats file <%s> of type <%s> for writing",
categoryHandled.c_str(), filename.c_str(), fsType.c_str());
// This isn't enough of a problem to change our status
return;
}
-
+
time_t rawtime;
time(&rawtime);
struct tm *local_time = localtime(&rawtime);
ostringstream msg;
- msg << local_time->tm_year + 1900 << '-'
+ msg << local_time->tm_year + 1900 << '-'
<< setw(2) << setfill('0') << local_time->tm_mon + 1 << '-'
<< setw(2) << setfill('0') << local_time->tm_mday << '-'
<< setw(2) << setfill('0') << local_time->tm_hour << ':'
<< setw(2) << setfill('0') << local_time->tm_min;
msg << " wrote <" << currentSize << "> bytes in <" << eventsWritten << "> events to file <" << currentFilename << ">" << endl;
-
+
stats_file->write(msg.str());
stats_file->close();
}
-// Returns the number of bytes to pad to align to the specified chunk size
+// Returns the number of bytes to pad to align to the specified chunk size
unsigned long FileStoreBase::bytesToPad(unsigned long next_message_length,
unsigned long current_file_size,
unsigned long chunk_size) {
@@ -394,6 +431,31 @@ unsigned long FileStoreBase::bytesToPad(unsigned long next_message_length,
return 0;
}
+// Sets subDirectory to the name of this machine, as reported by
+// gethostname(). If a sub directory was already configured a warning is
+// logged because the host name replaces it. When the host name cannot be
+// obtained (call fails or yields an empty name) a warning is logged and
+// subDirectory is left unchanged.
+void FileStoreBase::setHostNameSubDir() {
+  if (!subDirectory.empty()) {
+    string error_msg = "WARNING: Bad config - ";
+    error_msg += "use_hostname_sub_directory will override sub_directory path";
+    LOG_OPER("[%s] %s", categoryHandled.c_str(), error_msg.c_str());
+  }
+
+  // Zero-filled so the buffer is always a valid C string, even when
+  // gethostname() fails before writing anything.
+  char hostname[255] = {0};
+  int error = gethostname(hostname, sizeof(hostname));
+  if (error) {
+    LOG_OPER("[%s] WARNING: gethostname returned error: %d ",
+             categoryHandled.c_str(), error);
+    // do not trust whatever a failed call may have left behind
+    hostname[0] = '\0';
+  }
+
+  // On truncation it is unspecified whether gethostname() writes a
+  // terminating null byte, so force one.
+  hostname[sizeof(hostname) - 1] = '\0';
+
+  string hoststring(hostname);
+
+  if (hoststring.empty()) {
+    LOG_OPER("[%s] WARNING: could not get host name",
+             categoryHandled.c_str());
+  } else {
+    subDirectory = hoststring;
+  }
+}
+
FileStore::FileStore(const string& category, bool multi_category,
bool is_buffer_file)
: FileStoreBase(category, "file", multi_category),
@@ -419,8 +481,8 @@ void FileStore::configure(pStoreConf configuration) {
// read a file that's both chunked and framed. The buffer file has
// to be framed, so we don't allow it to be chunked.
// (framed means we write a message size to disk before the message
- // data, which allows us to identify separate messages in binary data.
- // Chunked means we pad with zeroes to ensure that every multiple
+ // data, which allows us to identify separate messages in binary data.
+ // Chunked means we pad with zeroes to ensure that every multiple
// of n bytes is the start of a message, which helps in recovering
// corrupted binary data and seeking into large files)
chunkSize = 0;
@@ -474,12 +536,26 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) {
writeFile = FileInterface::createFileInterface(fsType, file, isBufferFile);
if (!writeFile) {
- LOG_OPER("[%s] Failed to create file <%s> of type <%s> for writing",
+ LOG_OPER("[%s] Failed to create file <%s> of type <%s> for writing",
categoryHandled.c_str(), file.c_str(), fsType.c_str());
setStatus("file open error");
return false;
}
-
+
+ success = writeFile->createDirectory(baseFilePath);
+
+ // If we created a subdirectory, we need to create two directories
+ if (success && !subDirectory.empty()) {
+ success = writeFile->createDirectory(filePath);
+ }
+
+ if (!success) {
+ LOG_OPER("[%s] Failed to create directory for file <%s>",
+ categoryHandled.c_str(), file.c_str());
+ setStatus("File open error");
+ return false;
+ }
+
success = writeFile->openWrite();
@@ -491,8 +567,10 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* current_time) {
/* just make a best effort here, and don't error if it fails */
if (createSymlink && !isBufferFile) {
string symlinkName = makeFullSymlink();
- unlink(symlinkName.c_str());
- symlink(file.c_str(), symlinkName.c_str());
+ boost::shared_ptr<FileInterface> tmp =
+ FileInterface::createFileInterface(fsType, symlinkName, isBufferFile);
+ tmp->deleteFile();
+ writeFile->createSymlink(file, symlinkName);
}
// else it confuses the filename code on reads
@@ -701,14 +779,13 @@ bool FileStore::replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
// Need to close and reopen store in case we already have this file open
close();
- // delete this file, then reopen it and write messages
shared_ptr<FileInterface> infile = FileInterface::createFileInterface(fsType, filename, isBufferFile);
- infile->deleteFile();
+ // overwrite the old contents of the file
bool success;
- if (infile->openWrite()) {
+ if (infile->openTruncate()) {
success = writeMessages(messages, infile);
-
+
} else {
LOG_OPER("[%s] Failed to open file <%s> for writing and truncate",
categoryHandled.c_str(), filename.c_str());
@@ -722,7 +799,7 @@ bool FileStore::replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
return success;
}
-bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
+bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
struct tm* now) {
int index = findOldestFile(makeBaseFilename(now));
@@ -751,7 +828,7 @@ bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages
entry->category = message.substr(0, message.length() - 1);
if (!infile->readNext(message)) {
- LOG_OPER("[%s] category not stored with message <%s>",
+ LOG_OPER("[%s] category not stored with message <%s>",
categoryHandled.c_str(), entry->category.c_str());
}
} else {
@@ -762,9 +839,9 @@ bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages
messages->push_back(entry);
}
- }
+ }
infile->close();
-
+
LOG_OPER("[%s] successfully read <%lu> entries from file <%s>", categoryHandled.c_str(), messages->size(), filename.c_str());
return true;
}
@@ -790,10 +867,11 @@ bool FileStore::empty(struct tm* now) {
}
-ThriftFileStore::ThriftFileStore(const std::string& category, bool multi_category)
+ThriftFileStore::ThriftFileStore(const std::string& category, bool multi_category)
: FileStoreBase(category, "thriftfile", multi_category),
flushFrequencyMs(0),
- msgBufferSize(0) {
+ msgBufferSize(0),
+ useSimpleFile(0) {
}
ThriftFileStore::~ThriftFileStore() {
@@ -816,11 +894,11 @@ bool ThriftFileStore::handleMessages(boost::shared_ptr<logentry_vector_t> messag
unsigned long messages_handled = 0;
for (logentry_vector_t::iterator iter = messages->begin();
- iter != messages->end();
+ iter != messages->end();
++iter) {
// This length is an estimate -- what the ThriftLogFile actually writes is a black box to us
- uint32_t length = (*iter)->message.size();
+ uint32_t length = (*iter)->message.size();
try {
thriftFileTransport->write(reinterpret_cast<const uint8_t*>((*iter)->message.data()), length);
@@ -864,6 +942,7 @@ void ThriftFileStore::configure(pStoreConf configuration) {
FileStoreBase::configure(configuration);
configuration->getUnsigned("flush_frequency_ms", flushFrequencyMs);
configuration->getUnsigned("msg_buffer_size", msgBufferSize);
+ configuration->getUnsigned("use_simple_file", useSimpleFile);
}
void ThriftFileStore::close() {
@@ -871,7 +950,7 @@ void ThriftFileStore::close() {
}
void ThriftFileStore::flush() {
- // TFileTransport has its own periodic flushing mechanism, and we
+ // TFileTransport has its own periodic flushing mechanism, and we
// introduce deadlocks if we try to call it from more than one place
return;
}
@@ -897,38 +976,35 @@ bool ThriftFileStore::openInternal(bool incrementFilename, struct tm* current_ti
string filename = makeFullFilename(suffix, current_time);
/* try to create the directory containing the file */
- string::size_type slash;
- if (!filename.empty() &&
- (filename.find_first_of("/") != string::npos) &&
- (filename.find_first_of("/") != (slash = filename.find_last_of("/")))) {
- try {
- boost::filesystem::create_directory(filename.substr(0, slash));
- } catch(std::exception const& e) {
- LOG_OPER("Exception < %s > trying to create directory", e.what());
- return false;
- }
+ if (!createFileDirectory()) {
+ LOG_OPER("[%s] Could not create path for file: %s",
+ categoryHandled.c_str(), filename.c_str());
+ return false;
}
-
+
if (rollPeriod == ROLL_DAILY) {
lastRollTime = current_time->tm_mday;
} else { // default to hourly if rollPeriod is garbage
lastRollTime = current_time->tm_hour;
}
-
- try {
- TFileTransport *transport = new TFileTransport(filename);
- thriftFileTransport.reset(transport);
+ try {
+ if (useSimpleFile) {
+ thriftFileTransport.reset(new TSimpleFileTransport(filename, false, true));
+ } else {
+ TFileTransport *transport = new TFileTransport(filename);
+ thriftFileTransport.reset(transport);
- if (chunkSize != 0) {
- transport->setChunkSize(chunkSize);
- }
- if (flushFrequencyMs > 0) {
- transport->setFlushMaxUs(flushFrequencyMs * 1000);
- }
- if (msgBufferSize > 0) {
- transport->setEventBufferSize(msgBufferSize);
+ if (chunkSize != 0) {
+ transport->setChunkSize(chunkSize);
+ }
+ if (flushFrequencyMs > 0) {
+ transport->setFlushMaxUs(flushFrequencyMs * 1000);
+ }
+ if (msgBufferSize > 0) {
+ transport->setEventBufferSize(msgBufferSize);
+ }
}
LOG_OPER("[%s] Opened file <%s> for writing", categoryHandled.c_str(), filename.c_str());
@@ -958,17 +1034,32 @@ bool ThriftFileStore::openInternal(bool incrementFilename, struct tm* current_ti
return true;
}
-BufferStore::BufferStore(const string& category, bool multi_category)
+bool ThriftFileStore::createFileDirectory () { // Creates baseFilePath and, when subDirectory is set, filePath. Returns true on success, false if creation throws.
+ try {
+ boost::filesystem::create_directory(baseFilePath); // NOTE(review): create_directory appears to create a single level only -- confirm the parent of baseFilePath is expected to exist
+
+ // When subDirectory is non-empty, filePath is created in addition to baseFilePath
+ if (!subDirectory.empty()) {
+ boost::filesystem::create_directory(filePath);
+ }
+ }catch(std::exception const& e) { // any std::exception from directory creation is logged and reported as failure
+ LOG_OPER("Exception < %s > trying to create directory", e.what());
+ return false;
+ }
+ return true;
+}
+
+BufferStore::BufferStore(const string& category, bool multi_category)
: Store(category, "buffer", multi_category),
maxQueueLength(DEFAULT_BUFFERSTORE_MAX_QUEUE_LENGTH),
bufferSendRate(DEFAULT_BUFFERSTORE_SEND_RATE),
avgRetryInterval(DEFAULT_BUFFERSTORE_AVG_RETRY_INTERVAL),
retryIntervalRange(DEFAULT_BUFFERSTORE_RETRY_INTERVAL_RANGE),
+ replayBuffer(true),
state(DISCONNECTED) {
time(&lastWriteTime);
time(&lastOpenAttempt);
- srand(lastWriteTime);
retryInterval = getNewRetryInterval();
// we can't open the client connection until we get configured
@@ -986,8 +1077,13 @@ void BufferStore::configure(pStoreConf configuration) {
configuration->getUnsigned("retry_interval", (unsigned long&) avgRetryInterval);
configuration->getUnsigned("retry_interval_range", (unsigned long&) retryIntervalRange);
+ string tmp;
+ if (configuration->getString("replay_buffer", tmp) && tmp != "yes") {
+ replayBuffer = false;
+ }
+
if (retryIntervalRange > avgRetryInterval) {
- LOG_OPER("[%s] Bad config - retry_interval_range must be less than retry_interval. Using <%d> as range instead of <%d>",
+ LOG_OPER("[%s] Bad config - retry_interval_range must be less than retry_interval. Using <%d> as range instead of <%d>",
categoryHandled.c_str(), (int)avgRetryInterval, (int)retryIntervalRange);
retryIntervalRange = avgRetryInterval;
}
@@ -1004,8 +1100,9 @@ void BufferStore::configure(pStoreConf configuration) {
setStatus(msg);
cout << msg << endl;
} else {
- // last parameter here says that this is a readable store
- secondaryStore = createStore(type, categoryHandled, true, multiCategory);
+ // If replayBuffer is true, then we need to create a readable store
+ secondaryStore = createStore(type, categoryHandled, replayBuffer,
+ multiCategory);
secondaryStore->configure(secondary_store_conf);
}
}
@@ -1033,7 +1130,7 @@ void BufferStore::configure(pStoreConf configuration) {
}
}
- // If the config is bad we'll still try to write the data to a
+ // If the config is bad we'll still try to write the data to a
// default location on local disk.
if (!secondaryStore) {
secondaryStore = createStore("file", categoryHandled, true, multiCategory);
@@ -1051,7 +1148,15 @@ bool BufferStore::open() {
// try to open the primary store, and set the state accordingly
if (primaryStore->open()) {
- changeState(SENDING_BUFFER); // in case there are files left over from a previous instance
+ // in case there are files left over from a previous instance
+ changeState(SENDING_BUFFER);
+
+ // If we don't need to send buffers, skip to streaming
+ if (!replayBuffer) {
+ // We still switch state to SENDING_BUFFER first just to make sure we
+ // can open the secondary store
+ changeState(STREAMING);
+ }
} else {
secondaryStore->open();
changeState(DISCONNECTED);
@@ -1088,6 +1193,7 @@ shared_ptr<Store> BufferStore::copy(const std::string &category) {
store->bufferSendRate = bufferSendRate;
store->avgRetryInterval = avgRetryInterval;
store->retryIntervalRange = retryIntervalRange;
+ store->replayBuffer = replayBuffer;
store->primaryStore = primaryStore->copy(category);
store->secondaryStore = secondaryStore->copy(category);
@@ -1131,7 +1237,7 @@ void BufferStore::changeState(buffer_state_t new_state) {