Permalink
Browse files

added replica identifier, fixed repl bug

  • Loading branch information...
1 parent 7cee72e commit 866a34d027c3df446bb424974740e1cfc47b5259 ideawu committed Dec 11, 2013
Showing with 59 additions and 22 deletions.
  1. +16 −6 src/backend_sync.cpp
  2. +1 −1 src/serv.cpp
  3. +19 −11 src/slave.cpp
  4. +4 −0 src/slave.h
  5. +5 −0 src/ssdb.cpp
  6. +2 −1 src/t_hash.cpp
  7. +4 −2 src/t_kv.cpp
  8. +2 −1 src/t_zset.cpp
  9. +3 −0 ssdb.conf
  10. +3 −0 ssdb_slave.conf
View
@@ -177,16 +177,26 @@ void BackendSync::Client::reset(){
}
void BackendSync::Client::noop(){
- this->last_noop_seq = this->last_seq;
- Binlog noop(this->last_seq, BinlogType::NOOP, BinlogCommand::NONE, "");
+ uint64_t seq;
+ if(this->status == Client::COPY && this->last_key.empty()){
+ seq = 0;
+ }else{
+ seq = this->last_seq;
+ this->last_noop_seq = this->last_seq;
+ }
+ Binlog noop(seq, BinlogType::NOOP, BinlogCommand::NONE, "");
log_debug("fd: %d, %s", link->fd(), noop.dumps().c_str());
link->send(noop.repr());
}
int BackendSync::Client::copy(){
if(this->iter == NULL){
log_debug("new iterator, last_key: '%s'", hexmem(last_key.data(), last_key.size()).c_str());
- this->iter = backend->ssdb->iterator(this->last_key, "", -1);
+ std::string key = this->last_key;
+ if(this->last_key.empty()){
+ key.push_back(DataType::KV);
+ }
+ this->iter = backend->ssdb->iterator(key, "", -1);
}
for(int i=0; i<1000; i++){
if(!iter->next()){
@@ -202,8 +212,7 @@ int BackendSync::Client::copy(){
}else{
Bytes key = iter->key();
Bytes val = iter->val();
- this->last_key = key.String();
-
+
if(key.size() == 0){
continue;
}
@@ -219,7 +228,8 @@ int BackendSync::Client::copy(){
}else{
continue;
}
-
+ this->last_key = key.String();
+
Binlog log(this->last_seq, BinlogType::COPY, cmd, key.Slice());
log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
link->send(log.repr(), val);
View
@@ -165,7 +165,7 @@ static Command commands[] = {
PROC(dump, "b"),
PROC(sync140, "b"),
PROC(info, "r"),
- // doing compaction in a reader thread, because we have one
+ // doing compaction in a reader thread, because we have only one
// writer thread(for performance reason), we don't want to block writes
PROC(compact, "rt"),
PROC(key_range, "r"),
View
@@ -18,17 +18,19 @@ Slave::Slave(SSDB *ssdb, leveldb::DB* meta_db, const char *ip, int port, bool is
this->log_type = BinlogType::SYNC;
}
+ {
+ char buf[128];
+ snprintf(buf, sizeof(buf), "%s|%d", master_ip.c_str(), master_port);
+ this->set_id(buf);
+ }
+
this->link = NULL;
this->last_seq = 0;
this->last_key = "";
this->connect_retry = 0;
this->copy_count = 0;
this->sync_count = 0;
-
- load_status();
- log_debug("last_seq: %" PRIu64 ", last_key: %s",
- last_seq, hexmem(last_key.data(), last_key.size()).c_str());
}
Slave::~Slave(){
@@ -43,6 +45,10 @@ Slave::~Slave(){
}
void Slave::start(){
+ load_status();
+ log_debug("last_seq: %" PRIu64 ", last_key: %s",
+ last_seq, hexmem(last_key.data(), last_key.size()).c_str());
+
thread_quit = false;
int err = pthread_create(&run_thread_tid, NULL, &Slave::_run_thread, this);
if(err != 0){
@@ -59,12 +65,14 @@ void Slave::stop(){
}
}
+void Slave::set_id(const std::string &id){
+ this->id_ = id;
+}
+
std::string Slave::status_key(){
static std::string key;
if(key.empty()){
- char buf[100];
- snprintf(buf, sizeof(buf), "new.slave.status|%s|%d", master_ip.c_str(), master_port);
- key.assign(buf);
+ key = "new.slave.status|" + this->id_;
}
return key;
}
@@ -96,10 +104,10 @@ int Slave::connect(){
int port = this->master_port;
if(++connect_retry % 50 == 1){
- log_info("[%d] connecting to master at %s:%d...", connect_retry-1, ip, port);
+ log_info("[%s][%d] connecting to master at %s:%d...", this->id_.c_str(), connect_retry-1, ip, port);
link = Link::connect(ip, port);
if(link == NULL){
- log_error("failed to connect to master: %s:%d!", ip, port);
+ log_error("[%s]failed to connect to master: %s:%d!", this->id_.c_str(), ip, port);
goto err;
}else{
connect_retry = 0;
@@ -110,12 +118,12 @@ int Slave::connect(){
link->send("sync140", seq_buf, this->last_key, type);
if(link->flush() == -1){
- log_error("network error");
+ log_error("[%s]network error", this->id_.c_str());
delete link;
link = NULL;
goto err;
}
- log_info("ready to receive binlogs");
+ log_info("[%s]ready to receive binlogs", this->id_.c_str());
return 1;
}
}
View
@@ -14,6 +14,8 @@ class Slave{
std::string last_key;
uint64_t copy_count;
uint64_t sync_count;
+
+ std::string id_;
SSDB *ssdb;
Link *link;
@@ -46,6 +48,8 @@ class Slave{
~Slave();
void start();
void stop();
+
+ void set_id(const std::string &id);
};
#endif
View
@@ -125,8 +125,13 @@ SSDB* SSDB::open(const Config &conf, const std::string &base_dir){
is_mirror = false;
}
+ std::string id = c->get_str("id");
+
log_info("slaveof: %s:%d, type: %s", ip.c_str(), port, type.c_str());
Slave *slave = new Slave(ssdb, ssdb->meta_db, ip.c_str(), port, is_mirror);
+ if(!id.empty()){
+ slave->set_id(id);
+ }
slave->start();
ssdb->slaves.push_back(slave);
}
View
@@ -225,7 +225,8 @@ int SSDB::hlist(const Bytes &name_s, const Bytes &name_e, uint64_t limit,
static int hset_one(const SSDB *ssdb, const Bytes &name, const Bytes &key, const Bytes &val, char log_type){
if(name.empty() || key.empty()){
log_error("empty name or key!");
- return -1;
+ //return -1;
+ return 0;
}
if(name.size() > SSDB_KEY_LEN_MAX ){
log_error("name too long!");
View
@@ -10,7 +10,8 @@ int SSDB::multi_set(const std::vector<Bytes> &kvs, int offset, char log_type){
const Bytes &key = *it;
if(key.empty()){
log_error("empty key!");
- return -1;
+ return 0;
+ //return -1;
}
const Bytes &val = *(it + 1);
std::string buf = encode_kv_key(key);
@@ -47,7 +48,8 @@ int SSDB::multi_del(const std::vector<Bytes> &keys, int offset, char log_type){
int SSDB::set(const Bytes &key, const Bytes &val, char log_type){
if(key.empty()){
log_error("empty key!");
- return -1;
+ //return -1;
+ return 0;
}
Transaction trans(binlogs);
View
@@ -381,7 +381,8 @@ static std::string filter_score(const Bytes &score){
static int zset_one(SSDB *ssdb, const Bytes &name, const Bytes &key, const Bytes &score, char log_type){
if(name.empty() || key.empty()){
log_error("empty name or key!");
- return -1;
+ return 0;
+ //return -1;
}
if(name.size() > SSDB_KEY_LEN_MAX ){
log_error("name too long!");
View
@@ -18,6 +18,9 @@ server:
replication:
slaveof:
+ # to identify a master even if it moved(ip, port changed)
+ # if set to empty or not defined, ip:port will be used.
+ #id: svc_2
# sync|mirror, default is sync
#type: sync
#ip: 127.0.0.1
View
@@ -10,6 +10,9 @@ server:
replication:
slaveof:
+ # to identify a master even if it moved(ip, port changed)
+ # if set to empty or not defined, ip:port will be used.
+ id: svc_1
# sync|mirror, default is sync
type: sync
ip: 127.0.0.1

0 comments on commit 866a34d

Please sign in to comment.