-
Notifications
You must be signed in to change notification settings - Fork 29
/
processor.go
112 lines (86 loc) · 2.94 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package server
import (
"context"
"sync"
"time"
"github.com/choria-io/go-choria/protocol"
"github.com/choria-io/go-choria/choria"
"github.com/choria-io/go-choria/server/agents"
)
// handleRawMessage takes one raw message received from the connector and
// walks it through the decoding and validation pipeline:
//
//  1. parse the JSON payload into a transport message,
//  2. extract the secure request from the transport,
//  3. extract the request from the secure request,
//  4. copy federation data from the transport onto the request,
//  5. ask the discovery layer whether this node should process the request,
//  6. build a choria Message and validate its TTL.
//
// A message that fails any step is logged and dropped; the function returns
// nothing. Counters labelled with the server identity are incremented at each
// stage (total, unvalidated, filtered, passed, ttl expired, validated).
//
// When every check passes, lastMsgProcessed is set to the current time, wg is
// incremented and the message is handed to srv.agents.Dispatch in a new
// goroutine together with the replies channel.
func (srv *Instance) handleRawMessage(ctx context.Context, wg *sync.WaitGroup, replies chan *agents.AgentReply, rawmsg *choria.ConnectorMessage) {
	totalCtr.WithLabelValues(srv.cfg.Identity).Inc()

	transport, err := srv.fw.NewTransportFromJSON(string(rawmsg.Data))
	if err != nil {
		srv.log.Errorf("Could not decode message into transport: %s", err)
		unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
		return
	}

	// NOTE(review): the meaning of the second argument (false) is defined by
	// the framework and is not visible here - confirm against its docs.
	sreq, err := srv.fw.NewSecureRequestFromTransport(transport, false)
	if err != nil {
		unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
		srv.log.Errorf("Could not decode incoming request: %s", err)
		return
	}

	req, err := srv.fw.NewRequestFromSecureRequest(sreq)
	if err != nil {
		unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
		srv.log.Errorf("Could not decode secure request: %s", err)
		return
	}

	protocol.CopyFederationData(transport, req)

	// Requests that do not match this node are counted as filtered and
	// logged at debug level only.
	if !srv.discovery.ShouldProcess(req, srv.agents.KnownAgents()) {
		filteredCtr.WithLabelValues(srv.cfg.Identity).Inc()
		srv.log.Debugf("Skipping message %s that does not match local properties", req.RequestID())
		return
	}

	passedCtr.WithLabelValues(srv.cfg.Identity).Inc()

	msg, err := choria.NewMessageFromRequest(req, transport.ReplyTo(), srv.fw)
	if err != nil {
		unvalidatedCtr.WithLabelValues(srv.cfg.Identity).Inc()
		srv.log.Errorf("Could not create Message: %s", err)
		return
	}

	if !msg.ValidateTTL() {
		ttlExpiredCtr.WithLabelValues(srv.cfg.Identity).Inc()
		srv.log.Errorf("Message %s created at %s is too old, TTL is %d", msg.String(), msg.TimeStamp, msg.TTL)
		return
	}

	validatedCtr.WithLabelValues(srv.cfg.Identity).Inc()

	srv.lastMsgProcessed = time.Now()

	// Add to the WaitGroup before starting the goroutine; Dispatch receives
	// wg and is presumably responsible for calling Done - verify in agents.
	wg.Add(1)
	go srv.agents.Dispatch(ctx, wg, replies, msg, req)
}
// handleReply publishes the result of an agent invocation back over the
// connector.
//
// A reply carrying an error is logged and discarded. Otherwise a new Message
// is built from the original request and the reply-to target of the incoming
// message, its payload is set to the reply body, and it is published. Failures
// to build or publish the message are logged; nothing is returned.
func (srv *Instance) handleReply(reply *agents.AgentReply) {
	if reply.Error != nil {
		srv.log.Errorf("Request %s failed, discarding: %s", reply.Message.RequestID, reply.Error.Error())
		return
	}

	outgoing, err := choria.NewMessageFromRequest(reply.Request, reply.Message.ReplyTo(), srv.fw)
	if err != nil {
		srv.log.Errorf("Cannot create reply Message for %s: %s", reply.Message.RequestID, err)
		return
	}

	outgoing.Payload = string(reply.Body)

	if err = srv.connector.Publish(outgoing); err != nil {
		srv.log.Errorf("Publishing reply Message for %s failed: %s", reply.Message.RequestID, err)
	}
}
// processRequests is the request loop of the server. It calls wg.Done when it
// returns and runs until ctx is cancelled.
//
// Each iteration services whichever of these is ready:
//   - a raw message on srv.requests, which is passed to handleRawMessage;
//   - an agent reply on the internal replies channel, which is handled in its
//     own goroutine so the loop does not wait for the publish to finish;
//   - cancellation of ctx, upon which the shutdown event is published, the
//     connector is closed and the loop exits.
func (srv *Instance) processRequests(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	// Agents deliver their replies on this channel; it is buffered with a
	// capacity of 100.
	replies := make(chan *agents.AgentReply, 100)

	for {
		select {
		case rawmsg := <-srv.requests:
			srv.handleRawMessage(ctx, wg, replies, rawmsg)

		case reply := <-replies:
			go srv.handleReply(reply)

		case <-ctx.Done():
			srv.log.Infof("Request processor exiting on interrupt")
			srv.publichShutdownEvent()
			srv.connector.Close()

			return
		}
	}
}