This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

feat: add transfer leader procedure (#67)
ZuLiangWang committed Oct 14, 2022
1 parent ed69aad commit 1564d8a
Showing 7 changed files with 342 additions and 58 deletions.
6 changes: 5 additions & 1 deletion server/cluster/cluster.go
@@ -712,7 +712,11 @@ func (c *Cluster) UpdateClusterTopology(ctx context.Context, state clusterpb.Clu
     }
     clusterTopology.ShardView = shardView
     clusterTopology.State = state
-    return c.storage.PutClusterTopology(ctx, c.clusterID, c.metaData.clusterTopology.Version, clusterTopology)
+    if err = c.storage.PutClusterTopology(ctx, c.clusterID, c.metaData.clusterTopology.Version, clusterTopology); err != nil {
+        return err
+    }
+    c.metaData.clusterTopology = clusterTopology
+    return nil
 }

 type ShardWithLock struct {
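The hunk above stops returning the storage call directly: the new topology is persisted first, and the in-memory copy (c.metaData.clusterTopology) is refreshed only once the etcd write has succeeded, so the cached metadata can never run ahead of durable storage. A minimal, self-contained sketch of that ordering, using illustrative names (Topology, topologyStore, clusterMeta) rather than anything from this repository:

package main

import (
    "context"
    "errors"
    "fmt"
)

// Topology stands in for the persisted cluster topology record.
type Topology struct{ State string }

// topologyStore stands in for the etcd-backed storage layer.
type topologyStore struct{ failPut bool }

func (s *topologyStore) Put(_ context.Context, _ Topology) error {
    if s.failPut {
        return errors.New("put topology failed")
    }
    return nil
}

// clusterMeta caches the last topology that was successfully persisted.
type clusterMeta struct {
    store  *topologyStore
    cached Topology
}

// UpdateTopology persists first and only then updates the cache, mirroring
// the ordering the hunk above introduces in UpdateClusterTopology.
func (c *clusterMeta) UpdateTopology(ctx context.Context, next Topology) error {
    if err := c.store.Put(ctx, next); err != nil {
        return err // cache keeps the old value when the write fails
    }
    c.cached = next
    return nil
}

func main() {
    c := &clusterMeta{store: &topologyStore{}}
    _ = c.UpdateTopology(context.Background(), Topology{State: "STABLE"})
    fmt.Println(c.cached.State) // prints STABLE
}

If Put fails, the caller sees the error and the cache still reflects the last topology that actually made it to storage.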
63 changes: 63 additions & 0 deletions server/coordinator/procedure/common_test.go
@@ -0,0 +1,63 @@
+// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
+
+package procedure
+
+import (
+    "context"
+    "testing"
+
+    "github.com/CeresDB/ceresmeta/server/cluster"
+    "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
+    "github.com/CeresDB/ceresmeta/server/etcdutil"
+    "github.com/CeresDB/ceresmeta/server/storage"
+    "github.com/stretchr/testify/require"
+    clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+const (
+    nodeName0                = "node0"
+    nodeName1                = "node1"
+    testRootPath             = "/rootPath"
+    defaultIDAllocatorStep   = 20
+    clusterName              = "ceresdbCluster1"
+    defaultNodeCount         = 2
+    defaultReplicationFactor = 1
+    defaultShardTotal        = 2
+)
+
+type MockDispatch struct{}
+
+func (m MockDispatch) OpenShard(_ context.Context, _ string, _ *eventdispatch.OpenShardRequest) error {
+    return nil
+}
+
+func (m MockDispatch) CloseShard(_ context.Context, _ string, _ *eventdispatch.CloseShardRequest) error {
+    return nil
+}
+
+func (m MockDispatch) CreateTableOnShard(_ context.Context, _ string, _ *eventdispatch.CreateTableOnShardRequest) error {
+    return nil
+}
+
+func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ *eventdispatch.DropTableOnShardRequest) error {
+    return nil
+}
+
+func newTestEtcdStorage(t *testing.T) (storage.Storage, clientv3.KV, etcdutil.CloseFn) {
+    _, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t)
+    storage := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
+        MaxScanLimit: 100, MinScanLimit: 10,
+    })
+    return storage, client, closeSrv
+}
+
+func newTestCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
+    re := require.New(t)
+    storage, kv, _ := newTestEtcdStorage(t)
+    manager, err := cluster.NewManagerImpl(storage, kv, testRootPath, defaultIDAllocatorStep)
+    re.NoError(err)
+
+    cluster, err := manager.CreateCluster(ctx, clusterName, defaultNodeCount, defaultReplicationFactor, defaultShardTotal)
+    re.NoError(err)
+    return cluster
+}
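common_test.go pulls the fixtures shared by procedure tests into one place: MockDispatch satisfies the event-dispatch interface with no-ops so tests never issue real shard RPCs, and newTestCluster builds a cluster against a throwaway embedded etcd server. A sketch of how a test might wire them together (TestFixturesSketch is illustrative only, not part of the commit; the real usage appears in scatter_test.go below):

package procedure

import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"
)

// Illustrative only: how a procedure test can lean on the shared fixtures.
func TestFixturesSketch(t *testing.T) {
    re := require.New(t)
    ctx := context.Background()

    c := newTestCluster(ctx, t) // cluster backed by a throwaway etcd server
    re.NotNil(c)

    dispatch := MockDispatch{} // no-op dispatch, so no real shard RPCs happen
    re.NoError(dispatch.OpenShard(ctx, nodeName0, nil))
}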
40 changes: 19 additions & 21 deletions server/coordinator/procedure/scatter.go
@@ -55,7 +55,7 @@ func scatterPrepareCallback(event *fsm.Event) {
     minNodeCount := c.GetClusterMinNodeCount()

     if !(c.GetClusterState() == clusterpb.ClusterTopology_EMPTY) {
-        event.Cancel(errors.WithMessage(cluster.ErrClusterStateInvalid, "cluster topology state is not empty"))
+        cancelEventWithLog(event, cluster.ErrClusterStateInvalid, "cluster topology state is not empty")
         return
     }

@@ -66,28 +66,26 @@

     shards, err := allocNodeShards(ctx, shardTotal, minNodeCount, nodeList, request.allocator)
     if err != nil {
-        event.Cancel(errors.WithMessage(err, "alloc node shards failed"))
+        cancelEventWithLog(event, err, "alloc node shards failed")
         return
     }

-    for nodeName, node := range nodeCache {
-        for _, shardID := range node.GetShardIDs() {
-            openShardRequest := &eventdispatch.OpenShardRequest{
-                Shard: &cluster.ShardInfo{
-                    ShardID:   shardID,
-                    ShardRole: clusterpb.ShardRole_LEADER,
-                },
-            }
+    for _, shard := range shards {
+        openShardRequest := &eventdispatch.OpenShardRequest{
+            Shard: &cluster.ShardInfo{
+                ShardID:   shard.GetId(),
+                ShardRole: clusterpb.ShardRole_LEADER,
+            },
+        }

-            if err := request.dispatch.OpenShard(ctx, nodeName, openShardRequest); err != nil {
-                event.Cancel(errors.WithMessage(err, "open shard failed"))
-                return
-            }
-        }
+        if err := request.dispatch.OpenShard(ctx, shard.Node, openShardRequest); err != nil {
+            cancelEventWithLog(event, err, "open shard failed")
+            return
+        }
     }

     if err := c.UpdateClusterTopology(ctx, clusterpb.ClusterTopology_STABLE, shards); err != nil {
-        event.Cancel(errors.WithMessage(err, "update cluster topology failed"))
+        cancelEventWithLog(event, err, "update cluster topology failed")
         return
     }
 }
@@ -138,7 +136,7 @@ func scatterSuccessCallback(event *fsm.Event) {
     request := event.Args[0].(*ScatterCallbackRequest)

     if err := request.cluster.Load(request.ctx); err != nil {
-        event.Cancel(errors.WithMessage(err, "coordinator scatterShard"))
+        cancelEventWithLog(event, err, "cluster load data failed")
         return
     }
 }
@@ -184,7 +182,7 @@ func (p *ScatterProcedure) Typ() Typ {
 }

 func (p *ScatterProcedure) Start(ctx context.Context) error {
-    p.UpdateStateWithLock(StateRunning)
+    p.updateStateWithLock(StateRunning)

     scatterCallbackRequest := &ScatterCallbackRequest{
         cluster: p.cluster,
@@ -195,28 +193,28 @@ func (p *ScatterProcedure) Start(ctx context.Context) error {

     if err := p.fsm.Event(eventScatterPrepare, scatterCallbackRequest); err != nil {
         err := p.fsm.Event(eventScatterFailed, scatterCallbackRequest)
-        p.UpdateStateWithLock(StateFailed)
+        p.updateStateWithLock(StateFailed)
         return errors.WithMessage(err, "coordinator transferLeaderShard start")
     }

     if err := p.fsm.Event(eventScatterSuccess, scatterCallbackRequest); err != nil {
         return errors.WithMessage(err, "coordinator transferLeaderShard start")
     }

-    p.UpdateStateWithLock(StateFinished)
+    p.updateStateWithLock(StateFinished)
     return nil
 }

 func (p *ScatterProcedure) Cancel(_ context.Context) error {
-    p.UpdateStateWithLock(StateCancelled)
+    p.updateStateWithLock(StateCancelled)
     return nil
 }

 func (p *ScatterProcedure) State() State {
     return p.state
 }

-func (p *ScatterProcedure) UpdateStateWithLock(state State) {
+func (p *ScatterProcedure) updateStateWithLock(state State) {
     p.lock.Lock()
     p.state = state
     p.lock.Unlock()
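The scatter.go hunks replace each event.Cancel(errors.WithMessage(...)) call with cancelEventWithLog, a helper whose definition is not visible in the hunks shown on this page. A plausible sketch of such a helper, assuming the looplab/fsm event type the procedure code already uses and a zap logger as a stand-in for the project's own logging package:

package procedure

import (
    "github.com/looplab/fsm"
    "github.com/pkg/errors"
    "go.uber.org/zap"
)

// Hypothetical reconstruction of cancelEventWithLog: log the failure, then
// cancel the FSM event with a wrapped error so the procedure's caller still
// sees why the callback aborted. The real helper added by this commit may
// differ; zap.L() here is only a stand-in for whatever logger it uses.
func cancelEventWithLog(event *fsm.Event, err error, msg string) {
    zap.L().Error(msg, zap.Error(err))
    event.Cancel(errors.WithMessage(err, msg))
}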
37 changes: 1 addition & 36 deletions server/coordinator/procedure/scatter_test.go
@@ -10,30 +10,14 @@ import (

     "github.com/CeresDB/ceresdbproto/pkg/clusterpb"
     "github.com/CeresDB/ceresdbproto/pkg/metaservicepb"
-    "github.com/CeresDB/ceresmeta/server/cluster"
-    "github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
-    "github.com/CeresDB/ceresmeta/server/etcdutil"
-    "github.com/CeresDB/ceresmeta/server/id"
-    "github.com/CeresDB/ceresmeta/server/storage"
     "github.com/stretchr/testify/require"
     clientv3 "go.etcd.io/etcd/client/v3"
 )

-const (
-    nodeName0                = "node0"
-    nodeName1                = "node1"
-    testRootPath             = "/rootPath"
-    defaultIDAllocatorStep   = 20
-    clusterName              = "ceresdbCluster1"
-    defaultNodeCount         = 2
-    defaultReplicationFactor = 1
-    defaultShardTotal        = 8
-)
-
 func TestScatter(t *testing.T) {
     re := require.New(t)
     ctx := context.Background()
-    dispatch := eventdispatch.NewDispatchImpl()
+    dispatch := MockDispatch{}
     cluster := newTestCluster(ctx, t)

     nodeInfo1 := &metaservicepb.NodeInfo{
@@ -122,22 +106,3 @@ func TestAllocNodeShard(t *testing.T) {
     re.Equal("node0", shardView[1].Node)
     re.Equal("node1", shardView[2].Node)
 }
-
-func newTestEtcdStorage(t *testing.T) (storage.Storage, clientv3.KV, etcdutil.CloseFn) {
-    _, client, closeSrv := etcdutil.PrepareEtcdServerAndClient(t)
-    storage := storage.NewStorageWithEtcdBackend(client, testRootPath, storage.Options{
-        MaxScanLimit: 100, MinScanLimit: 10,
-    })
-    return storage, client, closeSrv
-}
-
-func newTestCluster(ctx context.Context, t *testing.T) *cluster.Cluster {
-    re := require.New(t)
-    storage, kv, _ := newTestEtcdStorage(t)
-    manager, err := cluster.NewManagerImpl(storage, kv, testRootPath, defaultIDAllocatorStep)
-    re.NoError(err)
-
-    cluster, err := manager.CreateCluster(ctx, clusterName, defaultNodeCount, defaultReplicationFactor, defaultShardTotal)
-    re.NoError(err)
-    return cluster
-}
(Diffs for the remaining 3 changed files are not shown here.)
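The TransferLeaderProcedure that gives the commit its title lives in those unrendered files. Going only by the ScatterProcedure shown above, here is a purely hypothetical sketch of the shape such an fsm-driven procedure tends to take; every name below (events, struct fields, messages) is an assumption, not code from this commit:

package procedure

import (
    "context"

    "github.com/looplab/fsm"
    "github.com/pkg/errors"
)

// Hypothetical only: a transfer-leader procedure shaped like ScatterProcedure.
const (
    eventTransferLeaderPrepare = "EventTransferLeaderPrepare"
    eventTransferLeaderSuccess = "EventTransferLeaderSuccess"
    eventTransferLeaderFailed  = "EventTransferLeaderFailed"
)

type TransferLeaderSketch struct {
    fsm   *fsm.FSM
    state State
    // plus cluster, dispatch, shard ID, old/new leader nodes, a mutex, ...
}

func (p *TransferLeaderSketch) Start(_ context.Context) error {
    // Mirror ScatterProcedure.Start: drive the prepare event, fall back to
    // the failed event on error, then mark the procedure finished.
    if err := p.fsm.Event(eventTransferLeaderPrepare); err != nil {
        _ = p.fsm.Event(eventTransferLeaderFailed)
        return errors.WithMessage(err, "transfer leader prepare")
    }
    if err := p.fsm.Event(eventTransferLeaderSuccess); err != nil {
        return errors.WithMessage(err, "transfer leader success")
    }
    return nil
}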
