/
transaction.go
386 lines (330 loc) · 10.6 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
package writeaheadlog
import (
"encoding/binary"
"errors"
"fmt"
"golang.org/x/crypto/sha3"
"sync"
"sync/atomic"
)
const (
	// checksumSize is the number of digest bytes kept for a transaction; the
	// hash is truncated to this length before it is stored on disk.
	checksumSize = 16

	// txnMetaSize is the size of the metadata at the start of a transaction's
	// head page: status (8) + ID (8) + checksum (checksumSize) + offset of the
	// next page (8).
	txnMetaSize = 8 + 8 + checksumSize + 8

	// maxHeadPagePayloadSize is how much payload still fits in a head page
	// after the transaction metadata.
	maxHeadPagePayloadSize = PageSize - txnMetaSize
)

// Transaction status values. The current status is stored in the first
// 8 bytes of the transaction's head page.
const (
	txnStatusInvalid     = 0
	txnStatusUncommitted = 1
	txnStatusCommitted   = 2
	txnStatusApplied     = 3
)

// bufPool recycles PageSize-byte scratch buffers for encoding pages and
// transaction metadata. Code in this file relies on every buffer taken from
// the pool being a full PageSize long, so buffers must be put back at their
// original length.
var bufPool = sync.Pool{
	New: func() interface{} {
		scratch := make([]byte, PageSize)
		return scratch
	},
}
// Transaction is a batch of Operations that is committed to the WAL as a
// single unit. Its lifecycle is NewTransaction -> (optional Append) ->
// Commit -> Release; the *Complete flags track which steps have run.
type Transaction struct {
	// ID is the transaction's sequence number, taken from the WAL's
	// nextTxnID counter when the transaction is committed.
	ID uint64

	// Operations is every operation in the transaction, including those
	// added later through Append.
	Operations []Operation

	headPage *page // first page of the transaction; also holds its metadata
	wal      *Wal  // WAL this transaction is written to

	// Progress flags, all false on creation: setupComplete is set by Commit,
	// commitComplete once the commit has been persisted, releaseComplete by
	// Release.
	setupComplete, commitComplete, releaseComplete bool

	// status is one of txnStatusInvalid, txnStatusUncommitted,
	// txnStatusCommitted or txnStatusApplied.
	status uint64

	// InitComplete is closed when the initial write started by
	// NewTransaction finishes. InitErr holds that write's error and should
	// only be read after InitComplete is closed.
	InitComplete chan struct{}
	InitErr      error
}
// NewTransaction validates ops and returns a new Transaction for them.
//
// The operations are written to the log file asynchronously by a goroutine
// running threadedInit. Callers can wait on the transaction's InitComplete
// channel and then inspect InitErr. The WAL's count of unfinished
// transactions is incremented here and decremented again by Release.
func (w *Wal) NewTransaction(ops []Operation) (*Transaction, error) {
	if len(ops) == 0 {
		return nil, errors.New("cannot create a transaction without Operations")
	}
	for idx, op := range ops {
		if verifyErr := op.verify(); verifyErr != nil {
			return nil, fmt.Errorf("ops[%d] not valid: %v", idx, verifyErr)
		}
	}

	t := &Transaction{
		wal:          w,
		Operations:   ops,
		status:       txnStatusInvalid,
		InitComplete: make(chan struct{}),
	}

	// Write the operations to disk in the background.
	go t.threadedInit()

	atomic.AddInt64(&w.numUnfinishedTxns, 1)
	return t, nil
}
// threadedInit encodes t.Operations and obtains pages for the encoded bytes
// from the WAL. It then marks the transaction uncommitted and writes the head
// page followed by every subsequent page to the log file.
//
// t.InitComplete is always closed on return. The first write error, if any,
// is recorded in t.InitErr.
func (t *Transaction) threadedInit() {
	defer close(t.InitComplete)

	encoded := marshalOps(t.Operations)
	if len(encoded) <= maxHeadPagePayloadSize {
		t.headPage = t.wal.requestPages(encoded)
	} else {
		// The head page reserves space for the transaction metadata, so it
		// takes a shorter slice of the payload than the pages after it.
		t.headPage = t.wal.requestPages(encoded[:maxHeadPagePayloadSize])
		t.headPage.nextPage = t.wal.requestPages(encoded[maxHeadPagePayloadSize:])
	}
	t.status = txnStatusUncommitted

	// The header is written without a checksum; commit fills that in later.
	if err := t.writeHeaderPage(false); err != nil {
		t.InitErr = fmt.Errorf("writing the first page to disk failed: %v", err)
		return
	}
	for p := t.headPage.nextPage; p != nil; p = p.nextPage {
		if err := t.writePage(p); err != nil {
			t.InitErr = fmt.Errorf("writing the page to disk failed: %v", err)
			return
		}
	}
}
// Append asynchronously adds ops to a transaction that has not yet been
// committed or released.
//
// The returned channel receives exactly one value: nil on success, or the
// reason the append was rejected or failed. Every operation is verified
// before anything else happens, mirroring NewTransaction. The first invalid
// operation is reported and nothing is appended.
func (t *Transaction) Append(ops []Operation) <-chan error {
	done := make(chan error, 1)

	// Reject invalid operations up front instead of discarding the result of
	// verify and writing them to the log anyway.
	for i, op := range ops {
		if err := op.verify(); err != nil {
			done <- fmt.Errorf("ops[%d] not valid: %v", i, err)
			return done
		}
	}

	if t.setupComplete || t.commitComplete || t.releaseComplete {
		done <- errors.New("misuse of transaction - can't append to transaction once it is committed/released")
		return done
	}

	go func() {
		done <- t.append(ops)
	}()
	return done
}
// append adds ops to a transaction on which Commit has not been called yet.
//
// It waits for the transaction's initial write to finish, then encodes ops.
// The encoded bytes first fill the remaining room in the transaction's last
// page; anything left over goes into newly requested pages linked from it.
// New pages are written before the last page that links to them, so a failure
// while writing them leaves the old tail valid on disk. t.Operations is only
// extended once all writes have succeeded.
//
// On failure, the last page's in-memory payload and next-page link are
// restored to their state before the call. The last page is then rewritten
// so the on-disk chain ends where it did before.
func (t *Transaction) append(ops []Operation) (err error) {
	// If there is nothing to append we are done.
	if len(ops) == 0 {
		return nil
	}

	// Make sure that the initialization finished.
	<-t.InitComplete
	if t.InitErr != nil {
		return t.InitErr
	}

	data := marshalOps(ops)

	// Find the last page, to which we will append.
	lastPage := t.headPage
	for lastPage.nextPage != nil {
		lastPage = lastPage.nextPage
	}

	// Record the payload length before touching the page. Reslicing to
	// len(lastPage.payload) inside the deferred func would be a no-op, because
	// by then the length already includes the appended bytes.
	origPayloadLen := len(lastPage.payload)
	defer func() {
		if err == nil {
			return
		}
		lastPage.payload = lastPage.payload[:origPayloadLen]
		lastPage.nextPage = nil
		// NOTE(review): pages obtained from requestPages below are not handed
		// back to the WAL on failure — confirm whether they should be.
		if restoreErr := t.writePage(lastPage); restoreErr != nil {
			err = composeError(err, fmt.Errorf("restoring the last page failed: %v", restoreErr))
		}
	}()

	// Write as much data to the last page as possible. The head page holds
	// less payload than subsequent pages because of the transaction metadata.
	var room int
	if lastPage == t.headPage {
		room = maxHeadPagePayloadSize - len(lastPage.payload)
	} else {
		room = maxPayloadSize - len(lastPage.payload)
	}
	if len(data) <= room {
		lastPage.payload = append(lastPage.payload, data...)
		data = nil
	} else {
		lastPage.payload = append(lastPage.payload, data[:room]...)
		data = data[room:]
	}

	// Everything fit into the existing last page: no new pages are needed.
	if len(data) == 0 {
		if writeErr := t.writePage(lastPage); writeErr != nil {
			return fmt.Errorf("writing the last page to disk failed: %v", writeErr)
		}
		t.Operations = append(t.Operations, ops...)
		return nil
	}

	// Get enough pages for the remaining data. Write the new pages first, then
	// the tail page that links to them, so the old tail stays valid on disk if
	// writing the new pages fails.
	lastPage.nextPage = t.wal.requestPages(data)
	for p := lastPage.nextPage; p != nil; p = p.nextPage {
		if writeErr := t.writePage(p); writeErr != nil {
			return fmt.Errorf("writing the page to disk failed: %v", writeErr)
		}
	}
	if writeErr := t.writePage(lastPage); writeErr != nil {
		return fmt.Errorf("writing the last page to disk failed: %v", writeErr)
	}

	t.Operations = append(t.Operations, ops...)
	return nil
}
// Commit starts committing the transaction in the background.
//
// It returns a channel that receives exactly one value: nil if the commit
// succeeded, or the error that prevented it. Commit may be called only once,
// and not after Release. Any later call receives a misuse error on the
// channel immediately.
func (t *Transaction) Commit() <-chan error {
	done := make(chan error, 1)
	if t.setupComplete || t.commitComplete || t.releaseComplete {
		// Message wording matches Release ("exactly once"); it previously
		// read "exactly ones".
		done <- errors.New("misuse of transaction - call each of the signaling methods exactly once, in serial, in order")
		return done
	}
	t.setupComplete = true

	// Commit the transaction without blocking the caller.
	go func() {
		done <- t.commit()
	}()
	return done
}
// commit performs the commit in these steps:
//  1. Wait for the initial write to finish.
//  2. Mark the transaction committed.
//  3. Take its ID from the WAL's nextTxnID counter (value before increment).
//  4. Compute the checksum.
//  5. Overwrite the status, ID and checksum fields at the start of the head
//     page, then fsync.
//
// t.commitComplete is set only after all of this succeeds.
func (t *Transaction) commit() error {
	<-t.InitComplete
	if t.InitErr != nil {
		return t.InitErr
	}

	t.status = txnStatusCommitted
	t.ID = atomic.AddUint64(&t.wal.nextTxnID, 1) - 1
	sum := t.checksum()

	// Test hook used to simulate a failed commit.
	if t.wal.utils.disrupt("CommitFail") {
		return errors.New("write failed on purpose")
	}

	// Only the metadata prefix (status | ID | checksum) is rewritten; the
	// next-page offset and payload written earlier are left as they are.
	meta := bufPool.Get().([]byte)
	binary.LittleEndian.PutUint64(meta[0:], t.status)
	binary.LittleEndian.PutUint64(meta[8:], t.ID)
	copy(meta[16:], sum)
	_, writeErr := t.wal.logFile.WriteAt(meta[:16+checksumSize], int64(t.headPage.offset))
	bufPool.Put(meta)

	if writeErr != nil {
		return fmt.Errorf("writing the first page failed: %v", writeErr)
	}
	if syncErr := t.wal.fSync(); syncErr != nil {
		return fmt.Errorf("writing the first page failed, %v", syncErr)
	}

	t.commitComplete = true
	return nil
}
// Release marks a committed transaction as applied on disk and informs the
// WAL that it is safe to reuse the transaction's pages. It must be called
// exactly once, after Commit has completed successfully.
//
// releaseComplete is set before any I/O is attempted, so a Release that
// fails cannot be retried.
func (t *Transaction) Release() error {
	if !t.setupComplete || !t.commitComplete || t.releaseComplete {
		return errors.New("misuse of transaction - call each of the signaling methods exactly once, in serial, in order")
	}
	t.releaseComplete = true

	// Set the status to applied and persist it. Only the 8-byte status field
	// at the start of the head page is rewritten.
	t.status = txnStatusApplied
	if t.wal.utils.disrupt("ReleaseFail") {
		return errors.New("write failed on purpose")
	}
	buf := bufPool.Get().([]byte)
	binary.LittleEndian.PutUint64(buf, t.status)
	_, err := t.wal.logFile.WriteAt(buf[:8], int64(t.headPage.offset))
	bufPool.Put(buf)
	if err != nil {
		return fmt.Errorf("couldn't write the page to file: %v", err)
	}
	if err := t.wal.fSync(); err != nil {
		return fmt.Errorf("couldn't write the page to file: %v", err)
	}

	// Hand every page of the transaction back to the WAL for reuse.
	t.wal.mu.Lock()
	for p := t.headPage; p != nil; p = p.nextPage {
		t.wal.availablePages = append(t.wal.availablePages, p.offset)
	}
	t.wal.mu.Unlock()

	// Decrement the number of unfinished transactions and check the result in
	// one atomic step. A separate Load followed by Add lets two concurrent
	// Releases both pass the check and drive the counter negative.
	if atomic.AddInt64(&t.wal.numUnfinishedTxns, -1) < 0 {
		atomic.AddInt64(&t.wal.numUnfinishedTxns, 1)
		panic("Sanity check failed. atomicUnfinishedTxns should never be negative")
	}
	return nil
}
// checksum returns the first checksumSize bytes of a legacy Keccak-256
// digest (not BLAKE2b). The digest covers, in order:
//   - the transaction's status and ID, as two little-endian uint64s;
//   - every page in the transaction, each marshalled into a zero-filled
//     pool buffer and hashed at the buffer's full length.
func (t *Transaction) checksum() []byte {
	var meta [16]byte
	binary.LittleEndian.PutUint64(meta[:8], t.status)
	binary.LittleEndian.PutUint64(meta[8:], t.ID)

	hasher := sha3.NewLegacyKeccak256()
	_, _ = hasher.Write(meta[:]) // hash.Hash.Write never returns an error

	scratch := bufPool.Get().([]byte)
	defer bufPool.Put(scratch)
	for p := t.headPage; p != nil; p = p.nextPage {
		for i := range scratch {
			scratch[i] = 0
		}
		p.marshal(scratch[:0])
		_, _ = hasher.Write(scratch)
	}

	truncated := make([]byte, checksumSize)
	copy(truncated, hasher.Sum(nil))
	return truncated
}
// writeHeaderPage writes the transaction's head page to the log file at
// t.headPage.offset as a full pool buffer with this layout:
//
//	status (8) | ID (8) | checksum (checksumSize) | next page offset (8) | payload
//
// Bytes after the payload are zero. When checksum is true the transaction
// checksum is computed and stored; otherwise the checksum field stays zero.
func (t *Transaction) writeHeaderPage(checksum bool) error {
	hdr := bufPool.Get().([]byte)
	defer bufPool.Put(hdr)

	for i := range hdr {
		hdr[i] = 0
	}
	binary.LittleEndian.PutUint64(hdr[0:], t.status)
	binary.LittleEndian.PutUint64(hdr[8:], t.ID)
	if checksum {
		copy(hdr[16:], t.checksum())
	}
	binary.LittleEndian.PutUint64(hdr[16+checksumSize:], t.headPage.nextOffset())
	copy(hdr[24+checksumSize:], t.headPage.payload)

	_, err := t.wal.logFile.WriteAt(hdr, int64(t.headPage.offset))
	return err
}
// writePage writes the marshalled form of page to the log file, zero-padded
// to the end of the page.
//
// For the transaction's head page, the write starts
// txnMetaSize-pageMetaSize bytes into the page and is correspondingly
// shorter. This appears intended to leave the status, ID and checksum fields
// on disk untouched — assumes pageMetaSize covers only the next-page offset;
// confirm against its definition.
func (t *Transaction) writePage(page *page) error {
	scratch := bufPool.Get().([]byte)
	// Return the full-length buffer to the pool even if only a prefix is used
	// below: other users of bufPool expect PageSize-length buffers.
	defer bufPool.Put(scratch)

	pos := page.offset
	out := scratch
	if page == t.headPage {
		const metaGap = txnMetaSize - pageMetaSize
		pos += metaGap
		out = scratch[:len(scratch)-metaGap]
	}

	for i := range out {
		out[i] = 0
	}
	page.marshal(out[:0])

	_, err := t.wal.logFile.WriteAt(out, int64(pos))
	return err
}