Skip to content

Commit

Permalink
Return errors from connections
Browse files Browse the repository at this point in the history
Fixes #44
  • Loading branch information
dcormier committed Oct 15, 2019
1 parent 87c5255 commit 98bb566
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 21 deletions.
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
module github.com/go-redsync/redsync

go 1.13

require (
github.com/gomodule/redigo v2.0.0+incompatible
github.com/hashicorp/go-multierror v1.0.0
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203
)

go 1.13
// TODO: Remove this once this issue is addressed, or redigo no longer points to
// v2.0.0+incompatible, above: https://github.com/gomodule/redigo/issues/366
replace github.com/gomodule/redigo => github.com/gomodule/redigo v1.7.0
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
github.com/gomodule/redigo v1.7.0 h1:ZKld1VOtsGhAe37E7wMxEDgAlGM5dvFY+DiOhSkhP9Y=
github.com/gomodule/redigo v1.7.0/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
65 changes: 46 additions & 19 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/gomodule/redigo/redis"
"github.com/hashicorp/go-multierror"
)

// A DelayFunc is used to decide the amount of time to wait between retries.
Expand Down Expand Up @@ -44,9 +45,12 @@ func (m *Mutex) Lock() error {

start := time.Now()

n := m.actOnPoolsAsync(func(pool Pool) bool {
n, err := m.actOnPoolsAsync(func(pool Pool) (bool, error) {
return m.acquire(pool, value)
})
if n == 0 && err != nil {
return err
}

now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.factor)))
Expand All @@ -55,7 +59,7 @@ func (m *Mutex) Lock() error {
m.until = until
return nil
}
m.actOnPoolsAsync(func(pool Pool) bool {
m.actOnPoolsAsync(func(pool Pool) (bool, error) {
return m.release(pool, value)
})
}
Expand All @@ -64,19 +68,25 @@ func (m *Mutex) Lock() error {
}

// Unlock unlocks m and returns the status of unlock.
func (m *Mutex) Unlock() bool {
n := m.actOnPoolsAsync(func(pool Pool) bool {
func (m *Mutex) Unlock() (bool, error) {
n, err := m.actOnPoolsAsync(func(pool Pool) (bool, error) {
return m.release(pool, m.value)
})
return n >= m.quorum
if n < m.quorum {
return false, err
}
return true, nil
}

// Extend resets the mutex's expiry and returns the status of expiry extension.
func (m *Mutex) Extend() bool {
n := m.actOnPoolsAsync(func(pool Pool) bool {
func (m *Mutex) Extend() (bool, error) {
n, err := m.actOnPoolsAsync(func(pool Pool) (bool, error) {
return m.touch(pool, m.value, int(m.expiry/time.Millisecond))
})
return n >= m.quorum
if n < m.quorum {
return false, err
}
return true, nil
}

func genValue() (string, error) {
Expand All @@ -88,11 +98,17 @@ func genValue() (string, error) {
return base64.StdEncoding.EncodeToString(b), nil
}

func (m *Mutex) acquire(pool Pool, value string) bool {
func (m *Mutex) acquire(pool Pool, value string) (bool, error) {
conn := pool.Get()
defer conn.Close()
reply, err := redis.String(conn.Do("SET", m.name, value, "NX", "PX", int(m.expiry/time.Millisecond)))
return err == nil && reply == "OK"
if err != nil {
if err == redis.ErrNil {
return false, nil
}
return false, err
}
return reply == "OK", nil
}

var deleteScript = redis.NewScript(1, `
Expand All @@ -103,12 +119,12 @@ var deleteScript = redis.NewScript(1, `
end
`)

func (m *Mutex) release(pool Pool, value string) bool {
func (m *Mutex) release(pool Pool, value string) (bool, error) {
conn := pool.Get()
defer conn.Close()
status, err := redis.Int64(deleteScript.Do(conn, m.name, value))

return err == nil && status != 0
return err == nil && status != 0, err
}

var touchScript = redis.NewScript(1, `
Expand All @@ -119,26 +135,37 @@ var touchScript = redis.NewScript(1, `
end
`)

func (m *Mutex) touch(pool Pool, value string, expiry int) bool {
func (m *Mutex) touch(pool Pool, value string, expiry int) (bool, error) {
conn := pool.Get()
defer conn.Close()
status, err := redis.Int64(touchScript.Do(conn, m.name, value, expiry))

return err == nil && status != 0
return err == nil && status != 0, err
}

func (m *Mutex) actOnPoolsAsync(actFn func(Pool) bool) int {
ch := make(chan bool)
func (m *Mutex) actOnPoolsAsync(actFn func(Pool) (bool, error)) (int, error) {
type result struct {
Status bool
Err error
}

ch := make(chan result)
for _, pool := range m.pools {
go func(pool Pool) {
ch <- actFn(pool)
r := result{}
r.Status, r.Err = actFn(pool)
ch <- r
}(pool)
}
n := 0
var err error
for range m.pools {
if <-ch {
r := <-ch
if r.Status {
n++
} else if r.Err != nil {
err = multierror.Append(err, r.Err)
}
}
return n
return n, err
}
5 changes: 4 additions & 1 deletion mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func TestMutexExtend(t *testing.T) {
time.Sleep(1 * time.Second)

expiries := getPoolExpiries(pools, mutex.name)
ok := mutex.Extend()
ok, err := mutex.Extend()
if err != nil {
t.Fatalf("Expected err == nil, got %q", err)
}
if !ok {
t.Fatalf("Expected ok == true, got %v", ok)
}
Expand Down

0 comments on commit 98bb566

Please sign in to comment.