-
Notifications
You must be signed in to change notification settings - Fork 290
/
Copy pathticketer.go
82 lines (73 loc) · 3.4 KB
/
ticketer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package oplog
import (
"context"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/boundary/internal/oplog/store"
"github.com/hashicorp/go-dbw"
)
const DefaultAggregateName = "global"
// Ticketer provides an interface to storage for Tickets, so you can easily substitute your own ticketer
type Ticketer interface {
// GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction
// that you're using to write to the database tables. Names allow us to shard tickets around domain root names.
// Before getting a ticket you must insert it with it's name into the oplog_ticket table. This is done via a
// db migrations script. Requiring this insert as part of migrations ensures that the tickets are initialized in
// a separate transaction from when a client calls GetTicket(aggregateName) which is critical for the optimized locking
// pattern to work properly
GetTicket(ctx context.Context, aggregateName string) (*store.Ticket, error)
// Redeem ticket will attempt to redeem the ticket and ensure it's serialized with other tickets using the same
// aggregate name
Redeem(ctx context.Context, ticket *store.Ticket) error
}
// DbwTicketer defines a ticketer that uses the dbw pkg for database operations.
type DbwTicketer struct {
tx *dbw.DB
withAggregateNames bool
}
// NewTicketer creates a new ticketer that uses dbw for storage
func NewTicketer(ctx context.Context, tx *dbw.DB, opt ...Option) (*DbwTicketer, error) {
const op = "oplog.NewDbwTicketer"
if tx == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil tx")
}
opts := GetOpts(opt...)
enableAggregateNames := opts[optionWithAggregateNames].(bool)
return &DbwTicketer{tx: tx, withAggregateNames: enableAggregateNames}, nil
}
// GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction
// that you're using to write to the database tables. Names allow us to shard tickets around domain root names
func (ticketer *DbwTicketer) GetTicket(ctx context.Context, aggregateName string) (*store.Ticket, error) {
const op = "oplog.(GormTicketer).GetTicket"
if aggregateName == "" {
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing ticket name")
}
name := DefaultAggregateName
if ticketer.withAggregateNames {
name = aggregateName
}
ticket := store.Ticket{}
if err := dbw.New(ticketer.tx).LookupWhere(ctx, &ticket, "name = ?", []interface{}{name}); err != nil {
if errors.Is(err, dbw.ErrRecordNotFound) {
return nil, errors.New(ctx, errors.TicketNotFound, op, "ticket not found")
}
return nil, errors.Wrap(ctx, err, op, errors.WithMsg("error retrieving ticket from storage"))
}
return &ticket, nil
}
// Redeem will attempt to redeem the ticket. If the ticket version has already been used, then an error is returned
func (ticketer *DbwTicketer) Redeem(ctx context.Context, t *store.Ticket) error {
const op = "oplog.(GormTicketer).Redeem"
if t == nil {
return errors.New(ctx, errors.InvalidParameter, op, "nil ticket")
}
currentVersion := t.Version
t.Version = t.Version + 1
rowsUpdated, err := dbw.New(ticketer.tx).Update(ctx, t, []string{"Version"}, nil, dbw.WithVersion(¤tVersion))
if err != nil {
return errors.Wrap(ctx, err, op, errors.WithMsg("error trying to redeem ticket"))
}
if rowsUpdated != 1 {
return errors.New(ctx, errors.TicketAlreadyRedeemed, op, "ticket already redeemed")
}
return nil
}