Skip to content

Commit

Permalink
MB-48030 Retry shutdown with local kvaddrs in case of node rename
Browse files Browse the repository at this point in the history
BP of MB-48300 to 7.0.2

In case where a node gets added with localhost address first
and then ns_server changes the IP address to a public/private
IP, then the feed.nodeFeeds book-keeping becomes inconsistent.

E.g., when a node is added with 127.0.0.1 as its IP address and
streams are opened, the key to nodeFeeds would be 127.0.0.1.
When ns_server changes the IP address, feed.bucket.getMasterNode
would return the changed IP address. This new IP address will not
be a part of the nodeFeeds book-keeping and this call will fail.

Currently, the shutdownVBuckets code path does not handle errors. So,
the memcached.ErrorInvalidFeed error gets silently ignored and
the vbucket will never be shut down. This leads to the indexer ending up
in a repair loop. To fix this, we check whether the master node's IP address
is the same as the local IP address. If yes, we go ahead and shut down the
stream by using the local kvaddress.

Change-Id: I1f02cd863f707ad04bbec4d47262f28878bc70ff
(cherry picked from commit 79356cf)
  • Loading branch information
varunv-cb committed Sep 7, 2021
1 parent 9626d27 commit 7f79312
Showing 1 changed file with 112 additions and 5 deletions.
117 changes: 112 additions & 5 deletions secondary/dcp/upr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package couchbase
import (
"errors"
"fmt"
"net"
"sync"
"time"

"github.com/couchbase/indexing/secondary/dcp/transport"
memcached "github.com/couchbase/indexing/secondary/dcp/transport/client"
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/security"
)

// ErrorInvalidVbucket
Expand Down Expand Up @@ -509,18 +511,123 @@ func (feed *DcpFeed) dcpCloseStream(vb, opaqueMSB uint16) error {
logging.Errorf(fmsg, prefix, opaqueMSB, vb)
return ErrorInvalidVbucket
}
singleFeed, ok := removefromfeed(feed.nodeFeeds[master], vb)
if !ok {
fmsg := "%v ##%x notFound DcpFeed host: %q vb:%d"
logging.Errorf(fmsg, prefix, opaqueMSB, master, vb)
return memcached.ErrorInvalidFeed
singleFeed, err := feed.getSingleFeed(master, vb, prefix, opaqueMSB)
if err != nil {
return err
}
if err := singleFeed.dcpFeed.CloseStream(vb, opaqueMSB); err != nil {
return err
}
return nil
}

// getSingleFeed looks up, via removefromfeed, the feed entry that serves
// vbucket vb on the node identified by master (a "host:port" key into
// feed.nodeFeeds).
//
// If no entry is found under master, it falls back to the local kv address
// (feed.kvaddrs[0]) when master resolves to this node's own IP and exactly
// one kv address is configured. prefix and opaqueMSB are used only for
// log messages.
//
// It returns memcached.ErrorInvalidFeed when the feed cannot be located under
// either key, or when the local-IP check itself fails.
func (feed *DcpFeed) getSingleFeed(master string, vb uint16, prefix string, opaqueMSB uint16) (*FeedInfo, error) {
	singleFeed, ok := removefromfeed(feed.nodeFeeds[master], vb)
	if ok {
		return singleFeed, nil
	}

	// A node may first be added with a localhost address (e.g. 127.0.0.1)
	// and later be renamed by ns_server to a public/private IP. Streams
	// opened before the rename are keyed in feed.nodeFeeds by the old
	// address, while feed.bucket.getMasterNode reports the new one, so the
	// lookup above misses.
	//
	// The shutdownVBuckets code path does not handle errors, so returning
	// memcached.ErrorInvalidFeed here would be silently ignored, the vbucket
	// would never be shut down, and the indexer would end up in a repair
	// loop. To avoid that, when master is this node's own IP address, retry
	// the lookup with the local kv address.
	islocalIP, err := isLocalIP(master)
	if err != nil {
		// err must be passed explicitly: the format string has four verbs.
		logging.Errorf("%v ##%x err: %v observed while retrieving local IP "+
			"for master: %v", prefix, opaqueMSB, err, master)
		return nil, memcached.ErrorInvalidFeed
	}

	if !islocalIP || len(feed.kvaddrs) != 1 {
		fmsg := "%v ##%x notFound DcpFeed host: %q vb:%d, kvaddrs: %v"
		logging.Errorf(fmsg, prefix, opaqueMSB, master, vb, feed.kvaddrs)
		return nil, memcached.ErrorInvalidFeed
	}

	// kvaddrs[0] is the local kv address.
	localAddr := feed.kvaddrs[0]
	fmsg := "%v ##%x notFound DcpFeed host: %q vb:%d, trying with kvaddrs: %v"
	logging.Warnf(fmsg, prefix, opaqueMSB, master, vb, localAddr)

	singleFeed, ok = removefromfeed(feed.nodeFeeds[localAddr], vb)
	if !ok {
		fmsg := "%v ##%x notFound DcpFeed host: %q vb:%d with kvaddrs: %v"
		logging.Errorf(fmsg, prefix, opaqueMSB, master, vb, localAddr)
		return nil, memcached.ErrorInvalidFeed
	}
	return singleFeed, nil
}

// isLocalIP reports whether the host part of hostport ("host:port") is the
// IP address returned by getLocalIP.
//
// An error is returned when hostport cannot be split into host and port, or
// when getLocalIP fails. A host that does not parse as an IP literal yields
// (false, nil).
func isLocalIP(hostport string) (bool, error) {
	host, _, splitErr := net.SplitHostPort(hostport)
	if splitErr != nil {
		return false, splitErr
	}

	local, lookupErr := getLocalIP()
	if lookupErr != nil {
		return false, lookupErr
	}

	// net.ParseIP returns nil for non-IP input; Equal then reports false.
	return local.Equal(net.ParseIP(host)), nil
}

// getLocalIP walks the network interfaces of this node and returns the first
// non-loopback address found on an interface that is up and is not a
// loopback interface. The address is returned in 16-byte form when
// security.IsIpv6() reports true, and in 4-byte (IPv4) form otherwise;
// addresses that cannot be converted to the selected form are skipped.
//
// An error is returned if the interfaces or an interface's addresses cannot
// be listed, or if no suitable address exists.
//
// (This is a copy of GetLocalIP in common/util.go, duplicated here to avoid
// a package dependency.)
func getLocalIP() (net.IP, error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		return nil, err
	}

	for _, nic := range ifaces {
		// Ignore interfaces that are down or are loopback devices.
		if nic.Flags&net.FlagUp == 0 || nic.Flags&net.FlagLoopback != 0 {
			continue
		}

		nicAddrs, err := nic.Addrs()
		if err != nil {
			return nil, err
		}

		for _, a := range nicAddrs {
			var candidate net.IP
			switch v := a.(type) {
			case *net.IPNet:
				candidate = v.IP
			case *net.IPAddr:
				candidate = v.IP
			}
			if candidate == nil || candidate.IsLoopback() {
				continue
			}

			// Normalise to the address family currently in use.
			if security.IsIpv6() {
				candidate = candidate.To16()
			} else {
				candidate = candidate.To4()
			}
			if candidate != nil {
				return candidate, nil
			}
		}
	}

	return nil, errors.New("cannot find local IP address")
}

func (feed *DcpFeed) dcpGetSeqnos() (map[uint16]uint64, error) {
count := len(feed.nodeFeeds)
ch := make(chan []interface{}, count)
Expand Down

0 comments on commit 7f79312

Please sign in to comment.