Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ CNSFILES = \
$(wildcard cns/requestcontroller/*.go) \
$(wildcard cns/requestcontroller/kubecontroller/*.go) \
$(wildcard cns/fakes/*.go) \
$(wildcard cns/dncclient/*.go) \
$(COREFILES) \
$(CNMFILES)

Expand Down
2 changes: 1 addition & 1 deletion cns/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type HTTPService interface {
common.ServiceAPI
SendNCSnapShotPeriodically(int, chan bool)
SetNodeOrchestrator(*SetOrchestratorTypeRequest)
SyncNodeStatus(string, string, string, json.RawMessage) (int, string)
SyncNodeNcStatus(string, string, string, json.RawMessage) (int, string)
GetPendingProgramIPConfigs() []IPConfigurationStatus
GetAvailableIPConfigs() []IPConfigurationStatus
GetAllocatedIPConfigs() []IPConfigurationStatus
Expand Down
105 changes: 105 additions & 0 deletions cns/dncclient/dncclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package dncclient

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"runtime"
"time"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/logger"
acn "github.com/Azure/azure-container-networking/common"
)

const (
// RegisterNodeURLFmt: /networks/{infraNetworkID}/node/{nodeID}
registerNodeURLFmt = "%s/networks/%s/node/%s%s"
// SyncNodeNetworkContainersURLFmt: /networks/{infraNetworkID}/node/{nodeID}/networkcontainers
syncNodeNetworkContainersURLFmt = "%s/networks/%s/node/%s/networkcontainers%s"

dncAPIVersion = "?api-version=2018-03-01"
registerNodeRetryInterval = 5 * time.Second
)

// NodeRegistrationRequest - Struct to hold node registration request.
type NodeRegistrationRequest struct {
NumCores int `json:"NumCores"`
}

// RegisterNode registers the node with managed DNC
func RegisterNode(httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string) {
logger.Printf("[dncclient] Registering node: %s with Infrastructure Network: %s dncEP: %s", nodeID, infraVnet, dncEP)

var (
registerNodeURL = fmt.Sprintf(registerNodeURLFmt, dncEP, infraVnet, nodeID, dncAPIVersion)
body bytes.Buffer
httpc = acn.GetHttpClient()
)

// Create a body with number of CPU cores
nodeRegistrationRequest := NodeRegistrationRequest{
NumCores: runtime.NumCPU(),
}
json.NewEncoder(&body).Encode(nodeRegistrationRequest)

for {
orchestratorDetails, err := registerNode(httpc, registerNodeURL, &body)
if err != nil {
logger.Errorf("[dncclient] Failed to register node: %s with error: %+v", nodeID, err)
// todo: make this interval configurable
time.Sleep(registerNodeRetryInterval)
continue
}

httpRestService.SetNodeOrchestrator(&orchestratorDetails)
break
}

logger.Printf("[dncclient] Successfully registered node: %s", nodeID)
}

func registerNode(httpCl *http.Client, url string, body io.Reader) (cns.SetOrchestratorTypeRequest, error) {
var orchestratorDetails cns.SetOrchestratorTypeRequest
response, err := httpCl.Post(url, acn.JsonContent, body)
if err != nil {
return orchestratorDetails, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that this is code that was moved, but perhaps it's a good chance to clean it up since it is a bit hard to follow. My main recommendation would be to extract the body of the loop to a function such as:

func registerNode(httpCl *http.Client, url string, body io.Reader) (cns.SetOrchestratorTypeRequest, error) {
	response, err := httpCl.Post(url, acn.JsonContent, body)
	if err != nil {...}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {...}

	var req cns.SetOrchestratorTypeRequest
	_ = json.NewDecoder(response.Body).Decode(&req)
	return req, nil
}

which then makes the loop much simpler:

for {
	req, err := registerNode(httpc, registerNodeURL, &body)
	if err != nil {
		logger.Errorf(err.Error())
		time.Sleep(acn.FiveSeconds)
		continue
	}
	httpRestService.SetNodeOrchestrator(&req)
	break
}

To go a step further, I would consider making a proper DNCClient struct, which embeds reusable dependencies such as an http client. The main benefit of this is that the calling code can then use an interface and stub out the actual implementation when writing tests. That can be a later refactor though. See cnsclient in the dnc repo as an example.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have updated the logic. I will create the DNCClient struct as a part of next PR where I'll handle the msi in the dncclient.

Copy link
Contributor

@vakalapa vakalapa Nov 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored this section in #680 , along with Ramiro's suggestion, i added ticker logic too


defer response.Body.Close()

if response.StatusCode != http.StatusOK {
return orchestratorDetails,
fmt.Errorf("[dncclient] Failed to register node with http status code: %d", response.StatusCode)
}

_ = json.NewDecoder(response.Body).Decode(&orchestratorDetails)
return orchestratorDetails, nil
}

// SyncNodeNcStatus retrieves the NCs scheduled on this node by DNC
func SyncNodeNcStatus(dncEP, infraVnet, nodeID string) (cns.NodeInfoResponse, error) {
var (
syncNodeNcStatusURL = fmt.Sprintf(syncNodeNetworkContainersURLFmt, dncEP, infraVnet, nodeID, dncAPIVersion)
nodeInfoResponse cns.NodeInfoResponse
httpc = acn.GetHttpClient()
)

logger.Printf("[dncclient] SyncNodeNcStatus: Node: %s, InfraVnet: %s", nodeID, infraVnet)

response, err := httpc.Get(syncNodeNcStatusURL)
if err != nil {
return nodeInfoResponse, err
}

if response.StatusCode == http.StatusOK {
err = json.NewDecoder(response.Body).Decode(&nodeInfoResponse)
} else {
err = fmt.Errorf("%d", response.StatusCode)
}
response.Body.Close()

return nodeInfoResponse, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer to return early. it's easier then to read the happy path of the code without much indentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
2 changes: 1 addition & 1 deletion cns/fakes/cnsfake.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (fake *HTTPServiceFake) SetNodeOrchestrator(*cns.SetOrchestratorTypeRequest

}

func (fake *HTTPServiceFake) SyncNodeStatus(string, string, string, json.RawMessage) (int, string) {
func (fake *HTTPServiceFake) SyncNodeNcStatus(string, string, string, json.RawMessage) (int, string) {
return 0, ""
}

Expand Down
35 changes: 8 additions & 27 deletions cns/restserver/internalapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"reflect"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/dncclient"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/nmagentclient"
"github.com/Azure/azure-container-networking/common"
Expand Down Expand Up @@ -39,30 +40,10 @@ func (service *HTTPRestService) SetNodeOrchestrator(r *cns.SetOrchestratorTypeRe
service.setOrchestratorType(httptest.NewRecorder(), req)
}

// SyncNodeStatus :- Retrieve the latest node state from DNC & returns the first occurence of returnCode and error with respect to contextFromCNI
func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, contextFromCNI json.RawMessage) (returnCode int, errStr string) {
logger.Printf("[Azure CNS] SyncNodeStatus")
var (
response *http.Response
err error
nodeInfoResponse cns.NodeInfoResponse
req *http.Request
body []byte
httpc = common.GetHttpClient()
)

// try to retrieve NodeInfoResponse from mDNC
response, err = httpc.Get(fmt.Sprintf(common.SyncNodeNetworkContainersURLFmt, dncEP, infraVnet, nodeID, dncApiVersion))
if err == nil {
if response.StatusCode == http.StatusOK {
err = json.NewDecoder(response.Body).Decode(&nodeInfoResponse)
} else {
err = fmt.Errorf("%d", response.StatusCode)
}

response.Body.Close()
}

// SyncNodeNcStatus :- Retrieve the latest NCs scheduled on this node by DNC & returns the first occurence of returnCode and error with respect to contextFromCNI
func (service *HTTPRestService) SyncNodeNcStatus(dncEP, infraVnet, nodeID string, contextFromCNI json.RawMessage) (returnCode int, errStr string) {
logger.Printf("[Azure CNS] SyncNodeNcStatus")
nodeInfoResponse, err := dncclient.SyncNodeNcStatus(dncEP, infraVnet, nodeID)
if err != nil {
returnCode = UnexpectedError
errStr = fmt.Sprintf("[Azure-CNS] Failed to sync node with error: %+v", err)
Expand Down Expand Up @@ -104,8 +85,8 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string,
ncVersionURLs.Store(nc.NetworkContainerid, versionURL)
waitingForUpdate, _, _ := service.isNCWaitingForUpdate(nc.Version, nc.NetworkContainerid)

body, _ = json.Marshal(nc)
req, _ = http.NewRequest(http.MethodPost, "", bytes.NewBuffer(body))
body, _ := json.Marshal(nc)
req, _ := http.NewRequest(http.MethodPost, "", bytes.NewBuffer(body))
req.Header.Set(common.ContentType, common.JsonContent)
service.createOrUpdateNetworkContainer(w, req)
if w.Result().StatusCode == http.StatusOK {
Expand All @@ -129,7 +110,7 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string,
var body bytes.Buffer
json.NewEncoder(&body).Encode(&cns.DeleteNetworkContainerRequest{NetworkContainerid: nc})

req, err = http.NewRequest(http.MethodPost, "", &body)
req, err := http.NewRequest(http.MethodPost, "", &body)
if err == nil {
req.Header.Set(common.JsonContent, common.JsonContent)
service.deleteNetworkContainer(httptest.NewRecorder(), req)
Expand Down
4 changes: 2 additions & 2 deletions cns/restserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (service *HTTPRestService) getNetworkContainerResponse(req cns.GetNetworkCo
)

service.Unlock()
getNetworkContainerResponse.Response.ReturnCode, getNetworkContainerResponse.Response.Message = service.SyncNodeStatus(dncEP, infraVnet, nodeID, req.OrchestratorContext)
getNetworkContainerResponse.Response.ReturnCode, getNetworkContainerResponse.Response.Message = service.SyncNodeNcStatus(dncEP, infraVnet, nodeID, req.OrchestratorContext)
service.Lock()
if getNetworkContainerResponse.Response.ReturnCode == NotFound {
return getNetworkContainerResponse
Expand Down Expand Up @@ -508,7 +508,7 @@ func (service *HTTPRestService) attachOrDetachHelper(req cns.ConfigureContainerN
nodeID = service.GetOption(acn.OptNodeID).(string)
)

returnCode, msg := service.SyncNodeStatus(dncEP, infraVnet, nodeID, json.RawMessage{})
returnCode, msg := service.SyncNodeNcStatus(dncEP, infraVnet, nodeID, json.RawMessage{})
if returnCode != Success {
return cns.Response{
ReturnCode: returnCode,
Expand Down
52 changes: 5 additions & 47 deletions cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,17 @@
package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
localtls "github.com/Azure/azure-container-networking/server/tls"
"net/http"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"
"time"

localtls "github.com/Azure/azure-container-networking/server/tls"

"github.com/Azure/azure-container-networking/cns/ipampoolmonitor"

"github.com/Azure/azure-container-networking/aitelemetry"
Expand All @@ -27,6 +24,7 @@ import (
"github.com/Azure/azure-container-networking/cns/cnsclient"
"github.com/Azure/azure-container-networking/cns/common"
"github.com/Azure/azure-container-networking/cns/configuration"
"github.com/Azure/azure-container-networking/cns/dncclient"
"github.com/Azure/azure-container-networking/cns/hnsclient"
"github.com/Azure/azure-container-networking/cns/imdsclient"
"github.com/Azure/azure-container-networking/cns/logger"
Expand All @@ -45,7 +43,6 @@ const (
pluginName = "azure-vnet"
defaultCNINetworkConfigFileName = "10-azure.conflist"
configFileName = "config.json"
dncApiVersion = "?api-version=2018-03-01"
poolIPAMRefreshRateInMilliseconds = 1000
)

Expand Down Expand Up @@ -246,45 +243,6 @@ func printVersion() {
fmt.Printf("Version %v\n", version)
}

// Try to register node with DNC when CNS is started in managed DNC mode
func registerNode(httpRestService cns.HTTPService, dncEP, infraVnet, nodeID string) {
logger.Printf("[Azure CNS] Registering node %s with Infrastructure Network: %s PrivateEndpoint: %s", nodeID, infraVnet, dncEP)

var (
numCPU = runtime.NumCPU()
url = fmt.Sprintf(acn.RegisterNodeURLFmt, dncEP, infraVnet, nodeID, numCPU, dncApiVersion)
response *http.Response
err = fmt.Errorf("")
body bytes.Buffer
httpc = acn.GetHttpClient()
)

for sleep := true; err != nil; sleep = true {
response, err = httpc.Post(url, "application/json", &body)
if err == nil {
if response.StatusCode == http.StatusCreated {
var req cns.SetOrchestratorTypeRequest
json.NewDecoder(response.Body).Decode(&req)
httpRestService.SetNodeOrchestrator(&req)
sleep = false
} else {
err = fmt.Errorf("[Azure CNS] Failed to register node with http status code %s", strconv.Itoa(response.StatusCode))
logger.Errorf(err.Error())
}

response.Body.Close()
} else {
logger.Errorf("[Azure CNS] Failed to register node with err: %+v", err)
}

if sleep {
time.Sleep(acn.FiveSeconds)
}
}

logger.Printf("[Azure CNS] Node Registered")
}

// Main is the entry point for CNS.
func main() {
// Initialize and parse command line arguments.
Expand Down Expand Up @@ -460,12 +418,12 @@ func main() {
httpRestService.SetOption(acn.OptInfrastructureNetworkID, infravnet)
httpRestService.SetOption(acn.OptNodeID, nodeID)

registerNode(httpRestService, privateEndpoint, infravnet, nodeID)
dncclient.RegisterNode(httpRestService, privateEndpoint, infravnet, nodeID)
go func(ep, vnet, node string) {
// Periodically poll DNC for node updates
for {
<-time.NewTicker(time.Duration(cnsconfig.ManagedSettings.NodeSyncIntervalInSeconds) * time.Second).C
httpRestService.SyncNodeStatus(ep, vnet, node, json.RawMessage{})
httpRestService.SyncNodeNcStatus(ep, vnet, node, json.RawMessage{})
}
}(privateEndpoint, infravnet, nodeID)
} else if config.ChannelMode == cns.CRD {
Expand Down
15 changes: 6 additions & 9 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@ import (
)

const (
metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json"
azCloudUrl = "http://169.254.169.254/metadata/instance/compute/azEnvironment?api-version=2018-10-01&format=text"
httpConnectionTimeout = 7
headerTimeout = 7
RegisterNodeURLFmt = "%s/%s/node/%s/cores/%d%s"
SyncNodeNetworkContainersURLFmt = "%s/%s/node/%s%s"
FiveSeconds = 5 * time.Second
JsonContent = "application/json; charset=UTF-8"
ContentType = "Content-Type"
metadataURL = "http://169.254.169.254/metadata/instance?api-version=2017-08-01&format=json"
azCloudUrl = "http://169.254.169.254/metadata/instance/compute/azEnvironment?api-version=2018-10-01&format=text"
httpConnectionTimeout = 7
headerTimeout = 7
JsonContent = "application/json; charset=UTF-8"
ContentType = "Content-Type"
)

// XmlDocument - Azure host agent XML document format.
Expand Down