Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/etcd leader #209

Merged
merged 7 commits into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions command/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import (
"bytes"
"io"
"strings"
"testing"

"github.com/distributedio/titan/conf"
"github.com/distributedio/titan/context"
"github.com/distributedio/titan/db"

"go.etcd.io/etcd/integration"
)

var Cfg = &conf.MockConf().TiKV
var mockdb *db.RedisStore

func init() {
t := &testing.T{}
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, ClientTLS: nil})
Cfg.EtcdAddrs = clus.RandClient().Endpoints()
mockdb, _ = db.Open(Cfg)
}

Expand Down
31 changes: 16 additions & 15 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,38 @@ type Server struct {

// TiKV config is the config of tikv sdk
type TiKV struct {
PdAddrs string `cfg:"pd-addrs; mocktikv://; ;pd address in tidb"`
GC GC `cfg:"gc"`
Expire Expire `cfg:"expire"`
ZT ZT `cfg:"zt"`
TiKVGC TiKVGC `cfg:"tikv-gc"`
Logger TiKVLogger `cfg:"logger"`
PdAddrs string `cfg:"pd-addrs; mocktikv://; ;pd address in tidb"`
EtcdAddrs []string `cfg:"etcd-addrs; []; ;etcd address default use pd-addrs"`
GC GC `cfg:"gc"`
Expire Expire `cfg:"expire"`
ZT ZT `cfg:"zt"`
TiKVGC TiKVGC `cfg:"tikv-gc"`
Logger TiKVLogger `cfg:"logger"`
}

// TiKVGC config is the config of implement tikv sdk gcwork
type TiKVGC struct {
Disable bool `cfg:"disable; false; boolean; false is used to disable tikvgc "`
Interval time.Duration `cfg:"interval;20m;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;30m;;lease flush leader interval"`
LeaderTTL int `cfg:"leader-ttl;15;;leader ttl seconds"`
SafePointLifeTime time.Duration `cfg:"safe-point-life-time;10m;;safe point life time "`
Concurrency int `cfg:"concurrency;2;;gc work concurrency"`
}

// GC config is the config of Titan GC work
type GC struct {
Disable bool `cfg:"disable; false; boolean; false is used to disable gc"`
Interval time.Duration `cfg:"interval;1s;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
Disable bool `cfg:"disable; false; boolean; false is used to disable gc"`
Interval time.Duration `cfg:"interval;1s;;gc work tick interval"`
LeaderTTL int `cfg:"leader-ttl;15;;leader ttl seconds"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

// Expire config is the config of Titan expire work
type Expire struct {
Disable bool `cfg:"disable; false; boolean; false is used to disable expire"`
Interval time.Duration `cfg:"interval;1s;;expire work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
Disable bool `cfg:"disable; false; boolean; false is used to disable expire"`
Interval time.Duration `cfg:"interval;1s;;expire work tick interval"`
LeaderTTL int `cfg:"leader-ttl;15;;leader ttl seconds"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

// ZT config is the config of zlist
Expand Down
18 changes: 9 additions & 9 deletions conf/mockconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ func MockConf() *Titan {
TiKV: TiKV{
PdAddrs: "mocktikv://",
GC: GC{
Disable: false,
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
Disable: false,
Interval: time.Second,
LeaderTTL: 15,
BatchLimit: 256,
},
Expire: Expire{
Disable: false,
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
Disable: false,
Interval: time.Second,
LeaderTTL: 15,
BatchLimit: 256,
},
ZT: ZT{
Disable: false,
Expand All @@ -29,7 +29,7 @@ func MockConf() *Titan {
TiKVGC: TiKVGC{
Disable: false,
Interval: 20 * time.Minute,
LeaderLifeTime: 30 * time.Minute,
LeaderTTL: 15,
SafePointLifeTime: 10 * time.Minute,
Concurrency: 2,
},
Expand Down
15 changes: 9 additions & 6 deletions conf/titan.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ ssl-key-file = ""
#type: string, description: pd address in tidb, default: mocktikv://
#pd-addrs = "mocktikv://"

#type: []string, description: etcd address default use pd-addrs, default: []
#etcd-addrs = []


[tikv.gc]

Expand All @@ -51,8 +54,8 @@ ssl-key-file = ""
#type: time.Duration, description: gc work tick interval, default: 1s
#interval = "1s"

#type: time.Duration, description: lease flush leader interval, default: 3m
#leader-life-time = "3m0s"
#type: int, description: leader ttl seconds, default: 15
#leader-ttl = 15

#type: int, rules: numeric, description: key count limitation per-transection, default: 256
#batch-limit = 256
Expand All @@ -67,8 +70,8 @@ ssl-key-file = ""
#type: time.Duration, description: expire work tick interval, default: 1s
#interval = "1s"

#type: time.Duration, description: lease flush leader interval, default: 3m
#leader-life-time = "3m0s"
#type: int, description: leader ttl seconds, default: 15
#leader-ttl = 15

#type: int, rules: numeric, description: key count limitation per-transection, default: 256
#batch-limit = 256
Expand Down Expand Up @@ -102,8 +105,8 @@ ssl-key-file = ""
#type: time.Duration, description: gc work tick interval, default: 20m
#interval = "20m0s"

#type: time.Duration, description: lease flush leader interval, default: 30m
#leader-life-time = "30m0s"
#type: int, description: leader ttl seconds, default: 15
#leader-ttl = 15

#type: time.Duration, description: safe point life time, default: 10m
#safe-point-life-time = "10m0s"
Expand Down
147 changes: 4 additions & 143 deletions db/db.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
package db

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"strconv"
"time"

"go.uber.org/zap"

"github.com/distributedio/titan/conf"
"github.com/distributedio/titan/db/store"
"github.com/distributedio/titan/metrics"
)

var (
Expand Down Expand Up @@ -117,10 +113,10 @@ func Open(conf *conf.TiKV) (*RedisStore, error) {
}
rds := &RedisStore{Storage: s, conf: conf}
sysdb := rds.DB(sysNamespace, sysDatabaseID)
go StartGC(sysdb, &conf.GC)
go StartExpire(sysdb, &conf.Expire)
go StartZT(sysdb, &conf.ZT)
go StartTiKVGC(sysdb, &conf.TiKVGC)

if err := RegisterTask(sysdb, conf); err != nil {
return nil, err
}
return rds, nil
}

Expand Down Expand Up @@ -270,138 +266,3 @@ func dbPrefix(ns string, id []byte) []byte {
}
return prefix
}

func flushLease(txn store.Transaction, key, id []byte, interval time.Duration) error {
databytes := make([]byte, 24)
copy(databytes, id)
ts := uint64((time.Now().Add(interval).Unix()))
binary.BigEndian.PutUint64(databytes[16:], ts)

if err := txn.Set(key, databytes); err != nil {
return err
}
return nil
}

func checkLeader(txn store.Transaction, key, id []byte, interval time.Duration) (bool, error) {
val, err := txn.Get(key)
if err != nil {
if !IsErrNotFound(err) {
zap.L().Error("query leader message faild",
zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Error(err))
return false, err
}

if env := zap.L().Check(zap.DebugLevel, "no leader now, create new lease"); env != nil {
env.Write(zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Duration("interval", interval))
}

if err := flushLease(txn, key, id, interval); err != nil {
zap.L().Error("create lease failed",
zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Duration("interval", interval),
zap.Error(err))
return false, err
}

return true, nil
}

curID := val[0:16]
ts := int64(binary.BigEndian.Uint64(val[16:]))

if time.Now().Unix() > ts {
if err := flushLease(txn, key, id, interval); err != nil {
zap.L().Error("create lease failed",
zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Int64("last_ts", ts),
zap.Error(err))
return false, err
}
return true, nil
}

if bytes.Equal(curID, id) {
if err := flushLease(txn, key, id, interval); err != nil {
zap.L().Error("flush lease failed",
zap.ByteString("key", key),
zap.ByteString("curid", curID),
zap.ByteString("id", id),
zap.Error(err))
return false, err
}
return true, nil
}
return false, nil
}

func isLeader(db *DB, leader []byte, id []byte, interval time.Duration) (bool, error) {
count := 0
label := "default"
switch {
case bytes.Equal(leader, sysZTLeader):
label = "ZT"
case bytes.Equal(leader, sysGCLeader):
label = "GC"
case bytes.Equal(leader, sysExpireLeader):
label = "EX"
case bytes.Equal(leader, sysTiKVGCLeader):
label = "TGC"

}

for {
txn, err := db.Begin()
if err != nil {
zap.L().Error("transection begin failed",
zap.ByteString("leader", leader),
zap.Error(err))
continue
}

isLeader, err := checkLeader(txn.t, leader, id, interval)
mtFunc := func() {
if isLeader {
metrics.GetMetrics().IsLeaderGaugeVec.WithLabelValues(label).Set(1)
return
}
metrics.GetMetrics().IsLeaderGaugeVec.WithLabelValues(label).Set(0)
}

if err != nil {
if err := txn.Rollback(); err != nil {
return isLeader, err
}
if IsRetryableError(err) {
count++
if count < 3 {
continue
}
}
mtFunc()
return isLeader, err
}

if err := txn.Commit(context.Background()); err != nil {
if err := txn.Rollback(); err != nil {
return isLeader, err
}
if IsRetryableError(err) {
count++
if count < 3 {
continue
}
}
mtFunc()
return isLeader, err
}
mtFunc()
return isLeader, err
}
}
30 changes: 12 additions & 18 deletions db/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,22 @@ func unExpireAt(txn store.Transaction, mkey []byte, expireAt int64) error {
}

// StartExpire get leader from db
func StartExpire(db *DB, conf *conf.Expire) {
func StartExpire(task *Task) {
conf := task.conf.(conf.Expire)
ticker := time.NewTicker(conf.Interval)
defer ticker.Stop()
id := UUID()
for range ticker.C {
if conf.Disable {
continue
}
isLeader, err := isLeader(db, sysExpireLeader, id, conf.LeaderLifeTime)
if err != nil {
zap.L().Error("[Expire] check expire leader failed", zap.Error(err))
continue
}
if !isLeader {
if logEnv := zap.L().Check(zap.DebugLevel, "[Expire] not expire leader"); logEnv != nil {
logEnv.Write(zap.ByteString("leader", sysExpireLeader),
zap.ByteString("uuid", id),
zap.Duration("leader-life-time", conf.LeaderLifeTime))
for {
select {
case <-task.Done():
if logEnv := zap.L().Check(zap.DebugLevel, "[EX] current is not expire leader"); logEnv != nil {
logEnv.Write(zap.ByteString("key", task.key),
zap.ByteString("uuid", task.id),
zap.String("lable", task.lable))
}
continue
break
case <-ticker.C:
}
runExpire(db, conf.BatchLimit)
runExpire(task.db, conf.BatchLimit)
}
}

Expand Down