Skip to content

Commit

Permalink
refactor: optimize sequence generator (#277)
Browse files Browse the repository at this point in the history
* refactor: optimize sequence generator
  • Loading branch information
dk-lockdown committed Dec 1, 2022
1 parent 8bb0903 commit 67d9506
Show file tree
Hide file tree
Showing 19 changed files with 441 additions and 137 deletions.
9 changes: 5 additions & 4 deletions docker/conf/config_shd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ app_config:
sharding_rule:
column: id
sharding_algorithm: NumberMod
sharding_key_generator:
type: snowflake
worker: 123
sequence_generator:
# type: segment
# dsn: root:123456@tcp(dbpack-mysql1:3306)/world?timeout=10s&readTimeout=10s&writeTimeout=10s&parseTime=true&loc=Local&charset=utf8mb4,utf8
type: snowflake
config:
worker_id: 123
# dsn: root:123456@tcp(dbpack-mysql1:3306)/world?timeout=10s&readTimeout=10s&writeTimeout=10s&parseTime=true&loc=Local&charset=utf8mb4,utf8
topology:
"0": 0-4
"1": 5-9
Expand Down
10 changes: 10 additions & 0 deletions docker/scripts/segment.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DROP DATABASE IF EXISTS segment;
CREATE DATABASE IF NOT EXISTS segment;
USE segment;

CREATE TABLE IF NOT EXISTS `segment` (
`biz_id` VARCHAR ( 128 ) NOT NULL DEFAULT '',
`step` INT NOT NULL DEFAULT '1000',
`max_id` INT NOT NULL DEFAULT '0',
PRIMARY KEY ( `biz_id` )
);
15 changes: 10 additions & 5 deletions pkg/cond/algo_number_mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"strconv"
"strings"

"github.com/cectc/dbpack/pkg/misc/uuid"
"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/third_party/parser/opcode"
)
Expand All @@ -30,15 +32,15 @@ type NumberMod struct {
shardingKey string
allowFullScan bool
topology *topo.Topology
idGnerator uuid.Generator
idGenerator proto.SequenceGenerator
}

func NewNumberMod(shardingKey string, allowFullScan bool, topology *topo.Topology, generator uuid.Generator) *NumberMod {
func NewNumberMod(shardingKey string, allowFullScan bool, topology *topo.Topology, generator proto.SequenceGenerator) *NumberMod {
return &NumberMod{
shardingKey: shardingKey,
allowFullScan: allowFullScan,
topology: topology,
idGnerator: generator,
idGenerator: generator,
}
}

Expand Down Expand Up @@ -217,5 +219,8 @@ func (shard *NumberMod) AllowFullScan() bool {
}

func (shard *NumberMod) NextID() (int64, error) {
return shard.idGnerator.NextID()
if shard.idGenerator != nil {
return shard.idGenerator.NextID()
}
return 0, errors.New("there is no sequence generator")
}
13 changes: 8 additions & 5 deletions pkg/cond/algo_number_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/misc/uuid"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/third_party/parser/opcode"
)
Expand All @@ -47,14 +47,14 @@ type NumberRange struct {
allowFullScan bool
topology *topo.Topology
ranges map[int]*Range
idGnerator uuid.Generator
idGenerator proto.SequenceGenerator
}

func NewNumberRange(shardingKey string,
allowFullScan bool,
topology *topo.Topology,
config map[string]interface{},
generator uuid.Generator) (*NumberRange, error) {
generator proto.SequenceGenerator) (*NumberRange, error) {
ranges, err := parseNumberRangeConfig(config)
if err != nil {
return nil, err
Expand All @@ -64,7 +64,7 @@ func NewNumberRange(shardingKey string,
allowFullScan: allowFullScan,
topology: topology,
ranges: ranges,
idGnerator: generator,
idGenerator: generator,
}, nil
}

Expand Down Expand Up @@ -183,7 +183,10 @@ func (shard *NumberRange) AllowFullScan() bool {
return shard.allowFullScan
}
func (shard *NumberRange) NextID() (int64, error) {
return shard.idGnerator.NextID()
if shard.idGenerator != nil {
return shard.idGenerator.NextID()
}
return 0, errors.New("there is no sequence generator")
}

func (shard *NumberRange) calculateRange(begin, end int64) Condition {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cond/sharding_algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package cond
import (
"github.com/pkg/errors"

"github.com/cectc/dbpack/pkg/misc/uuid"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/topo"
)

Expand All @@ -29,11 +29,11 @@ type ShardingAlgorithm interface {
ShardRange(cond1, cond2 *KeyCondition) (Condition, error)
AllShards() Condition
AllowFullScan() bool
uuid.Generator
proto.SequenceGenerator
}

func NewShardingAlgorithm(algorithm, shardingKey string,
allowFullScan bool, topology *topo.Topology, config map[string]interface{}, generator uuid.Generator) (ShardingAlgorithm, error) {
allowFullScan bool, topology *topo.Topology, config map[string]interface{}, generator proto.SequenceGenerator) (ShardingAlgorithm, error) {
switch algorithm {
case "NumberMod":
return NewNumberMod(shardingKey, allowFullScan, topology, generator), nil
Expand Down
48 changes: 38 additions & 10 deletions pkg/config/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type (

LoadBalanceAlgorithm int32

SequenceType byte

// DataSource ...
DataSource struct {
Name string `yaml:"name" json:"name"`
Expand Down Expand Up @@ -77,19 +79,18 @@ type (
Config Parameters `yaml:"config,omitempty" json:"config,omitempty"`
}

ShardingKeyGenerator struct {
Type string `yaml:"type" json:"type"`
Worker int `yaml:"worker" json:"worker"`
DSN string `yaml:"dsn" json:"dsn"`
SequenceGenerator struct {
Type SequenceType `yaml:"type" json:"type"`
Config Parameters `yaml:"config,omitempty" json:"config,omitempty"`
}

LogicTable struct {
DBName string `yaml:"db_name" json:"db_name"`
TableName string `yaml:"table_name" json:"table_name"`
AllowFullScan bool `yaml:"allow_full_scan" json:"allow_full_scan"`
ShardingRule *ShardingRule `yaml:"sharding_rule" json:"sharding_rule"`
ShardingKeyGenerator *ShardingKeyGenerator `yaml:"sharding_key_generator" json:"sharding_key_generator"`
Topology map[int]string `yaml:"topology" json:"topology"`
DBName string `yaml:"db_name" json:"db_name"`
TableName string `yaml:"table_name" json:"table_name"`
AllowFullScan bool `yaml:"allow_full_scan" json:"allow_full_scan"`
ShardingRule *ShardingRule `yaml:"sharding_rule" json:"sharding_rule"`
SequenceGenerator *SequenceGenerator `yaml:"sequence_generator" json:"sequence_generator"`
Topology map[int]string `yaml:"topology" json:"topology"`
}

ShardingConfig struct {
Expand Down Expand Up @@ -123,6 +124,11 @@ const (
RandomWeight
)

const (
Segment SequenceType = iota
Snowflake
)

func (r *DataSourceRole) UnmarshalText(text []byte) error {
if r == nil {
return errors.New("can't unmarshal a nil *DataSourceRole")
Expand Down Expand Up @@ -249,3 +255,25 @@ func (dataSource *DataSourceRef) ParseWeight() (readWeight int, writeWeight int,
}
return rw, ww, nil
}

func (t *SequenceType) UnmarshalText(text []byte) error {
if t == nil {
return errors.New("can't unmarshal a nil *SequenceType")
}
if !t.unmarshalText(bytes.ToLower(text)) {
return fmt.Errorf("unrecognized sequence type: %q", text)
}
return nil
}

func (t *SequenceType) unmarshalText(text []byte) bool {
switch string(text) {
case "segment":
*t = Segment
case "snowflake":
*t = Snowflake
default:
return false
}
return true
}
16 changes: 5 additions & 11 deletions pkg/executor/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
"github.com/cectc/dbpack/pkg/filter"
"github.com/cectc/dbpack/pkg/group"
"github.com/cectc/dbpack/pkg/log"
"github.com/cectc/dbpack/pkg/misc/uuid"
"github.com/cectc/dbpack/pkg/mysql"
"github.com/cectc/dbpack/pkg/optimize"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/sequence"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
Expand Down Expand Up @@ -125,7 +125,7 @@ func convertShardingAlgorithmsAndTopologies(logicTables []*config.LogicTable) (
var (
algs = make(map[string]cond.ShardingAlgorithm, 0)
topos = make(map[string]*topo.Topology, 0)
generator uuid.Generator
generator proto.SequenceGenerator
)
for _, table := range logicTables {
topology, err := topo.ParseTopology(table.DBName, table.TableName, table.Topology)
Expand All @@ -134,19 +134,13 @@ func convertShardingAlgorithmsAndTopologies(logicTables []*config.LogicTable) (
}
topos[table.TableName] = topology

if table.ShardingKeyGenerator.Type == "segment" {
generator, err = uuid.NewSegmentWorker(table.ShardingKeyGenerator.DSN, 1000, table.TableName)
if table.SequenceGenerator != nil {
generator, err = sequence.NewSequence(table.SequenceGenerator, table.TableName)
if err != nil {
log.Errorf("segment dsn %s, table name %s, err %s", table.ShardingKeyGenerator.DSN, table.TableName, err)
return nil, nil, err
}
} else {
generator, err = uuid.NewWorker(table.ShardingKeyGenerator.Worker)
if err != nil {
log.Errorf("init snowflake id generator, err %s", err)
return nil, nil, err
}
}

alg, err := cond.NewShardingAlgorithm(table.ShardingRule.ShardingAlgorithm,
table.ShardingRule.Column, table.AllowFullScan, topology, table.ShardingRule.Config, generator)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ var conf = `
sharding_rule:
column: id
sharding_algorithm: NumberMod
sharding_key_generator:
sequence_generator:
type: snowflake
worker: 123
config:
worker_id: 123
topology:
"0": 0-4
"1": 5-9`
Expand All @@ -72,7 +73,7 @@ type _ShardingExecutorTestSuite struct {
executor proto.Executor
}

func TestMergeResult(t *testing.T) {
func TestShardingExecutor(t *testing.T) {
suite.Run(t, new(_ShardingExecutorTestSuite))
}

Expand Down
21 changes: 0 additions & 21 deletions pkg/misc/uuid/generator.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/misc/uuid/id_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
)

func BenchmarkNextID(b *testing.B) {
NextID()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
id := NextID()
Expand Down
50 changes: 0 additions & 50 deletions pkg/misc/uuid/snowflake.go

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/optimize/optimizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"github.com/cectc/dbpack/pkg/constant"
"github.com/cectc/dbpack/pkg/dt/schema"
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc/uuid"
"github.com/cectc/dbpack/pkg/plan"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/sequence"
"github.com/cectc/dbpack/pkg/topo"
"github.com/cectc/dbpack/pkg/visitor"
"github.com/cectc/dbpack/third_party/parser"
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestOptimizeInsert(t *testing.T) {

func mockOptimizer() *Optimizer {
tp := mockTopology()
generator, _ := uuid.NewWorker(123)
generator, _ := sequence.NewWorker(123)
topologies := map[string]*topo.Topology{
"student": tp,
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/proto/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ type (
// Optimize optimizes the sql with arguments then returns a Plan.
Optimize(ctx context.Context, stmt ast.StmtNode, args ...interface{}) (Plan, error)
}

SequenceGenerator interface {
NextID() (int64, error)
}
)

const (
Expand Down

0 comments on commit 67d9506

Please sign in to comment.