-
Notifications
You must be signed in to change notification settings - Fork 1
/
queueing.go
150 lines (133 loc) · 3.95 KB
/
queueing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package queueing
import (
"context"
"crypto/rand"
"errors"
"fmt"
"log"
"math"
"math/big"
"strconv"
"time"
"github.com/ScoreTrak/ScoreTrak/pkg/exec"
"github.com/ScoreTrak/ScoreTrak/pkg/exec/resolver"
"github.com/gofrs/uuid"
)
type ScoringData struct {
Service QService
Properties map[string]string
Deadline time.Time
MasterTime time.Time
Host string
RoundID uint64
}
type QService struct {
ID uuid.UUID
Group string
Name string
ReturningTopic string
}
type QCheck struct {
Service QService
RoundID uint64
Passed bool
Log string
Err string
}
type IndexedQueue struct {
Q *QCheck
I int
}
type Config struct {
Use string `default:"none"`
Kafka struct {
}
NSQ struct {
ProducerNSQD string `default:"nsqd:4150"`
IgnoreAllScoresIfWorkerFails bool `default:"true"`
Topic string `default:"default"`
MaxInFlight int `default:"200"` // This should be more than min(NumberOfChecks, #NSQD Nodes)
AuthSecret string `default:""`
ClientRootCA string `default:""`
ClientSSLKey string `default:""`
ClientSSLCert string `default:""`
ConcurrentHandlers int `default:"200"`
NSQLookupd []string `default:"[\"nsqlookupd:4161\"]"`
ConsumerNSQDPool []string `default:"[\"\"]"` // "[\"nsqd:4150\"]"`
}
}
type MasterConfig struct {
ReportForceRefreshSeconds uint `default:"60"`
ChannelPrefix string `default:"master"`
}
func RandomInt() (string, error) {
n, err := rand.Int(rand.Reader, big.NewInt(math.MaxInt32))
if err != nil {
return "", err
}
return n.Text(10), nil
}
func TopicFromServiceRound(roundID uint64) (string, error) {
n, err := RandomInt()
if err != nil {
return "", err
}
return "round_" + strconv.FormatUint(roundID, 10) + "_" + n + "_ack", nil
}
var ErrUnknownPanic = errors.New("unknown panic")
var ErrPanic = errors.New("panic")
func CommonExecute(sd *ScoringData, execDeadline time.Time) QCheck {
if time.Now().After(sd.Deadline) {
return QCheck{Service: sd.Service, Passed: false, Log: "", Err: "The check arrived late to the worker. Make sure the time is synced between workers and masters, and there are enough workers to handle the load", RoundID: sd.RoundID}
}
executable := resolver.ExecutableByName(sd.Service.Name)
err := exec.UpdateExecutableProperties(executable, sd.Properties)
if err != nil {
errLog := fmt.Sprintf("Failed to set properties for %+v. Resolved Service: %+v. Properties provided %v. See Error details for additional information", sd.Service, executable, sd.Properties)
return QCheck{Service: sd.Service, Passed: false, Log: errLog, Err: err.Error(), RoundID: sd.RoundID}
}
ctx := context.Background()
ctx, cancel := context.WithDeadline(ctx, execDeadline)
defer cancel()
e := exec.NewExec(ctx, sd.Host, executable)
type checkRet struct {
passed bool
log string
err error
}
cq := make(chan checkRet)
go func() {
defer func() {
if x := recover(); x != nil {
var err error
switch x := x.(type) {
case string:
err = fmt.Errorf("%w: %s", ErrPanic, x)
case error:
err = x
default:
err = ErrUnknownPanic
}
log.Println(fmt.Errorf("unable to perform a check on scoring data %+v: %w", *sd, err))
return
}
}()
passed, l, err := e.Execute()
cq <- checkRet{passed: passed, log: l, err: err}
}()
select {
case res := <-cq:
var errstr string
if res.err != nil {
errstr = res.err.Error()
}
return QCheck{Service: sd.Service, Passed: res.passed, Log: res.log, Err: errstr, RoundID: sd.RoundID}
case <-time.After(time.Until(execDeadline.Add(time.Second))):
log.Panicln("check is possibly causing resource leakage", sd.Service, execDeadline)
return QCheck{}
}
}
type RoundTookTooLongToExecute struct {
Msg string
}
func (e *RoundTookTooLongToExecute) Error() string { return e.Msg }