Skip to content

Commit

Permalink
Fix Urgent statistics & Add Lost statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
bluebore committed Jan 24, 2016
1 parent c79ab19 commit d96b06d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 43 deletions.
104 changes: 71 additions & 33 deletions src/nameserver/block_mapping.cc
Expand Up @@ -74,15 +74,25 @@ bool BlockMapping::ChangeReplicaNum(int64_t block_id, int32_t replica_num) {
}

void BlockMapping::AddNewBlock(int64_t block_id, int32_t replica,
int64_t version, int64_t size) {
MutexLock lock(&mu_);
int64_t version, int64_t size,
const std::vector<int32_t>* init_replicas) {
NSBlock* nsblock = NULL;
NSBlockMap::iterator it = block_map_.find(block_id);
//Don't suppport soft link now
assert(it == block_map_.end());
nsblock = new NSBlock(block_id, replica, version, size);
block_map_[block_id] = nsblock;
LOG(DEBUG, "Init block info: #%ld ", block_id);
if (init_replicas) {
for (uint32_t i = 0; i < init_replicas->size(); i++) {
nsblock->replica.insert(init_replicas->at(i));
}
}
if (init_replicas) {
LOG(DEBUG, "Init block info: #%ld ", block_id);
} else {
LOG(DEBUG, "Rebuild block info: #%ld ", block_id);
}

MutexLock lock(&mu_);
std::pair<NSBlockMap::iterator, bool> ret =
block_map_.insert(std::make_pair(block_id,nsblock));
assert(ret.second == true);
if (next_block_id_ <= block_id) {
next_block_id_ = block_id + 1;
}
Expand All @@ -97,34 +107,43 @@ bool BlockMapping::UpdateBlockInfo(int64_t id, int32_t server_id, int64_t block_
//have been removed
LOG(DEBUG, "UpdateBlockInfo #%ld has been removed", id);
return false;
} else {
nsblock = it->second;
if (nsblock->version >= 0) {
if (block_version >= 0 && nsblock->version != block_version) {
LOG(INFO, "block #%ld on slow chunkserver: %d,"
" Ns: V%ld cs: V%ld drop it",
id, server_id, nsblock->version, block_version);
return false;
} else if (block_version < 0) {
/// pulling block?
return true;
}
}
nsblock = it->second;
if (nsblock->version >= 0) {
if (block_version >= 0 && nsblock->version != block_version) {
LOG(INFO, "block #%ld on slow chunkserver: %d,"
" Ns: V%ld cs: V%ld drop it",
id, server_id, nsblock->version, block_version);
return false;
} else if (block_version < 0) {
/// pulling block?
return true;
} else {
assert(block_version >= 0 && block_version == nsblock->version);
/// another received block
}
if (nsblock->block_size != block_size && block_size) {
// update
if (nsblock->block_size) {
LOG(WARNING, "block #%ld size mismatch", id);
assert(0);
return false;
} else {
LOG(INFO, "block #%ld size update by C%d V%ld ,%ld to %ld",
id, server_id, block_version, nsblock->block_size, block_size);
nsblock->block_size = block_size;
}
}
if (block_version < 0) {
/// Writing block
return false;

This comment has been minimized.

Copy link
@bluebore

bluebore Jan 24, 2016

Author Collaborator

不写注释好伤啊,两个小时前写的代码完全忘了啥意思了。。。
这里为啥要return啊。。

}
if (nsblock->block_size != block_size) {
// update
if (nsblock->block_size) {
LOG(WARNING, "block #%ld size mismatch", id);
assert(0);
return false;
} else {
//LOG(DEBUG, "UpdateBlockInfo(%ld) ignored, from %ld to %ld",
// id, nsblock->block_size, block_size);
LOG(INFO, "block #%ld size update by C%d V%ld ,%ld to %ld",
id, server_id, block_version, nsblock->block_size, block_size);
nsblock->block_size = block_size;
}
} else {
//LOG(DEBUG, "UpdateBlockInfo(%ld) ignored, from %ld to %ld",
// id, nsblock->block_size, block_size);
}
if (nsblock->replica.size() == 0) {
lost_blocks_.erase(id);
}
std::pair<std::set<int32_t>::iterator, bool> ret = nsblock->replica.insert(server_id);
int32_t cur_replica_num = nsblock->replica.size();
Expand Down Expand Up @@ -170,6 +189,9 @@ void BlockMapping::RemoveBlock(int64_t block_id) {
if (block->incomplete) {
incomplete_blocks_.erase(block_id);
}
if (block->replica.size() == 0) {
lost_blocks_.erase(block_id);
}
if (block->replica.size() == 1) {
hi_pri_recover_.erase(block_id);
}
Expand Down Expand Up @@ -204,6 +226,12 @@ void BlockMapping::DealWithDeadBlocks(int64_t cs_id, const std::set<int64_t>& bl
block->replica.erase(cs_id);
int32_t rep_num = block->replica.size();
if (rep_num < block->expect_replica_num) {
if (rep_num == 0) {
hi_pri_recover_.erase(block_id);
lost_blocks_.insert(block_id);
LOG(INFO, "Block #%ld lost all replica", block_id);
continue;
}
if (block->version == -1) {
LOG(INFO, "Incomplete block #%ld at C%d, don't recover",
block_id, cs_id);
Expand Down Expand Up @@ -246,6 +274,12 @@ void BlockMapping::PickRecoverBlocks(int32_t cs_id, int32_t block_num,
recover_q_.pop();
continue;
}
if (cur_block->replica.size() == 0) {
LOG(DEBUG, "All Replica lost #%ld , give up recover.", cur_block->id);
cur_block->pending_recover = false;
recover_q_.pop();
continue;
}
if (cur_block->replica.find(cs_id) != cur_block->replica.end()) {
tmp_holder.push_back(recover_item);
recover_q_.pop();
Expand Down Expand Up @@ -294,7 +328,8 @@ void BlockMapping::ProcessRecoveredBlock(int32_t cs_id, int64_t block_id, bool r
}

void BlockMapping::GetStat(int64_t* recover_num, int64_t* pending_num,
int64_t* urgent_num, int64_t* incomplete_num) {
int64_t* urgent_num, int64_t* lost_num,
int64_t* incomplete_num) {
MutexLock lock(&mu_);
if (recover_num) {
*recover_num = recover_q_.size();
Expand All @@ -308,6 +343,9 @@ void BlockMapping::GetStat(int64_t* recover_num, int64_t* pending_num,
if (urgent_num) {
*urgent_num = hi_pri_recover_.size();
}
if (lost_num) {
*lost_num = lost_blocks_.size();
}
if (incomplete_num) {
*incomplete_num = incomplete_blocks_.size();
}
Expand Down
8 changes: 6 additions & 2 deletions src/nameserver/block_mapping.h
Expand Up @@ -37,7 +37,9 @@ class BlockMapping {
bool GetBlock(int64_t block_id, NSBlock* block);
bool GetReplicaLocation(int64_t id, std::set<int32_t>* chunkserver_id);
bool ChangeReplicaNum(int64_t block_id, int32_t replica_num);
void AddNewBlock(int64_t block_id, int32_t replica, int64_t version, int64_t block_size);
void AddNewBlock(int64_t block_id, int32_t replica,
int64_t version, int64_t block_size,
const std::vector<int32_t>* init_replicas);
bool UpdateBlockInfo(int64_t id, int32_t server_id, int64_t block_size,
int64_t block_version, bool need_recovery);
void RemoveBlocksForFile(const FileInfo& file_info);
Expand All @@ -48,7 +50,8 @@ class BlockMapping {
std::map<int64_t, int32_t>* recover_blocks);
void ProcessRecoveredBlock(int32_t cs_id, int64_t block_id, bool recover_success);
void GetStat(int64_t* recover_num, int64_t* pending_num,
int64_t* urgent_num, int64_t* incomplete_num);
int64_t* urgent_num, int64_t* lost_num,
int64_t* incomplete_num);

private:
void AddToRecover(NSBlock* block);
Expand All @@ -67,6 +70,7 @@ class BlockMapping {
typedef std::map<int32_t, std::set<int64_t> > CheckList;
CheckList recover_check_;
std::set<int64_t> hi_pri_recover_;
std::set<int64_t> lost_blocks_;
std::set<int64_t> incomplete_blocks_;
};

Expand Down
4 changes: 2 additions & 2 deletions src/nameserver/chunkserver_manager.cc
Expand Up @@ -318,8 +318,8 @@ void ChunkServerManager::PickRecoverBlocks(int cs_id,
for (std::map<int64_t, int32_t>::iterator it = blocks.begin(); it != blocks.end(); ++it) {
ChunkServerInfo* cs = NULL;
if (!GetChunkServerPtr(it->second, &cs)) {
LOG(WARNING, "PickRecoverBlocks for %ld can't find chunkserver C%d",
it->second, cs_id);
LOG(WARNING, "PickRecoverBlocks for C%d can't find chunkserver C%d",
cs_id, it->second);
continue;
}
recover_blocks->insert(std::make_pair(it->first, cs->address()));
Expand Down
15 changes: 9 additions & 6 deletions src/nameserver/nameserver_impl.cc
Expand Up @@ -167,7 +167,7 @@ void NameServerImpl::BlockReport(::google::protobuf::RpcController* controller,
if (!block_mapping_->UpdateBlockInfo(cur_block_id, cs_id,
cur_block_size,
block_version,
!safe_mode_ && block_version >= 0)) {
!safe_mode_)) {
response->add_obsolete_blocks(cur_block_id);
chunkserver_manager_->RemoveBlock(cs_id, cur_block_id);
LOG(INFO, "BlockReport remove obsolete block: #%ld C%d ", cur_block_id, cs_id);
Expand Down Expand Up @@ -252,13 +252,14 @@ void NameServerImpl::AddBlock(::google::protobuf::RpcController* controller,
LOG(INFO, "[AddBlock] new block for %s id= #%ld ",
path.c_str(), new_block_id);
LocatedBlock* block = response->mutable_block();
block_mapping_->AddNewBlock(new_block_id, replica_num, -1, 0);
std::vector<int32_t> replicas;
for (int i =0; i<replica_num; i++) {
ChunkServerInfo* info = block->add_chains();
info->set_address(chains[i].second);
LOG(INFO, "Add %s to #%ld response", chains[i].second.c_str(), new_block_id);
block_mapping_->UpdateBlockInfo(new_block_id, chains[i].first, 0, -1, false);
replicas.push_back(chains[i].first);
}
block_mapping_->AddNewBlock(new_block_id, replica_num, -1, 0, &replicas);
block->set_block_id(new_block_id);
response->set_status(0);
file_info.add_blocks(new_block_id);
Expand Down Expand Up @@ -474,7 +475,7 @@ void NameServerImpl::RebuildBlockMapCallback(const FileInfo& file_info) {
int64_t block_id = file_info.blocks(i);
int64_t version = file_info.version();
block_mapping_->AddNewBlock(block_id, file_info.replicas(),
version, file_info.size());
version, file_info.size(), NULL);
}
}

Expand Down Expand Up @@ -559,8 +560,9 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
}
table_str += "</table>";

int64_t recover_num, pending_num, urgent_num, incomplete_num;
block_mapping_->GetStat(&recover_num, &pending_num, &urgent_num, &incomplete_num);
int64_t recover_num, pending_num, urgent_num, lost_num, incomplete_num;
block_mapping_->GetStat(&recover_num, &pending_num, &urgent_num,
&lost_num, &incomplete_num);
str += "<h1>分布式文件系统控制台 - NameServer</h1>";

str += "<div class=\"row\">";
Expand All @@ -581,6 +583,7 @@ bool NameServerImpl::WebService(const sofa::pbrpc::HTTPRequest& request,
str += "Recover: " + common::NumToString(recover_num) + "</br>";
str += "Pending: " + common::NumToString(pending_num) + "</br>";
str += "Urgent: " + common::NumToString(urgent_num) + "</br>";
str += "Lost: " + common::NumToString(lost_num) + "</br>";
str += "Incomplete: " + common::NumToString(incomplete_num) + "</br>";
str += "</div>"; // <div class="col-sm-3 col-md-3">
str += "</div>"; // <div class="col-sm-6 col-md-6">
Expand Down

0 comments on commit d96b06d

Please sign in to comment.