Skip to content

Commit

Permalink
Support dts to replica databases/groups (#834)
Browse files Browse the repository at this point in the history
* Support dts to replica databases/groups

* feat: check dts status whether the replication task is finished
  • Loading branch information
csynineyang committed May 12, 2024
1 parent 325824e commit 1ea2d2d
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 2 deletions.
6 changes: 6 additions & 0 deletions conf/bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ registry:
options:
endpoints: "http://etcd:2379"

dts:
enable: false
name: dtle
options:
endpoints: "http://dtle:4646"

# name: nacos
# options:
# endpoints: "127.0.0.1:8848"
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ type configWriter interface {
// UpsertCluster upserts a cluster into an existing tenant.
UpsertCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error

// ExtendCluster extends a cluster in an existing tenant.
ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error

// RemoveCluster removes a cluster from an existing tenant.
RemoveCluster(ctx context.Context, tenant, cluster string) error

Expand Down
250 changes: 250 additions & 0 deletions pkg/admin/config_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@
package admin

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
)

import (
Expand Down Expand Up @@ -336,6 +341,251 @@ func (cs *MyConfigService) UpsertCluster(ctx context.Context, tenant, cluster st
return nil
}

func (cs *MyConfigService) buildDTSJson(ctx context.Context, tenant, cluster, src, dst string, srcNode, dstNode *NodeDTO, vtables []*TableDTO, idx int) map[string]interface{} {
jobJson := make(map[string]interface{})
jobBody := make(map[string]interface{})
jobJson["Job"] = jobBody

jobId := tenant + "-" + cluster + "-" + src + "-" + dst + "-" + time.Now().Format("20060102150405")
jobBody["ID"] = jobId
jobBody["Datacenters"] = []string{"dc1"}
jobGroups := make([]interface{}, 0, 2)

jobSrc := make(map[string]interface{})
jobSrc["Name"] = "src"

jobTasks := make([]interface{}, 0, 1)
jobTask := make(map[string]interface{})
jobTask["Name"] = "src"
jobTask["Driver"] = "dtle"
jobConfig := make(map[string]interface{})
jobConfig["Gtid"] = ""
jobReplicate := make([]interface{}, 0, 1)
jobDatabase := make(map[string]interface{})
jobDatabase["TableSchema"] = src
jobDatabase["TableSchemaRename"] = dst
jobTables := []map[string]string{}
for i := range vtables {
vTable := vtables[i]
_, _, dbEnd, _ := config.ParseTopology(vTable.Topology.DbPattern)
tbFormat, _, tbEnd, _ := config.ParseTopology(vTable.Topology.TblPattern)
tableNum := int((tbEnd + 1) / (dbEnd + 1))
for j := 0; j < tableNum; j++ {
jobTable := map[string]string{}
jobTable["TableName"] = fmt.Sprintf(tbFormat, idx*tableNum+j)
jobTable["TableRename"] = fmt.Sprintf(tbFormat, idx*tableNum+j+tbEnd+1)
jobTables = append(jobTables, jobTable)
}
}
jobDatabase["Tables"] = jobTables

jobReplicate = append(jobReplicate, jobDatabase)
jobConfig["ReplicateDoDb"] = jobReplicate
jobSrcConfig := make(map[string]interface{})
jobSrcConfig["Host"] = srcNode.Host
jobSrcConfig["Port"] = srcNode.Port
jobSrcConfig["User"] = srcNode.Username
jobSrcConfig["Password"] = srcNode.Password
jobConfig["SrcConnectionConfig"] = jobSrcConfig
jobDstConfig := make(map[string]interface{})
jobDstConfig["Host"] = dstNode.Host
jobDstConfig["Port"] = dstNode.Port
jobDstConfig["User"] = dstNode.Username
jobDstConfig["Password"] = dstNode.Password
jobConfig["DestConnectionConfig"] = jobDstConfig
jobTask["Config"] = jobConfig
jobSrc["Tasks"] = append(jobTasks, jobTask)
jobGroups = append(jobGroups, jobSrc)

jobDst := make(map[string]interface{})
jobDst["Name"] = "dest"
jobTasks = make([]interface{}, 0, 1)
jobTask = make(map[string]interface{})
jobTask["Name"] = "dest"
jobTask["Driver"] = "dtle"
jobTask["Config"] = map[string]string{"DestType": "mysql"}
jobDst["Tasks"] = append(jobTasks, jobTask)
jobGroups = append(jobGroups, jobDst)
jobBody["TaskGroups"] = jobGroups

return jobJson
}

func (cs *MyConfigService) ExtendCluster(ctx context.Context, tenant, cluster string, body *ClusterDTO) error {
//1、校验node和group,保证node和group翻倍(缩容将node和group减半,流程同理)
groups, err := cs.ListDBGroups(ctx, tenant, cluster)
if err != nil {
return err
}
if len(groups) != len(body.Groups) {
return perrors.Errorf("new groups is not equle to old groups")
}
vtables, err := cs.ListTables(ctx, tenant, cluster)
if err != nil {
return err
}
allNodes, err := cs.ListNodes(ctx, tenant)
if err != nil {
return err
}

//2、创建复制group(物理数据库)的dts任务
//groups[0] --> body.Groups[0]
//groups[1] --> body.Groups[1]
//...
httpClient := &http.Client{}
dtsJobList := make([]map[string]interface{}, 0, len(groups))
dtsEndpoint := config.BootOpts.Dts.Options["endpoints"].(string)
for i := range groups {
srcGroup := groups[i].Name
var srcNode, dstNode *NodeDTO
for n := range allNodes {
if allNodes[n].Database == srcGroup {
srcNode = allNodes[n]
}
}
dstGroup := body.Groups[i]
for n := range allNodes {
if allNodes[n].Database == dstGroup {
dstNode = allNodes[n]
}
}
dtsJob := cs.buildDTSJson(ctx, tenant, cluster, srcGroup, dstGroup, srcNode, dstNode, vtables, i)
if dtsJob == nil {
return perrors.Errorf("failed to build DTS json parameter")
}
dtsJobList = append(dtsJobList, dtsJob)
dtsJobJson, _ := json.Marshal(dtsJob)
httpReq, err := http.NewRequest("POST", dtsEndpoint+"/v1/jobs", bytes.NewBuffer(dtsJobJson))
if err != nil {
return perrors.Errorf("failed to create POST http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to start to replica source group")
}
httpResp.Body.Close()
}

//3、检查是否复制完毕
for {
time.Sleep(5 * time.Second)
finished := false

for i := range dtsJobList {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("GET", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create GET http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to check replica source group")
}

//TODO: check Status
finished = true
httpResp.Body.Close()
}

if finished {
break
}
}

//4、断开并拒绝所有客户端连接

//5、再次检查是否复制完毕

//6、停止dts任务
for i := range groups {
dtsJob := dtsJobList[i]["Job"].(map[string]interface{})
httpURL := dtsEndpoint + "/v1/job/" + dtsJob["ID"].(string)
httpReq, err := http.NewRequest("DELETE", httpURL, nil)
if err != nil {
return perrors.Errorf("failed to create DELETE http requst")
}

httpResp, err := httpClient.Do(httpReq)
if err != nil {
return perrors.Errorf("failed to stop to replica source group")
}
httpResp.Body.Close()
}

//7、更新groups节点
var groupBody GroupDTO
var groupNode string
for i := range body.Groups {
groupNode = ""
for n := range allNodes {
if allNodes[n].Database == body.Groups[i] {
groupNode = allNodes[n].Name
}
}
if strings.Compare(groupNode, "") == 0 {
continue
}
groupBody.ClusterName = cluster
groupBody.Name = body.Groups[i]
groupBody.Nodes = []string{groupNode}
err = cs.UpsertGroup(ctx, tenant, cluster, groupBody.Name, &groupBody)
if err != nil {
return err
}
}

//8、更新sharding路由
var tableBody TableDTO
for i := range vtables {
vTable := vtables[i]
_, _, dbEnd, err := config.ParseTopology(vTable.Topology.DbPattern)
if err != nil {
return err
}
_, _, tbEnd, err := config.ParseTopology(vTable.Topology.TblPattern)
if err != nil {
return err
}
dbTotal := 2 * (dbEnd + 1)
tableTotal := 2 * (tbEnd + 1)

tableBody.Name = vTable.Name
tableBody.Sequence = vTable.Sequence
tableBody.DbRules = []*config.Rule{
{
Columns: vTable.DbRules[0].Columns,
Type: vTable.DbRules[0].Type,
Expr: "$0 % " + fmt.Sprintf("%d", tableTotal) + " / " + fmt.Sprintf("%d", dbTotal),
},
}
tableBody.TblRules = []*config.Rule{
{
Columns: vTable.TblRules[0].Columns,
Type: vTable.TblRules[0].Type,
Expr: "$0 % " + fmt.Sprintf("%d", tableTotal),
},
}
tableBody.Topology = &config.Topology{
DbPattern: cluster + fmt.Sprintf("_${0000..%04d}", dbTotal-1),
TblPattern: vTable.Name + fmt.Sprintf("_${0000..%04d}", tableTotal-1),
}
tableBody.ShadowTopology = vTable.ShadowTopology
tableBody.Attributes = vTable.Attributes
err = cs.UpsertTable(ctx, tenant, cluster, tableBody.Name, &tableBody)
if err != nil {
return err
}
}

//9、接受客户端连接

return nil
}

func (cs *MyConfigService) RemoveCluster(ctx context.Context, tenant, cluster string) error {
op, err := cs.getCenter(ctx, tenant)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/admin/router/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {
router.POST("/tenants/:tenant/clusters", CreateCluster)
router.GET("/tenants/:tenant/clusters/:cluster", GetCluster)
router.PUT("/tenants/:tenant/clusters/:cluster", UpdateCluster)
router.POST("/tenants/:tenant/clusters/:cluster", ExtendCluster)
router.DELETE("/tenants/:tenant/clusters/:cluster", RemoveCluster)
})
}
Expand Down Expand Up @@ -107,6 +108,23 @@ func UpdateCluster(c *gin.Context) error {
return nil
}

func ExtendCluster(c *gin.Context) error {
service := admin.GetService(c)
tenant := c.Param("tenant")
cluster := c.Param("cluster")
var clusterBody admin.ClusterDTO
if err := c.ShouldBindJSON(&clusterBody); err != nil {
return exception.Wrap(exception.CodeInvalidParams, err)
}

err := service.ExtendCluster(c, tenant, cluster, &clusterBody)
if err != nil {
return err
}
c.JSON(http.StatusOK, "success")
return nil
}

func RemoveCluster(c *gin.Context) error {
service := admin.GetService(c)
tenant := c.Param("tenant")
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"github.com/arana-db/arana/pkg/util/log"
)

var BootOpts *BootOptions

// LoadBootOptions loads BootOptions from specified file path.
func LoadBootOptions(path string) (*BootOptions, error) {
content, err := os.ReadFile(path)
Expand All @@ -59,6 +61,7 @@ func LoadBootOptions(path string) (*BootOptions, error) {
return nil, errors.Wrap(err, "failed to validate boot config")
}

BootOpts = &cfg
log.Init(cfg.Logging)
return &cfg, nil
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,18 @@ type (
Options map[string]interface{} `yaml:"options" json:"options"`
}

Dts struct {
Enable bool `yaml:"enable" json:"enable"`
Name string `yaml:"name" json:"name"`
Options map[string]interface{} `yaml:"options" json:"options"`
}

BootOptions struct {
Spec `yaml:",inline"`
Config *Options `yaml:"config" json:"config"`
Listeners []*Listener `validate:"required,dive" yaml:"listeners" json:"listeners"`
Registry *Registry `yaml:"registry" json:"registry"`
Dts *Dts `yaml:"dts" json:"dts"`
Trace *Trace `yaml:"trace" json:"trace"`
Supervisor *User `validate:"required,dive" yaml:"supervisor" json:"supervisor"`
Logging *log.Config `validate:"required,dive" yaml:"logging" json:"logging"`
Expand Down
5 changes: 3 additions & 2 deletions pkg/runtime/optimize/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,15 @@ func drdsCreateTable(ctx context.Context, o *optimize.Optimizer) (*rule.VTable,
{
Columns: []*config.ColumnRule{{Name: dbColume}},
Expr: strings.ReplaceAll(dbColume, dbColume, "$0") + " % " +
fmt.Sprintf("%d", stmt.Partition.Num),
fmt.Sprintf("%d", stmt.Partition.Num*stmt.Partition.Sub.Num) + " / " +
fmt.Sprintf("%d", stmt.Partition.Sub.Num),
},
},
TblRules: []*config.Rule{
{
Columns: []*config.ColumnRule{{Name: tbColume}},
Expr: strings.ReplaceAll(tbColume, tbColume, "$0") + " % " +
fmt.Sprintf("%d", stmt.Partition.Sub.Num),
fmt.Sprintf("%d", stmt.Partition.Num*stmt.Partition.Sub.Num),
},
},
Topology: &config.Topology{
Expand Down
1 change: 1 addition & 0 deletions scripts/sharding.sql
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CREATE TABLE IF NOT EXISTS `employees_0000_r`.`student_0006` LIKE `employees_000
CREATE TABLE IF NOT EXISTS `employees_0000_r`.`student_0007` LIKE `employees_0000`.`student_0000`;

INSERT INTO employees_0000.student_0001(id,uid,name,score,nickname,gender,birth_year,created_at,modified_at) VALUES (1, 1, 'arana', 95, 'Awesome Arana', 0, 2021, NOW(), NOW());
INSERT INTO employees_0000.student_0001(id,uid,name,score,nickname,gender,birth_year,created_at,modified_at) VALUES (33, 33, 'arana33', 95, 'Awesome Arana', 0, 2021, NOW(), NOW());

CREATE TABLE IF NOT EXISTS `employees_0000`.`friendship_0000`
(
Expand Down

0 comments on commit 1ea2d2d

Please sign in to comment.