lockset: Extract a Queue type
This change splits the logic within lockset.Set into a separate lockset.Queue
type, which can be used to solve for dependency orderings across asynchronous
tasks on the fly.
bobvawter committed May 24, 2024
1 parent 9b64f74 commit 164a6b3
Showing 4 changed files with 475 additions and 196 deletions.
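Before the diff: the bookkeeping this commit lifts out of Set (the per-key queues and headCount tracking visible in the removed enqueue and dequeue methods below) comes down to one rule: every key has a FIFO of waiting tasks, and a task may run once it sits at the head of every queue it belongs to. The sketch below is a simplified, single-threaded illustration of that rule only; it is not the new lockset.Queue (that file is among the changed files not rendered here), and it omits synchronization, cancellation, and the global retry-at-head ordering the real type must preserve.

package main

import "fmt"

// toyQueue illustrates the idea behind the extracted Queue: each key has a
// FIFO of waiting values, and a value may run once it is at the head of the
// queue for every key it requested.
type toyQueue[K, V comparable] struct {
	keys   map[V][]K // key set requested by each enqueued value
	queues map[K][]V // per-key FIFO of waiting values
}

func newToyQueue[K, V comparable]() *toyQueue[K, V] {
	return &toyQueue[K, V]{keys: map[V][]K{}, queues: map[K][]V{}}
}

// enqueue records v's key set and reports whether v is immediately at the
// head of all of its key queues, i.e. whether it may run right away.
func (q *toyQueue[K, V]) enqueue(keys []K, v V) bool {
	q.keys[v] = keys
	heads := 0
	for _, k := range keys {
		q.queues[k] = append(q.queues[k], v)
		if len(q.queues[k]) == 1 {
			heads++
		}
	}
	return heads == len(keys)
}

// dequeue releases v's keys (assuming v ran, so it sits at the head of each
// of its queues) and returns the values that are now unblocked.
func (q *toyQueue[K, V]) dequeue(v V) []V {
	promoted := map[V]bool{}
	for _, k := range q.keys[v] {
		q.queues[k] = q.queues[k][1:]
		if len(q.queues[k]) == 0 {
			delete(q.queues, k)
			continue
		}
		promoted[q.queues[k][0]] = true
	}
	delete(q.keys, v)

	var unblocked []V
	for cand := range promoted {
		atHead := 0
		for _, k := range q.keys[cand] {
			if q.queues[k][0] == cand {
				atHead++
			}
		}
		if atHead == len(q.keys[cand]) {
			unblocked = append(unblocked, cand)
		}
	}
	return unblocked
}

func main() {
	q := newToyQueue[string, string]()
	fmt.Println(q.enqueue([]string{"a", "b"}, "t1")) // true: t1 may run now
	fmt.Println(q.enqueue([]string{"b", "c"}, "t2")) // false: blocked behind t1 on "b"
	fmt.Println(q.dequeue("t1"))                     // [t2]: t2 now heads both "b" and "c"
}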
225 changes: 42 additions & 183 deletions internal/util/lockset/lockset.go
@@ -160,12 +160,10 @@ func (s *Status) String() string {

// A waiter represents a request to acquire locks on some number of
// keys. Instances of this type should only be accessed while
// holding the lock on the parent Set.
// holding the parent [Set.waiterMu] lock.
type waiter[K any] struct {
fn Callback[K] // nil if already executed.
headCount int // The number of keys where this waiter is head of queue.
keys []K // Desired key set.
next *waiter[K] // The waiter that was scheduled next.
result notify.Var[*Status] // The outbox for the waiter.
scheduleStart time.Time // The time at which Schedule was called.
}
@@ -176,28 +174,15 @@ type waiter[K any] struct {
// A Set is internally synchronized and is safe for concurrent use. A
// Set should not be copied after it has been created.
type Set[K comparable] struct {
runner Runner
queue *Queue[K, *waiter[K]] // Internally synchronized.
runner Runner // Executes callbacks.
waiterMu sync.Mutex // Synchronizes access to waiters

deferredStart prometheus.Counter
execCompleteTime prometheus.Observer
execWaitTime prometheus.Observer
immediateStart prometheus.Counter
retriedTasks prometheus.Counter

mu struct {
sync.Mutex
// These waiters are used to maintain a global ordering of
// waiters to implement [RetryAtHead].
head, tail *waiter[K]

// Deadlocks between waiters are avoided since the relative
// order of enqueued waiters is maintained. That is, if
// Schedule() is called with W1 and then W2, the first waiter
// will be ahead of the second in all key queues that they have
// in common. Furthermore, the first waiter is guaranteed to be
// executed, since it will be at the head of all its key queues.
queues map[K][]*waiter[K]
}
}

// New constructs a Set that executes tasks using the given runner.
@@ -210,17 +195,16 @@ func New[K comparable](runner Runner, metricsLabel string) (*Set[K], error) {
if metricsLabel == "" {
return nil, errors.New("metrics label must not be empty")
}
ret := &Set[K]{
return &Set[K]{
queue: NewQueue[K, *waiter[K]](),
runner: runner,

deferredStart: deferredStart.WithLabelValues(metricsLabel),
execCompleteTime: execCompleteTime.WithLabelValues(metricsLabel),
execWaitTime: execWaitTime.WithLabelValues(metricsLabel),
immediateStart: immediateStart.WithLabelValues(metricsLabel),
retriedTasks: retriedTasks.WithLabelValues(metricsLabel),
}
ret.mu.queues = make(map[K][]*waiter[K])
return ret, nil
}, nil
}

// Schedule executes the Callback once all keys have been locked.
@@ -254,17 +238,27 @@ func (s *Set[K]) Schedule(keys []K, fn Callback[K]) (outcome Outcome, cancel fun
scheduleStart: scheduleStart,
}
w.result.Set(queued)
s.enqueue(w)
ready, err := s.queue.Enqueue(keys, w)
if err != nil {
w.result.Set(StatusFor(err))
return &w.result, func() {}
}
if ready {
s.immediateStart.Inc()
s.dispose(w, false)
} else {
s.deferredStart.Inc()
}
return &w.result, func() {
// Swap the callback so that it does nothing. We want to guard
// against revivifying an already completed waiter, so we
// look at whether a function is still defined.
s.mu.Lock()
s.waiterMu.Lock()
needsDispose := w.fn != nil
if needsDispose {
w.fn = func([]K) error { return context.Canceled }
}
s.mu.Unlock()
s.waiterMu.Unlock()

// Async cleanup.
if needsDispose {
@@ -273,111 +267,18 @@ func (s *Set[K]) dispose(w *waiter[K], cancel bool) {
}
}

// dequeue removes the waiter from all wait queues. This method will
// also update the head of queue counts for any newly-eligible waiter.
// Waiters that have reached the heads of their respective queues will
// be returned so that they may be executed.
func (s *Set[K]) dequeue(w *waiter[K]) []*waiter[K] {
s.mu.Lock()
defer s.mu.Unlock()

var ret []*waiter[K]
status, _ := w.result.Get()

// If the waiter has reached a terminal condition, clean up its
// entries.
if status.Completed() {
// Remove the waiter from each key's queue.
for _, k := range w.keys {
q := s.mu.queues[k]

// Search for the waiter in the queue. It's always going to
// be the first element in the slice, except in the
// cancellation case.
var idx int
for idx = range q {
if q[idx] == w {
break
}
}

if idx == len(q) {
panic(fmt.Sprintf("waiter not found in queue: %d", idx))
}

// If the waiter was the first in the queue (likely),
// promote the next waiter, possibly making it eligible to
// be run.
if idx == 0 {
q = q[1:]
if len(q) == 0 {
// The waiter was the only element of the queue, so
// we'll just delete the slice from the map.
delete(s.mu.queues, k)
continue
}

// Promote the next waiter. If the waiter is now at the
// head of its queues, it can be started.
head := q[0]
head.headCount++
if head.headCount == len(head.keys) {
ret = append(ret, head)
} else if head.headCount > len(head.keys) {
panic("over counted")
}
} else {
// The (canceled) waiter was in the middle of the queue,
// just remove it from the slice.
q = append(q[:idx], q[idx+1:]...)
}

// Put the shortened queue back in the map.
s.mu.queues[k] = q
}
}

// Make some progress on the global queue.
head := s.mu.head
for head != nil {
outcome, _ := head.result.Get()

// Advance the queue if the head waiter is finished.
if outcome.Completed() {
head = head.next
s.mu.head = head
continue
}

// The head has requested to be retried, so add it to the
// slice of waiters to execute. Changing the status here
// also ensures that this action is a one-shot.
if outcome == retryRequested {
head.result.Set(retryQueued)
ret = append(ret, head)
}
break
}
// If we reached the end, clear the tail field.
if head == nil {
s.mu.tail = nil
}

return ret
}

// dispose of the waiter callback in a separate goroutine. The waiter
// will be dequeued from the Set, possibly leading to cascading
// callbacks.
func (s *Set[K]) dispose(w *waiter[K], cancel bool) {
work := func(_ context.Context) {
// Clear the function reference to make the effects of dispose a
// one-shot.
s.mu.Lock()
s.waiterMu.Lock()
fn := w.fn
w.fn = nil
startedAtHead := w == s.mu.head
s.mu.Unlock()
startedAtHead := s.queue.IsHead(w)
s.waiterMu.Unlock()

// Already executed and/or canceled.
if fn == nil {
@@ -422,11 +323,11 @@ func (s *Set[K]) dispose(w *waiter[K], cancel bool) {
// Otherwise, re-enable the waiter. The status will be
// set to retryRequested for later re-dispatching by the
// dispose method.
s.mu.Lock()
s.waiterMu.Lock()
w.fn = fn
w.result.Set(retryRequested)
endedAtHead := w == s.mu.head
s.mu.Unlock()
endedAtHead := s.queue.IsHead(w)
s.waiterMu.Unlock()

// It's possible that another task completed while this
// one was executing, which moved it to the head of the
@@ -436,18 +337,28 @@ func (s *Set[K]) dispose(w *waiter[K], cancel bool) {
s.dispose(w, false)
}

// We can't dequeue the waiter if it's going to retry
// later on. That would incorrectly unblock anything
// also waiting on this waiter's keys.
// We can't dequeue the waiter if it's going to retry at
// some later point in time. Since we know that the task
// was running somewhere in the middle of the global
// queue, there's nothing more that we need to do.
return
}
default:
w.result.Set(&Status{err: err})
}

// Remove the waiter's locks and get a slice of new tasks to
// kick off.
next := s.dequeue(w)
// Remove the waiter's locks and get a slice of newly-unblocked
// tasks to kick off.
next, _ := s.queue.Dequeue(w)
// Calling dequeue also advances the global queue. If the
// element at the head of the queue wants to be retried, also
// add it to the list.
if head, ok := s.queue.PeekHead(); ok && head != nil {
if status, _ := head.result.Get(); status == retryRequested {
head.result.Set(retryQueued)
next = append(next, head)
}
}
for _, unblocked := range next {
s.dispose(unblocked, false)
}
@@ -458,40 +369,6 @@ func (s *Set[K]) dispose(w *waiter[K], cancel bool) {
}
}

// enqueue adds the waiter to the Set. If the waiter is immediately
// eligible for execution, it will be started.
func (s *Set[K]) enqueue(w *waiter[K]) {
s.mu.Lock()
defer s.mu.Unlock()

// Insert the waiter into the global queue.
if s.mu.tail == nil {
s.mu.head = w
} else {
s.mu.tail.next = w
}
s.mu.tail = w

// Add the waiter to each key queue. If it's the only waiter for
// that key, also increment its headCount.
for _, k := range w.keys {
q := s.mu.queues[k]
q = append(q, w)
s.mu.queues[k] = q
if len(q) == 1 {
w.headCount++
}
}

// This will also be satisfied if the waiter has an empty key set.
if w.headCount == len(w.keys) {
s.immediateStart.Inc()
s.dispose(w, false)
} else {
s.deferredStart.Inc()
}
}

// Wait returns the first non-nil error.
func Wait(ctx context.Context, outcomes []Outcome) error {
outcome:
@@ -514,24 +391,6 @@ outcome:
return nil
}

// Make a copy of the key slice and deduplicate it.
func dedup[K comparable](keys []K) []K {
keys = append([]K(nil), keys...)
seen := make(map[K]struct{}, len(keys))
idx := 0
for _, key := range keys {
if _, dup := seen[key]; dup {
continue
}
seen[key] = struct{}{}

keys[idx] = key
idx++
}
keys = keys[:idx]
return keys
}

// tryCall invokes the function with a panic handler.
func tryCall[K any](fn func([]K) error, keys []K) (err error) {
// Install panic handler before executing user code.
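For context on the test changes that follow, here is a hypothetical sketch of the public surface exercised above: New, Schedule, and Wait. The Runner value, the metrics label, and the import path are assumptions not taken from this diff; the Callback signature is inferred from the assignments to w.fn.

package example

import (
	"context"

	// Hypothetical module path; substitute the repository's actual one.
	"example.com/repo/internal/util/lockset"
)

// scheduleAll is an illustrative sketch, not code from this commit: it
// schedules two callbacks with overlapping key sets and waits for both.
func scheduleAll(ctx context.Context, runner lockset.Runner) error {
	set, err := lockset.New[string](runner, "example") // label is arbitrary, but must be non-empty
	if err != nil {
		return err
	}

	var outcomes []lockset.Outcome
	for _, keys := range [][]string{{"a", "b"}, {"b", "c"}} {
		outcome, cancel := set.Schedule(keys, func(keys []string) error {
			// Runs once this waiter is at the head of the queue for every
			// key it requested; overlapping key sets therefore execute in
			// the order they were scheduled.
			return nil
		})
		defer cancel() // has no effect once the callback has already completed
		outcomes = append(outcomes, outcome)
	}

	// Wait returns the first non-nil error among the outcomes.
	return lockset.Wait(ctx, outcomes)
}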
23 changes: 10 additions & 13 deletions internal/util/lockset/lockset_test.go
@@ -70,7 +70,7 @@ func TestSmoke(t *testing.T) {
const numResources = 128
const numWaiters = 10 * numResources
r := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Verify that each resource and waiter are run in the expected
@@ -160,11 +160,11 @@ func TestSmoke(t *testing.T) {
r.NoError(eg.Wait())

// Wait for each task to arrive at a successful state.
r.NoError(Wait(ctx, outcomes))

waitErr := Wait(ctx, outcomes)
for i := 0; i < numResources; i++ {
r.Equalf(expectedOrder[i], executionOrder[i], "key %d", i)
}
r.NoError(waitErr)
}

func TestCancel(t *testing.T) {
@@ -365,9 +365,8 @@ func TestRetryAfterPromotion(t *testing.T) {
return nil
})

s.mu.Lock()
promoterWaiter := s.mu.head
s.mu.Unlock()
promoterWaiter, ok := s.queue.PeekHead()
r.True(ok)
r.NotNil(promoterWaiter)

blockRetry := make(chan struct{})
@@ -380,17 +379,15 @@
}
return RetryAtHead(nil).Or(func() {
// Ensure the tail was promoted.
s.mu.Lock()
r.Same(retryWaiter, s.mu.head)
s.mu.Unlock()
h, ok := s.queue.PeekHead()
r.True(ok)
r.Same(retryWaiter, h)
retryRan.Store(true)
})
})

s.mu.Lock()
retryWaiter = s.mu.tail
s.mu.Unlock()

retryWaiter, ok = s.queue.PeekTail()
r.True(ok)
r.NotNil(retryWaiter)
r.NotSame(promoterWaiter, retryWaiter)

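The retry flow that TestRetryAfterPromotion drives above looks roughly like the sketch below. It mirrors the shape of the test's callback; the precise semantics of RetryAtHead and its Or hook are documented in the lockset package rather than in this diff, so treat the comments as assumptions.

package example

import "example.com/repo/internal/util/lockset" // hypothetical module path

// retryOnce mirrors the shape of the callback in TestRetryAfterPromotion; it
// is illustrative only. The first execution declines to finish and asks to be
// retried at the head of the global queue; the hook passed to Or covers the
// alternate case described in the package documentation.
func retryOnce(set *lockset.Set[string]) lockset.Outcome {
	// A real caller would guard this flag; the test above uses a channel
	// and an atomic rather than a bare bool.
	firstAttempt := true
	outcome, _ := set.Schedule([]string{"k"}, func(keys []string) error {
		if firstAttempt {
			firstAttempt = false
			return lockset.RetryAtHead(nil).Or(func() {
				// Runs in the alternate case; see the lockset docs.
			})
		}
		return nil
	})
	return outcome
}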