Skip to content

Commit

Permalink
fix gobwas/ws example
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Jun 6, 2020
1 parent 6332634 commit 42cec58
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 26 deletions.
41 changes: 23 additions & 18 deletions _examples/custom_ws_gobwas/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ func main() {
cfg.LogLevel = centrifuge.LogLevelDebug
cfg.LogHandler = handleLog
cfg.ClientInsecure = true
cfg.TokenHMACSecretKey = "secret"

cfg.Namespaces = []centrifuge.ChannelNamespace{
centrifuge.ChannelNamespace{
{
Name: "chat",
ChannelOptions: centrifuge.ChannelOptions{
Publish: true,
Expand Down Expand Up @@ -97,7 +98,7 @@ func main() {
})

transport := client.Transport()
log.Printf("user %s connected via %s with encoding: %s", client.UserID(), transport.Name(), transport.Encoding())
log.Printf("user %s connected via %s with format: %s", client.UserID(), transport.Name(), transport.Protocol())

// Connect handler should not block, so start separate goroutine to
// periodically send messages to client.
Expand Down Expand Up @@ -145,7 +146,7 @@ func main() {

protoType := centrifuge.ProtocolTypeJSON

upgrader := ws.Upgrader{
up := ws.Upgrader{
OnRequest: func(uri []byte) error {
if strings.Contains(string(uri), "format=protobuf") {
protoType = centrifuge.ProtocolTypeProtobuf
Expand All @@ -155,10 +156,10 @@ func main() {
}

// Zero-copy upgrade to WebSocket connection.
hs, err := upgrader.Upgrade(safeConn)
hs, err := up.Upgrade(safeConn)
if err != nil {
log.Printf("%s: upgrade error: %v", nameConn(conn), err)
conn.Close()
_ = conn.Close()
return
}

Expand All @@ -168,23 +169,23 @@ func main() {
client, err := centrifuge.NewClient(context.Background(), node, transport)
if err != nil {
log.Printf("%s: client create error: %v", nameConn(conn), err)
conn.Close()
_ = conn.Close()
return
}

// Create netpoll event descriptor for conn.
// We want to handle only read events of it.
desc := netpoll.Must(netpoll.HandleRead(conn))
desc := netpoll.Must(netpoll.HandleReadOnce(conn))

// Subscribe to events about conn.
poller.Start(desc, func(ev netpoll.Event) {
_ = poller.Start(desc, func(ev netpoll.Event) {
if ev&(netpoll.EventReadHup|netpoll.EventHup) != 0 {
// When ReadHup or Hup received, this mean that client has
// closed at least write end of the connection or connections
// itself. So we want to stop receive events about such conn
// and remove it from the chat registry.
poller.Stop(desc)
client.Close(nil)
_ = poller.Stop(desc)
_ = client.Close(nil)
return
}
// Here we can read some new message from connection.
Expand All @@ -193,16 +194,20 @@ func main() {
// We do not want to spawn a new goroutine to read single message.
// But we want to reuse previously spawned goroutine.
pool.Schedule(func() {
if data, err := transport.read(); err != nil {
if data, isControl, err := transport.read(); err != nil {
// When receive failed, we can only disconnect broken
// connection and stop to receive events about it.
poller.Stop(desc)
client.Close(nil)
_ = poller.Stop(desc)
_ = client.Close(nil)
} else {
ok := client.Handle(data)
if !ok {
return
if !isControl {
ok := client.Handle(data)
if !ok {
_ = poller.Stop(desc)
return
}
}
_ = poller.Resume(desc)
}
})
})
Expand All @@ -227,7 +232,7 @@ func main() {
accept := make(chan error, 1)

// Subscribe to events about listener.
poller.Start(acceptDesc, func(e netpoll.Event) {
_ = poller.Start(acceptDesc, func(e netpoll.Event) {
// We do not want to accept incoming connection when goroutine pool is
// busy. So if there are no free goroutines during 1ms we want to
// cooldown the server and do not receive connection for some short
Expand Down Expand Up @@ -261,7 +266,7 @@ func main() {
time.Sleep(delay)
}

poller.Resume(acceptDesc)
_ = poller.Resume(acceptDesc)
})

go func() {
Expand Down
4 changes: 3 additions & 1 deletion _examples/custom_ws_gobwas/readme.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Example demonstrates simple chat with custom WebSocket transport based on https://github.com/gobwas/ws library. You can expect a much less memory usage when handling lots of connections compared to default Gorilla Websocket based transport (about 3x reduction).
Example demonstrates a simple chat with custom WebSocket transport based on https://github.com/gobwas/ws library. You can expect a much less memory usage when handling lots of connections compared to default Gorilla Websocket based transport (about 3x reduction).

To start example run the following command from example directory:

Expand All @@ -7,3 +7,5 @@ go run main.go
```

Then go to http://localhost:8000 to see it in action.

Note that since this example uses `epoll` on Linux and `kqueue` on BSD-based systems you should be very careful with relying on this code for your production application. **Think wise and carefully test**.
14 changes: 7 additions & 7 deletions _examples/custom_ws_gobwas/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@ func (t *customWebsocketTransport) Encoding() centrifuge.EncodingType {
return centrifuge.EncodingTypeJSON
}

func (t *customWebsocketTransport) read() ([]byte, error) {
func (t *customWebsocketTransport) read() ([]byte, bool, error) {
t.mu.Lock()
defer t.mu.Unlock()

h, r, err := wsutil.NextReader(t.conn, ws.StateServerSide)
if err != nil {
return nil, err
return nil, false, err
}
if h.OpCode.IsControl() {
return nil, wsutil.ControlFrameHandler(t.conn, ws.StateServerSide)(h, r)
return nil, true, wsutil.ControlFrameHandler(t.conn, ws.StateServerSide)(h, r)
}

data, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
return nil, false, err
}

return data, nil
return data, false, nil
}

func (t *customWebsocketTransport) Write(data []byte) error {
Expand Down Expand Up @@ -92,10 +92,10 @@ func (t *customWebsocketTransport) Close(disconnect *centrifuge.Disconnect) erro

if disconnect != nil {
data := ws.NewCloseFrameBody(ws.StatusCode(disconnect.Code), disconnect.CloseText())
wsutil.WriteServerMessage(t.conn, ws.OpClose, data)
_ = wsutil.WriteServerMessage(t.conn, ws.OpClose, data)
return t.conn.Close()
}
data := ws.NewCloseFrameBody(ws.StatusNormalClosure, "")
wsutil.WriteServerMessage(t.conn, ws.OpClose, data)
_ = wsutil.WriteServerMessage(t.conn, ws.OpClose, data)
return t.conn.Close()
}

0 comments on commit 42cec58

Please sign in to comment.