Skip to content

Commit

Permalink
MB-47603 : Use https port when encryption is enabled.
Browse files Browse the repository at this point in the history
Change-Id: I3bc365c20d16fd4ea3767231a2a494ba69dad68a
  • Loading branch information
ksaikrishnateja committed Aug 17, 2021
1 parent 6eb1d79 commit 15f8e6d
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 178 deletions.
55 changes: 39 additions & 16 deletions secondary/common/cluster_info.go
Expand Up @@ -637,6 +637,14 @@ func (c *ClusterInfoCache) buildEncryptPortMapping() {
if ok && ok1 {
mapping[fmt.Sprint(node.Services[CBQ_SERVICE])] = fmt.Sprint(node.Services[CBQ_SSL_SERVICE])
}

// TODO: Check this when user can specify encrypted port from query
// As ns_server does not send encrypted from all APIs we will need this in map.
// _, ok = node.Services[MGMT_SERVICE]
// _, ok1 = node.Services[MGMT_SSL_SERVICE]
// if ok && ok1 {
// mapping[fmt.Sprint(node.Services[MGMT_SERVICE])] = fmt.Sprint(node.Services[MGMT_SSL_SERVICE])
// }
}

c.encryptPortMapping = mapping
Expand Down Expand Up @@ -890,7 +898,7 @@ func (c *ClusterInfoCache) GetNodeStatus(nid NodeId) (string, error) {
return c.nodes[nid].Status, nil
}

func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string) (addr string, err error) {
func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string, useEncryptedPortMap bool) (addr string, err error) {
var port int
var ok bool

Expand All @@ -910,18 +918,23 @@ func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string) (addr stri

// For current node, hostname might be empty
// Insert hostname used to connect to the cluster
cUrl, err := url.Parse(c.url)
if err != nil {
return "", errors.New("Unable to parse cluster url - " + err.Error())
}
h, p, _ := net.SplitHostPort(cUrl.Host)
if node.Hostname == "" {
cUrl, err := url.Parse(c.url)
if err != nil {
return "", errors.New("Unable to parse cluster url - " + err.Error())
}
h, _, _ := net.SplitHostPort(cUrl.Host)
node.Hostname = h
}

p = security.EncryptPort(node.Hostname, p)
var portStr string
if useEncryptedPortMap {
portStr = security.EncryptPort(node.Hostname, fmt.Sprint(port))
} else {
portStr = fmt.Sprint(port)
}

addr = net.JoinHostPort(node.Hostname, fmt.Sprint(port))
addr = net.JoinHostPort(node.Hostname, portStr)
return
}

Expand Down Expand Up @@ -966,7 +979,7 @@ func (c *ClusterInfoCache) sameNode(n1 couchbase.Node, n2 couchbase.Node) bool {
return n1.Hostname == n2.Hostname
}

func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string) (string, error) {
func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string, useEncryptedPortMap bool) (srvcAddr string, err error) {

if c.useStaticPorts {

Expand All @@ -979,16 +992,26 @@ func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string) (string, error) {
if e != nil {
return "", e
}
return net.JoinHostPort(h, p), nil

srvcAddr = net.JoinHostPort(h, p)
if useEncryptedPortMap {
srvcAddr, _, _, err = security.EncryptPortFromAddr(srvcAddr)
if err != nil {
return "", err
}
}
} else {
node := c.GetCurrentNode()
return c.GetServiceAddress(node, srvc)
srvcAddr, err = c.GetServiceAddress(node, srvc, useEncryptedPortMap)
if err != nil {
return "", err
}
}

return srvcAddr, nil
}

func (c *ClusterInfoCache) GetLocalServicePort(srvc string) (string, error) {
addr, err := c.GetLocalServiceAddress(srvc)
func (c *ClusterInfoCache) GetLocalServicePort(srvc string, useEncryptedPortMap bool) (string, error) {
addr, err := c.GetLocalServiceAddress(srvc, useEncryptedPortMap)
if err != nil {
return addr, err
}
Expand All @@ -1001,9 +1024,9 @@ func (c *ClusterInfoCache) GetLocalServicePort(srvc string) (string, error) {
return net.JoinHostPort("", p), nil
}

func (c *ClusterInfoCache) GetLocalServiceHost(srvc string) (string, error) {
func (c *ClusterInfoCache) GetLocalServiceHost(srvc string, useEncryptedPortMap bool) (string, error) {

addr, err := c.GetLocalServiceAddress(srvc)
addr, err := c.GetLocalServiceAddress(srvc, useEncryptedPortMap)
if err != nil {
return addr, err
}
Expand Down
8 changes: 4 additions & 4 deletions secondary/indexer/ddl_service_manager.go
Expand Up @@ -1895,7 +1895,7 @@ func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, sett
nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
for _, nid := range nids {

addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err == nil {

resp, err := getWithAuth(addr + "/getLocalIndexMetadata?useETag=false")
Expand All @@ -1912,7 +1912,7 @@ func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, sett
if nodes[service.NodeID(localMeta.NodeUUID)] {
httpAddrMap[localMeta.NodeUUID] = addr

adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE)
adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -1938,7 +1938,7 @@ func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, sett
nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)

for _, nid := range nids {
adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE)
adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE, true)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -2266,7 +2266,7 @@ func postSchedTransferMap(transferMap map[string][]*mc.ScheduleCreateToken, clus
return fmt.Errorf("node with uuiid %v not found in cluster info cache", nodeUUID)
}

addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err != nil {
return fmt.Errorf("error in GetServiceAddress %v", err)
}
Expand Down
19 changes: 14 additions & 5 deletions secondary/indexer/indexer.go
Expand Up @@ -497,14 +497,14 @@ func (idx *indexer) initSecurityContext(encryptLocalHost bool) error {

security.RegisterCallback("indexer", fn)

if err := refreshSecurityContextOnTopology(clusterAddr); err != nil {
if err := idx.refreshSecurityContextOnTopology(clusterAddr); err != nil {
return err
}

return nil
}

func refreshSecurityContextOnTopology(clusterAddr string) error {
func (idx *indexer) refreshSecurityContextOnTopology(clusterAddr string) error {

fn := func(r int, e error) error {
var cinfo *common.ClusterInfoCache
Expand All @@ -525,7 +525,15 @@ func refreshSecurityContextOnTopology(clusterAddr string) error {
return err
}

security.SetEncryptPortMapping(cinfo.EncryptPortMapping())
// When adding the node during the init time. nodesvs will not have httpPort in it.
// Add Port mapping from command line if the mapping does not have port.
mapping := cinfo.EncryptPortMapping()
httpPort := idx.config["httpPort"].String()
if _, ok := mapping[httpPort]; !ok {
httpsPort := idx.config["httpsPort"].String()
mapping[httpPort] = httpsPort
}
security.SetEncryptPortMapping(mapping)

return nil
}
Expand All @@ -546,8 +554,8 @@ func (idx *indexer) handleSecurityChange(msg Message) {
if refreshEncrypt {
logging.Infof("handleSecurityChange: refresh security context")
clusterAddr := idx.config["clusterAddr"].String()
if err := refreshSecurityContextOnTopology(clusterAddr); err != nil {
exitFn(fmt.Sprintf("Fail to refresh security contexxt on security change. Error %v", err))
if err := idx.refreshSecurityContextOnTopology(clusterAddr); err != nil {
exitFn(fmt.Sprintf("Fail to refresh security context on security change. Error %v", err))
}
}

Expand Down Expand Up @@ -5018,6 +5026,7 @@ func (idx *indexer) initServiceAddressMap() {
ServiceAddrMap[common.INDEX_ADMIN_SERVICE] = idx.config["adminPort"].String()
ServiceAddrMap[common.INDEX_SCAN_SERVICE] = idx.config["scanPort"].String()
ServiceAddrMap[common.INDEX_HTTP_SERVICE] = idx.config["httpPort"].String()
ServiceAddrMap[common.INDEX_HTTPS_SERVICE] = idx.config["httpsPort"].String()

common.SetServicePorts(ServiceAddrMap)
}
Expand Down
6 changes: 3 additions & 3 deletions secondary/indexer/kv_sender.go
Expand Up @@ -1363,7 +1363,7 @@ func (k *kvSender) getAllVbucketsInCluster(bucket string) ([]uint32, []string, e
return nil, nil, err
} else {
vbs = append(vbs, vbnos...)
addr, err := cinfo.GetServiceAddress(nid, "projector")
addr, err := cinfo.GetServiceAddress(nid, "projector", true)
if err != nil {
return nil, nil, err
}
Expand All @@ -1384,7 +1384,7 @@ func (k *kvSender) getAllProjectorAddrs() ([]string, error) {

var addrList []string
for _, nid := range nodes {
addr, err := cinfo.GetServiceAddress(nid, "projector")
addr, err := cinfo.GetServiceAddress(nid, "projector", true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1430,7 +1430,7 @@ func (k *kvSender) getProjAddrsForVbuckets(bucket string, vbnos []Vbucket) ([]st
}

if found {
addr, err := cinfo.GetServiceAddress(n, "projector")
addr, err := cinfo.GetServiceAddress(n, "projector", true)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions secondary/indexer/rebalance_service_manager.go
Expand Up @@ -1522,10 +1522,10 @@ func (m *ServiceMgr) registerGlobalRebalanceToken(change service.TopologyChange)

for _, nid := range nids {

addr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE)
addr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE, true)
if err == nil {

localaddr, err := m.cinfo.GetLocalServiceAddress(c.INDEX_HTTP_SERVICE)
localaddr, err := m.cinfo.GetLocalServiceAddress(c.INDEX_HTTP_SERVICE, true)
if err != nil {
l.Errorf("ServiceMgr::registerGlobalRebalanceToken Error Fetching Local Service Address %v", err)
return errors.New(fmt.Sprintf("Fail to retrieve http endpoint for local node %v", err)), true
Expand Down Expand Up @@ -2837,14 +2837,15 @@ func (m *ServiceMgr) getNodeIdFromDest(dest string) (string, error) {

for _, nid := range nids {

maddr, err := m.cinfo.GetServiceAddress(nid, "mgmt")
// TODO: Check this when user can specify encrypted port from query.
maddr, err := m.cinfo.GetServiceAddress(nid, "mgmt", false)
if err != nil {
return "", err
}

if maddr == dest {

haddr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE)
haddr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE, true)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion secondary/indexer/rebalancer.go
Expand Up @@ -1801,7 +1801,7 @@ func checkAllIndexersWarmedup(clusterURL string) (bool, []string) {

for _, nid := range nids {

addr, err := cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE, true)
if err == nil {

resp, err := getWithAuth(addr + url)
Expand Down
6 changes: 3 additions & 3 deletions secondary/manager/lifecycle.go
Expand Up @@ -3872,17 +3872,17 @@ func (m *LifecycleMgr) getServiceMap() (*client.ServiceMap, error) {
}
srvMap.IndexerId = string(id)

srvMap.ScanAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_SCAN_SERVICE)
srvMap.ScanAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_SCAN_SERVICE, true)
if err != nil {
return nil, err
}

srvMap.HttpAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_HTTP_SERVICE)
srvMap.HttpAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_HTTP_SERVICE, true)
if err != nil {
return nil, err
}

srvMap.AdminAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_ADMIN_SERVICE)
srvMap.AdminAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_ADMIN_SERVICE, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion secondary/manager/manager.go
Expand Up @@ -183,7 +183,7 @@ func NewIndexManagerInternal(config common.Config, storageMode common.StorageMod
cinfo.RLock()
defer cinfo.RUnlock()

adminPort, err := cinfo.GetLocalServicePort(common.INDEX_ADMIN_SERVICE)
adminPort, err := cinfo.GetLocalServicePort(common.INDEX_ADMIN_SERVICE, true)
if err != nil {
mgr.Close()
return nil, err
Expand Down
27 changes: 15 additions & 12 deletions secondary/manager/request_handler.go
Expand Up @@ -713,27 +713,29 @@ func (m *requestHandlerContext) getIndexStatus(creds cbauth.Creds, t *target, ge
for _, nid := range nids {

// mgmtAddr is this node's "cluster" address (host:uiPort), NOT a key for caches
mgmtAddr, err := cinfo.GetServiceAddress(nid, "mgmt")
// TODO: Check this when user can specify encrypted port from query
mgmtAddr, err := cinfo.GetServiceAddress(nid, "mgmt", false)
if err != nil {
logging.Errorf("RequestHandler::getIndexStatus: Error from GetServiceAddress (mgmt) for node id %v. Error = %v", nid, err)
continue
}

addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err != nil {
logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress (indexHttp) for node id %v. Error = %v", nid, err)
logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress(indexHttp, true) for node id %v. Error = %v", nid, err)
failedNodes = append(failedNodes, mgmtAddr)
continue
}

u, err := security.GetURL(addr)
// Use same un-encrypted port number as key, even if the encryption level changes in between
addrForKey, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, false)
if err != nil {
logging.Debugf("RequestHandler::getIndexStatus: Fail to parse URL %v", addr)
logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress(indexHttp, false) for node id %v. Error = %v", nid, err)
failedNodes = append(failedNodes, mgmtAddr)
continue
}

hostname := u.Host
hostname := addrForKey
hostKey := host2key(hostname) // key to caches
keepKeys = append(keepKeys, hostKey)
stale := false
Expand Down Expand Up @@ -1295,7 +1297,7 @@ func (m *requestHandlerContext) getIndexMetadata(creds cbauth.Creds, t *target)

for i, nid := range nids {

addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err == nil {

url := "/getLocalIndexMetadata"
Expand Down Expand Up @@ -2492,7 +2494,7 @@ func (m *requestHandlerContext) getLocalIndexMetadataForNode(addr string, host s
var latest *LocalIndexMetadata
nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
for _, nid := range nids {
addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err1 == nil {
cached, err1 := m.getCachedLocalIndexMetadataFromREST(addr, host)
if cached != nil && err1 == nil {
Expand Down Expand Up @@ -2727,7 +2729,7 @@ func (m *requestHandlerContext) getStatsForNode(addr string, host string, cinfo
var latest *common.Statistics
nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
for _, nid := range nids {
addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err1 == nil {
cached, err1 := m.getCachedStatsFromREST(addr, host)
if cached != nil && err1 == nil {
Expand Down Expand Up @@ -3276,7 +3278,7 @@ func (m *requestHandlerContext) bucketBackupHandler(bucket, include, exclude str
cinfo.RLock()
defer cinfo.RUnlock()

addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err == nil {
url := "/getLocalIndexMetadata?bucket=" + u.QueryEscape(bucket)
if len(include) != 0 {
Expand Down Expand Up @@ -3336,7 +3338,7 @@ func (m *requestHandlerContext) bucketBackupHandler(bucket, include, exclude str
localMeta := new(LocalIndexMetadata)
status := convertResponse(resp, localMeta)
if status == RESP_ERROR {
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE)
addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true)
if err != nil {
return nil, errors.New(fmt.Sprintf("Fail to retrieve local metadata from node id %v.", nid))
} else {
Expand Down Expand Up @@ -3660,7 +3662,8 @@ func (s *schedTokenMonitor) getNodeAddr(token *mc.ScheduleCreateToken) (string,
fetched = true
}

return s.cinfo.GetServiceAddress(nid, "mgmt")
// TODO: Check impact of having an un encrypted port here
return s.cinfo.GetServiceAddress(nid, "mgmt", false)
}

func (s *schedTokenMonitor) makeIndexStatus(token *mc.ScheduleCreateToken) *IndexStatus {
Expand Down

0 comments on commit 15f8e6d

Please sign in to comment.