Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
refactor: refactor cluster package
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed Nov 8, 2022
1 parent 5ab3689 commit caef7af
Show file tree
Hide file tree
Showing 29 changed files with 1,232 additions and 1,375 deletions.
1,032 changes: 170 additions & 862 deletions server/cluster/cluster.go

Large diffs are not rendered by default.

28 changes: 0 additions & 28 deletions server/cluster/data/table_manager.go

This file was deleted.

28 changes: 0 additions & 28 deletions server/cluster/data/topology_manager.go

This file was deleted.

2 changes: 1 addition & 1 deletion server/cluster/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ var (
ErrNodeShardsIsEmpty = coderr.NewCodeError(coderr.Internal, "node's shard list is empty")
ErrGetShardView = coderr.NewCodeError(coderr.Internal, "get shard view")
ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists")
ErrShardViewVersionNotMatch = coderr.NewCodeError(coderr.Internal, "shard view version not match")
ErrShardViewVersionNotMatch = coderr.NewCodeError(coderr.Internal, "shard view versions not match")
)
107 changes: 23 additions & 84 deletions server/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,13 @@ type Manager interface {
GetCluster(ctx context.Context, clusterName string) (*Cluster, error)
// AllocSchemaID means get or create schema.
// The second output parameter bool: Returns true if the table was newly created.
AllocSchemaID(ctx context.Context, clusterName, schemaName string) (uint32, bool, error)
// AllocTableID means get or create table.
// The second output parameter bool: Returns true if the table was newly created.
AllocTableID(ctx context.Context, clusterName, schemaName, tableName, nodeName string) (*Table, bool, error)
GetTables(ctx context.Context, clusterName, nodeName string, shardIDs []uint32) (map[uint32]*ShardTables, error)
AllocSchemaID(ctx context.Context, clusterName, schemaName string) (storage.SchemaID, bool, error)
GetTables(clusterName, nodeName string, shardIDs []storage.ShardID) (map[storage.ShardID]ShardTables, error)
DropTable(ctx context.Context, clusterName, schemaName, tableName string) error
GetShardIDs(ctx context.Context, clusterName, nodeName string) ([]uint32, error)
RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (*RouteTablesResult, error)
GetNodeShards(ctx context.Context, clusterName string) (*GetNodeShardsResult, error)
RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (RouteTablesResult, error)
GetNodeShards(ctx context.Context, clusterName string) (GetNodeShardsResult, error)

RegisterNode(ctx context.Context, clusterName string, nodeInfo *metaservicepb.NodeInfo) error
RegisterNode(ctx context.Context, clusterName string, nodeName storage.Node, shardInfos []ShardInfo) error
GetRegisteredNode(ctx context.Context, clusterName string, node string) (*metaservicepb.NodeInfo, error)
}

Expand Down Expand Up @@ -132,7 +128,7 @@ func (m *managerImpl) CreateCluster(ctx context.Context, clusterName string, ini
return nil, errors.WithMessagef(err, "cluster manager CreateCluster, clusterName:%s", clusterName)
}

if err := cluster.Load(ctx); err != nil {
if err := cluster.load(ctx); err != nil {
log.Error("fail to load cluster", zap.Error(err))
return nil, errors.WithMessagef(err, "cluster manager CreateCluster, clusterName:%s", clusterName)
}
Expand All @@ -153,7 +149,7 @@ func (m *managerImpl) GetCluster(_ context.Context, clusterName string) (*Cluste
return nil, ErrClusterNotFound
}

func (m *managerImpl) AllocSchemaID(ctx context.Context, clusterName, schemaName string) (uint32, bool, error) {
func (m *managerImpl) AllocSchemaID(ctx context.Context, clusterName, schemaName string) (storage.SchemaID, bool, error) {
cluster, err := m.getCluster(clusterName)
if err != nil {
log.Error("cluster not found", zap.Error(err))
Expand All @@ -167,79 +163,36 @@ func (m *managerImpl) AllocSchemaID(ctx context.Context, clusterName, schemaName
return 0, false, errors.WithMessagef(err, "cluster manager AllocSchemaID, "+
"clusterName:%s, schemaName:%s", clusterName, schemaName)
}
return schema.GetID(), exists, nil
}

func (m *managerImpl) AllocTableID(ctx context.Context, clusterName, schemaName, tableName, nodeName string) (*Table, bool, error) {
cluster, err := m.getCluster(clusterName)
if err != nil {
log.Error("cluster not found", zap.Error(err))
return nil, false, errors.WithMessage(err, "get cluster")
}

table, exists, err := cluster.GetTable(ctx, schemaName, tableName)
if err != nil {
return nil, false, errors.WithMessage(err, "get table")
}

if exists {
return table, true, nil
}

ret, err := cluster.CreateTable(ctx, nodeName, schemaName, tableName)
if err != nil {
log.Error("fail to create table", zap.Error(err))
return nil, false, errors.WithMessagef(err, "create table, "+
"clusterName:%s, schemaName:%s, tableName:%s, nodeName:%s", clusterName, schemaName, tableName, nodeName)
}
return ret.Table, false, nil
return schema.ID, exists, nil
}

func (m *managerImpl) GetTables(ctx context.Context, clusterName, nodeName string, shardIDs []uint32) (map[uint32]*ShardTables, error) {
func (m *managerImpl) GetTables(clusterName, nodeName string, shardIDs []storage.ShardID) (map[storage.ShardID]ShardTables, error) {
cluster, err := m.getCluster(clusterName)
if err != nil {
log.Error("cluster not found", zap.Error(err))
return nil, errors.WithMessage(err, "cluster manager GetTables")
}

shardTablesWithRole, err := cluster.GetTables(ctx, shardIDs, nodeName)
if err != nil {
return nil, errors.WithMessagef(err, "cluster manager GetTables, "+
"clusterName:%s, nodeName:%s, shardIDs:%v", clusterName, nodeName, shardIDs)
return nil, errors.WithMessagef(err, "get cluster, cluster name:%s", clusterName)
}

ret := make(map[uint32]*ShardTables, len(shardIDs))
for shardID, shardTables := range shardTablesWithRole {
tableInfos := make([]*TableInfo, 0, len(shardTables.tables))

for _, t := range shardTables.tables {
tableInfos = append(tableInfos, &TableInfo{
ID: t.meta.GetId(), Name: t.meta.GetName(),
SchemaID: t.schema.GetId(), SchemaName: t.schema.GetName(),
})
}
ret[shardID] = &ShardTables{Shard: shardTables.shard, Tables: tableInfos}
}
return ret, nil
shardTables := cluster.GetTables(shardIDs, nodeName)
return shardTables, nil
}

func (m *managerImpl) DropTable(ctx context.Context, clusterName, schemaName, tableName string) error {
cluster, err := m.getCluster(clusterName)
if err != nil {
log.Error("cluster not found", zap.Error(err))
return errors.WithMessage(err, "cluster manager DropTable")
return errors.WithMessage(err, "cluster manager drop table")
}

_, err = cluster.DropTable(ctx, schemaName, tableName)
if err != nil {
return errors.WithMessagef(err, "cluster manager DropTable, clusterName:%s, schemaName:%s, tableName:%s",
return errors.WithMessagef(err, "cluster manager drop table, clusterName:%s, schemaName:%s, tableName:%s",
clusterName, schemaName, tableName)
}

return nil
}

func (m *managerImpl) RegisterNode(ctx context.Context, clusterName string, nodeInfo *metaservicepb.NodeInfo) error {
func (m *managerImpl) RegisterNode(ctx context.Context, clusterName string, node storage.Node, shardInfos []ShardInfo) error {
m.lock.RLock()
defer m.lock.RUnlock()

Expand All @@ -252,7 +205,7 @@ func (m *managerImpl) RegisterNode(ctx context.Context, clusterName string, node
log.Error("cluster not found", zap.Error(err))
return errors.WithMessage(err, "cluster manager RegisterNode")
}
err = cluster.RegisterNode(ctx, nodeInfo)
err = cluster.RegisterNode(ctx, node, shardInfos)
if err != nil {
return errors.WithMessage(err, "cluster manager RegisterNode")
}
Expand Down Expand Up @@ -287,20 +240,6 @@ func (m *managerImpl) GetRegisteredNode(_ context.Context, clusterName string, n
return &nodeInfo, nil
}

func (m *managerImpl) GetShardIDs(_ context.Context, clusterName, nodeName string) ([]uint32, error) {
cluster, err := m.getCluster(clusterName)
if err != nil {
log.Error("cluster not found", zap.Error(err))
return nil, errors.WithMessage(err, "cluster manager GetShards")
}

shardIDs, err := cluster.GetShardIDsByNode(nodeName)
if err != nil {
return nil, errors.WithMessage(err, "cluster manager GetShards")
}
return shardIDs, nil
}

func (m *managerImpl) getCluster(clusterName string) (*Cluster, error) {
m.lock.RLock()
cluster, ok := m.clusters[clusterName]
Expand Down Expand Up @@ -337,7 +276,7 @@ func (m *managerImpl) Start(ctx context.Context) error {
m.clusters = make(map[string]*Cluster, len(clusters.Clusters))
for _, cluster := range clusters.Clusters {
cluster := NewCluster(cluster, m.storage, m.kv, m.rootPath, m.idAllocatorStep)
if err := cluster.Load(ctx); err != nil {
if err := cluster.load(ctx); err != nil {
log.Error("fail to load cluster", zap.String("cluster", cluster.Name()), zap.Error(err))
return errors.WithMessagef(err, "fail to load cluster:%v", cluster.Name())
}
Expand All @@ -362,31 +301,31 @@ func (m *managerImpl) Stop(_ context.Context) error {
return nil
}

func (m *managerImpl) RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (*RouteTablesResult, error) {
func (m *managerImpl) RouteTables(ctx context.Context, clusterName, schemaName string, tableNames []string) (RouteTablesResult, error) {
cluster, err := m.getCluster(clusterName)
if err != nil {
log.Error("cluster not found", zap.Error(err))
return nil, errors.WithMessage(err, "cluster manager routeTables")
return RouteTablesResult{}, errors.WithMessage(err, "cluster manager routeTables")
}

ret, err := cluster.RouteTables(ctx, schemaName, tableNames)
if err != nil {
log.Error("cluster manager RouteTables", zap.Error(err))
return nil, errors.WithMessage(err, "cluster manager routeTables")
return RouteTablesResult{}, errors.WithMessage(err, "cluster manager routeTables")
}

return ret, nil
}

func (m *managerImpl) GetNodeShards(ctx context.Context, clusterName string) (*GetNodeShardsResult, error) {
func (m *managerImpl) GetNodeShards(ctx context.Context, clusterName string) (GetNodeShardsResult, error) {
cluster, err := m.getCluster(clusterName)
if err != nil {
return nil, errors.WithMessage(err, "cluster manager GetNodes")
return GetNodeShardsResult{}, errors.WithMessage(err, "cluster manager GetNodes")
}

ret, err := cluster.GetNodeShards(ctx)
if err != nil {
return nil, errors.WithMessage(err, "cluster manager GetNodes")
return GetNodeShardsResult{}, errors.WithMessage(err, "cluster manager GetNodes")
}

return ret, nil
Expand Down
8 changes: 4 additions & 4 deletions server/cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (

type RegisteredNode struct {
meta *clusterpb.Node
shardInfos []*ShardInfo
shardInfos []ShardInfo
}

func NewRegisteredNode(meta *clusterpb.Node, shardInfos []*ShardInfo) *RegisteredNode {
return &RegisteredNode{
func NewRegisteredNode(meta *clusterpb.Node, shardInfos []ShardInfo) RegisteredNode {
return RegisteredNode{
meta,
shardInfos,
}
}

func (n *RegisteredNode) GetShardInfos() []*ShardInfo {
func (n *RegisteredNode) GetShardInfos() []ShardInfo {
return n.shardInfos
}

Expand Down
28 changes: 0 additions & 28 deletions server/cluster/schema.go

This file was deleted.

53 changes: 0 additions & 53 deletions server/cluster/shard.go

This file was deleted.

Loading

0 comments on commit caef7af

Please sign in to comment.