Skip to content

Commit

Permalink
handling websocket connection upgrade fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
dencoded committed Dec 22, 2017
1 parent a19b64c commit a3aedd8
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 1 deletion.
147 changes: 146 additions & 1 deletion gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,7 @@ func TestWithCacheAllSafeRequests(t *testing.T) {
}
}

func TestWebsocketsUpstream(t *testing.T) {
func TestWebsocketsUpstreamUpgradeRequest(t *testing.T) {
// setup and run web socket upstream
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
Expand Down Expand Up @@ -1371,3 +1371,148 @@ func TestWebsocketsUpstream(t *testing.T) {
}()
testHttp(t, tests, false)
}

func TestWebsocketsSeveralOpenClose(t *testing.T) {
config.Global.ListenAddress = "127.0.0.1"
config.Global.HttpServerOptions.EnableWebSockets = true

// setup and run web socket upstream
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

wsHandler := func(w http.ResponseWriter, req *http.Request) {
// upgrade connection
conn, err := upgrader.Upgrade(w, req, nil)
if err != nil {
t.Error("cannot upgrade:", err)
http.Error(w, fmt.Sprintf("cannot upgrade: %v", err), http.StatusInternalServerError)
}

// start simple reader/writer per connection
go func() {
for {
mt, p, err := conn.ReadMessage()
if err != nil {
t.Error("cannot read message:", err)
return
}
conn.WriteMessage(mt, []byte("reply to message: "+string(p)))
}
}()
}
wsServer := httptest.NewServer(http.HandlerFunc(wsHandler))
defer wsServer.Close()
u, _ := url.Parse(wsServer.URL)
u.Scheme = "ws"
targetUrl := u.String()
t.Log("upstream target URL:", targetUrl)

ln, _ := generateListener(0)
baseURL := "ws://" + ln.Addr().String()
listen(ln, nil, nil)
defer func() {
config.Global.ListenAddress = ""
ln.Close()
}()

buildAndLoadAPI(func(spec *APISpec) {
spec.APIID = "1"
spec.UseKeylessAccess = true
spec.Auth = apidef.Auth{
AuthHeaderName: "authorization",
}
spec.VersionDefinition.Location = "header"
spec.VersionDefinition.Key = "version"
spec.Proxy.ListenPath = "/v1"
spec.Proxy.TargetURL = targetUrl
})

// connect 1st time, send and read message, close connection
conn1, _, err := websocket.DefaultDialer.Dial(baseURL+"/v1", nil)
if err != nil {
t.Fatalf("cannot make websocket connection: %v", err)
}
err = conn1.WriteMessage(websocket.BinaryMessage, []byte("test message 1"))
if err != nil {
t.Fatalf("cannot write message: %v", err)
}
_, p, err := conn1.ReadMessage()
if err != nil {
t.Fatalf("cannot read message: %v", err)
}
t.Logf("success: received response: %q\n", p)
if string(p) != "reply to message: test message 1" {
t.Error("Unexpected reply:", string(p))
}
conn1.Close()

// connect 2nd time, send and read message, but don't close yet
conn2, _, err := websocket.DefaultDialer.Dial(baseURL+"/v1", nil)
if err != nil {
t.Fatalf("cannot make websocket connection: %v", err)
}
err = conn2.WriteMessage(websocket.BinaryMessage, []byte("test message 2"))
if err != nil {
t.Fatalf("cannot write message: %v", err)
}
_, p, err = conn2.ReadMessage()
if err != nil {
t.Fatalf("cannot read message: %v", err)
}
t.Logf("success: received response: %q\n", p)
if string(p) != "reply to message: test message 2" {
t.Error("Unexpected reply:", string(p))
}

// connect 3d time having one connection already open before, send and read message
conn3, _, err := websocket.DefaultDialer.Dial(baseURL+"/v1", nil)
if err != nil {
t.Fatalf("cannot make websocket connection: %v", err)
}
err = conn3.WriteMessage(websocket.BinaryMessage, []byte("test message 3"))
if err != nil {
t.Fatalf("cannot write message: %v", err)
}
_, p, err = conn3.ReadMessage()
if err != nil {
t.Fatalf("cannot read message: %v", err)
}
t.Logf("success: received response: %q\n", p)
if string(p) != "reply to message: test message 3" {
t.Error("Unexpected reply:", string(p))
}

// check that we still can interact via 2nd connection we did before
err = conn2.WriteMessage(websocket.BinaryMessage, []byte("new test message 2"))
if err != nil {
t.Fatalf("cannot write message: %v", err)
}
_, p, err = conn2.ReadMessage()
if err != nil {
t.Fatalf("cannot read message: %v", err)
}
t.Logf("success: received response: %q\n", p)
if string(p) != "reply to message: new test message 2" {
t.Error("Unexpected reply:", string(p))
}

// check that we still can interact via 3d connection we did before
err = conn3.WriteMessage(websocket.BinaryMessage, []byte("new test message 3"))
if err != nil {
t.Fatalf("cannot write message: %v", err)
}
_, p, err = conn3.ReadMessage()
if err != nil {
t.Fatalf("cannot read message: %v", err)
}
t.Logf("success: received response: %q\n", p)
if string(p) != "reply to message: new test message 3" {
t.Error("Unexpected reply:", string(p))
}

// clean up connections
conn2.Close()
conn3.Close()
}
4 changes: 4 additions & 0 deletions reverse_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ func (p *ReverseProxy) WrappedServeHTTP(rw http.ResponseWriter, req *http.Reques
if p.TykAPISpec.HTTPTransport == nil {
_, timeout := p.CheckHardTimeoutEnforced(p.TykAPISpec, req)
p.TykAPISpec.HTTPTransport = httpTransport(timeout, rw, req, p)
} else if IsWebsocket(req) { // check if it is an upgrade request to NEW WS-connection
// overwrite transport's ResponseWriter from previous upgrade request
// as it was already hijacked and now is being used for other connection
p.TykAPISpec.HTTPTransport.(*WSDialer).RW = rw
}

ctx := req.Context()
Expand Down

0 comments on commit a3aedd8

Please sign in to comment.