/
bitfield_queue.go
159 lines (139 loc) · 4.89 KB
/
bitfield_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package miner
import (
"fmt"
"sort"
"github.com/chenjianmei111/go-bitfield"
"github.com/chenjianmei111/go-state-types/abi"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/chenjianmei111/specs-actors/v3/actors/util/adt"
)
// Wrapper for working with an AMT[ChainEpoch]*Bitfield functioning as a queue, bucketed by epoch.
// Keys in the queue are quantized (upwards), modulo some offset, to reduce the cardinality of keys.
type BitfieldQueue struct {
*adt.Array
quant QuantSpec
}
func LoadBitfieldQueue(store adt.Store, root cid.Cid, quant QuantSpec, bitwidth int) (BitfieldQueue, error) {
arr, err := adt.AsArray(store, root, bitwidth)
if err != nil {
return BitfieldQueue{}, xerrors.Errorf("failed to load epoch queue %v: %w", root, err)
}
return BitfieldQueue{arr, quant}, nil
}
// Adds values to the queue entry for an epoch.
func (q BitfieldQueue) AddToQueue(rawEpoch abi.ChainEpoch, values bitfield.BitField) error {
if isEmpty, err := values.IsEmpty(); err != nil {
return xerrors.Errorf("failed to decode early termination bitfield: %w", err)
} else if isEmpty {
// nothing to do.
return nil
}
epoch := q.quant.QuantizeUp(rawEpoch)
var bf bitfield.BitField
if _, err := q.Array.Get(uint64(epoch), &bf); err != nil {
return xerrors.Errorf("failed to lookup queue epoch %v: %w", epoch, err)
}
bf, err := bitfield.MergeBitFields(bf, values)
if err != nil {
return xerrors.Errorf("failed to merge bitfields for queue epoch %v: %w", epoch, err)
}
if err = q.Array.Set(uint64(epoch), bf); err != nil {
return xerrors.Errorf("failed to set queue epoch %v: %w", epoch, err)
}
return nil
}
func (q BitfieldQueue) AddToQueueValues(epoch abi.ChainEpoch, values ...uint64) error {
if len(values) == 0 {
return nil
}
return q.AddToQueue(epoch, bitfield.NewFromSet(values))
}
// Cut cuts the elements from the bits in the given bitfield out of the queue,
// shifting other bits down and removing any newly empty entries.
//
// See the docs on BitField.Cut to better understand what it does.
func (q BitfieldQueue) Cut(toCut bitfield.BitField) error {
var epochsToRemove []uint64
if err := q.ForEach(func(epoch abi.ChainEpoch, bf bitfield.BitField) error {
bf, err := bitfield.CutBitField(bf, toCut)
if err != nil {
return err
}
if empty, err := bf.IsEmpty(); err != nil {
return err
} else if !empty {
return q.Set(uint64(epoch), bf)
}
epochsToRemove = append(epochsToRemove, uint64(epoch))
return nil
}); err != nil {
return xerrors.Errorf("failed to cut from bitfield queue: %w", err)
}
if err := q.BatchDelete(epochsToRemove, true); err != nil {
return xerrors.Errorf("failed to remove empty epochs from bitfield queue: %w", err)
}
return nil
}
func (q BitfieldQueue) AddManyToQueueValues(values map[abi.ChainEpoch][]uint64) error {
// Pre-quantize to reduce the number of updates.
quantizedValues := make(map[abi.ChainEpoch][]uint64, len(values))
for rawEpoch, entries := range values { // nolint:nomaprange // subsequently sorted
epoch := q.quant.QuantizeUp(rawEpoch)
quantizedValues[epoch] = append(quantizedValues[epoch], entries...)
}
// Update each epoch in-order to be deterministic.
updatedEpochs := make([]abi.ChainEpoch, 0, len(quantizedValues))
for epoch := range quantizedValues { // nolint:nomaprange // subsequently sorted
updatedEpochs = append(updatedEpochs, epoch)
}
sort.Slice(updatedEpochs, func(i, j int) bool {
return updatedEpochs[i] < updatedEpochs[j]
})
for _, epoch := range updatedEpochs {
if err := q.AddToQueueValues(epoch, quantizedValues[epoch]...); err != nil {
return err
}
}
return nil
}
// Removes and returns all values with keys less than or equal to until.
// Modified return value indicates whether this structure has been changed by the call.
func (q BitfieldQueue) PopUntil(until abi.ChainEpoch) (values bitfield.BitField, modified bool, err error) {
var poppedValues []bitfield.BitField
var poppedKeys []uint64
stopErr := fmt.Errorf("stop")
if err = q.ForEach(func(epoch abi.ChainEpoch, bf bitfield.BitField) error {
if epoch > until {
return stopErr
}
poppedKeys = append(poppedKeys, uint64(epoch))
poppedValues = append(poppedValues, bf)
return err
}); err != nil && err != stopErr {
return bitfield.BitField{}, false, err
}
// Nothing expired.
if len(poppedKeys) == 0 {
return bitfield.New(), false, nil
}
if err = q.BatchDelete(poppedKeys, true); err != nil {
return bitfield.BitField{}, false, err
}
merged, err := bitfield.MultiMerge(poppedValues...)
if err != nil {
return bitfield.BitField{}, false, err
}
return merged, true, nil
}
// Iterates the queue.
func (q BitfieldQueue) ForEach(cb func(epoch abi.ChainEpoch, bf bitfield.BitField) error) error {
var bf bitfield.BitField
return q.Array.ForEach(&bf, func(i int64) error {
cpy, err := bf.Copy()
if err != nil {
return xerrors.Errorf("failed to copy bitfield in queue: %w", err)
}
return cb(abi.ChainEpoch(i), cpy)
})
}