Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context to syncer.Connect #28

Merged
merged 5 commits into from Mar 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 44 additions & 38 deletions syncer/syncer.go
Expand Up @@ -202,6 +202,9 @@ type Syncer struct {
config config
log *zap.Logger // redundant, but convenient

shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc

mu sync.Mutex
peers map[string]*Peer
strikes map[string]int
Expand Down Expand Up @@ -411,7 +414,16 @@ func (s *Syncer) acceptLoop() error {
}
}

func (s *Syncer) peerLoop(closeChan <-chan struct{}) error {
func (s *Syncer) isStopped() bool {
select {
case <-s.shutdownCtx.Done():
return true
default:
return false
}
}

func (s *Syncer) peerLoop() error {
log := s.log.Named("peerLoop")
numOutbound := func() (n int) {
s.mu.Lock()
Expand Down Expand Up @@ -475,15 +487,10 @@ func (s *Syncer) peerLoop(closeChan <-chan struct{}) error {
select {
case <-ticker.C:
return true
case <-closeChan:
case <-s.shutdownCtx.Done():
return false
}
}
closing := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.l == nil
}
for fst := true; fst || sleep(); fst = false {
if numOutbound() >= s.config.MaxOutboundPeers {
continue
Expand All @@ -494,21 +501,24 @@ func (s *Syncer) peerLoop(closeChan <-chan struct{}) error {
continue
}
for _, p := range candidates {
if numOutbound() >= s.config.MaxOutboundPeers || closing() {
if numOutbound() >= s.config.MaxOutboundPeers || s.isStopped() {
break
}

// NOTE: we don't bother logging failure here, since it's common and
// not particularly interesting or actionable
if _, err := s.Connect(p); err == nil {
ctx, cancel := context.WithTimeout(s.shutdownCtx, s.config.ConnectTimeout)
if _, err := s.Connect(ctx, p); err == nil {
s.log.Debug("connected to peer", zap.String("peer", p))
}
cancel()
lastTried[p] = time.Now()
}
}
return nil
}

func (s *Syncer) syncLoop(closeChan <-chan struct{}) error {
func (s *Syncer) syncLoop() error {
peersForSync := func() (peers []*Peer) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -529,7 +539,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error {
select {
case <-ticker.C:
return true
case <-closeChan:
case <-s.shutdownCtx.Done():
return false
}
}
Expand Down Expand Up @@ -600,14 +610,13 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error {
// terminated. To gracefully shutdown a Syncer, close its net.Listener.
func (s *Syncer) Run() error {
errChan := make(chan error)
closeChan := make(chan struct{})
go func() { errChan <- s.acceptLoop() }()
go func() { errChan <- s.peerLoop(closeChan) }()
go func() { errChan <- s.syncLoop(closeChan) }()
go func() { errChan <- s.peerLoop() }()
go func() { errChan <- s.syncLoop() }()
err := <-errChan

// when one goroutine exits, shutdown and wait for the others
close(closeChan)
s.shutdownCtxCancel()
s.l.Close()
s.mu.Lock()
s.l = nil
Expand Down Expand Up @@ -635,27 +644,21 @@ func (s *Syncer) Run() error {
}

// Connect forms an outbound connection to a peer.
func (s *Syncer) Connect(addr string) (*Peer, error) {
func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
if err := s.allowConnect(addr, false); err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), s.config.ConnectTimeout)
defer cancel()
// slightly gross polling hack so that we shutdown quickly

// ensure we cancel out immediately if the syncer is stopped
ctx, cancel := context.WithCancel(ctx)
go func() {
ChrisSchinnerl marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
s.mu.Lock()
if s.l == nil {
cancel()
}
s.mu.Unlock()
}
select {
case <-ctx.Done():
case <-s.shutdownCtx.Done():
cancel()
}
}()

conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -742,14 +745,17 @@ func New(l net.Listener, cm ChainManager, pm PeerStore, header gateway.Header, o
for _, opt := range opts {
opt(&config)
}
ctx, cancel := context.WithCancel(context.Background())
return &Syncer{
l: l,
cm: cm,
pm: pm,
header: header,
config: config,
log: config.Logger,
peers: make(map[string]*Peer),
strikes: make(map[string]int),
l: l,
cm: cm,
pm: pm,
header: header,
config: config,
log: config.Logger,
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
peers: make(map[string]*Peer),
strikes: make(map[string]int),
}
}