Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
fix: wrong shard id when drop non-exist table (#230)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi authored Sep 19, 2023
1 parent c47a525 commit 07a50fc
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 32 deletions.
8 changes: 6 additions & 2 deletions server/coordinator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,14 @@ func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request
})
}

func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTableRequest) (procedure.Procedure, error) {
// CreateDropTableProcedure creates a procedure to do drop table.
//
// And if no error is thrown, the returned boolean value is used to tell whether the procedure is created.
// In some cases, e.g. the table doesn't exist, it should not be an error and false will be returned.
func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTableRequest) (procedure.Procedure, bool, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
return nil, false, err
}

snapshot := request.ClusterMetadata.GetClusterSnapshot()
Expand Down
12 changes: 7 additions & 5 deletions server/coordinator/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func TestDropTable(t *testing.T) {
re := require.New(t)
ctx := context.Background()
f, m := setupFactory(t)
// Create normal table procedure.
p, err := f.CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
// Drop normal table procedure.
p, ok, err := f.CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
ClusterMetadata: m,
ClusterSnapshot: m.GetClusterSnapshot(),
SourceReq: &metaservicepb.DropTableRequest{
Expand All @@ -92,11 +92,11 @@ func TestDropTable(t *testing.T) {
OnFailed: nil,
})
re.NoError(err)
re.Equal(procedure.DropTable, p.Typ())
re.Equal(procedure.StateInit, string(p.State()))
re.False(ok)
re.Nil(p)

// Create partition table procedure.
_, err = f.CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
p, ok, err = f.CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
ClusterMetadata: m,
ClusterSnapshot: m.GetClusterSnapshot(),
SourceReq: &metaservicepb.DropTableRequest{
Expand All @@ -113,6 +113,8 @@ func TestDropTable(t *testing.T) {
})
// Drop non-existing partition table.
re.Error(err)
re.False(ok)
re.Nil(p)
}

func TestTransferLeader(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type ProcedureParams struct {
OnFailed func(error) error
}

func NewProcedure(params ProcedureParams) (*Procedure, error) {
func NewProcedure(params ProcedureParams) (procedure.Procedure, error) {
relatedVersionInfo, err := buildRelatedVersionInfo(params)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ func testDropPartitionTable(t *testing.T, dispatch eventdispatch.Dispatch, c *cl
}, Storage: s,
}

procedure, err := droppartitiontable.NewProcedure(req)
procedure, ok, err := droppartitiontable.NewProcedure(req)
re.NoError(err)
re.True(ok)
err = procedure.Start(context.Background())
re.NoError(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,22 @@ type ProcedureParams struct {
OnFailed func(error) error
}

func NewProcedure(params ProcedureParams) (*Procedure, error) {
func NewProcedure(params ProcedureParams) (*Procedure, bool, error) {
fsm := fsm.NewFSM(
stateBegin,
createDropPartitionTableEvents,
createDropPartitionTableCallbacks,
)
relatedVersionInfo, err := buildRelatedVersionInfo(params)
if err != nil {
return nil, err
return nil, false, err
}

return &Procedure{
fsm: fsm,
params: params,
relatedVersionInfo: relatedVersionInfo,
}, nil
}, true, nil
}

func buildRelatedVersionInfo(params ProcedureParams) (procedure.RelatedVersionInfo, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func testCreateTable(t *testing.T, dispatch eventdispatch.Dispatch, c *cluster.C
func testDropTable(t *testing.T, dispatch eventdispatch.Dispatch, c *cluster.Cluster, nodeName, tableName string) {
re := require.New(t)
// New DropTableProcedure to drop table.
procedure, err := droptable.NewDropTableProcedure(droptable.ProcedureParams{
procedure, ok, err := droptable.NewDropTableProcedure(droptable.ProcedureParams{
ID: 0,
Dispatch: dispatch,
ClusterMetadata: c.GetMetadata(),
Expand All @@ -131,6 +131,7 @@ func testDropTable(t *testing.T, dispatch eventdispatch.Dispatch, c *cluster.Clu
},
})
re.NoError(err)
re.True(ok)
err = procedure.Start(context.Background())
re.NoError(err)
}
36 changes: 18 additions & 18 deletions server/coordinator/procedure/ddl/droptable/drop_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,25 @@ type ProcedureParams struct {
OnFailed func(error) error
}

func NewDropTableProcedure(params ProcedureParams) (procedure.Procedure, error) {
shardID, err := validateTable(params)
func NewDropTableProcedure(params ProcedureParams) (procedure.Procedure, bool, error) {
table, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
log.Error("get table", zap.Error(err))
return nil, false, err
}
if !exists {
log.Warn("drop non-existing table", zap.String("schema", params.SourceReq.GetSchemaName()), zap.String("table", params.SourceReq.GetName()))
return nil, false, nil
}

shardID, err := findShardID(table.ID, params)
if err != nil {
return nil, err
return nil, false, err
}

relatedVersionInfo, err := buildRelatedVersionInfo(params, shardID)
if err != nil {
return nil, err
return nil, false, err
}

fsm := fsm.NewFSM(
Expand All @@ -145,7 +155,7 @@ func NewDropTableProcedure(params ProcedureParams) (procedure.Procedure, error)
relatedVersionInfo: relatedVersionInfo,
params: params,
state: procedure.StateInit,
}, nil
}, true, nil
}

func buildRelatedVersionInfo(params ProcedureParams, shardID storage.ShardID) (procedure.RelatedVersionInfo, error) {
Expand All @@ -162,20 +172,10 @@ func buildRelatedVersionInfo(params ProcedureParams, shardID storage.ShardID) (p
}, nil
}

func validateTable(params ProcedureParams) (storage.ShardID, error) {
table, exists, err := params.ClusterMetadata.GetTable(params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
log.Error("get table", zap.Error(err))
return 0, err
}
if !exists {
log.Error("drop non-existing table", zap.String("schema", params.SourceReq.GetSchemaName()), zap.String("table", params.SourceReq.GetName()))
return 0, err
}

func findShardID(tableID storage.TableID, params ProcedureParams) (storage.ShardID, error) {
for _, shardView := range params.ClusterSnapshot.Topology.ShardViewsMapping {
for _, tableID := range shardView.TableIDs {
if table.ID == tableID {
for _, id := range shardView.TableIDs {
if tableID == id {
return shardView.ShardID, nil
}
}
Expand Down
7 changes: 6 additions & 1 deletion server/service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableReq
errorCh <- err
return nil
}
procedure, err := c.GetProcedureFactory().CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
procedure, ok, err := c.GetProcedureFactory().CreateDropTableProcedure(ctx, coordinator.DropTableRequest{
ClusterMetadata: c.GetMetadata(),
ClusterSnapshot: c.GetMetadata().GetClusterSnapshot(),
SourceReq: req,
Expand All @@ -268,6 +268,11 @@ func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableReq
log.Error("fail to drop table", zap.Error(err))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}
if !ok {
log.Warn("table may have been dropped already")
return &metaservicepb.DropTableResponse{Header: okResponseHeader()}, nil
}

err = c.GetProcedureManager().Submit(ctx, procedure)
if err != nil {
log.Error("fail to drop table, manager submit procedure", zap.Error(err))
Expand Down

0 comments on commit 07a50fc

Please sign in to comment.