From a78f354ffc8a38365b80663ca249bb7a5185f85a Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Thu, 7 Oct 2021 19:34:41 +0800 Subject: [PATCH] bench ok --- bench/http.go | 43 ++++++++++++++++++++++--------------------- dtmsvr/dtmsvr.go | 9 +++++---- test/dtmsvr_test.go | 4 +--- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/bench/http.go b/bench/http.go index 78dd464cb..ee33f1ade 100644 --- a/bench/http.go +++ b/bench/http.go @@ -10,6 +10,7 @@ import ( "github.com/gin-gonic/gin" "github.com/yedf/dtm/common" "github.com/yedf/dtm/dtmcli" + "github.com/yedf/dtm/dtmsvr" "github.com/yedf/dtm/examples" ) @@ -36,28 +37,12 @@ func txGet() *sql.Tx { } func reloadData() { + time.Sleep(dtmsvr.UpdateBranchAsyncInterval * 2) began := time.Now() db := sdbGet() - _, err := dtmcli.DBExec(db, "drop table if exists dtm_busi.user_account_log") - dtmcli.FatalIfError(err) - _, err = dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log ( - id INT(11) AUTO_INCREMENT PRIMARY KEY, - user_id INT(11) NOT NULL, - delta DECIMAL(11, 2) not null, - gid varchar(45) not null, - branch_id varchar(45) not null, - branch_type varchar(45) not null, - reason varchar(45), - create_time datetime not null default now(), - update_time datetime not null default now(), - key(user_id), - key(create_time) -) -`) - dtmcli.FatalIfError(err) tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch", "dtm_barrier.barrier"} for _, t := range tables { - _, err = dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t)) + _, err := dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t)) dtmcli.FatalIfError(err) } s := "insert ignore into dtm_busi.user_account(user_id, balance) values " @@ -65,7 +50,7 @@ func reloadData() { for i := 1; i <= total; i++ { ss = append(ss, fmt.Sprintf("(%d, 1000000)", i)) } - _, err = db.Exec(s + strings.Join(ss, ",")) + _, err := db.Exec(s + strings.Join(ss, ",")) dtmcli.FatalIfError(err) dtmcli.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds()) } @@ -80,8 +65,24 @@ func StartSvr() { benchAddRoute(app) dtmcli.Logf("bench listening at %d", benchPort) go app.Run(fmt.Sprintf(":%d", benchPort)) - reloadData() - time.Sleep(1100 * time.Millisecond) // sleep 1 second for async branch status update to finish + db := sdbGet() + _, err := dtmcli.DBExec(db, "drop table if exists dtm_busi.user_account_log") + dtmcli.FatalIfError(err) + _, err = dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log ( + id INT(11) AUTO_INCREMENT PRIMARY KEY, + user_id INT(11) NOT NULL, + delta DECIMAL(11, 2) not null, + gid varchar(45) not null, + branch_id varchar(45) not null, + branch_type varchar(45) not null, + reason varchar(45), + create_time datetime not null default now(), + update_time datetime not null default now(), + key(user_id), + key(create_time) +) +`) + dtmcli.FatalIfError(err) } func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) { diff --git a/dtmsvr/dtmsvr.go b/dtmsvr/dtmsvr.go index ff1541462..f05fe6ee0 100644 --- a/dtmsvr/dtmsvr.go +++ b/dtmsvr/dtmsvr.go @@ -55,15 +55,16 @@ func PopulateDB(skipDrop bool) { examples.RunSQLScript(config.DB, file, skipDrop) } -// UpdateBranchAsyncInterval unit millisecond -var UpdateBranchAsyncInterval time.Duration = 1000 +// UpdateBranchAsyncInterval interval to flush branch +var UpdateBranchAsyncInterval = 200 * time.Millisecond var updateBranchAsyncChan chan branchStatus = make(chan branchStatus, 1000) func updateBranchAsync() { for { // flush branches every second updates := []TransBranch{} started := time.Now() - for time.Since(started) < UpdateBranchAsyncInterval*time.Millisecond { + checkInterval := 20 * time.Millisecond + for time.Since(started) < UpdateBranchAsyncInterval-checkInterval && len(updates) < 20 { select { case updateBranch := <-updateBranchAsyncChan: updates = append(updates, TransBranch{ @@ -71,7 +72,7 @@ func updateBranchAsync() { Status: updateBranch.status, FinishTime: updateBranch.finish_time, }) - case <-time.After(50 * time.Millisecond): + case <-time.After(checkInterval): } } for len(updates) > 0 { diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index 4eeb154c5..d2fa6cda7 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -118,15 +118,13 @@ func TestSqlDB(t *testing.T) { func TestUpdateBranchAsync(t *testing.T) { common.DtmConfig.UpdateBranchSync = 0 - dtmsvr.UpdateBranchAsyncInterval = 50 saga := genSaga("gid-update-branch-async", false, false) saga.WaitResult = true err := saga.Submit() assert.Nil(t, err) WaitTransProcessed(saga.Gid) - time.Sleep(100 * time.Millisecond) + time.Sleep(dtmsvr.UpdateBranchAsyncInterval) assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid)) common.DtmConfig.UpdateBranchSync = 1 - dtmsvr.UpdateBranchAsyncInterval = 1000 }