Permalink
Browse files

1.6.8.4 improve repl performance

  • Loading branch information...
1 parent 6fb853b commit b5f4e1f23850d24bd3a38cf7f77f08d377ad5058 ideawu committed Feb 26, 2014
Showing with 70 additions and 49 deletions.
  1. +3 −0 README.md
  2. +63 −47 src/backend_sync.cpp
  3. +2 −0 src/include.h
  4. +1 −1 src/version.h
  5. +1 −1 version
View
@@ -133,3 +133,6 @@ See [Changes-Made-to-LevelDB wiki](https://github.com/ideawu/ssdb/wiki/Changes-M
SSDB is licensed under [New BSD License](http://opensource.org/licenses/BSD-3-Clause), a very flexible license to use.
+## Thanks
+
+* 刘建辉 liujianhui@gongchang.com
View
@@ -78,14 +78,16 @@ void* BackendSync::_run_thread(void *arg){
}
bool is_empty = true;
+ // WARN: MUST do first sync() before first copy(), because
+ // sync() will refresh last_seq, and copy() will not
+ if(client.sync(logs)){
+ is_empty = false;
+ }
if(client.status == Client::COPY){
if(client.copy()){
is_empty = false;
}
}
- if(client.sync(logs)){
- is_empty = false;
- }
if(is_empty){
if(idle >= NOOP_IDLES){
idle = 0;
@@ -99,12 +101,12 @@ void* BackendSync::_run_thread(void *arg){
}
if(link->flush() == -1){
- log_info("fd: %d, send error: %s", link->fd(), strerror(errno));
+ log_info("%s:%d fd: %d, send error: %s", link->remote_ip, link->remote_port, link->fd(), strerror(errno));
break;
}
}
- log_info("Sync Client quit, fd: %d, delete link", link->fd());
+ log_info("Sync Client quit, %s:%d fd: %d, delete link", link->remote_ip, link->remote_port, link->fd());
delete link;
Locking l(&backend->mutex);
@@ -151,16 +153,18 @@ void BackendSync::Client::init(){
}
const char *type = is_mirror? "mirror" : "sync";
if(last_key == "" && last_seq != 0){
- log_info("[%s]fd: %d, sync, seq: %" PRIu64 ", key: '%s'",
+ log_info("[%s] %s:%d fd: %d, sync, seq: %" PRIu64 ", key: '%s'",
type,
+ link->remote_ip, link->remote_port,
link->fd(),
last_seq, hexmem(last_key.data(), last_key.size()).c_str()
);
this->status = Client::SYNC;
}else{
// a slave must reset its last_key when receiving 'copy_end' command
- log_info("[%s]fd: %d, copy recover, seq: %" PRIu64 ", key: '%s'",
+ log_info("[%s] %s:%d fd: %d, copy recover, seq: %" PRIu64 ", key: '%s'",
type,
+ link->remote_ip, link->remote_port,
link->fd(),
last_seq, hexmem(last_key.data(), last_key.size()).c_str()
);
@@ -169,7 +173,7 @@ void BackendSync::Client::init(){
}
void BackendSync::Client::reset(){
- log_info("fd: %d, copy begin", link->fd());
+ log_info("%s:%d fd: %d, copy begin", link->remote_ip, link->remote_port, link->fd());
this->status = Client::COPY;
this->last_seq = 0;
this->last_key = "";
@@ -197,51 +201,62 @@ int BackendSync::Client::copy(){
log_debug("new iterator, last_key: '%s'", hexmem(last_key.data(), last_key.size()).c_str());
std::string key = this->last_key;
if(this->last_key.empty()){
- key.push_back(DataType::HASH);
+ key.push_back(DataType::MIN_PREFIX);
}
this->iter = backend->ssdb->iterator(key, "", -1);
}
- for(int i=0; i<1000; i++){
+ int ret = 0;
+ int iterate_count = 0;
+ while(true){
+ // Prevent copy() from blocking too long
+ if(++iterate_count > 10000 || link->output->size() > 2 * 1024 * 1024){
+ break;
+ }
+
if(!iter->next()){
- log_info("fd: %d, copy end", link->fd());
- this->status = Client::SYNC;
- delete this->iter;
- this->iter = NULL;
-
- Binlog log(this->last_seq, BinlogType::COPY, BinlogCommand::END, "");
- log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
- link->send(log.repr(), "copy_end");
- return 1;
- }else{
- Bytes key = iter->key();
- Bytes val = iter->val();
-
- if(key.size() == 0){
- continue;
- }
- this->last_key = key.String();
+ goto copy_end;
+ }
+ Bytes key = iter->key();
+ if(key.size() == 0){
+ continue;
+ }
+ // finish copying all valid data types
+ if(key.data()[0] > DataType::MAX_PREFIX){
+ goto copy_end;
+ }
+ Bytes val = iter->val();
+ this->last_key = key.String();
- char cmd = 0;
- char data_type = key.data()[0];
- if(data_type == DataType::KV){
- cmd = BinlogCommand::KSET;
- }else if(data_type == DataType::HASH){
- cmd = BinlogCommand::HSET;
- }else if(data_type == DataType::ZSET){
- cmd = BinlogCommand::ZSET;
- }else{
- continue;
- }
-
- Binlog log(this->last_seq, BinlogType::COPY, cmd, key.Slice());
- log_debug("fd: %d, %s", link->fd(), log.dumps().c_str());
- link->send(log.repr(), val);
- //if(link->output->size() > 1024 * 1024){
- return 1;
- //}
+ char cmd = 0;
+ char data_type = key.data()[0];
+ if(data_type == DataType::KV){
+ cmd = BinlogCommand::KSET;
+ }else if(data_type == DataType::HASH){
+ cmd = BinlogCommand::HSET;
+ }else if(data_type == DataType::ZSET){
+ cmd = BinlogCommand::ZSET;
+ }else{
+ continue;
}
+
+ ret = 1;
+
+ Binlog log(this->last_seq, BinlogType::COPY, cmd, key.Slice());
+ log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
+ link->send(log.repr(), val);
}
- return 0;
+ return ret;
+
+copy_end:
+ log_info("%s:%d fd: %d, copy end", link->remote_ip, link->remote_port, link->fd());
+ this->status = Client::SYNC;
+ delete this->iter;
+ this->iter = NULL;
+
+ Binlog log(this->last_seq, BinlogType::COPY, BinlogCommand::END, "");
+ log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
+ link->send(log.repr(), "copy_end");
+ return 1;
}
int BackendSync::Client::sync(BinlogQueue *logs){
@@ -274,7 +289,8 @@ int BackendSync::Client::sync(BinlogQueue *logs){
continue;
}
if(this->last_seq != 0 && log.seq() != expect_seq){
- log_warn("fd: %d, OUT_OF_SYNC! log.seq: %" PRIu64 ", expect_seq: %" PRIu64 "",
+ log_warn("%s:%d fd: %d, OUT_OF_SYNC! log.seq: %" PRIu64 ", expect_seq: %" PRIu64 "",
+ link->remote_ip, link->remote_port,
link->fd(),
log.seq(),
expect_seq
View
@@ -54,6 +54,8 @@ class DataType{
static const char ZSIZE = 'Z';
static const char QUEUE = 'q';
static const char QSIZE = 'Q';
+ static const char MIN_PREFIX = HASH;
+ static const char MAX_PREFIX = ZSET;
};
class BinlogType{
View
@@ -1,6 +1,6 @@
#ifndef SSDB_DEPS_H
#ifndef SSDB_VERSION
-#define SSDB_VERSION "1.6.8.3"
+#define SSDB_VERSION "1.6.8.4"
#include <stdlib.h>
#include <jemalloc/jemalloc.h>
#endif
View
@@ -1 +1 @@
-1.6.8.3
+1.6.8.4

0 comments on commit b5f4e1f

Please sign in to comment.