diff --git a/command/test_test.go b/command/test_test.go index 03e66e90..50f4cb17 100644 --- a/command/test_test.go +++ b/command/test_test.go @@ -8,13 +8,9 @@ import ( "github.com/meitu/titan/conf" "github.com/meitu/titan/context" "github.com/meitu/titan/db" - "github.com/meitu/titan/db/store" ) -var cfg = &conf.Tikv{ - PdAddrs: store.MockAddr, - DB: conf.DB{}, -} +var cfg = &conf.MockConf().Tikv var mockdb *db.RedisStore func init() { diff --git a/conf/config.go b/conf/config.go index 1252eeb2..66450869 100644 --- a/conf/config.go +++ b/conf/config.go @@ -2,7 +2,7 @@ package conf import "time" -//Titan configuration center +// Titan configuration center type Titan struct { Server Server `cfg:"server"` Status Status `cfg:"status"` @@ -12,29 +12,34 @@ type Titan struct { PIDFileName string `cfg:"pid-filename; titan.pid; ; the file name to record connd PID"` } +// DB config is the config of titan data struct type DB struct { Hash Hash `cfg:"hash"` } +// Hash config is the config of titan hash data struct type Hash struct { MetaSlot int64 `cfg:"meta-slot;0;numeric;hashes slot key count"` } -//Server config is the config of titan server +// Server config is the config of titan server type Server struct { Auth string `cfg:"auth;;;client connetion auth"` Listen string `cfg:"listen; 0.0.0.0:7369; netaddr; address to listen"` MaxConnection int64 `cfg:"max-connection;1000;numeric;client connection count"` } -//Tikv config is the config of tikv sdk +// Tikv config is the config of tikv sdk type Tikv struct { PdAddrs string `cfg:"pd-addrs;required; ;pd address in tidb"` DB DB `cfg:"db"` + GC GC `cfg:"gc"` + Expire Expire `cfg:"expire"` ZT ZT `cfg:"zt"` TikvGC TikvGC `cfg:"tikv-gc"` } +// TikvGC config is the config of implement tikv sdk gcwork type TikvGC struct { Interval time.Duration `cfg:"interval;20m;;gc work tick interval"` LeaderLifeTime time.Duration `cfg:"leader-life-time;30m;;lease flush leader interval"` @@ -42,7 +47,21 @@ type TikvGC struct { Concurrency int `cfg:"concurrency;2;;gc work concurrency"` } -//ZT config is the config of zlist +// GC config is the config of Titan GC work +type GC struct { + Interval time.Duration `cfg:"interval;1s;;gc work tick interval"` + LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"` + BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"` +} + +// Expire config is the config of Titan expire work +type Expire struct { + Interval time.Duration `cfg:"interval;1s;;expire work tick interval"` + LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"` + BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"` +} + +// ZT config is the config of zlist type ZT struct { Wrokers int `cfg:"workers;5;numeric;parallel workers count"` BatchCount int `cfg:"batch;10;numeric;object transfer limitation per-transection"` @@ -50,7 +69,7 @@ type ZT struct { Interval time.Duration `cfg:"interval;1000ms; ;Queue fill interval in milsecond"` } -//Logger config is the config of default zap log +// Logger config is the config of default zap log type Logger struct { Name string `cfg:"name; titan; ; the default logger name"` Path string `cfg:"path; logs/titan; ; the default log path"` @@ -59,7 +78,7 @@ type Logger struct { TimeRotate string `cfg:"time-rotate; 0 0 0 * * *; ; log time rotate pattern(s m h D M W)"` } -//TikvLogger config is the config of tikv log +// TikvLogger config is the config of tikv log type TikvLogger struct { Path string `cfg:"path; logs/tikv;nonempty ; the default log path"` Level string `cfg:"level; info; ; log level(debug, info, warn, error, panic, fatal)"` @@ -67,7 +86,7 @@ type TikvLogger struct { TimeRotate string `cfg:"time-rotate; 0 0 0 * * *; ; log time rotate pattern(s m h D M W)"` } -//Status config is the config of exported server +// Status config is the config of exported server type Status struct { Listen string `cfg:"listen;0.0.0.0:7345;nonempty; listen address of http server"` } diff --git a/conf/mockconfig.go b/conf/mockconfig.go new file mode 100644 index 00000000..8db70426 --- /dev/null +++ b/conf/mockconfig.go @@ -0,0 +1,34 @@ +package conf + +import "time" + +// MockConf init and return titan mock conf +func MockConf() *Titan { + return &Titan{ + Tikv: Tikv{ + PdAddrs: "mocktikv://", + GC: GC{ + Interval: time.Second, + LeaderLifeTime: 3 * time.Minute, + BatchLimit: 256, + }, + Expire: Expire{ + Interval: time.Second, + LeaderLifeTime: 3 * time.Minute, + BatchLimit: 256, + }, + ZT: ZT{ + Wrokers: 5, + BatchCount: 10, + QueueDepth: 100, + Interval: 1000 * time.Millisecond, + }, + TikvGC: TikvGC{ + Interval: 20 * time.Minute, + LeaderLifeTime: 30 * time.Minute, + SafePointLifeTime: 10 * time.Minute, + Concurrency: 2, + }, + }, + } +} diff --git a/conf/titan.toml b/conf/titan.toml index e4fdd350..0e51f64a 100644 --- a/conf/titan.toml +++ b/conf/titan.toml @@ -49,6 +49,44 @@ pd-addrs = "" #meta-slot = 0 +[tikv.gc] + +#type: time.Duration +#description: gc work tick interval +#default: 1s +#interval = "1s" + +#type: time.Duration +#description: lease flush leader interval +#default: 3m +#leader-life-time = "3m0s" + +#type: int +#rules: numeric +#description: key count limitation per-transection +#default: 256 +#batch-limit = 256 + + +[tikv.expire] + +#type: time.Duration +#description: expire work tick interval +#default: 1s +#interval = "1s" + +#type: time.Duration +#description: lease flush leader interval +#default: 3m +#leader-life-time = "3m0s" + +#type: int +#rules: numeric +#description: key count limitation per-transection +#default: 256 +#batch-limit = 256 + + [tikv.zt] #type: int diff --git a/db/db.go b/db/db.go index d72de23d..4de6ee0f 100644 --- a/db/db.go +++ b/db/db.go @@ -110,8 +110,8 @@ func Open(conf *conf.Tikv) (*RedisStore, error) { } rds := &RedisStore{Storage: s, conf: conf} sysdb := rds.DB(sysNamespace, sysDatabaseID) - go StartGC(sysdb) - go StartExpire(sysdb) + go StartGC(sysdb, &conf.GC) + go StartExpire(sysdb, &conf.Expire) go StartZT(sysdb, &conf.ZT) go StartTikvGC(sysdb, &conf.TikvGC) return rds, nil diff --git a/db/db_test.go b/db/db_test.go index 854e944a..59063b89 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -17,12 +17,12 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - conf := &conf.Tikv{} + mockConf := conf.MockConf() mockDB = &DB{ Namespace: "mockdb-ns", ID: 1, - kv: &RedisStore{Storage: store, conf: conf}, - conf: &conf.DB, + kv: &RedisStore{Storage: store, conf: &mockConf.Tikv}, + conf: &mockConf.Tikv.DB, } os.Exit(m.Run()) diff --git a/db/expire.go b/db/expire.go index a3a44925..8b9ac167 100644 --- a/db/expire.go +++ b/db/expire.go @@ -5,20 +5,15 @@ import ( "context" "time" + "github.com/meitu/titan/conf" "github.com/meitu/titan/db/store" "github.com/meitu/titan/metrics" "go.uber.org/zap" ) -const ( - expireBatchLimit = 256 - expireTick = time.Duration(time.Second) -) - var ( - expireKeyPrefix = []byte("$sys:0:at:") - sysExpireLeader = []byte("$sys:0:EXL:EXLeader") - sysExpireLeaderFlushInterval = 10 * time.Second + expireKeyPrefix = []byte("$sys:0:at:") + sysExpireLeader = []byte("$sys:0:EXL:EXLeader") // $sys:0:at:{ts}:{metaKey} expireTimestampOffset = len(expireKeyPrefix) @@ -84,12 +79,12 @@ func unExpireAt(txn store.Transaction, mkey []byte, expireAt int64) error { } // StartExpire get leader from db -func StartExpire(db *DB) error { - ticker := time.NewTicker(expireTick) +func StartExpire(db *DB, conf *conf.Expire) error { + ticker := time.NewTicker(conf.Interval) defer ticker.Stop() id := UUID() for range ticker.C { - isLeader, err := isLeader(db, sysExpireLeader, id, sysExpireLeaderFlushInterval) + isLeader, err := isLeader(db, sysExpireLeader, id, conf.LeaderLifeTime) if err != nil { zap.L().Error("[Expire] check expire leader failed", zap.Error(err)) continue @@ -98,7 +93,7 @@ func StartExpire(db *DB) error { zap.L().Debug("[Expire] not expire leader") continue } - runExpire(db) + runExpire(db, conf.BatchLimit) } return nil } @@ -121,7 +116,7 @@ func toTikvDataKey(namespace []byte, id DBID, key []byte) []byte { return b } -func runExpire(db *DB) { +func runExpire(db *DB, batchLimit int) { txn, err := db.Begin() if err != nil { zap.L().Error("[Expire] txn begin failed", zap.Error(err)) @@ -133,7 +128,7 @@ func runExpire(db *DB) { txn.Rollback() return } - limit := expireBatchLimit + limit := batchLimit now := time.Now().UnixNano() for iter.Valid() && iter.Key().HasPrefix(expireKeyPrefix) && limit > 0 { key := iter.Key() @@ -190,5 +185,5 @@ func runExpire(db *DB) { txn.Rollback() zap.L().Error("[Expire] commit failed", zap.Error(err)) } - metrics.GetMetrics().ExpireKeysTotal.WithLabelValues("expired").Add(float64(expireBatchLimit - limit)) + metrics.GetMetrics().ExpireKeysTotal.WithLabelValues("expired").Add(float64(batchLimit - limit)) } diff --git a/db/gc.go b/db/gc.go index 723d3cf0..a564b18d 100644 --- a/db/gc.go +++ b/db/gc.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/meitu/titan/conf" "github.com/meitu/titan/db/store" "github.com/meitu/titan/metrics" "go.uber.org/zap" @@ -11,12 +12,6 @@ import ( var ( sysGCLeader = []byte("$sys:0:GCL:GCLeader") - gcInterval = time.Duration(1) //TODO 使用配置 -) - -const ( - sysGCBurst = 256 - sysGCLeaseFlushInterval = 10 * time.Second ) func toTikvGCKey(key []byte) []byte { @@ -57,8 +52,8 @@ func gcGetPrefix(txn store.Transaction) ([]byte, error) { return key[len(gcPrefix):], nil } -func gcDeleteRange(txn store.Transaction, prefix []byte, limit int64) (int64, error) { - count := int64(0) +func gcDeleteRange(txn store.Transaction, prefix []byte, limit int) (int, error) { + var count int itr, err := txn.Iter(prefix, nil) if err != nil { return count, err @@ -91,7 +86,7 @@ func gcComplete(txn store.Transaction, prefix []byte) error { return txn.Delete(toTikvGCKey(prefix)) } -func doGC(db *DB, limit int64) error { +func doGC(db *DB, limit int) error { left := limit for left > 0 { txn, err := db.Begin() @@ -108,8 +103,8 @@ func doGC(db *DB, limit int64) error { zap.L().Debug("[GC] no gc item") return nil } - count := int64(0) - zap.L().Debug("[GC] start to delete prefix", zap.String("prefix", string(prefix)), zap.Int64("limit", limit)) + count := 0 + zap.L().Debug("[GC] start to delete prefix", zap.String("prefix", string(prefix)), zap.Int("limit", limit)) if count, err = gcDeleteRange(txn.t, prefix, limit); err != nil { return err } @@ -140,11 +135,12 @@ func doGC(db *DB, limit int64) error { // StartGC start gc //1.获取leader许可 //2.leader 执行清理任务 -func StartGC(db *DB) { - ticker := time.Tick(gcInterval * time.Second) +func StartGC(db *DB, conf *conf.GC) { + ticker := time.NewTicker(conf.Interval) + defer ticker.Stop() id := UUID() - for range ticker { - isLeader, err := isLeader(db, sysGCLeader, id, sysGCLeaseFlushInterval) + for range ticker.C { + isLeader, err := isLeader(db, sysGCLeader, id, conf.LeaderLifeTime) if err != nil { zap.L().Error("[GC] check GC leader failed", zap.Error(err)) continue @@ -153,7 +149,7 @@ func StartGC(db *DB) { zap.L().Debug("[GC] not GC leader") continue } - if err := doGC(db, sysGCBurst); err != nil { + if err := doGC(db, conf.BatchLimit); err != nil { zap.L().Error("[GC] do GC failed", zap.Error(err)) continue } diff --git a/db/test_test.go b/db/test_test.go index 1940f394..12e6826f 100644 --- a/db/test_test.go +++ b/db/test_test.go @@ -12,6 +12,7 @@ func MockDB() *DB { if err != nil { panic(err) } - redis := &RedisStore{Storage: store, conf: &conf.Tikv{}} + mockConf := conf.MockConf() + redis := &RedisStore{Storage: store, conf: &mockConf.Tikv} return &DB{Namespace: "ns", ID: DBID(1), kv: redis} } diff --git a/db/tikvgc.go b/db/tikvgc.go index c3a71a59..5a24ce2d 100644 --- a/db/tikvgc.go +++ b/db/tikvgc.go @@ -22,10 +22,11 @@ const ( // StartTikvGC start tikv gcwork func StartTikvGC(db *DB, tikvCfg *conf.TikvGC) { - ticker := time.Tick(tikvCfg.Interval) + ticker := time.NewTicker(tikvCfg.Interval) + defer ticker.Stop() uuid := UUID() ctx := context.Background() - for range ticker { + for range ticker.C { isLeader, err := isLeader(db, sysTikvGCLeader, uuid, tikvCfg.LeaderLifeTime) if err != nil { zap.L().Error("[TikvGC] check TikvGC leader failed", zap.Error(err)) diff --git a/db/ztransfer.go b/db/ztransfer.go index cd20d5f5..59147af5 100644 --- a/db/ztransfer.go +++ b/db/ztransfer.go @@ -194,9 +194,10 @@ func StartZT(db *DB, conf *conf.ZT) { // check leader and fill the channel prefix := toZTKey(nil) - tick := time.Tick(conf.Interval) + ticker := time.NewTicker(conf.Interval) + defer ticker.Stop() id := UUID() - for range tick { + for range ticker.C { isLeader, err := isLeader(db, sysZTLeader, id, sysZTLeaderFlushInterval) if err != nil { zap.L().Error("[ZT] check ZT leader failed", @@ -209,7 +210,7 @@ func StartZT(db *DB, conf *conf.ZT) { continue } - if prefix, err = runZT(db, prefix, tick); err != nil { + if prefix, err = runZT(db, prefix, ticker.C); err != nil { zap.L().Error("[ZT] error in run ZT", zap.Int64("dbid", int64(db.ID)), zap.ByteString("prefix", prefix), diff --git a/tools/integration/titan.go b/tools/integration/titan.go index 9b0f93ae..ef052dbd 100644 --- a/tools/integration/titan.go +++ b/tools/integration/titan.go @@ -20,10 +20,7 @@ var ( MaxConnection: 10000, Auth: "", } - tikvConf = conf.Tikv{ - PdAddrs: "mocktikv://", - } - + tikvConf = conf.MockConf().Tikv //ServerAddr default server addr ServerAddr = "127.0.0.1:17369" lis net.Listener