Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added topics support to waddell, increased test coverage #2

Merged
merged 36 commits into from
Dec 17, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5407587
Moved reconnect logic into waddell
oxtoacart Nov 26, 2014
7da4919
More refinement of reconnect
oxtoacart Nov 26, 2014
c3362a8
Testing some error conditions on redial
oxtoacart Nov 26, 2014
8ac5a22
Added channel id support
oxtoacart Nov 26, 2014
45c0f87
Started to add topics
oxtoacart Nov 26, 2014
c53ede7
More topic refactoring
oxtoacart Nov 27, 2014
b4f4e64
More topic refactoring
oxtoacart Nov 27, 2014
95b3376
More topic refactoring
oxtoacart Nov 27, 2014
c6e1eb9
Topics API complete, still need more tests though
oxtoacart Nov 27, 2014
3f23aa6
Improved test coverage
oxtoacart Nov 27, 2014
9edb326
Made ID updates available
oxtoacart Nov 27, 2014
da62170
Added support for multipart message bodies
oxtoacart Nov 27, 2014
a99a58b
Added support for multipart message bodies
oxtoacart Nov 27, 2014
19eb4d5
Added type for DialFunc
oxtoacart Nov 27, 2014
c2ab040
Renamed idsChannel
oxtoacart Nov 27, 2014
fb75ec6
Added ConnMgr to unit test
oxtoacart Nov 27, 2014
76b428a
Added ClientMgr
oxtoacart Nov 27, 2014
ec9b463
Removed IdCallback
oxtoacart Nov 27, 2014
d538c98
Added ServerCert parameter to ClientMgr
oxtoacart Nov 27, 2014
5030652
Added IdCallback
oxtoacart Nov 27, 2014
30b8b86
Cleaned up connect and close error handling
oxtoacart Nov 27, 2014
4aba434
Renamed IdCallback
oxtoacart Nov 27, 2014
e199c16
Added ServerCert parameter to client
oxtoacart Nov 27, 2014
2e7c90f
Make sure that failing Client can be closed
oxtoacart Nov 27, 2014
37b3d60
Client updates
oxtoacart Nov 27, 2014
41dc1c2
Added warning about Close() method
oxtoacart Nov 28, 2014
d743998
Renamed NewMessageOut to Message
oxtoacart Nov 29, 2014
50bf9e2
Renamed log
oxtoacart Nov 29, 2014
d455c99
Updated to new constant names from framed
oxtoacart Dec 1, 2014
0c3f2c3
Code review updates
oxtoacart Dec 1, 2014
22e2963
Code review updates
oxtoacart Dec 3, 2014
732c7fa
More code review updates
oxtoacart Dec 3, 2014
d4cc22e
Code review changes to simplify concurrency stuff
oxtoacart Dec 3, 2014
2c4ea5a
Go vet updates
oxtoacart Dec 6, 2014
7370724
Added note about draining all topics
oxtoacart Dec 16, 2014
6dd823d
Fixed test for linux
oxtoacart Dec 17, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package waddell

import (
"crypto/tls"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/getlantern/keyman"
)

var (
maxReconnectDelay = 5 * time.Second
reconnectDelayInterval = 100 * time.Millisecond

closedError = fmt.Errorf("Client closed")
)

type ClientConfig struct {
// Dial is a function that dials the waddell server
Dial DialFunc

// ServerCert: PEM-encoded certificate by which to authenticate the waddell
// server. If provided, connection to waddell is encrypted with TLS. If not,
// connection will be made plain-text.
ServerCert string

// ReconnectAttempts specifies how many consecutive times to try
// reconnecting in the event of a connection failure.
//
// Note - when auto reconnecting is enabled, the client will never resend
// messages, it will simply reopen the connection.
ReconnectAttempts int

// OnId allows optionally registering a callback to be notified whenever a
// PeerId is assigned to this client (i.e. on each successful connection to
// the waddell server).
OnId func(id PeerId)
}

// Client is a client of a waddell server
type Client struct {
*ClientConfig

connInfoChs chan chan *connInfo
connErrCh chan error
topicsOut map[TopicId]*topic
topicsOutMutex sync.Mutex
topicsIn map[TopicId]chan *MessageIn
topicsInMutex sync.Mutex
currentId PeerId
currentIdMutex sync.RWMutex
closed int32
}

// DialFunc is a function for dialing a waddell server.
type DialFunc func() (net.Conn, error)

// NewClient creates a waddell client, including establishing an initial
// connection to the waddell server, returning the client and the initial
// PeerId.
//
// IMPORTANT - clients receive messages on topics. Users of Client are
// responsible for draining all topics on which the Client may receive a
// message, otherwise other topics will block.
//
// Note - if the client automatically reconnects, its peer ID will change. You
// can obtain the new id through providing an OnId callback to the client.
//
// Note - whether or not auto reconnecting is enabled, this method doesn't
// return until a connection has been established or we've failed trying.
func NewClient(cfg *ClientConfig) (*Client, error) {
c := &Client{
ClientConfig: cfg,
}
var err error
if c.ServerCert != "" {
c.Dial, err = secured(c.Dial, c.ServerCert)
if err != nil {
return nil, err
}
}

c.connInfoChs = make(chan chan *connInfo)
c.connErrCh = make(chan error)
c.topicsOut = make(map[TopicId]*topic)
c.topicsIn = make(map[TopicId]chan *MessageIn)
go c.stayConnected()
go c.processInbound()
info := c.getConnInfo()
return c, info.err
}

// CurrentId returns the current id (from most recent connection to waddell).
// To be notified about changes to the id, use the OnId handler.
func (c *Client) CurrentId() PeerId {
c.currentIdMutex.RLock()
defer c.currentIdMutex.RUnlock()
return c.currentId
}

func (c *Client) setCurrentId(id PeerId) {
c.currentIdMutex.Lock()
c.currentId = id
c.currentIdMutex.Unlock()
}

// SendKeepAlive sends a keep alive message to the server to keep the underlying
// connection open.
func (c *Client) SendKeepAlive() error {
if c.isClosed() {
return closedError
}

info := c.getConnInfo()
if info.err != nil {
return info.err
}
_, err := info.writer.Write(keepAlive)
if err != nil {
c.connError(err)
}
return err
}

// Close closes this client, its topics and associated resources.
//
// WARNING - Close() closes the out topic channels. Attempts to write to these
// channels after they're closed will result in a panic. So, don't call Close()
// until you're actually 100% finished using this client.
func (c *Client) Close() error {
if c == nil {
return nil
}

justClosed := atomic.CompareAndSwapInt32(&c.closed, 0, 1)
if !justClosed {
return nil
}

var err error
log.Trace("Closing client")
c.topicsInMutex.Lock()
defer c.topicsInMutex.Unlock()
c.topicsOutMutex.Lock()
defer c.topicsOutMutex.Unlock()
for _, t := range c.topicsOut {
close(t.out)
}
for _, ch := range c.topicsIn {
close(ch)
}
info := c.getConnInfo()
if info.conn != nil {
err = info.conn.Close()
log.Trace("Closed client connection")
}
close(c.connInfoChs)
return err
}

// secured wraps the given dial function with TLS support, authenticating the
// waddell server using the supplied cert (assumed to be PEM encoded).
func secured(dial DialFunc, cert string) (DialFunc, error) {
c, err := keyman.LoadCertificateFromPEMBytes([]byte(cert))
if err != nil {
return nil, err
}
tlsConfig := &tls.Config{
RootCAs: c.PoolContainingCert(),
ServerName: c.X509().Subject.CommonName,
}
return func() (net.Conn, error) {
conn, err := dial()
if err != nil {
return nil, err
}
return tls.Client(conn, tlsConfig), nil
}, nil
}

func (c *Client) isClosed() bool {
return c.closed == 1
}
98 changes: 0 additions & 98 deletions clientconn.go

This file was deleted.

80 changes: 80 additions & 0 deletions clientmgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package waddell

import (
"net"
"sync"
)

// ClientMgr provides a mechanism for managing connections to multiple waddell
// servers.
type ClientMgr struct {
// Dial is a function that dials the waddell server at the given addr.
Dial func(addr string) (net.Conn, error)

// ServerCert: PEM-encoded certificate by which to authenticate the waddell
// server. If provided, connection to waddell is encrypted with TLS. If not,
// connection will be made plain-text.
ServerCert string

// ReconnectAttempts specifies how many consecutive times to try
// reconnecting in the event of a connection failure. See
// Client.ReconnectAttempts for more information.
ReconnectAttempts int

// OnId allows optionally registering a callback to be notified whenever a
// PeerId is assigned to the client connected to the indicated addr (i.e. on
// each successful connection to the waddell server at addr).
OnId func(addr string, id PeerId)

clients map[string]*Client
clientsMutex sync.Mutex
}

// ClientTo obtains the one (and only) client to the given addr, creating a new
// one if necessary. This method is safe to call from multiple goroutines.
func (m *ClientMgr) ClientTo(addr string) (*Client, error) {
m.clientsMutex.Lock()
defer m.clientsMutex.Unlock()
if m.clients == nil {
m.clients = make(map[string]*Client)
}
client := m.clients[addr]
var err error
if client == nil {
cfg := &ClientConfig{
Dial: func() (net.Conn, error) {
return m.Dial(addr)
},
ServerCert: m.ServerCert,
ReconnectAttempts: m.ReconnectAttempts,
}
if m.OnId != nil {
cfg.OnId = func(id PeerId) {
m.OnId(addr, id)
}
}
client, err = NewClient(cfg)
if err != nil {
return nil, err
}
m.clients[addr] = client
}
return client, nil
}

// Close closes this ClientMgr and all managed clients.
func (m *ClientMgr) Close() []error {
errors := make([]error, 0)
m.clientsMutex.Lock()
defer m.clientsMutex.Unlock()
for _, client := range m.clients {
err := client.Close()
if err != nil {
errors = append(errors, err)
}
}
if len(errors) == 0 {
return nil
}
return errors
}
Loading