Permalink
Browse files

Fix improper handling of corrupted data when reading and writing buff…

…er files.

Summary:
On read part:
  originally, scribe will discard any data in the file from the point where
corruption is detected, but with this handling scheme, part of the corrupted
data can still be read and sent to primary. An example is illustrated as
follows:

  <--- GOOD FRAME --><--- PARTIAL FRAME --| <--- NEXT GOOD FRAME --->
                     ^                    ^ the corrupted frame got truncated
here
                     the frame size header of the corrupted frame is OK

in the above example, when the corrupted frame is read, StdFile::readNext() doesn't
know it is corrupted (since the frame size header is OK), and reads the partial frame
together with the leading part of the next good frame. When we feed this corrupted
frame to TBinaryProtocol to deserialize it, thrift throws an exception. This exception
was not properly handled, so that scribe failed to pass this buffer file, resulting
in buffer file piling up (is buffer streaming disabled).

This diff catches the exception and discards all data from the beginning of the
corrupted frame as the old scribe does. Ideally, we would like a smarter handling
policy like local scanning to adjust the read pointer to the beginning of the next
good frame, but that would require significant change to FileInterface. As discussed
offline, we would like to create a new file type (e.g. BufferFile) later to do that.

On write part:
  the original scribe doesn't rotate file on write failure. Next time handleMessages()
is called, the corrupted file is re-opened, and good data are appended to the end. This
result in loss of all data after the corruption point. We change the policy to rotate
file on write failures, so that only the last few bytes of the corrupted file will be
lost.

This diff also fix the badalloc exception described in the task. That exception
happens because we keep using TMemoryBuffer for deserialization without
resetting it. This causes its r/w pointers to keep moving forward without
releasing any buffer space before rBase_, which effectively is a memory leak.

Test Plan:
For the read part:
  scribe scribe with a midtier-writer configuration;
  do stress test;
  scribe_ctrl stop the writer scribe; the mid-tier scribe should begin
generating buffers;
  open the oldest buffer file and change some bytes in it so that it is
corrupted*.
  start the writer scribe again so that buffers are replayed.
  scribe should complain errors and skip rest of the corrupted file.

For the write part:
  mkdir /tmp/tmpfs
  sudo mount -t tmpfs -o size=20m tmpfs /tmp/tmpfs
  start a mid-tier scribe;
  run stresstest.php;
  when the file size quota is reached, scribe should rotate to next file
instead of
trying to re-open the original file. There may be some partial data at the end
of the original
file.
  sudo mount -t tmpfs -o remount -o size=100m tmpfs /tmp/tmpfs
  scribe should flush the remaining data
  • Loading branch information...
yliang yliang
yliang authored and yliang committed Jul 27, 2010
1 parent 067a3db commit fe63cb04744974a701123a391ea7f0d48838bbd9
View

This file was deleted.

Oops, something went wrong.
View

This file was deleted.

Oops, something went wrong.
View
@@ -31,7 +31,8 @@
using namespace scribe::thrift;
static const std::string kMetaLogfilePrefix = "scribe_meta<new_logfile>: ";
static const std::string kMetaLogfilePrefix = "scribe_meta<new_logfile>: ";
static const uint32_t kMinConvertBufferSize = 16000;
namespace scribe {
@@ -131,7 +132,7 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* currentTime) {
if (!writeFile_) {
LOG_OPER("[%s] Failed to create file <%s> of type <%s> for writing",
categoryHandled_.c_str(), file.c_str(), fsType_.c_str());
setStatus("file open error");
g_handler->incCounter(categoryHandled_, "err_file_open_1");
return false;
}
@@ -145,7 +146,7 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* currentTime) {
if (!success) {
LOG_OPER("[%s] Failed to create directory for file <%s>",
categoryHandled_.c_str(), file.c_str());
setStatus("File open error");
g_handler->incCounter(categoryHandled_, "err_file_open_2");
return false;
}
@@ -156,7 +157,7 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* currentTime) {
LOG_OPER("[%s] Failed to open file <%s> for writing",
categoryHandled_.c_str(),
file.c_str());
setStatus("File open error");
g_handler->incCounter(categoryHandled_, "err_file_open_3");
} else {
/* just make a best effort here, and don't error if it fails */
@@ -176,14 +177,13 @@ bool FileStore::openInternal(bool incrementFilename, struct tm* currentTime) {
currentSize_ = writeFile_->fileSize();
currentFilename_ = file;
eventsWritten_ = 0;
setStatus("");
}
} catch(const std::exception& e) {
LOG_OPER("[%s] Failed to create/open file of type <%s> for writing",
categoryHandled_.c_str(), fsType_.c_str());
LOG_OPER("Exception: %s", e.what());
setStatus("file create/open error");
g_handler->incCounter(categoryHandled_, "err_file_create_open");
return false;
}
@@ -292,19 +292,21 @@ bool FileStore::writeMessages(LogEntryVectorPtr messages,
// reserve a reasonable size of the write buffer
writeBuf.reserve(maxWriteSize + 1024);
// reset and reserve some space for convertBuffer_
convertBuffer_->resetBuffer(kMinConvertBufferSize);
try {
for (LogEntryVector::iterator iter = messages->begin();
iter != messages->end();
++ iter) {
if (isBufferFile_) {
convertBuffer_->resetBuffer(); // reset the r/w pointers
(*iter)->write(protocol.get());
unsigned long messageLen = convertBuffer_->available_read();
writeBuf += writeFile->getFrame(messageLen);
writeBuf += convertBuffer_->getBufferAsString();
convertBuffer_->resetBuffer();
} else {
// have to be careful with the length here. getFrame wants the length
@@ -367,9 +369,12 @@ bool FileStore::writeMessages(LogEntryVectorPtr messages,
if (writeBuf.size() > maxWriteSize ||
messages->end() == iter + 1 ) {
if (!writeFile->write(writeBuf)) {
LOG_OPER("[%s] File store failed to write (%lu) messages to file",
categoryHandled_.c_str(), messages->size());
setStatus("File write error");
LOG_OPER(
"[%s] File store failed to write (%lu) messages (%lu bytes) to file",
categoryHandled_.c_str(),
messages->size(),
writeBuf.size());
g_handler->incCounter(categoryHandled_, "err_file_write");
g_handler->stats.addCounter(StatCounters::kFileWriteErr, 1);
success = false;
break;
@@ -401,7 +406,13 @@ bool FileStore::writeMessages(LogEntryVectorPtr messages,
eventsWritten_ += numWritten;
if (!success) {
close();
if (isBufferFile_ && currentSize_ > 0 && !rotateOnReopen_) {
// force a rotation so that corrupted bytes are bounded at the end of
// the file
rotateFile();
} else {
close();
}
// update messages to include only the messages that were not handled
if (numWritten > 0) {
@@ -500,16 +511,32 @@ bool FileStore::readOldest(/*out*/ LogEntryVectorPtr messages,
protocol = TBinaryProtocolFactory().getProtocol(convertBuffer_);
}
lostBytes_ = 0;
uint32_t bsize = 0;
string message;
while ((loss = inFile->readNext(&message)) > 0) {
if (!message.empty()) {
LogEntryPtr entry(new LogEntry);
if (isThriftEncoded) {
convertBuffer_->write(reinterpret_cast<const uint8_t*>(message.data()),
message.length());
entry->read(protocol.get());
try {
convertBuffer_->resetBuffer((uint8_t*)message.data(),
message.length());
entry->read(protocol.get());
} catch (const TTransportException& te) {
// Corruption detected. Usually this message contains part of the
// original message and part of the next message. We simply drop
// this message.
LOG_OPER("[%s] Corrupted message detected. Lost %ld bytes.",
categoryHandled_.c_str(), message.length());
lostBytes_ += message.length();
continue; // keep trying and hope we can get back onto the road
// after some retries. For each subsequent readNext(),
// it may fail with number of lost bytes returned, or
// may fail in thrift deserialization, or eventually
// we reach the beginning of a good frame.
}
} else {
// check whether a category is stored with the message
@@ -535,12 +562,15 @@ bool FileStore::readOldest(/*out*/ LogEntryVectorPtr messages,
}
}
// accumulate all the numbers of lost bytes
if (loss < 0) {
lostBytes_ = -loss;
lostBytes_ += -loss;
}
if (lostBytes_ > 0) {
g_handler->stats.addCounter(StatCounters::kFileLostBytes, lostBytes_);
} else {
lostBytes_ = 0;
}
inFile->close();
LOG_OPER("[%s] read <%lu> entries of <%d> bytes from file <%s>",
View
@@ -76,7 +76,7 @@ class HdfsFile : public FileInterface,
long readNext(string* item) { return false; };
void deleteFile() {};
void listImpl(const string& path, vector<string>* files) {};
string getFrame(unsigned dataSize) { return 0; };
string getFrame(unsigned dataSize) { return string(); };
bool createDirectory(const string& path) { return false; };
bool createSymlink(const string& newPath, const string& oldPath) {
return false;
View
@@ -33,7 +33,11 @@ using namespace std;
namespace scribe {
StdFile::StdFile(const string& name, bool frame)
: FileInterface(name, frame), inputBuffer_(NULL), bufferSize_(0) {
: FileInterface(name, frame),
inputBuffer_(NULL),
bufferSize_(0),
fileSize_(-1),
bytesRead_(0) {
}
StdFile::~StdFile() {
@@ -48,6 +52,7 @@ bool StdFile::exists() {
}
bool StdFile::openRead() {
fileSize();
return open(fstream::in);
}
@@ -70,6 +75,7 @@ bool StdFile::open(ios_base::openmode mode) {
}
file_.open(filename_.c_str(), mode);
bytesRead_ = 0;
return file_.good();
}
@@ -141,7 +147,7 @@ StdFile::readNext(string* item) {
if (offset != -1) { \
size = -(fileSize() - offset); \
} else { \
size = -fileSize(); \
size = -(fileSize() - bytesRead_); \
} \
if (size > 0) { \
/* loss size can't be positive \
@@ -158,7 +164,7 @@ StdFile::readNext(string* item) {
inputBuffer_ = (char *) malloc(bufferSize_);
if (inputBuffer_ == NULL) {
CALC_LOSS();
LOG_OPER("WARNING: nomem Data Loss loss %ld bytes in %s", size,
LOG_OPER("WARNING: nomem Data Loss loss %ld bytes in %s", -size,
filename_.c_str());
return (size);
}
@@ -169,11 +175,13 @@ StdFile::readNext(string* item) {
/* end of file */
return (0);
}
bytesRead_ += UINT_SIZE;
// check if most signiifcant bit set - should never be set
if (size >= INT_MAX) {
if (size >= std::min((long)INT_MAX, fileSize_)) {
/* Definitely corrupted. Stop reading any further */
CALC_LOSS();
LOG_OPER("WARNING: Corruption Data Loss %ld bytes in %s", size,
LOG_OPER("WARNING: Corruption Data Loss %ld bytes in %s", -size,
filename_.c_str());
return (size);
}
@@ -189,16 +197,17 @@ StdFile::readNext(string* item) {
}
if (inputBuffer_ == NULL) {
CALC_LOSS();
LOG_OPER("WARNING: nomem Corruption? Data Loss %ld bytes in %s", size,
LOG_OPER("WARNING: nomem Corruption? Data Loss %ld bytes in %s", -size,
filename_.c_str());
return (size);
}
file_.read(inputBuffer_, size);
if (file_.good()) {
item->assign(inputBuffer_, size);
bytesRead_ += size;
} else {
CALC_LOSS();
LOG_OPER("WARNING: Data Loss %ld bytes in %s", size, filename_.c_str());
LOG_OPER("WARNING: Data Loss %ld bytes in %s", -size, filename_.c_str());
}
if (bufferSize_ > kLargeBufferSize) {
free(inputBuffer_);
@@ -209,15 +218,15 @@ StdFile::readNext(string* item) {
}
unsigned long StdFile::fileSize() {
unsigned long size = 0;
try {
size = filesystem::file_size(filename_.c_str());
if (fileSize_ < 0) {
fileSize_ = filesystem::file_size(filename_.c_str());
}
} catch(const std::exception& e) {
LOG_OPER("Failed to get size for file <%s> error <%s>",
filename_.c_str(), e.what());
size = 0;
}
return size;
return fileSize_;
}
void StdFile::listImpl(const string& path, vector<string>* files) {
@@ -244,7 +253,7 @@ bool StdFile::createDirectory(const string& path) {
filesystem::create_directories(path);
} catch(const std::exception& e) {
LOG_OPER("Exception < %s > in StdFile::createDirectory for path %s ",
e.what(),path.c_str());
e.what(),path.c_str());
return false;
}
View
@@ -55,6 +55,8 @@ class StdFile : public FileInterface,
char* inputBuffer_;
unsigned bufferSize_;
std::fstream file_;
long fileSize_;
long bytesRead_;
// disallow empty construction
StdFile();

This file was deleted.

Oops, something went wrong.
View
@@ -0,0 +1,87 @@
<?php
// Copyright (c) 2007-2010 Facebook
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
// this test verifies the following process:
// 1) scribe is running in central-client style (two scribes)
// 2) the central scribe goes down
// 3) buffer files in client scribe pile up until diskfull
// 4) central scribe comes up
// 5) buffer files should be flushed to central scribe without jamming
require_once 'tests.php';
require_once 'testutil.php';
function set_up() {
system('mkdir -p /tmp/scribetest_/central');
system('mkdir -p /tmp/scribetest_/client');
system('mount -t tmpfs -o size=16m tmpfs /tmp/scribetest_/client');
}
function tear_down() {
system('umount /tmp/scribetest_/client');
system('rm -rf /tmp/scribetest_/client');
system('rm -rf /tmp/scribetest_/central');
}
set_up();
$success = true;
$pid1 = scribe_start('diskfulltest.client', $GLOBALS['SCRIBE_BIN'],
$GLOBALS['SCRIBE_PORT'], 'scribe.conf.diskfull.client');
print("running stress test\n");
$msg_sent = stress_test('test', 'client1', 1000, 200000, 20, 100, 1);
sleep(2); // it should begin to complain errors
$pid2 = scribe_start('diskfulltest.central', $GLOBALS['SCRIBE_BIN'],
$GLOBALS['SCRIBE_PORT2'], 'scribe.conf.diskfull.central');
// wait for messages to arrive
sleep(30);
// we would get duplicate messages
uniqueFiles('/tmp/scribetest_/central/test', 'test_');
// check results
$results = resultChecker('/tmp/scribetest_/central/test', 'test_', 'client1');
if ($results["count"] < $msg_sent) {
$success = false;
}
$results = resultChecker('/tmp/scribetest_/client/test', 'test_', 'client1');
if ($results["count"] != 0 || $results["out_of_order"] != 0) {
$success = false;
}
if (!scribe_stop($GLOBALS['SCRIBE_CTRL'], $GLOBALS['SCRIBE_PORT'], $pid1)) {
print("ERROR: could not stop client scribe\n");
$success = false;
}
if (!scribe_stop($GLOBALS['SCRIBE_CTRL'], $GLOBALS['SCRIBE_PORT2'], $pid2)) {
print("ERROR: could not stop central scribe\n");
$success = false;
}
tear_down();
return $success;
Oops, something went wrong.

0 comments on commit fe63cb0

Please sign in to comment.