forked from cloudevents/sdk-go
/
engine.go
138 lines (120 loc) · 3.14 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package http
import (
"context"
"fmt"
"net"
"net/http"
"strings"
"github.com/ian-mi/sdk-go/v2/pkg/protocol"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
)
// Compile-time assertion that *Protocol satisfies the protocol.Opener interface.
var _ protocol.Opener = (*Protocol)(nil)
// OpenInbound starts the HTTP server for this Protocol and blocks until the
// server stops or ctx is done.
//
// It holds e.reMu for the whole call. If no Handler is set, a new
// http.ServeMux is installed; the Protocol itself is registered on GetPath()
// the first time through. The listener is obtained from listen(), and the
// server's handler chain is the middleware-wrapped Handler inside an
// ochttp.Handler using trace-context propagation.
//
// When ctx is done, the server is shut down with a deadline of
// *e.ShutdownTimeout and the result of Shutdown is returned. If Serve returns
// first, its error is returned instead. In every case the server is closed
// and e.server is reset to nil before the call returns.
//
// NOTE(review): e.ShutdownTimeout is dereferenced without a nil check, so it
// is presumably always populated before OpenInbound is called — confirm.
func (e *Protocol) OpenInbound(ctx context.Context) error {
	e.reMu.Lock()
	defer e.reMu.Unlock()

	if e.Handler == nil {
		e.Handler = http.NewServeMux()
	}

	// Register at most once: Handle might panic if the user tries to use the
	// same path as the sdk.
	if !e.handlerRegistered {
		e.Handler.Handle(e.GetPath(), e)
		e.handlerRegistered = true
	}

	boundAddr, listenErr := e.listen()
	if listenErr != nil {
		return listenErr
	}

	serverAddr := boundAddr.String()
	tracing := &ochttp.Handler{
		Propagation:    &tracecontext.HTTPFormat{},
		Handler:        attachMiddleware(e.Handler, e.middleware),
		FormatSpanName: formatSpanName,
	}
	e.server = &http.Server{Addr: serverAddr, Handler: tracing}

	// Whatever path we leave by, make sure the server is closed and forgotten.
	defer func() {
		_ = e.server.Close()
		e.server = nil
	}()

	// Buffered so the serving goroutine can always deliver its result and exit.
	served := make(chan error, 1)
	go func() {
		served <- e.server.Serve(e.listener)
	}()

	// Block until the server returns on its own or the caller cancels.
	select {
	case serveErr := <-served:
		return serveErr
	case <-ctx.Done():
	}

	// The caller's ctx is already done, so the graceful shutdown gets a fresh
	// context bounded by the configured timeout.
	grace := *e.ShutdownTimeout
	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()

	shutdownErr := e.server.Shutdown(shutdownCtx)
	<-served // wait for the serving goroutine to exit
	return shutdownErr
}
// HasTracePropagation implements Protocol.HasTracePropagation.
// It always reports false.
//
// TODO: clean this all up.
func (e *Protocol) HasTracePropagation() bool {
	return false
}
// GetPort returns the listening port.
//
// It calls e.listen() to make sure a listener exists, which may start
// listening if the listener is not already started. The value returned is
// whatever e.Port holds afterwards: the port recorded by listen() for a TCP
// listener, or the previously configured port if listening failed.
//
// It returns -1 when no port is known, i.e. listening failed with no port
// configured, or the listener's address is not a TCP address and no port was
// configured.
func (e *Protocol) GetPort() int {
	// The listen error is intentionally not inspected: a configured port is
	// still reported when listening fails, and the nil check below covers
	// every case in which there is nothing to report.
	_, _ = e.listen()

	// listen() only records a port for TCP addresses, so e.Port can still be
	// nil after a successful call; dereferencing it unconditionally would panic.
	if e.Port == nil {
		return -1
	}
	return *e.Port
}
// formatSpanName derives the span name for a request by prefixing the
// request's URL path with "cloudevents.http.".
func formatSpanName(r *http.Request) string {
	const prefix = "cloudevents.http."
	return prefix + r.URL.Path
}
// setPort records port in e.Port. An existing *int is written through so
// anything sharing that pointer sees the new value; otherwise a fresh int is
// allocated.
func (e *Protocol) setPort(port int) {
	if e.Port != nil {
		*e.Port = port
		return
	}
	e.Port = &port
}
// listen returns the address of the current listener, creating the listener
// first if there is none.
//
// A new listener binds TCP on e.Port, or on 8080 when e.Port is nil; a port
// outside 0-65535 is rejected with an error before any socket is opened.
// When the listener's address is a *net.TCPAddr, its port is stored back
// into e.Port through setPort.
func (e *Protocol) listen() (net.Addr, error) {
	if e.listener == nil {
		port := 8080 // default when no port is configured
		if e.Port != nil {
			port = *e.Port
		}
		if port < 0 || port > 65535 {
			return nil, fmt.Errorf("invalid port %d", port)
		}

		ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
		if err != nil {
			return nil, err
		}
		e.listener = ln
	}

	bound := e.listener.Addr()
	// Record the port actually bound.
	if tcp, isTCP := bound.(*net.TCPAddr); isTCP {
		e.setPort(tcp.Port)
	}
	return bound, nil
}
// GetPath returns the path the transport is hosted on: e.Path with
// surrounding whitespace removed, or "/" when that leaves nothing.
//
// If the path is '/', the transport will handle requests on any URI. To
// discover the true path a request was received on, inspect the context from
// Receive(ctx, ...) with TransportContextFrom(ctx).
func (e *Protocol) GetPath() string {
	if trimmed := strings.TrimSpace(e.Path); trimmed != "" {
		return trimmed
	}
	// Nothing configured: fall back to the default.
	return "/"
}
// attachMiddleware wraps h with each entry of middleware and returns the
// resulting handler. Entries are applied in slice order, so the first entry
// wraps h directly and the last entry ends up outermost. With no middleware,
// h is returned unchanged.
func attachMiddleware(h http.Handler, middleware []Middleware) http.Handler {
	wrapped := h
	for i := 0; i < len(middleware); i++ {
		wrapped = middleware[i](wrapped)
	}
	return wrapped
}