Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
make CI happy
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed Dec 19, 2023
1 parent 7b42ecb commit 4df54f9
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 28 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,6 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de h1:AMfL1AEmlDt+gvePve1U8ly52BELslBVmea0gk20B/Y=
github.com/zuliangwang/ceresdbproto/golang v0.0.0-20231106082618-b7e1fc49a3de/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
Expand Down
6 changes: 4 additions & 2 deletions server/cluster/metadata/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,14 @@ type RouteTablesResult struct {
}

func (r RouteTablesResult) GetTableInfo(tableName string) (TableInfo, bool) {
var tableInfo TableInfo
entry, ok := r.RouteEntries[tableName]
if !ok {
return TableInfo{}, false
return tableInfo, false
}

return entry.Table, true
tableInfo = entry.Table
return tableInfo, true
}

func (r RouteTablesResult) GetShardID(tableName string) (storage.ShardID, bool) {
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator/eventdispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package eventdispatch

import (
"context"
"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaeventpb"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
)

Expand Down
20 changes: 12 additions & 8 deletions server/coordinator/eventdispatch/dispatch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,35 +105,39 @@ func (d *DispatchImpl) DropTableOnShard(ctx context.Context, addr string, reques
}

func (d *DispatchImpl) OpenTableOnShard(ctx context.Context, addr string, request OpenTableOnShardRequest) (OpenTableOnShardResponse, error) {
var ret OpenTableOnShardResponse
client, err := d.getMetaEventClient(ctx, addr)
if err != nil {
return OpenTableOnShardResponse{}, err
return ret, err
}

resp, err := client.OpenTableOnShard(ctx, convertOpenTableOnShardRequestToPB(request))
if err != nil {
return OpenTableOnShardResponse{}, errors.WithMessagef(err, "open table on shard, addr:%s, request:%v", addr, request)
return ret, errors.WithMessagef(err, "open table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return OpenTableOnShardResponse{}, ErrDispatch.WithCausef("open table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
return ret, ErrDispatch.WithCausef("open table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return convertOpenTableOnShardResponse(resp), nil
ret = convertOpenTableOnShardResponse(resp)
return ret, nil
}

func (d *DispatchImpl) CloseTableOnShard(ctx context.Context, addr string, request CloseTableOnShardRequest) (CloseTableOnShardResponse, error) {
var ret CloseTableOnShardResponse
client, err := d.getMetaEventClient(ctx, addr)
if err != nil {
return CloseTableOnShardResponse{}, err
return ret, err
}

resp, err := client.CloseTableOnShard(ctx, convertCloseTableOnShardRequestToPB(request))
if err != nil {
return CloseTableOnShardResponse{}, errors.WithMessagef(err, "close table on shard, addr:%s, request:%v", addr, request)
return ret, errors.WithMessagef(err, "close table on shard, addr:%s, request:%v", addr, request)
}
if resp.GetHeader().Code != 0 {
return CloseTableOnShardResponse{}, ErrDispatch.WithCausef("close table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
return ret, ErrDispatch.WithCausef("close table on shard, addr:%s, request:%v, err:%s", addr, request, resp.GetHeader().GetError())
}
return convertCloseTableOnShardResponse(resp), nil
ret = convertCloseTableOnShardResponse(resp)
return ret, nil
}

func (d *DispatchImpl) getGrpcClient(ctx context.Context, addr string) (*grpc.ClientConn, error) {
Expand Down
2 changes: 1 addition & 1 deletion server/coordinator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package coordinator

import (
"context"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transfertable"

"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/ddl/droptable"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/split"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transferleader"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure/operation/transfertable"
"github.com/CeresDB/ceresmeta/server/id"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2023 The CeresDB Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package transfertable

import (
Expand Down Expand Up @@ -47,8 +63,7 @@ type Procedure struct {
params ProcedureParams

tableInfo metadata.TableInfo
hasSrcShardNode bool
srcShardNodeWithVersion metadata.ShardNodeWithVersion
srcShardNodeWithVersion *metadata.ShardNodeWithVersion
relatedVersionInfo procedure.RelatedVersionInfo

// Protect the state.
Expand Down Expand Up @@ -89,11 +104,9 @@ func NewProcedure(params ProcedureParams) (procedure.Procedure, error) {
}

shardWithVersion := map[storage.ShardID]uint64{}
hasSrcShardNode := false
srcShardNodeWithVersion := metadata.ShardNodeWithVersion{}
var srcShardNodeWithVersion *metadata.ShardNodeWithVersion
if len(entry.NodeShards) != 0 {
hasSrcShardNode = true
srcShardNodeWithVersion = entry.NodeShards[0]
srcShardNodeWithVersion = &entry.NodeShards[0]
shardWithVersion[srcShardNodeWithVersion.ShardID()] = srcShardNodeWithVersion.ShardInfo.Version
}

Expand All @@ -118,7 +131,6 @@ func NewProcedure(params ProcedureParams) (procedure.Procedure, error) {
fsm: transferTableOperationFsm,
params: params,
tableInfo: tableInfo,
hasSrcShardNode: hasSrcShardNode,
srcShardNodeWithVersion: srcShardNodeWithVersion,
relatedVersionInfo: relatedVersionInfo,
lock: sync.RWMutex{},
Expand Down Expand Up @@ -153,7 +165,7 @@ func (p *Procedure) Start(ctx context.Context) error {
return errors.WithMessage(err, "transferTable procedure persist")
}

if p.hasSrcShardNode {
if p.srcShardNodeWithVersion != nil {
if err := p.fsm.Event(eventCloseTable, transferTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
_ = p.params.OnFailed(err)
Expand Down Expand Up @@ -273,8 +285,10 @@ func openTableCallback(event *fsm.Event) {
return
}
nodeAddr := nodes[0].NodeName
var shardInfo metadata.ShardInfo
shardInfo.ID = shardID
openShardRequest := eventdispatch.OpenTableOnShardRequest{
UpdateShardInfo: eventdispatch.UpdateShardInfo{CurrShardInfo: metadata.ShardInfo{ID: shardID}},
UpdateShardInfo: eventdispatch.UpdateShardInfo{CurrShardInfo: shardInfo},
TableInfo: req.p.tableInfo,
}

Expand Down
8 changes: 4 additions & 4 deletions server/coordinator/procedure/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func (m MockDispatch) DropTableOnShard(_ context.Context, _ string, _ eventdispa
return 0, nil
}

func (m MockDispatch) OpenTableOnShard(_ context.Context, _ string, _ eventdispatch.OpenTableOnShardRequest) error {
return nil
func (m MockDispatch) OpenTableOnShard(_ context.Context, _ string, _ eventdispatch.OpenTableOnShardRequest) (eventdispatch.OpenTableOnShardResponse, error) {
return eventdispatch.OpenTableOnShardResponse{Version: 0}, nil
}

func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) error {
return nil
func (m MockDispatch) CloseTableOnShard(_ context.Context, _ string, _ eventdispatch.CloseTableOnShardRequest) (eventdispatch.CloseTableOnShardResponse, error) {
return eventdispatch.CloseTableOnShardResponse{Version: 0}, nil
}

type MockStorage struct{}
Expand Down
2 changes: 1 addition & 1 deletion server/service/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (a *API) transferTable(req *http.Request) apiFuncResult {
}

select {
case _ = <-resultCh:
case <-resultCh:
log.Info("transfer leader succeed", zap.String("request", fmt.Sprintf("%+v", transferTableRequest)), zap.Int64("costTime", time.Since(start).Milliseconds()))
return okResult(statusSuccess)
case err = <-errorCh:
Expand Down

0 comments on commit 4df54f9

Please sign in to comment.