Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[utils] Implement PriorityMutex #244

Merged
merged 6 commits into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions utils/priority_mutex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"sync"
)

// PriorityMutex is a special type of mutex
// that allows callers to request priority
// over other callers. This can be useful
// if there is a "hot path" in an application
// that requires lock access.
//
// WARNING: It is possible to cause lock starvation
// if not careful (i.e. only high priority callers
// ever do work).
type PriorityMutex struct {
high []chan struct{}
low []chan struct{}

m sync.Mutex
l bool
Comment on lines +34 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: use more descriptive variable names. l is not intuitive to somebody not implementing this

}

// Lock attempts to acquire either a high or low
// priority mutex. When priority is true, a lock
// will be granted before other low priority callers.
func (m *PriorityMutex) Lock(priority bool) {
m.m.Lock()

if !m.l {
m.l = true
m.m.Unlock()
return
}

c := make(chan struct{})
if priority {
m.high = append(m.high, c)
} else {
m.low = append(m.low, c)
}

m.m.Unlock()
<-c
}

// Unlock selects the next highest priority lock
// to grant. If there are no locks to grant, it
// sets the value of m.l to false.
func (m *PriorityMutex) Unlock() {
m.m.Lock()
defer m.m.Unlock()

if len(m.high) > 0 {
c := m.high[0]
m.high = m.high[1:]
close(c)
return
}

if len(m.low) > 0 {
c := m.low[0]
m.low = m.low[1:]
close(c)
return
}

// We only set m.l to false when there are
// no items to unlock because it could cause
// lock contention for the next lock to fetch it.
m.l = false
patrick-ogrady marked this conversation as resolved.
Show resolved Hide resolved
}
73 changes: 73 additions & 0 deletions utils/priority_mutex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)

func TestPriorityMutex(t *testing.T) {
arr := []bool{}
expected := make([]bool, 60)
l := new(PriorityMutex)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good callout, @bobg !

g, _ := errgroup.WithContext(context.Background())

// Lock while adding all locks
l.Lock(true)

// Add a bunch of low prio items
for i := 0; i < 50; i++ {
expected[i+10] = false
g.Go(func() error {
l.Lock(false)
arr = append(arr, false)
l.Unlock()
return nil
})
}

// Add a few high prio items
for i := 0; i < 10; i++ {
expected[i] = true
g.Go(func() error {
l.Lock(true)
arr = append(arr, true)
l.Unlock()
return nil
})
}

// Wait for all goroutines to ask for lock
time.Sleep(1 * time.Second)

// Ensure number of expected locks is correct
assert.Len(t, l.high, 10)
assert.Len(t, l.low, 50)

l.Unlock()
assert.NoError(t, g.Wait())

// Check results array to ensure all of the high priority items processed first,
// followed by all of the low priority items.
assert.Equal(t, expected, arr)

// Ensure lock is no longer occupied
assert.False(t, l.l)
}