/
seqobsv.go
85 lines (72 loc) · 1.51 KB
/
seqobsv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// SPDX-FileCopyrightText: 2021 The margaret Authors
//
// SPDX-License-Identifier: MIT
// Package seqobsv supplies an observable value specialized for sequence numbers in append-only logs.
// It is intended to be safe for concurrent access from multiple goroutines.
//
// These values only go up by one. For margaret they start with 0.
//
package seqobsv
import (
"math"
"sync"
)
type (
	// Observable holds a sequence number together with the channels of
	// callers that are waiting for it to advance. The methods in this file
	// take mu before reading or writing val and waiters.
	Observable struct {
		mu      sync.Mutex
		val     uint64
		waiters waitMap
	}

	// emptyChan is a signal-only channel: in this file it is only ever
	// closed, never sent on.
	emptyChan chan struct{}

	// waitMap groups waiting channels by the sequence number they wait on.
	waitMap map[uint64][]emptyChan
)
// New returns a pointer to a freshly allocated Observable whose value is
// start and whose waiter map is initialized and empty.
func New(start uint64) *Observable {
	obs := &Observable{waiters: make(waitMap)}
	obs.val = start
	return obs
}
// Value reports the value currently held by the Observable. The read is
// performed while holding the mutex.
func (seq *Observable) Value() uint64 {
	seq.mu.Lock()
	defer seq.mu.Unlock()
	return seq.val
}
// Seq returns the current value converted to int64.
//
// It panics if the current value is larger than math.MaxInt64 and therefore
// cannot be represented as an int64.
func (seq *Observable) Seq() int64 {
	seq.mu.Lock()
	current := seq.val
	seq.mu.Unlock() // released before the range check, so the panic below never fires with the lock held

	if current <= math.MaxInt64 {
		return int64(current)
	}
	panic("bigger then int64")
}
// Inc advances the value by one and returns the new value.
//
// Before advancing, every channel registered for the current
// (pre-increment) value is closed and that entry is removed from the
// waiter map.
func (seq *Observable) Inc() uint64 {
	seq.mu.Lock()
	defer seq.mu.Unlock()

	// A missing key yields a nil slice, so the loop simply does not run and
	// the delete is a no-op when nobody is waiting on the current value.
	for _, waiter := range seq.waiters[seq.val] {
		close(waiter)
	}
	delete(seq.waiters, seq.val)

	seq.val++
	return seq.val
}
// WaitFor returns a channel that is closed once the value is greater than n.
//
// If the value is already greater than n, the returned channel is closed
// before WaitFor returns, so even a non-blocking receive succeeds right away.
// Otherwise the channel is stored in the waiter map under n and is closed by
// the Inc call that advances the value from n to n+1.
//
// Nothing is ever sent on the returned channel; only its closing carries
// meaning.
func (seq *Observable) WaitFor(n uint64) <-chan struct{} {
	seq.mu.Lock()
	defer seq.mu.Unlock()

	ch := make(emptyChan)
	if n < seq.val {
		// Already passed: close synchronously. Spawning a goroutine only to
		// close the channel costs a scheduling round-trip and leaves a
		// window in which the caller still sees the channel as open.
		close(ch)
		return ch
	}

	// Indexing a missing key yields a nil slice, which append handles, so no
	// explicit initialization of the per-n slice is needed.
	seq.waiters[n] = append(seq.waiters[n], ch)
	return ch
}