Skip to content

Commit

Permalink
*: Decouple Checkpoint type from dest-type (#786) (#790)
Browse files Browse the repository at this point in the history
* *: Decouple Checkpoint type from dest-type

after this pr:
we have this checkpoint type: mysql/tidb, file, flash
defalut using checkpoint type accroding to dest-type like before
mysql/tidb -> mysql/tidb (the same downstream)
flash -> flash
kafka/file -> file

and we can configure the checkpoint type despite what the dest-type is;
currently only can configure the checkpoint type to be mysql/tidb
  • Loading branch information
sre-bot authored and IANTHEREAL committed Nov 5, 2019
1 parent 8ecb418 commit 5e079f8
Show file tree
Hide file tree
Showing 16 changed files with 105 additions and 161 deletions.
13 changes: 11 additions & 2 deletions cmd/drainer/drainer.toml
Expand Up @@ -84,8 +84,17 @@ password = ""
port = 3306

[syncer.to.checkpoint]
# you can uncomment this to change the database to save checkpoint when the downstream is mysql or tidb
#schema = "tidb_binlog"
# only support mysql or tidb now, you can uncomment this to control where the checkpoint is saved.
# the default way how checkpoint is saved according to db-type is:
# mysql/tidb -> the according downstream mysql/tidb
# file/kafka -> file in `data-dir`
# type = "mysql"
# you can uncomment this to change the database to save checkpoint when the checkpoint type is mysql or tidb
# schema = "tidb_binlog"
# host = "127.0.0.1"
# user = "root"
# password = ""
# port = 3306

# Uncomment this if you want to use file as db-type.
#[syncer.to]
Expand Down
16 changes: 7 additions & 9 deletions drainer/checkpoint/checkpoint.go
Expand Up @@ -41,28 +41,26 @@ type CheckPoint interface {
}

// NewCheckPoint returns a CheckPoint instance by giving name
func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) {
func NewCheckPoint(cfg *Config) (CheckPoint, error) {
var (
cp CheckPoint
err error
)
switch name {
switch cfg.CheckpointType {
case "mysql", "tidb":
cp, err = newMysql(name, cfg)
cp, err = newMysql(cfg)
case "file":
cp, err = NewPb(cfg)
case "kafka":
cp, err = newKafka(cfg)
cp, err = NewFile(cfg)
case "flash":
cp, err = newFlash(cfg)
default:
err = errors.Errorf("unsupported checkpoint type %s", name)
err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType)
}
if err != nil {
return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", name, cfg)
return nil, errors.Annotatef(err, "initialize %s type checkpoint with config %+v", cfg.CheckpointType, cfg)
}

log.Info("initialize checkpoint", zap.String("name", name), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg))
log.Info("initialize checkpoint", zap.String("type", cfg.CheckpointType), zap.Int64("checkpoint", cp.TS()), zap.Reflect("cfg", cfg))

return cp, nil
}
18 changes: 9 additions & 9 deletions drainer/checkpoint/pb.go → drainer/checkpoint/file.go
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/siddontang/go/ioutil2"
)

// PbCheckPoint is local CheckPoint struct.
type PbCheckPoint struct {
// FileCheckPoint is local CheckPoint struct.
type FileCheckPoint struct {
sync.RWMutex
closed bool
initialCommitTS int64
Expand All @@ -34,9 +34,9 @@ type PbCheckPoint struct {
CommitTS int64 `toml:"commitTS" json:"commitTS"`
}

// NewPb creates a new Pb.
func NewPb(cfg *Config) (CheckPoint, error) {
pb := &PbCheckPoint{
// NewFile creates a new FileCheckpoint.
func NewFile(cfg *Config) (CheckPoint, error) {
pb := &FileCheckPoint{
initialCommitTS: cfg.InitialCommitTS,
name: cfg.CheckPointFile,
}
Expand All @@ -49,7 +49,7 @@ func NewPb(cfg *Config) (CheckPoint, error) {
}

// Load implements CheckPointor.Load interface.
func (sp *PbCheckPoint) Load() error {
func (sp *FileCheckPoint) Load() error {
sp.Lock()
defer sp.Unlock()

Expand Down Expand Up @@ -81,7 +81,7 @@ func (sp *PbCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *PbCheckPoint) Save(ts, slaveTS int64) error {
func (sp *FileCheckPoint) Save(ts, slaveTS int64) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -107,15 +107,15 @@ func (sp *PbCheckPoint) Save(ts, slaveTS int64) error {
}

// TS implements CheckPoint.TS interface
func (sp *PbCheckPoint) TS() int64 {
func (sp *FileCheckPoint) TS() int64 {
sp.RLock()
defer sp.RUnlock()

return sp.CommitTS
}

// Close implements CheckPoint.Close interface
func (sp *PbCheckPoint) Close() error {
func (sp *FileCheckPoint) Close() error {
sp.Lock()
defer sp.Unlock()

Expand Down
Expand Up @@ -20,12 +20,12 @@ import (
"github.com/pingcap/errors"
)

func (t *testCheckPointSuite) TestPb(c *C) {
func (t *testCheckPointSuite) TestFile(c *C) {
fileName := "/tmp/test"
notExistFileName := "test_not_exist"
cfg := new(Config)
cfg.CheckPointFile = fileName
meta, err := NewPb(cfg)
meta, err := NewFile(cfg)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

Expand All @@ -48,15 +48,15 @@ func (t *testCheckPointSuite) TestPb(c *C) {

// check not exist meta file
cfg.CheckPointFile = notExistFileName
meta, err = NewPb(cfg)
meta, err = NewFile(cfg)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
cfg.InitialCommitTS = 123
meta, err = NewPb(cfg)
meta, err = NewFile(cfg)
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)

Expand Down
58 changes: 0 additions & 58 deletions drainer/checkpoint/kafka.go

This file was deleted.

44 changes: 0 additions & 44 deletions drainer/checkpoint/kafka_test.go

This file was deleted.

9 changes: 2 additions & 7 deletions drainer/checkpoint/mysql.go
Expand Up @@ -34,19 +34,15 @@ type MysqlCheckPoint struct {
db *sql.DB
schema string
table string
// type, tidb or mysql
tp string

CommitTS int64 `toml:"commitTS" json:"commitTS"`
TsMap map[string]int64 `toml:"ts-map" json:"ts-map"`
}

var sqlOpenDB = pkgsql.OpenDB

func newMysql(tp string, cfg *Config) (CheckPoint, error) {
if err := checkConfig(cfg); err != nil {
return nil, errors.Annotate(err, "check config failed")
}
func newMysql(cfg *Config) (CheckPoint, error) {
setDefaultConfig(cfg)

db, err := sqlOpenDB("mysql", cfg.Db.Host, cfg.Db.Port, cfg.Db.User, cfg.Db.Password)
if err != nil {
Expand All @@ -59,7 +55,6 @@ func newMysql(tp string, cfg *Config) (CheckPoint, error) {
initialCommitTS: cfg.InitialCommitTS,
schema: cfg.Schema,
table: cfg.Table,
tp: tp,
TsMap: make(map[string]int64),
}

Expand Down
9 changes: 4 additions & 5 deletions drainer/checkpoint/mysql_test.go
Expand Up @@ -48,7 +48,7 @@ func (s *saveSuite) TestShouldSaveCheckpoint(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
mock.ExpectExec("replace into db.tbl.*").WillReturnResult(sqlmock.NewResult(0, 0))
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl", tp: "other"}
cp := MysqlCheckPoint{db: db, schema: "db", table: "tbl"}
err = cp.Save(1111, 0)
c.Assert(err, IsNil)
}
Expand All @@ -61,7 +61,6 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
db: db,
schema: "db",
table: "tbl",
tp: "tidb",
TsMap: make(map[string]int64),
}
err = cp.Save(65536, 3333)
Expand Down Expand Up @@ -122,7 +121,7 @@ func (s *newMysqlSuite) TestCannotOpenDB(c *C) {
return nil, errors.New("no db")
}

_, err := newMysql("tidb", &Config{})
_, err := newMysql(&Config{})
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*no db.*")
}
Expand All @@ -138,14 +137,14 @@ func (s *newMysqlSuite) TestCreationErrors(c *C) {
}

mock.ExpectExec("create schema.*").WillReturnError(errors.New("fail schema"))
_, err = newMysql("tidb", &Config{})
_, err = newMysql(&Config{})
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*fail schema.*")

mock.ExpectExec("create schema.*").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("create table.*").WillReturnError(errors.New("fail table"))

_, err = newMysql("tidb", &Config{})
_, err = newMysql(&Config{})
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, ".*fail table.*")
}
9 changes: 3 additions & 6 deletions drainer/checkpoint/util.go
Expand Up @@ -30,6 +30,8 @@ type DBConfig struct {

// Config is the savepoint configuration
type Config struct {
CheckpointType string

Db *DBConfig
Schema string
Table string
Expand All @@ -39,10 +41,7 @@ type Config struct {
CheckPointFile string `toml:"dir" json:"dir"`
}

func checkConfig(cfg *Config) error {
if cfg == nil {
cfg = new(Config)
}
func setDefaultConfig(cfg *Config) {
if cfg.Db == nil {
cfg.Db = new(DBConfig)
}
Expand All @@ -61,8 +60,6 @@ func checkConfig(cfg *Config) error {
if cfg.Table == "" {
cfg.Table = "checkpoint"
}

return nil
}

func genCreateSchema(sp *MysqlCheckPoint) string {
Expand Down
8 changes: 6 additions & 2 deletions drainer/server.go
Expand Up @@ -119,8 +119,12 @@ func NewServer(cfg *Config) (*Server, error) {
cfg.SyncerCfg.To.ClusterID = clusterID
pdCli.Close()

cpCfg := GenCheckPointCfg(cfg, clusterID)
cp, err := checkpoint.NewCheckPoint(cfg.SyncerCfg.DestDBType, cpCfg)
cpCfg, err := GenCheckPointCfg(cfg, clusterID)
if err != nil {
return nil, errors.Trace(err)
}

cp, err := checkpoint.NewCheckPoint(cpCfg)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/server_test.go
Expand Up @@ -348,6 +348,6 @@ func (s *newServerSuite) TestInvalidDestDBType(c *C) {
cfg.SyncerCfg.DestDBType = "nothing"
cfg.adjustConfig()
_, err := NewServer(cfg)
c.Assert(err, ErrorMatches, ".*unsupported checkpoint type.*")
c.Assert(err, ErrorMatches, ".*unknown DestDBType.*")
c.Assert(cfg.SyncerCfg.To.ClusterID, Equals, uint64(8012))
}
7 changes: 6 additions & 1 deletion drainer/sync/util.go
Expand Up @@ -40,7 +40,12 @@ type DBConfig struct {

// CheckpointConfig is the Checkpoint configuration.
type CheckpointConfig struct {
Schema string `toml:"schema" json:"schema"`
Type string `toml:"type" json:"type"`
Schema string `toml:"schema" json:"schema"`
Host string `toml:"host" json:"host"`
User string `toml:"user" json:"user"`
Password string `toml:"password" json:"password"`
Port int `toml:"port" json:"port"`
}

type baseError struct {
Expand Down

0 comments on commit 5e079f8

Please sign in to comment.