Skip to content

Commit

Permalink
server/comms: add (*Server).Broadcast and Link.IP
Browse files Browse the repository at this point in the history
  • Loading branch information
chappjc committed May 14, 2020
1 parent 305c5d7 commit 7d77619
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 5 deletions.
2 changes: 1 addition & 1 deletion server/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (auth *AuthManager) handleConnect(conn comms.Link, msg *msgjson.Message) *m
}

auth.addClient(client)
log.Tracef("user %x connected", acctInfo.ID[:])
log.Debugf("User %s connected from %s.", acctInfo.ID, conn.IP())
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion server/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type tReq struct {
// tRPCClient satisfies the comms.Link interface.
type TRPCClient struct {
id uint64
ip string
sendErr error
requestErr error
banished bool
Expand All @@ -100,6 +101,7 @@ type TRPCClient struct {
}

func (c *TRPCClient) ID() uint64 { return c.id }
func (c *TRPCClient) IP() string { return c.ip }
func (c *TRPCClient) Send(msg *msgjson.Message) error {
c.sends = append(c.sends, msg)
return c.sendErr
Expand Down Expand Up @@ -133,7 +135,10 @@ var tClientID uint64

func tNewRPCClient() *TRPCClient {
tClientID++
return &TRPCClient{id: tClientID}
return &TRPCClient{
id: tClientID,
ip: "123.123.123.123",
}
}

var tAcctID uint64
Expand Down
32 changes: 31 additions & 1 deletion server/comms/comms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ type wsConnStub struct {
read int
close int
lastMsg []byte
recv chan []byte
}

func newWsStub() *wsConnStub {
return &wsConnStub{
msg: make(chan []byte),
msg: make(chan []byte),
// recv is nil unless a test wants to receive
quit: make(chan struct{}),
}
}
Expand Down Expand Up @@ -97,6 +99,10 @@ func (conn *wsConnStub) WriteMessage(msgType int, msg []byte) error {
return nil
}
conn.lastMsg = msg
// Send the message if their is a receiver for the current test.
if conn.recv != nil {
conn.recv <- msg
}
conn.write++
writeErrMtx.Lock()
defer writeErrMtx.Unlock()
Expand Down Expand Up @@ -147,6 +153,11 @@ func makeResp(id uint64, msg string) *msgjson.Message {
return resp
}

func makeNtfn(route, msg string) *msgjson.Message {
ntfn, _ := msgjson.NewNotification(route, json.RawMessage(msg))
return ntfn
}

func sendToConn(t *testing.T, conn *wsConnStub, method, msg string) {
encMsg, err := json.Marshal(makeReq(method, msg))
if err != nil {
Expand Down Expand Up @@ -500,6 +511,25 @@ func TestClientResponses(t *testing.T) {
wg.Wait()
}()

// Test Broadcast
conn.recv = make(chan []byte) // for WriteMessage in this test
server.Broadcast(makeNtfn("someNote", `"blah"`)) // async conn.recv <- msg send
msgBytes := <-conn.recv
conn.recv = nil // all done with this chan
msg, err := msgjson.DecodeMessage(msgBytes)
if err != nil {
t.Fatalf("error decoding last message: %v", err)
}
var note string
err = json.Unmarshal(msg.Payload, &note)
if err != nil {
return
}
conn.lastMsg = nil
if note != "blah" {
t.Errorf("wrong note: %s", note)
}

// Send a request from the server to the client, setting a flag when the
// client responds.
responded := make(chan struct{}, 1)
Expand Down
10 changes: 9 additions & 1 deletion server/comms/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ const outBufferSize = 128
// reference implementation of a Link-satisfying type is the wsLink, which
// passes messages over a websocket connection.
type Link interface {
// ID will return a unique ID by which this connection can be identified.
// ID returns a unique ID by which this connection can be identified.
ID() uint64
// IP returns the IP address of the peer.
IP() string
// Send sends the msgjson.Message to the client.
Send(msg *msgjson.Message) error
// Request sends the Request-type msgjson.Message to the client and registers
Expand Down Expand Up @@ -69,10 +71,16 @@ func (c *wsLink) Banish() {
c.Disconnect()
}

// ID returns a unique ID by which this connection can be identified.
func (c *wsLink) ID() uint64 {
return c.id
}

// IP returns the IP address of the peer.
func (c *wsLink) IP() string {
return c.WSLink.IP()
}

func handleMessage(c *wsLink, msg *msgjson.Message) *msgjson.Error {
switch msg.Type {
case msgjson.Request:
Expand Down
22 changes: 21 additions & 1 deletion server/comms/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"decred.org/dcrdex/dex/msgjson"
"decred.org/dcrdex/dex/ws"
"github.com/decred/dcrd/certgen"
"github.com/decred/slog"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
)
Expand Down Expand Up @@ -291,7 +292,7 @@ func (s *Server) banish(ip string) {
// run as a goroutine.
func (s *Server) websocketHandler(wg *sync.WaitGroup, conn ws.Connection, ip string) {
defer wg.Done()
log.Tracef("New websocket client %s", ip)
log.Debugf("New websocket client %s", ip)

// Create a new websocket client to handle the new websocket connection
// and wait for it to shutdown. Once it has shutdown (and hence
Expand All @@ -310,6 +311,25 @@ func (s *Server) websocketHandler(wg *sync.WaitGroup, conn ws.Connection, ip str
log.Tracef("Disconnected websocket client %s", ip)
}

// Broadcast sends a message to all connected clients. The message should be a
// notification. See msgjson.NewNotification.
func (s *Server) Broadcast(msg *msgjson.Message) {
s.clientMtx.RLock()
defer s.clientMtx.RUnlock()

log.Infof("Broadcasting %s for route %s to %d clients...", msg.Type, msg.Route, len(s.clients))
if log.Level() <= slog.LevelTrace { // don't marshal unless needed
log.Tracef("Broadcast: %q", msg.String())
}

for id, cl := range s.clients {
if err := cl.Send(msg); err != nil {
log.Debugf("Send to client %d at %s failed: %v", id, cl.IP(), err)
cl.Disconnect() // triggers return of websocketHandler, and removeClient
}
}
}

// disconnectClients calls disconnect on each wsLink, but does not remove it
// from the Server's client map.
func (s *Server) disconnectClients() {
Expand Down
3 changes: 3 additions & 0 deletions server/market/routers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ func (s *TBookSource) OrderFeed() <-chan *bookUpdateSignal {
type TLink struct {
mtx sync.Mutex
id uint64
ip string
sends []*msgjson.Message
sendErr error
banished bool
Expand All @@ -1112,11 +1113,13 @@ func tNewLink() *TLink {
linkCounter++
return &TLink{
id: linkCounter,
ip: "[1:800:dex:rules::]",
sends: make([]*msgjson.Message, 0),
}
}

func (conn *TLink) ID() uint64 { return conn.id }
func (conn *TLink) IP() string { return conn.ip }
func (conn *TLink) Send(msg *msgjson.Message) error {
conn.mtx.Lock()
defer conn.mtx.Unlock()
Expand Down

0 comments on commit 7d77619

Please sign in to comment.