Skip to content

Commit

Permalink
fix(pkg/counter): finish making counter atomic (#3276)
Browse files Browse the repository at this point in the history
Last changes renamed Increment/Decrement to Increase/Decrease. This was
reverted after discussions.

With this new approach:

- Overflow is detected in both cases (and proved by test).
- Thread safety is guaranteed by atomic Add/Sub operations.
- Unit tests prove all assumptions and thread safety for all cases.
- Benchmark tests can be used to test other approaches in the future.
- Overflow of a sum of all given arguments isn't detected (avoid overhead).

NOTE: The issue is now fully fixed...

Originally, the bug was only about reading counter value in parallel to
atomic operation (multiple LOAD/STORE simultaneously, and some not
protected by atomics).

Previous fix, made by commit 1e54ce2, fixed data race but overflow errors
could be reported more than once. It also had atomic operations for each
given argument (if multiple arguments).

This version addresses all concerns.
  • Loading branch information
rafaeldtinoco committed Jun 28, 2023
1 parent 8d5237a commit 0b97409
Show file tree
Hide file tree
Showing 15 changed files with 352 additions and 192 deletions.
95 changes: 54 additions & 41 deletions pkg/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,73 +12,86 @@ type Counter struct {
value uint64
}

// NewCounter creates a new counter with the given initial value.
func NewCounter(initialValue uint64) Counter {
return Counter{value: initialValue}
}

// Increase increases the counter by the given value (thread-safe).
func (c *Counter) Increase(values ...uint64) error {
_, err := c.IncreaseAndRead(values...)
return err
}
// Increment

// IncreaseAndRead increases the counter by the given value (thread-safe) and returns the new value.
func (c *Counter) IncreaseAndRead(values ...uint64) (uint64, error) {
if len(values) == 0 {
values = append(values, 1)
}
// Increment increments counter by given value (default: 1, thread-safe).
func (c *Counter) Increment(x ...uint64) error {
var err error

var n uint64

for _, value := range values {
n = atomic.AddUint64(&c.value, value)
if n < value {
return n, errors.New("counter overflow")
val := uint64(1)
if len(x) != 0 {
for _, v := range x {
val += v
}
val-- // initial 1
}

return n, nil // return last known value (atomically incremented)
}
// NOTE: checking if val > 0 adds ~200ns/op in benchmarking: not worth if
// not proved that amount of times Increment(0) is called makes it
// worth it.
_, err = c.IncrementValueAndRead(val)

// Decrease decreases the counter by the given value (thread-safe).
func (c *Counter) Decrease(values ...uint64) error {
_, err := c.DecreaseAndRead(values...)
return err
}

// DecreaseAndRead decreases the counter by the given value (thread-safe) and returns the new value.
func (c *Counter) DecreaseAndRead(values ...uint64) (uint64, error) {
if len(values) == 0 {
values = append(values, 1)
// IncrementValueAndRead increments counter by given value and returns the new value (thread-safe).
func (c *Counter) IncrementValueAndRead(x uint64) (uint64, error) {
var err error

n := atomic.AddUint64(&c.value, x)
if n < x {
err = errors.New("counter overflow")
}

var n uint64
return n, err
}

for _, value := range values {
if value == 0 {
continue
}
n = atomic.AddUint64(&c.value, ^uint64(value-1))
if n == math.MaxUint64 {
return n, errors.New("counter underflow")
// Decrement

// Decrement decrements counter by given value (default: 1, thread-safe).
func (c *Counter) Decrement(x ...uint64) error {
val := uint64(1)
if len(x) != 0 {
for _, v := range x {
val += v
}
val-- // initial 1
}

return n, nil // return last known value (atomically decremented)
// NOTE: checking if val > 0 adds ~200ns/op in benchmarking: not worth if
// not proved that amount of times Decrement(0) is called makes it
// worth it.
_, err := c.DecrementValueAndRead(val)

return err
}

// Read returns the current value of the counter (thread-safe).
func (c *Counter) Read() uint64 {
return atomic.LoadUint64(&c.value)
// DecrementValueAndRead decrements counter by given value and returns the new value (thread-safe).
func (c *Counter) DecrementValueAndRead(x uint64) (uint64, error) {
var err error

n := atomic.AddUint64(&c.value, ^uint64(x-1))
if n > math.MaxUint64-x {
err = errors.New("counter underflow")
}

return n, err
}

// Set sets the counter to the given value (thread-safe).
// Setters and Getters

func (c *Counter) Set(value uint64) {
atomic.StoreUint64(&c.value, value)
}

// Format implements fmt.Formatter (thread-safe).
func (c *Counter) Get() uint64 {
return atomic.LoadUint64(&c.value)
}

func (c *Counter) Format(f fmt.State, r rune) {
f.Write([]byte(strconv.FormatUint(c.Read(), 10)))
f.Write([]byte(strconv.FormatUint(c.Get(), 10)))
}
121 changes: 121 additions & 0 deletions pkg/counter/counter_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package counter

import "testing"

// Increment

func BenchmarkIncrement(b *testing.B) {
c := NewCounter(0)

for i := 0; i < b.N; i++ {
err := c.Increment()
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkIncrementWithValue(b *testing.B) {
c := NewCounter(0)

for i := 0; i < b.N; i++ {
err := c.Increment(1)
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkIncrementWithMultipleValue(b *testing.B) {
c := NewCounter(0)

for i := 0; i < b.N; i += 3 {
err := c.Increment(1, 1, 1)
if err != nil {
b.Fatal(err)
}
}

// Note: Looping 1/3 of times in this case
}

func BenchmarkZeroedIncrementWithValue(b *testing.B) {
c := NewCounter(0)

for i := 0; i < b.N; i++ {
err := c.Increment(0)
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkZeroedIncrementWithMultipleValue(b *testing.B) {
c := NewCounter(0)

for i := 0; i < b.N; i++ {
err := c.Increment(0, 0, 0)
if err != nil {
b.Fatal(err)
}
}
}

// Decrement

func BenchmarkDecrement(b *testing.B) {
c := NewCounter(uint64(b.N))

for i := b.N; i > 0; i-- {
err := c.Decrement()
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkDecrementWithValue(b *testing.B) {
c := NewCounter(uint64(b.N))

for i := b.N; i > 0; i-- {
err := c.Decrement(1)
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkDecrementWithMultipleValue(b *testing.B) {
c := NewCounter(uint64(b.N))

for i := b.N; i > 3; i -= 3 {
err := c.Decrement(1, 1, 1)
if err != nil {
b.Fatal(err)
}
}

// Note: Looping 1/3 of times in this case
}

func BenchmarkZeroedDecrementWithValue(b *testing.B) {
c := NewCounter(uint64(b.N))

for i := b.N; i > 0; i-- {
err := c.Decrement(0)
if err != nil {
b.Fatal(err)
}
}
}

func BenchmarkZeroedDecrementWithMultipleValue(b *testing.B) {
c := NewCounter(uint64(b.N))

for i := b.N; i > 0; i-- {
err := c.Decrement(0, 0, 0)
if err != nil {
b.Fatal(err)
}
}
}
Loading

0 comments on commit 0b97409

Please sign in to comment.