-
Notifications
You must be signed in to change notification settings - Fork 947
/
httpstream.go
119 lines (106 loc) · 4.55 KB
/
httpstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package httpstream
import (
"fmt"
"io"
"net/http"
"strings"
"time"
)
const (
	// HeaderConnection is the name of the HTTP "Connection" header, which
	// IsUpgradeRequest inspects to detect connection upgrade requests.
	HeaderConnection = "Connection"
	// HeaderUpgrade is the name of the HTTP "Upgrade" header. Its lower-cased
	// form is also the token IsUpgradeRequest looks for inside the Connection
	// header value.
	HeaderUpgrade = "Upgrade"
	// HeaderProtocolVersion is the request header in which a client lists the
	// stream protocols it supports. Handshake echoes the negotiated protocol
	// back in a response header of the same name.
	HeaderProtocolVersion = "X-Stream-Protocol-Version"
	// HeaderAcceptedProtocolVersions is the response header in which the server
	// lists the stream protocols it accepts when negotiation fails.
	HeaderAcceptedProtocolVersions = "X-Accepted-Stream-Protocol-Versions"
)
// NewStreamHandler defines a function that is called when a new Stream is
// received. Returning a nil error accepts the Stream; returning a non-nil
// error rejects it. replySent is closed after the reply frame has been sent.
type NewStreamHandler func(stream Stream, replySent <-chan struct{}) error
// NoOpNewStreamHandler is a NewStreamHandler that unconditionally accepts
// every stream: it ignores both arguments and always returns a nil error.
func NoOpNewStreamHandler(stream Stream, replySent <-chan struct{}) error {
	return nil
}
// ResponseUpgrader knows how to upgrade HTTP requests and responses to
// add streaming support to them.
type ResponseUpgrader interface {
	// UpgradeResponse upgrades an HTTP response to one that supports multiplexed
	// streams and returns the resulting Connection. newStreamHandler will be
	// called asynchronously whenever the other end of the upgraded connection
	// creates a new stream.
	UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler NewStreamHandler) Connection
}
// Connection represents an upgraded HTTP connection on which Streams can be
// created.
type Connection interface {
	// CreateStream creates a new Stream with the supplied headers.
	CreateStream(headers http.Header) (Stream, error)
	// Close resets all streams and closes the connection.
	Close() error
	// CloseChan returns a receive-only channel that is closed when the
	// underlying connection is closed.
	CloseChan() <-chan bool
	// SetIdleTimeout sets the amount of time the connection may remain idle
	// before it is automatically closed.
	SetIdleTimeout(timeout time.Duration)
}
// Stream represents a bidirectional communications channel that is part of an
// upgraded connection. It embeds io.ReadWriteCloser, so it can be read from,
// written to, and closed like any other byte stream.
type Stream interface {
	io.ReadWriteCloser
	// Reset closes both directions of the stream, indicating that neither
	// client nor server can use it any more.
	Reset() error
	// Headers returns the headers used to create the stream.
	Headers() http.Header
	// Identifier returns the stream's ID.
	Identifier() uint32
}
// IsUpgradeRequest returns true if the given request is a connection upgrade request
func IsUpgradeRequest(req *http.Request) bool {
for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] {
if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) {
return true
}
}
return false
}
// negotiateProtocol returns the first entry of clientProtocols that also
// appears in serverProtocols, so the client's ordering decides the preference.
// It returns the empty string when the two lists have nothing in common.
func negotiateProtocol(clientProtocols, serverProtocols []string) string {
	for _, offered := range clientProtocols {
		for _, accepted := range serverProtocols {
			if offered == accepted {
				return offered
			}
		}
	}
	return ""
}
// Handshake performs a subprotocol negotiation between the protocols the
// client listed in its HeaderProtocolVersion request header and the supplied
// serverProtocols.
//
// If the client sent no protocols, or serverProtocols is empty, negotiation is
// skipped: nothing is written to w and ("", nil) is returned.
//
// Otherwise Handshake selects the first client-requested protocol that also
// appears in serverProtocols. If a match is found, it adds a
// HeaderProtocolVersion response header carrying the chosen protocol and
// returns it. If no match is found, it responds with HTTP 403 Forbidden,
// including one HeaderAcceptedProtocolVersions response header per protocol
// the server accepts, and returns a non-nil error.
func Handshake(w http.ResponseWriter, req *http.Request, serverProtocols []string) (string, error) {
	clientProtocols := req.Header[http.CanonicalHeaderKey(HeaderProtocolVersion)]
	if len(clientProtocols) == 0 {
		// Kube 1.0 clients didn't support subprotocol negotiation.
		// TODO require clientProtocols once Kube 1.0 is no longer supported
		return "", nil
	}

	if len(serverProtocols) == 0 {
		// Kube 1.0 servers didn't support subprotocol negotiation. This is mainly for testing.
		// TODO require serverProtocols once Kube 1.0 is no longer supported
		return "", nil
	}

	negotiatedProtocol := negotiateProtocol(clientProtocols, serverProtocols)
	if len(negotiatedProtocol) == 0 {
		// The accepted-versions headers must be populated before WriteHeader:
		// changes to the header map after the status line has been written
		// are not sent to the client.
		for i := range serverProtocols {
			w.Header().Add(HeaderAcceptedProtocolVersions, serverProtocols[i])
		}
		w.WriteHeader(http.StatusForbidden)
		fmt.Fprintf(w, "unable to upgrade: unable to negotiate protocol: client supports %v, server accepts %v", clientProtocols, serverProtocols)
		return "", fmt.Errorf("unable to upgrade: unable to negotiate protocol: client supports %v, server supports %v", clientProtocols, serverProtocols)
	}

	w.Header().Add(HeaderProtocolVersion, negotiatedProtocol)
	return negotiatedProtocol, nil
}