Skip to content

Commit

Permalink
feat(sequence): add group sequence implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Jianhui Dong committed Sep 4, 2022
1 parent e947693 commit 2017ce7
Showing 1 changed file with 238 additions and 5 deletions.
243 changes: 238 additions & 5 deletions pkg/sequence/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,23 @@ package group

import (
"context"
"io"
"strconv"
"sync"
"time"
)

import (
"github.com/pkg/errors"

"go.uber.org/zap"
)

import (
"github.com/arana-db/arana/pkg/proto"
"github.com/arana-db/arana/pkg/runtime"
rcontext "github.com/arana-db/arana/pkg/runtime/context"
"github.com/arana-db/arana/pkg/util/log"
)

func init() {
Expand All @@ -33,22 +46,205 @@ func init() {

const (
SequencePluginName = "group"

_stepKey = "step"
_startSequence int64 = 1
_defaultGroupStrep int64 = 100

_initGroupSequenceTableSql = `
CREATE TABLE IF NOT EXISTS __arana_group_sequence (
id int AUTO_INCREMENT COMMENT 'primary key',
seq_val bigint COMMENT 'the current group start value',
step int COMMENT 'the step of the group',
table_name varchar(255) NOT NULL COMMENT 'arana logic table name',
renew_time datetime NOT NULL COMMENT 'node renew time',
PRIMARY KEY(id),
UNIQUE KEY(table_name)
) ENGINE = InnoDB;
`
_initGroupSequence = `INSERT INTO __arana_group_sequence(seq_val, step, table_name, renew_time) VALUE (?, ?, ?, now())`
_selectNextGroupWithXLock = `SELECT seq_val FROM __arana_group_sequence WHERE table_name = ? FOR UPDATE;`
_updateNextGroup = `UPDATE __arana_group_sequence set seq_val = ?, renew_time = now() WHERE table_name = ?;`

//_keepaliveNode do keep alive for node
_keepaliveNode = `UPDATE __arana_group_sequence SET renew_time=now() WHERE table_name = ?`
)

var (
// mu Solving the competition of the initialization of Sequence related library tables
mu sync.Mutex

finishInitTable = false
)

type groupSequence struct {
workId int32
currentVal int64
preTimestamp int64
mu sync.Mutex

tableName string
step int64

nextGroupStartVal int64
nextGroupMaxVal int64
currentGroupMaxVal int64
currentVal int64
}

// Start sequence and do some initialization operations
func (seq *groupSequence) Start(ctx context.Context, option proto.SequenceConfig) error {
rt := ctx.Value(proto.RuntimeCtxKey{}).(runtime.Runtime)
ctx = rcontext.WithRead(rcontext.WithDirect(ctx))

// init table
if err := seq.initTableAndKeepalive(ctx, rt); err != nil {
return err
}

// init sequence
if err := seq.initStep(ctx, rt, option); err != nil {
return err
}

seq.tableName = option.Name
return nil
}

// Acquire Apply for a increase ID
func (seq *groupSequence) initTableAndKeepalive(ctx context.Context, rt runtime.Runtime) error {
mu.Lock()
defer mu.Unlock()

if finishInitTable {
return nil
}

tx, err := rt.Begin(ctx)
if err != nil {
return err
}

defer tx.Rollback(ctx)

ret, err := tx.Exec(ctx, "", _initGroupSequenceTableSql)
if err != nil {
return err
}
_, _ = ret.RowsAffected()

if _, _, err := tx.Commit(ctx); err != nil {
return err
}

k := &nodeKeepLive{rt: rt, tableName: seq.tableName}
go k.keepalive()

finishInitTable = true
return nil
}

func (seq *groupSequence) initStep(ctx context.Context, rt runtime.Runtime, option proto.SequenceConfig) error {
seq.mu.Lock()
defer seq.mu.Unlock()

var step int64
stepValue, ok := option.Option[_stepKey]
if ok {
tempStep, err := strconv.Atoi(stepValue)
if err != nil {
return err
}
step = int64(tempStep)
} else {
step = _defaultGroupStrep
}
seq.step = step

return nil
}

// Acquire Apply for an increase ID
func (seq *groupSequence) Acquire(ctx context.Context) (int64, error) {
return 0, nil
seq.mu.Lock()
defer seq.mu.Unlock()

if seq.currentVal >= seq.currentGroupMaxVal {
schema := rcontext.Schema(ctx)
rt, err := runtime.Load(schema)
if err != nil {
log.Errorf("[sequence] load runtime.Runtime from schema=%s fail, %s", schema, err.Error())
return 0, err
}

err = seq.acquireNextGroup(ctx, rt)
if err != nil {
return 0, err
}
seq.currentVal = seq.nextGroupStartVal
seq.currentGroupMaxVal = seq.nextGroupMaxVal
} else {
seq.currentVal++
}

return seq.currentVal, nil
}

func (seq *groupSequence) acquireNextGroup(ctx context.Context, rt runtime.Runtime) error {
ctx = rcontext.WithDirect(ctx)
tx, err := rt.Begin(ctx)
if err != nil {
return err
}

defer tx.Rollback(ctx)

rs, err := tx.Query(ctx, "", _selectNextGroupWithXLock, seq.tableName)
if err != nil {
return err
}

ds, err := rs.Dataset()
if err != nil {
return err
}
val := make([]proto.Value, 1)
row, err := ds.Next()
if err != nil {
if errors.Is(err, io.EOF) {
seq.nextGroupStartVal = _startSequence
seq.nextGroupMaxVal = _startSequence + seq.step - 1
rs, err := tx.Exec(ctx, "", _initGroupSequence, seq.nextGroupMaxVal+1, seq.step, seq.tableName)
if err != nil {
return err
}
_, _ = rs.RowsAffected()

_, _, err = tx.Commit(ctx)
if err != nil {
return err
}
return nil
}
return err
}
if err = row.Scan(val); err != nil {
return err
}
_, _ = ds.Next()

if val[0] != nil {
seq.nextGroupStartVal = val[0].(int64)
seq.nextGroupMaxVal = seq.nextGroupStartVal + seq.step - 1
rs, err := tx.Exec(ctx, "", _updateNextGroup, seq.nextGroupMaxVal+1, seq.tableName)
if err != nil {
return err
}
_, _ = rs.RowsAffected()

_, _, err = tx.Commit(ctx)
if err != nil {
return err
}
}

return nil
}

// Reset resets sequence info
Expand All @@ -70,3 +266,40 @@ func (seq *groupSequence) Stop() error {
func (seq *groupSequence) CurrentVal() int64 {
return seq.currentVal
}

// nodeKeepLive do update renew time to keep work-id still belong to self
type nodeKeepLive struct {
rt runtime.Runtime
tableName string
}

func (n *nodeKeepLive) keepalive() {
ticker := time.NewTicker(1 * time.Minute)

f := func() {
ctx := context.Background()

tx, err := n.rt.Begin(ctx)
if err != nil {
log.Error("[Sequence][Group] keepalive open tx fail", zap.String("table name", n.tableName), zap.Error(err))
return
}

defer tx.Rollback(ctx)

ret, err := tx.Exec(context.Background(), "", _keepaliveNode, n.tableName)
if err != nil {
log.Error("[Sequence][Group] keepalive fail", zap.String("table name", n.tableName), zap.Error(err))
return
}
_, _ = ret.RowsAffected()

if _, _, err := tx.Commit(ctx); err != nil {
log.Error("[Sequence][Group] keepalive tx commit fail", zap.String("table name", n.tableName), zap.Error(err))
}
}

for range ticker.C {
f()
}
}

0 comments on commit 2017ce7

Please sign in to comment.