/
protocol_lifecycle.go
125 lines (109 loc) · 2.89 KB
/
protocol_lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package http
import (
"context"
"fmt"
"net"
"net/http"
"strings"
"github.com/cloudevents/sdk-go/v2/protocol"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
)
// Compile-time assertion that *Protocol satisfies the protocol.Opener interface.
var _ protocol.Opener = (*Protocol)(nil)
// OpenInbound serves HTTP on the protocol's listener and blocks until either
// the server stops by itself or ctx is done.
//
// The whole call runs while holding p.reMu. On first use it installs a default
// ServeMux (when p.Handler is nil) and registers p under GetPath(). When ctx is
// done, a graceful shutdown bounded by p.ShutdownTimeout is attempted and the
// result of that shutdown is returned; otherwise the error returned by Serve is
// returned. In every case the server is closed and p.server is reset to nil
// before returning.
//
// NOTE(review): p.listener is not cleared on exit, so a later call to listen()
// would hand back the same listener — confirm whether re-opening is expected
// to work.
func (p *Protocol) OpenInbound(ctx context.Context) error {
	p.reMu.Lock()
	defer p.reMu.Unlock()

	if p.Handler == nil {
		p.Handler = http.NewServeMux()
	}

	if !p.handlerRegistered {
		// Handle might panic if the user already registered the same path
		// as the SDK on this mux.
		p.Handler.Handle(p.GetPath(), p)
		p.handlerRegistered = true
	}

	ln, err := p.listen()
	if err != nil {
		return err
	}

	// Build the server: user middleware wraps the mux, and the OpenCensus
	// handler wraps everything using trace-context propagation.
	addr := ln.Addr().String()
	traced := &ochttp.Handler{
		Propagation:    &tracecontext.HTTPFormat{},
		Handler:        attachMiddleware(p.Handler, p.middleware),
		FormatSpanName: formatSpanName,
	}
	p.server = &http.Server{
		Addr:    addr,
		Handler: traced,
	}

	// Always tear the server down on the way out; the Close error is
	// deliberately ignored because this is best-effort cleanup.
	defer func() {
		_ = p.server.Close()
		p.server = nil
	}()

	// Buffered so the serving goroutine can always deliver its result.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- p.server.Serve(ln)
	}()

	// Wait for the server to return or for ctx to be done.
	select {
	case <-ctx.Done():
		// ctx is already done, so the graceful shutdown gets a fresh
		// context limited by ShutdownTimeout.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), p.ShutdownTimeout)
		defer cancel()
		shutdownErr := p.server.Shutdown(shutdownCtx)
		<-serveErr // wait for the serving goroutine to exit
		return shutdownErr
	case stopped := <-serveErr:
		return stopped
	}
}
// GetListeningPort reports the TCP port of the stored listener.
// It returns -1 when no listener has been stored, or when the listener's
// address is not a *net.TCPAddr.
func (p *Protocol) GetListeningPort() int {
	stored := p.listener.Load()
	if stored == nil {
		return -1
	}

	addr, isTCP := stored.(net.Listener).Addr().(*net.TCPAddr)
	if !isTCP {
		return -1
	}
	return addr.Port
}
// formatSpanName builds the span name for a request by prefixing the request's
// URL path with "cloudevents.http.".
func formatSpanName(r *http.Request) string {
	const prefix = "cloudevents.http."
	return prefix + r.URL.Path
}
// listen returns the protocol's listener, creating and storing it on first use.
//
// If a listener is already stored it is returned as-is. Otherwise a TCP
// listener is opened on ":<port>", where the port is 8080 when p.Port is -1
// and p.Port otherwise; any other value outside 0..65535 is rejected with an
// error. Errors from net.Listen are returned unchanged.
func (p *Protocol) listen() (net.Listener, error) {
	if existing := p.listener.Load(); existing != nil {
		return existing.(net.Listener), nil
	}

	port := p.Port
	switch {
	case port == -1:
		// -1 selects the default port.
		port = 8080
	case port < 0 || port > 65535:
		return nil, fmt.Errorf("invalid port %d", port)
	}

	created, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return nil, err
	}
	p.listener.Store(created)
	return created, nil
}
// GetPath returns the path the transport is hosted on: p.Path with surrounding
// whitespace trimmed, or "/" when that is empty. If the path is '/', the
// transport will handle requests on any URI. To discover the true path a
// request was received on, inspect the context from Receive(ctx, ...) with
// TransportContextFrom(ctx).
func (p *Protocol) GetPath() string {
	if trimmed := strings.TrimSpace(p.Path); trimmed != "" {
		return trimmed
	}
	// Nothing configured: fall back to the default.
	return "/"
}
// attachMiddleware wraps h with each middleware in slice order and returns the
// resulting handler. Because every entry wraps the result of the previous one,
// the last middleware in the slice ends up outermost. With no middleware, h is
// returned unchanged.
func attachMiddleware(h http.Handler, middleware []Middleware) http.Handler {
	wrapped := h
	for i := 0; i < len(middleware); i++ {
		wrapped = middleware[i](wrapped)
	}
	return wrapped
}