diff --git a/Makefile b/Makefile index d04fee9855..6dffae9929 100644 --- a/Makefile +++ b/Makefile @@ -372,7 +372,7 @@ test-all: # run all tests .PHONY: test-integration test-integration: - go test -coverpkg=./... -v -race -covermode atomic -coverprofile=coverage.out -tags=integration ./test/integration... + go test -timeout 1h -coverpkg=./... -v -race -covermode atomic -coverprofile=coverage.out -tags=integration ./test/integration... .PHONY: test-cyclonus test-cyclonus: diff --git a/cns/client/client_test.go b/cns/client/client_test.go index 61c6436dab..b26b595ff8 100644 --- a/cns/client/client_test.go +++ b/cns/client/client_test.go @@ -168,7 +168,7 @@ func TestMain(m *testing.M) { logger.InitLogger(logName, 0, 0, tmpLogDir+"/") config := common.ServiceConfig{} - httpRestService, err := restserver.NewHTTPRestService(&config, &fakes.WireserverClientFake{}, fakes.NewFakeNMAgentClient()) + httpRestService, err := restserver.NewHTTPRestService(&config, &fakes.WireserverClientFake{}, &fakes.NMAgentClientFake{}) svc = httpRestService.(*restserver.HTTPRestService) svc.Name = "cns-test-server" fakeNNC := v1alpha.NodeNetworkConfig{ diff --git a/cns/configuration/configuration.go b/cns/configuration/configuration.go index 2c4100b3dd..8614deea64 100644 --- a/cns/configuration/configuration.go +++ b/cns/configuration/configuration.go @@ -151,9 +151,9 @@ func SetCNSConfigDefaults(config *CNSConfig) { config.MetricsBindAddress = ":9090" } if config.SyncHostNCVersionIntervalMs == 0 { - config.SyncHostNCVersionIntervalMs = 1000 + config.SyncHostNCVersionIntervalMs = 1000 * time.Millisecond //nolint:gomnd // default times } if config.SyncHostNCTimeoutMs == 0 { - config.SyncHostNCTimeoutMs = 500 + config.SyncHostNCTimeoutMs = 500 * time.Millisecond //nolint:gomnd // default times } } diff --git a/cns/configuration/configuration_test.go b/cns/configuration/configuration_test.go index bb984fd429..6ec0705191 100644 --- a/cns/configuration/configuration_test.go +++ b/cns/configuration/configuration_test.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/Azure/azure-container-networking/common" "github.com/stretchr/testify/assert" @@ -187,8 +188,8 @@ func TestSetCNSConfigDefaults(t *testing.T) { NodeSyncIntervalInSeconds: 30, }, MetricsBindAddress: ":9090", - SyncHostNCTimeoutMs: 500, - SyncHostNCVersionIntervalMs: 1000, + SyncHostNCTimeoutMs: 500 * time.Millisecond, + SyncHostNCVersionIntervalMs: 1000 * time.Millisecond, TelemetrySettings: TelemetrySettings{ TelemetryBatchSizeBytes: 32768, TelemetryBatchIntervalInSecs: 30, diff --git a/cns/fakes/imdsclientfake.go b/cns/fakes/imdsclientfake.go index 21fa0200f2..475d905aa4 100644 --- a/cns/fakes/imdsclientfake.go +++ b/cns/fakes/imdsclientfake.go @@ -12,7 +12,7 @@ import ( "github.com/Azure/azure-container-networking/cns/wireserver" ) -var ( +const ( // HostPrimaryIP 10.0.0.4 HostPrimaryIP = "10.0.0.4" // HostSubnet 10.0.0.0/24 diff --git a/cns/fakes/nmagentclientfake.go b/cns/fakes/nmagentclientfake.go index 751159f44b..5a94160432 100644 --- a/cns/fakes/nmagentclientfake.go +++ b/cns/fakes/nmagentclientfake.go @@ -6,19 +6,18 @@ package fakes -// NMAgentClientTest can be used to query to VM Host info. -type NMAgentClientTest struct{} +import ( + "context" -// NewFakeNMAgentClient return a mock implemetation of NMAgentClient -func NewFakeNMAgentClient() *NMAgentClientTest { - return &NMAgentClientTest{} + "github.com/Azure/azure-container-networking/cns/nmagent" +) + +// NMAgentClientFake can be used to query to VM Host info. +type NMAgentClientFake struct { + GetNCVersionListFunc func(context.Context) (*nmagent.NetworkContainerListResponse, error) } // GetNcVersionListWithOutToken is mock implementation to return nc version list. -func (nmagentclient *NMAgentClientTest) GetNcVersionListWithOutToken(ncNeedUpdateList []string) map[string]int { - ncVersionList := make(map[string]int) - for _, ncID := range ncNeedUpdateList { - ncVersionList[ncID] = 0 - } - return ncVersionList +func (c *NMAgentClientFake) GetNCVersionList(ctx context.Context) (*nmagent.NetworkContainerListResponse, error) { + return c.GetNCVersionListFunc(ctx) } diff --git a/cns/nmagentclient/nmagentclient.go b/cns/nmagent/client.go similarity index 59% rename from cns/nmagentclient/nmagentclient.go rename to cns/nmagent/client.go index 3c95d7c1eb..e372dc8948 100644 --- a/cns/nmagentclient/nmagentclient.go +++ b/cns/nmagent/client.go @@ -1,17 +1,18 @@ -package nmagentclient +package nmagent import ( "bytes" + "context" "encoding/json" "encoding/xml" "fmt" - "io/ioutil" + "io" "net/http" - "strconv" "time" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/common" + "github.com/pkg/errors" ) const ( @@ -27,14 +28,14 @@ var ( getNcVersionListWithOutTokenURLVersion = "2" ) -// NMANetworkContainerResponse - NMAgent response. -type NMANetworkContainerResponse struct { +// NetworkContainerResponse - NMAgent response. +type NetworkContainerResponse struct { ResponseCode string `json:"httpStatusCode"` NetworkContainerID string `json:"networkContainerId"` Version string `json:"version"` } -type NMAgentSupportedApisResponseXML struct { +type SupportedAPIsResponseXML struct { SupportedApis []string `xml:"type"` } @@ -43,35 +44,28 @@ type ContainerInfo struct { Version string `json:"version"` } -type NMANetworkContainerListResponse struct { +type NetworkContainerListResponse struct { ResponseCode string `json:"httpStatusCode"` Containers []ContainerInfo `json:"networkContainers"` } -// NMAgentClient is client to handle queries to nmagent -type NMAgentClient struct { +// Client is client to handle queries to nmagent +type Client struct { connectionURL string } -// NMAgentClientInterface has interface that nmagent client will handle -type NMAgentClientInterface interface { - GetNcVersionListWithOutToken(ncNeedUpdateList []string) map[string]int -} - -// NewNMAgentClient create a new nmagent client. -func NewNMAgentClient(url string) (*NMAgentClient, error) { +// NewClient create a new nmagent client. +func NewClient(url string) (*Client, error) { if url == "" { url = fmt.Sprintf(GetNcVersionListWithOutTokenURLFmt, WireserverIP, getNcVersionListWithOutTokenURLVersion) } - return &NMAgentClient{ + return &Client{ connectionURL: url, }, nil } // JoinNetwork joins the given network -func JoinNetwork( - networkID string, - joinNetworkURL string) (*http.Response, error) { +func JoinNetwork(networkID, joinNetworkURL string) (*http.Response, error) { logger.Printf("[NMAgentClient] JoinNetwork: %s", networkID) // Empty body is required as wireserver cannot handle a post without the body. @@ -90,10 +84,7 @@ func JoinNetwork( } // PublishNetworkContainer publishes given network container -func PublishNetworkContainer( - networkContainerID string, - createNetworkContainerURL string, - requestBodyData []byte) (*http.Response, error) { +func PublishNetworkContainer(networkContainerID, createNetworkContainerURL string, requestBodyData []byte) (*http.Response, error) { logger.Printf("[NMAgentClient] PublishNetworkContainer NC: %s", networkContainerID) requestBody := bytes.NewBuffer(requestBodyData) @@ -106,9 +97,7 @@ func PublishNetworkContainer( } // UnpublishNetworkContainer unpublishes given network container -func UnpublishNetworkContainer( - networkContainerID string, - deleteNetworkContainerURL string) (*http.Response, error) { +func UnpublishNetworkContainer(networkContainerID, deleteNetworkContainerURL string) (*http.Response, error) { logger.Printf("[NMAgentClient] UnpublishNetworkContainer NC: %s", networkContainerID) // Empty body is required as wireserver cannot handle a post without the body. @@ -123,9 +112,7 @@ func UnpublishNetworkContainer( } // GetNetworkContainerVersion :- Retrieves NC version from NMAgent -func GetNetworkContainerVersion( - networkContainerID, - getNetworkContainerVersionURL string) (*http.Response, error) { +func GetNetworkContainerVersion(networkContainerID, getNetworkContainerVersionURL string) (*http.Response, error) { logger.Printf("[NMAgentClient] GetNetworkContainerVersion NC: %s", networkContainerID) response, err := common.GetHttpClient().Get(getNetworkContainerVersionURL) @@ -136,9 +123,7 @@ func GetNetworkContainerVersion( } // GetNmAgentSupportedApis :- Retrieves Supported Apis from NMAgent -func GetNmAgentSupportedApis( - httpc *http.Client, - getNmAgentSupportedApisURL string) ([]string, error) { +func GetNmAgentSupportedApis(httpc *http.Client, getNmAgentSupportedApisURL string) ([]string, error) { var returnErr error if getNmAgentSupportedApisURL == "" { @@ -146,7 +131,12 @@ func GetNmAgentSupportedApis( GetNmAgentSupportedApiURLFmt, WireserverIP) } - response, err := httpc.Get(getNmAgentSupportedApisURL) + req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, getNmAgentSupportedApisURL, nil) + if err != nil { + return nil, errors.Wrap(err, "failed to build request") + } + + resp, err := httpc.Do(req) if err != nil { returnErr = fmt.Errorf( "Failed to retrieve Supported Apis from NMAgent with error %v", @@ -154,25 +144,27 @@ func GetNmAgentSupportedApis( logger.Errorf("[Azure-CNS] %s", returnErr) return nil, returnErr } - if response == nil { + if resp == nil { returnErr = fmt.Errorf( "Response from getNmAgentSupportedApis call is ") logger.Errorf("[Azure-CNS] %s", returnErr) return nil, returnErr } - defer response.Body.Close() - - if response.StatusCode != http.StatusOK { + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrap(err, "failed to read response body") + } + if resp.StatusCode != http.StatusOK { returnErr = fmt.Errorf( "Failed to retrieve Supported Apis from NMAgent with StatusCode: %d", - response.StatusCode) + resp.StatusCode) logger.Errorf("[Azure-CNS] %s", returnErr) return nil, returnErr } - var xmlDoc NMAgentSupportedApisResponseXML - decoder := xml.NewDecoder(response.Body) - err = decoder.Decode(&xmlDoc) + var xmlDoc SupportedAPIsResponseXML + err = xml.NewDecoder(bytes.NewReader(b)).Decode(&xmlDoc) if err != nil { returnErr = fmt.Errorf( "Failed to decode XML response of Supported Apis from NMAgent with error %v", @@ -181,44 +173,35 @@ func GetNmAgentSupportedApis( return nil, returnErr } - logger.Printf("[NMAgentClient][Response] GetNmAgentSupportedApis. Response: %+v.", response) + logger.Printf("[NMAgentClient][Response] GetNmAgentSupportedApis. Response: %+v.", resp) return xmlDoc.SupportedApis, nil } -// GetNcVersionListWithOutToken query nmagent for programmed container version. -func (nmagentclient *NMAgentClient) GetNcVersionListWithOutToken(ncNeedUpdateList []string) map[string]int { - ncVersionList := make(map[string]int) +// GetNCVersionList query nmagent for programmed container versions. +func (c *Client) GetNCVersionList(ctx context.Context) (*NetworkContainerListResponse, error) { now := time.Now() - response, err := http.Get(nmagentclient.connectionURL) - latency := time.Since(now) - logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken response: %+v, latency is %d", response, latency.Milliseconds()) - - if response.StatusCode != http.StatusOK { - logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken failed with %d, err is %v", response.StatusCode, err) - return nil + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.connectionURL, nil) + if err != nil { + return nil, errors.Wrap(err, "failed to build nmagent request") } - - var nmaNcListResponse NMANetworkContainerListResponse - rBytes, _ := ioutil.ReadAll(response.Body) - if err := json.Unmarshal(rBytes, &nmaNcListResponse); err != nil { - logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken unmarshal failed with %s", err) - return nil + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, errors.Wrap(err, "failed to make nmagent request") } - logger.Printf("NMAgent NC List Response is %s", nmaNcListResponse) + defer resp.Body.Close() + b, err := io.ReadAll(resp.Body) + if err != nil { + return nil, errors.Wrap(err, "failed to read response body") + } + logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken response: %s, latency is %d", string(b), time.Since(now).Milliseconds()) - receivedNcVersionListInMap := make(map[string]string) - for _, containers := range nmaNcListResponse.Containers { - receivedNcVersionListInMap[containers.NetworkContainerID] = containers.Version + if resp.StatusCode != http.StatusOK { + return nil, errors.Wrap(err, "failed to GetNCVersionList") } - for _, ncID := range ncNeedUpdateList { - if version, ok := receivedNcVersionListInMap[ncID]; ok { - if versionInInt, err := strconv.Atoi(version); err != nil { - logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken translate version %s to int failed with %s", version, err) - } else { - ncVersionList[ncID] = versionInInt - logger.Printf("Containers id is %s, programmed NC version is %d", ncID, versionInInt) - } - } + + var response NetworkContainerListResponse + if err := json.Unmarshal(b, &response); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal response") } - return ncVersionList + return &response, nil } diff --git a/cns/restserver/api.go b/cns/restserver/api.go index 4408ecefe9..c5f820c95c 100644 --- a/cns/restserver/api.go +++ b/cns/restserver/api.go @@ -6,7 +6,7 @@ package restserver import ( "context" "fmt" - "io/ioutil" + "io" "net" "net/http" "regexp" @@ -16,7 +16,7 @@ import ( "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/hnsclient" "github.com/Azure/azure-container-networking/cns/logger" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/cns/wireserver" "github.com/Azure/azure-container-networking/common" @@ -1138,7 +1138,7 @@ func (service *HTTPRestService) publishNetworkContainer(w http.ResponseWriter, r if isNetworkJoined { // Publish Network Container - publishResponse, publishError = nmagentclient.PublishNetworkContainer( + publishResponse, publishError = nmagent.PublishNetworkContainer( req.NetworkContainerID, req.CreateNetworkContainerURL, req.CreateNetworkContainerRequestBody) @@ -1147,6 +1147,7 @@ func (service *HTTPRestService) publishNetworkContainer(w http.ResponseWriter, r returnCode = types.NetworkContainerPublishFailed logger.Errorf("[Azure-CNS] %s", returnMessage) } + defer publishResponse.Body.Close() } // Store ncGetVersionURL needed for calling NMAgent to check if vfp programming is completed for the NC @@ -1156,10 +1157,10 @@ func (service *HTTPRestService) publishNetworkContainer(w http.ResponseWriter, r // we attempt to extract the wireserver IP to use from the request, otherwise default to the well-known IP. hostIP := extractHostFromJoinNetworkURL(req.JoinNetworkURL) if hostIP == "" { - hostIP = nmagentclient.WireserverIP + hostIP = nmagent.WireserverIP } - ncGetVersionURL := fmt.Sprintf(nmagentclient.GetNetworkContainerVersionURLFmt, + ncGetVersionURL := fmt.Sprintf(nmagent.GetNetworkContainerVersionURLFmt, hostIP, primaryInterfaceIdentifier, req.NetworkContainerID, @@ -1179,14 +1180,12 @@ func (service *HTTPRestService) publishNetworkContainer(w http.ResponseWriter, r publishStatusCode = publishResponse.StatusCode var errParse error - publishResponseBody, errParse = ioutil.ReadAll(publishResponse.Body) + publishResponseBody, errParse = io.ReadAll(publishResponse.Body) if errParse != nil { returnMessage = fmt.Sprintf("Failed to parse the publish body. Error: %v", errParse) returnCode = types.UnexpectedError logger.Errorf("[Azure-CNS] %s", returnMessage) } - - publishResponse.Body.Close() } response := cns.PublishNetworkContainerResponse{ @@ -1252,7 +1251,7 @@ func (service *HTTPRestService) unpublishNetworkContainer(w http.ResponseWriter, if isNetworkJoined { // Unpublish Network Container - unpublishResponse, unpublishError = nmagentclient.UnpublishNetworkContainer( + unpublishResponse, unpublishError = nmagent.UnpublishNetworkContainer( req.NetworkContainerID, req.DeleteNetworkContainerURL) if unpublishError != nil || unpublishResponse.StatusCode != http.StatusOK { @@ -1263,7 +1262,7 @@ func (service *HTTPRestService) unpublishNetworkContainer(w http.ResponseWriter, if unpublishResponse != nil { var errParse error - unpublishResponseBody, errParse = ioutil.ReadAll(unpublishResponse.Body) + unpublishResponseBody, errParse = io.ReadAll(unpublishResponse.Body) if errParse != nil { returnMessage = fmt.Sprintf("Failed to parse the unpublish body. Error: %v", errParse) returnCode = types.UnexpectedError @@ -1420,7 +1419,7 @@ func (service *HTTPRestService) nmAgentSupportedApisHandler(w http.ResponseWrite switch r.Method { case http.MethodPost: - supportedApis, retErr = nmagentclient.GetNmAgentSupportedApis(common.GetHttpClient(), + supportedApis, retErr = nmagent.GetNmAgentSupportedApis(common.GetHttpClient(), req.GetNmAgentSupportedApisURL) if retErr != nil { returnCode = types.NmAgentSupportedApisError diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index c4cbf7bad8..d311cc25e9 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -5,6 +5,7 @@ package restserver import ( "bytes" + "context" "encoding/json" "encoding/xml" "fmt" @@ -12,6 +13,7 @@ import ( "net/http/httptest" "net/url" "os" + "strconv" "strings" "testing" @@ -21,7 +23,7 @@ import ( "github.com/Azure/azure-container-networking/cns/common" "github.com/Azure/azure-container-networking/cns/fakes" "github.com/Azure/azure-container-networking/cns/logger" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/types" acncommon "github.com/Azure/azure-container-networking/common" ) @@ -149,7 +151,7 @@ func TestMain(m *testing.M) { nmAgentServer.AddHandler("/getInterface", getInterfaceInfo) nmAgentServer.AddHandler("/", nmagentHandler) - nmagentclient.WireserverIP = nmagentEndpoint + nmagent.WireserverIP = nmagentEndpoint err = nmAgentServer.Start(make(chan error, 1)) if err != nil { @@ -914,7 +916,8 @@ func startService() error { return err } - service, err = NewHTTPRestService(&config, &fakes.WireserverClientFake{}, fakes.NewFakeNMAgentClient()) + nmagentClient := &fakes.NMAgentClientFake{} + service, err = NewHTTPRestService(&config, &fakes.WireserverClientFake{}, nmagentClient) if err != nil { return err } @@ -926,6 +929,34 @@ func startService() error { } svc.IPAMPoolMonitor = &fakes.MonitorFake{} + nmagentClient.GetNCVersionListFunc = func(context.Context) (*nmagent.NetworkContainerListResponse, error) { + var hostVersionNeedsUpdateContainers []string + for idx := range svc.state.ContainerStatus { + hostVersion, err := strconv.Atoi(svc.state.ContainerStatus[idx].HostVersion) //nolint:govet // intentional shadowing + if err != nil { + logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", svc.state.ContainerStatus[idx].HostVersion, err) + continue + } + dncNcVersion, err := strconv.Atoi(svc.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version) + if err != nil { + logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", svc.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err) + continue + } + // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. + if hostVersion < dncNcVersion { + hostVersionNeedsUpdateContainers = append(hostVersionNeedsUpdateContainers, svc.state.ContainerStatus[idx].ID) + } else if hostVersion > dncNcVersion { + logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", hostVersion, dncNcVersion) + } + } + resp := &nmagent.NetworkContainerListResponse{ + Containers: []nmagent.ContainerInfo{}, + } + for _, cs := range hostVersionNeedsUpdateContainers { + resp.Containers = append(resp.Containers, nmagent.ContainerInfo{Version: "0", NetworkContainerID: cs}) + } + return resp, nil + } if service != nil { // Create empty azure-cns.json. CNS should start successfully by deleting this file diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index faa04cb562..e8a0cfdd42 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -12,11 +12,10 @@ import ( "net/http/httptest" "reflect" "strconv" - "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" @@ -98,8 +97,8 @@ func (service *HTTPRestService) SyncNodeStatus( // check if the version is valid and save it to service state for ncid, nc := range ncsToBeAdded { var ( - versionURL = fmt.Sprintf(nmagentclient.GetNetworkContainerVersionURLFmt, - nmagentclient.WireserverIP, + versionURL = fmt.Sprintf(nmagent.GetNetworkContainerVersionURLFmt, + nmagent.WireserverIP, nc.PrimaryInterfaceIdentifier, nc.NetworkContainerid, nc.AuthorizationToken) @@ -150,62 +149,66 @@ func (service *HTTPRestService) SyncNodeStatus( // SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. // If NMAgent NC version got updated, CNS will refresh the pending programming IP status. -func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string, syncHostNCTimeoutMilliSec time.Duration) { - var hostVersionNeedUpdateNcList []string - service.RLock() - for _, containerstatus := range service.state.ContainerStatus { +func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string) { + service.Lock() + defer service.Unlock() + var hostVersionNeedsUpdateContainers []string + for idx := range service.state.ContainerStatus { // Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain. - hostVersion, err := strconv.Atoi(containerstatus.HostVersion) + hostVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].HostVersion) if err != nil { - logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", containerstatus.HostVersion, err) + logger.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", service.state.ContainerStatus[idx].HostVersion, err) continue } - dncNcVersion, err := strconv.Atoi(containerstatus.CreateNetworkContainerRequest.Version) + dncNcVersion, err := strconv.Atoi(service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version) if err != nil { - logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", containerstatus.CreateNetworkContainerRequest.Version, err) + logger.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", service.state.ContainerStatus[idx].CreateNetworkContainerRequest.Version, err) continue } // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. if hostVersion < dncNcVersion { - hostVersionNeedUpdateNcList = append(hostVersionNeedUpdateNcList, containerstatus.ID) + hostVersionNeedsUpdateContainers = append(hostVersionNeedsUpdateContainers, service.state.ContainerStatus[idx].ID) } else if hostVersion > dncNcVersion { logger.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", hostVersion, dncNcVersion) } } - service.RUnlock() - if len(hostVersionNeedUpdateNcList) > 0 { - logger.Printf("Updating version of the following NC IDs: %v", hostVersionNeedUpdateNcList) - ncVersionChannel := make(chan map[string]int) - ctxWithTimeout, _ := context.WithTimeout(ctx, syncHostNCTimeoutMilliSec*time.Millisecond) - go func() { - ncVersionChannel <- service.nmagentClient.GetNcVersionListWithOutToken(hostVersionNeedUpdateNcList) - close(ncVersionChannel) - }() - select { - case newHostNCVersionList := <-ncVersionChannel: - if newHostNCVersionList == nil { - logger.Errorf("Can't get vfp programmed NC version list from url without token") - } else { - service.Lock() - for ncID, newHostNCVersion := range newHostNCVersionList { - // Check whether it exist in service state and get the related nc info - if ncInfo, exist := service.state.ContainerStatus[ncID]; !exist { - logger.Errorf("Can't find NC with ID %s in service state, stop updating this host NC version", ncID) - } else { - if channelMode == cns.CRD { - service.MarkIpsAsAvailableUntransacted(ncInfo.ID, newHostNCVersion) - } - oldHostNCVersion := ncInfo.HostVersion - ncInfo.HostVersion = strconv.Itoa(newHostNCVersion) - service.state.ContainerStatus[ncID] = ncInfo - logger.Printf("Updated NC %s host version from %s to %s", ncID, oldHostNCVersion, ncInfo.HostVersion) - } - } - service.Unlock() - } - case <-ctxWithTimeout.Done(): - logger.Errorf("Timeout when getting vfp programmed NC version list from url without token") + if len(hostVersionNeedsUpdateContainers) == 0 { + return + } + ncList, err := service.nmagentClient.GetNCVersionList(ctx) + if err != nil { + logger.Errorf("%v", err) + return + } + + newHostNCVersionList := map[string]string{} + for _, nc := range ncList.Containers { + newHostNCVersionList[nc.NetworkContainerID] = nc.Version + } + for _, ncID := range hostVersionNeedsUpdateContainers { + versionStr, ok := newHostNCVersionList[ncID] + if !ok { + continue + } + version, err := strconv.Atoi(versionStr) + if err != nil { + logger.Errorf("failed to parse %s to int", versionStr) + continue + } + + // Check whether it exist in service state and get the related nc info + ncInfo, exist := service.state.ContainerStatus[ncID] + if !exist { + logger.Errorf("Can't find NC with ID %s in service state, stop updating this host NC version", ncID) + continue + } + if channelMode == cns.CRD { + service.MarkIpsAsAvailableUntransacted(ncInfo.ID, version) } + oldHostNCVersion := ncInfo.HostVersion + ncInfo.HostVersion = versionStr + service.state.ContainerStatus[ncID] = ncInfo + logger.Printf("Updated NC %s host version from %s to %s", ncID, oldHostNCVersion, ncInfo.HostVersion) } } diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index a9be924bcf..f00f8b55a7 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -10,7 +10,6 @@ import ( "reflect" "strconv" "testing" - "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/types" @@ -135,7 +134,7 @@ func testSyncHostNCVersion(t *testing.T, orchestratorType string) { t.Errorf("Unexpected nc version in containerStatus as %s, expeted VM version should be 0 in string", containerStatus.CreateNetworkContainerRequest.Version) } // When sync host NC version, it will use the orchestratorType pass in. - svc.SyncHostNCVersion(context.Background(), orchestratorType, 500*time.Millisecond) + svc.SyncHostNCVersion(context.Background(), orchestratorType) containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] if containerStatus.HostVersion != "0" { t.Errorf("Unexpected containerStatus.HostVersion %s, expeted host version should be 0 in string", containerStatus.HostVersion) @@ -159,7 +158,7 @@ func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) { t.Errorf("Unexpected State %s, expeted State is %s, received %s, IP address is %s", podIPConfigState.State, cns.PendingProgramming, podIPConfigState.State, podIPConfigState.IPAddress) } } - svc.SyncHostNCVersion(context.Background(), cns.CRD, 500*time.Millisecond) + svc.SyncHostNCVersion(context.Background(), cns.CRD) containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] receivedSecondaryIPConfigs = containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs diff --git a/cns/restserver/ipam_test.go b/cns/restserver/ipam_test.go index 17af33883f..db1c4a9d80 100644 --- a/cns/restserver/ipam_test.go +++ b/cns/restserver/ipam_test.go @@ -35,7 +35,7 @@ var ( func getTestService() *HTTPRestService { var config common.ServiceConfig - httpsvc, _ := NewHTTPRestService(&config, &fakes.WireserverClientFake{}, fakes.NewFakeNMAgentClient()) + httpsvc, _ := NewHTTPRestService(&config, &fakes.WireserverClientFake{}, &fakes.NMAgentClientFake{}) svc = httpsvc.(*HTTPRestService) svc.IPAMPoolMonitor = &fakes.MonitorFake{} setOrchestratorTypeInternal(cns.KubernetesCRD) diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index d72e5d6938..34cd95ee5e 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -1,6 +1,3 @@ -// Copyright 2017 Microsoft. All rights reserved. -// MIT License - package restserver import ( @@ -14,7 +11,7 @@ import ( "github.com/Azure/azure-container-networking/cns/ipamclient" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/networkcontainers" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/routes" "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/cns/types/bounded" @@ -41,13 +38,17 @@ type interfaceGetter interface { GetInterfaces(ctx context.Context) (*wireserver.GetInterfacesResult, error) } +type nmagentClient interface { + GetNCVersionList(ctx context.Context) (*nmagent.NetworkContainerListResponse, error) +} + // HTTPRestService represents http listener for CNS - Container Networking Service. type HTTPRestService struct { *cns.Service dockerClient *dockerclient.Client wscli interfaceGetter ipamClient *ipamclient.IpamClient - nmagentClient nmagentclient.NMAgentClientInterface + nmagentClient nmagentClient networkContainer *networkcontainers.NetworkContainers PodIPIDByPodInterfaceKey map[string]string // PodInterfaceId is key and value is Pod IP (SecondaryIP) uuid. PodIPConfigState map[string]cns.IPConfigurationStatus // Secondary IP ID(uuid) is key @@ -108,7 +109,7 @@ type networkInfo struct { } // NewHTTPRestService creates a new HTTP Service object. -func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, nmagentClient nmagentclient.NMAgentClientInterface) (cns.HTTPService, error) { +func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, nmagentClient nmagentClient) (cns.HTTPService, error) { service, err := cns.NewService(config.Name, config.Version, config.ChannelMode, config.Store) if err != nil { return nil, err diff --git a/cns/restserver/util.go b/cns/restserver/util.go index 167c103e5a..c06e37ac2c 100644 --- a/cns/restserver/util.go +++ b/cns/restserver/util.go @@ -1,13 +1,10 @@ -// Copyright 2017 Microsoft. All rights reserved. -// MIT License - package restserver import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "strconv" "time" @@ -17,7 +14,7 @@ import ( "github.com/Azure/azure-container-networking/cns/dockerclient" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/networkcontainers" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/cns/wireserver" acn "github.com/Azure/azure-container-networking/common" @@ -623,7 +620,7 @@ func (service *HTTPRestService) joinNetwork( networkID string, joinNetworkURL string) (*http.Response, error, error) { var err error - joinResponse, joinErr := nmagentclient.JoinNetwork( + joinResponse, joinErr := nmagent.JoinNetwork( networkID, joinNetworkURL) @@ -716,10 +713,11 @@ func (service *HTTPRestService) getPrimaryHostInterface(ctx context.Context) (*w if err != nil { return nil, errors.Wrap(err, "failed to get interfaces from IMDS") } - service.state.primaryInterface, err = wireserver.GetPrimaryInterfaceFromResult(res) + primary, err := wireserver.GetPrimaryInterfaceFromResult(res) if err != nil { return nil, errors.Wrap(err, "failed to get primary interface from IMDS response") } + service.state.primaryInterface = primary } return service.state.primaryInterface, nil } @@ -778,7 +776,7 @@ func (service *HTTPRestService) isNCWaitingForUpdate( return } - resp, err := nmagentclient.GetNetworkContainerVersion(ncid, getNCVersionURL.(string)) + resp, err := nmagent.GetNetworkContainerVersion(ncid, getNCVersionURL.(string)) if err != nil { logger.Printf("[Azure CNS] Failed to get NC version status from NMAgent with error: %+v. "+ "Skipping GetNCVersionStatus check from NMAgent", err) @@ -794,8 +792,8 @@ func (service *HTTPRestService) isNCWaitingForUpdate( return } - var versionResponse nmagentclient.NMANetworkContainerResponse - rBytes, _ := ioutil.ReadAll(resp.Body) + var versionResponse nmagent.NetworkContainerResponse + rBytes, _ := io.ReadAll(resp.Body) json.Unmarshal(rBytes, &versionResponse) if versionResponse.ResponseCode != "200" { returnCode = types.NetworkContainerVfpProgramPending diff --git a/cns/service/main.go b/cns/service/main.go index 249266f9ff..35e9ab778d 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -31,7 +31,7 @@ import ( "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/multitenantcontroller" "github.com/Azure/azure-container-networking/cns/multitenantcontroller/multitenantoperator" - "github.com/Azure/azure-container-networking/cns/nmagentclient" + "github.com/Azure/azure-container-networking/cns/nmagent" "github.com/Azure/azure-container-networking/cns/restserver" kubecontroller "github.com/Azure/azure-container-networking/cns/singletenantcontroller" cnstypes "github.com/Azure/azure-container-networking/cns/types" @@ -292,7 +292,7 @@ func registerNode(httpc *http.Client, httpRestService cns.HTTPService, dncEP, in ) nodeRegisterRequest.NumCPU = numCPU - supportedApis, retErr := nmagentclient.GetNmAgentSupportedApis(httpc, "") + supportedApis, retErr := nmagent.GetNmAgentSupportedApis(httpc, "") if retErr != nil { logger.Errorf("[Azure CNS] Failed to retrieve SupportedApis from NMagent of node %s with Infrastructure Network: %s PrivateEndpoint: %s", @@ -431,7 +431,7 @@ func main() { logger.Printf("[Azure CNS] Read config :%+v", cnsconfig) if cnsconfig.WireserverIP != "" { - nmagentclient.WireserverIP = cnsconfig.WireserverIP + nmagent.WireserverIP = cnsconfig.WireserverIP } if cnsconfig.ChannelMode == cns.Managed { @@ -480,7 +480,7 @@ func main() { return } - nmaclient, err := nmagentclient.NewNMAgentClient("") + nmaclient, err := nmagent.NewClient("") if err != nil { logger.Errorf("Failed to start nmagent client due to error %v", err) return @@ -766,11 +766,13 @@ func InitializeMultiTenantController(ctx context.Context, httpRestService cns.HT logger.Printf("Starting SyncHostNCVersion") go func() { // Periodically poll vfp programmed NC version from NMAgent - tickerChannel := time.Tick(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond) + tickerChannel := time.Tick(cnsconfig.SyncHostNCVersionIntervalMs) for { select { case <-tickerChannel: - httpRestServiceImpl.SyncHostNCVersion(ctx, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) + timedCtx, cancel := context.WithTimeout(ctx, cnsconfig.SyncHostNCTimeoutMs) + httpRestServiceImpl.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) + cancel() case <-ctx.Done(): return } @@ -924,7 +926,9 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn for { select { case <-tickerChannel: - httpRestServiceImplementation.SyncHostNCVersion(ctx, cnsconfig.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) + timedCtx, cancel := context.WithTimeout(ctx, cnsconfig.SyncHostNCTimeoutMs) + httpRestServiceImplementation.SyncHostNCVersion(timedCtx, cnsconfig.ChannelMode) + cancel() case <-ctx.Done(): return } diff --git a/cns/wireserver/net.go b/cns/wireserver/net.go index 7e96543091..ed0791d0af 100644 --- a/cns/wireserver/net.go +++ b/cns/wireserver/net.go @@ -14,15 +14,11 @@ var ( ) func GetPrimaryInterfaceFromResult(res *GetInterfacesResult) (*InterfaceInfo, error) { - interfaceInfo := &InterfaceInfo{} - found := false - // For each interface. for _, i := range res.Interface { // skip if not primary if !i.IsPrimary { continue } - interfaceInfo.IsPrimary = true // skip if no subnets if len(i.IPSubnet) == 0 { @@ -31,27 +27,26 @@ func GetPrimaryInterfaceFromResult(res *GetInterfacesResult) (*InterfaceInfo, er // get the first subnet s := i.IPSubnet[0] - interfaceInfo.Subnet = s.Prefix gw, err := calculateGatewayIP(s.Prefix) if err != nil { return nil, err } - interfaceInfo.Gateway = gw.String() + + primaryIP := "" for _, ip := range s.IPAddress { if ip.IsPrimary { - interfaceInfo.PrimaryIP = ip.Address + primaryIP = ip.Address } } - found = true - break - } - - if !found { - return nil, ErrNoPrimaryInterface + return &InterfaceInfo{ + Subnet: s.Prefix, + IsPrimary: true, + Gateway: gw.String(), + PrimaryIP: primaryIP, + }, nil } - - return interfaceInfo, nil + return nil, ErrNoPrimaryInterface } // calculateGatewayIP parses the passed CIDR string and returns the first IP in the range.