Skip to content

Commit

Permalink
Replace gorilla with melody for websocket API (#1205)
Browse files Browse the repository at this point in the history
  • Loading branch information
NikkyAI committed Aug 26, 2020
1 parent 88d371c commit 27c0254
Show file tree
Hide file tree
Showing 16 changed files with 978 additions and 34 deletions.
76 changes: 42 additions & 34 deletions bridge/api/api.go
Expand Up @@ -6,9 +6,10 @@ import (
"sync"
"time"

"gopkg.in/olahol/melody.v1"

"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
ring "github.com/zfjagann/golang-ring"
Expand All @@ -18,6 +19,7 @@ type API struct {
Messages ring.Ring
sync.RWMutex
*bridge.Config
mrouter *melody.Melody
}

type Message struct {
Expand All @@ -33,6 +35,32 @@ func New(cfg *bridge.Config) bridge.Bridger {
e := echo.New()
e.HideBanner = true
e.HidePort = true

b.mrouter = melody.New()
b.mrouter.HandleMessage(func(s *melody.Session, msg []byte) {
message := config.Message{}
err := json.Unmarshal(msg, &message)
if err != nil {
b.Log.Errorf("failed to decode message from byte[] '%s'", string(msg))
return
}
b.handleWebsocketMessage(message)
})
b.mrouter.HandleConnect(func(session *melody.Session) {
greet := b.getGreeting()
data, err := json.Marshal(greet)
if err != nil {
b.Log.Errorf("failed to encode message '%v'", greet)
return
}
err = session.Write(data)
if err != nil {
b.Log.Errorf("failed to write message '%s'", string(data))
return
}
// TODO: send message history buffer from `b.Messages` here
})

b.Messages = ring.Ring{}
if b.GetInt("Buffer") != 0 {
b.Messages.SetCapacity(b.GetInt("Buffer"))
Expand Down Expand Up @@ -67,13 +95,13 @@ func New(cfg *bridge.Config) bridge.Bridger {
func (b *API) Connect() error {
return nil
}

func (b *API) Disconnect() error {
return nil

}

func (b *API) JoinChannel(channel config.ChannelInfo) error {
return nil

}

func (b *API) Send(msg config.Message) (string, error) {
Expand All @@ -83,7 +111,14 @@ func (b *API) Send(msg config.Message) (string, error) {
if msg.Event == config.EventMsgDelete {
return "", nil
}
b.Messages.Enqueue(&msg)
b.Log.Debugf("enqueueing message from %s on ring buffer", msg.Username)
b.Messages.Enqueue(msg)

data, err := json.Marshal(msg)
if err != nil {
b.Log.Errorf("failed to encode message '%s'", msg)
}
_ = b.mrouter.Broadcast(data)
return "", nil
}

Expand Down Expand Up @@ -131,6 +166,7 @@ func (b *API) handleStream(c echo.Context) error {
}
c.Response().Flush()
for {
// TODO: this causes issues, messages should be broadcasted to all connected clients
msg := b.Messages.Dequeue()
if msg != nil {
if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
Expand All @@ -153,40 +189,12 @@ func (b *API) handleWebsocketMessage(message config.Message) {
b.Remote <- message
}

func (b *API) writePump(conn *websocket.Conn) {
for {
msg := b.Messages.Dequeue()
if msg != nil {
err := conn.WriteJSON(msg)
if err != nil {
break
}
}
}
}

func (b *API) readPump(conn *websocket.Conn) {
for {
message := config.Message{}
err := conn.ReadJSON(&message)
if err != nil {
break
}
b.handleWebsocketMessage(message)
}
}

func (b *API) handleWebsocket(c echo.Context) error {
conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024)
err := b.mrouter.HandleRequest(c.Response(), c.Request())
if err != nil {
b.Log.Errorf("error in websocket handling '%v'", err)
return err
}

greet := b.getGreeting()
_ = conn.WriteJSON(greet)

go b.writePump(conn)
go b.readPump(conn)

return nil
}
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -50,6 +50,7 @@ require (
golang.org/x/image v0.0.0-20200618115811-c13761719519
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
gomod.garykim.dev/nc-talk v0.0.2
gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376
)

go 1.13
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -928,6 +928,8 @@ gopkg.in/ini.v1 v1.55.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/mail.v2 v2.3.1/go.mod h1:htwXN1Qh09vZJ1NVKxQqHPBaCBbzKhp5GzuJEA4VJWw=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376 h1:sY2a+y0j4iDrajJcorb+a0hJIQ6uakU5gybjfLWHlXo=
gopkg.in/olahol/melody.v1 v1.0.0-20170518105555-d52139073376/go.mod h1:BHKOc1m5wm8WwQkMqYBoo4vNxhmF7xg8+xhG8L+Cy3M=
gopkg.in/olivere/elastic.v6 v6.2.30/go.mod h1:2cTT8Z+/LcArSWpCgvZqBgt3VOqXiy7v00w12Lz8bd4=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
Expand Down
2 changes: 2 additions & 0 deletions matterbridge.toml.sample
Expand Up @@ -1540,6 +1540,8 @@ Buffer=1000

#Bearer token used for authentication
#curl -H "Authorization: Bearer token" http://localhost:4242/api/messages
# https://github.com/vi/websocat
# websocat -H="Authorization: Bearer token" ws://127.0.0.1:4242/api/websocket
#OPTIONAL (no authorization if token is empty)
Token="mytoken"

Expand Down
5 changes: 5 additions & 0 deletions vendor/gopkg.in/olahol/melody.v1/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions vendor/gopkg.in/olahol/melody.v1/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions vendor/gopkg.in/olahol/melody.v1/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/gopkg.in/olahol/melody.v1/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 27c0254

Please sign in to comment.