Skip to content

Commit

Permalink
[YUNIKORN-2539] Shim: Add deadlock tracking feature (#814)
Browse files Browse the repository at this point in the history
Replaces sync.{RW}Mutex with internal locking.{RW}Mutex implementations.
The new implementation wraps the go-deadlock library with logic to
conditionally enable deadlock detection, controlled by the following
environment variables.

To enable the feature:

- DEADLOCK_DETECTION_ENABLED=true

To customize the timeout before potential deadlocks are logged (default
is 60 seconds):

- DEADLOCK_TIMEOUT_SECONDS=60

See https://github.com/sasha-s/go-deadlock for more details.

Closes: #814
  • Loading branch information
craigcondit committed Apr 5, 2024
1 parent 661cd7d commit 2ee5d13
Show file tree
Hide file tree
Showing 28 changed files with 184 additions and 62 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ module github.com/apache/yunikorn-k8shim
go 1.21

require (
github.com/apache/yunikorn-core v1.5.0-3
github.com/apache/yunikorn-core v0.0.0-20240405160823-c94a7d938c41
github.com/apache/yunikorn-scheduler-interface v1.5.0-1
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/looplab/fsm v1.0.1
github.com/onsi/ginkgo/v2 v2.15.0
github.com/onsi/gomega v1.30.0
github.com/prometheus/client_golang v1.18.0
github.com/sasha-s/go-deadlock v0.3.1
go.uber.org/zap v1.26.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools/v3 v3.5.1
Expand Down Expand Up @@ -98,6 +99,7 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/selinux v1.11.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
Expand Down Expand Up @@ -161,6 +163,7 @@ require (

replace (
github.com/opencontainers/runc => github.com/opencontainers/runc v1.1.12
github.com/petermattis/goid => github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba
golang.org/x/crypto => golang.org/x/crypto v0.19.0
golang.org/x/lint => golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
golang.org/x/net => golang.org/x/net v0.21.0
Expand Down
8 changes: 6 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cq
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df h1:7RFfzj4SSt6nnvCPbCqijJi1nWCd+TqAT3bYCStRC18=
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20230305170008-8188dc5388df/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
github.com/apache/yunikorn-core v1.5.0-3 h1:WqQhiZ7Q9JQz9FWDrY4q+BNOCDzihGSwou51lzwReiM=
github.com/apache/yunikorn-core v1.5.0-3/go.mod h1:z4jL+/iga3IpL48irH5Kb1RsQGNpS/554QWcpUkRFdA=
github.com/apache/yunikorn-core v0.0.0-20240405160823-c94a7d938c41 h1:g+vqZUconPFGzrhw0r7IJkkZ1QzAfhc75f5rsApG5tU=
github.com/apache/yunikorn-core v0.0.0-20240405160823-c94a7d938c41/go.mod h1:OWWw/VSMRy/OAHg3vje72OpO1fboOQm5mi7KNcKFSQc=
github.com/apache/yunikorn-scheduler-interface v1.5.0-1 h1:RusZkydfuZNIKx/CHP9Jf97wikQ3cxJGWYlgJ9CjRuM=
github.com/apache/yunikorn-scheduler-interface v1.5.0-1/go.mod h1:3jCo/Ash4yEmw05ozK3BihJDEEAMOZEN7rmxNfb0gO0=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
Expand Down Expand Up @@ -175,6 +175,8 @@ github.com/opencontainers/selinux v1.11.0 h1:+5Zbo97w3Lbmb3PeqQtpmTkMwsW5nRI3YaL
github.com/opencontainers/selinux v1.11.0/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba h1:3jPgmsFGBID1wFfU2AbYocNcN4wqU68UaHSdMjiw/7U=
github.com/petermattis/goid v0.0.0-20240327183114-c42a807a84ba/go.mod h1:pxMtw7cyUw6B2bRH0ZBANSPg+AoSud1I1iyJHI69jH4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -190,6 +192,8 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
Expand Down
4 changes: 2 additions & 2 deletions pkg/admission/conf/am_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
Expand All @@ -33,6 +32,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)

Expand Down Expand Up @@ -105,7 +105,7 @@ type AdmissionControllerConf struct {
defaultQueueName string
configMaps []*v1.ConfigMap

lock sync.RWMutex
lock locking.RWMutex
}

func NewAdmissionControllerConf(configMaps []*v1.ConfigMap) *AdmissionControllerConf {
Expand Down
5 changes: 2 additions & 3 deletions pkg/admission/namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
package admission

import (
"sync"

v1 "k8s.io/api/core/v1"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)

type NamespaceCache struct {
nameSpaces map[string]nsFlags

sync.RWMutex
locking.RWMutex
}

type triState int
Expand Down
5 changes: 2 additions & 3 deletions pkg/admission/priority_class_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@
package admission

import (
"sync"

schedulingv1 "k8s.io/api/scheduling/v1"
informersv1 "k8s.io/client-go/informers/scheduling/v1"
"k8s.io/client-go/tools/cache"

"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)

type PriorityClassCache struct {
priorityClasses map[string]bool

sync.RWMutex
locking.RWMutex
}

// NewPriorityClassCache creates a new cache and registers the handler for the cache with the Informer.
Expand Down
4 changes: 2 additions & 2 deletions pkg/admission/webhook_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"crypto/x509"
"errors"
"fmt"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/admission/conf"
"github.com/apache/yunikorn-k8shim/pkg/admission/pki"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)

Expand Down Expand Up @@ -81,7 +81,7 @@ type webhookManagerImpl struct {
caKey2 *rsa.PrivateKey
expiration time.Time

sync.RWMutex
locking.RWMutex
}

// NewWebhookManager is used to create a new webhook manager
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"sort"
"strings"
"sync"

"github.com/looplab/fsm"
"go.uber.org/zap"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
Expand All @@ -53,7 +53,7 @@ type Application struct {
schedulingParamsDefinition string
placeholderOwnerReferences []metav1.OwnerReference
sm *fsm.FSM
lock *sync.RWMutex
lock *locking.RWMutex
schedulerAPI api.SchedulerAPI
placeholderAsk *si.Resource // total placeholder request for the app (all task groups)
placeholderTimeoutInSec int64
Expand Down Expand Up @@ -81,7 +81,7 @@ func NewApplication(appID, queueName, user string, groups []string, tags map[str
tags: tags,
sm: newAppState(),
taskGroups: make([]TaskGroup, 0),
lock: &sync.RWMutex{},
lock: &locking.RWMutex{},
schedulerAPI: scheduler,
placeholderTimeoutInSec: 0,
schedulingStyle: constants.SchedulingPolicyStyleParamDefault,
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"sort"
"strings"
"sync"
"testing"
"time"

Expand All @@ -41,14 +40,15 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-scheduler-interface/lib/go/api"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

type recorderTime struct {
time int64
lock *sync.RWMutex
lock *locking.RWMutex
}

func TestNewApplication(t *testing.T) {
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestFailApplication(t *testing.T) {

rt := &recorderTime{
time: int64(0),
lock: &sync.RWMutex{},
lock: &locking.RWMutex{},
}
ms := &mockSchedulerAPI{}
// set test mode
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestSetTaskGroupsAndSchedulingPolicy(t *testing.T) {

type threadSafePodsMap struct {
pods map[string]*v1.Pod
sync.RWMutex
locking.RWMutex
}

func newThreadSafePodsMap() *threadSafePodsMap {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
schedulerconf "github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-k8shim/pkg/plugin/predicates"
"github.com/apache/yunikorn-k8shim/pkg/plugin/support"
Expand All @@ -64,7 +65,7 @@ type Context struct {
pluginMode bool // true if we are configured as a scheduler plugin
namespace string // yunikorn namespace
configMaps []*v1.ConfigMap // cached yunikorn configmaps
lock *sync.RWMutex // lock
lock *locking.RWMutex // lock
txnID atomic.Uint64 // transaction ID counter
klogger klog.Logger
}
Expand All @@ -87,7 +88,7 @@ func NewContextWithBootstrapConfigMaps(apis client.APIProvider, bootstrapConfigM
apiProvider: apis,
namespace: apis.GetAPIs().GetConf().Namespace,
configMaps: bootstrapConfigMaps,
lock: &sync.RWMutex{},
lock: &locking.RWMutex{},
klogger: klog.NewKlogr(),
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/external/scheduler_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package external

import (
"fmt"
"sync"
"sync/atomic"

"go.uber.org/zap"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand Down Expand Up @@ -70,7 +70,7 @@ type SchedulerCache struct {
inProgressAllocations map[string]string // map of pod to node ID, presence indicates an in-process allocation for scheduler
schedulingTasks map[string]interface{} // list of task IDs which are currently being processed by the scheduler
pvcRefCounts map[string]map[string]int
lock sync.RWMutex
lock locking.RWMutex
clients *client.Clients // client APIs
klogger klog.Logger

Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/placeholder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ package cache

import (
"strings"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"

"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
)

Expand All @@ -43,12 +43,12 @@ type PlaceholderManager struct {
running atomic.Value
cleanupTime time.Duration
// a simple mutex would do, since we do not have separate read and write paths
sync.RWMutex
locking.RWMutex
}

var (
placeholderMgr *PlaceholderManager
mu sync.Mutex
mu locking.Mutex
)

func NewPlaceholderManager(clients *client.Clients) *PlaceholderManager {
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/looplab/fsm"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
"github.com/apache/yunikorn-k8shim/pkg/locking"
"github.com/apache/yunikorn-k8shim/pkg/log"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
Expand All @@ -57,7 +57,7 @@ type Task struct {
originator bool
schedulingState TaskSchedulingState
sm *fsm.FSM
lock *sync.RWMutex
lock *locking.RWMutex
}

func NewTask(tid string, app *Application, ctx *Context, pod *v1.Pod) *Task {
Expand Down Expand Up @@ -101,7 +101,7 @@ func createTaskInternal(tid string, app *Application, resource *si.Resource,
context: ctx,
sm: newTaskState(),
schedulingState: TaskSchedPending,
lock: &sync.RWMutex{},
lock: &locking.RWMutex{},
}
if tgName := utils.GetTaskGroupFromPodSpec(pod); tgName != "" {
task.taskGroupName = tgName
Expand Down
5 changes: 3 additions & 2 deletions pkg/cache/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package cache

import (
"sync"
"testing"
"time"

Expand All @@ -36,6 +35,8 @@ import (
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/locking"

"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)

Expand Down Expand Up @@ -482,7 +483,7 @@ func TestHandleSubmitTaskEvent(t *testing.T) {
mockedContext.addPriorityClass(priorityClass2)
rt := &recorderTime{
time: int64(0),
lock: &sync.RWMutex{},
lock: &locking.RWMutex{},
}
conf.GetSchedulerConf().SetTestMode(true)
mr := events.NewMockedRecorder()
Expand Down
Loading

0 comments on commit 2ee5d13

Please sign in to comment.