Skip to content

Commit

Permalink
Close fail when BackGroundWrite error
Browse files Browse the repository at this point in the history
  • Loading branch information
bluebore committed Jun 1, 2015
1 parent 7f14b2e commit 6956a5e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 24 deletions.
5 changes: 3 additions & 2 deletions src/chunkserver/chunkserver_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class Block {
}
~Block() {
if (_bufdatalen > 0) {
LOG(WARNING, "Data lost, %d bytes in %s,%ld",
LOG(INFO, "Data lost, %d bytes in %s,%ld",
_bufdatalen, _meta.file_path, _meta.block_size - _bufdatalen);
}
if (_blockbuf) {
Expand All @@ -92,7 +92,8 @@ class Block {
}
if (_recv_window) {
if (_recv_window->Size()) {
fprintf(stderr, "recv_window fragments: %d\n", _recv_window->Size());
LOG(INFO, "bid:%ld recv_window fragments: %d\n",
_meta.block_id, _recv_window->Size());
std::vector<std::pair<int32_t,Buffer> > frags;
_recv_window->GetFragments(&frags);
for (uint32_t i = 0; i < frags.size(); i++) {
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/rpc_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class RpcClient {
(stub->*func)(&controller, request, response, NULL);
if (controller.Failed()) {
if (retry < retry_times - 1) {
LOG(WARNING, "Send failed, retry ...\n");
LOG(DEBUG, "Send failed, retry ...\n");
usleep(1000000);
} else {
LOG(WARNING, "SendRequest fail: %s\n", controller.ErrorText().c_str());
Expand Down
61 changes: 40 additions & 21 deletions src/sdk/bfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class BfsFileImpl : public File {
bool _closed; ///< 是否关闭
Mutex _mu;
CondVar _sync_signal; ///< _sync_var
bool _bg_error; ///< background write error
};

class FSImpl : public FS {
Expand Down Expand Up @@ -408,7 +409,7 @@ BfsFileImpl::BfsFileImpl(FSImpl* fs, RpcClient* rpc_client,
_open_flags(flags), _chains_head(NULL), _block_for_write(NULL),
_write_buf(NULL), _last_seq(-1), _back_writing(0),
_chunkserver(NULL), _read_offset(0), _closed(false),
_sync_signal(&_mu) {
_sync_signal(&_mu), _bg_error(false) {
_write_window = new common::SlidingWindow<int>(100,
boost::bind(&BfsFileImpl::OnWriteCommit, this, _1, _2));
}
Expand Down Expand Up @@ -505,8 +506,16 @@ int64_t BfsFileImpl::Read(char* buf, int64_t read_len) {

int64_t BfsFileImpl::Write(const char* buf, int64_t len) {
common::timer::AutoTimer at(100, "Write", _name.c_str());
if (!(_open_flags & O_WRONLY) && !(_open_flags & O_APPEND)) {
return -2;

{
MutexLock lock(&_mu, "Write");
if (!(_open_flags & O_WRONLY) && !(_open_flags & O_APPEND)) {
return -2;
}
if (_bg_error || _closed) {
return -3;
}
common::atomic_inc(&_back_writing);
}
if (_open_flags & O_WRONLY) {
MutexLock lock(&_mu, "Write");
Expand All @@ -520,6 +529,7 @@ int64_t BfsFileImpl::Write(const char* buf, int64_t len) {
&request, &response, 5, 3);
if (!ret || !response.has_block()) {
LOG(WARNING, "AddBlock fail for %s\n", _name.c_str());
common::atomic_dec(&_back_writing);
return -1;
}
_block_for_write = new LocatedBlock(response.block());
Expand All @@ -530,8 +540,8 @@ int64_t BfsFileImpl::Write(const char* buf, int64_t len) {
_rpc_client->GetStub(addr, &_chains_head);
}
} else if (_open_flags == O_APPEND) {
MutexLock lock(&_mu, "Append");
if (_chains_head == NULL) {
MutexLock lock(&_mu, "Append");
FileLocationRequest request;
FileLocationResponse response;
request.set_file_name(_name);
Expand All @@ -545,7 +555,8 @@ int64_t BfsFileImpl::Write(const char* buf, int64_t len) {
const std::string& addr = _block_for_write->chains(0).address();
_rpc_client->GetStub(addr, &_chains_head);
} else {
fprintf(stderr, "Locate file %s error: %d\n", _name.c_str(), response.status());
LOG(WARNING, "Locate file %s error: %d\n", _name.c_str(), response.status());
common::atomic_dec(&_back_writing);
return -1;
}
}
Expand Down Expand Up @@ -573,6 +584,7 @@ int64_t BfsFileImpl::Write(const char* buf, int64_t len) {
}
}
// printf("Write return %d, buf_size=%d\n", w, file->_write_buf->Size());
common::atomic_dec(&_back_writing);
return w;
}

Expand Down Expand Up @@ -655,23 +667,30 @@ void BfsFileImpl::WriteChunkCallback(const WriteBlockRequest* request,
WriteBuffer* buffer) {
if (failed || response->status() != 0) {
if (sofa::pbrpc::RPC_ERROR_SEND_BUFFER_FULL != error) {
LOG(WARNING, "BackgroundWrite failed [bid:%ld, seq:%d, offset:%ld, len:%d]"
" status: %d, retry_times: %d",
buffer->block_id(), buffer->Sequence(),
buffer->offset(), buffer->Size(),
response->status(), retry_times);
if (retry_times < 5) {
LOG(WARNING, "BackgroundWrite failed "
"[bid:%ld, seq:%d, offset:%ld, len:%d]"
" status: %d, retry_times: %d",
buffer->block_id(), buffer->Sequence(),
buffer->offset(), buffer->Size(),
response->status(), retry_times);
}
if (--retry_times == 0) {
///TODO: SetFaild & handle it
/// If there is a chunkserver failed, abandon this block, and add a new block ..............
assert(0);
_bg_error = true;
delete buffer;
delete request;
}
}
common::atomic_inc(&_back_writing);
g_thread_pool.DelayTask(
5, boost::bind(&BfsFileImpl::DelayWriteChunk, this, buffer, request, retry_times));
if (!_bg_error) {
common::atomic_inc(&_back_writing);
g_thread_pool.DelayTask(5,
boost::bind(&BfsFileImpl::DelayWriteChunk, this, buffer, request, retry_times));
}
} else {
LOG(DEBUG, "BackgroundWrite done [bid:%ld, seq:%d, offset:%ld, len:%d]\n",
buffer->block_id(), buffer->Sequence(), buffer->offset(), buffer->Size());
LOG(DEBUG, "BackgroundWrite done bid:%ld, seq:%d, offset:%ld, len:%d, _back_writing:%d",
buffer->block_id(), buffer->Sequence(), buffer->offset(),
buffer->Size(), _back_writing);
int r = _write_window->Add(buffer->Sequence(), 0);
assert(r == 0);
delete buffer;
Expand All @@ -681,7 +700,7 @@ void BfsFileImpl::WriteChunkCallback(const WriteBlockRequest* request,

{
MutexLock lock(&_mu, "WriteChunkCallback");
if (_write_queue.empty()) {
if (_write_queue.empty() || _bg_error) {
common::atomic_dec(&_back_writing); // for AsyncRequest
if (_back_writing == 0) {
_sync_signal.Broadcast();
Expand Down Expand Up @@ -715,19 +734,19 @@ bool BfsFileImpl::Sync() {
_sync_signal.Wait((_name + " Sync wait").c_str());
}
// fprintf(stderr, "Sync %s fail\n", _name.c_str());
return true;
return !_bg_error;
}

bool BfsFileImpl::Close() {
common::timer::AutoTimer at(500, "Close", _name.c_str());
bool ret = true;
MutexLock lock(&_mu, "Close");
if (_block_for_write && ((_open_flags & O_WRONLY) || _open_flags == O_APPEND)) {
if (!_write_buf) {
_write_buf = new WriteBuffer(++_last_seq, 32, _block_for_write->block_id(),
_block_for_write->block_size());
}
_write_buf->SetLast();
LOG(INFO, "Close write seq: %d", _write_buf->Sequence());
StartWrite(_write_buf);

//common::timer::AutoTimer at(1, "LastWrite", _name.c_str());
Expand All @@ -741,7 +760,7 @@ bool BfsFileImpl::Close() {
_chunkserver = NULL;
LOG(DEBUG, "File %s closed", _name.c_str());
_closed = true;
return ret;
return !_bg_error;
}

bool FS::OpenFileSystem(const char* nameserver, FS** fs) {
Expand Down
1 change: 1 addition & 0 deletions tera/bfs_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ int32_t BfsFile::Flush() {
return ret;
}
int32_t BfsFile::Sync() {
LOG(INFO, "Sync(%s) start", _name.c_str());
common::timer::AutoTimer ac(10, "Sync", _name.c_str());
int ret = -1;
if (_file->Sync()) {
Expand Down

0 comments on commit 6956a5e

Please sign in to comment.