Skip to content

Commit

Permalink
Merge pull request #292 from cnnrznn/cz_290
Browse files (browse the repository at this point in the history)
Attempt to connect to leader via 10 servers simultaneously
  • Loading branch information
cole-miller authored Mar 21, 2024
2 parents dfa1d3a + 018812d commit 1940345
Showing 1 changed file with 87 additions and 44 deletions.
131 changes: 87 additions & 44 deletions internal/protocol/connector.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
"io"
"net"
"sort"
"sync"
"time"

"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/canonical/go-dqlite/logging"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

// DialFunc is a function that can be used to establish a network connection.
Expand Down Expand Up @@ -126,60 +128,101 @@ func (c *Connector) connectAttemptAll(ctx context.Context, log logging.Func) (*P
return servers[i].Role < servers[j].Role
})

ctx, cancel := context.WithCancel(ctx)
defer cancel()

sem := semaphore.NewWeighted(10)

protocolChan := make(chan *Protocol)

wg := &sync.WaitGroup{}
wg.Add(len(servers))

go func() {
wg.Wait()
close(protocolChan)
}()

// Make an attempt for each address until we find the leader.
for _, server := range servers {
log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}
go func(server NodeInfo, pc chan<- *Protocol) {
defer wg.Done()

ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
if err := sem.Acquire(ctx, 1); err != nil {
return
}
defer sem.Release(1)

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
continue
}
if protocol != nil {
// We found the leader
if ctx.Err() != nil {
return
}

log := func(l logging.Level, format string, a ...interface{}) {
format = fmt.Sprintf("server %s: ", server.Address) + format
log(l, format, a...)
}

ctx, cancel := context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, leader, err := c.connectAttemptOne(ctx, server.Address, log)
if err != nil {
// This server is unavailable, try with the next target.
log(logging.Warn, err.Error())
return
}
if protocol != nil {
// We found the leader
log(logging.Debug, "connected")
pc <- protocol
return
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
return
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
log(logging.Debug, "connect to reported leader %s", leader)

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
log(logging.Warn, "reported leader unavailable err=%v", err)
return
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
log(logging.Warn, "reported leader server is not the leader")
return
}
log(logging.Debug, "connected")
return protocol, nil
}
if leader == "" {
// This server does not know who the current leader is,
// try with the next target.
log(logging.Warn, "no known leader")
continue
}
pc <- protocol
}(server, protocolChan)
}

// If we get here, it means this server reported that another
// server is the leader, let's close the connection to this
// server and try with the suggested one.
log(logging.Debug, "connect to reported leader %s", leader)
// Read from protocol chan, cancel context
protocol, ok := <-protocolChan
if !ok {
return nil, ErrNoAvailableLeader
}

ctx, cancel = context.WithTimeout(ctx, c.config.AttemptTimeout)
defer cancel()
cancel()

protocol, _, err = c.connectAttemptOne(ctx, leader, log)
if err != nil {
// The leader reported by the previous server is
// unavailable, try with the next target.
log(logging.Warn, "reported leader unavailable err=%v", err)
continue
}
if protocol == nil {
// The leader reported by the target server does not consider itself
// the leader, try with the next target.
log(logging.Warn, "reported leader server is not the leader")
continue
}
log(logging.Debug, "connected")
return protocol, nil
for extra := range protocolChan {
extra.Close()
}

return nil, ErrNoAvailableLeader
return protocol, nil
}

// Perform the initial handshake using the given protocol version.
Expand Down

0 comments on commit 1940345

Please sign in to comment.