Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

policy: roundrobbin should try all up hosts #1351

Merged
merged 1 commit into from Sep 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 56 additions & 45 deletions policies.go
Expand Up @@ -6,6 +6,8 @@ package gocql

import (
"context"
crand "crypto/rand"
"encoding/binary"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -334,8 +336,6 @@ func RoundRobinHostPolicy() HostSelectionPolicy {

type roundRobinHostPolicy struct {
hosts cowHostList
pos uint32
mu sync.RWMutex
}

func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool { return true }
Expand All @@ -344,25 +344,16 @@ func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {}
func (r *roundRobinHostPolicy) Init(*Session) {}

func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost {
// i is used to limit the number of attempts to find a host
// to the number of hosts known to this policy
var i int
return func() SelectedHost {
hosts := r.hosts.get()
if len(hosts) == 0 {
return nil
}
src := r.hosts.get()
hosts := make([]*HostInfo, len(src))
copy(hosts, src)

// always increment pos to evenly distribute traffic in case of
// failures
pos := atomic.AddUint32(&r.pos, 1) - 1
if i >= len(hosts) {
return nil
}
host := hosts[(pos)%uint32(len(hosts))]
i++
return (*selectedHost)(host)
}
rand := rand.New(randSource())
rand.Shuffle(len(hosts), func(i, j int) {
hosts[i], hosts[j] = hosts[j], hosts[i]
})

return roundRobbin(hosts)
}

func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
Expand Down Expand Up @@ -585,8 +576,8 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {

token := meta.tokenRing.partitioner.Hash(routingKey)
ht := meta.replicas[qry.Keyspace()].replicasFor(token)
var replicas []*HostInfo

var replicas []*HostInfo
if ht == nil {
host, _ := meta.tokenRing.GetHostForToken(token)
replicas = []*HostInfo{host}
Expand Down Expand Up @@ -792,8 +783,6 @@ func (host selectedHostPoolHost) Mark(err error) {

type dcAwareRR struct {
local string
pos uint32
mu sync.RWMutex
localHosts cowHostList
remoteHosts cowHostList
}
Expand All @@ -814,15 +803,15 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool {
}

func (d *dcAwareRR) AddHost(host *HostInfo) {
if host.DataCenter() == d.local {
if d.IsLocal(host) {
d.localHosts.add(host)
} else {
d.remoteHosts.add(host)
}
}

func (d *dcAwareRR) RemoveHost(host *HostInfo) {
if host.DataCenter() == d.local {
if d.IsLocal(host) {
d.localHosts.remove(host.ConnectAddress())
} else {
d.remoteHosts.remove(host.ConnectAddress())
Expand All @@ -832,33 +821,55 @@ func (d *dcAwareRR) RemoveHost(host *HostInfo) {
func (d *dcAwareRR) HostUp(host *HostInfo) { d.AddHost(host) }
func (d *dcAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) }

func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
var randSeed int64

func init() {
p := make([]byte, 8)
if _, err := crand.Read(p); err != nil {
panic(err)
}
randSeed = int64(binary.BigEndian.Uint64(p))
}

func randSource() rand.Source {
return rand.NewSource(atomic.AddInt64(&randSeed, 1))
}

func roundRobbin(hosts []*HostInfo) NextHost {
var i int
return func() SelectedHost {
var hosts []*HostInfo
localHosts := d.localHosts.get()
remoteHosts := d.remoteHosts.get()
if len(localHosts) != 0 {
hosts = localHosts
} else {
hosts = remoteHosts
}
if len(hosts) == 0 {
return nil
}
for i < len(hosts) {
h := hosts[i]
i++

// always increment pos to evenly distribute traffic in case of
// failures
pos := atomic.AddUint32(&d.pos, 1) - 1
if i >= len(localHosts)+len(remoteHosts) {
return nil
if h.IsUp() {
return (*selectedHost)(h)
}
}
host := hosts[(pos)%uint32(len(hosts))]
i++
return (*selectedHost)(host)

return nil
}
}

func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
local := d.localHosts.get()
remote := d.remoteHosts.get()
hosts := make([]*HostInfo, len(local)+len(remote))
n := copy(hosts, local)
copy(hosts[n:], remote)

// TODO: use random chose-2 but that will require plumbing information
// about connection/host load to here
r := rand.New(randSource())
for _, l := range [][]*HostInfo{local, remote} {
r.Shuffle(len(l), func(i, j int) {
l[i], l[j] = l[j], l[i]
})
}

return roundRobbin(hosts)
}

// ConvictionPolicy interface is used by gocql to determine if a host should be
// marked as DOWN based on the error and host info
type ConvictionPolicy interface {
Expand Down