Skip to content
Permalink
Browse files

Merge pull request #62 from meitu/bugfix/gc-iter

Bugfix/gc iter once
  • Loading branch information...
shafreeck committed Apr 9, 2019
2 parents 6e690c9 + cf47d2b commit 2bf1a0277e751b6d838709a33229654e9a90c10b
Showing with 334 additions and 96 deletions.
  1. +5 −1 conf/config.go
  2. +5 −1 conf/mockconfig.go
  3. +24 −0 conf/titan.toml
  4. +7 −3 db/db.go
  5. +14 −4 db/expire.go
  6. +105 −80 db/gc.go
  7. +101 −0 db/gc_test.go
  8. +36 −0 db/store/store.go
  9. +10 −1 db/tikvgc.go
  10. +27 −6 db/ztransfer.go
@@ -41,6 +41,7 @@ type Tikv struct {

// TikvGC config is the config of implement tikv sdk gcwork
type TikvGC struct {
Enable bool `cfg:"enable; true; boolean; true for enabling tikv gc"`
Interval time.Duration `cfg:"interval;20m;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;30m;;lease flush leader interval"`
SafePointLifeTime time.Duration `cfg:"safe-point-life-time;10m;;safe point life time "`
@@ -49,21 +50,24 @@ type TikvGC struct {

// GC config is the config of Titan GC work
type GC struct {
Enable bool `cfg:"enable; true; boolean; true for enabling gc"`
Interval time.Duration `cfg:"interval;1s;;gc work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

// Expire config is the config of Titan expire work
type Expire struct {
Enable bool `cfg:"enable; true; boolean; true for enabling expire"`
Interval time.Duration `cfg:"interval;1s;;expire work tick interval"`
LeaderLifeTime time.Duration `cfg:"leader-life-time;3m;;lease flush leader interval"`
BatchLimit int `cfg:"batch-limit;256;numeric;key count limitation per-transection"`
}

// ZT config is the config of zlist
type ZT struct {
Wrokers int `cfg:"workers;5;numeric;parallel workers count"`
Enable bool `cfg:"enable; true; boolean; true for enabling zt"`
Workers int `cfg:"workers;5;numeric;parallel workers count"`
BatchCount int `cfg:"batch;10;numeric;object transfer limitation per-transection"`
QueueDepth int `cfg:"depth;100;numeric;ZT Worker queue depth"`
Interval time.Duration `cfg:"interval;1000ms; ;Queue fill interval in milsecond"`
@@ -8,22 +8,26 @@ func MockConf() *Titan {
Tikv: Tikv{
PdAddrs: "mocktikv://",
GC: GC{
Enable: true,
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
},
Expire: Expire{
Enable: true,
Interval: time.Second,
LeaderLifeTime: 3 * time.Minute,
BatchLimit: 256,
},
ZT: ZT{
Wrokers: 5,
Enable: true,
Workers: 5,
BatchCount: 10,
QueueDepth: 100,
Interval: 1000 * time.Millisecond,
},
TikvGC: TikvGC{
Enable: true,
Interval: 20 * time.Minute,
LeaderLifeTime: 30 * time.Minute,
SafePointLifeTime: 10 * time.Minute,
@@ -51,6 +51,12 @@ pd-addrs = ""

[tikv.gc]

#type: bool
#rules: boolean
#description: true for enabling gc
#default: true
#enable = true

#type: time.Duration
#description: gc work tick interval
#default: 1s
@@ -70,6 +76,12 @@ pd-addrs = ""

[tikv.expire]

#type: bool
#rules: boolean
#description: true for enabling expire
#default: true
#enable = true

#type: time.Duration
#description: expire work tick interval
#default: 1s
@@ -89,6 +101,12 @@ pd-addrs = ""

[tikv.zt]

#type: bool
#rules: boolean
#description: true for enabling zt
#default: true
#enable = true

#type: int
#rules: numeric
#description: parallel workers count
@@ -115,6 +133,12 @@ pd-addrs = ""

[tikv.tikv-gc]

#type: bool
#rules: boolean
#description: true for enabling tikv gc
#default: true
#enable = true

#type: time.Duration
#description: gc work tick interval
#default: 20m
@@ -304,14 +304,17 @@ func checkLeader(txn store.Transaction, key, id []byte, interval time.Duration)
return false, err
}

zap.L().Debug("no leader now, create new lease",
zap.ByteString("key", key),
zap.ByteString("id", id))
if env := zap.L().Check(zap.DebugLevel, "no leader now, create new lease"); env != nil {
env.Write(zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Duration("interval", interval))
}

if err := flushLease(txn, key, id, interval); err != nil {
zap.L().Error("create lease failed",
zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Duration("interval", interval),
zap.Error(err))
return false, err
}
@@ -327,6 +330,7 @@ func checkLeader(txn store.Transaction, key, id []byte, interval time.Duration)
zap.L().Error("create lease failed",
zap.ByteString("key", key),
zap.ByteString("id", id),
zap.Int64("last_ts", ts),
zap.Error(err))
return false, err
}
@@ -8,6 +8,7 @@ import (
"github.com/meitu/titan/conf"
"github.com/meitu/titan/db/store"
"github.com/meitu/titan/metrics"
"github.com/pingcap/tidb/kv"
"go.uber.org/zap"
)

@@ -84,13 +85,20 @@ func StartExpire(db *DB, conf *conf.Expire) error {
defer ticker.Stop()
id := UUID()
for range ticker.C {
if !conf.Enable {
continue
}
isLeader, err := isLeader(db, sysExpireLeader, id, conf.LeaderLifeTime)
if err != nil {
zap.L().Error("[Expire] check expire leader failed", zap.Error(err))
continue
}
if !isLeader {
zap.L().Debug("[Expire] not expire leader")
if logEnv := zap.L().Check(zap.DebugLevel, "[Expire] not expire leader"); logEnv != nil {
logEnv.Write(zap.ByteString("leader", sysExpireLeader),
zap.ByteString("uuid", id),
zap.Duration("leader-life-time", conf.LeaderLifeTime))
}
continue
}
runExpire(db, conf.BatchLimit)
@@ -122,7 +130,8 @@ func runExpire(db *DB, batchLimit int) {
zap.L().Error("[Expire] txn begin failed", zap.Error(err))
return
}
iter, err := txn.t.Iter(expireKeyPrefix, nil)
endPrefix := kv.Key(expireKeyPrefix).PrefixNext()
iter, err := txn.t.Iter(expireKeyPrefix, endPrefix)
if err != nil {
zap.L().Error("[Expire] seek failed", zap.ByteString("prefix", expireKeyPrefix), zap.Error(err))
txn.Rollback()
@@ -157,8 +166,9 @@ func runExpire(db *DB, batchLimit int) {
return
}
}

zap.L().Debug("[Expire] delete metakey", zap.ByteString("mkey", mkey), zap.String("key", string(rawkey)))
if logEnv := zap.L().Check(zap.DebugLevel, "[Expire] delete metakey"); logEnv != nil {
logEnv.Write(zap.ByteString("mkey", mkey))
}
// Remove from expire list
if err := txn.t.Delete(key); err != nil {
zap.L().Error("[Expire] delete failed",

0 comments on commit 2bf1a02

Please sign in to comment.
You can’t perform that action at this time.