Skip to content

Commit

Permalink
bench ok
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf2 committed Oct 7, 2021
1 parent 8c2e388 commit a78f354
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
43 changes: 22 additions & 21 deletions bench/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/yedf/dtm/common"
"github.com/yedf/dtm/dtmcli"
"github.com/yedf/dtm/dtmsvr"
"github.com/yedf/dtm/examples"
)

Expand All @@ -36,36 +37,20 @@ func txGet() *sql.Tx {
}

func reloadData() {
time.Sleep(dtmsvr.UpdateBranchAsyncInterval * 2)
began := time.Now()
db := sdbGet()
_, err := dtmcli.DBExec(db, "drop table if exists dtm_busi.user_account_log")
dtmcli.FatalIfError(err)
_, err = dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
delta DECIMAL(11, 2) not null,
gid varchar(45) not null,
branch_id varchar(45) not null,
branch_type varchar(45) not null,
reason varchar(45),
create_time datetime not null default now(),
update_time datetime not null default now(),
key(user_id),
key(create_time)
)
`)
dtmcli.FatalIfError(err)
tables := []string{"dtm_busi.user_account", "dtm_busi.user_account_log", "dtm.trans_global", "dtm.trans_branch", "dtm_barrier.barrier"}
for _, t := range tables {
_, err = dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t))
_, err := dtmcli.DBExec(db, fmt.Sprintf("truncate %s", t))
dtmcli.FatalIfError(err)
}
s := "insert ignore into dtm_busi.user_account(user_id, balance) values "
ss := []string{}
for i := 1; i <= total; i++ {
ss = append(ss, fmt.Sprintf("(%d, 1000000)", i))
}
_, err = db.Exec(s + strings.Join(ss, ","))
_, err := db.Exec(s + strings.Join(ss, ","))
dtmcli.FatalIfError(err)
dtmcli.Logf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}
Expand All @@ -80,8 +65,24 @@ func StartSvr() {
benchAddRoute(app)
dtmcli.Logf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%d", benchPort))
reloadData()
time.Sleep(1100 * time.Millisecond) // sleep 1 second for async branch status update to finish
db := sdbGet()
_, err := dtmcli.DBExec(db, "drop table if exists dtm_busi.user_account_log")
dtmcli.FatalIfError(err)
_, err = dtmcli.DBExec(db, `create table if not exists dtm_busi.user_account_log (
id INT(11) AUTO_INCREMENT PRIMARY KEY,
user_id INT(11) NOT NULL,
delta DECIMAL(11, 2) not null,
gid varchar(45) not null,
branch_id varchar(45) not null,
branch_type varchar(45) not null,
reason varchar(45),
create_time datetime not null default now(),
update_time datetime not null default now(),
key(user_id),
key(create_time)
)
`)
dtmcli.FatalIfError(err)
}

func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
Expand Down
9 changes: 5 additions & 4 deletions dtmsvr/dtmsvr.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,24 @@ func PopulateDB(skipDrop bool) {
examples.RunSQLScript(config.DB, file, skipDrop)
}

// UpdateBranchAsyncInterval unit millisecond
var UpdateBranchAsyncInterval time.Duration = 1000
// UpdateBranchAsyncInterval interval to flush branch
var UpdateBranchAsyncInterval = 200 * time.Millisecond
var updateBranchAsyncChan chan branchStatus = make(chan branchStatus, 1000)

func updateBranchAsync() {
for { // flush branches every second
updates := []TransBranch{}
started := time.Now()
for time.Since(started) < UpdateBranchAsyncInterval*time.Millisecond {
checkInterval := 20 * time.Millisecond
for time.Since(started) < UpdateBranchAsyncInterval-checkInterval && len(updates) < 20 {
select {
case updateBranch := <-updateBranchAsyncChan:
updates = append(updates, TransBranch{
ModelBase: common.ModelBase{ID: updateBranch.id},
Status: updateBranch.status,
FinishTime: updateBranch.finish_time,
})
case <-time.After(50 * time.Millisecond):
case <-time.After(checkInterval):
}
}
for len(updates) > 0 {
Expand Down
4 changes: 1 addition & 3 deletions test/dtmsvr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,13 @@ func TestSqlDB(t *testing.T) {

func TestUpdateBranchAsync(t *testing.T) {
common.DtmConfig.UpdateBranchSync = 0
dtmsvr.UpdateBranchAsyncInterval = 50
saga := genSaga("gid-update-branch-async", false, false)
saga.WaitResult = true
err := saga.Submit()
assert.Nil(t, err)
WaitTransProcessed(saga.Gid)
time.Sleep(100 * time.Millisecond)
time.Sleep(dtmsvr.UpdateBranchAsyncInterval)
assert.Equal(t, []string{dtmcli.StatusPrepared, dtmcli.StatusSucceed, dtmcli.StatusPrepared, dtmcli.StatusSucceed}, getBranchesStatus(saga.Gid))
assert.Equal(t, dtmcli.StatusSucceed, getTransStatus(saga.Gid))
common.DtmConfig.UpdateBranchSync = 1
dtmsvr.UpdateBranchAsyncInterval = 1000
}

0 comments on commit a78f354

Please sign in to comment.