Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
config: adding [tidb] max-allowed-packet config
Browse files Browse the repository at this point in the history
this allows sending rows > 4 MiB when using TiDB backend
  • Loading branch information
kennytm committed Nov 19, 2019
1 parent 0edbc12 commit c093739
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 14 deletions.
8 changes: 4 additions & 4 deletions lightning/common/util.go
Expand Up @@ -46,12 +46,12 @@ const (
defaultMaxRetry = 3
)

func ToDSN(host string, port int, user string, psw string, sqlMode string) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'", user, psw, host, port, sqlMode)
func ToDSN(host string, port int, user string, psw string, sqlMode string, maxAllowedPacket uint64) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'&maxAllowedPacket=%d", user, psw, host, port, sqlMode, maxAllowedPacket)
}

func ConnectDB(host string, port int, user string, psw string, sqlMode string) (*sql.DB, error) {
dbDSN := ToDSN(host, port, user, psw, sqlMode)
func ConnectDB(host string, port int, user string, psw string, sqlMode string, maxAllowedPacket uint64) (*sql.DB, error) {
dbDSN := ToDSN(host, port, user, psw, sqlMode, maxAllowedPacket)
db, err := sql.Open("mysql", dbDSN)
if err != nil {
return nil, errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions lightning/common/util_test.go
Expand Up @@ -121,8 +121,8 @@ func (s *utilSuite) TestIsRetryableError(c *C) {
}

func (s *utilSuite) TestToDSN(c *C) {
dsn := common.ToDSN("127.0.0.1", 4000, "root", "123456", "strict")
c.Assert(dsn, Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8&sql_mode='strict'")
dsn := common.ToDSN("127.0.0.1", 4000, "root", "123456", "strict", 1234)
c.Assert(dsn, Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8&sql_mode='strict'&maxAllowedPacket=1234")
}

func (s *utilSuite) TestIsContextCanceledError(c *C) {
Expand Down
14 changes: 8 additions & 6 deletions lightning/config/config.go
Expand Up @@ -67,7 +67,8 @@ type DBStore struct {
PdAddr string `toml:"pd-addr" json:"pd-addr"`
StrSQLMode string `toml:"sql-mode" json:"sql-mode"`

SQLMode mysql.SQLMode `toml:"-" json:"-"`
SQLMode mysql.SQLMode `toml:"-" json:"-"`
MaxAllowedPacket uint64 `toml:"max-allowed-packet" json:"max-allowed-packet"`

DistSQLScanConcurrency int `toml:"distsql-scan-concurrency" json:"distsql-scan-concurrency"`
BuildStatsConcurrency int `toml:"build-stats-concurrency" json:"build-stats-concurrency"`
Expand Down Expand Up @@ -136,9 +137,9 @@ type MydumperRuntime struct {
}

type TikvImporter struct {
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"`
}

type Checkpoint struct {
Expand Down Expand Up @@ -184,6 +185,7 @@ func NewConfig() *Config {
User: "root",
StatusPort: 10080,
StrSQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: defaultMaxAllowedPacket,
BuildStatsConcurrency: 20,
DistSQLScanConcurrency: 100,
IndexSerialScanConcurrency: 20,
Expand All @@ -201,7 +203,7 @@ func NewConfig() *Config {
},
},
TikvImporter: TikvImporter{
Backend: BackendImporter,
Backend: BackendImporter,
OnDuplicate: ReplaceOnDup,
},
PostRestore: PostRestore{
Expand Down Expand Up @@ -419,7 +421,7 @@ func (cfg *Config) Adjust() error {
if len(cfg.Checkpoint.DSN) == 0 {
switch cfg.Checkpoint.Driver {
case CheckpointDriverMySQL:
cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode)
cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode, defaultMaxAllowedPacket)
case CheckpointDriverFile:
cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb"
}
Expand Down
2 changes: 1 addition & 1 deletion lightning/config/config_test.go
Expand Up @@ -391,7 +391,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL
err = taskCfg.Adjust()
c.Assert(err, IsNil)
c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'")
c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'&maxAllowedPacket=67108864")

result := taskCfg.String()
c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`)
Expand Down
2 changes: 2 additions & 0 deletions lightning/config/const.go
Expand Up @@ -23,4 +23,6 @@ const (
MinRegionSize int64 = 256 * _M

BufferSizeScale = 5

defaultMaxAllowedPacket = 64 * 1024 * 1024
)
2 changes: 1 addition & 1 deletion lightning/restore/tidb.go
Expand Up @@ -44,7 +44,7 @@ type TiDBManager struct {
}

func NewTiDBManager(dsn config.DBStore) (*TiDBManager, error) {
db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw, dsn.StrSQLMode)
db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw, dsn.StrSQLMode, dsn.MaxAllowedPacket)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 4 additions & 0 deletions tidb-lightning.toml
Expand Up @@ -133,6 +133,10 @@ pd-addr = "127.0.0.1:2379"
# lightning uses some code of tidb(used as library), and the flag controls it's log level.
log-level = "error"

# sets maximum packet size allowed for SQL connections.
# set this to 0 to automatically fetch the `max_allowed_packet` variable from server on every connection.
# max-allowed-packet = 67_108_864

# set tidb session variables to speed up checksum/analyze table.
# see https://pingcap.com/docs/sql/statistics/#control-analyze-concurrency for the meaning of each setting
build-stats-concurrency = 20
Expand Down

0 comments on commit c093739

Please sign in to comment.