From 7d77619723152e84b305b82c5e00f0a4c134b50d Mon Sep 17 00:00:00 2001 From: Jonathan Chappelow Date: Mon, 11 May 2020 12:24:01 -0500 Subject: [PATCH] server/comms: add (*Server).Broadcast and Link.IP --- server/auth/auth.go | 2 +- server/auth/auth_test.go | 7 ++++++- server/comms/comms_test.go | 32 +++++++++++++++++++++++++++++++- server/comms/link.go | 10 +++++++++- server/comms/server.go | 22 +++++++++++++++++++++- server/market/routers_test.go | 3 +++ 6 files changed, 71 insertions(+), 5 deletions(-) diff --git a/server/auth/auth.go b/server/auth/auth.go index d433eb63a7..d2c8fa0ba9 100644 --- a/server/auth/auth.go +++ b/server/auth/auth.go @@ -500,7 +500,7 @@ func (auth *AuthManager) handleConnect(conn comms.Link, msg *msgjson.Message) *m } auth.addClient(client) - log.Tracef("user %x connected", acctInfo.ID[:]) + log.Debugf("User %s connected from %s.", acctInfo.ID, conn.IP()) return nil } diff --git a/server/auth/auth_test.go b/server/auth/auth_test.go index 4e0aadaaf6..2a05de6a5d 100644 --- a/server/auth/auth_test.go +++ b/server/auth/auth_test.go @@ -92,6 +92,7 @@ type tReq struct { // tRPCClient satisfies the comms.Link interface. type TRPCClient struct { id uint64 + ip string sendErr error requestErr error banished bool @@ -100,6 +101,7 @@ type TRPCClient struct { } func (c *TRPCClient) ID() uint64 { return c.id } +func (c *TRPCClient) IP() string { return c.ip } func (c *TRPCClient) Send(msg *msgjson.Message) error { c.sends = append(c.sends, msg) return c.sendErr @@ -133,7 +135,10 @@ var tClientID uint64 func tNewRPCClient() *TRPCClient { tClientID++ - return &TRPCClient{id: tClientID} + return &TRPCClient{ + id: tClientID, + ip: "123.123.123.123", + } } var tAcctID uint64 diff --git a/server/comms/comms_test.go b/server/comms/comms_test.go index 0d3b58c284..6d7374e6dc 100644 --- a/server/comms/comms_test.go +++ b/server/comms/comms_test.go @@ -41,11 +41,13 @@ type wsConnStub struct { read int close int lastMsg []byte + recv chan []byte } func newWsStub() *wsConnStub { return &wsConnStub{ - msg: make(chan []byte), + msg: make(chan []byte), + // recv is nil unless a test wants to receive quit: make(chan struct{}), } } @@ -97,6 +99,10 @@ func (conn *wsConnStub) WriteMessage(msgType int, msg []byte) error { return nil } conn.lastMsg = msg + // Send the message if their is a receiver for the current test. + if conn.recv != nil { + conn.recv <- msg + } conn.write++ writeErrMtx.Lock() defer writeErrMtx.Unlock() @@ -147,6 +153,11 @@ func makeResp(id uint64, msg string) *msgjson.Message { return resp } +func makeNtfn(route, msg string) *msgjson.Message { + ntfn, _ := msgjson.NewNotification(route, json.RawMessage(msg)) + return ntfn +} + func sendToConn(t *testing.T, conn *wsConnStub, method, msg string) { encMsg, err := json.Marshal(makeReq(method, msg)) if err != nil { @@ -500,6 +511,25 @@ func TestClientResponses(t *testing.T) { wg.Wait() }() + // Test Broadcast + conn.recv = make(chan []byte) // for WriteMessage in this test + server.Broadcast(makeNtfn("someNote", `"blah"`)) // async conn.recv <- msg send + msgBytes := <-conn.recv + conn.recv = nil // all done with this chan + msg, err := msgjson.DecodeMessage(msgBytes) + if err != nil { + t.Fatalf("error decoding last message: %v", err) + } + var note string + err = json.Unmarshal(msg.Payload, ¬e) + if err != nil { + return + } + conn.lastMsg = nil + if note != "blah" { + t.Errorf("wrong note: %s", note) + } + // Send a request from the server to the client, setting a flag when the // client responds. responded := make(chan struct{}, 1) diff --git a/server/comms/link.go b/server/comms/link.go index 8faffc26a2..5acf80b455 100644 --- a/server/comms/link.go +++ b/server/comms/link.go @@ -19,8 +19,10 @@ const outBufferSize = 128 // reference implementation of a Link-satisfying type is the wsLink, which // passes messages over a websocket connection. type Link interface { - // ID will return a unique ID by which this connection can be identified. + // ID returns a unique ID by which this connection can be identified. ID() uint64 + // IP returns the IP address of the peer. + IP() string // Send sends the msgjson.Message to the client. Send(msg *msgjson.Message) error // Request sends the Request-type msgjson.Message to the client and registers @@ -69,10 +71,16 @@ func (c *wsLink) Banish() { c.Disconnect() } +// ID returns a unique ID by which this connection can be identified. func (c *wsLink) ID() uint64 { return c.id } +// IP returns the IP address of the peer. +func (c *wsLink) IP() string { + return c.WSLink.IP() +} + func handleMessage(c *wsLink, msg *msgjson.Message) *msgjson.Error { switch msg.Type { case msgjson.Request: diff --git a/server/comms/server.go b/server/comms/server.go index 1a486879c2..b5c87927ea 100644 --- a/server/comms/server.go +++ b/server/comms/server.go @@ -21,6 +21,7 @@ import ( "decred.org/dcrdex/dex/msgjson" "decred.org/dcrdex/dex/ws" "github.com/decred/dcrd/certgen" + "github.com/decred/slog" "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" ) @@ -291,7 +292,7 @@ func (s *Server) banish(ip string) { // run as a goroutine. func (s *Server) websocketHandler(wg *sync.WaitGroup, conn ws.Connection, ip string) { defer wg.Done() - log.Tracef("New websocket client %s", ip) + log.Debugf("New websocket client %s", ip) // Create a new websocket client to handle the new websocket connection // and wait for it to shutdown. Once it has shutdown (and hence @@ -310,6 +311,25 @@ func (s *Server) websocketHandler(wg *sync.WaitGroup, conn ws.Connection, ip str log.Tracef("Disconnected websocket client %s", ip) } +// Broadcast sends a message to all connected clients. The message should be a +// notification. See msgjson.NewNotification. +func (s *Server) Broadcast(msg *msgjson.Message) { + s.clientMtx.RLock() + defer s.clientMtx.RUnlock() + + log.Infof("Broadcasting %s for route %s to %d clients...", msg.Type, msg.Route, len(s.clients)) + if log.Level() <= slog.LevelTrace { // don't marshal unless needed + log.Tracef("Broadcast: %q", msg.String()) + } + + for id, cl := range s.clients { + if err := cl.Send(msg); err != nil { + log.Debugf("Send to client %d at %s failed: %v", id, cl.IP(), err) + cl.Disconnect() // triggers return of websocketHandler, and removeClient + } + } +} + // disconnectClients calls disconnect on each wsLink, but does not remove it // from the Server's client map. func (s *Server) disconnectClients() { diff --git a/server/market/routers_test.go b/server/market/routers_test.go index 0598f03779..5275e7476d 100644 --- a/server/market/routers_test.go +++ b/server/market/routers_test.go @@ -1101,6 +1101,7 @@ func (s *TBookSource) OrderFeed() <-chan *bookUpdateSignal { type TLink struct { mtx sync.Mutex id uint64 + ip string sends []*msgjson.Message sendErr error banished bool @@ -1112,11 +1113,13 @@ func tNewLink() *TLink { linkCounter++ return &TLink{ id: linkCounter, + ip: "[1:800:dex:rules::]", sends: make([]*msgjson.Message, 0), } } func (conn *TLink) ID() uint64 { return conn.id } +func (conn *TLink) IP() string { return conn.ip } func (conn *TLink) Send(msg *msgjson.Message) error { conn.mtx.Lock() defer conn.mtx.Unlock()