This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

feat: add transfer leader procedure #67

Merged 32 commits on Oct 14, 2022.

Commits (32)
7062d98  feat: add scatter procedure (ZuLiangWang, Oct 9, 2022)
e107d2f  feat: add scatter procedure (ZuLiangWang, Oct 10, 2022)
9f9d4ab  test: add scatter procedure unit test (ZuLiangWang, Oct 11, 2022)
7e0c800  refactor: remove unused comment (ZuLiangWang, Oct 12, 2022)
69ec77e  refactor: rename variable (ZuLiangWang, Oct 13, 2022)
8525260  feat: add scatter procedure (ZuLiangWang, Oct 13, 2022)
01e657d  feat: add scatter procedure (ZuLiangWang, Oct 13, 2022)
4fa40e0  test: add topology alloc test (ZuLiangWang, Oct 13, 2022)
c422c3a  test: add topology alloc test (ZuLiangWang, Oct 13, 2022)
8ebab50  test: add topology alloc test (ZuLiangWang, Oct 13, 2022)
fd1ddb7  refactor: remove redundant variable (ZuLiangWang, Oct 13, 2022)
e72c32e  refactor: update dispatch (ZuLiangWang, Oct 13, 2022)
daec6e5  refactor: rename variable & add log (ZuLiangWang, Oct 13, 2022)
d9494d3  refactor: add log (ZuLiangWang, Oct 13, 2022)
5d1958a  refactor: add log (ZuLiangWang, Oct 13, 2022)
7e17301  refactor: add log (ZuLiangWang, Oct 13, 2022)
14bcf2f  refactor: add log (ZuLiangWang, Oct 14, 2022)
a0d7aed  refactor: add log (ZuLiangWang, Oct 14, 2022)
21600a1  feat: add transfer leader procedure (ZuLiangWang, Sep 30, 2022)
81ffad5  refactor: rename callbackRequest (ZuLiangWang, Sep 30, 2022)
a8aca9c  refactor: add state lock (ZuLiangWang, Oct 9, 2022)
f5492a7  test: add transfer leader procedure unit test (ZuLiangWang, Oct 11, 2022)
510fdbd  refactor: remove redundant variable (ZuLiangWang, Oct 13, 2022)
9e7270d  refactor: update dispatch (ZuLiangWang, Oct 13, 2022)
60e1318  refactor: rename variable (ZuLiangWang, Oct 13, 2022)
0710a3e  refactor: add log (ZuLiangWang, Oct 13, 2022)
e1a6def  refactor: add log (ZuLiangWang, Oct 13, 2022)
4c69c05  refactor: add log (ZuLiangWang, Oct 13, 2022)
11764f4  refactor: add log (ZuLiangWang, Oct 14, 2022)
8a1a1ed  refactor: add log (ZuLiangWang, Oct 14, 2022)
9abdfbf  refactor: fix unit test (ZuLiangWang, Oct 14, 2022)
46c5f9e  refactor: remove log level & fix func visibility (ZuLiangWang, Oct 14, 2022)

Files changed (4)
server/cluster/cluster.go (6 changes: 5 additions, 1 deletion)

@@ -712,7 +712,11 @@ func (c *Cluster) UpdateClusterTopology(ctx context.Context, state clusterpb.Clu
 	}
 	clusterTopology.ShardView = shardView
 	clusterTopology.State = state
-	return c.storage.PutClusterTopology(ctx, c.clusterID, c.metaData.clusterTopology.Version, clusterTopology)
+	if err = c.storage.PutClusterTopology(ctx, c.clusterID, c.metaData.clusterTopology.Version, clusterTopology); err != nil {
+		return err
+	}
+	c.metaData.clusterTopology = clusterTopology
+	return nil
 }
 
 type ShardWithLock struct {
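
The reordering here matters: the in-memory topology is now refreshed only after the storage write succeeds, so a failed persist can no longer leave the cached copy ahead of etcd. A minimal sketch of that persist-then-update pattern, with simplified stand-in types (Topology, Store, and Cache are illustrative names, not the PR's):

package cluster

import "context"

// Topology is a simplified stand-in for the persisted cluster topology.
type Topology struct {
	Version uint64
}

// Store is an assumed interface over the etcd-backed storage layer.
type Store interface {
	PutTopology(ctx context.Context, t Topology) error
}

// Cache mirrors the persisted topology in memory.
type Cache struct {
	store   Store
	current Topology
}

// Update writes to durable storage first and mutates the in-memory copy only
// on success, so the cache never runs ahead of what was actually persisted.
func (c *Cache) Update(ctx context.Context, next Topology) error {
	if err := c.store.PutTopology(ctx, next); err != nil {
		return err
	}
	c.current = next
	return nil
}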
server/coordinator/procedure/common_test.go (new file, 63 additions)

@@ -0,0 +1,63 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

package procedure

import (
	"context"
	"testing"

	"github.com/CeresDB/ceresmeta/server/cluster"
	"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
	"github.com/CeresDB/ceresmeta/server/etcdutil"
	"github.com/CeresDB/ceresmeta/server/storage"
	"github.com/stretchr/testify/require"
	clientv3 "go.etcd.io/etcd/client/v3"
)

const (
	nodeName0                = "node0"
	nodeName1                = "node1"
	testRootPath             = "/rootPath"
	defaultIDAllocatorStep   = 20
	clusterName              = "ceresdbCluster1"
	defaultNodeCount         = 2
	defaultReplicationFactor = 1
	defaultShardTotal        = 2
)

type MockDispatch struct{}

func (m MockDispatch) OpenShard(_ context.Context, _ string, _ *eventdispatch.OpenShardRequest) error {
	return nil
}

func (m MockDispatch) CloseShard(_ context.Context, _ string, _ *eventdispatch.CloseShardRequest) error {
	return nil
}

func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ *eventdispatch.CreateTableOnShardRequest) error {
	return nil
}

func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ *eventdispatch.DropTableOnShardRequest) error {
	return nil
}

func newTestEtcdStorage(t *testing.T) (storage.Storage, clientv3.KV, etcdutil.CloseFn) {
	_, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t)
	storage := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
		MaxScanLimit: 100, MinScanLimit: 10,
	})
	return storage, client, closeSrv
}

func newTestCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
	re := require.New(t)
	storage, kv, _ := newTestEtcdStorage(t)
	manager, err := cluster.NewManagerImpl(storage, kv, testRootPath, defaultIDAllocatorStep)
	re.NoError(err)

	cluster, err := manager.CreateCluster(ctx, clusterName, defaultNodeCount, defaultReplicationFactor, defaultShardTotal)
	re.NoError(err)
	return cluster
}
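
Since MockDispatch is a hand-rolled stub, a one-line compile-time assertion would keep it in sync with the dispatcher interface it mimics. Assuming the interface is exported as eventdispatch.Dispatch (the name is a guess from the package layout):

// Compile-time check: the build breaks if MockDispatch ever drifts from the
// interface. Assumes the interface is named eventdispatch.Dispatch.
var _ eventdispatch.Dispatch = MockDispatch{}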
server/coordinator/procedure/scatter.go (40 changes: 19 additions, 21 deletions)

@@ -55,7 +55,7 @@ func scatterPrepareCallback(event *fsm.Event) {
 	minNodeCount := c.GetClusterMinNodeCount()
 
 	if !(c.GetClusterState() == clusterpb.ClusterTopology_EMPTY) {
-		event.Cancel(errors.WithMessage(cluster.ErrClusterStateInvalid, "cluster topology state is not empty"))
+		cancelEventWithLog(event, cluster.ErrClusterStateInvalid, "cluster topology state is not empty")
 		return
 	}
 
@@ -66,28 +66,26 @@
 
 	shards, err := allocNodeShards(ctx, shardTotal, minNodeCount, nodeList, request.allocator)
 	if err != nil {
-		event.Cancel(errors.WithMessage(err, "alloc node shards failed"))
+		cancelEventWithLog(event, err, "alloc node shards failed")
 		return
 	}
 
-	for nodeName, node := range nodeCache {
-		for _, shardID := range node.GetShardIDs() {
-			openShardRequest := &eventdispatch.OpenShardRequest{
-				Shard: &cluster.ShardInfo{
-					ShardID:   shardID,
-					ShardRole: clusterpb.ShardRole_LEADER,
-				},
-			}
+	for _, shard := range shards {
+		openShardRequest := &eventdispatch.OpenShardRequest{
+			Shard: &cluster.ShardInfo{
+				ShardID:   shard.GetId(),
+				ShardRole: clusterpb.ShardRole_LEADER,
+			},
+		}
 
-			if err := request.dispatch.OpenShard(ctx, nodeName, openShardRequest); err != nil {
-				event.Cancel(errors.WithMessage(err, "open shard failed"))
-				return
-			}
+		if err := request.dispatch.OpenShard(ctx, shard.Node, openShardRequest); err != nil {
+			cancelEventWithLog(event, err, "open shard failed")
+			return
 		}
 	}
 
 	if err := c.UpdateClusterTopology(ctx, clusterpb.ClusterTopology_STABLE, shards); err != nil {
-		event.Cancel(errors.WithMessage(err, "update cluster topology failed"))
+		cancelEventWithLog(event, err, "update cluster topology failed")
 		return
 	}
 }

@@ -138,7 +136,7 @@ func scatterSuccessCallback(event *fsm.Event) {
 	request := event.Args[0].(*ScatterCallbackRequest)
 
 	if err := request.cluster.Load(request.ctx); err != nil {
-		event.Cancel(errors.WithMessage(err, "coordinator scatterShard"))
+		cancelEventWithLog(event, err, "cluster load data failed")
 		return
 	}
 }

@@ -184,7 +182,7 @@ func (p *ScatterProcedure) Typ() Typ {
 }
 
 func (p *ScatterProcedure) Start(ctx context.Context) error {
-	p.UpdateStateWithLock(StateRunning)
+	p.updateStateWithLock(StateRunning)
 
 	scatterCallbackRequest := &ScatterCallbackRequest{
 		cluster: p.cluster,
@@ -195,28 +193,28 @@
 
 	if err := p.fsm.Event(eventScatterPrepare, scatterCallbackRequest); err != nil {
 		err := p.fsm.Event(eventScatterFailed, scatterCallbackRequest)
-		p.UpdateStateWithLock(StateFailed)
+		p.updateStateWithLock(StateFailed)
 		return errors.WithMessage(err, "coordinator transferLeaderShard start")
 	}
 
 	if err := p.fsm.Event(eventScatterSuccess, scatterCallbackRequest); err != nil {
 		return errors.WithMessage(err, "coordinator transferLeaderShard start")
 	}
 
-	p.UpdateStateWithLock(StateFinished)
+	p.updateStateWithLock(StateFinished)
 	return nil
 }
 
 func (p *ScatterProcedure) Cancel(_ context.Context) error {
-	p.UpdateStateWithLock(StateCancelled)
+	p.updateStateWithLock(StateCancelled)
 	return nil
 }
 
 func (p *ScatterProcedure) State() State {
 	return p.state
 }
 
-func (p *ScatterProcedure) UpdateStateWithLock(state State) {
+func (p *ScatterProcedure) updateStateWithLock(state State) {
 	p.lock.Lock()
 	p.state = state
 	p.lock.Unlock()
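
Each inline event.Cancel(errors.WithMessage(...)) call is replaced by a shared cancelEventWithLog helper. Its definition sits outside this diff, but the call sites suggest it pairs a log entry with the cancellation. A sketch under that assumption (the logger and exact signature are guesses, not the PR's actual code):

package procedure

import (
	"github.com/looplab/fsm"
	"github.com/pkg/errors"
	"go.uber.org/zap"
)

// cancelEventWithLog logs the failure once and cancels the FSM event with a
// wrapped error, giving every callback the same bail-out behavior.
func cancelEventWithLog(event *fsm.Event, err error, msg string) {
	zap.L().Error(msg, zap.Error(err))
	event.Cancel(errors.WithMessage(err, msg))
}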
server/coordinator/procedure/scatter_test.go (37 changes: 1 addition, 36 deletions)

@@ -10,30 +10,14 @@ import (
 
 	"github.com/CeresDB/ceresdbproto/pkg/clusterpb"
 	"github.com/CeresDB/ceresdbproto/pkg/metaservicepb"
-	"github.com/CeresDB/ceresmeta/server/cluster"
-	"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
-	"github.com/CeresDB/ceresmeta/server/etcdutil"
-	"github.com/CeresDB/ceresmeta/server/id"
-	"github.com/CeresDB/ceresmeta/server/storage"
 	"github.com/stretchr/testify/require"
-	clientv3 "go.etcd.io/etcd/client/v3"
 )
 
-const (
-	nodeName0                = "node0"
-	nodeName1                = "node1"
-	testRootPath             = "/rootPath"
-	defaultIDAllocatorStep   = 20
-	clusterName              = "ceresdbCluster1"
-	defaultNodeCount         = 2
-	defaultReplicationFactor = 1
-	defaultShardTotal        = 8
-)
-
 func TestScatter(t *testing.T) {
 	re := require.New(t)
 	ctx := context.Background()
-	dispatch := eventdispatch.NewDispatchImpl()
+	dispatch := MockDispatch{}
 	cluster := newTestCluster(ctx, t)
 
 	nodeInfo1 := &metaservicepb.NodeInfo{

@@ -122,22 +106,3 @@ func TestAllocNodeShard(t *testing.T) {
 	re.Equal("node0", shardView[1].Node)
 	re.Equal("node1", shardView[2].Node)
 }
-
-func newTestEtcdStorage(t *testing.T) (storage.Storage, clientv3.KV, etcdutil.CloseFn) {
-	_, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t)
-	storage := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
-		MaxScanLimit: 100, MinScanLimit: 10,
-	})
-	return storage, client, closeSrv
-}
-
-func newTestCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
-	re := require.New(t)
-	storage, kv, _ := newTestEtcdStorage(t)
-	manager, err := cluster.NewManagerImpl(storage, kv, testRootPath, defaultIDAllocatorStep)
-	re.NoError(err)
-
-	cluster, err := manager.CreateCluster(ctx, clusterName, defaultNodeCount, defaultReplicationFactor, defaultShardTotal)
-	re.NoError(err)
-	return cluster
-}
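
The visible assertions in TestAllocNodeShard pin down the allocation order: with three shards spread over two nodes, node0 takes the first two and node1 the last. A self-contained sketch of that ceil-divided, fill-earlier-nodes-first spread (illustration only; the real allocNodeShards works with cluster types and an ID allocator rather than plain strings):

package main

import "fmt"

// allocShardsEvenly spreads shardTotal shards over nodes, letting earlier
// nodes absorb the remainder, which yields node0, node0, node1 for three
// shards over two nodes, matching the assertions above.
func allocShardsEvenly(shardTotal int, nodes []string) []string {
	owners := make([]string, 0, shardTotal)
	perNode := (shardTotal + len(nodes) - 1) / len(nodes) // ceil division
	for _, node := range nodes {
		for i := 0; i < perNode && len(owners) < shardTotal; i++ {
			owners = append(owners, node)
		}
	}
	return owners
}

func main() {
	fmt.Println(allocShardsEvenly(3, []string{"node0", "node1"})) // [node0 node0 node1]
}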