-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
funcs.go
169 lines (155 loc) · 3.38 KB
/
funcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package stm
import (
"math/rand"
"runtime/pprof"
"sync"
"time"
)
var (
	// failedCommitsProfile stays nil unless init creates it, which only
	// happens when profileFailedCommits is true.
	failedCommitsProfile *pprof.Profile

	// txPool hands out reusable *Tx values. A freshly built Tx gets empty
	// read, write and watch maps, and its condition variable is bound to the
	// Tx's own mutex. Every construction is counted under "new txs".
	txPool = sync.Pool{
		New: func() any {
			expvars.Add("new txs", 1)
			fresh := new(Tx)
			fresh.reads = make(map[txVar]VarValue)
			fresh.writes = make(map[txVar]any)
			fresh.watching = make(map[txVar]struct{})
			fresh.cond.L = &fresh.mu
			return fresh
		},
	}
)
// profileFailedCommits, when true, makes init create the "stmFailedCommits"
// pprof profile and makes Atomically add an entry to it for every failed
// commit.
const profileFailedCommits = false

// sleepBetweenRetries, when true, enables the sleep logic that runs at the
// start of each Atomically attempt.
const sleepBetweenRetries = false
// init registers the failed-commit pprof profile, but only when profiling of
// failed commits has been switched on at compile time.
func init() {
	if !profileFailedCommits {
		return
	}
	failedCommitsProfile = pprof.NewProfile("stmFailedCommits")
}
// newTx takes a *Tx from txPool and clears its attempt counter and completed
// flag before handing it to the caller.
func newTx() *Tx {
	fresh := txPool.Get().(*Tx)
	fresh.tries, fresh.completed = 0, false
	return fresh
}
// WouldBlock runs fn once against a fresh transaction and reports whether
// catchRetry saw it ask for a retry. The result of fn is discarded. It panics
// if the transaction ends up with any watchers registered; otherwise the
// transaction is recycled before returning.
func WouldBlock[R any](fn Operation[R]) (block bool) {
	probe := newTx()
	probe.reset()
	_, wantsRetry := catchRetry(fn, probe)
	if len(probe.watching) > 0 {
		panic("shouldn't have installed any watchers")
	}
	probe.recycle()
	return wantsRetry
}
// Atomically runs op as a transaction and returns the value op produced on
// the attempt that was committed.
//
// Each attempt increments tx.tries, resets the transaction and runs op while
// holding tx.mu. If catchRetry reports a retry, the function calls tx.wait()
// and starts over. Otherwise it calls tx.lockAllVars() and checks
// tx.inputsChanged(); if that reports true the attempt is abandoned and
// restarted, else the write log is committed, the transaction is marked
// completed, tx.cond is broadcast, and the transaction is recycled.
func Atomically[R any](op Operation[R]) R {
expvars.Add("atomically", 1)
// run the transaction
tx := newTx()
retry:
tx.tries++
tx.reset()
// Optional sleep before the attempt. sleepBetweenRetries is a compile-time
// constant, so this branch does nothing while it is false.
if sleepBetweenRetries {
// ns doubles with every attempt (1<<0, 1<<1, ...) and is capped at 1<<30.
shift := int64(tx.tries - 1)
const maxShift = 30
if shift > maxShift {
shift = maxShift
}
ns := int64(1) << shift
// d is a random duration in [0, ns).
d := time.Duration(rand.Int63n(ns))
// NOTE(review): d only decides whether to sleep; the sleep itself lasts the
// full ns rather than d. Confirm whether time.Sleep(d) was intended.
if d > 100*time.Microsecond {
tx.updateWatchers()
time.Sleep(time.Duration(ns))
}
}
// op runs with tx.mu held; catchRetry reports whether it asked for a retry.
tx.mu.Lock()
ret, retry := catchRetry(op, tx)
tx.mu.Unlock()
if retry {
expvars.Add("retries", 1)
// wait for one of the variables we read to change before retrying
tx.wait()
goto retry
}
// verify the read log
tx.lockAllVars()
if tx.inputsChanged() {
// Release the locks taken above before starting a new attempt.
tx.unlock()
expvars.Add("failed commits", 1)
if profileFailedCommits {
// new(int) gives each recorded failure a distinct profile key.
failedCommitsProfile.Add(new(int), 0)
}
goto retry
}
// commit the write log and broadcast that variables have changed
tx.commit()
tx.mu.Lock()
tx.completed = true
tx.cond.Broadcast()
tx.mu.Unlock()
tx.unlock()
expvars.Add("commits", 1)
tx.recycle()
return ret
}
// AtomicGet returns the value currently held by v without running a
// transaction: it loads v's current stored value and type-asserts its
// contents to T.
func AtomicGet[T any](v *Var[T]) T {
	current := v.value.Load()
	return current.Get().(T)
}
// AtomicSet is a helper function that atomically writes a value.
//
// It replaces the contents of v with val via changeValue while holding v's
// mutex. The unlock is deferred so that a panic inside changeValue cannot
// leave the mutex held, which would block every later user of v.
func AtomicSet[T any](v *Var[T], val T) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.changeValue(val)
}
// Compose bundles several operations into one void operation. When the
// returned operation runs, it calls each of fns in order against the same
// transaction and discards their results.
func Compose[R any](fns ...Operation[R]) Operation[struct{}] {
	runAll := func(tx *Tx) {
		for i := 0; i < len(fns); i++ {
			fns[i](tx)
		}
	}
	return VoidOperation(runAll)
}
// Select builds an operation that tries the supplied functions one after
// another and returns the result of the first that finishes without
// requesting a retry.
//
// Every alternative except the last runs against a copy of the transaction's
// write map; if it retries, the original write map is put back before the
// next alternative is tried. The last alternative is invoked directly, so a
// retry it requests is not caught here. With no alternatives at all the
// operation simply calls tx.Retry().
func Select[R any](fns ...Operation[R]) Operation[R] {
	return func(tx *Tx) R {
		if len(fns) == 0 {
			// empty Select blocks forever
			tx.Retry()
			panic("unreachable")
		}

		lastIdx := len(fns) - 1
		for _, alt := range fns[:lastIdx] {
			// Work on a scratch copy so a retried alternative leaves no writes behind.
			saved := tx.writes
			scratch := make(map[txVar]any, len(saved))
			for key, val := range saved {
				scratch[key] = val
			}
			tx.writes = scratch

			result, retried := catchRetry(alt, tx)
			if !retried {
				return result
			}
			tx.writes = saved
		}
		return fns[lastIdx](tx)
	}
}
// Operation is a function that is run against a transaction and produces a
// result of type R.
type Operation[R any] func(*Tx) R
// VoidOperation adapts a result-less transaction function to the Operation
// type by wrapping it in an operation that calls f and returns the empty
// struct.
func VoidOperation(f func(*Tx)) Operation[struct{}] {
	return func(tx *Tx) (none struct{}) {
		f(tx)
		return none
	}
}
// AtomicModify runs a single transaction that reads v, applies f to the value
// read, and writes the result back to v.
func AtomicModify[T any](v *Var[T], f func(T) T) {
	update := func(tx *Tx) {
		current := v.Get(tx)
		v.Set(tx, f(current))
	}
	Atomically(VoidOperation(update))
}