This repository has been archived by the owner on Jun 25, 2024. It is now read-only.
forked from lightninglabs/pool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
order_event.go
493 lines (420 loc) · 14.8 KB
/
order_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
package clientdb
import (
"fmt"
"io"
"time"
"github.com/getvoltage/pool/auctioneerrpc"
"github.com/getvoltage/pool/event"
"github.com/getvoltage/pool/order"
"github.com/getvoltage/pool/poolrpc"
"go.etcd.io/bbolt"
)
// OrderEvent is the main interface for order specific events. It extends the
// generic event.Event interface with access to the nonce of the order the
// event belongs to.
type OrderEvent interface {
// Embedding event.Event means every OrderEvent can also be handled as a
// generic event.
event.Event
// Nonce returns the nonce of the order this event refers to.
Nonce() order.Nonce
}
// CreatedEvent is an event implementation that tracks the creation of an order.
// This is distinct from the order state change to allow us to efficiently
// filter all events by their type to get the creation timestamps of all orders.
//
// Only the nonce is written by Serialize; the timestamp is not part of the
// serialized event data.
type CreatedEvent struct {
// timestamp is the unique timestamp the event was created/recorded at.
timestamp time.Time
// nonce is the nonce of the order this event refers to.
nonce order.Nonce
}
// NewCreatedEvent builds the CreatedEvent for the given order. The event is
// stamped with the current system time and carries the order's nonce.
func NewCreatedEvent(o order.Order) *CreatedEvent {
	evt := new(CreatedEvent)
	evt.timestamp = time.Now()
	evt.nonce = o.Nonce()

	return evt
}
// Type returns the type of the event, which is always event.TypeOrderCreated
// for a CreatedEvent.
//
// NOTE: This is part of the event.Event interface.
func (e *CreatedEvent) Type() event.Type {
return event.TypeOrderCreated
}
// Timestamp is the time the event happened. This will be made unique once it is
// stored. To avoid collisions, the timestamp is adjusted on the nanosecond
// scale to reach uniqueness. The value returned is whatever was set at
// construction or by the last call to SetTimestamp.
//
// NOTE: This is part of the event.Event interface.
func (e *CreatedEvent) Timestamp() time.Time {
return e.timestamp
}
// SetTimestamp updates the timestamp of the event, overwriting the previous
// value. This is needed to adjust timestamps in case they collide to ensure
// the global uniqueness of all event timestamps.
//
// NOTE: This is part of the event.Event interface.
func (e *CreatedEvent) SetTimestamp(ts time.Time) {
e.timestamp = ts
}
// String returns a human readable representation of the event. For a
// CreatedEvent this is the constant string "OrderCreated".
//
// NOTE: This is part of the event.Event interface.
func (e *CreatedEvent) String() string {
return "OrderCreated"
}
// Serialize writes the event data to a binary storage format. This does not
// serialize the event type as that's handled generically to allow for easy
// filtering. The only element written is the order nonce; the timestamp is not
// written here either.
//
// NOTE: This is part of the event.Event interface.
func (e *CreatedEvent) Serialize(w io.Writer) error {
return WriteElements(w, e.nonce)
}
// Deserialize reads the event data from a binary storage format. This does not
// deserialize the event type as that's handled generically to allow for easy
// filtering. It is the counterpart of Serialize and reads the order nonce
// into the receiver; the timestamp is left untouched.
//
// NOTE: This is part of the event.Event interface.
func (e *CreatedEvent) Deserialize(r io.Reader) error {
return ReadElements(r, &e.nonce)
}
// Nonce returns the nonce of the order this event refers to.
//
// NOTE: This is part of the OrderEvent interface.
func (e *CreatedEvent) Nonce() order.Nonce {
return e.nonce
}
// Compile time assertions to make sure CreatedEvent implements both the
// event.Event and the OrderEvent interface.
var (
	_ event.Event = (*CreatedEvent)(nil)
	_ OrderEvent  = (*CreatedEvent)(nil)
)
// UpdatedEvent is an event implementation that tracks the updates of an order.
// This event is only meant for updates that we also persist in the database.
// Temporary state changes and other updates that occur during match making are
// tracked by MatchEvent.
//
// All fields except the timestamp are written by Serialize, in the order
// nonce, PrevState, NewState, UnitsFilled.
type UpdatedEvent struct {
// timestamp is the unique timestamp the event was created/recorded at.
timestamp time.Time
// nonce is the nonce of the order this event refers to.
nonce order.Nonce
// PrevState is the state the order had previous to the state change.
PrevState order.State
// NewState is the state the order had after the state change.
NewState order.State
// UnitsFilled is the number of units that was filled at the moment of
// the update.
UnitsFilled order.SupplyUnit
}
// NewUpdatedEvent builds the UpdatedEvent describing the transition of an
// order from prevState to the state it currently reports. The event is stamped
// with the current system time, and the number of filled units is derived as
// the order's total units minus its still unfulfilled units.
func NewUpdatedEvent(prevState order.State,
	o order.Order) *UpdatedEvent {

	// Take the timestamp first, then read everything else off the order.
	now := time.Now()
	nonce := o.Nonce()
	details := o.Details()

	return &UpdatedEvent{
		timestamp:   now,
		nonce:       nonce,
		PrevState:   prevState,
		NewState:    details.State,
		UnitsFilled: details.Units - details.UnitsUnfulfilled,
	}
}
// Type returns the type of the event, which is always
// event.TypeOrderStateChange for an UpdatedEvent.
//
// NOTE: This is part of the event.Event interface.
func (e *UpdatedEvent) Type() event.Type {
return event.TypeOrderStateChange
}
// Timestamp is the time the event happened. This will be made unique once it is
// stored. To avoid collisions, the timestamp is adjusted on the nanosecond
// scale to reach uniqueness. The value returned is whatever was set at
// construction or by the last call to SetTimestamp.
//
// NOTE: This is part of the event.Event interface.
func (e *UpdatedEvent) Timestamp() time.Time {
return e.timestamp
}
// SetTimestamp updates the timestamp of the event, overwriting the previous
// value. This is needed to adjust timestamps in case they collide to ensure
// the global uniqueness of all event timestamps.
//
// NOTE: This is part of the event.Event interface.
func (e *UpdatedEvent) SetTimestamp(ts time.Time) {
e.timestamp = ts
}
// String returns a human readable representation of the event in the form
// "OrderUpdate(<new state>)", where the new state is formatted with %v.
//
// NOTE: This is part of the event.Event interface.
func (e *UpdatedEvent) String() string {
return fmt.Sprintf("OrderUpdate(%v)", e.NewState)
}
// Serialize writes the event data to a binary storage format. This does not
// serialize the event type as that's handled generically to allow for easy
// filtering. The elements are written in the order nonce, PrevState, NewState,
// UnitsFilled; Deserialize reads them back in the same order. The timestamp is
// not written here.
//
// NOTE: This is part of the event.Event interface.
func (e *UpdatedEvent) Serialize(w io.Writer) error {
return WriteElements(w, e.nonce, e.PrevState, e.NewState, e.UnitsFilled)
}
// Deserialize reads the event data from a binary storage format. This does not
// deserialize the event type as that's handled generically to allow for easy
// filtering. The elements are read into the receiver in the same order
// Serialize writes them: nonce, PrevState, NewState, UnitsFilled. The
// timestamp is left untouched.
//
// NOTE: This is part of the event.Event interface.
func (e *UpdatedEvent) Deserialize(r io.Reader) error {
return ReadElements(
r, &e.nonce, &e.PrevState, &e.NewState, &e.UnitsFilled,
)
}
// Nonce returns the nonce of the order this event refers to.
//
// NOTE: This is part of the OrderEvent interface.
func (e *UpdatedEvent) Nonce() order.Nonce {
return e.nonce
}
// Compile time assertions to make sure UpdatedEvent implements both the
// event.Event and the OrderEvent interface.
var (
	_ event.Event = (*UpdatedEvent)(nil)
	_ OrderEvent  = (*UpdatedEvent)(nil)
)
// MatchEvent is an event implementation that tracks the match making process of
// an order.
//
// All fields except the timestamp are written by Serialize, in the order
// nonce, MatchState, UnitsFilled, MatchedOrder, RejectReason.
type MatchEvent struct {
// timestamp is the unique timestamp the event was created/recorded at.
timestamp time.Time
// nonce is the nonce of the order this event refers to.
nonce order.Nonce
// MatchState is the state of the order matching process the event was
// created in.
MatchState order.MatchState
// UnitsFilled is the number of units that was or would have been filled
// for the current match attempt the order was in when this event was
// created.
UnitsFilled order.SupplyUnit
// MatchedOrder is the order counterpart that our order was matched to
// in the current match attempt this event was created for.
MatchedOrder order.Nonce
// RejectReason is set to the reason we rejected the match in case the
// MatchState is set to MatchStateRejected. The callers in this file
// store the uint32 value of a poolrpc.MatchRejectReason enum here.
RejectReason uint32
}
// NewMatchEvent builds a MatchEvent for the order identified by nonce and the
// counterpart order it was matched with. Unlike the other event constructors,
// the timestamp is supplied by the caller instead of being read from the
// system clock.
func NewMatchEvent(ts time.Time, nonce order.Nonce, state order.MatchState,
	unitsFilled order.SupplyUnit, matchedOrder order.Nonce,
	rejectReason uint32) *MatchEvent {

	evt := new(MatchEvent)
	evt.timestamp = ts
	evt.nonce = nonce
	evt.MatchState = state
	evt.UnitsFilled = unitsFilled
	evt.MatchedOrder = matchedOrder
	evt.RejectReason = rejectReason

	return evt
}
// Type returns the type of the event, which is always event.TypeOrderMatch for
// a MatchEvent.
//
// NOTE: This is part of the event.Event interface.
func (e *MatchEvent) Type() event.Type {
return event.TypeOrderMatch
}
// Timestamp is the time the event happened. This will be made unique once it is
// stored. To avoid collisions, the timestamp is adjusted on the nanosecond
// scale to reach uniqueness. The value returned is whatever was set at
// construction or by the last call to SetTimestamp.
//
// NOTE: This is part of the event.Event interface.
func (e *MatchEvent) Timestamp() time.Time {
return e.timestamp
}
// SetTimestamp updates the timestamp of the event, overwriting the previous
// value. This is needed to adjust timestamps in case they collide to ensure
// the global uniqueness of all event timestamps.
//
// NOTE: This is part of the event.Event interface.
func (e *MatchEvent) SetTimestamp(ts time.Time) {
e.timestamp = ts
}
// String returns a human readable representation of the event in the form
// "OrderMatch(<match state>)", where the match state is formatted with %v.
//
// NOTE: This is part of the event.Event interface.
func (e *MatchEvent) String() string {
return fmt.Sprintf("OrderMatch(%v)", e.MatchState)
}
// Serialize writes the event data to a binary storage format. This does not
// serialize the event type as that's handled generically to allow for easy
// filtering. The elements are written in the order nonce, MatchState,
// UnitsFilled, MatchedOrder, RejectReason; Deserialize reads them back in the
// same order. The timestamp is not written here.
//
// NOTE: This is part of the event.Event interface.
func (e *MatchEvent) Serialize(w io.Writer) error {
return WriteElements(
w, e.nonce, e.MatchState, e.UnitsFilled, e.MatchedOrder,
e.RejectReason,
)
}
// Deserialize reads the event data from a binary storage format. This does not
// deserialize the event type as that's handled generically to allow for easy
// filtering. The elements are read into the receiver in the same order
// Serialize writes them: nonce, MatchState, UnitsFilled, MatchedOrder,
// RejectReason. The timestamp is left untouched.
//
// NOTE: This is part of the event.Event interface.
func (e *MatchEvent) Deserialize(r io.Reader) error {
return ReadElements(
r, &e.nonce, &e.MatchState, &e.UnitsFilled, &e.MatchedOrder,
&e.RejectReason,
)
}
// Nonce returns the nonce of the order this event refers to.
//
// NOTE: This is part of the OrderEvent interface.
func (e *MatchEvent) Nonce() order.Nonce {
return e.nonce
}
// Compile time assertions to make sure MatchEvent implements both the
// event.Event and the OrderEvent interface.
var (
	_ event.Event = (*MatchEvent)(nil)
	_ OrderEvent  = (*MatchEvent)(nil)
)
// GetOrderEvents returns all events of an order by looking up the event
// reference keys in the order bucket.
//
// ErrNoOrder is returned if no bucket exists for the given nonce. An error is
// also returned if the order has no event reference sub bucket or if one of
// the reference entries has an unexpected value length.
func (db *DB) GetOrderEvents(o order.Nonce) ([]event.Event, error) {
	var events []event.Event
	err := db.View(func(tx *bbolt.Tx) error {
		ordersBucket, err := getBucket(tx, ordersBucketKey)
		if err != nil {
			return err
		}

		orderBucket := ordersBucket.Bucket(o[:])
		if orderBucket == nil {
			return ErrNoOrder
		}

		eventSubBucket := orderBucket.Bucket(eventRefSubBucket)
		if eventSubBucket == nil {
			return fmt.Errorf("order event sub bucket not found")
		}

		// We first need to collect all timestamps from the order
		// bucket. The set is keyed by the raw Unix nanosecond value
		// instead of time.Time, because == on time.Time also compares
		// the Location and the monotonic clock reading. Two values
		// describing the same instant could otherwise fail to match.
		eventTimestamps := make(map[int64]struct{})
		err = eventSubBucket.ForEach(func(k, v []byte) error {
			// Only look at keys with correct length.
			if len(k) != event.TimestampLength {
				return nil
			}

			// Assert there are no corrupt values, in the reference
			// key we only store the event's type, not the value
			// itself.
			if len(v) != 1 {
				return fmt.Errorf("unexpected timestamp "+
					"value length: %d", len(v))
			}

			// The key holds the timestamp as Unix nanoseconds.
			nanos := int64(byteOrder.Uint64(k))
			eventTimestamps[nanos] = struct{}{}

			return nil
		})
		if err != nil {
			return err
		}

		// Now get the events themselves and return them. This is always
		// sorted by the events' timestamps.
		predicate := func(ts time.Time, _ event.Type) bool {
			_, ok := eventTimestamps[ts.UnixNano()]
			return ok
		}
		events, err = getEventsTX(tx, predicate)
		return err
	})
	if err != nil {
		return nil, err
	}

	return events, nil
}
// StoreOrderEvents persists the given order events within one database
// transaction. Before anything is written, the events' timestamps are adjusted
// on the nanosecond scale so that they are unique. ErrNoOrder is returned as
// soon as an event refers to an order that has no bucket.
func (db *DB) StoreOrderEvents(events []OrderEvent) error {
	// A slice of one interface type can't be passed where a slice of
	// another is expected, so copy the events into a slice of the generic
	// event type. Adjusting the timestamps up front keeps the number of
	// collisions later on as small as possible.
	generic := make([]event.Event, 0, len(events))
	for i := range events {
		generic = append(generic, events[i])
	}
	event.MakeUniqueTimestamps(generic)

	storeAll := func(tx *bbolt.Tx) error {
		ordersBucket, err := getBucket(tx, ordersBucketKey)
		if err != nil {
			return err
		}

		// The events may belong to different orders, which is why the
		// order bucket is looked up separately for every event.
		for _, evt := range events {
			nonce := evt.Nonce()

			orderBucket := ordersBucket.Bucket(nonce[:])
			if orderBucket == nil {
				return ErrNoOrder
			}

			err = storeEventTX(orderBucket, evt)
			if err != nil {
				return err
			}
		}

		return nil
	}

	return db.Update(storeAll)
}
// StoreBatchEvents records one match event with the given match state for
// every match of each of our orders in the batch and writes them to the main
// event store. For a batch reject the RPC reason enum value can optionally be
// specified; it is stored on every event as its uint32 value.
func (db *DB) StoreBatchEvents(batch *order.Batch, state order.MatchState,
	rejectReason poolrpc.MatchRejectReason) error {

	// All events of the batch start out with the same timestamp.
	now := time.Now()
	reason := uint32(rejectReason)

	matchEvents := make([]OrderEvent, 0, len(batch.MatchedOrders))
	for ourNonce, matches := range batch.MatchedOrders {
		for _, match := range matches {
			evt := NewMatchEvent(
				now, ourNonce, state, match.UnitsFilled,
				match.Order.Nonce(), reason,
			)
			matchEvents = append(matchEvents, evt)
		}
	}

	err := db.StoreOrderEvents(matchEvents)
	if err != nil {
		return fmt.Errorf("error storing match events: %w", err)
	}

	return nil
}
// StoreBatchPartialRejectEvents records a rejected match event for every match
// of each of our orders in the batch and writes them to the main event store,
// including the reason for the reject. The reason is taken from the entry in
// partialRejects for the matched counterpart order, if there is one with a
// known reason code.
func (db *DB) StoreBatchPartialRejectEvents(batch *order.Batch,
	partialRejects map[order.Nonce]*auctioneerrpc.OrderReject) error {

	const (
		// defaultRejectReason is used when an order itself wasn't
		// rejected but merely was part of the same batch as a reject.
		// It is replaced below if a specific reason is known.
		defaultRejectReason = uint32(
			poolrpc.MatchRejectReason_PARTIAL_REJECT_COLLATERAL,
		)

		// The RPC names are very long, so the following are just
		// shortcuts to keep the code below readable.
		reasonFundingFailed = uint32(
			poolrpc.MatchRejectReason_PARTIAL_REJECT_CHANNEL_FUNDING_FAILED,
		)
		reasonDuplicatePeer = uint32(
			poolrpc.MatchRejectReason_PARTIAL_REJECT_DUPLICATE_PEER,
		)
	)

	// All events of the batch start out with the same timestamp.
	now := time.Now()

	rejectEvents := make([]OrderEvent, 0, len(batch.MatchedOrders))
	for ourNonce, matches := range batch.MatchedOrders {
		for _, match := range matches {
			counterNonce := match.Order.Nonce()

			// Pick the reason first: start with the default and
			// only replace it for the reason codes we can map.
			reason := defaultRejectReason
			if reject, found := partialRejects[counterNonce]; found {
				switch reject.ReasonCode {
				case auctioneerrpc.OrderReject_CHANNEL_FUNDING_FAILED:
					reason = reasonFundingFailed

				case auctioneerrpc.OrderReject_DUPLICATE_PEER:
					reason = reasonDuplicatePeer
				}
			}

			rejectEvents = append(rejectEvents, NewMatchEvent(
				now, ourNonce, order.MatchStateRejected,
				match.UnitsFilled, counterNonce, reason,
			))
		}
	}

	err := db.StoreOrderEvents(rejectEvents)
	if err != nil {
		return fmt.Errorf("error storing match events: %w", err)
	}

	return nil
}