-
Notifications
You must be signed in to change notification settings - Fork 291
/
Copy pathqueue.go
168 lines (159 loc) · 5.38 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package oplog
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"sync"
"github.com/hashicorp/boundary/internal/db/common"
"github.com/hashicorp/boundary/internal/errors"
"github.com/hashicorp/go-dbw"
"google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/protobuf/proto"
)
// Queue provides a FIFO queue
type Queue struct {
// Buffer for the queue
bytes.Buffer
// Catalog provides a TypeCatalog for the types added to the queue
Catalog *TypeCatalog
mx sync.Mutex
}
// add message to queue. typeName defines the type of message added to the
// queue and allows the msg to be removed using a TypeCatalog with a
// coresponding typeName entry. OpType defines the msg's operation (create, add,
// update, etc). If OpType == OpType_OP_TYPE_UPDATE, the WithFieldMaskPaths()
// and SetToNullPaths() options are supported.
func (q *Queue) add(ctx context.Context, m proto.Message, typeName string, t OpType, opt ...Option) error {
const op = "oplog.(Queue).Add"
// we're not checking the Catalog for nil, since it's not used
// when Adding messages to the queue
opts := GetOpts(opt...)
withFieldMasks := opts[optionWithFieldMaskPaths].([]string)
withNullPaths := opts[optionWithSetToNullPaths].([]string)
withOperationOptions := opts[optionWithOperationOptions].([]dbw.Option)
if _, ok := m.(ReplayableMessage); !ok {
return errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("%T is not a replayable message", m))
}
value, err := proto.Marshal(m)
if err != nil {
return errors.New(ctx, errors.Encode, op, "error marshaling add parameter", errors.WithWrap(err))
}
if t == OpType_OP_TYPE_UPDATE {
if len(withFieldMasks) == 0 && len(withNullPaths) == 0 {
return errors.New(ctx, errors.InvalidParameter, op, "missing field masks or null paths for update")
}
fMasks := withFieldMasks
if fMasks == nil {
fMasks = []string{}
}
nullPaths := withNullPaths
if nullPaths == nil {
nullPaths = []string{}
}
i, _, _, err := common.Intersection(fMasks, nullPaths)
if err != nil {
return errors.Wrap(ctx, err, op)
}
if len(i) != 0 {
return errors.New(ctx, errors.InvalidParameter, op, fmt.Sprintf("field masks and null paths intersect with: %s", i))
}
}
var operationOpts *OperationOptions
if len(withOperationOptions) > 0 {
dbwOptions := dbw.GetOpts(withOperationOptions...)
operationOpts, err = convertFromDbwOpts(ctx, dbwOptions)
if err != nil {
return errors.Wrap(ctx, err, op)
}
}
msg := &AnyOperation{
TypeName: typeName,
Value: value,
OperationType: t,
FieldMask: &field_mask.FieldMask{Paths: withFieldMasks},
NullMask: &field_mask.FieldMask{Paths: withNullPaths},
Options: operationOpts,
}
data, err := proto.Marshal(msg)
if err != nil {
return errors.New(ctx, errors.Encode, op, "error marshaling message", errors.WithWrap(err))
}
q.mx.Lock()
defer q.mx.Unlock()
err = binary.Write(q, binary.LittleEndian, uint32(len(data)))
if err != nil {
return errors.New(ctx, errors.Io, op, "binary write error", errors.WithWrap(err))
}
n, err := q.Write(data)
if err != nil {
return errors.New(ctx, errors.Io, op, "error writing to queue buffer", errors.WithWrap(err))
}
if n != len(data) {
return errors.New(ctx, errors.Io, op, fmt.Sprintf("error writing to queue buffer (incorrect number of bytes %d of %d)", n, len(data)))
}
return nil
}
type queueEntry struct {
msg proto.Message
opType OpType
fieldMask []string
setToNullPaths []string
operationOptions *OperationOptions
}
// remove pb message from the queue and EOF if empty. It also returns the OpType
// for the msg and if it's OpType_OP_TYPE_UPDATE, the it will also return the
// fieldMask and setToNullPaths for the update operation.
func (q *Queue) remove(ctx context.Context) (*queueEntry, error) {
const op = "oplog.(Queue).Remove"
if q.Catalog == nil {
return nil, errors.New(ctx, errors.InvalidParameter, op, "nil catalog")
}
q.mx.Lock()
defer q.mx.Unlock()
var n uint32
err := binary.Read(q, binary.LittleEndian, &n)
if err == io.EOF {
return nil, err // intentionally not wrapping error, return io.EOF so client can handle it correctly
}
if err != nil {
return nil, errors.New(ctx, errors.Io, op, "binary read error", errors.WithWrap(err))
}
data := q.Next(int(n))
msg := new(AnyOperation)
err = proto.Unmarshal(data, msg)
if err != nil {
return nil, errors.New(ctx, errors.Decode, op, "error unmarshaling message", errors.WithWrap(err))
}
if msg.Value == nil {
return nil, nil
}
any, err := q.Catalog.Get(ctx, msg.TypeName)
if err != nil {
return nil, errors.Wrap(ctx, err, op, errors.WithMsg(fmt.Sprintf("error getting the TypeName: %s", msg.TypeName)))
}
pm := any.(proto.Message)
if err = proto.Unmarshal(msg.Value, pm); err != nil {
return nil, errors.New(ctx, errors.Decode, op, "error unmarshaling value", errors.WithWrap(err))
}
var masks, nullPaths []string
if msg.OperationType == OpType_OP_TYPE_UPDATE {
if msg.FieldMask != nil {
masks = msg.FieldMask.GetPaths()
}
if msg.NullMask != nil {
nullPaths = msg.NullMask.GetPaths()
}
if len(masks) == 0 && len(nullPaths) == 0 {
return nil, errors.New(ctx, errors.InvalidParameter, op, "field mask or null paths is required")
}
}
return &queueEntry{
msg: pm,
opType: msg.OperationType,
fieldMask: masks,
setToNullPaths: nullPaths,
operationOptions: msg.GetOptions(),
}, nil
}