Skip to content

Commit

Permalink
[WIP] feat: support delete on sharded table (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown committed Jun 4, 2022
1 parent 005742a commit a0a5bda
Show file tree
Hide file tree
Showing 18 changed files with 768 additions and 226 deletions.
4 changes: 2 additions & 2 deletions docker/conf/config_shd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ data_source_cluster:
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql1:3306)/drug_0?timeout=10s&readTimeout=10s&writeTimeout=10s&parseTime=true&loc=Local&charset=utf8mb4,utf8
dsn: root:123456@tcp(dbpack-mysql1:3306)/drug?timeout=10s&readTimeout=10s&writeTimeout=10s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3

- name: drug_1
capacity: 10
max_capacity: 20
idle_timeout: 60s
dsn: root:123456@tcp(dbpack-mysql2:3306)/drug_1?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
dsn: root:123456@tcp(dbpack-mysql2:3306)/drug?timeout=60s&readTimeout=60s&writeTimeout=60s&parseTime=true&loc=Local&charset=utf8mb4,utf8
ping_interval: 20s
ping_times_for_change_status: 3

Expand Down
6 changes: 3 additions & 3 deletions docker/scripts/drug_0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/


DROP DATABASE IF EXISTS drug_0;
CREATE DATABASE IF NOT EXISTS drug_0;
USE drug_0;
DROP DATABASE IF EXISTS drug;
CREATE DATABASE IF NOT EXISTS drug;
USE drug;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
Expand Down
6 changes: 3 additions & 3 deletions docker/scripts/drug_1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/


DROP DATABASE IF EXISTS drug_1;
CREATE DATABASE IF NOT EXISTS drug_1;
USE drug_1;
DROP DATABASE IF EXISTS drug;
CREATE DATABASE IF NOT EXISTS drug;
USE drug;

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
Expand Down
2 changes: 1 addition & 1 deletion pkg/dt/distributed_transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"github.com/go-resty/resty/v2"
"github.com/pingcap/errors"
"github.com/pkg/errors"
"k8s.io/client-go/util/workqueue"

"github.com/cectc/dbpack/pkg/config"
Expand Down
83 changes: 83 additions & 0 deletions pkg/optimize/optimize_delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package optimize

import (
	"context"
	"sort"

	"github.com/pkg/errors"

	"github.com/cectc/dbpack/pkg/cond"
	"github.com/cectc/dbpack/pkg/plan"
	"github.com/cectc/dbpack/pkg/proto"
	"github.com/cectc/dbpack/pkg/topo"
	"github.com/cectc/dbpack/third_party/parser/ast"
)

// optimizeDelete builds an execution plan for a DELETE on a sharded table.
// It resolves the sharding algorithm and topology registered for the target
// table, computes the affected shards from the WHERE condition, and returns
// either a single-DB plan (all shards in one db group) or a multi-DB plan
// fanning out to every affected db group.
//
// Fix over the original: the sub-plans were built by ranging directly over
// shardMap, so the multi-DB plan order was nondeterministic (Go map iteration
// order is random). Keys are now sorted first, matching optimizeSelect.
func (o Optimizer) optimizeDelete(ctx context.Context, stmt *ast.DeleteStmt, args []interface{}) (proto.Plan, error) {
	tableName := stmt.TableRefs.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.String()
	alg, exists := o.algorithms[tableName]
	if !exists {
		return nil, errors.New("sharding algorithm should not be nil")
	}
	topology, exists := o.topologies[tableName]
	if !exists {
		return nil, errors.New("topology should not be nil")
	}

	condition, err := cond.ParseCondition(stmt.Where, args...)
	if err != nil {
		return nil, errors.Wrap(err, "parse condition failed")
	}
	cd := condition.(cond.ConditionShard)
	shards, err := cd.Shard(alg)
	if err != nil {
		return nil, errors.Wrap(err, "compute shards failed")
	}

	fullScan, shardMap := shards.ParseTopology(topology)
	if fullScan && !alg.AllowFullScan() {
		return nil, errors.New("full scan not allowed")
	}

	// Sort the db group keys so the generated sub-plans have a deterministic
	// order; this mirrors optimizeSelect.
	keys := make([]string, 0, len(shardMap))
	for k := range shardMap {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	plans := make([]*plan.DeleteOnSingleDBPlan, 0, len(shardMap))
	for _, k := range keys {
		executor, exists := o.dbGroupExecutors[k]
		if !exists {
			return nil, errors.Errorf("db group %s should not be nil", k)
		}
		plans = append(plans, &plan.DeleteOnSingleDBPlan{
			Database: k,
			Tables:   shardMap[k],
			Stmt:     stmt,
			Args:     args,
			Executor: executor,
		})
	}

	// A single affected db group does not need the multi-DB wrapper.
	if len(plans) == 1 {
		return plans[0], nil
	}

	multiPlan := &plan.DeleteOnMultiDBPlan{
		Stmt:  stmt,
		Plans: plans,
	}
	return multiPlan, nil
}
122 changes: 122 additions & 0 deletions pkg/optimize/optimize_insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package optimize

import (
"context"
"strings"

"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/cond"
"github.com/cectc/dbpack/pkg/constant"
"github.com/cectc/dbpack/pkg/dt/schema"
"github.com/cectc/dbpack/pkg/log"
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/plan"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
"github.com/cectc/dbpack/third_party/parser/opcode"
)

// optimizeInsert builds an execution plan for an INSERT on a sharded table.
// The shard is derived from the primary-key value of the inserted row via an
// EQ key condition, so an INSERT always routes to exactly one db group.
//
// Fix over the original: when the table-meta lookup failed for every DB in
// the topology, the code silently proceeded with a zero-value tableMeta and
// later produced a misleading "should contain the primary key" error; it now
// fails fast with the underlying cause.
func (o Optimizer) optimizeInsert(ctx context.Context, stmt *ast.InsertStmt, args []interface{}) (proto.Plan, error) {
	var (
		tableMeta schema.TableMeta
		err       error
	)
	tableName := stmt.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.String()
	columns := make([]string, 0, len(stmt.Columns))
	for _, column := range stmt.Columns {
		columns = append(columns, column.Name.String())
	}

	alg, exists := o.algorithms[tableName]
	if !exists {
		return nil, errors.New("sharding algorithm should not be nil")
	}
	topology, exists := o.topologies[tableName]
	if !exists {
		return nil, errors.New("topology should not be nil")
	}

	// All physical shards share the same schema, so the meta from the first
	// shard that answers is sufficient.
	metaFound := false
	for db, tables := range topology.DBs {
		sqlDB := resource.GetDBManager().GetDB(db)
		tableMeta, err = meta.GetTableMetaCache().GetTableMeta(ctx, sqlDB, tables[0])
		if err == nil {
			metaFound = true
			break
		}
	}
	if !metaFound {
		return nil, errors.Errorf("get table meta of %s failed: %v", tableName, err)
	}

	pk := tableMeta.GetPKName()
	index := findPkIndex(stmt, pk)
	// todo if index == -1, automatically generate a primary key
	if index == -1 {
		return nil, errors.New("the inserted columns should contain the primary key")
	}
	pkValue := getPkValue(ctx, stmt, index, args)

	cd := &cond.KeyCondition{
		Key:   pk,
		Op:    opcode.EQ,
		Value: pkValue,
	}
	shards, err := cd.Shard(alg)
	if err != nil {
		return nil, errors.Wrap(err, "compute shards failed")
	}
	fullScan, shardMap := shards.ParseTopology(topology)
	if fullScan && !alg.AllowFullScan() {
		return nil, errors.New("full scan not allowed")
	}

	// An EQ condition on the primary key must resolve to exactly one db group.
	if len(shardMap) == 1 {
		for k, v := range shardMap {
			executor, exists := o.dbGroupExecutors[k]
			if !exists {
				return nil, errors.Errorf("db group %s should not be nil", k)
			}

			return &plan.InsertPlan{
				Database: k,
				Table:    v[0],
				Columns:  columns,
				Stmt:     stmt,
				Args:     args,
				Executor: executor,
			}, nil
		}
	}
	return nil, errors.Errorf("insert into table %s should route to exactly one db group, got %d", tableName, len(shardMap))
}

// findPkIndex returns the position of the primary-key column pk within the
// statement's explicit column list, or -1 when pk is not listed (or the
// statement has no explicit column list).
func findPkIndex(stmt *ast.InsertStmt, pk string) int {
	// Ranging over a nil Columns slice is a no-op, so no guard is needed.
	for idx, col := range stmt.Columns {
		if col.Name.String() == pk {
			return idx
		}
	}
	return -1
}

// getPkValue extracts the primary-key value of the first inserted row.
// For the binary protocol (ComStmtExecute) the value is the bound argument at
// pkIndex; for the text protocol (ComQuery) the value expression is restored
// back to its SQL string form. Any other command type is a programming error.
func getPkValue(ctx context.Context, stmt *ast.InsertStmt, pkIndex int, args []interface{}) interface{} {
	commandType := proto.CommandType(ctx)
	if commandType == constant.ComStmtExecute {
		return args[pkIndex]
	}
	if commandType == constant.ComQuery {
		var builder strings.Builder
		restoreCtx := format.NewRestoreCtx(format.DefaultRestoreFlags, &builder)
		if err := stmt.Lists[0][pkIndex].Restore(restoreCtx); err != nil {
			log.Panic(err)
		}
		return builder.String()
	}
	log.Panicf("should never happen!")
	return nil
}
90 changes: 90 additions & 0 deletions pkg/optimize/optimize_select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package optimize

import (
"context"
"sort"

"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/cond"
"github.com/cectc/dbpack/pkg/plan"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/third_party/parser/ast"
)

// optimizeSelect builds an execution plan for a SELECT on a sharded table.
// It resolves the sharding algorithm and topology for the table referenced in
// the FROM clause, derives the affected shards from the WHERE condition, and
// returns a single-DB plan when one db group is hit, or a multi-DB plan (with
// sub-plans in sorted db-group order) otherwise.
func (o Optimizer) optimizeSelect(ctx context.Context, stmt *ast.SelectStmt, args []interface{}) (proto.Plan, error) {
	tableName := stmt.From.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName).Name.String()

	alg, ok := o.algorithms[tableName]
	if !ok {
		return nil, errors.New("sharding algorithm should not be nil")
	}
	topology, ok := o.topologies[tableName]
	if !ok {
		return nil, errors.New("topology should not be nil")
	}

	condition, err := cond.ParseCondition(stmt.Where, args...)
	if err != nil {
		return nil, errors.Wrap(err, "parse condition failed")
	}
	shards, err := condition.(cond.ConditionShard).Shard(alg)
	if err != nil {
		return nil, errors.Wrap(err, "compute shards failed")
	}

	fullScan, shardMap := shards.ParseTopology(topology)
	if fullScan && !alg.AllowFullScan() {
		return nil, errors.New("full scan not allowed")
	}

	// Fast path: everything lives in a single db group.
	if len(shardMap) == 1 {
		for db, tables := range shardMap {
			executor, ok := o.dbGroupExecutors[db]
			if !ok {
				return nil, errors.Errorf("db group %s should not be nil", db)
			}
			return &plan.QueryOnSingleDBPlan{
				Database: db,
				Tables:   tables,
				Stmt:     stmt,
				Args:     args,
				Executor: executor,
			}, nil
		}
	}

	// Iterate db groups in sorted key order so the sub-plan order is
	// deterministic across runs.
	keys := make([]string, 0, len(shardMap))
	for db := range shardMap {
		keys = append(keys, db)
	}
	sort.Strings(keys)

	plans := make([]*plan.QueryOnSingleDBPlan, 0, len(shards))
	for _, db := range keys {
		executor, ok := o.dbGroupExecutors[db]
		if !ok {
			return nil, errors.Errorf("db group %s should not be nil", db)
		}
		plans = append(plans, &plan.QueryOnSingleDBPlan{
			Database: db,
			Tables:   shardMap[db],
			Stmt:     stmt,
			Args:     args,
			Executor: executor,
		})
	}

	return &plan.QueryOnMultiDBPlan{
		Stmt:  stmt,
		Plans: plans,
	}, nil
}

0 comments on commit a0a5bda

Please sign in to comment.