Skip to content

Commit

Permalink
server/comms: fix synchronization issues and drop sleeps in tests
Browse files Browse the repository at this point in the history
* eliminate sleeps from comms tests

Channel patterns are used throughout. There are still 2 time.Sleeps,
one for response handler timeout and one for checking that ping-pongs are
working.
  • Loading branch information
buck54321 committed Jun 2, 2020
1 parent 0bad740 commit b90705e
Show file tree
Hide file tree
Showing 17 changed files with 446 additions and 438 deletions.
12 changes: 6 additions & 6 deletions client/asset/btc/btc.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,24 +343,24 @@ func (btc *ExchangeWallet) Info() *asset.WalletInfo {

// Connect connects the wallet to the RPC server. Satisfies the dex.Connector
// interface.
func (btc *ExchangeWallet) Connect(ctx context.Context) (error, *sync.WaitGroup) {
func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) {
// Check the version. Do it here, so we can also diagnose a bad connection.
netVer, codeVer, err := btc.getVersion()
if err != nil {
return fmt.Errorf("error getting version: %v", err), nil
return nil, fmt.Errorf("error getting version: %v", err)
}
if netVer < btc.minNetworkVersion {
return fmt.Errorf("reported node version %d is less than minimum %d", netVer, minNetworkVersion), nil
return nil, fmt.Errorf("reported node version %d is less than minimum %d", netVer, minNetworkVersion)
}
if codeVer < minProtocolVersion {
return fmt.Errorf("node software out of date. version %d is less than minimum %d", codeVer, minProtocolVersion), nil
return nil, fmt.Errorf("node software out of date. version %d is less than minimum %d", codeVer, minProtocolVersion)
}
// If this is the first time connecting, clear the locked coins. This should
// have been done at shutdown, but shutdown may not have been clean.
if atomic.SwapUint32(&btc.hasConnected, 1) == 0 {
err := btc.wallet.LockUnspent(true, nil)
if err != nil {
return err, nil
return nil, err
}
}
var wg sync.WaitGroup
Expand All @@ -373,7 +373,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (error, *sync.WaitGroup)
btc.log.Errorf("failed to unlock %s outputs on shutdown: %v", btc.symbol, err)
}
}()
return nil, &wg
return &wg, nil
}

// Balance should return the total available funds in the wallet. Balance takes
Expand Down
12 changes: 6 additions & 6 deletions client/asset/dcr/dcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,26 +327,26 @@ func (dcr *ExchangeWallet) Info() *asset.WalletInfo {

// Connect connects the wallet to the RPC server. Satisfies the dex.Connector
// interface.
func (dcr *ExchangeWallet) Connect(ctx context.Context) (error, *sync.WaitGroup) {
func (dcr *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) {
err := dcr.client.Connect(ctx, false)
if err != nil {
return fmt.Errorf("Decred Wallet connect error: %v", err), nil
return nil, fmt.Errorf("Decred Wallet connect error: %v", err)
}
// Check the min api versions.
versions, err := dcr.client.Version()
if err != nil {
return fmt.Errorf("DCR ExchangeWallet version fetch error: %v", err), nil
return nil, fmt.Errorf("DCR ExchangeWallet version fetch error: %v", err)
}
err = checkVersionInfo(versions)
if err != nil {
return fmt.Errorf("DCR ExchangeWallet version check failed: %v", err), nil
return nil, fmt.Errorf("DCR ExchangeWallet version check failed: %v", err)
}
// If this is the first time connecting, clear the locked coins. This should
// have been done at shutdown, but shutdown may not have been clean.
if atomic.SwapUint32(&dcr.hasConnected, 1) == 0 {
err := dcr.node.LockUnspent(true, nil)
if err != nil {
return err, nil
return nil, err
}
}
var wg sync.WaitGroup
Expand All @@ -356,7 +356,7 @@ func (dcr *ExchangeWallet) Connect(ctx context.Context) (error, *sync.WaitGroup)
dcr.monitorBlocks(ctx)
dcr.shutdown()
}()
return nil, &wg
return &wg, nil
}

// Balance should return the total available funds in the wallet.
Expand Down
19 changes: 16 additions & 3 deletions client/cmd/dexc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"decred.org/dcrdex/client/core"
"decred.org/dcrdex/client/rpcserver"
"decred.org/dcrdex/client/webserver"
"decred.org/dcrdex/dex"
"github.com/decred/slog"
)

Expand Down Expand Up @@ -94,7 +95,13 @@ func main() {
log.Errorf("Error starting rpc server: %v", err)
os.Exit(1)
}
rpcSrv.Run(appCtx)
cm := dex.NewConnectionMaster(rpcSrv)
err = cm.Connect(appCtx)
if err != nil {
log.Errorf("Error starting rpc server: %v", err)
return
}
cm.Wait()
}()
}

Expand All @@ -104,10 +111,16 @@ func main() {
defer wg.Done()
webSrv, err := webserver.New(clientCore, cfg.WebAddr, logMaker.Logger("WEB"), cfg.ReloadHTML)
if err != nil {
log.Errorf("Error starting web server: %v", err)
log.Errorf("Error creating web server: %v", err)
os.Exit(1)
}
webSrv.Run(appCtx)
cm := dex.NewConnectionMaster(webSrv)
err = cm.Connect(appCtx)
if err != nil {
log.Errorf("Error starting web server: %v", err)
return
}
cm.Wait()
}()
}

Expand Down
22 changes: 18 additions & 4 deletions client/cmd/dexc/ui/widgets.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"decred.org/dcrdex/client/core"
"decred.org/dcrdex/client/rpcserver"
"decred.org/dcrdex/client/webserver"
"decred.org/dcrdex/dex"
"github.com/decred/slog"
"github.com/gdamore/tcell"
"github.com/rivo/tview"
Expand Down Expand Up @@ -155,25 +156,38 @@ func createWidgets() {
marketView = newMarketView()
webView = newServerView("Web", cfg.WebAddr, func(ctx context.Context, addr string, logger slog.Logger) {
setWebLabelOn(true)
defer setWebLabelOn(false)
webSrv, err := webserver.New(clientCore, cfg.WebAddr, logger, cfg.ReloadHTML)
if err != nil {
log.Errorf("Error creating web server: %v", err)
return
}
cm := dex.NewConnectionMaster(webSrv)
err = cm.Connect(ctx)
if err != nil {
log.Errorf("Error starting web server: %v", err)
return
}
webSrv.Run(ctx)
setWebLabelOn(false)
cm.Wait()
})
rpcView = newServerView("RPC", cfg.RPCAddr, func(ctx context.Context, _ string, logger slog.Logger) {
setRPCLabelOn(true)
defer setRPCLabelOn(false)
rpcserver.SetLogger(logger)
rpcCfg := &rpcserver.Config{clientCore, cfg.RPCAddr, cfg.RPCUser, cfg.RPCPass, cfg.RPCCert, cfg.RPCKey}
rpcSrv, err := rpcserver.New(rpcCfg)
if err != nil {
log.Errorf("Error starting rpc server: %v", err)
return
}
rpcSrv.Run(ctx)
setRPCLabelOn(false)
cm := dex.NewConnectionMaster(rpcSrv)
err = cm.Connect(ctx)
if err != nil {
log.Errorf("Error starting rpc server: %v", err)
return
}
cm.Wait()

})
noteJournal = newJournal("Notifications", handleNotificationLog)
noteLog = mustLogger("NOTE", func(msg []byte) {
Expand Down
6 changes: 3 additions & 3 deletions client/comms/wsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type WsConn interface {
NextID() uint64
Send(msg *msgjson.Message) error
Request(msg *msgjson.Message, f func(*msgjson.Message)) error
Connect(ctx context.Context) (error, *sync.WaitGroup)
Connect(ctx context.Context) (*sync.WaitGroup, error)
MessageSource() <-chan *msgjson.Message
}

Expand Down Expand Up @@ -375,7 +375,7 @@ func (conn *wsConn) NextID() uint64 {
// connection will be returned. If the connection is successful, an
// auto-reconnect goroutine will be started. To shutdown auto-reconnect, use
// Stop() or cancel the context.
func (conn *wsConn) Connect(ctx context.Context) (error, *sync.WaitGroup) {
func (conn *wsConn) Connect(ctx context.Context) (*sync.WaitGroup, error) {
var ctxInternal context.Context
ctxInternal, conn.cancel = context.WithCancel(ctx)

Expand All @@ -400,7 +400,7 @@ func (conn *wsConn) Connect(ctx context.Context) (error, *sync.WaitGroup) {
close(conn.readCh) // signal to receivers that the wsConn is dead
}()

return conn.connect(ctxInternal), &conn.wg
return &conn.wg, conn.connect(ctxInternal)
}

// Stop can be used to close the connection and all of the goroutines started by
Expand Down
8 changes: 4 additions & 4 deletions client/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (conn *TWebsocket) Request(msg *msgjson.Message, f msgFunc) error {
return conn.reqErr
}
func (conn *TWebsocket) MessageSource() <-chan *msgjson.Message { return conn.msgs }
func (conn *TWebsocket) Connect(context.Context) (error, *sync.WaitGroup) {
return conn.connectErr, &sync.WaitGroup{}
func (conn *TWebsocket) Connect(context.Context) (*sync.WaitGroup, error) {
return &sync.WaitGroup{}, conn.connectErr
}

type TDB struct {
Expand Down Expand Up @@ -446,8 +446,8 @@ func (w *TXCWallet) Info() *asset.WalletInfo {
return &asset.WalletInfo{}
}

func (w *TXCWallet) Connect(ctx context.Context) (error, *sync.WaitGroup) {
return w.connectErr, &sync.WaitGroup{}
func (w *TXCWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) {
return &sync.WaitGroup{}, w.connectErr
}

func (w *TXCWallet) Run(ctx context.Context) { <-ctx.Done() }
Expand Down
34 changes: 18 additions & 16 deletions client/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,15 @@ func New(cfg *Config) (*RPCServer, error) {
return s, nil
}

// Run starts the web server. Satisfies the dex.Runner interface.
func (s *RPCServer) Run(ctx context.Context) {
// Connect starts the RPC server. Satisfies the dex.Connector interface.
func (s *RPCServer) Connect(ctx context.Context) (*sync.WaitGroup, error) {
// ctx passed to newMarketSyncer when making new market syncers.
s.ctx = ctx

// Create listener.
listener, err := tls.Listen("tcp", s.addr, s.tlsConfig)
if err != nil {
log.Errorf("can't listen on %s. rpc server quitting: %v", s.addr, err)
return
return nil, fmt.Errorf("can't listen on %s. rpc server quitting: %v", s.addr, err)
}

// Close the listener on context cancellation.
Expand All @@ -292,19 +291,22 @@ func (s *RPCServer) Run(ctx context.Context) {
log.Errorf("HTTP server Shutdown: %v", err)
}
}()
log.Infof("RPC server listening on %s", s.addr)
if err := s.srv.Serve(listener); err != http.ErrServerClosed {
log.Warnf("unexpected (http.Server).Serve error: %v", err)
}
s.mtx.Lock()
for _, cl := range s.clients {
cl.Disconnect()
}
s.mtx.Unlock()

// Wait for market syncers to finish and Shutdown.
s.wg.Wait()
log.Infof("RPC server off")
s.wg.Add(1)
go func() {
defer s.wg.Done()
if err := s.srv.Serve(listener); err != http.ErrServerClosed {
log.Warnf("unexpected (http.Server).Serve error: %v", err)
}
s.mtx.Lock()
for _, cl := range s.clients {
cl.Disconnect()
}
s.mtx.Unlock()
log.Infof("RPC server off")
}()
log.Infof("RPC server listening on %s", s.addr)
return &s.wg, nil
}

// handleRequest sends the request to the correct handler function if able.
Expand Down
14 changes: 10 additions & 4 deletions client/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,14 @@ func newTServer(t *testing.T, start bool, user, pass string) (*RPCServer,
t.Fatalf("error creating server: %v", err)
}
if start {
waiter := dex.NewStartStopWaiter(s)
waiter.Start(ctx)
cm := dex.NewConnectionMaster(s)
err := cm.Connect(ctx)
if err != nil {
t.Fatalf("Error starting WebServer: %v", err)
}
shutdown = func() {
killCtx()
waiter.WaitForShutdown()
cm.Disconnect()
}
} else {
shutdown = killCtx
Expand Down Expand Up @@ -284,7 +287,10 @@ func TestLoadMarket(t *testing.T) {
link := newLink()
s, tCore, shutdown := newTServer(t, false, "", "")
defer shutdown()
link.cl.Start()
_, err := link.cl.Connect(tCtx)
if err != nil {
t.Fatalf("WSLink Start: %v", err)
}
defer link.cl.Disconnect()
params := &marketLoad{
Host: "abc",
Expand Down
9 changes: 7 additions & 2 deletions client/rpcserver/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,13 @@ func (s *RPCServer) websocketHandler(conn ws.Connection, ip string) {
delete(s.clients, cl.cid)
s.mtx.Unlock()
}()
cl.Start()
cl.WaitForShutdown()
cm := dex.NewConnectionMaster(cl)
err := cm.Connect(s.ctx)
if err != nil {
log.Errorf("websocketHandler client Connect: %v")
return
}
cm.Wait()
log.Tracef("Disconnected websocket client %s", ip)
}

Expand Down
9 changes: 6 additions & 3 deletions client/webserver/live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,6 @@ func TestServer(t *testing.T) {
time.AfterFunc(time.Minute*59, func() { shutdown() })
logger := slog.NewBackend(os.Stdout).Logger("TEST")
logger.SetLevel(slog.LevelTrace)
time.AfterFunc(time.Minute*60, func() { shutdown() })
tCore := newTCore()

if register {
Expand All @@ -739,7 +738,11 @@ func TestServer(t *testing.T) {
if err != nil {
t.Fatalf("error creating server: %v", err)
}
go s.Run(tCtx)
cm := dex.NewConnectionMaster(s)
err = cm.Connect(tCtx)
if err != nil {
t.Fatalf("Connect error: %v", err)
}
go tCore.runEpochs()
<-tCtx.Done()
cm.Wait()
}
Loading

0 comments on commit b90705e

Please sign in to comment.