Permalink
Browse files

make corrupted file handling more robust

Summary:
On detecting corruption in a backed up file, stop reading any further.
Calculate how
	many bytes are being lost. Output this information in LOG messages and updated
the
	bytes lost counter.

	StdFile::readNext can only work for framed files. The dead code for reading
from non-framed
	files was anyway wrong - it allocated only 4K for a message line while there
are many messages
	longer than that. I removed this dead code.

	I haven't yet added checksums - what the task asks for. I will not close the
task after this
	diff

Test Plan:
1/ Make sure that the regular code when the backup file is not corrupted works.

	2/ Force backup. corrupt backup file by changing frame size field.
	   move the scribe-server to sending_buffer state. The backup file is removed,
as much
	   data that could be sent is sent and log and counters have the loss
information.

	   [Tue May 25 23:22:35 2010] "WARNING: Corruption Data Loss -14 bytes in
/tmp/corr/foo/foo_00000"
	   scribe_overall:bytes lost: 28
	   scribe_overall:received good: 7
	   foo:received good: 7
	   scribe_overall:retries: 154
	   foo:bytes lost: 28  <===
	   foo:retries: 154
	   scribe_overall:sent: 3

	3/ same as 2/ but when uploading the backup file there is an error. The backup
file is left as
	   it is. In this situation when the backup file was being read the LOG will
contain info that
	   x bytes were lost. But the bytes lost counter won't go up. The bytes lost
counter only goes
	   up when the corrupted file is being deleted.

DiffCamp Revision: 118317
Reviewed By: groys
CC: agiardullo, pkhemani, groys, scribe-dev@lists
Tasks:

Revert Plan:
OK

git-svn-id: svn+ssh://tubbs/svnapps/fbomb/branches/scribe-os/fbcode/scribe@28770 2248de34-8caa-4a3c-bc55-5e52d9d7b73a
  • Loading branch information...
1 parent 2123b73 commit 47f6a219c2649584d5325290e749605f34b03c53 pkhemani committed with groys May 28, 2010
Showing with 114 additions and 55 deletions.
  1. +3 −2 src/HdfsFile.cpp
  2. +2 −2 src/HdfsFile.h
  3. +82 −37 src/file.cpp
  4. +2 −2 src/file.h
  5. +24 −12 src/store.cpp
  6. +1 −0 src/store.h
View
@@ -178,8 +178,9 @@ void HdfsFile::listImpl(const std::string& path,
}
}
-bool HdfsFile::readNext(std::string& _return) {
- return false; // frames not yet supported
+long HdfsFile::readNext(std::string& _return) {
+ /* choose a reasonable value for loss */
+ return (-1000 * 1000 * 1000);
}
string HdfsFile::getFrame(unsigned data_length) {
View
@@ -24,7 +24,7 @@ class HdfsFile : public FileInterface {
bool write(const std::string& data);
void flush();
unsigned long fileSize();
- bool readNext(std::string& _return);
+ long readNext(std::string& _return);
void deleteFile();
void listImpl(const std::string& path, std::vector<std::string>& _return);
std::string getFrame(unsigned data_size);
@@ -77,7 +77,7 @@ class HdfsFile : public FileInterface {
void flush() {};
void sync() {};
unsigned long fileSize() { return 0; };
- bool readNext(std::string& _return) { return false; };
+ long readNext(std::string& _return) { return false; };
void deleteFile() {};
void listImpl(const std::string& path, std::vector<std::string>& _return) {};
std::string getFrame(unsigned data_size) { return 0; };
View
@@ -23,8 +23,8 @@
#include "file.h"
#include "HdfsFile.h"
-// INITIAL_BUFFER_SIZE must always be >= UINT_SIZE
-#define INITIAL_BUFFER_SIZE 4096
+#define INITIAL_BUFFER_SIZE (64 * 1024)
+#define LARGE_BUFFER_SIZE (16 * INITIAL_BUFFER_SIZE) /* arbitrarily chosen */
#define UINT_SIZE 4
using namespace std;
@@ -137,50 +137,95 @@ void StdFile::flush() {
}
}
-bool StdFile::readNext(std::string& _return) {
+/*
+ * read the next frame in the file that is currently open. returns the
+ * body of the frame in _return.
+ *
+ * returns a negative number if it
+ * encounters any problem when reading from the file. The negative
+ * number is the number of bytes in the file that will not be read
+ * becuase of this problem (most likely corruption of file).
+ *
+ * returns 0 on end of file or when it encounters a frame of size 0
+ *
+ * On success it returns the number of bytes in the frame's body
+ *
+ * This function assumes that the file it is reading is framed.
+ */
+long
+StdFile::readNext(std::string& _return) {
+ long size;
+
+#define CALC_LOSS() do { \
+ int offset = file.tellg(); \
+ if (offset != -1) { \
+ size = -(fileSize() - offset); \
+ } else { \
+ size = -fileSize(); \
+ } \
+ if (size > 0) { \
+ /* loss size can't be positive \
+ * choose a arbitrary but reasonable
+ * value for loss
+ */ \
+ size = -(1000 * 1000 * 1000); \
+ } \
+ /* loss size can be 0 */ \
+} while (0)
if (!inputBuffer) {
bufferSize = INITIAL_BUFFER_SIZE;
- inputBuffer = new char[bufferSize];
+ inputBuffer = (char *) malloc(bufferSize);
+ if (inputBuffer == NULL) {
+ CALC_LOSS();
+ LOG_OPER("WARNING: nomem Data Loss loss %ld bytes in %s", size,
+ filename.c_str());
+ return (size);
+ }
}
- if (framed) {
- unsigned size;
- file.read(inputBuffer, UINT_SIZE); // assumes INITIAL_BUFFER_SIZE > UINT_SIZE
- if (file.good() && (size = unserializeUInt(inputBuffer))) {
-
- // check if size is larger than half the max uint size
- if (size >= (((unsigned)1) << (UINT_SIZE*8 - 1))) {
- LOG_OPER("WARNING: attempting to read message of size %d bytes", size);
-
- // Do not try to make bufferSize any larger than this or you might overflow
- bufferSize = size;
- }
+ file.read(inputBuffer, UINT_SIZE);
+ if (!file.good() || (size = unserializeUInt(inputBuffer)) == 0) {
+ /* end of file */
+ return (0);
+ }
+ // check if most signiifcant bit set - should never be set
+ if (size >= INT_MAX) {
+ /* Definitely corrupted. Stop reading any further */
+ CALC_LOSS();
+ LOG_OPER("WARNING: Corruption Data Loss %ld bytes in %s", size,
+ filename.c_str());
+ return (size);
+ }
- while (size > bufferSize) {
- bufferSize = 2 * bufferSize;
- delete[] inputBuffer;
- inputBuffer = new char[bufferSize];
- }
- file.read(inputBuffer, size);
- if (file.good()) {
- _return.assign(inputBuffer, size);
- return true;
- } else {
- int offset = file.tellg();
- LOG_OPER("ERROR: Failed to read file %s at offset %d",
- filename.c_str(), offset);
- return false;
- }
+ if (size > bufferSize) {
+ bufferSize = ((size + INITIAL_BUFFER_SIZE - 1) / INITIAL_BUFFER_SIZE) *
+ INITIAL_BUFFER_SIZE;
+ free(inputBuffer);
+ inputBuffer = (char *) malloc(bufferSize);
+ if (bufferSize > LARGE_BUFFER_SIZE) {
+ LOG_OPER("WARNING: allocating large buffer Corruption? %d", bufferSize);
}
+ }
+ if (inputBuffer == NULL) {
+ CALC_LOSS();
+ LOG_OPER("WARNING: nomem Corruption? Data Loss %ld bytes in %s", size,
+ filename.c_str());
+ return (size);
+ }
+ file.read(inputBuffer, size);
+ if (file.good()) {
+ _return.assign(inputBuffer, size);
} else {
- file.getline(inputBuffer, bufferSize);
- if (file.good()) {
- _return = inputBuffer;
- return true;
- }
+ CALC_LOSS();
+ LOG_OPER("WARNING: Data Loss %ld bytes in %s", size, filename.c_str());
}
- return false;
+ if (bufferSize > LARGE_BUFFER_SIZE) {
+ free(inputBuffer);
+ inputBuffer = NULL;
+ }
+ return (size);
+#undef CALC_LOSS
}
unsigned long StdFile::fileSize() {
View
@@ -42,7 +42,7 @@ class FileInterface {
virtual bool write(const std::string& data) = 0;
virtual void flush() = 0;
virtual unsigned long fileSize() = 0;
- virtual bool readNext(std::string& _return) = 0; // returns a line if unframed or a record if framed
+ virtual long readNext(std::string& _return) = 0;
virtual void deleteFile() = 0;
virtual void listImpl(const std::string& path, std::vector<std::string>& _return) = 0;
virtual std::string getFrame(unsigned data_size) {return std::string();};
@@ -70,7 +70,7 @@ class StdFile : public FileInterface {
bool write(const std::string& data);
void flush();
unsigned long fileSize();
- bool readNext(std::string& _return);
+ long readNext(std::string& _return);
void deleteFile();
void listImpl(const std::string& path, std::vector<std::string>& _return);
std::string getFrame(unsigned data_size);
View
@@ -593,7 +593,8 @@ FileStore::FileStore(StoreQueue* storeq,
bool multi_category, bool is_buffer_file)
: FileStoreBase(storeq, category, "file", multi_category),
isBufferFile(is_buffer_file),
- addNewlines(false) {
+ addNewlines(false),
+ lost_bytes(0) {
}
FileStore::~FileStore() {
@@ -909,6 +910,10 @@ void FileStore::deleteOldest(struct tm* now) {
}
shared_ptr<FileInterface> deletefile = FileInterface::createFileInterface(fsType,
makeFullFilename(index, now));
+ if (lost_bytes) {
+ g_Handler->incCounter(categoryHandled, "bytes lost", lost_bytes);
+ lost_bytes = 0;
+ }
deletefile->deleteFile();
}
@@ -951,6 +956,8 @@ bool FileStore::replaceOldest(boost::shared_ptr<logentry_vector_t> messages,
bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages,
struct tm* now) {
+ long loss;
+
int index = findOldestFile(makeBaseFilename(now));
if (index < 0) {
// This isn't an error. It's legit to call readOldest when there aren't any
@@ -970,7 +977,7 @@ bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages
uint32_t bsize = 0;
std::string message;
- while (infile->readNext(message)) {
+ while ((loss = infile->readNext(message)) > 0) {
if (!message.empty()) {
logentry_ptr_t entry = logentry_ptr_t(new LogEntry);
@@ -979,9 +986,11 @@ bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages
// get category without trailing \n
entry->category = message.substr(0, message.length() - 1);
- if (!infile->readNext(message)) {
- LOG_OPER("[%s] category not stored with message <%s>",
- categoryHandled.c_str(), entry->category.c_str());
+ if ((loss = infile->readNext(message)) <= 0) {
+ LOG_OPER("[%s] category not stored with message <%s> "
+ "corruption?, incompatible config change?",
+ categoryHandled.c_str(), entry->category.c_str());
+ break;
}
} else {
entry->category = categoryHandled;
@@ -994,9 +1003,14 @@ bool FileStore::readOldest(/*out*/ boost::shared_ptr<logentry_vector_t> messages
bsize += entry->message.size();
}
}
+ if (loss < 0) {
+ lost_bytes = -loss;
+ } else {
+ lost_bytes = 0;
+ }
infile->close();
- LOG_OPER("[%s] successfully read <%lu> entries of <%d> bytes from file <%s>",
+ LOG_OPER("[%s] read <%lu> entries of <%d> bytes from file <%s>",
categoryHandled.c_str(), messages->size(), bsize, filename.c_str());
return true;
}
@@ -1793,15 +1807,15 @@ void NetworkStore::periodicCheck() {
string host;
uint32_t port;
bool success = configmod->getHostFunc(categoryHandled, storeConf.get(), host, port);
- if (success && (host != remoteHost || port != remotePort)) {
+ if (success && (host != remoteHost || port != remotePort)) {
// if it is different from the current configuration
// then close and open again
LOG_OPER("[%s] dynamic configred network store destination changed. old value:<%s:%lu>, new value:<%s:%lu>",
categoryHandled.c_str(), remoteHost.c_str(), remotePort,
host.c_str(), (long unsigned)port);
remoteHost = host;
remotePort = port;
- close();
+ close();
}
}
}
@@ -2112,14 +2126,12 @@ void BucketStore::createBuckets(pStoreConf configuration) {
bucket_name = ss.str();
if (!configuration->getStore(bucket_name, bucket_conf)) {
- error_msg = "could not find bucket definition for " +
- bucket_name;
+ error_msg = "could not find bucket definition for " + bucket_name;
goto handle_error;
}
if (!bucket_conf->getString("type", type)) {
- error_msg =
- "store contained in a bucket store must have a type";
+ error_msg = "store contained in a bucket store must have a type";
goto handle_error;
}
View
@@ -229,6 +229,7 @@ class FileStore : public FileStoreBase {
// disallow copy, assignment, and empty construction
FileStore(FileStore& rhs);
FileStore& operator=(FileStore& rhs);
+ long lost_bytes;
};
/*

0 comments on commit 47f6a21

Please sign in to comment.