Skip to content

Commit

Permalink
feat: add ErrConnRefused to deal with dead cluster members
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Jul 24, 2022
1 parent 17e59f6 commit 8eaeb58
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 11 deletions.
4 changes: 3 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const DefaultScanCount = 10

// Member denotes a member of the Olric cluster.
type Member struct {
// Member name in the cluster
// Member name in the cluster. It's also host:port of the node.
Name string

// ID of the Member in the cluster. Hash of Name and Birthdate of the member
Expand Down Expand Up @@ -289,6 +289,8 @@ type Client interface {
// Members returns a thread-safe list of cluster members.
Members(ctx context.Context) ([]Member, error)

RefreshMetadata(ctx context.Context) error

// Close stops background routines and frees allocated resources.
Close(ctx context.Context) error
}
52 changes: 50 additions & 2 deletions cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package olric
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"os"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/buraksezer/olric/config"
Expand Down Expand Up @@ -69,6 +72,10 @@ func processProtocolError(err error) error {
if err == redis.Nil {
return ErrKeyNotFound
}
if errors.Is(err, syscall.ECONNREFUSED) {
opErr := err.(*net.OpError)
return fmt.Errorf("%s %s %s: %w", opErr.Op, opErr.Net, opErr.Addr, ErrConnRefused)
}
return convertDMapError(protocol.ConvertError(err))
}

Expand Down Expand Up @@ -610,6 +617,36 @@ func (cl *ClusterClient) Members(ctx context.Context) ([]Member, error) {
return members, nil
}

func (cl *ClusterClient) RefreshMetadata(ctx context.Context) error {
var members []Member
var err error
for {
members, err = cl.Members(ctx)
if errors.Is(err, ErrConnRefused) {
err = nil
continue
}
if err != nil {
return err
}
break
}
addresses := make(map[string]struct{})
for _, member := range members {
addresses[member.Name] = struct{}{}
}

for addr := range cl.client.Addresses() {
if _, ok := addresses[addr]; !ok {
// Gone
if err := cl.client.Close(addr); err != nil {
return err
}
}
}
return cl.fetchRoutingTable()
}

// Close stops background routines and frees allocated resources.
func (cl *ClusterClient) Close(ctx context.Context) error {
select {
Expand Down Expand Up @@ -711,7 +748,7 @@ func (cl *ClusterClient) fetchRoutingTablePeriodically() {
case <-ticker.C:
err := cl.fetchRoutingTable()
if err != nil {
cl.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
cl.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
}
}
}
Expand Down Expand Up @@ -760,13 +797,24 @@ func NewClusterClient(addresses []string, options ...ClusterClientOption) (*Clus
cl.client.Get(address)
}

// Discover all cluster members
members, err := cl.Members(ctx)
if err != nil {
return nil, fmt.Errorf("error while discovering the cluster members: %w", err)
}
for _, member := range members {
cl.client.Get(member.Name)
}

// Hash function is required to target primary owners instead of random cluster members.
partitions.SetHashFunc(cc.hasher)

// Initial fetch.
// Initial fetch. ClusterClient targets the primary owners for a smooth and quick operation.
if err := cl.fetchRoutingTable(); err != nil {
return nil, err
}

// Refresh the routing table in every 15 seconds.
cl.wg.Add(1)
go cl.fetchRoutingTablePeriodically()

Expand Down
2 changes: 1 addition & 1 deletion cluster_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (i *ClusterIterator) fetchRoutingTablePeriodically() {
return
case <-time.After(time.Second):
if err := i.fetchRoutingTable(); err != nil {
i.logger.Printf("[ERROR] Failed to fetch the latest routing table: %s", err)
i.logger.Printf("[ERROR] Failed to fetch the latest version of the routing table: %s", err)
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions embedded_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (dm *EmbeddedDMap) Pipeline() (*DMapPipeline, error) {
return cdm.Pipeline()
}

func (e *EmbeddedClient) RefreshMetadata(_ context.Context) error {
// EmbeddedClient already has the latest metadata.
return nil
}

// Scan returns an iterator to loop over the keys.
//
// Available scan options:
Expand Down
12 changes: 12 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package olric

import (
"context"
"errors"
"fmt"
"io"
"testing"
Expand Down Expand Up @@ -71,6 +72,12 @@ func TestIntegration_NodesJoinOrLeftDuringQuery(t *testing.T) {

for i := 0; i < 100000; i++ {
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
if errors.Is(err, ErrConnRefused) {
// Rewind
i--
require.NoError(t, c.RefreshMetadata(context.Background()))
continue
}
require.NoError(t, err)
if i == 5999 {
err = c.client.Close(db2.name)
Expand Down Expand Up @@ -411,6 +418,11 @@ func TestIntegration_Kill_Nodes_During_Operation(t *testing.T) {

for i := 0; i < 100000; i++ {
_, err = dm.Get(context.Background(), fmt.Sprintf("mykey-%d", i))
if errors.Is(err, ErrConnRefused) {
i--
fmt.Println(c.RefreshMetadata(context.Background()))
continue
}
require.NoError(t, err)
}
}
Expand Down
29 changes: 22 additions & 7 deletions internal/server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func NewClient(c *config.Client) *Client {
}
}

func (c *Client) Addresses() map[string]struct{} {
c.mu.RLock()
defer c.mu.RUnlock()

addresses := make(map[string]struct{})
for address, _ := range c.clients {
addresses[address] = struct{}{}
}
return addresses
}

func (c *Client) Get(addr string) *redis.Client {
c.mu.RLock()
rc, ok := c.clients[addr]
Expand All @@ -69,22 +80,26 @@ func (c *Client) Get(addr string) *redis.Client {
return rc
}

func (c *Client) Pick() (*redis.Client, error) {
func (c *Client) pickNodeRoundRobin() (string, error) {
c.mu.RLock()
defer c.mu.RUnlock()

addr, err := c.roundRobin.Get()
if err == roundrobin.ErrEmptyInstance {
return nil, fmt.Errorf("no available client found")
return "", fmt.Errorf("no available client found")
}
if err != nil {
return nil, err
return "", err
}
rc, ok := c.clients[addr]
if !ok {
return nil, fmt.Errorf("client could not be found: %s", addr)
return addr, nil
}

func (c *Client) Pick() (*redis.Client, error) {
addr, err := c.pickNodeRoundRobin()
if err != nil {
return nil, err
}
return rc, nil
return c.Get(addr), nil
}

func (c *Client) Close(addr string) error {
Expand Down
2 changes: 2 additions & 0 deletions olric.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var (

// ErrEntryTooLarge returned if required space for an entry is bigger than table size.
ErrEntryTooLarge = errors.New("entry too large for the configured table size")

ErrConnRefused = errors.New("connection refused")
)

// Olric implements a distributed cache and in-memory key/value data store.
Expand Down

0 comments on commit 8eaeb58

Please sign in to comment.