fix: scheduler concurrent deadlock (#509)
* fix scheduler concurrent deadlock

Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
244372610 committed Jul 29, 2021
1 parent 70e4207 commit a5e4b0e
Showing 24 changed files with 243 additions and 117 deletions.
1 change: 0 additions & 1 deletion cdnsystem/server/service/cdn_seed_server.go
@@ -159,7 +159,6 @@ func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTask
return nil, dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err)
}
task, err := css.taskMgr.Get(req.TaskId)
logger.Debugf("task: %+v", task)
if err != nil {
if cdnerrors.IsDataNotFound(err) {
return nil, dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err)
4 changes: 2 additions & 2 deletions client/daemon/daemon.go
@@ -111,9 +111,9 @@ func New(opt *config.DaemonOption) (Daemon, error) {
PeerId: request.PeerID,
})
if er != nil {
logger.Errorf("leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
logger.Errorf("step 4:leave task %s/%s, error: %v", request.TaskID, request.PeerID, er)
} else {
logger.Infof("leave task %s/%s state ok", request.TaskID, request.PeerID)
logger.Infof("step 4:leave task %s/%s state ok", request.TaskID, request.PeerID)
}
})
if err != nil {
8 changes: 5 additions & 3 deletions client/daemon/peer/peertask_base.go
@@ -685,7 +685,8 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
span.RecordError(getErr)
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet", getErr)
pt.Warnf("get piece tasks with error: %s, but peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s", getErr,
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
@@ -709,13 +710,14 @@ func (pt *peerTask) getPieceTasks(span trace.Span, curPeerPacket *scheduler.Peer
}
// fast way to exit retry
if curPeerPacket != pt.peerPacket {
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet")
pt.Warnf("get empty pieces and peer packet changed, switch to new peer packet, current destPeer %s, new destPeer %s",
curPeerPacket.MainPeer.PeerId, pt.peerPacket.MainPeer.PeerId)
peerPacketChanged = true
return nil, true, nil
}
span.AddEvent("retry due to empty pieces",
trace.WithAttributes(config.AttributeGetPieceRetry.Int(count)))
pt.Warnf("peer %s returns success but with empty pieces, retry later", peer.PeerId)
pt.Infof("peer %s returns success but with empty pieces, retry later", peer.PeerId)
return nil, false, dferrors.ErrEmptyValue
}
return pp, false, nil
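Both hunks above rely on the same snapshot-and-compare trick: each attempt records curPeerPacket before contacting the destination peer, and after an error or an empty result it checks whether pt.peerPacket still points at that snapshot; if not, the scheduler has already pushed a new peer packet and the retry exits early. A minimal sketch of that pattern, with illustrative types rather than the real peerTask fields:

package sketch

// peerPacket stands in for scheduler.PeerPacket; only the main peer matters here.
type peerPacket struct{ mainPeerID string }

type peerTask struct {
	// peerPacket is swapped out by the scheduler stream while retries are in flight
	// (the real synchronization around this field is omitted).
	peerPacket *peerPacket
}

// getPiecesOnce captures the packet this attempt targets and reports whether the
// scheduler switched packets while the attempt ran.
func (pt *peerTask) getPiecesOnce(fetch func(*peerPacket) error) (packetChanged bool, err error) {
	cur := pt.peerPacket // snapshot, like curPeerPacket in the hunks above
	if ferr := fetch(cur); ferr != nil {
		if cur != pt.peerPacket {
			// a new peer packet arrived: abandon the retry against the stale
			// destination instead of looping on it (the "fast way to exit retry")
			return true, nil
		}
		return false, ferr // same packet: the caller may retry later
	}
	return false, nil
}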
9 changes: 6 additions & 3 deletions client/daemon/peer/peertask_file.go
@@ -89,11 +89,13 @@ func newFilePeerTask(ctx context.Context,
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()

var backSource bool
if err != nil {
logger.Errorf("step 1: peer %s register failed: err", request.PeerId, err)
// check if it is back source error
if de, ok := err.(*dferrors.DfError); ok && de.Code == dfcodes.SchedNeedBackSource {
backSource = true
@@ -102,18 +104,17 @@ func newFilePeerTask(ctx context.Context,
if !backSource {
span.RecordError(err)
span.End()
logger.Errorf("register peer task failed: %s, peer id: %s", err, request.PeerId)
return ctx, nil, nil, err
}
}
if result == nil {
defer span.End()
span.RecordError(err)
err = errors.Errorf("empty schedule result")
err = errors.Errorf("step 1: peer register result is nil")
return ctx, nil, nil, err
}
span.SetAttributes(config.AttributeTaskID.String(result.TaskId))
logger.Infof("register task success, task id: %s, peer id: %s, SizeScope: %s",
logger.Infof("step 1: register task success, task id: %s, peer id: %s, SizeScope: %s",
result.TaskId, request.PeerId, base.SizeScope_name[int32(result.SizeScope)])

var singlePiece *scheduler.SinglePiece
@@ -147,7 +148,9 @@ func newFilePeerTask(ctx context.Context,
}

peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
logger.Infof("step 2: start report peer %s piece result", request.PeerId)
if err != nil {
logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err)
defer span.End()
span.RecordError(err)
return ctx, nil, nil, err
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_file_callback.go
@@ -106,9 +106,9 @@ func (p *filePeerTaskCallback) Done(pt Task) error {
Code: dfcodes.Success,
})
if err != nil {
pt.Log().Errorf("report successful peer result, error: %v", err)
pt.Log().Errorf("step 3: report successful peer result, error: %v", err)
} else {
pt.Log().Infof("report successful peer result ok")
pt.Log().Infof("step 3: report successful peer result ok")
}
return nil
}
@@ -131,9 +131,9 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro
Code: code,
})
if err != nil {
pt.Log().Errorf("report fail peer result, error: %v", err)
pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
} else {
pt.Log().Infof("report fail peer result ok")
pt.Log().Infof("step 3: report fail peer result ok")
}
return nil
}
2 changes: 2 additions & 0 deletions client/daemon/peer/peertask_stream.go
@@ -72,6 +72,7 @@ func newStreamPeerTask(ctx context.Context,
// trace register
_, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(ctx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
regSpan.RecordError(err)
regSpan.End()

@@ -128,6 +129,7 @@ func newStreamPeerTask(ctx context.Context,
}

peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
logger.Infof("step 2: start report peer %s piece result", request.PeerId)
if err != nil {
defer span.End()
span.RecordError(err)
8 changes: 4 additions & 4 deletions client/daemon/peer/peertask_stream_callback.go
@@ -104,9 +104,9 @@ func (p *streamPeerTaskCallback) Done(pt Task) error {
Code: dfcodes.Success,
})
if err != nil {
pt.Log().Errorf("report successful peer result, error: %v", err)
pt.Log().Errorf("step 3: report successful peer result, error: %v", err)
} else {
pt.Log().Infof("report successful peer result ok")
pt.Log().Infof("step 3: report successful peer result ok")
}
return nil
}
@@ -129,9 +129,9 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er
Code: code,
})
if err != nil {
pt.Log().Errorf("report fail peer result, error: %v", err)
pt.Log().Errorf("step 3: report fail peer result, error: %v", err)
} else {
pt.Log().Infof("report fail peer result ok")
pt.Log().Infof("step 3: report fail peer result ok")
}
return nil
}
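The "step N:" prefixes added across these client files mark one lifecycle: register with the scheduler, open the piece-result stream, report the final peer result from the Done/Fail callbacks, and leave the task in the daemon shutdown hook. A compile-only sketch of that flow follows; the interface is an illustrative stand-in, and apart from RegisterPeerTask and ReportPieceResult (the two calls visible in the hunks above) the names are assumptions, not the repository's real scheduler client API:

package sketch

import "context"

// schedulerClient mirrors only the calls shown in this diff; the real client
// interface in the repository is richer and may differ in signatures.
type schedulerClient interface {
	RegisterPeerTask(ctx context.Context, req *peerTaskRequest) (*registerResult, error)
	ReportPieceResult(ctx context.Context, taskID string, req *peerTaskRequest) (peerPacketStream, error)
}

type peerTaskRequest struct{ PeerID string }
type registerResult struct{ TaskID string }
type peerPacketStream interface{}

// runPeerTask traces the four "step N:" markers this commit adds to the client logs.
func runPeerTask(ctx context.Context, sched schedulerClient, req *peerTaskRequest) error {
	// step 1: register the peer task with the scheduler
	result, err := sched.RegisterPeerTask(ctx, req)
	if err != nil {
		return err
	}
	// step 2: open the piece-result stream that drives the download
	if _, err := sched.ReportPieceResult(ctx, result.TaskID, req); err != nil {
		return err
	}
	// step 3: report the final peer result from the Done/Fail callbacks above
	// step 4: leave the task when the daemon tears the peer down (client/daemon/daemon.go)
	return nil
}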
2 changes: 1 addition & 1 deletion client/daemon/service/manager.go
@@ -105,7 +105,7 @@ func (m *manager) GetPieceTasks(ctx context.Context, request *base.PieceTaskRequ
return nil, dferrors.New(code, err.Error())
}

logger.Warnf("try to get piece tasks, "+
logger.Infof("try to get piece tasks, "+
"but target peer task is initializing, "+
"there is no available pieces, "+
"task id: %s, src peer: %s, dst peer: %s, piece num: %d, limit: %d",
3 changes: 2 additions & 1 deletion internal/dfcodes/rpc_code.go
@@ -41,7 +41,8 @@ const (
ClientRequestLimitFail base.Code = 4006

// scheduler response error 5000-5999
SchedError base.Code = 5000
SchedError base.Code = 5000
/** @deprecated */
SchedNeedBackSource base.Code = 5001 // client should try to download from source
SchedPeerGone base.Code = 5002 // client should disconnect from scheduler
SchedPeerRegisterFail base.Code = 5003
66 changes: 39 additions & 27 deletions pkg/rpc/client.go
@@ -26,7 +26,6 @@ import (
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/synclock"
"github.com/pkg/errors"
"github.com/serialx/hashring"
"google.golang.org/grpc"
@@ -55,7 +54,7 @@ type ConnStatus string
type Connection struct {
ctx context.Context
cancelFun context.CancelFunc
rwMutex *synclock.LockerPool
rwMutex sync.RWMutex
dialOpts []grpc.DialOption
key2NodeMap sync.Map // key -> node(many to one)
node2ClientMap sync.Map // node -> clientConn(one to one)
@@ -75,11 +74,7 @@ func newDefaultConnection(ctx context.Context) *Connection {
return &Connection{
ctx: childCtx,
cancelFun: cancel,
rwMutex: synclock.NewLockerPool(),
dialOpts: defaultClientOpts,
key2NodeMap: sync.Map{},
node2ClientMap: sync.Map{},
accessNodeMap: sync.Map{},
connExpireTime: defaultConnExpireTime,
gcConnTimeout: defaultGcConnTimeout,
gcConnInterval: defaultGcConnInterval,
@@ -169,10 +164,10 @@ func (conn *Connection) CorrectKey2NodeRelation(tmpHashKey, realHashKey string)
if tmpHashKey == realHashKey {
return
}
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
key, _ := conn.key2NodeMap.Load(tmpHashKey)
serverNode := key.(string)
conn.rwMutex.Lock(serverNode, false)
defer conn.rwMutex.UnLock(serverNode, false)
conn.key2NodeMap.Store(realHashKey, serverNode)
conn.key2NodeMap.Delete(tmpHashKey)
}
@@ -197,18 +192,33 @@ func (conn *Connection) UpdateAccessNodeMapByServerNode(serverNode string) {
}

func (conn *Connection) AddServerNodes(addrs []dfnet.NetAddr) error {
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
for _, addr := range addrs {
serverNode := addr.GetEndpoint()
conn.rwMutex.Lock(serverNode, false)
conn.hashRing = conn.hashRing.AddNode(serverNode)
conn.rwMutex.UnLock(serverNode, false)
logger.With("conn", conn.name).Debugf("success add %s to server node list", addr)
}
return nil
}

// findCandidateClientConn find candidate node client conn other than exclusiveNodes
func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...string) (*candidateClient, error) {
if node, ok := conn.key2NodeMap.Load(key); ok {
candidateNode := node.(string)
selected := true
for _, exclusiveNode := range exclusiveNodes {
if exclusiveNode == candidateNode {
selected = false
}
}
if selected {
if client, ok := conn.node2ClientMap.Load(node); ok {
return client.(*candidateClient), nil
}
}
}

ringNodes, ok := conn.hashRing.GetNodes(key, conn.hashRing.Size())
if !ok {
logger.Warnf("cannot obtain expected %d server nodes", conn.hashRing.Size())
@@ -231,12 +241,10 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
logger.With("conn", conn.name).Infof("candidate result for hash key %s: all server node list: %v, exclusiveNodes node list: %v, candidate node list: %v",
key, ringNodes, exclusiveNodes, candidateNodes)
for _, candidateNode := range candidateNodes {
conn.rwMutex.Lock(candidateNode, true)
// Check whether there is a corresponding mapping client in the node2ClientMap
// TODO: the code below could simply call the loadOrCreate method, but the logs would not be printed as completely as with this call path
if client, ok := conn.node2ClientMap.Load(candidateNode); ok {
logger.With("conn", conn.name).Infof("hit cache candidateNode %s for hash key %s", candidateNode, key)
conn.rwMutex.UnLock(candidateNode, true)
return &candidateClient{
node: candidateNode,
Ref: client,
@@ -246,15 +254,13 @@ func (conn *Connection) findCandidateClientConn(key string, exclusiveNodes ...st
clientConn, err := conn.createClient(candidateNode, append(defaultClientOpts, conn.dialOpts...)...)
if err == nil {
logger.With("conn", conn.name).Infof("success connect to candidateNode %s for hash key %s", candidateNode, key)
conn.rwMutex.UnLock(candidateNode, true)
return &candidateClient{
node: candidateNode,
Ref: clientConn,
}, nil
}

logger.With("conn", conn.name).Infof("failed to connect candidateNode %s for hash key %s: %v", candidateNode, key, err)
conn.rwMutex.UnLock(candidateNode, true)
}
return nil, dferrors.ErrNoCandidateNode
}
@@ -273,6 +279,8 @@ func (conn *Connection) createClient(target string, opts ...grpc.DialOption) (*g

// GetServerNode
func (conn *Connection) GetServerNode(hashKey string) (string, bool) {
conn.rwMutex.RLock()
defer conn.rwMutex.RUnlock()
node, ok := conn.key2NodeMap.Load(hashKey)
serverNode := node.(string)
if ok {
@@ -283,8 +291,8 @@ func (conn *Connection) GetServerNode(hashKey string) (string, bool) {

func (conn *Connection) GetClientConnByTarget(node string) (*grpc.ClientConn, error) {
logger.With("conn", conn.name).Debugf("start to get client conn by target %s", node)
conn.rwMutex.Lock(node, true)
defer conn.rwMutex.UnLock(node, true)
conn.rwMutex.RLock()
defer conn.rwMutex.RUnlock()
clientConn, err := conn.loadOrCreateClientConnByNode(node)
if err != nil {
return nil, errors.Wrapf(err, "get client conn by conn %s", node)
@@ -322,30 +330,32 @@ func (conn *Connection) loadOrCreateClientConnByNode(node string) (clientConn *g
// stick whether hash key need already associated with specify node
func (conn *Connection) GetClientConn(hashKey string, stick bool) (*grpc.ClientConn, error) {
logger.With("conn", conn.name).Debugf("start to get client conn hashKey %s, stick %t", hashKey, stick)
conn.rwMutex.RLock()
node, ok := conn.key2NodeMap.Load(hashKey)
if stick && !ok {
conn.rwMutex.RUnlock()
// if request is stateful, hash key must exist in key2NodeMap
return nil, fmt.Errorf("it is a stateful request, but cannot find hash key(%s) in key2NodeMap", hashKey)
}
if ok {
// if exist
serverNode := node.(string)
conn.rwMutex.Lock(serverNode, true)
clientConn, err := conn.loadOrCreateClientConnByNode(serverNode)
conn.rwMutex.UnLock(serverNode, true)
conn.rwMutex.RUnlock()
if err != nil {
return nil, err
}
return clientConn, nil
}
logger.With("conn", conn.name).Infof("no server node associated with hash key %s was found, start find candidate", hashKey)
conn.rwMutex.RUnlock()
// if absence
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
client, err := conn.findCandidateClientConn(hashKey)
if err != nil {
return nil, errors.Wrapf(err, "prob candidate client conn for hash key %s", hashKey)
}
conn.rwMutex.Lock(client.node, false)
defer conn.rwMutex.UnLock(client.node, false)
conn.key2NodeMap.Store(hashKey, client.node)
conn.node2ClientMap.Store(client.node, client.Ref)
conn.accessNodeMap.Store(client.node, time.Now())
@@ -363,29 +373,32 @@ func (conn *Connection) TryMigrate(key string, cause error, exclusiveNodes []str
}
}
currentNode := ""
conn.rwMutex.RLock()
if currentNode, ok := conn.key2NodeMap.Load(key); ok {
preNode = currentNode.(string)
exclusiveNodes = append(exclusiveNodes, currentNode.(string))
} else {
logger.With("conn", conn.name).Warnf("failed to find server node for hash key %s", key)
}
conn.rwMutex.RUnlock()
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
client, err := conn.findCandidateClientConn(key, exclusiveNodes...)
if err != nil {
return "", errors.Wrapf(err, "find candidate client conn for hash key %s", key)
}
logger.With("conn", conn.name).Infof("successfully migrate hash key %s from server node %s to %s", key, currentNode, client.node)
conn.rwMutex.Lock(client.node, false)
defer conn.rwMutex.UnLock(client.node, false)
conn.key2NodeMap.Store(key, client.node)
conn.node2ClientMap.Store(client.node, client.Ref)
conn.accessNodeMap.Store(client.node, time.Now())
return
}

func (conn *Connection) Close() error {
conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
for i := range conn.serverNodes {
serverNode := conn.serverNodes[i].GetEndpoint()
conn.rwMutex.Lock(serverNode, false)
conn.hashRing.RemoveNode(serverNode)
value, ok := conn.node2ClientMap.Load(serverNode)
if ok {
@@ -406,19 +419,18 @@ func (conn *Connection) Close() error {
return true
})
conn.accessNodeMap.Delete(serverNode)
conn.rwMutex.UnLock(serverNode, false)
}
conn.cancelFun()
return nil
}

func (conn *Connection) UpdateState(addrs []dfnet.NetAddr) {
// TODO lock
conn.serverNodes = addrs
var addresses []string
for _, addr := range addrs {
addresses = append(addresses, addr.GetEndpoint())
}

conn.rwMutex.Lock()
defer conn.rwMutex.Unlock()
conn.serverNodes = addrs
conn.hashRing = hashring.New(addresses)
}
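The core of the deadlock fix in this file is the locking model: the per-key synclock.LockerPool is replaced by a single sync.RWMutex that serializes access to the hash ring and the key/node maps as a unit, so lookups take a read lock and any rebinding happens under one write lock instead of interleaved per-node locks. A condensed sketch of the resulting shape of GetClientConn, with simplified types (plain maps and strings stand in for sync.Map and *grpc.ClientConn):

package sketch

import (
	"fmt"
	"sync"
)

// conn is a trimmed-down Connection: one RWMutex guards both maps and the ring.
type conn struct {
	mu        sync.RWMutex
	key2Node  map[string]string // hash key -> server node
	node2Conn map[string]string // server node -> client conn
}

// getClientConn follows the read-then-write-lock shape of GetClientConn above.
func (c *conn) getClientConn(hashKey string) string {
	c.mu.RLock()
	if node, ok := c.key2Node[hashKey]; ok {
		cc := c.node2Conn[node]
		c.mu.RUnlock()
		return cc
	}
	c.mu.RUnlock()

	// No binding yet: take the single write lock, pick a candidate node and bind
	// key -> node -> conn in one critical section. As in the diff, a concurrent
	// caller that raced past the read lock may simply redo this binding.
	c.mu.Lock()
	defer c.mu.Unlock()
	node := "candidate-node" // stands in for findCandidateClientConn
	c.key2Node[hashKey] = node
	c.node2Conn[node] = fmt.Sprintf("conn-to-%s", node)
	return c.node2Conn[node]
}

Compared with the old per-node locker pool, a goroutine can no longer hold one node's lock while waiting on another's, which is the usual recipe for the kind of concurrent deadlock this commit reports.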
