Skip to content

Commit

Permalink
Allow client logger to be set by callers
Browse files Browse the repository at this point in the history
  • Loading branch information
badslug committed Jul 13, 2023
1 parent a6745d3 commit a86121b
Showing 1 changed file with 37 additions and 29 deletions.
66 changes: 37 additions & 29 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

const (
DISCONNECTED = iota
DISCONNECTED = iota
DISCONNECTING
CONNECTED
DIALING
Expand Down Expand Up @@ -97,6 +97,9 @@ type Client struct {
// reconnectLock protects access to reconnection
reconnectLock *sync.Mutex

// l entry to use for all l messages
l *log.Entry

// statusListeners will be informed when the connection status of the client changes
statusListeners []StatusListener
// connectionListeners will be informed when a connection to the server is established
Expand All @@ -106,7 +109,12 @@ type Client struct {
KeyManager
}

// NewClient creates a default client (using an internal websocket) to the
// NewClient creates a client with NewClientWithLogger and the default apex logger.
func NewClient(url, origin string) *Client {
return NewClientWithLogger(url, origin, log.NewEntry(log.Log.(*log.Logger)))
}

// NewClientWithLogger creates a default client (using an internal websocket) to the
// provided URL using the origin for the connection. The client will
// automatically connect, upgrade to a websocket, and establish a DDP
// connection session before returning the client. The client will
Expand All @@ -116,7 +124,7 @@ type Client struct {
// TBD create an option to substitute heartbeat and reconnect behavior (aka http.Transport)
// TBD create an option to hijack the connection (aka http.Hijacker)
// TBD create profiling features (aka net/http/pprof)
func NewClient(url, origin string) *Client {
func NewClientWithLogger(url, origin string, logger *log.Entry) *Client {
c := &Client{
HeartbeatInterval: time.Minute, // Meteor impl default + 10 (we ping last)
HeartbeatTimeout: 15 * time.Second, // Meteor impl default
Expand All @@ -130,6 +138,8 @@ func NewClient(url, origin string) *Client {
connectionStatus: DISCONNECTED,
reconnectLock: &sync.Mutex{},

l: logger,

// Stats
writeSocketStats: NewWriterStats(nil),
writeStats: NewWriterStats(nil),
Expand Down Expand Up @@ -180,11 +190,11 @@ func (c *Client) Connect() error {
ws, err := websocket.Dial(c.url, "", c.origin)
if err != nil {
c.Close()
log.WithError(err).Debug("dial error")
c.l.WithError(err).Debug("dial error")
c.reconnectLater()
return err
}
log.Debug("dialed")
c.l.Debug("dialed")
// Start DDP connection
c.start(ws, NewConnect())
return nil
Expand Down Expand Up @@ -214,7 +224,7 @@ func (c *Client) Reconnect() {
ws, err := websocket.Dial(c.url, "", c.origin)
if err != nil {
c.Close()
log.WithError(err).Debug("Dial error")
c.l.WithError(err).Debug("Dial error")
c.reconnectLater()
return
}
Expand Down Expand Up @@ -258,7 +268,7 @@ func (c *Client) Subscribe(subName string, done chan *Call, args ...interface{})
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Fatal("ddp.rpc: done channel is unbuffered")
c.l.Fatal("ddp.rpc: done channel is unbuffered")
}
}
call.Done = done
Expand Down Expand Up @@ -303,7 +313,7 @@ func (c *Client) Go(serviceMethod string, done chan *Call, args ...interface{})
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Fatal("ddp.rpc: done channel is unbuffered")
c.l.Fatal("ddp.rpc: done channel is unbuffered")
}
}
call.Done = done
Expand Down Expand Up @@ -361,7 +371,7 @@ func (c *Client) Send(msg interface{}) error {

// Close implements the io.Closer interface.
func (c *Client) Close() {
if(c.connectionStatus != DISCONNECTED) {
if c.connectionStatus != DISCONNECTED {
c.status(DISCONNECTING)
}

Expand All @@ -378,11 +388,11 @@ func (c *Client) Close() {
}

//Close channels
if(c.inbox != nil) {
if c.inbox != nil {
close(c.inbox)
c.inbox = nil
}
if(c.inbox != nil) {
if c.inbox != nil {
close(c.errors)
c.errors = nil
}
Expand Down Expand Up @@ -442,13 +452,13 @@ func (c *Client) start(ws *websocket.Conn, connect *Connect) {
c.status(CONNECTING)

// We spin off an error processing goroutine
if(c.errors == nil) {
if c.errors == nil {
c.errors = make(chan error, 100)
go c.errorManager()
}

// We spin off an inbox processing goroutine
if(c.inbox == nil) {
if c.inbox == nil {
c.inbox = make(chan map[string]interface{}, 100)
go c.inboxManager()
go c.inboxWorker(c.readStats)
Expand All @@ -468,10 +478,10 @@ func (c *Client) start(ws *websocket.Conn, connect *Connect) {
func (c *Client) inboxManager() {
for msg := range c.inbox {
// Message!
//log.Println("Got message", msg)
//c.l.Println("Got message", msg)
msgType, ok := msg["msg"]
if ok {
log.WithField("msg", msgType).Debug("recv")
c.l.WithField("msg", msgType).Debug("recv")
switch msgType.(string) {
// Connection management
case "connected":
Expand All @@ -491,8 +501,7 @@ func (c *Client) inboxManager() {
go listener.Connected()
}
case "failed":
log.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])

c.l.Fatalf("IM Failed to connect, we support version 1 but server supports %s", msg["version"])
// Heartbeats
case "ping":
// We received a ping - need to respond with a pong
Expand Down Expand Up @@ -523,13 +532,12 @@ func (c *Client) inboxManager() {

// Live Data
case "nosub":
log.WithField("msg", msg).Debug("sub returned a nosub error")
c.l.WithField("msg", msg).Debug("sub returned a nosub error")
// Clear related subscriptions
sub, ok := msg["id"]
if ok {
id := sub.(string)
runningSub := c.subs[id]

if runningSub != nil {
runningSub.Error = errors.New("sub returned a nosub error")
runningSub.done()
Expand Down Expand Up @@ -579,7 +587,7 @@ func (c *Client) inboxManager() {

default:
// Ignore?
log.WithField("msg", msg).Debug("Server sent unexpected message")
c.l.WithField("msg", msg).Debug("Server sent unexpected message")
}
} else {
// Current Meteor server sends an undocumented DDP message
Expand All @@ -591,18 +599,18 @@ func (c *Client) inboxManager() {
case string:
c.serverID = ID
default:
log.WithField("id", serverID).Debug("Server cluster node")
c.l.WithField("id", serverID).Debug("Server cluster node")
}
} else {
log.WithField("msg", msg).Debug("Server sent message with no `msg` field")
c.l.WithField("msg", msg).Debug("Server sent message with no `msg` field")
}
}
}
}

func (c *Client) errorManager() {
for err := range c.errors {
log.WithError(err).Warn("Websocket error")
c.l.WithError(err).Warn("Websocket error")
}
}

Expand All @@ -629,30 +637,30 @@ func (c *Client) inboxWorker(ws io.Reader) {
err := dec.Decode(&event)

// stop worker if disconnected
if(c.connectionStatus == DISCONNECTED || c.connectionStatus == DISCONNECTING || c.connectionStatus == RECONNECTING) {
if c.connectionStatus == DISCONNECTED || c.connectionStatus == DISCONNECTING || c.connectionStatus == RECONNECTING {
return
}

if(err == io.EOF) {
if err == io.EOF {
break
} else if err != nil {
if(c.errors != nil) {
if c.errors != nil {
c.errors <- err
}
}
if c.pingTimer != nil {
c.pingTimer.Reset(c.HeartbeatInterval)
}
if event == nil {
log.Debug("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
c.l.Debug("Inbox worker found nil event. May be due to broken websocket. Reconnecting.")
break
} else {
if(c.inbox != nil) {
if c.inbox != nil {
c.inbox <- event.(map[string]interface{})
}
}
}

c.reconnectLater()
}

Expand Down

0 comments on commit a86121b

Please sign in to comment.