From c251dfe0826410bb703f87f1beecd1f854a8e68f Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 09:30:07 -0800 Subject: [PATCH 1/6] priority mutex --- utils/priority_mutex.go | 80 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 utils/priority_mutex.go diff --git a/utils/priority_mutex.go b/utils/priority_mutex.go new file mode 100644 index 00000000..3add7aa5 --- /dev/null +++ b/utils/priority_mutex.go @@ -0,0 +1,80 @@ +package utils + +import ( + "sync" +) + +// PriorityMutex is a special type of mutex +// that allows callers to request priority +// over other callers. This can be useful +// if there is a "hot path" in an application +// that requires lock access. +// +// WARNING: It is possible to cause lock starvation +// if not careful (i.e. only high priority callers +// ever do work). +type PriorityMutex struct { + high []chan struct{} + low []chan struct{} + + m sync.Mutex + l bool +} + +// NewPriorityMutex returns a new *PriorityMutex. +func NewPriorityMutex() *PriorityMutex { + return &PriorityMutex{ + high: []chan struct{}{}, + low: []chan struct{}{}, + } +} + +// Lock attempts to acquire either a high or low +// priority mutex. When priority is true, a lock +// will be granted before other low priority callers. +func (m *PriorityMutex) Lock(priority bool) { + m.m.Lock() + + if !m.l { + m.l = true + m.m.Unlock() + return + } + + c := make(chan struct{}) + if priority { + m.high = append(m.high, c) + } else { + m.low = append(m.low, c) + } + + m.m.Unlock() + <-c +} + +// Unlock selects the next highest priority lock +// to grant. If there are no locks to grant, it +// sets the value of m.l to false. +func (m *PriorityMutex) Unlock() { + m.m.Lock() + defer m.m.Unlock() + + if len(m.high) > 0 { + c := m.high[0] + m.high = m.high[1:] + close(c) + return + } + + if len(m.low) > 0 { + c := m.low[0] + m.low = m.low[1:] + close(c) + return + } + + // We only set m.l to false when there are + // no items to unlock because it could cause + // lock contention for the next lock to fetch it. + m.l = false +} From a1e951b6f910ef13aa53529c34470aa574cba373 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 09:46:26 -0800 Subject: [PATCH 2/6] add test for priority mutex --- utils/priority_mutex_test.go | 56 ++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 utils/priority_mutex_test.go diff --git a/utils/priority_mutex_test.go b/utils/priority_mutex_test.go new file mode 100644 index 00000000..c07cf9d4 --- /dev/null +++ b/utils/priority_mutex_test.go @@ -0,0 +1,56 @@ +package utils + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/sync/errgroup" +) + +func TestPriorityMutex(t *testing.T) { + arr := []bool{} + expected := make([]bool, 60) + l := NewPriorityMutex() + ctx := context.Background() + g, ctx := errgroup.WithContext(ctx) + + // Lock while adding all locks + l.Lock(true) + + // Add a bunch of low prio items + for i := 0; i < 50; i++ { + expected[i+10] = false + g.Go(func() error { + l.Lock(false) + arr = append(arr, false) + l.Unlock() + return nil + }) + } + + // Add a few high prio items + for i := 0; i < 10; i++ { + expected[i] = true + g.Go(func() error { + l.Lock(true) + arr = append(arr, true) + l.Unlock() + return nil + }) + } + + // Wait for all goroutines to ask for lock + time.Sleep(1 * time.Second) + + // Ensure number of expected locks is correct + assert.Len(t, l.high, 10) + assert.Len(t, l.low, 50) + + l.Unlock() + assert.NoError(t, g.Wait()) + + // Check array for all high prio, remaining low prio + assert.Equal(t, expected, arr) +} From 53313f355cebb995257c977cd70be05a6bb95c88 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 09:47:11 -0800 Subject: [PATCH 3/6] Add license --- utils/priority_mutex.go | 14 ++++++++++++++ utils/priority_mutex_test.go | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/utils/priority_mutex.go b/utils/priority_mutex.go index 3add7aa5..d41ce64a 100644 --- a/utils/priority_mutex.go +++ b/utils/priority_mutex.go @@ -1,3 +1,17 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package utils import ( diff --git a/utils/priority_mutex_test.go b/utils/priority_mutex_test.go index c07cf9d4..2af71bdb 100644 --- a/utils/priority_mutex_test.go +++ b/utils/priority_mutex_test.go @@ -1,3 +1,17 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package utils import ( From 4a327b498f7bde12c85f7f881a79a08a09167f95 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 09:50:17 -0800 Subject: [PATCH 4/6] fix linting errors --- utils/priority_mutex_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/utils/priority_mutex_test.go b/utils/priority_mutex_test.go index 2af71bdb..8bc4e221 100644 --- a/utils/priority_mutex_test.go +++ b/utils/priority_mutex_test.go @@ -27,8 +27,7 @@ func TestPriorityMutex(t *testing.T) { arr := []bool{} expected := make([]bool, 60) l := NewPriorityMutex() - ctx := context.Background() - g, ctx := errgroup.WithContext(ctx) + g, _ := errgroup.WithContext(context.Background()) // Lock while adding all locks l.Lock(true) From f52612682884dc915baceb9e050372e43f82d8b9 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 10:25:04 -0800 Subject: [PATCH 5/6] Address stone's nits --- utils/priority_mutex_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/utils/priority_mutex_test.go b/utils/priority_mutex_test.go index 8bc4e221..92541776 100644 --- a/utils/priority_mutex_test.go +++ b/utils/priority_mutex_test.go @@ -64,6 +64,10 @@ func TestPriorityMutex(t *testing.T) { l.Unlock() assert.NoError(t, g.Wait()) - // Check array for all high prio, remaining low prio + // Check results array to ensure all of the high priority items processed first, + // followed by all of the low priority items. assert.Equal(t, expected, arr) + + // Ensure lock is no longer occupied + assert.False(t, l.l) } From 8673f2f3a6aee202eb0ac7f6d6668bb723cd244e Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 23 Nov 2020 10:29:11 -0800 Subject: [PATCH 6/6] remove constructor --- utils/priority_mutex.go | 8 -------- utils/priority_mutex_test.go | 2 +- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/utils/priority_mutex.go b/utils/priority_mutex.go index d41ce64a..7802db78 100644 --- a/utils/priority_mutex.go +++ b/utils/priority_mutex.go @@ -35,14 +35,6 @@ type PriorityMutex struct { l bool } -// NewPriorityMutex returns a new *PriorityMutex. -func NewPriorityMutex() *PriorityMutex { - return &PriorityMutex{ - high: []chan struct{}{}, - low: []chan struct{}{}, - } -} - // Lock attempts to acquire either a high or low // priority mutex. When priority is true, a lock // will be granted before other low priority callers. diff --git a/utils/priority_mutex_test.go b/utils/priority_mutex_test.go index 92541776..2a1c2fca 100644 --- a/utils/priority_mutex_test.go +++ b/utils/priority_mutex_test.go @@ -26,7 +26,7 @@ import ( func TestPriorityMutex(t *testing.T) { arr := []bool{} expected := make([]bool, 60) - l := NewPriorityMutex() + l := new(PriorityMutex) g, _ := errgroup.WithContext(context.Background()) // Lock while adding all locks