/
transaction.go
165 lines (153 loc) · 5.15 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package db
import (
"errors"
"sync"
"sync/atomic"
)
var (
ErrDiscarded = errors.New("transaction has already been discarded")
ErrCommitted = errors.New("transaction has already been committed")
)
// Transaction is an atomic database transaction for a single collection which
// can be used to guarantee consistency.
type Transaction struct {
db *DB
mut sync.Mutex
colInfo *colInfo
batchWriter dbBatchWriter
readWriter *readerWithBatchWriter
committed bool
discarded bool
// internalCount keeps track of the number of models inserted/deleted within
// the transaction. An Insert increments internalCount and a Delete decrements
// it. When the transaction is committed, internalCount is added to the
// current count.
internalCount int64
}
// OpenTransaction opens and returns a new transaction for the collection. While
// the transaction is open, no other state changes (e.g. Insert, Update, or
// Delete) can be made to the collection (but concurrent reads are still
// allowed).
//
// Transactions are atomic, meaning that either:
//
// (1) The transaction will succeed and *all* queued operations will be
// applied, or
// (2) the transaction will fail or be discarded, in which case *none* of
// the queued operations will be applied.
//
// The transaction must be closed once done, either by committing or discarding
// the transaction. No changes will be made to the database state until the
// transaction is committed.
func (c *Collection) OpenTransaction() *Transaction {
// Note we acquire an RLock on the global write mutex. We're not really a
// "reader" but we behave like one in the context of an RWMutex. Up to one
// write lock for each collection can be held, or one global write lock can be
// held at any given time.
c.info.db.globalWriteLock.RLock()
c.info.writeMut.Lock()
return &Transaction{
db: c.info.db,
colInfo: c.info.copy(),
batchWriter: c.ldb,
readWriter: newReaderWithBatchWriter(c.ldb),
}
}
// checkState acquires a lock on txn.mut and then calls unsafeCheckState.
func (txn *Transaction) checkState() error {
txn.mut.Lock()
defer txn.mut.Unlock()
return txn.unsafeCheckState()
}
// unsafeCheckState checks the state of the transaction, assuming the caller has
// already acquired a lock. It returns an error if the transaction has already
// been committed or discarded.
func (txn *Transaction) unsafeCheckState() error {
if txn.discarded {
return ErrDiscarded
} else if txn.committed {
return ErrCommitted
}
return nil
}
// Commit commits the transaction. If error is not nil, then the transaction is
// discarded. A new transaction must be created if you wish to retry the
// operations.
//
// Other methods should not be called after transaction has been committed.
func (txn *Transaction) Commit() error {
txn.mut.Lock()
defer txn.mut.Unlock()
if err := txn.unsafeCheckState(); err != nil {
return err
}
// Right before we commit, we need to update the count with txn.internalCount.
if err := updateCountWithTransaction(txn.colInfo, txn.readWriter, int(txn.internalCount)); err != nil {
_ = txn.Discard()
return err
}
if err := txn.batchWriter.Write(txn.readWriter.batch, nil); err != nil {
_ = txn.Discard()
return err
}
txn.committed = true
txn.colInfo.writeMut.Unlock()
txn.db.globalWriteLock.RUnlock()
return nil
}
// Discard discards the transaction.
//
// Other methods should not be called after transaction has been discarded.
// However, it is safe to call Discard multiple times.
func (txn *Transaction) Discard() error {
txn.mut.Lock()
defer txn.mut.Unlock()
if txn.committed {
return ErrCommitted
}
if txn.discarded {
return nil
}
txn.discarded = true
txn.colInfo.writeMut.Unlock()
txn.db.globalWriteLock.RUnlock()
return nil
}
// Insert queues an operation to insert the given model into the database. It
// returns an error if a model with the same id already exists. The model will
// not actually be inserted until the transaction is committed.
func (txn *Transaction) Insert(model Model) error {
if err := txn.checkState(); err != nil {
return err
}
if err := insertWithTransaction(txn.colInfo, txn.readWriter, model); err != nil {
return err
}
txn.updateInternalCount(1)
return nil
}
// Update queues an operation to update an existing model in the database. It
// returns an error if the given model doesn't already exist. The model will
// not actually be updated until the transaction is committed.
func (txn *Transaction) Update(model Model) error {
if err := txn.checkState(); err != nil {
return err
}
return updateWithTransaction(txn.colInfo, txn.readWriter, model)
}
// Delete queues an operation to delete the model with the given ID from the
// database. It returns an error if the model doesn't exist in the database. The
// model will not actually be deleted until the transaction is committed.
func (txn *Transaction) Delete(id []byte) error {
if err := txn.checkState(); err != nil {
return err
}
if err := deleteWithTransaction(txn.colInfo, txn.readWriter, id); err != nil {
return err
}
txn.updateInternalCount(-1)
return nil
}
func (txn *Transaction) updateInternalCount(diff int64) {
atomic.AddInt64(&txn.internalCount, diff)
}