Fix cluster pipeline tests.
vmihailenco committed Dec 20, 2016
1 parent a3eed90 commit cd7431c
Showing 2 changed files with 62 additions and 41 deletions.
cluster.go (17 additions, 8 deletions)
@@ -95,18 +95,22 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
 	}
 
 	if clOpt.RouteByLatency {
-		const probes = 10
-		for i := 0; i < probes; i++ {
-			t1 := time.Now()
-			node.Client.Ping()
-			node.Latency += time.Since(t1)
-		}
-		node.Latency = node.Latency / probes
+		node.updateLatency()
 	}
 
 	return &node
 }
 
+func (n *clusterNode) updateLatency() {
+	const probes = 10
+	for i := 0; i < probes; i++ {
+		start := time.Now()
+		n.Client.Ping()
+		n.Latency += time.Since(start)
+	}
+	n.Latency = n.Latency / probes
+}
+
 func (n *clusterNode) Loading() bool {
 	return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
 }
@@ -290,14 +294,19 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
 }
 
 func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
+	const threshold = time.Millisecond
+
 	nodes := c.slotNodes(slot)
 	if len(nodes) == 0 {
 		return c.nodes.Random()
 	}
 
 	var node *clusterNode
 	for _, n := range nodes {
-		if node == nil || n.Latency < node.Latency {
+		if n.Loading() {
+			continue
+		}
+		if node == nil || node.Latency-n.Latency > threshold {
 			node = n
 		}
 	}
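The selection rule this hunk introduces is easier to see in isolation. Below is a minimal standalone sketch of the same logic; the node struct and closest function are illustrative stand-ins, not the library's types. Nodes still loading are skipped, and a candidate only displaces the current pick when it is faster by more than the threshold, so nodes with near-identical latency don't cause routing to flap.

// sketch.go — standalone illustration of the slotClosestNode rule (assumed
// simplification, not library code).
package main

import (
	"fmt"
	"time"
)

type node struct {
	addr    string
	latency time.Duration
	loading bool
}

// closest skips loading nodes and only prefers a node when it beats
// the current pick by more than threshold.
func closest(nodes []*node, threshold time.Duration) *node {
	var best *node
	for _, n := range nodes {
		if n.loading {
			continue
		}
		if best == nil || best.latency-n.latency > threshold {
			best = n
		}
	}
	return best
}

func main() {
	nodes := []*node{
		{addr: "a:6379", latency: 3 * time.Millisecond},
		{addr: "b:6379", latency: 2500 * time.Microsecond},         // only 0.5ms faster: ignored
		{addr: "c:6379", latency: time.Millisecond},                // >1ms faster: wins
		{addr: "d:6379", latency: time.Nanosecond, loading: true},  // loading: skipped
	}
	fmt.Println(closest(nodes, time.Millisecond).addr) // prints c:6379
}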
cluster_test.go (45 additions, 33 deletions)
@@ -6,15 +6,14 @@ import (
"strconv"
"strings"
"sync"

"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"gopkg.in/redis.v5"
"gopkg.in/redis.v5/internal/hashtag"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

type clusterScenario struct {
@@ -24,10 +23,6 @@ type clusterScenario struct {
 	clients map[string]*redis.Client
 }
 
-func (s *clusterScenario) primary() *redis.Client {
-	return s.clients[s.ports[0]]
-}
-
 func (s *clusterScenario) masters() []*redis.Client {
 	result := make([]*redis.Client, 3)
 	for pos, port := range s.ports[:3] {
@@ -157,6 +152,9 @@ func slotEqual(s1, s2 redis.ClusterSlot) bool {
 	if s1.End != s2.End {
 		return false
 	}
+	if len(s1.Nodes) != len(s2.Nodes) {
+		return false
+	}
 	for i, n1 := range s1.Nodes {
 		if n1.Addr != s2.Nodes[i].Addr {
 			return false
@@ -182,9 +180,10 @@ func stopCluster(scenario *clusterScenario) error {
 //------------------------------------------------------------------------------
 
 var _ = Describe("ClusterClient", func() {
+	var opt *redis.ClusterOptions
 	var client *redis.ClusterClient
 
-	describeClusterClient := func() {
+	assertClusterClient := func() {
 		It("should CLUSTER SLOTS", func() {
 			res, err := client.ClusterSlots().Result()
 			Expect(err).NotTo(HaveOccurred())
@@ -377,11 +376,13 @@
 		var pipe *redis.Pipeline
 
 		assertPipeline := func() {
-			It("follows redirects", func() {
-				slot := hashtag.Slot("A")
-				Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
-
-				keys := []string{"A", "B", "C", "D", "E", "F", "G"}
-
+			keys := []string{"A", "B", "C", "D", "E", "F", "G"}
+
+			It("follows redirects", func() {
+				for _, key := range keys {
+					slot := hashtag.Slot(key)
+					client.SwapSlotNodes(slot)
+				}
+
 				for i, key := range keys {
 					pipe.Set(key, key+"_value", time.Duration(i+1)*time.Hour)
@@ -391,32 +392,42 @@
 				Expect(err).NotTo(HaveOccurred())
 				Expect(cmds).To(HaveLen(14))
 
+				if opt.RouteByLatency {
+					return
+				}
+
+				for _, key := range keys {
+					slot := hashtag.Slot(key)
+					client.SwapSlotNodes(slot)
+				}
+
 				for _, key := range keys {
 					pipe.Get(key)
 					pipe.TTL(key)
 				}
 				cmds, err = pipe.Exec()
 				Expect(err).NotTo(HaveOccurred())
 				Expect(cmds).To(HaveLen(14))
-				Expect(cmds[0].(*redis.StringCmd).Val()).To(Equal("A_value"))
-				Expect(cmds[1].(*redis.DurationCmd).Val()).To(BeNumerically("~", 1*time.Hour, time.Second))
-				Expect(cmds[6].(*redis.StringCmd).Val()).To(Equal("D_value"))
-				Expect(cmds[7].(*redis.DurationCmd).Val()).To(BeNumerically("~", 4*time.Hour, time.Second))
-				Expect(cmds[12].(*redis.StringCmd).Val()).To(Equal("G_value"))
-				Expect(cmds[13].(*redis.DurationCmd).Val()).To(BeNumerically("~", 7*time.Hour, time.Second))
 
+				for i, key := range keys {
+					get := cmds[i*2].(*redis.StringCmd)
+					Expect(get.Val()).To(Equal(key + "_value"))
+
+					ttl := cmds[(i*2)+1].(*redis.DurationCmd)
+					Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second))
+				}
 			})

It("works with missing keys", func() {
Expect(client.Set("A", "A_value", 0).Err()).NotTo(HaveOccurred())
Expect(client.Set("C", "C_value", 0).Err()).NotTo(HaveOccurred())

var a, b, c *redis.StringCmd
cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error {
a = pipe.Get("A")
b = pipe.Get("B")
c = pipe.Get("C")
return nil
})
pipe.Set("A", "A_value", 0)
pipe.Set("C", "C_value", 0)
_, err := pipe.Exec()
Expect(err).NotTo(HaveOccurred())

a := pipe.Get("A")
b := pipe.Get("B")
c := pipe.Get("C")
cmds, err := pipe.Exec()
Expect(err).To(Equal(redis.Nil))
Expect(cmds).To(HaveLen(3))

@@ -476,7 +487,8 @@

 	Describe("default ClusterClient", func() {
 		BeforeEach(func() {
-			client = cluster.clusterClient(redisClusterOptions())
+			opt = redisClusterOptions()
+			client = cluster.clusterClient(opt)
 
 			_ = client.ForEachMaster(func(master *redis.Client) error {
 				return master.FlushDb().Err()
@@ -487,12 +499,12 @@
 			Expect(client.Close()).NotTo(HaveOccurred())
 		})
 
-		describeClusterClient()
+		assertClusterClient()
 	})
 
 	Describe("ClusterClient with RouteByLatency", func() {
 		BeforeEach(func() {
-			opt := redisClusterOptions()
+			opt = redisClusterOptions()
 			opt.RouteByLatency = true
 			client = cluster.clusterClient(opt)
 
@@ -506,7 +518,7 @@
 			Expect(client.Close()).NotTo(HaveOccurred())
 		})
 
-		describeClusterClient()
+		assertClusterClient()
 	})
 })

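One detail worth pulling out of the rewritten assertions: the loop replaces hard-coded spot checks on cmds[0], cmds[6], and cmds[12] by relying on pipeline queue order. With two commands queued per key, Exec returns len(keys)*2 results, so the GET for key i lands at index i*2 and its TTL at i*2+1. The early return under opt.RouteByLatency fits the same picture: with latency-based routing the commands may be served by a different node, so, presumably, the redirect-specific expectations would not hold deterministically. A tiny standalone sketch of the indexing (plain Go, no redis dependency):

package main

import "fmt"

func main() {
	keys := []string{"A", "B", "C", "D", "E", "F", "G"}
	// Two commands per key are queued in order, so the result slice has
	// len(keys)*2 entries: GET at i*2, TTL at i*2+1 — 14 results for 7 keys.
	for i, key := range keys {
		fmt.Printf("%s: GET=cmds[%d] TTL=cmds[%d]\n", key, i*2, i*2+1)
	}
}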
