diff --git a/Makefile b/Makefile index c3f5de7041..6745ec1659 100644 --- a/Makefile +++ b/Makefile @@ -52,6 +52,7 @@ CNSFILES = \ $(wildcard cns/requestcontroller/*.go) \ $(wildcard cns/requestcontroller/kubecontroller/*.go) \ $(wildcard cns/fakes/*.go) \ + $(wildcard cns/dncclient/*.go) \ $(COREFILES) \ $(CNMFILES) diff --git a/cns/api.go b/cns/api.go index 113fcc7b3d..cff1daf020 100644 --- a/cns/api.go +++ b/cns/api.go @@ -36,7 +36,7 @@ type HTTPService interface { common.ServiceAPI SendNCSnapShotPeriodically(int, chan bool) SetNodeOrchestrator(*SetOrchestratorTypeRequest) - SyncNodeStatus(string, string, string, json.RawMessage) (int, string) + SyncNodeNcStatus(string, string, string, json.RawMessage) (int, string) GetPendingProgramIPConfigs() []IPConfigurationStatus GetAvailableIPConfigs() []IPConfigurationStatus GetAllocatedIPConfigs() []IPConfigurationStatus diff --git a/cns/dncclient/dncclient.go b/cns/dncclient/dncclient.go new file mode 100644 index 0000000000..54e31922fb --- /dev/null +++ b/cns/dncclient/dncclient.go @@ -0,0 +1,105 @@ +package dncclient + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "runtime" + "time" + + "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/logger" + acn "github.com/Azure/azure-container-networking/common" +) + +const ( + // RegisterNodeURLFmt: /networks/{infraNetworkID}/node/{nodeID} + registerNodeURLFmt = "%s/networks/%s/node/%s%s" + // SyncNodeNetworkContainersURLFmt: /networks/{infraNetworkID}/node/{nodeID}/networkcontainers + syncNodeNetworkContainersURLFmt = "%s/networks/%s/node/%s/networkcontainers%s" + + dncAPIVersion = "?api-version=2018-03-01" + registerNodeRetryInterval = 5 * time.Second +) + +// NodeRegistrationRequest - Struct to hold node registration request. +type NodeRegistrationRequest struct { + NumCores int `json:"NumCores"` +} + +// RegisterNode registers the node with managed DNC +func RegisterNode(httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string) { + logger.Printf("[dncclient] Registering node: %s with Infrastructure Network: %s dncEP: %s", nodeID, infraVnet, dncEP) + + var ( + registerNodeURL = fmt.Sprintf(registerNodeURLFmt, dncEP, infraVnet, nodeID, dncAPIVersion) + body bytes.Buffer + httpc = acn.GetHttpClient() + ) + + // Create a body with number of CPU cores + nodeRegistrationRequest := NodeRegistrationRequest{ + NumCores: runtime.NumCPU(), + } + json.NewEncoder(&body).Encode(nodeRegistrationRequest) + + for { + orchestratorDetails, err := registerNode(httpc, registerNodeURL, &body) + if err != nil { + logger.Errorf("[dncclient] Failed to register node: %s with error: %+v", nodeID, err) + // todo: make this interval configurable + time.Sleep(registerNodeRetryInterval) + continue + } + + httpRestService.SetNodeOrchestrator(&orchestratorDetails) + break + } + + logger.Printf("[dncclient] Successfully registered node: %s", nodeID) +} + +func registerNode(httpCl *http.Client, url string, body io.Reader) (cns.SetOrchestratorTypeRequest, error) { + var orchestratorDetails cns.SetOrchestratorTypeRequest + response, err := httpCl.Post(url, acn.JsonContent, body) + if err != nil { + return orchestratorDetails, err + } + + defer response.Body.Close() + + if response.StatusCode != http.StatusOK { + return orchestratorDetails, + fmt.Errorf("[dncclient] Failed to register node with http status code: %d", response.StatusCode) + } + + _ = json.NewDecoder(response.Body).Decode(&orchestratorDetails) + return orchestratorDetails, nil +} + +// SyncNodeNcStatus retrieves the NCs scheduled on this node by DNC +func SyncNodeNcStatus(dncEP, infraVnet, nodeID string) (cns.NodeInfoResponse, error) { + var ( + syncNodeNcStatusURL = fmt.Sprintf(syncNodeNetworkContainersURLFmt, dncEP, infraVnet, nodeID, dncAPIVersion) + nodeInfoResponse cns.NodeInfoResponse + httpc = acn.GetHttpClient() + ) + + logger.Printf("[dncclient] SyncNodeNcStatus: Node: %s, InfraVnet: %s", nodeID, infraVnet) + + response, err := httpc.Get(syncNodeNcStatusURL) + if err != nil { + return nodeInfoResponse, err + } + + if response.StatusCode == http.StatusOK { + err = json.NewDecoder(response.Body).Decode(&nodeInfoResponse) + } else { + err = fmt.Errorf("%d", response.StatusCode) + } + response.Body.Close() + + return nodeInfoResponse, err +} diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go index d87a47748f..5ea9c9c7a9 100644 --- a/cns/fakes/cnsfake.go +++ b/cns/fakes/cnsfake.go @@ -221,7 +221,7 @@ func (fake *HTTPServiceFake) SetNodeOrchestrator(*cns.SetOrchestratorTypeRequest } -func (fake *HTTPServiceFake) SyncNodeStatus(string, string, string, json.RawMessage) (int, string) { +func (fake *HTTPServiceFake) SyncNodeNcStatus(string, string, string, json.RawMessage) (int, string) { return 0, "" } diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 9d2a4ffb3d..470c203a90 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -12,6 +12,7 @@ import ( "reflect" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/dncclient" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/nmagentclient" "github.com/Azure/azure-container-networking/common" @@ -39,30 +40,10 @@ func (service *HTTPRestService) SetNodeOrchestrator(r *cns.SetOrchestratorTypeRe service.setOrchestratorType(httptest.NewRecorder(), req) } -// SyncNodeStatus :- Retrieve the latest node state from DNC & returns the first occurence of returnCode and error with respect to contextFromCNI -func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, contextFromCNI json.RawMessage) (returnCode int, errStr string) { - logger.Printf("[Azure CNS] SyncNodeStatus") - var ( - response *http.Response - err error - nodeInfoResponse cns.NodeInfoResponse - req *http.Request - body []byte - httpc = common.GetHttpClient() - ) - - // try to retrieve NodeInfoResponse from mDNC - response, err = httpc.Get(fmt.Sprintf(common.SyncNodeNetworkContainersURLFmt, dncEP, infraVnet, nodeID, dncApiVersion)) - if err == nil { - if response.StatusCode == http.StatusOK { - err = json.NewDecoder(response.Body).Decode(&nodeInfoResponse) - } else { - err = fmt.Errorf("%d", response.StatusCode) - } - - response.Body.Close() - } - +// SyncNodeNcStatus :- Retrieve the latest NCs scheduled on this node by DNC & returns the first occurence of returnCode and error with respect to contextFromCNI +func (service *HTTPRestService) SyncNodeNcStatus(dncEP, infraVnet, nodeID string, contextFromCNI json.RawMessage) (returnCode int, errStr string) { + logger.Printf("[Azure CNS] SyncNodeNcStatus") + nodeInfoResponse, err := dncclient.SyncNodeNcStatus(dncEP, infraVnet, nodeID) if err != nil { returnCode = UnexpectedError errStr = fmt.Sprintf("[Azure-CNS] Failed to sync node with error: %+v", err) @@ -104,8 +85,8 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, ncVersionURLs.Store(nc.NetworkContainerid, versionURL) waitingForUpdate, _, _ := service.isNCWaitingForUpdate(nc.Version, nc.NetworkContainerid) - body, _ = json.Marshal(nc) - req, _ = http.NewRequest(http.MethodPost, "", bytes.NewBuffer(body)) + body, _ := json.Marshal(nc) + req, _ := http.NewRequest(http.MethodPost, "", bytes.NewBuffer(body)) req.Header.Set(common.ContentType, common.JsonContent) service.createOrUpdateNetworkContainer(w, req) if w.Result().StatusCode == http.StatusOK { @@ -129,7 +110,7 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, var body bytes.Buffer json.NewEncoder(&body).Encode(&cns.DeleteNetworkContainerRequest{NetworkContainerid: nc}) - req, err = http.NewRequest(http.MethodPost, "", &body) + req, err := http.NewRequest(http.MethodPost, "", &body) if err == nil { req.Header.Set(common.JsonContent, common.JsonContent) service.deleteNetworkContainer(httptest.NewRecorder(), req) diff --git a/cns/restserver/util.go b/cns/restserver/util.go index 2af74d14c1..aecfe67684 100644 --- a/cns/restserver/util.go +++ b/cns/restserver/util.go @@ -384,7 +384,7 @@ func (service *HTTPRestService) getNetworkContainerResponse(req cns.GetNetworkCo ) service.Unlock() - getNetworkContainerResponse.Response.ReturnCode, getNetworkContainerResponse.Response.Message = service.SyncNodeStatus(dncEP, infraVnet, nodeID, req.OrchestratorContext) + getNetworkContainerResponse.Response.ReturnCode, getNetworkContainerResponse.Response.Message = service.SyncNodeNcStatus(dncEP, infraVnet, nodeID, req.OrchestratorContext) service.Lock() if getNetworkContainerResponse.Response.ReturnCode == NotFound { return getNetworkContainerResponse @@ -508,7 +508,7 @@ func (service *HTTPRestService) attachOrDetachHelper(req cns.ConfigureContainerN nodeID = service.GetOption(acn.OptNodeID).(string) ) - returnCode, msg := service.SyncNodeStatus(dncEP, infraVnet, nodeID, json.RawMessage{}) + returnCode, msg := service.SyncNodeNcStatus(dncEP, infraVnet, nodeID, json.RawMessage{}) if returnCode != Success { return cns.Response{ ReturnCode: returnCode, diff --git a/cns/service/main.go b/cns/service/main.go index a07570c4a2..d9c8f0d9cb 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -4,20 +4,17 @@ package main import ( - "bytes" "context" "encoding/json" "fmt" - localtls "github.com/Azure/azure-container-networking/server/tls" - "net/http" "os" "os/signal" - "runtime" - "strconv" "strings" "syscall" "time" + localtls "github.com/Azure/azure-container-networking/server/tls" + "github.com/Azure/azure-container-networking/cns/ipampoolmonitor" "github.com/Azure/azure-container-networking/aitelemetry" @@ -27,6 +24,7 @@ import ( "github.com/Azure/azure-container-networking/cns/cnsclient" "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/configuration" + "github.com/Azure/azure-container-networking/cns/dncclient" "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/imdsclient" "github.com/Azure/azure-container-networking/cns/logger" @@ -45,7 +43,6 @@ const ( pluginName = "azure-vnet" defaultCNINetworkConfigFileName = "10-azure.conflist" configFileName = "config.json" - dncApiVersion = "?api-version=2018-03-01" poolIPAMRefreshRateInMilliseconds = 1000 ) @@ -246,45 +243,6 @@ func printVersion() { fmt.Printf("Version %v\n", version) } -// Try to register node with DNC when CNS is started in managed DNC mode -func registerNode(httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string) { - logger.Printf("[Azure CNS] Registering node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP) - - var ( - numCPU = runtime.NumCPU() - url = fmt.Sprintf(acn.RegisterNodeURLFmt, dncEP, infraVnet, nodeID, numCPU, dncApiVersion) - response *http.Response - err = fmt.Errorf("") - body bytes.Buffer - httpc = acn.GetHttpClient() - ) - - for sleep := true; err != nil; sleep = true { - response, err = httpc.Post(url, "application/json", &body) - if err == nil { - if response.StatusCode == http.StatusCreated { - var req cns.SetOrchestratorTypeRequest - json.NewDecoder(response.Body).Decode(&req) - httpRestService.SetNodeOrchestrator(&req) - sleep = false - } else { - err = fmt.Errorf("[Azure CNS] Failed to register node with http status code %s", strconv.Itoa(response.StatusCode)) - logger.Errorf(err.Error()) - } - - response.Body.Close() - } else { - logger.Errorf("[Azure CNS] Failed to register node with err: %+v", err) - } - - if sleep { - time.Sleep(acn.FiveSeconds) - } - } - - logger.Printf("[Azure CNS] Node Registered") -} - // Main is the entry point for CNS. func main() { // Initialize and parse command line arguments. @@ -460,12 +418,12 @@ func main() { httpRestService.SetOption(acn.OptInfrastructureNetworkID, infravnet) httpRestService.SetOption(acn.OptNodeID, nodeID) - registerNode(httpRestService, privateEndpoint, infravnet, nodeID) + dncclient.RegisterNode(httpRestService, privateEndpoint, infravnet, nodeID) go func(ep, vnet, node string) { // Periodically poll DNC for node updates for { <-time.NewTicker(time.Duration(cnsconfig.ManagedSettings.NodeSyncIntervalInSeconds) * time.Second).C - httpRestService.SyncNodeStatus(ep, vnet, node, json.RawMessage{}) + httpRestService.SyncNodeNcStatus(ep, vnet, node, json.RawMessage{}) } }(privateEndpoint, infravnet, nodeID) } else if config.ChannelMode == cns.CRD { diff --git a/common/utils.go b/common/utils.go index a1e77df6d4..aaf226f507 100644 --- a/common/utils.go +++ b/common/utils.go @@ -22,15 +22,12 @@ import ( ) const ( - metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" - azCloudUrl = "http://169.254.169.254/metadata/instance/compute/azEnvironment?api-version=2018-10-01&format=text" - httpConnectionTimeout = 7 - headerTimeout = 7 - RegisterNodeURLFmt = "%s/%s/node/%s/cores/%d%s" - SyncNodeNetworkContainersURLFmt = "%s/%s/node/%s%s" - FiveSeconds = 5 * time.Second - JsonContent = "application/json; charset=UTF-8" - ContentType = "Content-Type" + metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json" + azCloudUrl = "http://169.254.169.254/metadata/instance/compute/azEnvironment?api-version=2018-10-01&format=text" + httpConnectionTimeout = 7 + headerTimeout = 7 + JsonContent = "application/json; charset=UTF-8" + ContentType = "Content-Type" ) // XmlDocument - Azure host agent XML document format.