diff --git a/cns/cnsclient/cnsclient_test.go b/cns/cnsclient/cnsclient_test.go index aeae417033..850438e922 100644 --- a/cns/cnsclient/cnsclient_test.go +++ b/cns/cnsclient/cnsclient_test.go @@ -53,6 +53,7 @@ func addTestStateToRestServer(t *testing.T, secondaryIps []string) { for _, secIpAddress := range secondaryIps { secIpConfig := cns.SecondaryIPConfig{ IPAddress: secIpAddress, + NCVersion: -1, } ipId := uuid.New() secondaryIPConfigs[ipId.String()] = secIpConfig @@ -63,6 +64,9 @@ func addTestStateToRestServer(t *testing.T, secondaryIps []string) { NetworkContainerid: "testNcId1", IPConfiguration: ipConfig, SecondaryIPConfigs: secondaryIPConfigs, + // Set it as -1 to be same as default host version. + // It will allow secondary IPs status to be set as available. + Version: "-1", } returnCode := svc.CreateOrUpdateNetworkContainerInternal(req, fakes.NewFakeScalar(releasePercent, requestPercent, batchSize), fakes.NewFakeNodeNetworkConfigSpec(initPoolSize)) @@ -122,7 +126,7 @@ func TestMain(m *testing.M) { logger.InitLogger(logName, 0, 0, tmpLogDir+"/") config := common.ServiceConfig{} - httpRestService, err := restserver.NewHTTPRestService(&config, fakes.NewFakeImdsClient()) + httpRestService, err := restserver.NewHTTPRestService(&config, fakes.NewFakeImdsClient(), fakes.NewFakeNMAgentClient()) svc = httpRestService.(*restserver.HTTPRestService) svc.Name = "cns-test-server" svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake() diff --git a/cns/configuration/configuration.go b/cns/configuration/configuration.go index 105fcc036a..40111ae1f2 100644 --- a/cns/configuration/configuration.go +++ b/cns/configuration/configuration.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "path/filepath" + "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -17,14 +18,17 @@ const ( ) type CNSConfig struct { - TelemetrySettings TelemetrySettings - ManagedSettings ManagedSettings - ChannelMode string - UseHTTPS bool - TLSSubjectName string - TLSCertificatePath string - TLSPort string - WireserverIP string + TelemetrySettings TelemetrySettings + ManagedSettings ManagedSettings + ChannelMode string + UseHTTPS bool + TLSSubjectName string + TLSCertificatePath string + TLSPort string + TLSEndpoint string + WireserverIP string + SyncHostNCVersionIntervalMs time.Duration + SyncHostNCTimeoutMs time.Duration } type TelemetrySettings struct { @@ -121,11 +125,13 @@ func setManagedSettingDefaults(managedSettings *ManagedSettings) { } } -// Set Default values of CNS config if not specified +// SetCNSConfigDefaults set default values of CNS config if not specified func SetCNSConfigDefaults(config *CNSConfig) { setTelemetrySettingDefaults(&config.TelemetrySettings) setManagedSettingDefaults(&config.ManagedSettings) if config.ChannelMode == "" { config.ChannelMode = cns.Direct } + config.SyncHostNCVersionIntervalMs = 1000 + config.SyncHostNCTimeoutMs = 500 } diff --git a/cns/fakes/cnsfake.go b/cns/fakes/cnsfake.go index 6cb5e8644f..3177b1d679 100644 --- a/cns/fakes/cnsfake.go +++ b/cns/fakes/cnsfake.go @@ -1,9 +1,11 @@ package fakes import ( + "context" "encoding/json" "errors" "sync" + "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/common" @@ -230,6 +232,11 @@ func (fake *HTTPServiceFake) SyncNodeStatus(string, string, string, json.RawMess return 0, "" } +// SyncHostNCVersion will update HostVersion in containerstatus. +func (fake *HTTPServiceFake) SyncHostNCVersion(context.Context, string, time.Duration) { + return +} + func (fake *HTTPServiceFake) GetPendingProgramIPConfigs() []cns.IPConfigurationStatus { ipconfigs := []cns.IPConfigurationStatus{} for _, ipconfig := range fake.IPStateManager.PendingProgramIPConfigState { diff --git a/cns/fakes/nmagentclientfake.go b/cns/fakes/nmagentclientfake.go new file mode 100644 index 0000000000..5cf601f7b1 --- /dev/null +++ b/cns/fakes/nmagentclientfake.go @@ -0,0 +1,22 @@ +// Copyright 2020 Microsoft. All rights reserved. +// MIT License + +package fakes + +// NMAgentClientTest can be used to query to VM Host info. +type NMAgentClientTest struct { +} + +// NewFakeNMAgentClient return a mock implemetation of NMAgentClient +func NewFakeNMAgentClient() *NMAgentClientTest { + return &NMAgentClientTest{} +} + +// GetNcVersionListWithOutToken is mock implementation to return nc version list. +func (nmagentclient *NMAgentClientTest) GetNcVersionListWithOutToken(ncNeedUpdateList []string) map[string]int { + ncVersionList := make(map[string]int) + for _, ncID := range ncNeedUpdateList { + ncVersionList[ncID] = 0 + } + return ncVersionList +} diff --git a/cns/nmagentclient/nmagentclient.go b/cns/nmagentclient/nmagentclient.go index e254a09547..26135a3650 100644 --- a/cns/nmagentclient/nmagentclient.go +++ b/cns/nmagentclient/nmagentclient.go @@ -5,7 +5,10 @@ import ( "encoding/json" "encoding/xml" "fmt" + "io/ioutil" "net/http" + "strconv" + "time" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/common" @@ -13,12 +16,14 @@ import ( const ( //GetNmAgentSupportedApiURLFmt Api endpoint to get supported Apis of NMAgent - GetNmAgentSupportedApiURLFmt = "http://%s/machine/plugins/?comp=nmagent&type=GetSupportedApis" - GetNetworkContainerVersionURLFmt = "http://%s/machine/plugins/?comp=nmagent&type=NetworkManagement/interfaces/%s/networkContainers/%s/version/authenticationToken/%s/api-version/1" + GetNmAgentSupportedApiURLFmt = "http://%s/machine/plugins/?comp=nmagent&type=GetSupportedApis" + GetNetworkContainerVersionURLFmt = "http://%s/machine/plugins/?comp=nmagent&type=NetworkManagement/interfaces/%s/networkContainers/%s/version/authenticationToken/%s/api-version/1" + GetNcVersionListWithOutTokenURLFmt = "http://%s/machine/plugins/?comp=nmagent&type=NetworkManagement/interfaces/api-version/%s" ) //WireServerIP - wire server ip var WireserverIP = "168.63.129.16" +var getNcVersionListWithOutTokenURLVersion = "2" // NMANetworkContainerResponse - NMAgent response. type NMANetworkContainerResponse struct { @@ -31,6 +36,36 @@ type NMAgentSupportedApisResponseXML struct { SupportedApis []string `xml:"type"` } +type ContainerInfo struct { + NetworkContainerID string `json:"networkContainerId"` + Version string `json:"version"` +} + +type NMANetworkContainerListResponse struct { + ResponseCode string `json:"httpStatusCode"` + Containers []ContainerInfo `json:"networkContainers"` +} + +// NMAgentClient is client to handle queries to nmagent +type NMAgentClient struct { + connectionURL string +} + +// NMAgentClientInterface has interface that nmagent client will handle +type NMAgentClientInterface interface { + GetNcVersionListWithOutToken(ncNeedUpdateList []string) map[string]int +} + +// NewNMAgentClient create a new nmagent client. +func NewNMAgentClient(url string) (*NMAgentClient, error) { + if url == "" { + url = fmt.Sprintf(GetNcVersionListWithOutTokenURLFmt, WireserverIP, getNcVersionListWithOutTokenURLVersion) + } + return &NMAgentClient{ + connectionURL: url, + }, nil +} + // JoinNetwork joins the given network func JoinNetwork( networkID string, @@ -149,3 +184,42 @@ func GetNmAgentSupportedApis( logger.Printf("[NMAgentClient][Response] GetNmAgentSupportedApis. Response: %+v.", response) return xmlDoc.SupportedApis, nil } + +// GetNcVersionListWithOutToken query nmagent for programmed container version. +func (nmagentclient *NMAgentClient) GetNcVersionListWithOutToken(ncNeedUpdateList []string) map[string]int { + ncVersionList := make(map[string]int) + now := time.Now() + response, err := http.Get(nmagentclient.connectionURL) + latency := time.Since(now) + logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken response: %+v, latency is %d", response, latency.Milliseconds()) + + if response.StatusCode != http.StatusOK { + logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken failed with %d, err is %v", response.StatusCode, err) + return nil + } + + var nmaNcListResponse NMANetworkContainerListResponse + rBytes, _ := ioutil.ReadAll(response.Body) + logger.Printf("Response body is %v", rBytes) + json.Unmarshal(rBytes, &nmaNcListResponse) + if nmaNcListResponse.ResponseCode != strconv.Itoa(http.StatusOK) { + logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken unmarshal failed with %s", rBytes) + return nil + } + + var receivedNcVersionListInMap = make(map[string]string) + for _, containers := range nmaNcListResponse.Containers { + receivedNcVersionListInMap[containers.NetworkContainerID] = containers.Version + } + for _, ncID := range ncNeedUpdateList { + if version, ok := receivedNcVersionListInMap[ncID]; ok { + if versionInInt, err := strconv.Atoi(version); err != nil { + logger.Printf("[NMAgentClient][Response] GetNcVersionListWithOutToken translate version %s to int failed with %s", version, err) + } else { + ncVersionList[ncID] = versionInInt + logger.Printf("Containers id is %s, programmed NC version is %d", ncID, versionInInt) + } + } + } + return ncVersionList +} diff --git a/cns/requestcontroller/kubecontroller/crdtranslator.go b/cns/requestcontroller/kubecontroller/crdtranslator.go index 4a57a68030..95f184e257 100644 --- a/cns/requestcontroller/kubecontroller/crdtranslator.go +++ b/cns/requestcontroller/kubecontroller/crdtranslator.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/log" nnc "github.com/Azure/azure-container-networking/nodenetworkconfig/api/v1alpha" ) @@ -66,7 +67,10 @@ func CRDStatusToNCRequest(crdStatus nnc.NodeNetworkConfigStatus) (cns.CreateNetw NCVersion: ncVersion, } ncRequest.SecondaryIPConfigs[ipAssignment.Name] = secondaryIPConfig + log.Debugf("Seconday IP Configs got set, name is %s, config is %v", ipAssignment.Name, secondaryIPConfig) } + log.Printf("Set NC request info with NetworkContainerid %s, NetworkContainerType %s, NC Version %s", + ncRequest.NetworkContainerid, ncRequest.NetworkContainerType, ncRequest.Version) } //Only returning the first network container for now, later we will return a list diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index 07551c71bc..e266a9b457 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -912,7 +912,7 @@ func startService() { var err error // Create the service. config := common.ServiceConfig{} - service, err = NewHTTPRestService(&config, fakes.NewFakeImdsClient()) + service, err = NewHTTPRestService(&config, fakes.NewFakeImdsClient(), fakes.NewFakeNMAgentClient()) if err != nil { fmt.Printf("Failed to create CNS object %v\n", err) os.Exit(1) diff --git a/cns/restserver/const.go b/cns/restserver/const.go index 8cb329cf05..a1c7253f6c 100644 --- a/cns/restserver/const.go +++ b/cns/restserver/const.go @@ -38,6 +38,7 @@ const ( NetworkContainerVfpProgramComplete = 35 NetworkContainerVfpProgramCheckSkipped = 36 NmAgentSupportedApisError = 37 + UnsupportedNCVersion = 38 UnexpectedError = 99 ) diff --git a/cns/restserver/internalapi.go b/cns/restserver/internalapi.go index 9d2a4ffb3d..f545dc329b 100644 --- a/cns/restserver/internalapi.go +++ b/cns/restserver/internalapi.go @@ -5,11 +5,14 @@ package restserver import ( "bytes" + "context" "encoding/json" "fmt" "net/http" "net/http/httptest" "reflect" + "strconv" + "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -143,6 +146,64 @@ func (service *HTTPRestService) SyncNodeStatus(dncEP, infraVnet, nodeID string, return } +// SyncHostNCVersion will check NC version from NMAgent and save it as host NC version in container status. +// If NMAgent NC version got updated, CNS will refresh the pending programming IP status. +func (service *HTTPRestService) SyncHostNCVersion(ctx context.Context, channelMode string, syncHostNCTimeoutMilliSec time.Duration) { + var hostVersionNeedUpdateNcList []string + service.RLock() + for _, containerstatus := range service.state.ContainerStatus { + // Will open a separate PR to convert all the NC version related variable to int. Change from string to int is a pain. + hostVersion, err := strconv.Atoi(containerstatus.HostVersion) + if err != nil { + log.Errorf("Received err when change containerstatus.HostVersion %s to int, err msg %v", containerstatus.HostVersion, err) + continue + } + dncNcVersion, err := strconv.Atoi(containerstatus.CreateNetworkContainerRequest.Version) + if err != nil { + log.Errorf("Received err when change nc version %s in containerstatus to int, err msg %v", containerstatus.CreateNetworkContainerRequest.Version, err) + continue + } + // host NC version is the NC version from NMAgent, if it's smaller than NC version from DNC, then append it to indicate it needs update. + if hostVersion < dncNcVersion { + hostVersionNeedUpdateNcList = append(hostVersionNeedUpdateNcList, containerstatus.ID) + } else if hostVersion > dncNcVersion { + log.Errorf("NC version from NMAgent is larger than DNC, NC version from NMAgent is %d, NC version from DNC is %d", hostVersion, dncNcVersion) + } + } + service.RUnlock() + if len(hostVersionNeedUpdateNcList) > 0 { + ncVersionChannel := make(chan map[string]int) + ctxWithTimeout, _ := context.WithTimeout(ctx, syncHostNCTimeoutMilliSec*time.Millisecond) + go func() { + ncVersionChannel <- service.nmagentClient.GetNcVersionListWithOutToken(hostVersionNeedUpdateNcList) + close(ncVersionChannel) + }() + select { + case newHostNCVersionList := <-ncVersionChannel: + if newHostNCVersionList == nil { + logger.Errorf("Can't get vfp programmed NC version list from url without token") + } else { + service.Lock() + for ncID, newHostNCVersion := range newHostNCVersionList { + // Check whether it exist in service state and get the related nc info + if ncInfo, exist := service.state.ContainerStatus[ncID]; !exist { + logger.Errorf("Can't find NC with ID %s in service state, stop updating this host NC version", ncID) + } else { + if channelMode == cns.CRD { + service.MarkIpsAsAvailableUntransacted(ncInfo.ID, newHostNCVersion) + } + ncInfo.HostVersion = strconv.Itoa(newHostNCVersion) + service.state.ContainerStatus[ncID] = ncInfo + } + } + service.Unlock() + } + case <-ctxWithTimeout.Done(): + logger.Errorf("Timeout when getting vfp programmed NC version list from url without token") + } + } +} + // This API will be called by CNS RequestController on CRD update. func (service *HTTPRestService) ReconcileNCState(ncRequest *cns.CreateNetworkContainerRequest, podInfoByIp map[string]cns.KubernetesPodInfo, scalar nnc.Scaler, spec nnc.NodeNetworkConfigSpec) int { // check if ncRequest is null, then return as there is no CRD state yet diff --git a/cns/restserver/internalapi_test.go b/cns/restserver/internalapi_test.go index 843f0a98a7..1b43fb4826 100644 --- a/cns/restserver/internalapi_test.go +++ b/cns/restserver/internalapi_test.go @@ -4,11 +4,13 @@ package restserver import ( + "context" "encoding/json" "fmt" "reflect" "strconv" "testing" + "time" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/fakes" @@ -44,8 +46,8 @@ func TestCreateOrUpdateNetworkContainerInternal(t *testing.T) { setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) - // NC version set as 0 which is the default initial value. - validateCreateOrUpdateNCInternal(t, 2, "0") + // NC version set as -1 which is the same as default host version value. + validateCreateOrUpdateNCInternal(t, 2, "-1") } func TestCreateOrUpdateNCWithLargerVersionComparedToNMAgent(t *testing.T) { @@ -54,7 +56,7 @@ func TestCreateOrUpdateNCWithLargerVersionComparedToNMAgent(t *testing.T) { setEnv(t) setOrchestratorTypeInternal(cns.KubernetesCRD) // NC version set as 1 which is larger than NC version get from mock nmagent. - validateCreateOrUpdateNCInternal(t, 2, "1") + validateCreateNCInternal(t, 2, "1") } func TestCreateAndUpdateNCWithSecondaryIPNCVersion(t *testing.T) { @@ -125,6 +127,81 @@ func TestCreateAndUpdateNCWithSecondaryIPNCVersion(t *testing.T) { } } +func TestSyncHostNCVersion(t *testing.T) { + // cns.KubernetesCRD has one more logic compared to other orchestrator type, so test both of them + orchestratorTypes := []string{cns.Kubernetes, cns.KubernetesCRD} + for _, orchestratorType := range orchestratorTypes { + testSyncHostNCVersion(t, orchestratorType) + } +} + +func testSyncHostNCVersion(t *testing.T, orchestratorType string) { + req := createNCReqeustForSyncHostNCVersion(t) + containerStatus := svc.state.ContainerStatus[req.NetworkContainerid] + if containerStatus.HostVersion != "-1" { + t.Errorf("Unexpected containerStatus.HostVersion %s, expeted host version should be -1 in string", containerStatus.HostVersion) + } + if containerStatus.CreateNetworkContainerRequest.Version != "0" { + t.Errorf("Unexpected nc version in containerStatus as %s, expeted VM version should be 0 in string", containerStatus.CreateNetworkContainerRequest.Version) + } + // When sync host NC version, it will use the orchestratorType pass in. + svc.SyncHostNCVersion(context.Background(), orchestratorType, 500*time.Millisecond) + containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] + if containerStatus.HostVersion != "0" { + t.Errorf("Unexpected containerStatus.HostVersion %s, expeted host version should be 0 in string", containerStatus.HostVersion) + } + if containerStatus.CreateNetworkContainerRequest.Version != "0" { + t.Errorf("Unexpected nc version in containerStatus as %s, expeted VM version should be 0 in string", containerStatus.CreateNetworkContainerRequest.Version) + } +} + +func TestPendingIPsGotUpdatedWhenSyncHostNCVersion(t *testing.T) { + req := createNCReqeustForSyncHostNCVersion(t) + containerStatus := svc.state.ContainerStatus[req.NetworkContainerid] + + receivedSecondaryIPConfigs := containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs + if len(receivedSecondaryIPConfigs) != 1 { + t.Errorf("Unexpected receivedSecondaryIPConfigs length %d, expeted length is 1", len(receivedSecondaryIPConfigs)) + } + for i, _ := range receivedSecondaryIPConfigs { + podIPConfigState := svc.PodIPConfigState[i] + if podIPConfigState.State != cns.PendingProgramming { + t.Errorf("Unexpected State %s, expeted State is %s, received %s, IP address is %s", podIPConfigState.State, cns.PendingProgramming, podIPConfigState.State, podIPConfigState.IPAddress) + } + } + svc.SyncHostNCVersion(context.Background(), cns.CRD, 500*time.Millisecond) + containerStatus = svc.state.ContainerStatus[req.NetworkContainerid] + + receivedSecondaryIPConfigs = containerStatus.CreateNetworkContainerRequest.SecondaryIPConfigs + if len(receivedSecondaryIPConfigs) != 1 { + t.Errorf("Unexpected receivedSecondaryIPConfigs length %d, expeted length is 1", len(receivedSecondaryIPConfigs)) + } + for i, _ := range receivedSecondaryIPConfigs { + podIPConfigState := svc.PodIPConfigState[i] + if podIPConfigState.State != cns.Available { + t.Errorf("Unexpected State %s, expeted State is %s, received %s, IP address is %s", podIPConfigState.State, cns.Available, podIPConfigState.State, podIPConfigState.IPAddress) + } + } +} + +func createNCReqeustForSyncHostNCVersion(t *testing.T) cns.CreateNetworkContainerRequest { + restartService() + setEnv(t) + setOrchestratorTypeInternal(cns.KubernetesCRD) + + // NC version set as 0 which is the default initial value. + ncVersion := 0 + secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig) + ncID := "testNc1" + + // Build secondaryIPConfig, it will have one item as {IPAddress:"10.0.0.16", NCVersion: 0} + ipAddress := "10.0.0.16" + secIPConfig := newSecondaryIPConfig(ipAddress, ncVersion) + ipID := uuid.New() + secondaryIPConfigs[ipID.String()] = secIPConfig + req := createNCReqInternal(t, secondaryIPConfigs, ncID, strconv.Itoa(ncVersion)) + return req +} func TestReconcileNCWithEmptyState(t *testing.T) { restartService() setEnv(t) @@ -150,12 +227,12 @@ func TestReconcileNCWithExistingState(t *testing.T) { var startingIndex = 6 for i := 0; i < 4; i++ { ipaddress := "10.0.0." + strconv.Itoa(startingIndex) - secIpConfig := newSecondaryIPConfig(ipaddress, 0) + secIpConfig := newSecondaryIPConfig(ipaddress, -1) ipId := uuid.New() secondaryIPConfigs[ipId.String()] = secIpConfig startingIndex++ } - req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "0") + req := generateNetworkContainerRequest(secondaryIPConfigs, "reconcileNc1", "-1") expectedAllocatedPods := make(map[string]cns.KubernetesPodInfo) expectedAllocatedPods["10.0.0.6"] = cns.KubernetesPodInfo{ @@ -187,12 +264,12 @@ func TestReconcileNCWithSystemPods(t *testing.T) { var startingIndex = 6 for i := 0; i < 4; i++ { ipaddress := "10.0.0." + strconv.Itoa(startingIndex) - secIpConfig := newSecondaryIPConfig(ipaddress, 0) + secIpConfig := newSecondaryIPConfig(ipaddress, -1) ipId := uuid.New() secondaryIPConfigs[ipId.String()] = secIpConfig startingIndex++ } - req := generateNetworkContainerRequest(secondaryIPConfigs, uuid.New().String(), "0") + req := generateNetworkContainerRequest(secondaryIPConfigs, uuid.New().String(), "-1") expectedAllocatedPods := make(map[string]cns.KubernetesPodInfo) expectedAllocatedPods["10.0.0.6"] = cns.KubernetesPodInfo{ @@ -221,14 +298,30 @@ func setOrchestratorTypeInternal(orchestratorType string) { svc.state.OrchestratorType = orchestratorType } -func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int, ncVersion string) { +func validateCreateNCInternal(t *testing.T, secondaryIpCount int, ncVersion string) { secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig) ncId := "testNc1" + ncVersionInInt, _ := strconv.Atoi(ncVersion) + var startingIndex = 6 + for i := 0; i < secondaryIpCount; i++ { + ipaddress := "10.0.0." + strconv.Itoa(startingIndex) + secIpConfig := newSecondaryIPConfig(ipaddress, ncVersionInInt) + ipId := uuid.New() + secondaryIPConfigs[ipId.String()] = secIpConfig + startingIndex++ + } + createAndValidateNCRequest(t, secondaryIPConfigs, ncId, ncVersion) +} + +func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int, ncVersion string) { + secondaryIPConfigs := make(map[string]cns.SecondaryIPConfig) + ncId := "testNc1" + ncVersionInInt, _ := strconv.Atoi(ncVersion) var startingIndex = 6 for i := 0; i < secondaryIpCount; i++ { ipaddress := "10.0.0." + strconv.Itoa(startingIndex) - secIpConfig := newSecondaryIPConfig(ipaddress, 0) + secIpConfig := newSecondaryIPConfig(ipaddress, ncVersionInInt) ipId := uuid.New() secondaryIPConfigs[ipId.String()] = secIpConfig startingIndex++ @@ -240,7 +333,7 @@ func validateCreateOrUpdateNCInternal(t *testing.T, secondaryIpCount int, ncVers fmt.Println("Validate Scaleup") for i := 0; i < secondaryIpCount; i++ { ipaddress := "10.0.0." + strconv.Itoa(startingIndex) - secIpConfig := newSecondaryIPConfig(ipaddress, 1) + secIpConfig := newSecondaryIPConfig(ipaddress, ncVersionInInt) ipId := uuid.New() secondaryIPConfigs[ipId.String()] = secIpConfig startingIndex++ @@ -304,12 +397,12 @@ func validateNetworkRequest(t *testing.T, req cns.CreateNetworkContainerRequest) var expectedIPStatus string // 0 is the default NMAgent version return from fake GetNetworkContainerInfoFromHost - if containerStatus.VMVersion > "0" { + if containerStatus.CreateNetworkContainerRequest.Version > "0" { expectedIPStatus = cns.PendingProgramming } else { expectedIPStatus = cns.Available } - t.Logf("VMVersion is %s, HostVersion is %s", containerStatus.VMVersion, containerStatus.HostVersion) + t.Logf("NC version in container status is %s, HostVersion is %s", containerStatus.CreateNetworkContainerRequest.Version, containerStatus.HostVersion) var alreadyValidated = make(map[string]string) for ipid, ipStatus := range svc.PodIPConfigState { if ipaddress, found := alreadyValidated[ipid]; !found { diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index cdcf20f5b9..843971a207 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "net/http" + "strconv" "github.com/Azure/azure-container-networking/cns" "github.com/Azure/azure-container-networking/cns/logger" @@ -99,7 +100,7 @@ func (service *HTTPRestService) MarkIPsAsPending(numberToMark int) (map[string]c defer service.Unlock() for uuid, _ := range service.PodIPConfigState { mutableIPConfig := service.PodIPConfigState[uuid] - if mutableIPConfig.State == cns.Available { + if mutableIPConfig.State == cns.Available || mutableIPConfig.State == cns.PendingProgramming { mutableIPConfig.State = cns.PendingRelease service.PodIPConfigState[uuid] = mutableIPConfig pendingReleaseIPs[uuid] = mutableIPConfig @@ -113,6 +114,33 @@ func (service *HTTPRestService) MarkIPsAsPending(numberToMark int) (map[string]c return nil, fmt.Errorf("Failed to mark %d IP's as pending, only marked %d IP's", numberToMark, len(pendingReleaseIPs)) } +// MarkIpsAsAvailableUntransacted will update pending programming IPs to available if NMAgent side's programmed nc version keep up with nc version. +// Note: this func is an untransacted API as the caller will take a Service lock +func (service *HTTPRestService) MarkIpsAsAvailableUntransacted(ncID string, newHostNCVersion int) { + // Check whether it exist in service state and get the related nc info + if ncInfo, exist := service.state.ContainerStatus[ncID]; !exist { + logger.Errorf("Can't find NC with ID %s in service state, stop updating its pending programming IP status", ncID) + } else { + previousHostNCVersion := ncInfo.HostVersion + // We only need to handle the situation when dnc nc version is larger than programmed nc version + if previousHostNCVersion < ncInfo.CreateNetworkContainerRequest.Version && previousHostNCVersion < strconv.Itoa(newHostNCVersion) { + for uuid, secondaryIPConfigs := range ncInfo.CreateNetworkContainerRequest.SecondaryIPConfigs { + if ipConfigStatus, exist := service.PodIPConfigState[uuid]; !exist { + logger.Errorf("IP %s with uuid as %s exist in service state Secondary IP list but can't find in PodIPConfigState", ipConfigStatus.IPAddress, uuid) + } else if ipConfigStatus.State == cns.PendingProgramming && secondaryIPConfigs.NCVersion <= newHostNCVersion { + ipConfigStatus.State = cns.Available + service.PodIPConfigState[uuid] = ipConfigStatus + // Following 2 sentence assign new host version to secondary ip config. + secondaryIPConfigs.NCVersion = newHostNCVersion + ncInfo.CreateNetworkContainerRequest.SecondaryIPConfigs[uuid] = secondaryIPConfigs + logger.Printf("Change ip %s with uuid %s from pending programming to %s, current secondary ip configs is %v", ipConfigStatus.IPAddress, uuid, cns.Available, + ncInfo.CreateNetworkContainerRequest.SecondaryIPConfigs[uuid]) + } + } + } + } +} + func (service *HTTPRestService) GetPodIPConfigState() map[string]cns.IPConfigurationStatus { service.RLock() defer service.RUnlock() diff --git a/cns/restserver/ipam_test.go b/cns/restserver/ipam_test.go index c04c8fea2e..9b4cd89304 100644 --- a/cns/restserver/ipam_test.go +++ b/cns/restserver/ipam_test.go @@ -41,7 +41,7 @@ var ( func getTestService() *HTTPRestService { var config common.ServiceConfig - httpsvc, _ := NewHTTPRestService(&config, fakes.NewFakeImdsClient()) + httpsvc, _ := NewHTTPRestService(&config, fakes.NewFakeImdsClient(), fakes.NewFakeNMAgentClient()) svc = httpsvc.(*HTTPRestService) svc.IPAMPoolMonitor = fakes.NewIPAMPoolMonitorFake() setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -139,13 +139,14 @@ func UpdatePodIpConfigState(t *testing.T, svc *HTTPRestService, ipconfigs map[st for _, ipconfig := range ipconfigs { secIpConfig := cns.SecondaryIPConfig{ IPAddress: ipconfig.IPAddress, + NCVersion: -1, } ipId := ipconfig.ID secondaryIPConfigs[ipId] = secIpConfig } - createAndValidateNCRequest(t, secondaryIPConfigs, testNCID, "0") + createAndValidateNCRequest(t, secondaryIPConfigs, testNCID, "-1") // update ipconfigs to expected state for ipId, ipconfig := range ipconfigs { diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index 91010fc71a..ca325db8b2 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-container-networking/cns/ipamclient" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/networkcontainers" + "github.com/Azure/azure-container-networking/cns/nmagentclient" "github.com/Azure/azure-container-networking/cns/routes" acn "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/store" @@ -38,6 +39,7 @@ type HTTPRestService struct { dockerClient *dockerclient.DockerClient imdsClient imdsclient.ImdsClientInterface ipamClient *ipamclient.IpamClient + nmagentClient nmagentclient.NMAgentClientInterface networkContainer *networkcontainers.NetworkContainers PodIPIDByOrchestratorContext map[string]string // OrchestratorContext is key and value is Pod IP uuid. PodIPConfigState map[string]cns.IPConfigurationStatus // seondaryipid(uuid) is key @@ -84,7 +86,7 @@ type networkInfo struct { } // NewHTTPRestService creates a new HTTP Service object. -func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdsclient.ImdsClientInterface) (cns.HTTPService, error) { +func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdsclient.ImdsClientInterface, nmagentClient nmagentclient.NMAgentClientInterface) (cns.HTTPService, error) { service, err := cns.NewService(config.Name, config.Version, config.ChannelMode, config.Store) if err != nil { return nil, err @@ -118,6 +120,7 @@ func NewHTTPRestService(config *common.ServiceConfig, imdsClientInterface imdscl dockerClient: dc, imdsClient: imdsClient, ipamClient: ic, + nmagentClient: nmagentClient, networkContainer: nc, PodIPIDByOrchestratorContext: podIPIDByOrchestratorContext, PodIPConfigState: podIPConfigState, diff --git a/cns/restserver/util.go b/cns/restserver/util.go index 1ca49b8de9..5fcfda7dc1 100644 --- a/cns/restserver/util.go +++ b/cns/restserver/util.go @@ -120,8 +120,10 @@ func (service *HTTPRestService) saveNetworkContainerGoalState(req cns.CreateNetw hostVersion = existingNCStatus.HostVersion existingSecondaryIPConfigs = existingNCStatus.CreateNetworkContainerRequest.SecondaryIPConfigs vfpUpdateComplete = existingNCStatus.VfpUpdateComplete - } else { + } + if hostVersion == "" { // Host version is the NC version from NMAgent, set it -1 to indicate no result from NMAgent yet. + // TODO, query NMAgent and with aggresive time out and assign latest host version. hostVersion = "-1" } @@ -240,17 +242,14 @@ func (service *HTTPRestService) updateIpConfigsStateUntransacted(req cns.CreateN } } - newNCVersion, _ := strconv.Atoi(req.Version) - nmagentNCVersion, _ := strconv.Atoi(hostVersion) - - // TODO, remove this override when background thread which update nmagent version is ready. - nmagentNCVersion = service.imdsClient.GetNetworkContainerInfoFromHostWithoutToken() - - if nmagentNCVersion >= newNCVersion { - service.addIPConfigStateUntransacted(cns.Available, req.NetworkContainerid, newIPConfigs, existingSecondaryIPConfigs) - } else { - service.addIPConfigStateUntransacted(cns.PendingProgramming, req.NetworkContainerid, newIPConfigs, existingSecondaryIPConfigs) + // Add new IPs + // TODO, will udpate NC version related variable to int, change it from string to int is a pains + var hostNCVersionInInt int + var err error + if hostNCVersionInInt, err = strconv.Atoi(hostVersion); err != nil { + return UnsupportedNCVersion, fmt.Sprintf("Invalid hostVersion is %s, err:%s", hostVersion, err) } + service.addIPConfigStateUntransacted(req.NetworkContainerid, hostNCVersionInInt, req.SecondaryIPConfigs, existingSecondaryIPConfigs) return 0, "" } @@ -258,7 +257,8 @@ func (service *HTTPRestService) updateIpConfigsStateUntransacted(req cns.CreateN // addIPConfigStateUntransacted adds the IPConfigs to the PodIpConfigState map with Available state // If the IP is already added then it will be an idempotent call. Also note, caller will // acquire/release the service lock. -func (service *HTTPRestService) addIPConfigStateUntransacted(newIPCNSStatus, ncId string, ipconfigs, existingSecondaryIPConfigs map[string]cns.SecondaryIPConfig) { +func (service *HTTPRestService) addIPConfigStateUntransacted(ncId string, hostVersion int, ipconfigs, existingSecondaryIPConfigs map[string]cns.SecondaryIPConfig) { + newIPCNSStatus := cns.Available // add ipconfigs to state for ipId, ipconfig := range ipconfigs { // New secondary IP configs has new NC version however, CNS don't want to override existing IPs'with new NC version @@ -266,11 +266,17 @@ func (service *HTTPRestService) addIPConfigStateUntransacted(newIPCNSStatus, ncI if existingIPConfig, existsInPreviousIPConfig := existingSecondaryIPConfigs[ipId]; existsInPreviousIPConfig { ipconfig.NCVersion = existingIPConfig.NCVersion ipconfigs[ipId] = ipconfig + hostVersion = existingIPConfig.NCVersion } - logger.Printf("[Azure-Cns] Set IP %s version to %d", ipconfig.IPAddress, ipconfig.NCVersion) + logger.Printf("[Azure-Cns] Set IP %s version to %d, programmed host nc version is %d", ipconfig.IPAddress, ipconfig.NCVersion, hostVersion) if _, exists := service.PodIPConfigState[ipId]; exists { continue } + // Using the updated NC version attached with IP to compare with latest nmagent version and determine IP statues. + // When reconcile, service.PodIPConfigState doens't exist, rebuild it with the help of NC version attached with IP. + if hostVersion < ipconfig.NCVersion { + newIPCNSStatus = cns.PendingProgramming + } // add the new State ipconfigStatus := cns.IPConfigurationStatus{ NCID: ncId, diff --git a/cns/service/main.go b/cns/service/main.go index f93978ebfa..0cd8696ffa 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -453,8 +453,13 @@ func main() { return } + nmaclient, err := nmagentclient.NewNMAgentClient("") + if err != nil { + logger.Errorf("Failed to start nmagent client due to error %v", err) + return + } // Create CNS object. - httpRestService, err := restserver.NewHTTPRestService(&config, new(imdsclient.ImdsClient)) + httpRestService, err := restserver.NewHTTPRestService(&config, new(imdsclient.ImdsClient), nmaclient) if err != nil { logger.Errorf("Failed to create CNS object, err:%v.\n", err) return @@ -562,6 +567,16 @@ func main() { return } + logger.Printf("Starting SyncHostNCVersion") + rootCxt := context.Background() + go func() { + // Periodically poll vfp programmed NC version from NMAgent + for { + <-time.NewTicker(cnsconfig.SyncHostNCVersionIntervalMs * time.Millisecond).C + httpRestServiceImplementation.SyncHostNCVersion(rootCxt, config.ChannelMode, cnsconfig.SyncHostNCTimeoutMs) + } + }() + // initialize the ipam pool monitor httpRestServiceImplementation.IPAMPoolMonitor = ipampoolmonitor.NewCNSIPAMPoolMonitor(httpRestServiceImplementation, requestController)