/
runner.go
116 lines (101 loc) · 2.42 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/rs/zerolog/log"
)
type runner struct {
configuration *configuration
backend backend
respawnWorkerDelay time.Duration
respawnWorkerChan chan *rule
executor executor
stop context.CancelFunc
stopped context.Context
}
func (rn *runner) initialize() error {
if rn.configuration == nil {
return errors.New("configuration has not been set")
}
// Backend
switch rn.configuration.Backend {
case "":
return errors.New("missing configuration value for backend")
case "ipset":
rn.backend = &ipsetBackend{runner: rn}
case "nft":
rn.backend = &nftBackend{runner: rn}
case "test":
rn.backend = &testBackend{runner: rn}
default:
return fmt.Errorf("unknown backend: %s", rn.configuration.Backend)
}
if err := rn.backend.initialize(); err != nil {
return fmt.Errorf("failed to initialize backend: %w", err)
}
// Rules
for n, r := range rn.configuration.Rules {
r.name = n
if err := r.initialize(rn); err != nil {
return fmt.Errorf(`failed to initialize rule "%s": %s`, n, err)
}
}
return nil
}
func (rn *runner) finalize() error {
if err := rn.backend.finalize(); err != nil {
return fmt.Errorf("failed to finalize backend: %w", err)
}
return nil
}
func (rn *runner) spawnWorker(r *rule, requeue bool) {
go func() {
select {
case <-rn.stopped.Done():
default:
r.worker(requeue)
}
}()
log.Info().Str("rule", r.name).Msg("spawned worker")
}
func (rn *runner) run(requeueWorkers bool) {
for _, r := range rn.configuration.Rules {
rn.spawnWorker(r, requeueWorkers)
}
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(signalChan)
go func() {
for {
select {
case r := <-rn.respawnWorkerChan:
time.Sleep(rn.respawnWorkerDelay)
rn.spawnWorker(r, requeueWorkers)
case <-rn.stopped.Done():
return
}
}
}()
select {
case <-rn.stopped.Done():
case s := <-signalChan:
log.Info().Str("signal", s.String()).Msg("received signal")
rn.stop()
}
}
func newRunner(c *configuration) *runner {
ctx, cancel := context.WithCancel(context.Background())
return &runner{
configuration: c,
respawnWorkerDelay: 5 * time.Second,
respawnWorkerChan: make(chan *rule),
executor: &defaultExecutor{},
stop: cancel,
stopped: ctx,
}
}