From 15f8e6d3e89a0ead4e6e3c9a4b6472aee429c1f8 Mon Sep 17 00:00:00 2001 From: Sai Krishna Teja Kommaraju Date: Wed, 28 Jul 2021 16:27:16 +0530 Subject: [PATCH] MB-47603 : Use https port when encryption is enabled. Change-Id: I3bc365c20d16fd4ea3767231a2a494ba69dad68a --- secondary/common/cluster_info.go | 55 +++++--- secondary/indexer/ddl_service_manager.go | 8 +- secondary/indexer/indexer.go | 19 ++- secondary/indexer/kv_sender.go | 6 +- .../indexer/rebalance_service_manager.go | 9 +- secondary/indexer/rebalancer.go | 2 +- secondary/manager/lifecycle.go | 6 +- secondary/manager/manager.go | 2 +- secondary/manager/request_handler.go | 27 ++-- secondary/manager/test/restore/testclient.go | 119 +++++++++--------- secondary/planner/proxy.go | 7 +- secondary/projector/feed.go | 4 +- secondary/queryport/client/cbq_client.go | 33 ++--- secondary/queryport/client/meta_client.go | 2 +- .../secondaryindex/secondaryindexstats.go | 2 +- secondary/tools/datapath/datapath.go | 35 +++--- secondary/tools/multibuckets/multibuckets.go | 40 +++--- secondary/tools/recovery/recovery.go | 38 +++--- 18 files changed, 236 insertions(+), 178 deletions(-) diff --git a/secondary/common/cluster_info.go b/secondary/common/cluster_info.go index f6d2eb56c..e70b18ba3 100644 --- a/secondary/common/cluster_info.go +++ b/secondary/common/cluster_info.go @@ -637,6 +637,14 @@ func (c *ClusterInfoCache) buildEncryptPortMapping() { if ok && ok1 { mapping[fmt.Sprint(node.Services[CBQ_SERVICE])] = fmt.Sprint(node.Services[CBQ_SSL_SERVICE]) } + + // TODO: Check this when user can specify encrypted port from query + // As ns_server does not send encrypted from all APIs we will need this in map. + // _, ok = node.Services[MGMT_SERVICE] + // _, ok1 = node.Services[MGMT_SSL_SERVICE] + // if ok && ok1 { + // mapping[fmt.Sprint(node.Services[MGMT_SERVICE])] = fmt.Sprint(node.Services[MGMT_SSL_SERVICE]) + // } } c.encryptPortMapping = mapping @@ -890,7 +898,7 @@ func (c *ClusterInfoCache) GetNodeStatus(nid NodeId) (string, error) { return c.nodes[nid].Status, nil } -func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string) (addr string, err error) { +func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string, useEncryptedPortMap bool) (addr string, err error) { var port int var ok bool @@ -910,18 +918,23 @@ func (c *ClusterInfoCache) GetServiceAddress(nid NodeId, srvc string) (addr stri // For current node, hostname might be empty // Insert hostname used to connect to the cluster - cUrl, err := url.Parse(c.url) - if err != nil { - return "", errors.New("Unable to parse cluster url - " + err.Error()) - } - h, p, _ := net.SplitHostPort(cUrl.Host) if node.Hostname == "" { + cUrl, err := url.Parse(c.url) + if err != nil { + return "", errors.New("Unable to parse cluster url - " + err.Error()) + } + h, _, _ := net.SplitHostPort(cUrl.Host) node.Hostname = h } - p = security.EncryptPort(node.Hostname, p) + var portStr string + if useEncryptedPortMap { + portStr = security.EncryptPort(node.Hostname, fmt.Sprint(port)) + } else { + portStr = fmt.Sprint(port) + } - addr = net.JoinHostPort(node.Hostname, fmt.Sprint(port)) + addr = net.JoinHostPort(node.Hostname, portStr) return } @@ -966,7 +979,7 @@ func (c *ClusterInfoCache) sameNode(n1 couchbase.Node, n2 couchbase.Node) bool { return n1.Hostname == n2.Hostname } -func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string) (string, error) { +func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string, useEncryptedPortMap bool) (srvcAddr string, err error) { if c.useStaticPorts { @@ -979,16 +992,26 @@ func (c *ClusterInfoCache) GetLocalServiceAddress(srvc string) (string, error) { if e != nil { return "", e } - return net.JoinHostPort(h, p), nil - + srvcAddr = net.JoinHostPort(h, p) + if useEncryptedPortMap { + srvcAddr, _, _, err = security.EncryptPortFromAddr(srvcAddr) + if err != nil { + return "", err + } + } } else { node := c.GetCurrentNode() - return c.GetServiceAddress(node, srvc) + srvcAddr, err = c.GetServiceAddress(node, srvc, useEncryptedPortMap) + if err != nil { + return "", err + } } + + return srvcAddr, nil } -func (c *ClusterInfoCache) GetLocalServicePort(srvc string) (string, error) { - addr, err := c.GetLocalServiceAddress(srvc) +func (c *ClusterInfoCache) GetLocalServicePort(srvc string, useEncryptedPortMap bool) (string, error) { + addr, err := c.GetLocalServiceAddress(srvc, useEncryptedPortMap) if err != nil { return addr, err } @@ -1001,9 +1024,9 @@ func (c *ClusterInfoCache) GetLocalServicePort(srvc string) (string, error) { return net.JoinHostPort("", p), nil } -func (c *ClusterInfoCache) GetLocalServiceHost(srvc string) (string, error) { +func (c *ClusterInfoCache) GetLocalServiceHost(srvc string, useEncryptedPortMap bool) (string, error) { - addr, err := c.GetLocalServiceAddress(srvc) + addr, err := c.GetLocalServiceAddress(srvc, useEncryptedPortMap) if err != nil { return addr, err } diff --git a/secondary/indexer/ddl_service_manager.go b/secondary/indexer/ddl_service_manager.go index df6bc8c30..fde94692b 100644 --- a/secondary/indexer/ddl_service_manager.go +++ b/secondary/indexer/ddl_service_manager.go @@ -1895,7 +1895,7 @@ func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, sett nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE) for _, nid := range nids { - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err == nil { resp, err := getWithAuth(addr + "/getLocalIndexMetadata?useETag=false") @@ -1912,7 +1912,7 @@ func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, sett if nodes[service.NodeID(localMeta.NodeUUID)] { httpAddrMap[localMeta.NodeUUID] = addr - adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE) + adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE, true) if err != nil { return nil, nil, err } @@ -1938,7 +1938,7 @@ func newMetadataProvider(clusterAddr string, nodes map[service.NodeID]bool, sett nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE) for _, nid := range nids { - adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE) + adminAddr, err := cinfo.GetServiceAddress(nid, common.INDEX_ADMIN_SERVICE, true) if err != nil { return nil, nil, err } @@ -2266,7 +2266,7 @@ func postSchedTransferMap(transferMap map[string][]*mc.ScheduleCreateToken, clus return fmt.Errorf("node with uuiid %v not found in cluster info cache", nodeUUID) } - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err != nil { return fmt.Errorf("error in GetServiceAddress %v", err) } diff --git a/secondary/indexer/indexer.go b/secondary/indexer/indexer.go index 7ae16c47c..3d21f8155 100644 --- a/secondary/indexer/indexer.go +++ b/secondary/indexer/indexer.go @@ -497,14 +497,14 @@ func (idx *indexer) initSecurityContext(encryptLocalHost bool) error { security.RegisterCallback("indexer", fn) - if err := refreshSecurityContextOnTopology(clusterAddr); err != nil { + if err := idx.refreshSecurityContextOnTopology(clusterAddr); err != nil { return err } return nil } -func refreshSecurityContextOnTopology(clusterAddr string) error { +func (idx *indexer) refreshSecurityContextOnTopology(clusterAddr string) error { fn := func(r int, e error) error { var cinfo *common.ClusterInfoCache @@ -525,7 +525,15 @@ func refreshSecurityContextOnTopology(clusterAddr string) error { return err } - security.SetEncryptPortMapping(cinfo.EncryptPortMapping()) + // When adding the node during the init time. nodesvs will not have httpPort in it. + // Add Port mapping from command line if the mapping does not have port. + mapping := cinfo.EncryptPortMapping() + httpPort := idx.config["httpPort"].String() + if _, ok := mapping[httpPort]; !ok { + httpsPort := idx.config["httpsPort"].String() + mapping[httpPort] = httpsPort + } + security.SetEncryptPortMapping(mapping) return nil } @@ -546,8 +554,8 @@ func (idx *indexer) handleSecurityChange(msg Message) { if refreshEncrypt { logging.Infof("handleSecurityChange: refresh security context") clusterAddr := idx.config["clusterAddr"].String() - if err := refreshSecurityContextOnTopology(clusterAddr); err != nil { - exitFn(fmt.Sprintf("Fail to refresh security contexxt on security change. Error %v", err)) + if err := idx.refreshSecurityContextOnTopology(clusterAddr); err != nil { + exitFn(fmt.Sprintf("Fail to refresh security context on security change. Error %v", err)) } } @@ -5018,6 +5026,7 @@ func (idx *indexer) initServiceAddressMap() { ServiceAddrMap[common.INDEX_ADMIN_SERVICE] = idx.config["adminPort"].String() ServiceAddrMap[common.INDEX_SCAN_SERVICE] = idx.config["scanPort"].String() ServiceAddrMap[common.INDEX_HTTP_SERVICE] = idx.config["httpPort"].String() + ServiceAddrMap[common.INDEX_HTTPS_SERVICE] = idx.config["httpsPort"].String() common.SetServicePorts(ServiceAddrMap) } diff --git a/secondary/indexer/kv_sender.go b/secondary/indexer/kv_sender.go index 9b4913937..764dd57d2 100644 --- a/secondary/indexer/kv_sender.go +++ b/secondary/indexer/kv_sender.go @@ -1363,7 +1363,7 @@ func (k *kvSender) getAllVbucketsInCluster(bucket string) ([]uint32, []string, e return nil, nil, err } else { vbs = append(vbs, vbnos...) - addr, err := cinfo.GetServiceAddress(nid, "projector") + addr, err := cinfo.GetServiceAddress(nid, "projector", true) if err != nil { return nil, nil, err } @@ -1384,7 +1384,7 @@ func (k *kvSender) getAllProjectorAddrs() ([]string, error) { var addrList []string for _, nid := range nodes { - addr, err := cinfo.GetServiceAddress(nid, "projector") + addr, err := cinfo.GetServiceAddress(nid, "projector", true) if err != nil { return nil, err } @@ -1430,7 +1430,7 @@ func (k *kvSender) getProjAddrsForVbuckets(bucket string, vbnos []Vbucket) ([]st } if found { - addr, err := cinfo.GetServiceAddress(n, "projector") + addr, err := cinfo.GetServiceAddress(n, "projector", true) if err != nil { return nil, err } diff --git a/secondary/indexer/rebalance_service_manager.go b/secondary/indexer/rebalance_service_manager.go index c2383adad..368bf9c7a 100644 --- a/secondary/indexer/rebalance_service_manager.go +++ b/secondary/indexer/rebalance_service_manager.go @@ -1522,10 +1522,10 @@ func (m *ServiceMgr) registerGlobalRebalanceToken(change service.TopologyChange) for _, nid := range nids { - addr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE) + addr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE, true) if err == nil { - localaddr, err := m.cinfo.GetLocalServiceAddress(c.INDEX_HTTP_SERVICE) + localaddr, err := m.cinfo.GetLocalServiceAddress(c.INDEX_HTTP_SERVICE, true) if err != nil { l.Errorf("ServiceMgr::registerGlobalRebalanceToken Error Fetching Local Service Address %v", err) return errors.New(fmt.Sprintf("Fail to retrieve http endpoint for local node %v", err)), true @@ -2837,14 +2837,15 @@ func (m *ServiceMgr) getNodeIdFromDest(dest string) (string, error) { for _, nid := range nids { - maddr, err := m.cinfo.GetServiceAddress(nid, "mgmt") + // TODO: Check this when user can specify encrypted port from query. + maddr, err := m.cinfo.GetServiceAddress(nid, "mgmt", false) if err != nil { return "", err } if maddr == dest { - haddr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE) + haddr, err := m.cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE, true) if err != nil { return "", err } diff --git a/secondary/indexer/rebalancer.go b/secondary/indexer/rebalancer.go index 519d67698..7216fbb9f 100644 --- a/secondary/indexer/rebalancer.go +++ b/secondary/indexer/rebalancer.go @@ -1801,7 +1801,7 @@ func checkAllIndexersWarmedup(clusterURL string) (bool, []string) { for _, nid := range nids { - addr, err := cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, c.INDEX_HTTP_SERVICE, true) if err == nil { resp, err := getWithAuth(addr + url) diff --git a/secondary/manager/lifecycle.go b/secondary/manager/lifecycle.go index 4a6540ade..3e676dd8f 100644 --- a/secondary/manager/lifecycle.go +++ b/secondary/manager/lifecycle.go @@ -3872,17 +3872,17 @@ func (m *LifecycleMgr) getServiceMap() (*client.ServiceMap, error) { } srvMap.IndexerId = string(id) - srvMap.ScanAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_SCAN_SERVICE) + srvMap.ScanAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_SCAN_SERVICE, true) if err != nil { return nil, err } - srvMap.HttpAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_HTTP_SERVICE) + srvMap.HttpAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_HTTP_SERVICE, true) if err != nil { return nil, err } - srvMap.AdminAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_ADMIN_SERVICE) + srvMap.AdminAddr, err = cinfo.GetLocalServiceAddress(common.INDEX_ADMIN_SERVICE, true) if err != nil { return nil, err } diff --git a/secondary/manager/manager.go b/secondary/manager/manager.go index c6caf30c1..a4df171b8 100644 --- a/secondary/manager/manager.go +++ b/secondary/manager/manager.go @@ -183,7 +183,7 @@ func NewIndexManagerInternal(config common.Config, storageMode common.StorageMod cinfo.RLock() defer cinfo.RUnlock() - adminPort, err := cinfo.GetLocalServicePort(common.INDEX_ADMIN_SERVICE) + adminPort, err := cinfo.GetLocalServicePort(common.INDEX_ADMIN_SERVICE, true) if err != nil { mgr.Close() return nil, err diff --git a/secondary/manager/request_handler.go b/secondary/manager/request_handler.go index 944aa1e8b..92fb51d84 100644 --- a/secondary/manager/request_handler.go +++ b/secondary/manager/request_handler.go @@ -713,27 +713,29 @@ func (m *requestHandlerContext) getIndexStatus(creds cbauth.Creds, t *target, ge for _, nid := range nids { // mgmtAddr is this node's "cluster" address (host:uiPort), NOT a key for caches - mgmtAddr, err := cinfo.GetServiceAddress(nid, "mgmt") + // TODO: Check this when user can specify encrypted port from query + mgmtAddr, err := cinfo.GetServiceAddress(nid, "mgmt", false) if err != nil { logging.Errorf("RequestHandler::getIndexStatus: Error from GetServiceAddress (mgmt) for node id %v. Error = %v", nid, err) continue } - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err != nil { - logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress (indexHttp) for node id %v. Error = %v", nid, err) + logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress(indexHttp, true) for node id %v. Error = %v", nid, err) failedNodes = append(failedNodes, mgmtAddr) continue } - u, err := security.GetURL(addr) + // Use same un-encrypted port number as key, even if the encryption level changes in between + addrForKey, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, false) if err != nil { - logging.Debugf("RequestHandler::getIndexStatus: Fail to parse URL %v", addr) + logging.Debugf("RequestHandler::getIndexStatus: Error from GetServiceAddress(indexHttp, false) for node id %v. Error = %v", nid, err) failedNodes = append(failedNodes, mgmtAddr) continue } - hostname := u.Host + hostname := addrForKey hostKey := host2key(hostname) // key to caches keepKeys = append(keepKeys, hostKey) stale := false @@ -1295,7 +1297,7 @@ func (m *requestHandlerContext) getIndexMetadata(creds cbauth.Creds, t *target) for i, nid := range nids { - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err == nil { url := "/getLocalIndexMetadata" @@ -2492,7 +2494,7 @@ func (m *requestHandlerContext) getLocalIndexMetadataForNode(addr string, host s var latest *LocalIndexMetadata nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE) for _, nid := range nids { - addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err1 == nil { cached, err1 := m.getCachedLocalIndexMetadataFromREST(addr, host) if cached != nil && err1 == nil { @@ -2727,7 +2729,7 @@ func (m *requestHandlerContext) getStatsForNode(addr string, host string, cinfo var latest *common.Statistics nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE) for _, nid := range nids { - addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err1 := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err1 == nil { cached, err1 := m.getCachedStatsFromREST(addr, host) if cached != nil && err1 == nil { @@ -3276,7 +3278,7 @@ func (m *requestHandlerContext) bucketBackupHandler(bucket, include, exclude str cinfo.RLock() defer cinfo.RUnlock() - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err == nil { url := "/getLocalIndexMetadata?bucket=" + u.QueryEscape(bucket) if len(include) != 0 { @@ -3336,7 +3338,7 @@ func (m *requestHandlerContext) bucketBackupHandler(bucket, include, exclude str localMeta := new(LocalIndexMetadata) status := convertResponse(resp, localMeta) if status == RESP_ERROR { - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err != nil { return nil, errors.New(fmt.Sprintf("Fail to retrieve local metadata from node id %v.", nid)) } else { @@ -3660,7 +3662,8 @@ func (s *schedTokenMonitor) getNodeAddr(token *mc.ScheduleCreateToken) (string, fetched = true } - return s.cinfo.GetServiceAddress(nid, "mgmt") + // TODO: Check impact of having an un encrypted port here + return s.cinfo.GetServiceAddress(nid, "mgmt", false) } func (s *schedTokenMonitor) makeIndexStatus(token *mc.ScheduleCreateToken) *IndexStatus { diff --git a/secondary/manager/test/restore/testclient.go b/secondary/manager/test/restore/testclient.go index f81747189..dd0e8df07 100644 --- a/secondary/manager/test/restore/testclient.go +++ b/secondary/manager/test/restore/testclient.go @@ -1,30 +1,31 @@ -package main +package main import ( + "bytes" + "encoding/json" "flag" - "net/http" "log" - "bytes" + "net/http" "strings" - "encoding/json" + "github.com/couchbase/indexing/secondary/common" ) type IndexRequest struct { - Version uint64 `json:"version,omitempty"` - Type string `json:"type,omitempty"` - Index common.IndexDefn `json:"index,omitempty"` + Version uint64 `json:"version,omitempty"` + Type string `json:"type,omitempty"` + Index common.IndexDefn `json:"index,omitempty"` } // // main function // func main() { - var curl string + var curl string var cmd string var meta string - var host string - var defnId uint64 + var host string + var defnId uint64 flag.StringVar(&curl, "curl", "", "cluster url") flag.StringVar(&cmd, "cmd", "", "command=getIndexStatus/getIndexMetadata/restoreIndexMetadata/dropIndex") @@ -36,12 +37,12 @@ func main() { cinfo, err := common.NewClusterInfoCache(curl, "default") if err != nil { log.Printf("%v", err) - return - } + return + } if err := cinfo.Fetch(); err != nil { log.Printf("%v", err) - return + return } nodes := cinfo.GetNodesByServiceType("indexHttp") @@ -49,86 +50,86 @@ func main() { log.Printf("There is no couchbase server running with indexer service") return } - - indexHttp, err := cinfo.GetServiceAddress(nodes[0], "indexHttp") + + indexHttp, err := cinfo.GetServiceAddress(nodes[0], "indexHttp", false) if err != nil { log.Printf("%v", err) - return + return } if cmd == "getIndexStatus" || cmd == "getIndexMetadata" { - + resp, err := http.Get("http://" + indexHttp + "/" + cmd) if err != nil { log.Printf("%v", err) return } - - buf := new(bytes.Buffer) - if _, err := buf.ReadFrom(resp.Body); err != nil { - log.Printf("%v", err) - return - } - + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + log.Printf("%v", err) + return + } + log.Printf("%v", string(buf.Bytes())) - + } else if cmd == "restoreIndexMetadata" { - + bodybuf := bytes.NewBuffer([]byte(meta)) - resp, err := http.Post("http://" + indexHttp + "/" + cmd, "application/json", bodybuf) + resp, err := http.Post("http://"+indexHttp+"/"+cmd, "application/json", bodybuf) if err != nil { log.Printf("%v", err) return } - - buf := new(bytes.Buffer) - if _, err := buf.ReadFrom(resp.Body); err != nil { - log.Printf("%v", err) - return - } - + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + log.Printf("%v", err) + return + } + log.Printf("%v", string(buf.Bytes())) - + } else if cmd == "dropIndex" { for _, id := range nodes { - indexHttp, err := cinfo.GetServiceAddress(id, "indexHttp") + indexHttp, err := cinfo.GetServiceAddress(id, "indexHttp", false) if err != nil { log.Printf("%v", err) - return + return } - + if strings.HasPrefix(indexHttp, host) { - - defn := common.IndexDefn{DefnId : common.IndexDefnId(defnId)} - request := &IndexRequest{Type : "drop", Index : defn} - - body, err := json.Marshal(&request) - if err != nil { - log.Printf("%v", err) - return - } - + + defn := common.IndexDefn{DefnId: common.IndexDefnId(defnId)} + request := &IndexRequest{Type: "drop", Index: defn} + + body, err := json.Marshal(&request) + if err != nil { + log.Printf("%v", err) + return + } + log.Printf("dialing http://" + indexHttp + "/" + cmd) - + bodybuf := bytes.NewBuffer(body) - resp, err := http.Post("http://" + indexHttp + "/" + cmd, "application/json", bodybuf) + resp, err := http.Post("http://"+indexHttp+"/"+cmd, "application/json", bodybuf) if err != nil { log.Printf("%v", err) return } - - buf := new(bytes.Buffer) - if _, err := buf.ReadFrom(resp.Body); err != nil { - log.Printf("%v", err) - return - } - + + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(resp.Body); err != nil { + log.Printf("%v", err) + return + } + log.Printf("%v", string(buf.Bytes())) return } } - - log.Printf("Cannot find matching host %d", host) + + log.Printf("Cannot find matching host %d", host) } } diff --git a/secondary/planner/proxy.go b/secondary/planner/proxy.go index ff45ab549..56028690d 100644 --- a/secondary/planner/proxy.go +++ b/secondary/planner/proxy.go @@ -225,7 +225,7 @@ func getIndexLayout(config common.Config, hosts []string) ([]*IndexerNode, error node.ServerGroup = cinfo.GetServerGroup(nid) // obtain the admin port for the indexer node - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err != nil { logging.Errorf("Planner::getIndexLayout: Error from getting service address for node %v, err: = %v", node.NodeId, err) return nil, err @@ -891,7 +891,8 @@ func createIndexerNode(cinfo *common.ClusterInfoCache, nid common.NodeId) (*Inde // func getIndexerHost(cinfo *common.ClusterInfoCache, nid common.NodeId) (string, error) { - addr, err := cinfo.GetServiceAddress(nid, "mgmt") + // TODO: Check this when user can specify encrypted port from query + addr, err := cinfo.GetServiceAddress(nid, "mgmt", false) if err != nil { return "", err } @@ -1796,7 +1797,7 @@ func restHelperNoLock(rest func(string) (*http.Response, error), hosts []string, for _, nid := range nids { // obtain the admin port for the indexer node - addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE) + addr, err := cinfo.GetServiceAddress(nid, common.INDEX_HTTP_SERVICE, true) if err != nil { logging.Errorf("Planner::restHelperNoLock: Error from getting service address for node %v. Error = %v", nid, err) return nil, err diff --git a/secondary/projector/feed.go b/secondary/projector/feed.go index 011790bbd..ad03c048f 100644 --- a/secondary/projector/feed.go +++ b/secondary/projector/feed.go @@ -1809,9 +1809,9 @@ func (feed *Feed) getLocalKVAddrs( cinfo.RLock() defer cinfo.RUnlock() - kvaddr, err := cinfo.GetLocalServiceAddress("kv") + kvaddr, err := cinfo.GetLocalServiceAddress("kv", false) if err != nil { - fmsg := "%v ##%x cinfo.GetLocalServiceAddress(`kv`): %v\n" + fmsg := "%v ##%x cinfo.GetLocalServiceAddress(`kv`, false): %v\n" logging.Errorf(fmsg, prefix, opaque, err) // Force fetch cluster info cache incase it was not syncronized properly, diff --git a/secondary/queryport/client/cbq_client.go b/secondary/queryport/client/cbq_client.go index 5443277d4..16dd3795f 100644 --- a/secondary/queryport/client/cbq_client.go +++ b/secondary/queryport/client/cbq_client.go @@ -3,19 +3,22 @@ package client -import "net/http" -import "encoding/json" -import "bytes" -import "fmt" -import "io/ioutil" -import "errors" -import "strings" -import "sync" -import "math" - -import "github.com/couchbase/indexing/secondary/logging" -import "github.com/couchbase/indexing/secondary/common" -import mclient "github.com/couchbase/indexing/secondary/manager/client" +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "math" + "net/http" + "strings" + "sync" + + "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/logging" + + mclient "github.com/couchbase/indexing/secondary/manager/client" +) // indexError for a failed index-request. type indexError struct { @@ -67,11 +70,11 @@ func newCbqClient(cluster string) (*cbqClient, error) { err := fmt.Errorf("cinfo.GetNodesByServiceType() returns %d nodes", l) return nil, err } - adminport, err := cinfo.GetServiceAddress(nodes[0], "indexAdmin") + adminport, err := cinfo.GetServiceAddress(nodes[0], "indexAdmin", true) if err != nil { return nil, err } - queryport, err := cinfo.GetServiceAddress(nodes[0], "indexScan") + queryport, err := cinfo.GetServiceAddress(nodes[0], "indexScan", true) if err != nil { return nil, err } diff --git a/secondary/queryport/client/meta_client.go b/secondary/queryport/client/meta_client.go index c485433ec..ddeb6f96c 100644 --- a/secondary/queryport/client/meta_client.go +++ b/secondary/queryport/client/meta_client.go @@ -1966,7 +1966,7 @@ func getIndexerAdminports(cinfo *common.ClusterInfoCache) ([]string, int, int, i } logging.Verbosef("node %v status: %q", node, status) if status == "healthy" || status == "active" || status == "warmup" { - adminport, err := cinfo.GetServiceAddress(node, "indexAdmin") + adminport, err := cinfo.GetServiceAddress(node, "indexAdmin", true) if err != nil { return nil, 0, 0, 0, 0, err } diff --git a/secondary/tests/framework/secondaryindex/secondaryindexstats.go b/secondary/tests/framework/secondaryindex/secondaryindexstats.go index 6548c9321..78b5327dc 100644 --- a/secondary/tests/framework/secondaryindex/secondaryindexstats.go +++ b/secondary/tests/framework/secondaryindex/secondaryindexstats.go @@ -40,7 +40,7 @@ func GetIndexerNodesHttpAddresses(hostaddress string) ([]string, error) { node_ids := cinfo.GetNodesByServiceType(c.INDEX_HTTP_SERVICE) indexNodes := []string{} for _, node_id := range node_ids { - addr, _ := cinfo.GetServiceAddress(node_id, c.INDEX_HTTP_SERVICE) + addr, _ := cinfo.GetServiceAddress(node_id, c.INDEX_HTTP_SERVICE, true) indexNodes = append(indexNodes, addr) } diff --git a/secondary/tools/datapath/datapath.go b/secondary/tools/datapath/datapath.go index 60cf08411..e1979dbf1 100644 --- a/secondary/tools/datapath/datapath.go +++ b/secondary/tools/datapath/datapath.go @@ -1,19 +1,24 @@ package main -import "flag" -import "fmt" -import "log" -import "os" -import "strings" -import "time" - -import "github.com/couchbase/indexing/secondary/logging" -import c "github.com/couchbase/indexing/secondary/common" -import "github.com/couchbase/indexing/secondary/dataport" -import "github.com/couchbase/indexing/secondary/projector" -import "github.com/couchbase/cbauth" -import projc "github.com/couchbase/indexing/secondary/projector/client" -import protobuf "github.com/couchbase/indexing/secondary/protobuf/projector" +import ( + "flag" + "fmt" + "log" + "os" + "strings" + "time" + + "github.com/couchbase/indexing/secondary/logging" + + "github.com/couchbase/cbauth" + c "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/dataport" + "github.com/couchbase/indexing/secondary/projector" + + projc "github.com/couchbase/indexing/secondary/projector/client" + + protobuf "github.com/couchbase/indexing/secondary/protobuf/projector" +) var pooln = "default" @@ -167,7 +172,7 @@ func getProjectorAdminport(cluster, pooln string) string { log.Fatal(err) } nodeID := cinfo.GetCurrentNode() - adminport, err := cinfo.GetServiceAddress(nodeID, "projector") + adminport, err := cinfo.GetServiceAddress(nodeID, "projector", false) if err != nil { log.Fatal(err) } diff --git a/secondary/tools/multibuckets/multibuckets.go b/secondary/tools/multibuckets/multibuckets.go index e2ce57652..16c2187db 100644 --- a/secondary/tools/multibuckets/multibuckets.go +++ b/secondary/tools/multibuckets/multibuckets.go @@ -1,21 +1,27 @@ package main -import "flag" -import "fmt" -import "log" -import "os" -import "strings" -import "time" -import "sync" - -import "github.com/couchbase/cbauth" -import "github.com/couchbase/indexing/secondary/logging" -import c "github.com/couchbase/indexing/secondary/common" -import "github.com/couchbase/indexing/secondary/dataport" -import "github.com/couchbase/indexing/secondary/projector" -import projc "github.com/couchbase/indexing/secondary/projector/client" -import protobuf "github.com/couchbase/indexing/secondary/protobuf/projector" -import data "github.com/couchbase/indexing/secondary/protobuf/data" +import ( + "flag" + "fmt" + "log" + "os" + "strings" + "sync" + "time" + + "github.com/couchbase/cbauth" + "github.com/couchbase/indexing/secondary/logging" + + c "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/dataport" + "github.com/couchbase/indexing/secondary/projector" + + projc "github.com/couchbase/indexing/secondary/projector/client" + + protobuf "github.com/couchbase/indexing/secondary/protobuf/projector" + + data "github.com/couchbase/indexing/secondary/protobuf/data" +) var pooln = "default" @@ -221,7 +227,7 @@ func getProjectorAdminport(cluster, pooln string) string { log.Fatal(err) } nodeID := cinfo.GetCurrentNode() - adminport, err := cinfo.GetServiceAddress(nodeID, "projector") + adminport, err := cinfo.GetServiceAddress(nodeID, "projector", false) if err != nil { log.Fatal(err) } diff --git a/secondary/tools/recovery/recovery.go b/secondary/tools/recovery/recovery.go index 5d9ce4f1c..5961cbd47 100644 --- a/secondary/tools/recovery/recovery.go +++ b/secondary/tools/recovery/recovery.go @@ -1,20 +1,26 @@ package main -import "flag" -import "fmt" -import "log" -import "os" -import "strings" -import "time" - -import "github.com/couchbase/cbauth" -import "github.com/couchbase/indexing/secondary/logging" -import c "github.com/couchbase/indexing/secondary/common" -import "github.com/couchbase/indexing/secondary/dataport" -import "github.com/couchbase/indexing/secondary/projector" -import projc "github.com/couchbase/indexing/secondary/projector/client" -import protobuf "github.com/couchbase/indexing/secondary/protobuf/projector" -import data "github.com/couchbase/indexing/secondary/protobuf/data" +import ( + "flag" + "fmt" + "log" + "os" + "strings" + "time" + + "github.com/couchbase/cbauth" + "github.com/couchbase/indexing/secondary/logging" + + c "github.com/couchbase/indexing/secondary/common" + "github.com/couchbase/indexing/secondary/dataport" + "github.com/couchbase/indexing/secondary/projector" + + projc "github.com/couchbase/indexing/secondary/projector/client" + + protobuf "github.com/couchbase/indexing/secondary/protobuf/projector" + + data "github.com/couchbase/indexing/secondary/protobuf/data" +) var pooln = "default" @@ -225,7 +231,7 @@ func getProjectorAdminport(cluster, pooln string) string { log.Fatal("error cluster-info: %v", err) } nodeId := cinfo.GetCurrentNode() - adminport, err := cinfo.GetServiceAddress(nodeId, "projector") + adminport, err := cinfo.GetServiceAddress(nodeId, "projector", false) if err != nil { log.Fatal(err) }