/
protocol.go
261 lines (223 loc) · 6.27 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package http
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/cloudevents/sdk-go/v2/binding"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/protocol"
)
const (
// DefaultShutdownTimeout defines the default timeout given to the http.Server when calling Shutdown.
DefaultShutdownTimeout = time.Minute * 1
)
// Protocol acts as both a http client and a http handler.
type Protocol struct {
Target *url.URL
RequestTemplate *http.Request
Client *http.Client
incoming chan msgErr
// To support Opener:
// ShutdownTimeout defines the timeout given to the http.Server when calling Shutdown.
// If nil, DefaultShutdownTimeout is used.
ShutdownTimeout time.Duration
// Port is the port to bind the receiver to. Defaults to 8080.
Port *int
// Path is the path to bind the receiver to. Defaults to "/".
Path string
// Receive Mutex
reMu sync.Mutex
// Handler is the handler the http Server will use. Use this to reuse the
// http server. If nil, the Protocol will create a one.
Handler *http.ServeMux
listener net.Listener
roundTripper http.RoundTripper
server *http.Server
handlerRegistered bool
middleware []Middleware
}
func New(opts ...Option) (*Protocol, error) {
p := &Protocol{
incoming: make(chan msgErr),
}
if err := p.applyOptions(opts...); err != nil {
return nil, err
}
if p.Client == nil {
p.Client = http.DefaultClient
}
if p.roundTripper != nil {
p.Client.Transport = p.roundTripper
}
if p.ShutdownTimeout == 0 {
p.ShutdownTimeout = DefaultShutdownTimeout
}
return p, nil
}
func (p *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(p); err != nil {
return err
}
}
return nil
}
// Send implements binding.Sender
func (p *Protocol) Send(ctx context.Context, m binding.Message, transformers ...binding.Transformer) error {
if ctx == nil {
return fmt.Errorf("nil Context")
} else if m == nil {
return fmt.Errorf("nil Message")
}
_, err := p.Request(ctx, m, transformers...)
return err
}
// Request implements binding.Requester
func (p *Protocol) Request(ctx context.Context, m binding.Message, transformers ...binding.Transformer) (binding.Message, error) {
if ctx == nil {
return nil, fmt.Errorf("nil Context")
} else if m == nil {
return nil, fmt.Errorf("nil Message")
}
var err error
defer func() { _ = m.Finish(err) }()
req := p.makeRequest(ctx)
if p.Client == nil || req == nil || req.URL == nil {
return nil, fmt.Errorf("not initialized: %#v", p)
}
if err = WriteRequest(ctx, m, req, transformers...); err != nil {
return nil, err
}
return p.do(ctx, req)
}
func (p *Protocol) makeRequest(ctx context.Context) *http.Request {
// TODO: support custom headers from context?
req := &http.Request{
Method: http.MethodPost,
Header: make(http.Header),
// TODO: HeaderFrom(ctx),
}
if p.RequestTemplate != nil {
req.Method = p.RequestTemplate.Method
req.URL = p.RequestTemplate.URL
req.Close = p.RequestTemplate.Close
req.Host = p.RequestTemplate.Host
copyHeadersEnsure(p.RequestTemplate.Header, &req.Header)
}
if p.Target != nil {
req.URL = p.Target
}
// Override the default request with target from context.
if target := cecontext.TargetFrom(ctx); target != nil {
req.URL = target
}
return req.WithContext(ctx)
}
// Ensure to is a non-nil map before copying
func copyHeadersEnsure(from http.Header, to *http.Header) {
if len(from) > 0 {
if *to == nil {
*to = http.Header{}
}
copyHeaders(from, *to)
}
}
func copyHeaders(from, to http.Header) {
if from == nil || to == nil {
return
}
for header, values := range from {
for _, value := range values {
to.Add(header, value)
}
}
}
// Receive the next incoming HTTP request as a CloudEvent.
// Returns non-nil error if the incoming HTTP request fails to parse as a CloudEvent
// Returns io.EOF if the receiver is closed.
func (p *Protocol) Receive(ctx context.Context) (binding.Message, error) {
if ctx == nil {
return nil, fmt.Errorf("nil Context")
}
msg, fn, err := p.Respond(ctx)
// No-op the response when finish is invoked.
if msg != nil {
return binding.WithFinish(msg, func(err error) {
if fn != nil {
_ = fn(ctx, nil, nil)
}
}), err
} else {
return nil, err
}
}
// Respond receives the next incoming HTTP request as a CloudEvent and waits
// for the response callback to invoked before continuing.
// Returns non-nil error if the incoming HTTP request fails to parse as a CloudEvent
// Returns io.EOF if the receiver is closed.
func (p *Protocol) Respond(ctx context.Context) (binding.Message, protocol.ResponseFn, error) {
if ctx == nil {
return nil, nil, fmt.Errorf("nil Context")
}
select {
case in, ok := <-p.incoming:
if !ok {
return nil, nil, io.EOF
}
return in.msg, in.respFn, in.err
case <-ctx.Done():
return nil, nil, io.EOF
}
}
type msgErr struct {
msg *Message
respFn protocol.ResponseFn
err error
}
// ServeHTTP implements http.Handler.
// Blocks until ResponseFn is invoked.
func (p *Protocol) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
m := NewMessageFromHttpRequest(req)
if m == nil || m.ReadEncoding() == binding.EncodingUnknown {
p.incoming <- msgErr{msg: nil, err: binding.ErrUnknownEncoding}
return // if there was no message, return.
}
done := make(chan struct{})
var finishErr error
m.OnFinish = func(err error) error {
finishErr = err
return nil
}
var fn protocol.ResponseFn = func(ctx context.Context, resp binding.Message, er protocol.Result, transformers ...binding.Transformer) error {
// Unblock the ServeHTTP after the reply is written
defer func() {
done <- struct{}{}
}()
status := http.StatusOK
if finishErr != nil {
http.Error(rw, fmt.Sprintf("cannot forward CloudEvent: %v", finishErr), http.StatusInternalServerError)
}
if er != nil {
var result *Result
if protocol.ResultAs(er, &result) {
if result.StatusCode > 100 && result.StatusCode < 600 {
status = result.StatusCode
}
}
}
if resp != nil {
err := WriteResponseWriter(ctx, resp, status, rw, transformers...)
return resp.Finish(err)
}
rw.WriteHeader(status)
return nil
}
p.incoming <- msgErr{msg: m, respFn: fn} // Send to Request
// Block until ResponseFn is invoked
<-done
}