/
request_processor.go
156 lines (144 loc) · 3.52 KB
/
request_processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package dhcp
import (
"errors"
"fmt"
"github.com/insomniacslk/dhcp/dhcpv4"
"net"
)
// dhcpRequestChanBufSize is the buffer capacity of the channel that carries
// incoming requests from Serve to the request worker.
const dhcpRequestChanBufSize = 1024
// ResponseGetter is the signature of a function that produces the Response
// for a given Request, or an error when no response can be produced.
type ResponseGetter func(req Request) (Response, error)
// RequestProcessor reads requests from a socket, asks the server for a
// response to each one, and passes the responses to a background goroutine
// that saves the leases and sends the responses.
type RequestProcessor struct {
// socket is the source of incoming requests; it is created by the
// SocketFactory passed to NewRequestProcessor and closed by Close.
socket Socket
// dhcpRequestChan carries requests from Serve to the request worker.
dhcpRequestChan chan Request
// server computes the response for each request (server.GetResponse).
server *Server
// callbackSaveLeases is called with each batch of responses before any
// response of that batch is sent.
callbackSaveLeases CallbackSaveLeases
// log is the logger for this processor, named "listener[<listen>]".
log RLogger
}
// NewRequestProcessor builds a RequestProcessor for the given listen
// configuration and starts its background worker.
//
// The socket is obtained from socketFactory using listen.Addr and
// listen.Interface; if that fails the error is returned as-is and no worker
// is started. The processor logs through logger, renamed to
// "listener[<listen.ToString()>]". callbackSaveLeases and server are stored
// for use by the worker goroutines.
func NewRequestProcessor(listen Listen,
	socketFactory SocketFactory,
	callbackSaveLeases CallbackSaveLeases,
	server *Server,
	logger RLogger) (*RequestProcessor, error) {
	proc := &RequestProcessor{
		dhcpRequestChan:    make(chan Request, dhcpRequestChanBufSize),
		callbackSaveLeases: callbackSaveLeases,
		log:                logger.WithName(fmt.Sprintf("listener[%s]", listen.ToString())),
		server:             server,
	}

	// The factory receives the caller's logger, not the renamed one.
	sock, err := socketFactory(listen.Addr, listen.Interface, logger)
	if err != nil {
		return nil, err
	}
	proc.socket = sock

	proc.startRequestProcessors()
	return proc, nil
}
// runResponseProcessor batches responses arriving on responseChan, saves the
// leases of each batch through callbackSaveLeases and then sends the batch.
//
// While responses are immediately available they are accumulated into a
// pending batch. As soon as the channel has nothing ready, the batch is
// flushed: the leases are saved first and, only if saving succeeded, every
// response in the batch is sent. A batch whose save fails is logged and
// discarded without being sent. Send failures are logged per response and do
// not stop the remaining sends.
//
// The function returns when responseChan is closed. Any batch still pending
// at that moment is flushed before returning, so responses that were queued
// just before the close are not silently lost.
func (s *RequestProcessor) runResponseProcessor(responseChan <-chan Response) {
	var pending []Response

	// flush saves and sends the pending batch, then empties it. The batch is
	// replaced rather than truncated because the save callback may retain
	// the slice it was given.
	flush := func() {
		if len(pending) == 0 {
			return
		}
		s.log.Debugf("saving offers (%d)", len(pending))
		if err := s.callbackSaveLeases(pending); err != nil {
			s.log.Errorf(err, "failed to save %d offers", len(pending))
			pending = nil
			return
		}
		for _, response := range pending {
			if err := response.Send(); err != nil {
				s.log.Errorf(err, "failed to send response: %s", response)
			}
		}
		pending = nil
	}

	for {
		select {
		case response, more := <-responseChan:
			if !more {
				// Channel closed: deliver what was already queued.
				flush()
				return
			}
			pending = append(pending, response)
			s.log.Debugf("added offer response to queue")
		default:
			// Nothing is ready right now: process the batch collected so
			// far, then block until the next response arrives.
			if len(pending) > 0 {
				flush()
			} else {
				s.log.Debugf("empty response queue")
			}
			response, more := <-responseChan
			if !more {
				return
			}
			pending = []Response{response}
		}
	}
}
// runRequestProcessor is the worker loop that turns queued requests into
// responses.
//
// It starts a response processor goroutine fed by its own buffered channel,
// then handles every request received on dhcpRequestChan: the server is asked
// for a response, and Offer and Ack responses are forwarded to the response
// processor with Lease.AckSent set to false and true respectively. Requests
// for which the server returns an error, and responses of any other message
// type, are only logged. When dhcpRequestChan is closed the response channel
// is closed as well and the worker exits.
func (s *RequestProcessor) runRequestProcessor() {
	responseChan := make(chan Response, 1024)
	go s.runResponseProcessor(responseChan)
	s.log.Debugf("Started worker")

	for req := range s.dhcpRequestChan {
		resp, err := s.server.GetResponse(req)
		if err != nil {
			s.log.Errorf(err, "Failed to get response to request: %s", req.String())
			continue
		}

		//TODO send NAK and continue
		switch kind := resp.Response.MessageType(); kind {
		case dhcpv4.MessageTypeOffer, dhcpv4.MessageTypeAck:
			// AckSent records whether this response is the Ack.
			resp.Lease.AckSent = kind == dhcpv4.MessageTypeAck
			responseChan <- resp
		default:
			s.log.Infof("unknown response type: %s", resp.Response.String())
		}
	}

	s.log.Infof("No more packets. Exiting worker")
	close(responseChan)
}
// startRequestProcessors launches the request worker in its own goroutine and
// returns immediately. Exactly one worker is started per call.
func (s *RequestProcessor) startRequestProcessors() {
	go s.runRequestProcessor()
}
// Serve reads requests from the socket and queues them on dhcpRequestChan for
// the worker, blocking until the socket fails or is closed.
//
// Return values:
//   - nil when reading fails because the socket was closed (net.ErrClosed
//     anywhere in the error chain);
//   - the *net.OpError found in the error chain when it is not temporary;
//   - the original error when it carries no *net.OpError at all.
//
// Temporary network errors are logged and reading continues.
func (s *RequestProcessor) Serve() error {
	for {
		req, err := s.socket.NextRequest()
		if err == nil {
			s.dhcpRequestChan <- *req
			continue
		}

		if errors.Is(err, net.ErrClosed) {
			s.log.Infof("Connection closed. Stopping server.")
			return nil
		}

		s.log.Errorf(err, "Error reading packet")

		// errors.As leaves opErr nil when the chain holds no *net.OpError;
		// calling Temporary on that nil pointer would panic, so such errors
		// are treated as fatal and returned unchanged.
		var opErr *net.OpError
		if !errors.As(err, &opErr) {
			return err
		}
		if !opErr.Temporary() {
			return opErr
		}
	}
}
// Send transmits the response through the socket stored on its request.
//
// When the request's GatewayIPAddr is the zero address the response goes out
// via SendBroadcast; otherwise it goes out via SendResponse. The socket's
// error is returned unchanged.
func (s *Response) Send() error {
	sock := s.Request.socket
	if !isAddressZero(s.Request.GatewayIPAddr) {
		return sock.SendResponse(s.Request, s.Response)
	}
	return sock.SendBroadcast(s.Request, s.Response)
}
// Close closes the underlying socket. It does not close dhcpRequestChan, so
// the worker goroutines are left running (see the TODO below).
// NOTE(review): presumably closing the socket makes a running Serve return
// via its net.ErrClosed path; confirm against the Socket implementation.
func (s *RequestProcessor) Close() {
	s.socket.Close()
	//TODO: close responders
}