forked from wal-g/wal-g
/
builder.go
100 lines (90 loc) · 1.87 KB
/
builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package internal
import (
"context"
"fmt"
"sync"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
type OpInfo struct {
command bson.D
id int
opName string
status bool
err error
timeStart time.Time
timeEnd time.Time
subcmds []OpInfo
res bson.M
}
func NewOpInfo(opName string, id int, timeStart time.Time, timeEnd time.Time, err error) OpInfo {
return OpInfo{
command: bson.D{},
id: id,
opName: opName,
status: err == nil,
err: err,
timeStart: timeStart,
timeEnd: timeEnd,
subcmds: nil,
res: nil,
}
}
func BuildStage(ctx context.Context,
cli *mongo.Client,
roc <-chan RawMongoOp,
wg *sync.WaitGroup) (<-chan ExecFunc, <-chan error) {
cmds := make(chan ExecFunc, cap(roc))
errc := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
defer close(cmds)
defer close(errc)
for cmd := range roc {
opCmd, err := NewExecFunc(cli, cmd)
if err != nil {
errc <- fmt.Errorf("cannot parse command: %v", err)
return
}
select {
case cmds <- opCmd:
case <-ctx.Done():
return
}
}
}()
return cmds, errc
}
func NewExecFunc(client *mongo.Client, opdata RawMongoOp) (ExecFunc, error) {
switch opdata.OP {
case `c`:
op, err := NewCommandOp(opdata)
if err != nil {
return nil, err
}
return NewOpExec(client, op), nil
// TODO: refactor
case `t`:
transactionDoc, err := NewTxnOp(opdata)
if err != nil {
return nil, err
}
return NewTxnExec(client, transactionDoc), nil
case `sleep`:
sleepDoc, err := NewSleepOp(opdata)
if err != nil {
return nil, err
}
return NewSleepExec(client, sleepDoc), nil
case `abort`:
abortDoc, err := NewAbortOp(opdata)
if err != nil {
return nil, err
}
return NewAbortExec(client, abortDoc), nil
default:
return nil, fmt.Errorf("unknown command: %v", opdata.OP)
}
}