From b5da52a2262815ecc0c67c4d9c93a73bdf53f8a1 Mon Sep 17 00:00:00 2001 From: John Sun Date: Sat, 17 Oct 2020 23:32:26 +0800 Subject: [PATCH] support websocket by integrate gin with gorilla --- examples/websocket/client.go | 105 +++++++++++++++++++++++++++++++++++ examples/websocket/server.go | 66 ++++++++++++++++++++++ go.mod | 1 + go.sum | 1 + pkg/server/http/server.go | 7 +++ pkg/server/http/websocket.go | 55 ++++++++++++++++++ 6 files changed, 235 insertions(+) create mode 100644 examples/websocket/client.go create mode 100644 examples/websocket/server.go create mode 100644 pkg/server/http/websocket.go diff --git a/examples/websocket/client.go b/examples/websocket/client.go new file mode 100644 index 0000000..11309b9 --- /dev/null +++ b/examples/websocket/client.go @@ -0,0 +1,105 @@ +/* + * + * Copyright 2020 waterdrop authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "flag" + "fmt" + "log" + "net/url" + "os" + "os/signal" + "time" + + "github.com/gorilla/websocket" +) + +var addr = flag.String("addr", "localhost:9000", "http service address") +var ping = []byte("ping") + +func main() { + flag.Parse() + log.SetFlags(0) + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"} + log.Printf("connecting to %s", u.String()) + + c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Println("read:", err) + return + } + log.Printf("recv: %s", message) + } + }() + + ticker := time.NewTicker(10 * time.Second) + pingTicker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + defer pingTicker.Stop() + + for { + select { + case <-done: + return + case t := <-ticker.C: + err := c.WriteMessage(websocket.TextMessage, []byte(t.String())) + if err != nil { + log.Println("write:", err) + return + } + case <-pingTicker.C: + err := c.WriteMessage(websocket.PingMessage, ping) + if err != nil { + log.Println("write:", err) + return + } + fmt.Println("write ping") + case <-interrupt: + log.Println("interrupt") + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Println("write close:", err) + return + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } +} diff --git a/examples/websocket/server.go b/examples/websocket/server.go new file mode 100644 index 0000000..3ec0ef1 --- /dev/null +++ b/examples/websocket/server.go @@ -0,0 +1,66 @@ +/* + * + * Copyright 2020 waterdrop authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "fmt" + "github.com/UnderTreeTech/waterdrop/pkg/log" + "github.com/UnderTreeTech/waterdrop/pkg/server/http" + "github.com/gorilla/websocket" + "os" + "os/signal" + "syscall" + "time" +) + +var pong = []byte("pong") + +func main() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + + defer log.New(nil).Sync() + + srv := http.NewServer(nil) + srv.Upgrade(http.NewWebSocket("/ws", func(ws *http.WebSocket) { + ws.SetPingHandler(func(message string) error { + ws.SetReadDeadline(time.Now().Add(time.Second * 10)) + return ws.WriteControl(websocket.PongMessage, pong, time.Now().Add(time.Second)) + }) + for { + ws.SetReadDeadline(time.Now().Add(time.Second * 10)) + msgType, message, err := ws.ReadMessage() + if err != nil { + fmt.Println("read msg fail", err.Error()) + break + } + fmt.Println("recv msg", string(message), msgType) + + err = ws.WriteMessage(msgType, message) + if err != nil { + fmt.Println("write msg fail", err.Error()) + break + } + } + })) + + srv.Start() + + <-c +} diff --git a/go.mod b/go.mod index a1d86b5..dbd2c49 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/gogo/protobuf v1.3.2-0.20200807193113-deb6fe8ca7c6 // indirect github.com/golang/protobuf v1.4.2 github.com/gomodule/redigo v1.8.2 + github.com/gorilla/websocket v1.4.2 github.com/json-iterator/go v1.1.10 // indirect github.com/kr/pretty v0.2.0 // indirect github.com/mitchellh/mapstructure v1.3.3 diff --git a/go.sum b/go.sum index 54497d4..f91db76 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,7 @@ github.com/caddyserver/certmagic v0.10.6/go.mod h1:Y8jcUBctgk/IhpAzlHKfimZNyXCkf github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/pkg/server/http/server.go b/pkg/server/http/server.go index 8cca52c..5050706 100644 --- a/pkg/server/http/server.go +++ b/pkg/server/http/server.go @@ -105,3 +105,10 @@ func (s *Server) Start() net.Addr { func (s *Server) Stop(ctx context.Context) error { return s.Server.Shutdown(ctx) } + +// upgrade http to websocket +func (s *Server) Upgrade(ws *WebSocket) gin.IRoutes { + return s.GET(ws.Path, func(c *gin.Context) { + ws.Upgrade(c.Writer, c.Request) + }) +} diff --git a/pkg/server/http/websocket.go b/pkg/server/http/websocket.go new file mode 100644 index 0000000..89caaf2 --- /dev/null +++ b/pkg/server/http/websocket.go @@ -0,0 +1,55 @@ +/* + * + * Copyright 2020 waterdrop authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package http + +import ( + "github.com/UnderTreeTech/waterdrop/pkg/log" + "github.com/gorilla/websocket" + "net/http" +) + +type WebSocketHandler func(*WebSocket) + +type WebSocket struct { + Path string + Handler WebSocketHandler + + *websocket.Upgrader + *websocket.Conn +} + +func NewWebSocket(path string, handler WebSocketHandler) *WebSocket { + return &WebSocket{ + Path: path, + Upgrader: &websocket.Upgrader{}, + Handler: handler, + } +} + +func (ws *WebSocket) Upgrade(w http.ResponseWriter, r *http.Request) { + conn, err := ws.Upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error(r.Context(), "upgrade fail", log.String("error", err.Error())) + return + } + defer conn.Close() + + ws.Conn = conn + ws.Handler(ws) +}