Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

WIP: feat: support transfer table #289

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,6 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de h1:AMfL1AEmlDt+gvePve1U8ly52BELslBVmea0gk20B/Y=
github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
Expand Down
21 changes: 21 additions & 0 deletions server/cluster/metadata/cluster_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,23 @@ func (c *ClusterMetadata) CreateTableMetadata(ctx context.Context, request Creat
return res, nil
}

func (c *ClusterMetadata) RemoveTableTopology(ctx context.Context, shardID storage.ShardID, version uint64, tableID storage.TableID) error {
c.logger.Info("remove table topology start", zap.String("cluster", c.Name()), zap.Uint64("tableID", uint64(tableID)))

if !c.ensureClusterStable() {
return errors.WithMessage(ErrClusterStateInvalid, "invalid cluster state, cluster state must be stable")
}

// Remove table from topology manager.
err := c.topologyManager.RemoveTable(ctx, shardID, version, []storage.TableID{tableID})
if err != nil {
return errors.WithMessage(err, "topology manager remove table")
}

c.logger.Info("remove table topology succeed", zap.String("cluster", c.Name()), zap.Uint64("tableID", uint64(tableID)))
return nil
}

func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardVersionUpdate ShardVersionUpdate, table storage.Table) error {
c.logger.Info("add table topology start", zap.String("cluster", c.Name()), zap.String("tableName", table.Name))

Expand Down Expand Up @@ -705,6 +722,10 @@ func (c *ClusterMetadata) GetTablesByIDs(tableIDs []storage.TableID) []storage.T
return c.tableManager.GetTablesByIDs(tableIDs)
}

func (c *ClusterMetadata) Topology() Topology {
return c.topologyManager.GetTopology()
}

func needUpdate(oldCache RegisteredNode, registeredNode RegisteredNode) bool {
if len(oldCache.ShardInfos) >= 50 {
return !sortCompare(oldCache.ShardInfos, registeredNode.ShardInfos)
Expand Down
1 change: 1 addition & 0 deletions server/cluster/metadata/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ var (
ErrTableAlreadyExists = coderr.NewCodeError(coderr.Internal, "table already exists")
ErrOpenTable = coderr.NewCodeError(coderr.Internal, "open table")
ErrParseTopologyType = coderr.NewCodeError(coderr.Internal, "parse topology type")
ErrTransferTable = coderr.NewCodeError(coderr.Internal, "transfer table")
)
7 changes: 7 additions & 0 deletions server/cluster/metadata/topology_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func (t *Topology) IsPrepareFinished() bool {
return true
}

func (t *Topology) ShardVersion(shardID storage.ShardID) (uint64, error) {
if shardView, ok := t.ShardViewsMapping[shardID]; ok {
return shardView.Version, nil
}
return 0, errors.Errorf("shard not found, shardID:%d", shardID)
}

type TopologyManagerImpl struct {
logger *zap.Logger
storage storage.Storage
Expand Down
54 changes: 53 additions & 1 deletion server/cluster/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ type ShardNodeWithVersion struct {
ShardNode storage.ShardNode
}

func (s ShardNodeWithVersion) ShardID() storage.ShardID {
return s.ShardInfo.ID
}

func (s ShardNodeWithVersion) NodeName() string {
return s.ShardNode.NodeName
}

func (s ShardNodeWithVersion) Version() uint64 {
return s.ShardInfo.Version
}

type CreateClusterOpts struct {
NodeCount uint32
ShardTotal uint32
Expand Down Expand Up @@ -139,7 +151,8 @@ type ShardVersionUpdate struct {
}

type RouteEntry struct {
Table TableInfo
Table TableInfo
// Currently, we only support one shard per table.
NodeShards []ShardNodeWithVersion
}

Expand All @@ -148,6 +161,45 @@ type RouteTablesResult struct {
RouteEntries map[string]RouteEntry
}

func (r RouteTablesResult) GetTableInfo(tableName string) (TableInfo, bool) {
var tableInfo TableInfo
entry, ok := r.RouteEntries[tableName]
if !ok {
return tableInfo, false
}

tableInfo = entry.Table
return tableInfo, true
}

func (r RouteTablesResult) GetShardID(tableName string) (storage.ShardID, bool) {
entry, ok := r.RouteEntries[tableName]
if !ok {
return 0, false
}

if len(entry.NodeShards) == 0 {
return 0, false
}

// Return the first shard id.
return entry.NodeShards[0].ShardInfo.ID, true
}

func (r RouteTablesResult) GetNodeName(tableName string) (string, bool) {
entry, ok := r.RouteEntries[tableName]
if !ok {
return "", false
}

if len(entry.NodeShards) == 0 {
return "", false
}

// Return the first shard id.
return entry.NodeShards[0].ShardNode.NodeName, true
}

type GetNodeShardsResult struct {
ClusterTopologyVersion uint64
NodeShards []ShardNodeWithVersion
Expand Down
25 changes: 23 additions & 2 deletions server/coordinator/eventdispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package eventdispatch
import (
"context"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
)

Expand All @@ -27,8 +28,8 @@ type Dispatch interface {
CloseShard(context context.Context, address string, request CloseShardRequest) error
CreateTableOnShard(context context.Context, address string, request CreateTableOnShardRequest) (uint64, error)
DropTableOnShard(context context.Context, address string, request DropTableOnShardRequest) (uint64, error)
OpenTableOnShard(ctx context.Context, address string, request OpenTableOnShardRequest) error
CloseTableOnShard(context context.Context, address string, request CloseTableOnShardRequest) error
OpenTableOnShard(ctx context.Context, address string, request OpenTableOnShardRequest) (OpenTableOnShardResponse, error)
CloseTableOnShard(context context.Context, address string, request CloseTableOnShardRequest) (CloseTableOnShardResponse, error)
}

type OpenShardRequest struct {
Expand Down Expand Up @@ -62,7 +63,27 @@ type OpenTableOnShardRequest struct {
TableInfo metadata.TableInfo
}

type OpenTableOnShardResponse struct {
Version uint64
}

type CloseTableOnShardRequest struct {
UpdateShardInfo UpdateShardInfo
TableInfo metadata.TableInfo
}

type CloseTableOnShardResponse struct {
Version uint64
}

func convertOpenTableOnShardResponse(resp *metaeventpb.OpenTableOnShardResponse) OpenTableOnShardResponse {
return OpenTableOnShardResponse{
Version: 1,
}
}

func convertCloseTableOnShardResponse(resp *metaeventpb.CloseTableOnShardResponse) CloseTableOnShardResponse {
return CloseTableOnShardResponse{
Version: 1,
}
}
24 changes: 14 additions & 10 deletions server/coordinator/eventdispatch/dispatch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,40 @@ func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, reques
return resp.GetLatestShardVersion(), nil
}

func (d *DispatchImpl) OpenTableOnShard(ctx context.Context, addr string, request OpenTableOnShardRequest) error {
func (d *DispatchImpl) OpenTableOnShard(ctx context.Context, addr string, request OpenTableOnShardRequest) (OpenTableOnShardResponse, error) {
var ret OpenTableOnShardResponse
client, err := d.getMetaEventClient(ctx, addr)
if err != nil {
return err
return ret, err
}

resp, err := client.OpenTableOnShard(ctx, convertOpenTableOnShardRequestToPB(request))
if err != nil {
return errors.WithMessagef(err, "open table on shard, addr:%s, request:%v", addr, request)
return ret, errors.WithMessagef(err, "open table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("open table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
return ret, ErrDispatch.WithCausef("open table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
ret = convertOpenTableOnShardResponse(resp)
return ret, nil
}

func (d *DispatchImpl) CloseTableOnShard(ctx context.Context, addr string, request CloseTableOnShardRequest) error {
func (d *DispatchImpl) CloseTableOnShard(ctx context.Context, addr string, request CloseTableOnShardRequest) (CloseTableOnShardResponse, error) {
var ret CloseTableOnShardResponse
client, err := d.getMetaEventClient(ctx, addr)
if err != nil {
return err
return ret, err
}

resp, err := client.CloseTableOnShard(ctx, convertCloseTableOnShardRequestToPB(request))
if err != nil {
return errors.WithMessagef(err, "close table on shard, addr:%s, request:%v", addr, request)
return ret, errors.WithMessagef(err, "close table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return ErrDispatch.WithCausef("close table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
return ret, ErrDispatch.WithCausef("close table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return nil
ret = convertCloseTableOnShardResponse(resp)
return ret, nil
}

func (d *DispatchImpl) getGrpcClient(ctx context.Context, addr string) (*grpc.ClientConn, error) {
Expand Down
29 changes: 29 additions & 0 deletions server/coordinator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/ddl/droptable"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/split"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transfertable"
"github.com/CeresDB/ceresmeta/server/id"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
Expand Down Expand Up @@ -98,6 +99,16 @@ type BatchRequest struct {
BatchType procedure.Typ
}

type TransferTableRequest struct {
ClusterMetadata *metadata.ClusterMetadata
SchemaName string
TableName string
DestShardID storage.ShardID

OnSucceeded func() error
OnFailed func(error) error
}

func NewFactory(logger *zap.Logger, allocator id.Allocator, dispatch eventdispatch.Dispatch, storage procedure.Storage) *Factory {
return &Factory{
idAllocator: allocator,
Expand Down Expand Up @@ -280,6 +291,24 @@ func (f *Factory) CreateBatchTransferLeaderProcedure(ctx context.Context, reques
return transferleader.NewBatchTransferLeaderProcedure(id, request.Batch)
}

func (f *Factory) CreateTransferTableProcedure(ctx context.Context, request TransferTableRequest) (procedure.Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}

return transfertable.NewProcedure(transfertable.ProcedureParams{
Dispatch: f.dispatch,
ClusterMetadata: request.ClusterMetadata,
ID: id,
SchemaName: request.SchemaName,
TableName: request.TableName,
DestShardID: request.DestShardID,
OnSucceeded: request.OnSucceeded,
OnFailed: request.OnFailed,
})
}

func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
Expand Down
Loading
Loading