diff --git a/azure-ipam/ipconfig/ipconfig.go b/azure-ipam/ipconfig/ipconfig.go index 6906f8cd72..a560d44743 100644 --- a/azure-ipam/ipconfig/ipconfig.go +++ b/azure-ipam/ipconfig/ipconfig.go @@ -32,6 +32,7 @@ func CreateIPConfigReq(args *cniSkel.CmdArgs) (cns.IPConfigRequest, error) { PodInterfaceID: args.ContainerID, InfraContainerID: args.ContainerID, OrchestratorContext: orchestratorContext, + Ifname: args.IfName, } return req, nil diff --git a/cns/NetworkContainerContract.go b/cns/NetworkContainerContract.go index 644bc44bc3..d253eb0b19 100644 --- a/cns/NetworkContainerContract.go +++ b/cns/NetworkContainerContract.go @@ -390,7 +390,7 @@ type IPConfigRequest struct { PodInterfaceID string InfraContainerID string OrchestratorContext json.RawMessage - Ifname string + Ifname string // Used by delegated IPAM } func (i IPConfigRequest) String() string { diff --git a/cns/azure-cns.yaml b/cns/azure-cns.yaml index 9bb98b713e..541c2a6d84 100644 --- a/cns/azure-cns.yaml +++ b/cns/azure-cns.yaml @@ -104,6 +104,8 @@ spec: mountPath: /var/log - name: cns-state mountPath: /var/lib/azure-network + - name: azure-endpoints + mountPath: /var/run/azure-cns/ - name: cns-config mountPath: /etc/azure-cns - name: cni-bin @@ -130,6 +132,10 @@ spec: fieldPath: spec.nodeName hostNetwork: true volumes: + - name: azure-endpoints + hostPath: + path: /var/run/azure-cns/ + type: DirectoryOrCreate - name: log hostPath: path: /var/log @@ -179,5 +185,8 @@ data: "NodeSyncIntervalInSeconds": 30 }, "ChannelMode": "CRD", - "InitializeFromCNI": true + "InitializeFromCNI": true, + "ManageEndpointState": false, + "ProgramSNATIPTables" : false } +# Toggle ManageEndpointState and ProgramSNATIPTables to true for delegated IPAM use case. diff --git a/cns/client/client_test.go b/cns/client/client_test.go index ae5deb929e..e1e03f7d55 100644 --- a/cns/client/client_test.go +++ b/cns/client/client_test.go @@ -169,7 +169,7 @@ func TestMain(m *testing.M) { logger.InitLogger(logName, 0, 0, tmpLogDir+"/") config := common.ServiceConfig{} - httpRestService, err := restserver.NewHTTPRestService(&config, &fakes.WireserverClientFake{}, &fakes.NMAgentClientFake{}) + httpRestService, err := restserver.NewHTTPRestService(&config, &fakes.WireserverClientFake{}, &fakes.NMAgentClientFake{}, nil) svc = httpRestService.(*restserver.HTTPRestService) svc.Name = "cns-test-server" fakeNNC := v1alpha.NodeNetworkConfig{ diff --git a/cns/cnireconciler/podinfoprovider.go b/cns/cnireconciler/podinfoprovider.go index 959ca64259..ee9e71a29d 100644 --- a/cns/cnireconciler/podinfoprovider.go +++ b/cns/cnireconciler/podinfoprovider.go @@ -6,6 +6,8 @@ import ( "github.com/Azure/azure-container-networking/cni/api" "github.com/Azure/azure-container-networking/cni/client" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/restserver" + "github.com/Azure/azure-container-networking/store" "github.com/pkg/errors" "k8s.io/utils/exec" ) @@ -16,6 +18,27 @@ func NewCNIPodInfoProvider() (cns.PodInfoByIPProvider, error) { return newCNIPodInfoProvider(exec.New()) } +func NewCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) { + return newCNSPodInfoProvider(endpointStore) +} + +func newCNSPodInfoProvider(endpointStore store.KeyValueStore) (cns.PodInfoByIPProvider, error) { + var state map[string]*restserver.EndpointInfo + err := endpointStore.Read(restserver.EndpointStoreKey, &state) + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + // Nothing to restore. + return cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { + return endpointStateToPodInfoByIP(state) + }), err + } + return nil, fmt.Errorf("failed to read endpoints state from store : %w", err) + } + return cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { + return endpointStateToPodInfoByIP(state) + }), nil +} + func newCNIPodInfoProvider(exec exec.Interface) (cns.PodInfoByIPProvider, error) { cli := client.New(exec) state, err := cli.GetEndpointState() @@ -44,3 +67,34 @@ func cniStateToPodInfoByIP(state *api.AzureCNIState) (map[string]cns.PodInfo, er } return podInfoByIP, nil } + +func endpointStateToPodInfoByIP(state map[string]*restserver.EndpointInfo) (map[string]cns.PodInfo, error) { + podInfoByIP := map[string]cns.PodInfo{} + for containerID, endpointInfo := range state { // for each endpoint + for ifname, ipinfo := range endpointInfo.IfnameToIPMap { // for each IP info object of the endpoint's interfaces + for _, ipv4conf := range ipinfo.IPv4 { // for each IPv4 config of the endpoint's interfaces + if _, ok := podInfoByIP[ipv4conf.IP.String()]; ok { + return nil, errors.Wrap(cns.ErrDuplicateIP, ipv4conf.IP.String()) + } + podInfoByIP[ipv4conf.IP.String()] = cns.NewPodInfo( + containerID, + ifname, + endpointInfo.PodName, + endpointInfo.PodNamespace, + ) + } + for _, ipv6conf := range ipinfo.IPv6 { // for each IPv6 config of the endpoint's interfaces + if _, ok := podInfoByIP[ipv6conf.IP.String()]; ok { + return nil, errors.Wrap(cns.ErrDuplicateIP, ipv6conf.IP.String()) + } + podInfoByIP[ipv6conf.IP.String()] = cns.NewPodInfo( + containerID, + ifname, + endpointInfo.PodName, + endpointInfo.PodNamespace, + ) + } + } + } + return podInfoByIP, nil +} diff --git a/cns/cnireconciler/podinfoprovider_test.go b/cns/cnireconciler/podinfoprovider_test.go index 8c426a3630..d81ff3e0b1 100644 --- a/cns/cnireconciler/podinfoprovider_test.go +++ b/cns/cnireconciler/podinfoprovider_test.go @@ -1,9 +1,12 @@ package cnireconciler import ( + "net" "testing" "github.com/Azure/azure-container-networking/cns" + "github.com/Azure/azure-container-networking/cns/restserver" + "github.com/Azure/azure-container-networking/store" testutils "github.com/Azure/azure-container-networking/test/utils" "github.com/stretchr/testify/assert" "k8s.io/utils/exec" @@ -59,3 +62,49 @@ func TestNewCNIPodInfoProvider(t *testing.T) { }) } } + +func TestNewCNSPodInfoProvider(t *testing.T) { + goodStore := store.NewMockStore("") + goodEndpointState := make(map[string]*restserver.EndpointInfo) + endpointInfo := &restserver.EndpointInfo{PodName: "goldpinger-deploy-bbbf9fd7c-z8v4l", PodNamespace: "default", IfnameToIPMap: make(map[string]*restserver.IPInfo)} + endpointInfo.IfnameToIPMap["eth0"] = &restserver.IPInfo{IPv4: []net.IPNet{{IP: net.IPv4(10, 241, 0, 65), Mask: net.IPv4Mask(255, 255, 255, 0)}}} + + goodEndpointState["0a4917617e15d24dc495e407d8eb5c88e4406e58fa209e4eb75a2c2fb7045eea"] = endpointInfo + err := goodStore.Write(restserver.EndpointStoreKey, goodEndpointState) + if err != nil { + t.Fatalf("Error writing to store: %v", err) + } + tests := []struct { + name string + store store.KeyValueStore + want map[string]cns.PodInfo + wantErr bool + }{ + { + name: "good", + store: goodStore, + want: map[string]cns.PodInfo{"10.241.0.65": cns.NewPodInfo("0a4917617e15d24dc495e407d8eb5c88e4406e58fa209e4eb75a2c2fb7045eea", "eth0", "goldpinger-deploy-bbbf9fd7c-z8v4l", "default")}, + wantErr: false, + }, + { + name: "empty store", + store: store.NewMockStore(""), + want: map[string]cns.PodInfo{}, + wantErr: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + got, err := newCNSPodInfoProvider(tt.store) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + podInfoByIP, _ := got.PodInfoByIP() + assert.Equal(t, tt.want, podInfoByIP) + }) + } +} diff --git a/cns/configuration/configuration.go b/cns/configuration/configuration.go index ddfcd2c5ba..4ce7cd1402 100644 --- a/cns/configuration/configuration.go +++ b/cns/configuration/configuration.go @@ -37,6 +37,7 @@ type CNSConfig struct { KeyVaultSettings KeyVaultSettings MSISettings MSISettings ProgramSNATIPTables bool + ManageEndpointState bool } type TelemetrySettings struct { diff --git a/cns/logger/cnslogger.go b/cns/logger/cnslogger.go index 02250d29b5..a19a2bee75 100644 --- a/cns/logger/cnslogger.go +++ b/cns/logger/cnslogger.go @@ -85,6 +85,17 @@ func (c *CNSLogger) Debugf(format string, args ...any) { c.sendTraceInternal(msg) } +func (c *CNSLogger) Warnf(format string, args ...any) { + c.logger.Warnf(format, args...) + + if c.th == nil || c.DisableTraceLogging { + return + } + + msg := fmt.Sprintf(format, args...) + c.sendTraceInternal(msg) +} + func (c *CNSLogger) Errorf(format string, args ...any) { c.logger.Errorf(format, args...) diff --git a/cns/logger/log.go b/cns/logger/log.go index 8dfcadc5c1..aab0befc23 100644 --- a/cns/logger/log.go +++ b/cns/logger/log.go @@ -37,6 +37,10 @@ func Debugf(format string, args ...any) { Log.Debugf(format, args...) } +func Warnf(format string, args ...any) { + Log.Warnf(format, args...) +} + func LogEvent(event aitelemetry.Event) { Log.LogEvent(event) } diff --git a/cns/restserver/api_test.go b/cns/restserver/api_test.go index f6c7f2d39b..a9ecae7e2c 100644 --- a/cns/restserver/api_test.go +++ b/cns/restserver/api_test.go @@ -1078,7 +1078,7 @@ func startService() error { } nmagentClient := &fakes.NMAgentClientFake{} - service, err = NewHTTPRestService(&config, &fakes.WireserverClientFake{}, nmagentClient) + service, err = NewHTTPRestService(&config, &fakes.WireserverClientFake{}, nmagentClient, nil) if err != nil { return err } diff --git a/cns/restserver/const.go b/cns/restserver/const.go index 9d94a74fa8..d51814a1d7 100644 --- a/cns/restserver/const.go +++ b/cns/restserver/const.go @@ -2,9 +2,10 @@ package restserver const ( // Key against which CNS state is persisted. - storeKey = "ContainerNetworkService" - attach = "Attach" - detach = "Detach" + storeKey = "ContainerNetworkService" + EndpointStoreKey = "Endpoints" + attach = "Attach" + detach = "Detach" // Rest service state identifier for named lock stateJoinedNetworks = "JoinedNetworks" dncApiVersion = "?api-version=2018-03-01" diff --git a/cns/restserver/ipam.go b/cns/restserver/ipam.go index 1feead1487..15d4e2ad94 100644 --- a/cns/restserver/ipam.go +++ b/cns/restserver/ipam.go @@ -5,6 +5,7 @@ package restserver import ( "fmt" + "net" "net/http" "strconv" @@ -12,6 +13,7 @@ import ( "github.com/Azure/azure-container-networking/cns/filter" "github.com/Azure/azure-container-networking/cns/logger" "github.com/Azure/azure-container-networking/cns/types" + "github.com/Azure/azure-container-networking/common" "github.com/pkg/errors" ) @@ -65,6 +67,25 @@ func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r ipAssignmentLatency.Observe(since.Seconds()) } }() + + // Check if http rest service managed endpoint state is set + if service.Options[common.OptManageEndpointState] == true { + err = service.updateEndpointState(ipconfigRequest, podInfo, podIPInfo) + if err != nil { + reserveResp := &cns.IPConfigResponse{ + Response: cns.Response{ + ReturnCode: types.UnexpectedError, + Message: fmt.Sprintf("Update endpoint state failed: %v ", err), + }, + PodIpInfo: podIPInfo, + } + w.Header().Set(cnsReturnCode, reserveResp.Response.ReturnCode.String()) + err = service.Listener.Encode(w, &reserveResp) + logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) + return + } + } + reserveResp := &cns.IPConfigResponse{ Response: cns.Response{ ReturnCode: types.Success, @@ -76,6 +97,73 @@ func (service *HTTPRestService) requestIPConfigHandler(w http.ResponseWriter, r logger.ResponseEx(service.Name+operationName, ipconfigRequest, reserveResp, reserveResp.Response.ReturnCode, err) } +var ( + errStoreEmpty = errors.New("empty endpoint state store") + errParsePodIPFailed = errors.New("failed to parse pod's ip") +) + +func (service *HTTPRestService) updateEndpointState(ipconfigRequest cns.IPConfigRequest, podInfo cns.PodInfo, podIPInfo cns.PodIpInfo) error { + if service.EndpointStateStore == nil { + return errStoreEmpty + } + service.Lock() + defer service.Unlock() + logger.Printf("[updateEndpointState] Updating endpoint state for infra container %s", ipconfigRequest.InfraContainerID) + if endpointInfo, ok := service.EndpointState[ipconfigRequest.InfraContainerID]; ok { + logger.Warnf("[updateEndpointState] Found existing endpoint state for infra container %s", ipconfigRequest.InfraContainerID) + ip := net.ParseIP(podIPInfo.PodIPConfig.IPAddress) + if ip == nil { + logger.Errorf("failed to parse pod ip address %s", podIPInfo.PodIPConfig.IPAddress) + return errParsePodIPFailed + } + if ip.To4() == nil { // is an ipv6 address + ipconfig := net.IPNet{IP: ip, Mask: net.CIDRMask(int(podIPInfo.PodIPConfig.PrefixLength), 128)} // nolint + for _, ipconf := range endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname].IPv6 { + if ipconf.IP.Equal(ipconfig.IP) { + logger.Printf("[updateEndpointState] Found existing ipv6 ipconfig for infra container %s", ipconfigRequest.InfraContainerID) + return nil + } + } + endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname].IPv6 = append(endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname].IPv6, ipconfig) + } else { + ipconfig := net.IPNet{IP: ip, Mask: net.CIDRMask(int(podIPInfo.PodIPConfig.PrefixLength), 32)} // nolint + for _, ipconf := range endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname].IPv4 { + if ipconf.IP.Equal(ipconfig.IP) { + logger.Printf("[updateEndpointState] Found existing ipv4 ipconfig for infra container %s", ipconfigRequest.InfraContainerID) + return nil + } + } + endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname].IPv4 = append(endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname].IPv4, ipconfig) + } + + service.EndpointState[ipconfigRequest.InfraContainerID] = endpointInfo + + } else { + endpointInfo := &EndpointInfo{PodName: podInfo.Name(), PodNamespace: podInfo.Namespace(), IfnameToIPMap: make(map[string]*IPInfo)} + ip := net.ParseIP(podIPInfo.PodIPConfig.IPAddress) + if ip == nil { + logger.Errorf("failed to parse pod ip address %s", podIPInfo.PodIPConfig.IPAddress) + return errParsePodIPFailed + } + ipInfo := &IPInfo{} + if ip.To4() == nil { // is an ipv6 address + ipconfig := net.IPNet{IP: ip, Mask: net.CIDRMask(int(podIPInfo.PodIPConfig.PrefixLength), 128)} // nolint + ipInfo.IPv6 = append(ipInfo.IPv6, ipconfig) + } else { + ipconfig := net.IPNet{IP: ip, Mask: net.CIDRMask(int(podIPInfo.PodIPConfig.PrefixLength), 32)} // nolint + ipInfo.IPv4 = append(ipInfo.IPv4, ipconfig) + } + endpointInfo.IfnameToIPMap[ipconfigRequest.Ifname] = ipInfo + service.EndpointState[ipconfigRequest.InfraContainerID] = endpointInfo + } + + err := service.EndpointStateStore.Write(EndpointStoreKey, service.EndpointState) + if err != nil { + return fmt.Errorf("failed to write endpoint state to store: %w", err) + } + return nil +} + func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r *http.Request) { var req cns.IPConfigRequest err := service.Listener.Decode(w, r, &req) @@ -94,6 +182,21 @@ func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r podInfo, returnCode, message := service.validateIPConfigRequest(req) + // Check if http rest service managed endpoint state is set + if service.Options[common.OptManageEndpointState] == true { + if err = service.removeEndpointState(podInfo); err != nil { + resp := cns.Response{ + ReturnCode: types.UnexpectedError, + Message: err.Error(), + } + logger.Errorf("releaseIPConfigHandler remove endpoint state failed because %v, release IP config info %s", resp.Message, req) + w.Header().Set(cnsReturnCode, resp.ReturnCode.String()) + err = service.Listener.Encode(w, &resp) + logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) + return + } + } + if err = service.releaseIPConfig(podInfo); err != nil { returnCode = types.UnexpectedError message = err.Error() @@ -108,6 +211,25 @@ func (service *HTTPRestService) releaseIPConfigHandler(w http.ResponseWriter, r logger.ResponseEx(service.Name, req, resp, resp.ReturnCode, err) } +func (service *HTTPRestService) removeEndpointState(podInfo cns.PodInfo) error { + if service.EndpointStateStore == nil { + return errStoreEmpty + } + service.Lock() + defer service.Unlock() + logger.Printf("[removeEndpointState] Removing endpoint state for infra container %s", podInfo.InfraContainerID()) + if _, ok := service.EndpointState[podInfo.InfraContainerID()]; ok { + delete(service.EndpointState, podInfo.InfraContainerID()) + err := service.EndpointStateStore.Write(EndpointStoreKey, service.EndpointState) + if err != nil { + return fmt.Errorf("failed to write endpoint state to store: %w", err) + } + } else { // will not fail if no endpoint state for infra container id is found + logger.Printf("[removeEndpointState] No endpoint state found for infra container %s", podInfo.InfraContainerID()) + } + return nil +} + // MarkIPAsPendingRelease will set the IPs which are in PendingProgramming or Available to PendingRelease state // It will try to update [totalIpsToRelease] number of ips. func (service *HTTPRestService) MarkIPAsPendingRelease(totalIpsToRelease int) (map[string]cns.IPConfigurationStatus, error) { diff --git a/cns/restserver/ipam_test.go b/cns/restserver/ipam_test.go index dc0a5f8e9c..7fbef159ee 100644 --- a/cns/restserver/ipam_test.go +++ b/cns/restserver/ipam_test.go @@ -4,6 +4,8 @@ package restserver import ( + "fmt" + "net" "strconv" "testing" @@ -12,6 +14,7 @@ import ( "github.com/Azure/azure-container-networking/cns/fakes" "github.com/Azure/azure-container-networking/cns/types" "github.com/Azure/azure-container-networking/crd/nodenetworkconfig/api/v1alpha" + "github.com/Azure/azure-container-networking/store" "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -37,7 +40,7 @@ var ( func getTestService() *HTTPRestService { var config common.ServiceConfig - httpsvc, _ := NewHTTPRestService(&config, &fakes.WireserverClientFake{}, &fakes.NMAgentClientFake{}) + httpsvc, _ := NewHTTPRestService(&config, &fakes.WireserverClientFake{}, &fakes.NMAgentClientFake{}, store.NewMockStore("")) svc = httpsvc.(*HTTPRestService) svc.IPAMPoolMonitor = &fakes.MonitorFake{} setOrchestratorTypeInternal(cns.KubernetesCRD) @@ -125,6 +128,70 @@ func UpdatePodIpConfigState(t *testing.T, svc *HTTPRestService, ipconfigs map[st return nil } +func TestEndpointStateReadAndWrite(t *testing.T) { + svc := getTestService() + testState := NewPodState(testIP1, 24, testPod1GUID, testNCID, types.Available, 0) + ipconfigs := map[string]cns.IPConfigurationStatus{ + testState.ID: testState, + } + err := UpdatePodIpConfigState(t, svc, ipconfigs) + if err != nil { + t.Fatalf("Expected to not fail update service with config: %+v", err) + } + req := cns.IPConfigRequest{ + PodInterfaceID: testPod1Info.InterfaceID(), + InfraContainerID: testPod1Info.InfraContainerID(), + } + b, _ := testPod1Info.OrchestratorContext() + req.OrchestratorContext = b + req.Ifname = "eth0" + podIPInfo, err := requestIPConfigHelper(svc, req) + if err != nil { + t.Fatalf("Expected to not fail getting pod ip info: %+v", err) + } + ip, ipnet, err := net.ParseCIDR(podIPInfo.PodIPConfig.IPAddress + "/" + fmt.Sprint(podIPInfo.PodIPConfig.PrefixLength)) + if err != nil { + t.Fatalf("failed to parse pod ip address: %+v", err) + } + ipconfig := net.IPNet{IP: ip, Mask: ipnet.Mask} + ipInfo := &IPInfo{} + if ip.To4() == nil { // is an ipv6 address + ipInfo.IPv6 = append(ipInfo.IPv6, ipconfig) + } else { + ipInfo.IPv4 = append(ipInfo.IPv4, ipconfig) + } + + // add + desiredState := map[string]*EndpointInfo{req.InfraContainerID: {PodName: testPod1Info.Name(), PodNamespace: testPod1Info.Namespace(), IfnameToIPMap: map[string]*IPInfo{req.Ifname: ipInfo}}} + err = svc.updateEndpointState(req, testPod1Info, podIPInfo) + if err != nil { + t.Fatalf("Expected to not fail updating endpoint state: %+v", err) + } + assert.Equal(t, desiredState, svc.EndpointState) + + // consecutive add of same endpoint should not change state or cause error + err = svc.updateEndpointState(req, testPod1Info, podIPInfo) + if err != nil { + t.Fatalf("Expected to not fail updating existing endpoint state: %+v", err) + } + assert.Equal(t, desiredState, svc.EndpointState) + + // delete + desiredState = map[string]*EndpointInfo{} + err = svc.removeEndpointState(testPod1Info) + if err != nil { + t.Fatalf("Expected to not fail removing endpoint state: %+v", err) + } + assert.Equal(t, desiredState, svc.EndpointState) + + // delete non-existent endpoint should not change state or cause error + err = svc.removeEndpointState(testPod1Info) + if err != nil { + t.Fatalf("Expected to not fail removing non existing key: %+v", err) + } + assert.Equal(t, desiredState, svc.EndpointState) +} + // Want first IP func TestIPAMGetAvailableIPConfig(t *testing.T) { svc := getTestService() diff --git a/cns/restserver/restserver.go b/cns/restserver/restserver.go index d2e3ae8096..5106583e66 100644 --- a/cns/restserver/restserver.go +++ b/cns/restserver/restserver.go @@ -2,6 +2,7 @@ package restserver import ( "context" + "net" "sync" "time" @@ -58,7 +59,20 @@ type HTTPRestService struct { state *httpRestServiceState podsPendingIPAssignment *bounded.TimedSet sync.RWMutex - dncPartitionKey string + dncPartitionKey string + EndpointState map[string]*EndpointInfo // key : container id + EndpointStateStore store.KeyValueStore +} + +type EndpointInfo struct { + PodName string + PodNamespace string + IfnameToIPMap map[string]*IPInfo // key : interface name, value : IPInfo +} + +type IPInfo struct { + IPv4 []net.IPNet + IPv6 []net.IPNet } type GetHTTPServiceDataResponse struct { @@ -109,7 +123,7 @@ type networkInfo struct { } // NewHTTPRestService creates a new HTTP Service object. -func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, nmagentClient nmagentClient) (cns.HTTPService, error) { +func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, nmagentClient nmagentClient, endpointStateStore store.KeyValueStore) (cns.HTTPService, error) { service, err := cns.NewService(config.Name, config.Version, config.ChannelMode, config.Store) if err != nil { return nil, err @@ -158,6 +172,8 @@ func NewHTTPRestService(config *common.ServiceConfig, wscli interfaceGetter, nma routingTable: routingTable, state: serviceState, podsPendingIPAssignment: bounded.NewTimedSet(250), // nolint:gomnd // maxpods + EndpointStateStore: endpointStateStore, + EndpointState: make(map[string]*EndpointInfo), }, nil } diff --git a/cns/restserver/util.go b/cns/restserver/util.go index 8696125019..05acd5701e 100644 --- a/cns/restserver/util.go +++ b/cns/restserver/util.go @@ -95,6 +95,21 @@ func (service *HTTPRestService) restoreState() { } logger.Printf("[Azure CNS] Restored state, %+v\n", service.state) + + if service.Options[acn.OptManageEndpointState] == true { + err := service.EndpointStateStore.Read(EndpointStoreKey, &service.EndpointState) + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + // Nothing to restore. + logger.Printf("[Azure CNS] No endpoint state to restore.\n") + } else { + logger.Errorf("[Azure CNS] Failed to restore endpoint state, err:%v. Removing endpoints.json", err) + } + return + } + logger.Printf("[Azure CNS] Restored endpoint state, %+v\n", service.EndpointState) + + } } func (service *HTTPRestService) saveNetworkContainerGoalState( diff --git a/cns/service/main.go b/cns/service/main.go index 2f5fb76a96..a13343ec8b 100644 --- a/cns/service/main.go +++ b/cns/service/main.go @@ -67,6 +67,8 @@ const ( // Service name. name = "azure-cns" pluginName = "azure-vnet" + endpointStoreName = "azure-endpoints" + endpointStoreLocation = "/var/run/azure-cns/" defaultCNINetworkConfigFileName = "10-azure.conflist" dncApiVersion = "?api-version=2018-03-01" poolIPAMRefreshRateInMilliseconds = 1000 @@ -438,8 +440,9 @@ func main() { // Initialize CNS. var ( - err error - config common.ServiceConfig + err error + config common.ServiceConfig + endpointStateStore store.KeyValueStore ) config.Version = version @@ -541,9 +544,34 @@ func main() { logger.Errorf("Failed to start nmagent client due to error %v", err) return } + + // Initialize endpoint state store if cns is managing endpoint state. + if cnsconfig.ManageEndpointState { + log.Printf("[Azure CNS] Configured to manage endpoints state") + endpointStoreLock, err := processlock.NewFileLock(platform.CNILockPath + endpointStoreName + store.LockExtension) // nolint + if err != nil { + log.Printf("Error initializing endpoint state file lock:%v", err) + return + } + defer endpointStoreLock.Unlock() // nolint + + err = platform.CreateDirectory(endpointStoreLocation) + if err != nil { + logger.Errorf("Failed to create File Store directory %s, due to Error:%v", storeFileLocation, err.Error()) + return + } + // Create the key value store. + storeFileName := endpointStoreLocation + endpointStoreName + ".json" + endpointStateStore, err = store.NewJsonFileStore(storeFileName, endpointStoreLock) + if err != nil { + logger.Errorf("Failed to create endpoint state store file: %s, due to error %v\n", storeFileName, err) + return + } + } + // Create CNS object. - httpRestService, err := restserver.NewHTTPRestService(&config, &wireserver.Client{HTTPClient: &http.Client{}}, nmaclient) + httpRestService, err := restserver.NewHTTPRestService(&config, &wireserver.Client{HTTPClient: &http.Client{}}, nmaclient, endpointStateStore) if err != nil { logger.Errorf("Failed to create CNS object, err:%v.\n", err) return @@ -557,6 +585,7 @@ func main() { httpRestService.SetOption(acn.OptHttpConnectionTimeout, httpConnectionTimeout) httpRestService.SetOption(acn.OptHttpResponseHeaderTimeout, httpResponseHeaderTimeout) httpRestService.SetOption(acn.OptProgramSNATIPTables, cnsconfig.ProgramSNATIPTables) + httpRestService.SetOption(acn.OptManageEndpointState, cnsconfig.ManageEndpointState) // Create default ext network if commandline option is set if len(strings.TrimSpace(createDefaultExtNetworkType)) > 0 { @@ -973,13 +1002,24 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn } var podInfoByIPProvider cns.PodInfoByIPProvider - if cnsconfig.InitializeFromCNI { + switch { + case cnsconfig.ManageEndpointState: + logger.Printf("Initializing from self managed endpoint store") + podInfoByIPProvider, err = cnireconciler.NewCNSPodInfoProvider(httpRestServiceImplementation.EndpointStateStore) // get reference to endpoint state store from rest server + if err != nil { + if errors.Is(err, store.ErrKeyNotFound) { + logger.Printf("[Azure CNS] No endpoint state found, skipping initializing CNS state") + } else { + return errors.Wrap(err, "failed to create CNS PodInfoProvider") + } + } + case cnsconfig.InitializeFromCNI: logger.Printf("Initializing from CNI") podInfoByIPProvider, err = cnireconciler.NewCNIPodInfoProvider() if err != nil { return errors.Wrap(err, "failed to create CNI PodInfoProvider") } - } else { + default: logger.Printf("Initializing from Kubernetes") podInfoByIPProvider = cns.PodInfoByIPProviderFunc(func() (map[string]cns.PodInfo, error) { pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{ //nolint:govet // ignore err shadow @@ -995,7 +1035,6 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn return podInfo, nil }) } - // create scoped kube clients. nnccli, err := nodenetworkconfig.NewClient(kubeConfig) if err != nil { diff --git a/cns/types/codes.go b/cns/types/codes.go index 6d75e13e1b..b61e1b3ba5 100644 --- a/cns/types/codes.go +++ b/cns/types/codes.go @@ -39,6 +39,7 @@ const ( NmAgentSupportedApisError ResponseCode = 37 UnsupportedNCVersion ResponseCode = 38 FailedToRunIPTableCmd ResponseCode = 39 + NilEndpointStateStore ResponseCode = 40 UnexpectedError ResponseCode = 99 ) diff --git a/common/config.go b/common/config.go index 996491828a..d3f17855f4 100644 --- a/common/config.go +++ b/common/config.go @@ -102,6 +102,9 @@ const ( OptHttpResponseHeaderTimeout = "http-response-header-timeout" OptHttpResponseHeaderTimeoutAlias = "httprespheadertimeout" + // Enable CNS to manage endpoint state + OptManageEndpointState = "manage-endpoint-state" + // Store file location OptStoreFileLocation = "store-file-path" OptStoreFileLocationAlias = "storefilepath" diff --git a/log/logger.go b/log/logger.go index 065cd6e064..a1299c7af7 100644 --- a/log/logger.go +++ b/log/logger.go @@ -257,3 +257,12 @@ func (logger *Logger) Debugf(format string, args ...interface{}) { func (logger *Logger) Errorf(format string, args ...interface{}) { logger.Logf(format, args...) } + +// Warnf logs a formatted string at warninglevel +func (logger *Logger) Warnf(format string, args ...interface{}) { + if logger.level < LevelWarning { + return + } + + logger.Logf(format, args...) +} diff --git a/store/mockstore.go b/store/mockstore.go index 4077510d6b..7dda8b1938 100644 --- a/store/mockstore.go +++ b/store/mockstore.go @@ -1,30 +1,48 @@ package store import ( + "encoding/json" + "fmt" "time" ) type mockStore struct { lockFilePath string + data map[string]*json.RawMessage } // NewMockStore creates a new jsonFileStore object, accessed as a KeyValueStore. func NewMockStore(lockFilePath string) KeyValueStore { return &mockStore{ lockFilePath: lockFilePath, + data: make(map[string]*json.RawMessage), } } func (ms *mockStore) Exists() bool { - return false + return ms.data == nil } // Read restores the value for the given key from persistent store. func (ms *mockStore) Read(key string, value interface{}) error { + if _, ok := ms.data[key]; !ok { + return ErrStoreEmpty + } + err := json.Unmarshal(*ms.data[key], value) + if err != nil { + return fmt.Errorf("%w", err) + } return nil } func (ms *mockStore) Write(key string, value interface{}) error { + var raw json.RawMessage + raw, err := json.Marshal(value) + if err != nil { + return fmt.Errorf("%w", err) + } + + ms.data[key] = &raw return nil }