-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnomad.go
123 lines (108 loc) · 2.91 KB
/
nomad.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package input
import (
"context"
"github.com/devopsext/events/common"
"github.com/devopsext/events/processor"
sreCommon "github.com/devopsext/sre/common"
"github.com/devopsext/utils"
nomad "github.com/hashicorp/nomad/api"
"sync"
"time"
)
// NomadInputOptions holds the settings used to build and run a NomadInput.
type NomadInputOptions struct {
// Address is copied into the Nomad client config's Address field.
Address string
// Token is copied into the Nomad client config's SecretID field.
Token string
// Topics lists the event stream topics to subscribe to; each one is
// subscribed with the "*" filter.
Topics []string
}
// NomadInput reads events from a Nomad event stream and forwards them to the
// "NomadEvent" processor.
type NomadInput struct {
// options are the settings the input was constructed with.
options NomadInputOptions
// client is the Nomad API client used to open the event stream.
client *nomad.Client
// ctx is passed to every event stream subscription.
ctx context.Context
// processors is the registry in which the "NomadEvent" processor is looked up.
processors *common.Processors
// eventer, tracer and logger come from the observability bundle passed to
// the constructor.
eventer sreCommon.Eventer
tracer sreCommon.Tracer
logger sreCommon.Logger
// requests and errors are metric counters created by the constructor.
// NOTE(review): neither counter is incremented in the code visible here.
requests sreCommon.Counter
errors sreCommon.Counter
}
// Start launches a background goroutine that subscribes to the Nomad event
// stream for the configured topics and passes every received event to the
// "NomadEvent" processor.
//
// The goroutine is registered on wg and marks itself done when it returns.
// It returns only when the "NomadEvent" processor cannot be resolved;
// otherwise it keeps re-subscribing whenever the stream fails or is closed.
// The outputs argument is accepted but not used by this method.
func (n *NomadInput) Start(wg *sync.WaitGroup, outputs *common.Outputs) {
	wg.Add(1)
	go func(wg *sync.WaitGroup) {
		defer wg.Done()

		// Use the comma-ok form: a single-value type assertion panics when the
		// lookup yields a nil interface or a processor of another type, which
		// would crash the process instead of reaching the "not found" branch.
		p, ok := n.processors.Find("NomadEvent").(*processor.NomadProcessor)
		if !ok || p == nil {
			n.logger.Debug("Nomad processor is not found for NomadEvent")
			return
		}

		n.logger.Info("Start nomad input...")

		// Subscribe to every configured topic with the "*" filter.
		topics := make(map[nomad.Topic][]string)
		for _, topic := range n.options.Topics {
			topics[nomad.Topic(topic)] = []string{"*"}
		}

		q := &nomad.QueryOptions{}
		stream := n.client.EventStream()

		for {
			// NOTE(review): index 0 is passed on every (re)subscription; confirm
			// how Nomad treats it with respect to events emitted while the
			// stream was down.
			eventCh, err := stream.Stream(n.ctx, topics, 0, q)
			if err != nil {
				n.logger.Error(err)
				// Back off before retrying the subscription.
				time.Sleep(5 * time.Second)
				continue
			}

			// Drain the current subscription until it closes or reports an
			// error, then fall through to re-subscribe.
			for {
				es, open := <-eventCh
				if !open {
					n.logger.Error("Stream channel closed, restarting")
					time.Sleep(2 * time.Second)
					break
				}
				if es.Err != nil {
					n.logger.Error("Stream channel return error '%v', restarting", es.Err)
					time.Sleep(2 * time.Second)
					break
				}
				for _, ne := range es.Events {
					// A failing event is logged and skipped so one bad event
					// does not stop the stream.
					if err := p.ProcessEvent(ne); err != nil {
						n.logger.Error(err)
					}
				}
			}

			n.logger.Debug("Restart nomad input...")
		}
	}(wg)
}
// NewNomadInput builds a NomadInput from the given options, processor
// registry and observability bundle.
//
// It returns nil when either the address or the token is empty, and also
// when the Nomad API client cannot be created; both cases are logged.
func NewNomadInput(options NomadInputOptions, processors *common.Processors, observability *common.Observability) *NomadInput {
	logger := observability.Logs()

	// Both the address and the token are required to talk to Nomad.
	incomplete := utils.IsEmpty(options.Address) || utils.IsEmpty(options.Token)
	if incomplete {
		logger.Debug("Nomad input address or token is not defined. Skipped")
		return nil
	}

	// Start from the library defaults and override only the endpoint and
	// the secret.
	cfg := nomad.DefaultConfig()
	cfg.Address = options.Address
	cfg.SecretID = options.Token

	apiClient, err := nomad.NewClient(cfg)
	if err != nil {
		logger.Error(err)
		return nil
	}

	metrics := observability.Metrics()

	input := &NomadInput{
		options:    options,
		client:     apiClient,
		processors: processors,
		ctx:        context.Background(),
	}
	input.eventer = observability.Events()
	input.tracer = observability.Traces()
	input.logger = observability.Logs()
	input.requests = metrics.Counter("requests", "Count of all nomad input requests", []string{"subscription"}, "nomad", "input")
	input.errors = metrics.Counter("errors", "Count of all nomad input errors", []string{"subscription"}, "nomad", "input")
	return input
}