From f7f52d4ab1ec099dafea8e6895b1d5db4a477a51 Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Fri, 10 Dec 2021 14:19:36 -0800 Subject: [PATCH 1/8] Adding some basic bits --- npm/pkg/controlplane/daemon/daemon.go | 29 +++++++++++++++++++++++++++ test/integration/npm/main.go | 6 ++++-- 2 files changed, 33 insertions(+), 2 deletions(-) create mode 100644 npm/pkg/controlplane/daemon/daemon.go diff --git a/npm/pkg/controlplane/daemon/daemon.go b/npm/pkg/controlplane/daemon/daemon.go new file mode 100644 index 0000000000..2a788cfbfb --- /dev/null +++ b/npm/pkg/controlplane/daemon/daemon.go @@ -0,0 +1,29 @@ +package daemon + +import ( + "context" + + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "k8s.io/klog" +) + +type Daemon struct { + ctx context.Context + cancel context.CancelFunc + nodeID string + dp dataplane.GenericDataplane +} + +func NewDaemon( + ctx context.Context, + nodeID string, + dp dataplane.GenericDataplane) *Daemon { + + klog.Infof("Creating daemon for node %s", nodeID) + return &Daemon{ + ctx: ctx, + cancel: nil, + nodeID: nodeID, + dp: dp, + } +} diff --git a/test/integration/npm/main.go b/test/integration/npm/main.go index c0243bf3bb..cb97bf3d11 100644 --- a/test/integration/npm/main.go +++ b/test/integration/npm/main.go @@ -80,7 +80,7 @@ func main() { panicOnError(dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.TestNSSet.Metadata}, podMetadataB)) podMetadataC := &dataplane.PodMetadata{ PodKey: "c", - PodIP: "10.240.0.28", + PodIP: "10.240.0.83", NodeName: nodeName, } panicOnError(dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.TestKeyPodSet.Metadata, ipsets.TestNSSet.Metadata}, podMetadataC)) @@ -107,6 +107,8 @@ func main() { dp.DeleteIPSet(ipsets.TestKVPodSet.Metadata) panicOnError(dp.ApplyDataPlane()) + panicOnError(dp.AddToLists([]*ipsets.IPSetMetadata{ipsets.TestNestedLabelList.Metadata}, []*ipsets.IPSetMetadata{ipsets.TestKVPodSet.Metadata, ipsets.TestNSSet.Metadata})) + printAndWait(true) panicOnError(dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.TestNSSet.Metadata}, podMetadata)) @@ -125,7 +127,7 @@ func main() { podMetadataD = &dataplane.PodMetadata{ PodKey: "d", - PodIP: "10.240.0.27", + PodIP: "10.240.0.91", NodeName: nodeName, } panicOnError(dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.TestKeyPodSet.Metadata, ipsets.TestNSSet.Metadata}, podMetadataD)) From 662d275ad6d3ce4efcf013e8b06fc9d62fdf2d9e Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Fri, 17 Dec 2021 12:38:04 -0800 Subject: [PATCH 2/8] adding some bits for daemon --- npm/pkg/controlplane/daemon/daemon.go | 53 +++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 8 deletions(-) diff --git a/npm/pkg/controlplane/daemon/daemon.go b/npm/pkg/controlplane/daemon/daemon.go index 2a788cfbfb..c2b26a9a33 100644 --- a/npm/pkg/controlplane/daemon/daemon.go +++ b/npm/pkg/controlplane/daemon/daemon.go @@ -8,22 +8,59 @@ import ( ) type Daemon struct { - ctx context.Context - cancel context.CancelFunc - nodeID string - dp dataplane.GenericDataplane + ctx context.Context + cancel context.CancelFunc + nodeID string + podName string + controllerIP string + controllerPort int + dp dataplane.GenericDataplane } func NewDaemon( ctx context.Context, nodeID string, + podName string, dp dataplane.GenericDataplane) *Daemon { klog.Infof("Creating daemon for node %s", nodeID) return &Daemon{ - ctx: ctx, - cancel: nil, - nodeID: nodeID, - dp: dp, + ctx: ctx, + nodeID: nodeID, + podName: podName, + dp: dp, + } +} + +func (d *Daemon) Start() { + klog.Infof("Starting daemon for node %s", d.nodeID) + go d.run() +} + +func (d *Daemon) SetController(controllerIP string, controllerPort int) { + klog.Infof("Setting controller for node %s", d.nodeID) + d.controllerIP = controllerIP + d.controllerPort = controllerPort +} + +func (d *Daemon) Stop() { + klog.Infof("Stopping daemon for node %s", d.nodeID) + d.cancel() +} + +func (d *Daemon) run() { + klog.Infof("Starting daemon for node %s", d.nodeID) + for { + select { + case <-d.ctx.Done(): + klog.Infof("Daemon for node %s stopped", d.nodeID) + return + default: + if d.controllerIP != "" && d.controllerPort != 0 { + klog.Warningf("Invaling controller for node %s, IP %s port %d", d.nodeID, d.controllerIP, d.controllerPort) + return + } + klog.Infof("Starting dataplane for node %s", d.nodeID) + } } } From e69f249a1afc4056f4b39a975fe6084406cc608f Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Fri, 7 Jan 2022 16:05:54 -0800 Subject: [PATCH 3/8] First pass on adding gsprocessor --- npm/pkg/controlplane/daemon/daemon.go | 66 ---- .../goalstateprocessor/goalstateprocessor.go | 286 ++++++++++++++++++ npm/pkg/controlplane/gobutils.go | 62 ++++ npm/pkg/controlplane/types.go | 28 ++ npm/pkg/dataplane/dataplane.go | 4 + npm/pkg/dataplane/ipsets/ipset.go | 14 +- .../mocks/genericdataplane_generated.go | 16 +- .../dataplane/policies/policymanager_test.go | 39 +++ npm/pkg/dataplane/types.go | 1 + npm/pkg/protos/transport.pb.go | 252 ++++----------- npm/pkg/protos/transport.proto | 25 +- npm/pkg/transport/controlplane.go | 21 +- npm/pkg/transport/dataplane.go | 20 +- 13 files changed, 535 insertions(+), 299 deletions(-) delete mode 100644 npm/pkg/controlplane/daemon/daemon.go create mode 100644 npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go create mode 100644 npm/pkg/controlplane/gobutils.go create mode 100644 npm/pkg/controlplane/types.go diff --git a/npm/pkg/controlplane/daemon/daemon.go b/npm/pkg/controlplane/daemon/daemon.go deleted file mode 100644 index c2b26a9a33..0000000000 --- a/npm/pkg/controlplane/daemon/daemon.go +++ /dev/null @@ -1,66 +0,0 @@ -package daemon - -import ( - "context" - - "github.com/Azure/azure-container-networking/npm/pkg/dataplane" - "k8s.io/klog" -) - -type Daemon struct { - ctx context.Context - cancel context.CancelFunc - nodeID string - podName string - controllerIP string - controllerPort int - dp dataplane.GenericDataplane -} - -func NewDaemon( - ctx context.Context, - nodeID string, - podName string, - dp dataplane.GenericDataplane) *Daemon { - - klog.Infof("Creating daemon for node %s", nodeID) - return &Daemon{ - ctx: ctx, - nodeID: nodeID, - podName: podName, - dp: dp, - } -} - -func (d *Daemon) Start() { - klog.Infof("Starting daemon for node %s", d.nodeID) - go d.run() -} - -func (d *Daemon) SetController(controllerIP string, controllerPort int) { - klog.Infof("Setting controller for node %s", d.nodeID) - d.controllerIP = controllerIP - d.controllerPort = controllerPort -} - -func (d *Daemon) Stop() { - klog.Infof("Stopping daemon for node %s", d.nodeID) - d.cancel() -} - -func (d *Daemon) run() { - klog.Infof("Starting daemon for node %s", d.nodeID) - for { - select { - case <-d.ctx.Done(): - klog.Infof("Daemon for node %s stopped", d.nodeID) - return - default: - if d.controllerIP != "" && d.controllerPort != 0 { - klog.Warningf("Invaling controller for node %s, IP %s port %d", d.nodeID, d.controllerIP, d.controllerPort) - return - } - klog.Infof("Starting dataplane for node %s", d.nodeID) - } - } -} diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go new file mode 100644 index 0000000000..67f2b09945 --- /dev/null +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -0,0 +1,286 @@ +package goalstateprocessor + +import ( + "bytes" + "context" + "fmt" + + cp "github.com/Azure/azure-container-networking/npm/pkg/controlplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + "github.com/Azure/azure-container-networking/npm/pkg/protos" + + "k8s.io/klog" +) + +type GoalStateProcessor struct { + ctx context.Context + cancel context.CancelFunc + nodeID string + podName string + controllerIP string + controllerPort int + dp dataplane.GenericDataplane + inputChannel chan *protos.Events + backoffChannel chan *protos.Events +} + +func NewGoalStateProcessor( + ctx context.Context, + nodeID string, + podName string, + inputChan chan *protos.Events, + dp dataplane.GenericDataplane) *GoalStateProcessor { + + klog.Infof("Creating GoalStateProcessor for node %s", nodeID) + return &GoalStateProcessor{ + ctx: ctx, + nodeID: nodeID, + podName: podName, + dp: dp, + inputChannel: inputChan, + backoffChannel: make(chan *protos.Events), + } +} + +func (gsp *GoalStateProcessor) Start() { + klog.Infof("Starting GoalStateProcessor for node %s", gsp.nodeID) + go gsp.run() +} + +func (gsp *GoalStateProcessor) SetController(controllerIP string, controllerPort int) { + klog.Infof("Setting controller for node %s", gsp.nodeID) + gsp.controllerIP = controllerIP + gsp.controllerPort = controllerPort +} + +func (gsp *GoalStateProcessor) Stop() { + klog.Infof("Stopping GoalStateProcessor for node %s", gsp.nodeID) + gsp.cancel() +} + +func (gsp *GoalStateProcessor) run() { + if gsp.controllerIP != "" && gsp.controllerPort != 0 { + klog.Warningf("Invaling controller for node %s, IP %s port %d", gsp.nodeID, gsp.controllerIP, gsp.controllerPort) + return + } + klog.Infof("Starting dataplane for node %s", gsp.nodeID) + for { + select { + case <-gsp.ctx.Done(): + klog.Infof("GoalStateProcessor for node %s stopped", gsp.nodeID) + return + case inputEvents := <-gsp.inputChannel: + klog.Infof("Received event %s", inputEvents) + gsp.process(inputEvents) + case backoffEvents := <-gsp.backoffChannel: + // For now keep it simple. Do not worry about backoff events + // but if we need to handle them, we can do it here. + klog.Infof("Received backoff event %s", backoffEvents) + gsp.process(backoffEvents) + } + } +} + +func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { + klog.Infof("Processing event") + + // Process these individual buckkets in order + // 1. Apply IPSET + // 2. Apply POLICY + // 3. Remove POLICY + // 4. Remove IPSET + payload := inputEvent.GetPayload() + + if !validatePayload(payload) { + klog.Warningf("Empty payload in event %s", inputEvent) + return + } + + err := gsp.processIPSetsApplyEvent(payload[cp.IpsetApply]) + if err != nil { + klog.Errorf("Error processing IPSET apply event %s", err) + } + + err = gsp.processPolicyApplyEvent(payload[cp.PolicyApply]) + if err != nil { + klog.Errorf("Error processing POLICY apply event %s", err) + } + + err = gsp.processPolicyRemoveEvent(payload[cp.PolicyRemove]) + if err != nil { + klog.Errorf("Error processing POLICY remove event %s", err) + } + + err = gsp.processIPSetsRemoveEvent(payload[cp.IpsetRemove]) + if err != nil { + klog.Errorf("Error processing IPSET remove event %s", err) + } +} + +func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + ipset, err := cp.DecodeControllerIPSet(payload) + if err != nil { + return err + } + + ipsetName := ipset.GetPrefixName() + klog.Infof("Processing %s IPSET apply event", ipsetName) + + cachedIPSet := gsp.dp.GetIPSet(ipsetName) + if cachedIPSet == nil { + klog.Infof("IPSet %s not found in cache, adding to cache", ipsetName) + } + + var applyErr error + switch ipset.GetSetKind() { + case ipsets.HashSet: + applyErr = gsp.applySets(ipset, cachedIPSet) + case ipsets.ListSet: + applyErr = gsp.applyLists(ipset, cachedIPSet) + case ipsets.UnknownKind: + applyErr = fmt.Errorf("Unknown IPSet kind %s", cachedIPSet.Kind) + } + if applyErr != nil { + return applyErr + } + } + return nil +} + +func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { + if len(ipSet.IPPodMetadata) == 0 { + gsp.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipSet.GetMetadata()}) + return nil + } + + setMetadata := ipSet.GetMetadata() + for _, podMetadata := range ipSet.IPPodMetadata { + err := gsp.dp.AddToSets([]*ipsets.IPSetMetadata{setMetadata}, podMetadata) + if err != nil { + return err + } + } + + if cachedIPSet != nil { + toDeleteMembers := make(map[string]string) + for podIP, podKey := range cachedIPSet.IPPodKey { + if _, ok := ipSet.IPPodMetadata[podIP]; !ok { + toDeleteMembers[podIP] = podKey + } + } + + if len(toDeleteMembers) > 0 { + for podIP, podKey := range toDeleteMembers { + err := gsp.dp.RemoveFromSets([]*ipsets.IPSetMetadata{setMetadata}, dataplane.NewPodMetadata(podIP, podKey, "")) + if err != nil { + return err + } + } + } + } + return nil +} + +func (gsp *GoalStateProcessor) applyLists(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { + if len(ipSet.MemberIPSets) == 0 { + gsp.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipSet.GetMetadata()}) + return nil + } + + setMetadata := ipSet.GetMetadata() + membersToAdd := make([]*ipsets.IPSetMetadata, len(ipSet.MemberIPSets)) + idx := 0 + for _, memberIPSet := range ipSet.MemberIPSets { + membersToAdd[idx] = memberIPSet + idx++ + } + err := gsp.dp.AddToLists([]*ipsets.IPSetMetadata{setMetadata}, membersToAdd) + if err != nil { + return err + } + + if cachedIPSet != nil { + toDeleteMembers := make([]*ipsets.IPSetMetadata, 0) + for _, memberSet := range cachedIPSet.MemberIPSets { + if _, ok := ipSet.MemberIPSets[memberSet.Name]; !ok { + toDeleteMembers = append(toDeleteMembers, memberSet.GetSetMetadata()) + } + } + + if len(toDeleteMembers) > 0 { + err := gsp.dp.RemoveFromList(setMetadata, toDeleteMembers) + if err != nil { + return err + } + } + } + return nil +} + +func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + ipsetName, err := cp.DecodeString(payload) + if err != nil { + return err + } + klog.Infof("Processing %s IPSET remove event", ipsetName) + + cachedIPSet := gsp.dp.GetIPSet(ipsetName) + if cachedIPSet == nil { + klog.Infof("IPSet %s not found in cache, adding to cache", ipsetName) + return nil + } + + gsp.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedIPSet.Name, cachedIPSet.Type)) + } + return nil +} + +func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + netpol, err := cp.DecodeNPMNetworkPolicy(payload) + if err != nil { + return err + } + klog.Infof("Processing %s Policy ADD event", netpol.Name) + + err = gsp.dp.UpdatePolicy(netpol) + if err != nil { + klog.Errorf("Error removing policy %s from dataplane %s", netpol.Name, err) + return err + } + } + return nil +} + +func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + netpolName, err := cp.DecodeString(payload) + if err != nil { + return err + } + klog.Infof("Processing %s Policy remove event", netpolName) + + err = gsp.dp.RemovePolicy(netpolName) + if err != nil { + klog.Errorf("Error removing policy %s from dataplane %s", netpolName, err) + return err + } + } + return nil +} + +func validatePayload(payload map[string]*protos.GoalState) bool { + for _, v := range payload { + if len(v.GetData()) != 0 { + return true + } + } + return false +} diff --git a/npm/pkg/controlplane/gobutils.go b/npm/pkg/controlplane/gobutils.go new file mode 100644 index 0000000000..975d22a3e5 --- /dev/null +++ b/npm/pkg/controlplane/gobutils.go @@ -0,0 +1,62 @@ +package controlplane + +import ( + "bytes" + "encoding/gob" + + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" +) + +func EncodeString(name string) (*bytes.Buffer, error) { + var payloadBuffer *bytes.Buffer + err := gob.NewEncoder(payloadBuffer).Encode(&name) + if err != nil { + return nil, err + } + return payloadBuffer, nil +} + +func DecodeString(payload *bytes.Buffer) (string, error) { + var name string + err := gob.NewDecoder(payload).Decode(&name) + if err != nil { + return "", err + } + return name, nil +} + +func EncodeControllerIPSet(ipset *ControllerIPSets) (*bytes.Buffer, error) { + var payloadBuffer *bytes.Buffer + err := gob.NewEncoder(payloadBuffer).Encode(&ipset) + if err != nil { + return nil, err + } + return payloadBuffer, nil +} + +func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { + var ipset ControllerIPSets + err := gob.NewDecoder(payload).Decode(&ipset) + if err != nil { + return nil, err + } + return &ipset, nil +} + +func EncodeNPMNetworkPolicy(netpol *policies.NPMNetworkPolicy) (*bytes.Buffer, error) { + var payloadBuffer *bytes.Buffer + err := gob.NewEncoder(payloadBuffer).Encode(&netpol) + if err != nil { + return nil, err + } + return payloadBuffer, nil +} + +func DecodeNPMNetworkPolicy(payload *bytes.Buffer) (*policies.NPMNetworkPolicy, error) { + var netpol policies.NPMNetworkPolicy + err := gob.NewDecoder(payload).Decode(&netpol) + if err != nil { + return nil, err + } + return &netpol, nil +} diff --git a/npm/pkg/controlplane/types.go b/npm/pkg/controlplane/types.go new file mode 100644 index 0000000000..e9bfc0a874 --- /dev/null +++ b/npm/pkg/controlplane/types.go @@ -0,0 +1,28 @@ +package controlplane + +import ( + dp "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" +) + +const ( + IpsetApply string = "IPSETAPPLY" + IpsetRemove string = "IPSETREMOVE" + PolicyApply string = "POLICYAPPLY" + PolicyRemove string = "POLICYREMOVE" +) + +// ControllerIPSets is used in decoupled mode for controller pod to calculate +// and push to daemon pod +type ControllerIPSets struct { + ipsets.IPSetMetadata + // IPPodMetadata is used for setMaps to store Ips and ports as keys + // and podMetadata as value + IPPodMetadata map[string]*dp.PodMetadata + // This is used for listMaps to store child IP Sets + MemberIPSets map[string]*ipsets.IPSetMetadata +} + +func (c *ControllerIPSets) GetMetadata() *ipsets.IPSetMetadata { + return &c.IPSetMetadata +} diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index b4b6800bbd..19ad894c07 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -82,6 +82,10 @@ func (dp *DataPlane) ResetDataPlane() error { return dp.bootupDataPlane() } +func (dp *DataPlane) GetIPSet(setName string) *ipsets.IPSet { + return dp.ipsetMgr.GetIPSet(setName) +} + // CreateIPSets takes in a set object and updates local cache with this set func (dp *DataPlane) CreateIPSets(setMetadata []*ipsets.IPSetMetadata) { dp.ipsetMgr.CreateIPSets(setMetadata) diff --git a/npm/pkg/dataplane/ipsets/ipset.go b/npm/pkg/dataplane/ipsets/ipset.go index 7d38f627a0..75ce109de4 100644 --- a/npm/pkg/dataplane/ipsets/ipset.go +++ b/npm/pkg/dataplane/ipsets/ipset.go @@ -181,8 +181,9 @@ const ( ) type IPSet struct { - Name string - HashedName string + Name string + unPrefixedName string + HashedName string // SetProperties embedding set properties SetProperties // IpPodKey is used for setMaps to store Ips and ports as keys @@ -207,8 +208,9 @@ type IPSet struct { func NewIPSet(setMetadata *IPSetMetadata) *IPSet { prefixedName := setMetadata.GetPrefixName() set := &IPSet{ - Name: prefixedName, - HashedName: util.GetHashedName(prefixedName), + Name: prefixedName, + unPrefixedName: setMetadata.Name, + HashedName: util.GetHashedName(prefixedName), SetProperties: SetProperties{ Type: setMetadata.Type, Kind: setMetadata.GetSetKind(), @@ -230,6 +232,10 @@ func NewIPSet(setMetadata *IPSetMetadata) *IPSet { return set } +func (set *IPSet) GetSetMetadata() *IPSetMetadata { + return NewIPSetMetadata(set.unPrefixedName, set.Type) +} + func (set *IPSet) String() string { return fmt.Sprintf("Name: %s HashedNamed: %s Type: %s Kind: %s", set.Name, set.HashedName, setTypeName[set.Type], string(set.Kind)) diff --git a/npm/pkg/dataplane/mocks/genericdataplane_generated.go b/npm/pkg/dataplane/mocks/genericdataplane_generated.go index 5dd6243e6e..a309d6d57a 100644 --- a/npm/pkg/dataplane/mocks/genericdataplane_generated.go +++ b/npm/pkg/dataplane/mocks/genericdataplane_generated.go @@ -2,7 +2,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: /home/nitishm/github/nitishm/azure-container-networking/npm/pkg/dataplane/types.go +// Source: /mnt/c/Users/vamsi/Desktop/Microsoft_ws/azure-container-networking/npm/pkg/dataplane/types.go // Package mocks is a generated GoMock package. package mocks @@ -119,6 +119,20 @@ func (mr *MockGenericDataplaneMockRecorder) DeleteIPSet(setMetadata interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteIPSet", reflect.TypeOf((*MockGenericDataplane)(nil).DeleteIPSet), setMetadata) } +// GetIPSet mocks base method. +func (m *MockGenericDataplane) GetIPSet(setName string) *ipsets.IPSet { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIPSet", setName) + ret0, _ := ret[0].(*ipsets.IPSet) + return ret0 +} + +// GetIPSet indicates an expected call of GetIPSet. +func (mr *MockGenericDataplaneMockRecorder) GetIPSet(setName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIPSet", reflect.TypeOf((*MockGenericDataplane)(nil).GetIPSet), setName) +} + // InitializeDataPlane mocks base method. func (m *MockGenericDataplane) InitializeDataPlane() error { m.ctrl.T.Helper() diff --git a/npm/pkg/dataplane/policies/policymanager_test.go b/npm/pkg/dataplane/policies/policymanager_test.go index 6950422de3..858f7c50ce 100644 --- a/npm/pkg/dataplane/policies/policymanager_test.go +++ b/npm/pkg/dataplane/policies/policymanager_test.go @@ -1,10 +1,13 @@ package policies import ( + "bytes" + "encoding/gob" "testing" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -167,3 +170,39 @@ func TestNormalizeAndValidatePolicy(t *testing.T) { }) } } + +func BenchmarkCheckMarshall(b *testing.B) { + var netpol bytes.Buffer + enc := gob.NewEncoder(&netpol) + dec := gob.NewDecoder(&netpol) + + err := enc.Encode(testNetPol) + if err != nil { + b.Fatal(err) + } + + var dectestnetpol *NPMNetworkPolicy + err = dec.Decode(&dectestnetpol) + if err != nil { + b.Fatal(err) + } +} + +func TestCheckMarshall(t *testing.T) { + var netpol bytes.Buffer + enc := gob.NewEncoder(&netpol) + dec := gob.NewDecoder(&netpol) + + err := enc.Encode(testNetPol) + if err != nil { + t.Fatal(err) + } + + var dectestnetpol *NPMNetworkPolicy + err = dec.Decode(&dectestnetpol) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, testNetPol, dectestnetpol) +} diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 88a48a1165..d24210c240 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -8,6 +8,7 @@ import ( type GenericDataplane interface { InitializeDataPlane() error ResetDataPlane() error + GetIPSet(setName string) *ipsets.IPSet CreateIPSets(setNames []*ipsets.IPSetMetadata) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error diff --git a/npm/pkg/protos/transport.pb.go b/npm/pkg/protos/transport.pb.go index ebc6980c31..345a189e82 100644 --- a/npm/pkg/protos/transport.pb.go +++ b/npm/pkg/protos/transport.pb.go @@ -9,7 +9,6 @@ package protos import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - structpb "google.golang.org/protobuf/types/known/structpb" reflect "reflect" sync "sync" ) @@ -64,102 +63,6 @@ func (DatapathPodMetadata_APIVersion) EnumDescriptor() ([]byte, []int) { return file_transport_proto_rawDescGZIP(), []int{0, 0} } -// The following events are supported: -type Events_EventType int32 - -const ( - // Send a CREATE/UPDATE event to a datapath Pod - Events_APPLY Events_EventType = 0 - // Send a DELETE event to a datapath Pod - Events_REMOVE Events_EventType = 1 -) - -// Enum value maps for Events_EventType. -var ( - Events_EventType_name = map[int32]string{ - 0: "APPLY", - 1: "REMOVE", - } - Events_EventType_value = map[string]int32{ - "APPLY": 0, - "REMOVE": 1, - } -) - -func (x Events_EventType) Enum() *Events_EventType { - p := new(Events_EventType) - *p = x - return p -} - -func (x Events_EventType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Events_EventType) Descriptor() protoreflect.EnumDescriptor { - return file_transport_proto_enumTypes[1].Descriptor() -} - -func (Events_EventType) Type() protoreflect.EnumType { - return &file_transport_proto_enumTypes[1] -} - -func (x Events_EventType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Events_EventType.Descriptor instead. -func (Events_EventType) EnumDescriptor() ([]byte, []int) { - return file_transport_proto_rawDescGZIP(), []int{1, 0} -} - -// Supported objects are IPSet and NetworkPolicy objects. -type Events_ObjectType int32 - -const ( - Events_IPSET Events_ObjectType = 0 - Events_POLICY Events_ObjectType = 1 -) - -// Enum value maps for Events_ObjectType. -var ( - Events_ObjectType_name = map[int32]string{ - 0: "IPSET", - 1: "POLICY", - } - Events_ObjectType_value = map[string]int32{ - "IPSET": 0, - "POLICY": 1, - } -) - -func (x Events_ObjectType) Enum() *Events_ObjectType { - p := new(Events_ObjectType) - *p = x - return p -} - -func (x Events_ObjectType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Events_ObjectType) Descriptor() protoreflect.EnumDescriptor { - return file_transport_proto_enumTypes[2].Descriptor() -} - -func (Events_ObjectType) Type() protoreflect.EnumType { - return &file_transport_proto_enumTypes[2] -} - -func (x Events_ObjectType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Events_ObjectType.Descriptor instead. -func (Events_ObjectType) EnumDescriptor() ([]byte, []int) { - return file_transport_proto_rawDescGZIP(), []int{1, 1} -} - // DatapathPodMetadata is the metadata for a datapath pod type DatapathPodMetadata struct { state protoimpl.MessageState @@ -232,10 +135,8 @@ type Events struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type Events_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=protos.Events_EventType" json:"type,omitempty"` - Object Events_ObjectType `protobuf:"varint,2,opt,name=object,proto3,enum=protos.Events_ObjectType" json:"object,omitempty"` // Payload can contain one or more Event objects. - Event []*Event `protobuf:"bytes,3,rep,name=event,proto3" json:"event,omitempty"` + Payload map[string]*GoalState `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *Events) Reset() { @@ -270,41 +171,27 @@ func (*Events) Descriptor() ([]byte, []int) { return file_transport_proto_rawDescGZIP(), []int{1} } -func (x *Events) GetType() Events_EventType { - if x != nil { - return x.Type - } - return Events_APPLY -} - -func (x *Events) GetObject() Events_ObjectType { - if x != nil { - return x.Object - } - return Events_IPSET -} - -func (x *Events) GetEvent() []*Event { +func (x *Events) GetPayload() map[string]*GoalState { if x != nil { - return x.Event + return x.Payload } return nil } // Event is a generic object that can be Created, // Updated, Deleted by the controlplane. -type Event struct { +type GoalState struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields // Data can contain one or more instances of IPSet or NetworkPolicy // objects. - Data []*structpb.Struct `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"` + Data [][]byte `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty"` } -func (x *Event) Reset() { - *x = Event{} +func (x *GoalState) Reset() { + *x = GoalState{} if protoimpl.UnsafeEnabled { mi := &file_transport_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -312,13 +199,13 @@ func (x *Event) Reset() { } } -func (x *Event) String() string { +func (x *GoalState) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Event) ProtoMessage() {} +func (*GoalState) ProtoMessage() {} -func (x *Event) ProtoReflect() protoreflect.Message { +func (x *GoalState) ProtoReflect() protoreflect.Message { mi := &file_transport_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -330,12 +217,12 @@ func (x *Event) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Event.ProtoReflect.Descriptor instead. -func (*Event) Descriptor() ([]byte, []int) { +// Deprecated: Use GoalState.ProtoReflect.Descriptor instead. +func (*GoalState) Descriptor() ([]byte, []int) { return file_transport_proto_rawDescGZIP(), []int{2} } -func (x *Event) GetData() []*structpb.Struct { +func (x *GoalState) GetData() [][]byte { if x != nil { return x.Data } @@ -346,46 +233,39 @@ var File_transport_proto protoreflect.FileDescriptor var file_transport_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, - 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xab, 0x01, 0x0a, 0x13, 0x44, 0x61, 0x74, 0x61, - 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, - 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, - 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, - 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x26, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, - 0x14, 0x0a, 0x0a, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x06, 0x0a, - 0x02, 0x56, 0x31, 0x10, 0x00, 0x22, 0xd7, 0x01, 0x0a, 0x06, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x12, 0x2c, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x18, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x31, - 0x0a, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x4f, - 0x62, 0x6a, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, - 0x74, 0x12, 0x23, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x0d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, - 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x22, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x50, 0x50, 0x4c, 0x59, 0x10, 0x00, 0x12, 0x0a, - 0x0a, 0x06, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x10, 0x01, 0x22, 0x23, 0x0a, 0x0a, 0x4f, 0x62, - 0x6a, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x49, 0x50, 0x53, 0x45, - 0x54, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x50, 0x4f, 0x4c, 0x49, 0x43, 0x59, 0x10, 0x01, 0x22, - 0x34, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2b, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x4b, 0x0a, 0x0f, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6c, 0x61, - 0x6e, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x38, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x44, 0x61, 0x74, - 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x1a, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x30, 0x01, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x41, 0x7a, 0x75, 0x72, 0x65, 0x2f, 0x61, 0x7a, 0x75, 0x72, 0x65, 0x2d, 0x63, 0x6f, 0x6e, - 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x2d, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, - 0x67, 0x2f, 0x6e, 0x70, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x12, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x22, 0xab, 0x01, 0x0a, 0x13, 0x44, 0x61, + 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x12, 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, + 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x0a, 0x61, 0x70, 0x69, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x26, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, + 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x50, 0x49, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x22, 0x14, 0x0a, 0x0a, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x06, 0x0a, 0x02, 0x56, 0x31, 0x10, 0x00, 0x22, 0x8e, 0x01, 0x0a, 0x06, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x12, 0x35, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x73, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x4d, 0x0a, 0x0c, 0x50, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x73, 0x2e, 0x47, 0x6f, 0x61, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1f, 0x0a, 0x09, 0x47, 0x6f, 0x61, 0x6c, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x4b, 0x0a, 0x0f, 0x44, 0x61, 0x74, + 0x61, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x38, 0x0a, 0x07, + 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0x2e, 0x44, 0x61, 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x1a, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x30, 0x01, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x41, 0x7a, 0x75, 0x72, 0x65, 0x2f, 0x61, 0x7a, 0x75, 0x72, 0x65, + 0x2d, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x2d, 0x6e, 0x65, 0x74, 0x77, 0x6f, + 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2f, 0x6e, 0x70, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x73, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -400,30 +280,26 @@ func file_transport_proto_rawDescGZIP() []byte { return file_transport_proto_rawDescData } -var file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_transport_proto_goTypes = []interface{}{ (DatapathPodMetadata_APIVersion)(0), // 0: protos.DatapathPodMetadata.APIVersion - (Events_EventType)(0), // 1: protos.Events.EventType - (Events_ObjectType)(0), // 2: protos.Events.ObjectType - (*DatapathPodMetadata)(nil), // 3: protos.DatapathPodMetadata - (*Events)(nil), // 4: protos.Events - (*Event)(nil), // 5: protos.Event - (*structpb.Struct)(nil), // 6: google.protobuf.Struct + (*DatapathPodMetadata)(nil), // 1: protos.DatapathPodMetadata + (*Events)(nil), // 2: protos.Events + (*GoalState)(nil), // 3: protos.GoalState + nil, // 4: protos.Events.PayloadEntry } var file_transport_proto_depIdxs = []int32{ 0, // 0: protos.DatapathPodMetadata.apiVersion:type_name -> protos.DatapathPodMetadata.APIVersion - 1, // 1: protos.Events.type:type_name -> protos.Events.EventType - 2, // 2: protos.Events.object:type_name -> protos.Events.ObjectType - 5, // 3: protos.Events.event:type_name -> protos.Event - 6, // 4: protos.Event.data:type_name -> google.protobuf.Struct - 3, // 5: protos.DataplaneEvents.Connect:input_type -> protos.DatapathPodMetadata - 4, // 6: protos.DataplaneEvents.Connect:output_type -> protos.Events - 6, // [6:7] is the sub-list for method output_type - 5, // [5:6] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 4, // 1: protos.Events.payload:type_name -> protos.Events.PayloadEntry + 3, // 2: protos.Events.PayloadEntry.value:type_name -> protos.GoalState + 1, // 3: protos.DataplaneEvents.Connect:input_type -> protos.DatapathPodMetadata + 2, // 4: protos.DataplaneEvents.Connect:output_type -> protos.Events + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_transport_proto_init() } @@ -457,7 +333,7 @@ func file_transport_proto_init() { } } file_transport_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Event); i { + switch v := v.(*GoalState); i { case 0: return &v.state case 1: @@ -474,8 +350,8 @@ func file_transport_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_transport_proto_rawDesc, - NumEnums: 3, - NumMessages: 3, + NumEnums: 1, + NumMessages: 4, NumExtensions: 0, NumServices: 1, }, diff --git a/npm/pkg/protos/transport.proto b/npm/pkg/protos/transport.proto index 68388b134f..6f1740a0ce 100644 --- a/npm/pkg/protos/transport.proto +++ b/npm/pkg/protos/transport.proto @@ -2,8 +2,6 @@ syntax = "proto3"; package protos; option go_package = "github.com/Azure/azure-container-networking/npm/pkg/protos;protos"; -import "google/protobuf/struct.proto"; - // DataplaneEvents represents the Service RPC exposed by the gRPC server. service DataplaneEvents{ rpc Connect(DatapathPodMetadata) returns (stream Events); @@ -22,30 +20,15 @@ message DatapathPodMetadata { // Events defines the operation (event type) and object type being // streamed to the datapath client. A events message may carry one or // more Event objects. -message Events{ - EventType type = 1; - // The following events are supported: - enum EventType { - // Send a CREATE/UPDATE event to a datapath Pod - APPLY = 0; - // Send a DELETE event to a datapath Pod - REMOVE = 1; - } - - ObjectType object = 2; - // Supported objects are IPSet and NetworkPolicy objects. - enum ObjectType { - IPSET = 0; - POLICY = 1; - } +message Events { // Payload can contain one or more Event objects. - repeated Event event = 3; + map payload = 1; } // Event is a generic object that can be Created, // Updated, Deleted by the controlplane. -message Event { +message GoalState { // Data can contain one or more instances of IPSet or NetworkPolicy // objects. - repeated google.protobuf.Struct data = 3; + repeated bytes data = 1; } diff --git a/npm/pkg/transport/controlplane.go b/npm/pkg/transport/controlplane.go index 4a5c3f5e67..9ba6b56d67 100644 --- a/npm/pkg/transport/controlplane.go +++ b/npm/pkg/transport/controlplane.go @@ -1,7 +1,9 @@ package transport import ( + "bytes" "context" + "encoding/gob" "fmt" "net" @@ -9,7 +11,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/grpc/stats" - "google.golang.org/protobuf/types/known/structpb" "k8s.io/klog/v2" ) @@ -97,15 +98,19 @@ func (m *Manager) start() error { } } case msg := <-m.inCh: + var payload bytes.Buffer + enc := gob.NewEncoder(&payload) + + err := enc.Encode(msg) + if err != nil { + fmt.Errorf("Failed to encode") + return err + } for _, client := range m.Registrations { if err := client.stream.SendMsg(&protos.Events{ - Type: *protos.Events_APPLY.Enum(), - Object: *protos.Events_IPSET.Enum(), - Event: []*protos.Event{ - { - Data: []*structpb.Struct{ - msg.(*structpb.Struct), - }, + Payload: map[string]*protos.GoalState{ + "IPSETAPPLY": { + Data: [][]byte{payload.Bytes()}, }, }, }); err != nil { diff --git a/npm/pkg/transport/dataplane.go b/npm/pkg/transport/dataplane.go index 82b32f52f3..c673b5e8d6 100644 --- a/npm/pkg/transport/dataplane.go +++ b/npm/pkg/transport/dataplane.go @@ -17,6 +17,7 @@ type DataplaneEventsClient struct { } func NewDataplaneEventsClient(ctx context.Context, pod, node, addr string) (*DataplaneEventsClient, error) { + // TODO Make this secure cc, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, fmt.Errorf("failed to dial %s: %w", addr, err) @@ -56,18 +57,15 @@ func (c *DataplaneEventsClient) run(ctx context.Context, connectClient protos.Da break } - // TODO: REMOVE ME - // This is for debugging purposes only - fmt.Printf( - "Received event type %s object type %s: \n", - event.GetType(), - event.GetObject(), - ) - - for _, e := range event.GetEvent() { + for _, e := range event.GetPayload() { + // TODO: REMOVE ME + // This is for debugging purposes only + fmt.Printf( + "Received event %s \n", + e.String(), + ) for _, d := range e.GetData() { - eventAsMap := d.AsMap() - fmt.Printf("%s: %s\n", eventAsMap["Type"], eventAsMap["Payload"]) + fmt.Printf("%b\n", d) } } } From d8e307310e6d3cf40fcdff5d2758352404e09eaf Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Wed, 12 Jan 2022 09:37:47 -0800 Subject: [PATCH 4/8] Addressing comments --- .../goalstateprocessor/goalstateprocessor.go | 37 ++++++++++-------- .../goalstateprocessor_test.go | 23 +++++++++++ npm/pkg/controlplane/gobutils.go | 13 ++++--- npm/pkg/controlplane/types.go | 2 +- npm/pkg/dataplane/ipsets/ipset.go | 6 +-- .../dataplane/policies/policymanager_test.go | 39 ------------------- 6 files changed, 54 insertions(+), 66 deletions(-) create mode 100644 npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go index 67f2b09945..52e1fa96e9 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -9,7 +9,7 @@ import ( "github.com/Azure/azure-container-networking/npm/pkg/dataplane" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" "github.com/Azure/azure-container-networking/npm/pkg/protos" - + npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" "k8s.io/klog" ) @@ -61,7 +61,7 @@ func (gsp *GoalStateProcessor) Stop() { func (gsp *GoalStateProcessor) run() { if gsp.controllerIP != "" && gsp.controllerPort != 0 { - klog.Warningf("Invaling controller for node %s, IP %s port %d", gsp.nodeID, gsp.controllerIP, gsp.controllerPort) + klog.Warningf("Invalid controller for node %s, IP %s port %d", gsp.nodeID, gsp.controllerIP, gsp.controllerPort) return } klog.Infof("Starting dataplane for node %s", gsp.nodeID) @@ -84,12 +84,21 @@ func (gsp *GoalStateProcessor) run() { func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { klog.Infof("Processing event") + // apply dataplane after syncing + defer func() { + dperr := gsp.dp.ApplyDataPlane() + if dperr != nil { + klog.Errorf("Apply Dataplane failed with %v", dperr) + } + }() // Process these individual buckkets in order // 1. Apply IPSET // 2. Apply POLICY // 3. Remove POLICY // 4. Remove IPSET + + // TODO need to handle first connect stream of all GoalStates payload := inputEvent.GetPayload() if !validatePayload(payload) { @@ -123,7 +132,7 @@ func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalSta payload := bytes.NewBuffer(gs) ipset, err := cp.DecodeControllerIPSet(payload) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("failed to decode IPSet apply event", err) } ipsetName := ipset.GetPrefixName() @@ -141,11 +150,12 @@ func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalSta case ipsets.ListSet: applyErr = gsp.applyLists(ipset, cachedIPSet) case ipsets.UnknownKind: - applyErr = fmt.Errorf("Unknown IPSet kind %s", cachedIPSet.Kind) - } - if applyErr != nil { - return applyErr + applyErr = npmerrors.SimpleErrorWrapper( + "failed to decode IPSet apply event", + fmt.Errorf("Unknown IPSet kind %s", cachedIPSet.Kind), + ) } + return applyErr } return nil } @@ -165,15 +175,8 @@ func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet } if cachedIPSet != nil { - toDeleteMembers := make(map[string]string) for podIP, podKey := range cachedIPSet.IPPodKey { if _, ok := ipSet.IPPodMetadata[podIP]; !ok { - toDeleteMembers[podIP] = podKey - } - } - - if len(toDeleteMembers) > 0 { - for podIP, podKey := range toDeleteMembers { err := gsp.dp.RemoveFromSets([]*ipsets.IPSetMetadata{setMetadata}, dataplane.NewPodMetadata(podIP, podKey, "")) if err != nil { return err @@ -231,7 +234,7 @@ func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalSt cachedIPSet := gsp.dp.GetIPSet(ipsetName) if cachedIPSet == nil { - klog.Infof("IPSet %s not found in cache, adding to cache", ipsetName) + klog.Infof("IPSet %s not found in cache, ignoring delete call.", ipsetName) return nil } @@ -251,7 +254,7 @@ func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalSta err = gsp.dp.UpdatePolicy(netpol) if err != nil { - klog.Errorf("Error removing policy %s from dataplane %s", netpol.Name, err) + klog.Errorf("Error applying policy %s to dataplane with error: %s", netpol.Name, err.Error()) return err } } @@ -269,7 +272,7 @@ func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalSt err = gsp.dp.RemovePolicy(netpolName) if err != nil { - klog.Errorf("Error removing policy %s from dataplane %s", netpolName, err) + klog.Errorf("Error removing policy %s from dataplane with error: %s", netpolName, err.Error()) return err } } diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go new file mode 100644 index 0000000000..a80ca08076 --- /dev/null +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go @@ -0,0 +1,23 @@ +package goalstateprocessor + +import ( + "context" + "testing" + + dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks" + "github.com/Azure/azure-container-networking/npm/pkg/protos" + "github.com/golang/mock/gomock" +) + +func TestNewGoalStateProcessor(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + var inputChan chan *protos.Events + ctx, cancel := context.WithCancel(context.Background()) + gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) + gsp.run() + + cancel() +} diff --git a/npm/pkg/controlplane/gobutils.go b/npm/pkg/controlplane/gobutils.go index 975d22a3e5..2f18f87971 100644 --- a/npm/pkg/controlplane/gobutils.go +++ b/npm/pkg/controlplane/gobutils.go @@ -5,13 +5,14 @@ import ( "encoding/gob" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" + npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" ) func EncodeString(name string) (*bytes.Buffer, error) { var payloadBuffer *bytes.Buffer err := gob.NewEncoder(payloadBuffer).Encode(&name) if err != nil { - return nil, err + return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) } return payloadBuffer, nil } @@ -20,7 +21,7 @@ func DecodeString(payload *bytes.Buffer) (string, error) { var name string err := gob.NewDecoder(payload).Decode(&name) if err != nil { - return "", err + return "", npmerrors.SimpleErrorWrapper("failed to decode", err) } return name, nil } @@ -29,7 +30,7 @@ func EncodeControllerIPSet(ipset *ControllerIPSets) (*bytes.Buffer, error) { var payloadBuffer *bytes.Buffer err := gob.NewEncoder(payloadBuffer).Encode(&ipset) if err != nil { - return nil, err + return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) } return payloadBuffer, nil } @@ -38,7 +39,7 @@ func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { var ipset ControllerIPSets err := gob.NewDecoder(payload).Decode(&ipset) if err != nil { - return nil, err + return nil, npmerrors.SimpleErrorWrapper("failed to decode", err) } return &ipset, nil } @@ -47,7 +48,7 @@ func EncodeNPMNetworkPolicy(netpol *policies.NPMNetworkPolicy) (*bytes.Buffer, e var payloadBuffer *bytes.Buffer err := gob.NewEncoder(payloadBuffer).Encode(&netpol) if err != nil { - return nil, err + return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) } return payloadBuffer, nil } @@ -56,7 +57,7 @@ func DecodeNPMNetworkPolicy(payload *bytes.Buffer) (*policies.NPMNetworkPolicy, var netpol policies.NPMNetworkPolicy err := gob.NewDecoder(payload).Decode(&netpol) if err != nil { - return nil, err + return nil, npmerrors.SimpleErrorWrapper("failed to decode", err) } return &netpol, nil } diff --git a/npm/pkg/controlplane/types.go b/npm/pkg/controlplane/types.go index e9bfc0a874..10c55d2ad9 100644 --- a/npm/pkg/controlplane/types.go +++ b/npm/pkg/controlplane/types.go @@ -12,7 +12,7 @@ const ( PolicyRemove string = "POLICYREMOVE" ) -// ControllerIPSets is used in decoupled mode for controller pod to calculate +// ControllerIPSets is used in fan-out design for controller pod to calculate // and push to daemon pod type ControllerIPSets struct { ipsets.IPSetMetadata diff --git a/npm/pkg/dataplane/ipsets/ipset.go b/npm/pkg/dataplane/ipsets/ipset.go index 75ce109de4..e2f0458d9b 100644 --- a/npm/pkg/dataplane/ipsets/ipset.go +++ b/npm/pkg/dataplane/ipsets/ipset.go @@ -182,7 +182,7 @@ const ( type IPSet struct { Name string - unPrefixedName string + unprefixedName string HashedName string // SetProperties embedding set properties SetProperties @@ -209,7 +209,7 @@ func NewIPSet(setMetadata *IPSetMetadata) *IPSet { prefixedName := setMetadata.GetPrefixName() set := &IPSet{ Name: prefixedName, - unPrefixedName: setMetadata.Name, + unprefixedName: setMetadata.Name, HashedName: util.GetHashedName(prefixedName), SetProperties: SetProperties{ Type: setMetadata.Type, @@ -233,7 +233,7 @@ func NewIPSet(setMetadata *IPSetMetadata) *IPSet { } func (set *IPSet) GetSetMetadata() *IPSetMetadata { - return NewIPSetMetadata(set.unPrefixedName, set.Type) + return NewIPSetMetadata(set.unprefixedName, set.Type) } func (set *IPSet) String() string { diff --git a/npm/pkg/dataplane/policies/policymanager_test.go b/npm/pkg/dataplane/policies/policymanager_test.go index 858f7c50ce..6950422de3 100644 --- a/npm/pkg/dataplane/policies/policymanager_test.go +++ b/npm/pkg/dataplane/policies/policymanager_test.go @@ -1,13 +1,10 @@ package policies import ( - "bytes" - "encoding/gob" "testing" "github.com/Azure/azure-container-networking/common" "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -170,39 +167,3 @@ func TestNormalizeAndValidatePolicy(t *testing.T) { }) } } - -func BenchmarkCheckMarshall(b *testing.B) { - var netpol bytes.Buffer - enc := gob.NewEncoder(&netpol) - dec := gob.NewDecoder(&netpol) - - err := enc.Encode(testNetPol) - if err != nil { - b.Fatal(err) - } - - var dectestnetpol *NPMNetworkPolicy - err = dec.Decode(&dectestnetpol) - if err != nil { - b.Fatal(err) - } -} - -func TestCheckMarshall(t *testing.T) { - var netpol bytes.Buffer - enc := gob.NewEncoder(&netpol) - dec := gob.NewDecoder(&netpol) - - err := enc.Encode(testNetPol) - if err != nil { - t.Fatal(err) - } - - var dectestnetpol *NPMNetworkPolicy - err = dec.Decode(&dectestnetpol) - if err != nil { - t.Fatal(err) - } - - assert.Equal(t, testNetPol, dectestnetpol) -} From 95f541ee93df12aef493a8c1e54952be6d57141e Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Wed, 12 Jan 2022 15:30:22 -0800 Subject: [PATCH 5/8] Adding uts --- .../goalstateprocessor/goalstateprocessor.go | 91 ++++---- .../goalstateprocessor_test.go | 197 +++++++++++++++++- npm/pkg/controlplane/gobutils.go | 19 +- npm/pkg/controlplane/types.go | 12 +- npm/pkg/dataplane/dpshim/dpshim.go | 69 ++++++ npm/pkg/transport/controlplane.go | 7 +- 6 files changed, 341 insertions(+), 54 deletions(-) create mode 100644 npm/pkg/dataplane/dpshim/dpshim.go diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go index 52e1fa96e9..cce91f2cda 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -65,20 +65,28 @@ func (gsp *GoalStateProcessor) run() { return } klog.Infof("Starting dataplane for node %s", gsp.nodeID) + for { - select { - case <-gsp.ctx.Done(): - klog.Infof("GoalStateProcessor for node %s stopped", gsp.nodeID) - return - case inputEvents := <-gsp.inputChannel: - klog.Infof("Received event %s", inputEvents) - gsp.process(inputEvents) - case backoffEvents := <-gsp.backoffChannel: - // For now keep it simple. Do not worry about backoff events - // but if we need to handle them, we can do it here. - klog.Infof("Received backoff event %s", backoffEvents) - gsp.process(backoffEvents) - } + gsp.processNext() + } +} + +func (gsp *GoalStateProcessor) processNext() { + select { + case <-gsp.ctx.Done(): + klog.Infof("GoalStateProcessor for node %s stopped", gsp.nodeID) + return + case inputEvents := <-gsp.inputChannel: + // TODO remove this large print later + klog.Infof("Received event %s", inputEvents) + gsp.process(inputEvents) + case backoffEvents := <-gsp.backoffChannel: + // For now keep it simple. Do not worry about backoff events + // but if we need to handle them, we can do it here. + // TODO remove this large print later + klog.Infof("Received backoff event %s", backoffEvents) + gsp.process(backoffEvents) + default: } } @@ -106,28 +114,37 @@ func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { return } - err := gsp.processIPSetsApplyEvent(payload[cp.IpsetApply]) - if err != nil { - klog.Errorf("Error processing IPSET apply event %s", err) + if _, ok := payload[cp.IpsetApply]; ok { + err := gsp.processIPSetsApplyEvent(payload[cp.IpsetApply]) + if err != nil { + klog.Errorf("Error processing IPSET apply event %s", err) + } } - err = gsp.processPolicyApplyEvent(payload[cp.PolicyApply]) - if err != nil { - klog.Errorf("Error processing POLICY apply event %s", err) + if _, ok := payload[cp.PolicyApply]; ok { + err := gsp.processPolicyApplyEvent(payload[cp.PolicyApply]) + if err != nil { + klog.Errorf("Error processing POLICY apply event %s", err) + } } - err = gsp.processPolicyRemoveEvent(payload[cp.PolicyRemove]) - if err != nil { - klog.Errorf("Error processing POLICY remove event %s", err) + if _, ok := payload[cp.PolicyRemove]; ok { + err := gsp.processPolicyRemoveEvent(payload[cp.PolicyRemove]) + if err != nil { + klog.Errorf("Error processing POLICY remove event %s", err) + } } - err = gsp.processIPSetsRemoveEvent(payload[cp.IpsetRemove]) - if err != nil { - klog.Errorf("Error processing IPSET remove event %s", err) + if _, ok := payload[cp.IpsetRemove]; ok { + err := gsp.processIPSetsRemoveEvent(payload[cp.IpsetRemove]) + if err != nil { + klog.Errorf("Error processing IPSET remove event %s", err) + } } } func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) error { + var applyErr error for _, gs := range goalState.GetData() { payload := bytes.NewBuffer(gs) ipset, err := cp.DecodeControllerIPSet(payload) @@ -143,21 +160,21 @@ func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalSta klog.Infof("IPSet %s not found in cache, adding to cache", ipsetName) } - var applyErr error switch ipset.GetSetKind() { case ipsets.HashSet: applyErr = gsp.applySets(ipset, cachedIPSet) case ipsets.ListSet: applyErr = gsp.applyLists(ipset, cachedIPSet) case ipsets.UnknownKind: - applyErr = npmerrors.SimpleErrorWrapper( - "failed to decode IPSet apply event", - fmt.Errorf("Unknown IPSet kind %s", cachedIPSet.Kind), + applyErr = npmerrors.SimpleError( + fmt.Sprintf("failed to decode IPSet apply event: Unknown IPSet kind %s", cachedIPSet.Kind), ) } - return applyErr + if applyErr != nil { + break + } } - return nil + return applyErr } func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { @@ -170,7 +187,7 @@ func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet for _, podMetadata := range ipSet.IPPodMetadata { err := gsp.dp.AddToSets([]*ipsets.IPSetMetadata{setMetadata}, podMetadata) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at AddToSet.", err) } } @@ -179,7 +196,7 @@ func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet if _, ok := ipSet.IPPodMetadata[podIP]; !ok { err := gsp.dp.RemoveFromSets([]*ipsets.IPSetMetadata{setMetadata}, dataplane.NewPodMetadata(podIP, podKey, "")) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at RemoveFromSets.", err) } } } @@ -202,7 +219,7 @@ func (gsp *GoalStateProcessor) applyLists(ipSet *cp.ControllerIPSets, cachedIPSe } err := gsp.dp.AddToLists([]*ipsets.IPSetMetadata{setMetadata}, membersToAdd) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at AddToLists.", err) } if cachedIPSet != nil { @@ -216,7 +233,7 @@ func (gsp *GoalStateProcessor) applyLists(ipSet *cp.ControllerIPSets, cachedIPSe if len(toDeleteMembers) > 0 { err := gsp.dp.RemoveFromList(setMetadata, toDeleteMembers) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at RemoveFromList.", err) } } } @@ -228,7 +245,7 @@ func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalSt payload := bytes.NewBuffer(gs) ipsetName, err := cp.DecodeString(payload) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("failed to decode IPSet remove event", err) } klog.Infof("Processing %s IPSET remove event", ipsetName) @@ -248,7 +265,7 @@ func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalSta payload := bytes.NewBuffer(gs) netpol, err := cp.DecodeNPMNetworkPolicy(payload) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("failed to decode Policy remove event", err) } klog.Infof("Processing %s Policy ADD event", netpol.Name) diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go index a80ca08076..0728ea4053 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go @@ -3,10 +3,77 @@ package goalstateprocessor import ( "context" "testing" + "time" + "github.com/Azure/azure-container-networking/npm/pkg/controlplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" "github.com/Azure/azure-container-networking/npm/pkg/protos" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +const ( + sleepAfterChanSent = time.Millisecond * 100 +) + +var ( + testNSSet = ipsets.NewIPSetMetadata("test-ns-set", ipsets.Namespace) + testNSCPSet = controlplane.NewControllerIPSets(testNSSet) + testKeyPodSet = ipsets.NewIPSetMetadata("test-keyPod-set", ipsets.KeyLabelOfPod) + testKeyPodCPSet = controlplane.NewControllerIPSets(testKeyPodSet) + testNestedKeyPodSet = ipsets.NewIPSetMetadata("test-nestedkeyPod-set", ipsets.NestedLabelOfPod) + testNestedKeyPodCPSet = controlplane.NewControllerIPSets(testNestedKeyPodSet) + testNetPol = &policies.NPMNetworkPolicy{ + Name: "test-netpol", + NameSpace: "x", + PolicyKey: "x/test-netpol", + PodSelectorIPSets: []*ipsets.TranslatedIPSet{ + { + Metadata: testNSSet, + }, + { + Metadata: testKeyPodSet, + }, + }, + RuleIPSets: []*ipsets.TranslatedIPSet{ + { + Metadata: testNSSet, + }, + { + Metadata: testKeyPodSet, + }, + }, + ACLs: []*policies.ACLPolicy{ + { + PolicyID: "azure-acl-123", + Target: policies.Dropped, + Direction: policies.Ingress, + }, + { + PolicyID: "azure-acl-234", + Target: policies.Allowed, + Direction: policies.Ingress, + SrcList: []policies.SetInfo{ + { + IPSet: testNSSet, + Included: true, + MatchType: policies.SrcMatch, + }, + { + IPSet: testKeyPodSet, + Included: true, + MatchType: policies.SrcMatch, + }, + }, + }, + }, + PodEndpoints: map[string]string{ + "10.0.0.1": "1234", + }, + } ) func TestNewGoalStateProcessor(t *testing.T) { @@ -14,10 +81,134 @@ func TestNewGoalStateProcessor(t *testing.T) { defer ctrl.Finish() dp := dpmocks.NewMockGenericDataplane(ctrl) - var inputChan chan *protos.Events + // Verify that the policy was applied + dp.EXPECT().UpdatePolicy(gomock.Any()).Times(1) + dp.EXPECT().ApplyDataPlane().Times(1) + + inputChan := make(chan *protos.Events) + payload, err := controlplane.EncodeNPMNetworkPolicy(testNetPol) + assert.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) - gsp.run() - cancel() + go func() { + inputChan <- &protos.Events{ + Payload: map[string]*protos.GoalState{ + controlplane.PolicyApply: { + Data: [][]byte{payload.Bytes()}, + }, + }, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() +} + +func TestIPSetsApply(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + // Verify that the policy was applied + dp.EXPECT().GetIPSet(gomock.Any()).Times(3) + dp.EXPECT().CreateIPSets(gomock.Any()).Times(3) + dp.EXPECT().ApplyDataPlane().Times(1) + + inputChan := make(chan *protos.Events) + + goalState := getGoalStateForControllerSets(t, + []*controlplane.ControllerIPSets{ + testNSCPSet, + testKeyPodCPSet, + testNestedKeyPodCPSet, + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) + go func() { + inputChan <- &protos.Events{ + Payload: goalState, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() +} + +func TestIPSetsApplyUpdateMembers(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + // Verify that the policy was applied + dp.EXPECT().GetIPSet(gomock.Any()).Times(4) + dp.EXPECT().CreateIPSets(gomock.Any()).Times(1) + dp.EXPECT().AddToSets(gomock.Any(), gomock.Any()).Times(2) + dp.EXPECT().AddToLists(gomock.Any(), gomock.Any()).Times(1) + dp.EXPECT().ApplyDataPlane().Times(2) + + inputChan := make(chan *protos.Events) + + testNSCPSet.IPPodMetadata = map[string]*dataplane.PodMetadata{ + "10.0.0.1": dataplane.NewPodMetadata("test", "10.0.0.1", "1234"), + } + testNestedKeyPodCPSet.MemberIPSets = map[string]*ipsets.IPSetMetadata{ + testNSSet.GetPrefixName(): testNSSet, + } + goalState := getGoalStateForControllerSets(t, + []*controlplane.ControllerIPSets{ + testNSCPSet, + testKeyPodCPSet, + testNestedKeyPodCPSet, + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) + go func() { + inputChan <- &protos.Events{ + Payload: goalState, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() + + // Update one of the ipsets and send another event + testNSCPSet.IPPodMetadata = map[string]*dataplane.PodMetadata{ + "10.0.0.2": dataplane.NewPodMetadata("test2", "10.0.0.2", "1234"), + } + goalState = getGoalStateForControllerSets(t, + []*controlplane.ControllerIPSets{ + testNSCPSet, + }, + ) + go func() { + inputChan <- &protos.Events{ + Payload: goalState, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() +} + +func getGoalStateForControllerSets(t *testing.T, sets []*controlplane.ControllerIPSets) map[string]*protos.GoalState { + goalState := map[string]*protos.GoalState{ + controlplane.IpsetApply: { + Data: [][]byte{}, + }, + } + for _, set := range sets { + payload, err := controlplane.EncodeControllerIPSet(set) + assert.NoError(t, err) + goalState[controlplane.IpsetApply].Data = append(goalState[controlplane.IpsetApply].Data, payload.Bytes()) + } + return goalState } diff --git a/npm/pkg/controlplane/gobutils.go b/npm/pkg/controlplane/gobutils.go index 2f18f87971..250a96db78 100644 --- a/npm/pkg/controlplane/gobutils.go +++ b/npm/pkg/controlplane/gobutils.go @@ -9,12 +9,12 @@ import ( ) func EncodeString(name string) (*bytes.Buffer, error) { - var payloadBuffer *bytes.Buffer - err := gob.NewEncoder(payloadBuffer).Encode(&name) + var payloadBuffer bytes.Buffer + err := gob.NewEncoder(&payloadBuffer).Encode(&name) if err != nil { return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) } - return payloadBuffer, nil + return &payloadBuffer, nil } func DecodeString(payload *bytes.Buffer) (string, error) { @@ -27,12 +27,12 @@ func DecodeString(payload *bytes.Buffer) (string, error) { } func EncodeControllerIPSet(ipset *ControllerIPSets) (*bytes.Buffer, error) { - var payloadBuffer *bytes.Buffer - err := gob.NewEncoder(payloadBuffer).Encode(&ipset) + var payloadBuffer bytes.Buffer + err := gob.NewEncoder(&payloadBuffer).Encode(&ipset) if err != nil { return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) } - return payloadBuffer, nil + return &payloadBuffer, nil } func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { @@ -45,12 +45,13 @@ func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { } func EncodeNPMNetworkPolicy(netpol *policies.NPMNetworkPolicy) (*bytes.Buffer, error) { - var payloadBuffer *bytes.Buffer - err := gob.NewEncoder(payloadBuffer).Encode(&netpol) + var payloadBuffer bytes.Buffer + enc := gob.NewEncoder(&payloadBuffer) + err := enc.Encode(netpol) if err != nil { return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) } - return payloadBuffer, nil + return &payloadBuffer, nil } func DecodeNPMNetworkPolicy(payload *bytes.Buffer) (*policies.NPMNetworkPolicy, error) { diff --git a/npm/pkg/controlplane/types.go b/npm/pkg/controlplane/types.go index 10c55d2ad9..c51d9d1b77 100644 --- a/npm/pkg/controlplane/types.go +++ b/npm/pkg/controlplane/types.go @@ -15,7 +15,7 @@ const ( // ControllerIPSets is used in fan-out design for controller pod to calculate // and push to daemon pod type ControllerIPSets struct { - ipsets.IPSetMetadata + *ipsets.IPSetMetadata // IPPodMetadata is used for setMaps to store Ips and ports as keys // and podMetadata as value IPPodMetadata map[string]*dp.PodMetadata @@ -23,6 +23,14 @@ type ControllerIPSets struct { MemberIPSets map[string]*ipsets.IPSetMetadata } +func NewControllerIPSets(metadata *ipsets.IPSetMetadata) *ControllerIPSets { + return &ControllerIPSets{ + IPSetMetadata: metadata, + IPPodMetadata: make(map[string]*dp.PodMetadata), + MemberIPSets: make(map[string]*ipsets.IPSetMetadata), + } +} + func (c *ControllerIPSets) GetMetadata() *ipsets.IPSetMetadata { - return &c.IPSetMetadata + return c.IPSetMetadata } diff --git a/npm/pkg/dataplane/dpshim/dpshim.go b/npm/pkg/dataplane/dpshim/dpshim.go new file mode 100644 index 0000000000..a213f9fec7 --- /dev/null +++ b/npm/pkg/dataplane/dpshim/dpshim.go @@ -0,0 +1,69 @@ +package dpshim + +import ( + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" + "github.com/Azure/azure-container-networking/npm/pkg/protos" +) + +// TODO setting this up to unblock another workitem +type DPShim struct { + outChannel chan *protos.Events +} + +func NewDPSim(outChannel chan *protos.Events) *DPShim { + return &DPShim{outChannel: outChannel} +} + +func (dp *DPShim) InitializeDataPlane() error { + return nil +} + +func (dp *DPShim) ResetDataPlane() error { + return nil +} + +func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet { + return nil +} + +func (dp *DPShim) CreateIPSets(setNames []*ipsets.IPSetMetadata) { + return +} + +func (dp *DPShim) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) { + return +} + +func (dp *DPShim) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { + return nil +} + +func (dp *DPShim) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { + return nil +} + +func (dp *DPShim) AddToLists(listName []*ipsets.IPSetMetadata, setNames []*ipsets.IPSetMetadata) error { + return nil +} + +func (dp *DPShim) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []*ipsets.IPSetMetadata) error { + return nil +} + +func (dp *DPShim) ApplyDataPlane() error { + return nil +} + +func (dp *DPShim) AddPolicy(policies *policies.NPMNetworkPolicy) error { + return nil +} + +func (dp *DPShim) RemovePolicy(policyName string) error { + return nil +} + +func (dp *DPShim) UpdatePolicy(policies *policies.NPMNetworkPolicy) error { + return nil +} diff --git a/npm/pkg/transport/controlplane.go b/npm/pkg/transport/controlplane.go index 9ba6b56d67..b1752cb69d 100644 --- a/npm/pkg/transport/controlplane.go +++ b/npm/pkg/transport/controlplane.go @@ -7,7 +7,9 @@ import ( "fmt" "net" + cp "github.com/Azure/azure-container-networking/npm/pkg/controlplane" "github.com/Azure/azure-container-networking/npm/pkg/protos" + npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/grpc/stats" @@ -103,13 +105,12 @@ func (m *Manager) start() error { err := enc.Encode(msg) if err != nil { - fmt.Errorf("Failed to encode") - return err + return npmerrors.SimpleErrorWrapper("failed to encode event", err) } for _, client := range m.Registrations { if err := client.stream.SendMsg(&protos.Events{ Payload: map[string]*protos.GoalState{ - "IPSETAPPLY": { + cp.IpsetApply: { Data: [][]byte{payload.Bytes()}, }, }, From 65b9061eb654426b7b39fa87006bf90d5b21df26 Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Wed, 12 Jan 2022 16:49:28 -0800 Subject: [PATCH 6/8] resolving some lints --- .../goalstateprocessor/goalstateprocessor.go | 27 ++++++++++--------- npm/pkg/dataplane/dpshim/dpshim.go | 8 +++--- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go index cce91f2cda..b72d02e1b6 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -31,7 +31,6 @@ func NewGoalStateProcessor( podName string, inputChan chan *protos.Events, dp dataplane.GenericDataplane) *GoalStateProcessor { - klog.Infof("Creating GoalStateProcessor for node %s", nodeID) return &GoalStateProcessor{ ctx: ctx, @@ -144,7 +143,6 @@ func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { } func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) error { - var applyErr error for _, gs := range goalState.GetData() { payload := bytes.NewBuffer(gs) ipset, err := cp.DecodeControllerIPSet(payload) @@ -162,19 +160,22 @@ func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalSta switch ipset.GetSetKind() { case ipsets.HashSet: - applyErr = gsp.applySets(ipset, cachedIPSet) + err = gsp.applySets(ipset, cachedIPSet) + if err != nil { + return err + } case ipsets.ListSet: - applyErr = gsp.applyLists(ipset, cachedIPSet) + err = gsp.applyLists(ipset, cachedIPSet) + if err != nil { + return err + } case ipsets.UnknownKind: - applyErr = npmerrors.SimpleError( + return npmerrors.SimpleError( fmt.Sprintf("failed to decode IPSet apply event: Unknown IPSet kind %s", cachedIPSet.Kind), ) } - if applyErr != nil { - break - } } - return applyErr + return nil } func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { @@ -265,14 +266,14 @@ func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalSta payload := bytes.NewBuffer(gs) netpol, err := cp.DecodeNPMNetworkPolicy(payload) if err != nil { - return npmerrors.SimpleErrorWrapper("failed to decode Policy remove event", err) + return npmerrors.SimpleErrorWrapper("failed to decode Policy apply event", err) } klog.Infof("Processing %s Policy ADD event", netpol.Name) err = gsp.dp.UpdatePolicy(netpol) if err != nil { klog.Errorf("Error applying policy %s to dataplane with error: %s", netpol.Name, err.Error()) - return err + return npmerrors.SimpleErrorWrapper("failed update policy event", err) } } return nil @@ -283,14 +284,14 @@ func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalSt payload := bytes.NewBuffer(gs) netpolName, err := cp.DecodeString(payload) if err != nil { - return err + return npmerrors.SimpleErrorWrapper("failed to decode Policy remove event", err) } klog.Infof("Processing %s Policy remove event", netpolName) err = gsp.dp.RemovePolicy(netpolName) if err != nil { klog.Errorf("Error removing policy %s from dataplane with error: %s", netpolName, err.Error()) - return err + return npmerrors.SimpleErrorWrapper("failed remove policy event", err) } } return nil diff --git a/npm/pkg/dataplane/dpshim/dpshim.go b/npm/pkg/dataplane/dpshim/dpshim.go index a213f9fec7..ab356eba0b 100644 --- a/npm/pkg/dataplane/dpshim/dpshim.go +++ b/npm/pkg/dataplane/dpshim/dpshim.go @@ -29,11 +29,9 @@ func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet { } func (dp *DPShim) CreateIPSets(setNames []*ipsets.IPSetMetadata) { - return } func (dp *DPShim) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) { - return } func (dp *DPShim) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { @@ -44,7 +42,7 @@ func (dp *DPShim) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadata * return nil } -func (dp *DPShim) AddToLists(listName []*ipsets.IPSetMetadata, setNames []*ipsets.IPSetMetadata) error { +func (dp *DPShim) AddToLists(listName, setNames []*ipsets.IPSetMetadata) error { return nil } @@ -56,7 +54,7 @@ func (dp *DPShim) ApplyDataPlane() error { return nil } -func (dp *DPShim) AddPolicy(policies *policies.NPMNetworkPolicy) error { +func (dp *DPShim) AddPolicy(networkpolicies *policies.NPMNetworkPolicy) error { return nil } @@ -64,6 +62,6 @@ func (dp *DPShim) RemovePolicy(policyName string) error { return nil } -func (dp *DPShim) UpdatePolicy(policies *policies.NPMNetworkPolicy) error { +func (dp *DPShim) UpdatePolicy(networkpolicies *policies.NPMNetworkPolicy) error { return nil } From 93eecb6d014fbd4026d24a82e0a498c68d7ee5ac Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Fri, 14 Jan 2022 11:10:38 -0800 Subject: [PATCH 7/8] addressing comments --- .../goalstateprocessor/goalstateprocessor.go | 49 +++++++++++-------- .../goalstateprocessor_test.go | 2 +- npm/pkg/controlplane/gobutils.go | 18 +++++++ npm/pkg/controlplane/types.go | 2 +- npm/pkg/dataplane/ipsets/ipset.go | 6 ++- npm/pkg/transport/dataplane.go | 3 -- 6 files changed, 54 insertions(+), 26 deletions(-) diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go index b72d02e1b6..b3dcc0cb4b 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -18,8 +18,6 @@ type GoalStateProcessor struct { cancel context.CancelFunc nodeID string podName string - controllerIP string - controllerPort int dp dataplane.GenericDataplane inputChannel chan *protos.Events backoffChannel chan *protos.Events @@ -42,27 +40,19 @@ func NewGoalStateProcessor( } } +// Start kicks off the GoalStateProcessor func (gsp *GoalStateProcessor) Start() { klog.Infof("Starting GoalStateProcessor for node %s", gsp.nodeID) go gsp.run() } -func (gsp *GoalStateProcessor) SetController(controllerIP string, controllerPort int) { - klog.Infof("Setting controller for node %s", gsp.nodeID) - gsp.controllerIP = controllerIP - gsp.controllerPort = controllerPort -} - +// Stop stops the GoalStateProcessor func (gsp *GoalStateProcessor) Stop() { klog.Infof("Stopping GoalStateProcessor for node %s", gsp.nodeID) gsp.cancel() } func (gsp *GoalStateProcessor) run() { - if gsp.controllerIP != "" && gsp.controllerPort != 0 { - klog.Warningf("Invalid controller for node %s, IP %s port %d", gsp.nodeID, gsp.controllerIP, gsp.controllerPort) - return - } klog.Infof("Starting dataplane for node %s", gsp.nodeID) for { @@ -113,29 +103,29 @@ func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { return } - if _, ok := payload[cp.IpsetApply]; ok { - err := gsp.processIPSetsApplyEvent(payload[cp.IpsetApply]) + if ipsetApplyPayload, ok := payload[cp.IpsetApply]; ok { + err := gsp.processIPSetsApplyEvent(ipsetApplyPayload) if err != nil { klog.Errorf("Error processing IPSET apply event %s", err) } } - if _, ok := payload[cp.PolicyApply]; ok { - err := gsp.processPolicyApplyEvent(payload[cp.PolicyApply]) + if policyApplyPayload, ok := payload[cp.PolicyApply]; ok { + err := gsp.processPolicyApplyEvent(policyApplyPayload) if err != nil { klog.Errorf("Error processing POLICY apply event %s", err) } } - if _, ok := payload[cp.PolicyRemove]; ok { - err := gsp.processPolicyRemoveEvent(payload[cp.PolicyRemove]) + if policyRemovePayload, ok := payload[cp.PolicyRemove]; ok { + err := gsp.processPolicyRemoveEvent(policyRemovePayload) if err != nil { klog.Errorf("Error processing POLICY remove event %s", err) } } - if _, ok := payload[cp.IpsetRemove]; ok { - err := gsp.processIPSetsRemoveEvent(payload[cp.IpsetRemove]) + if ipsetRemovePayload, ok := payload[cp.IpsetRemove]; ok { + err := gsp.processIPSetsRemoveEvent(ipsetRemovePayload) if err != nil { klog.Errorf("Error processing IPSET remove event %s", err) } @@ -150,6 +140,11 @@ func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalSta return npmerrors.SimpleErrorWrapper("failed to decode IPSet apply event", err) } + if ipset == nil { + klog.Warningf("Empty IPSet apply event") + continue + } + ipsetName := ipset.GetPrefixName() klog.Infof("Processing %s IPSET apply event", ipsetName) @@ -248,6 +243,10 @@ func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalSt if err != nil { return npmerrors.SimpleErrorWrapper("failed to decode IPSet remove event", err) } + if ipsetName == "" { + klog.Warningf("Empty IPSet remove event") + continue + } klog.Infof("Processing %s IPSET remove event", ipsetName) cachedIPSet := gsp.dp.GetIPSet(ipsetName) @@ -270,6 +269,11 @@ func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalSta } klog.Infof("Processing %s Policy ADD event", netpol.Name) + if netpol == nil { + klog.Warningf("Empty Policy apply event") + continue + } + err = gsp.dp.UpdatePolicy(netpol) if err != nil { klog.Errorf("Error applying policy %s to dataplane with error: %s", netpol.Name, err.Error()) @@ -288,6 +292,11 @@ func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalSt } klog.Infof("Processing %s Policy remove event", netpolName) + if netpolName == "" { + klog.Warningf("Empty Policy remove event") + continue + } + err = gsp.dp.RemovePolicy(netpolName) if err != nil { klog.Errorf("Error removing policy %s from dataplane with error: %s", netpolName, err.Error()) diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go index 0728ea4053..ee5c7e1470 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go @@ -76,7 +76,7 @@ var ( } ) -func TestNewGoalStateProcessor(t *testing.T) { +func TestPolicyApplyEvent(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/npm/pkg/controlplane/gobutils.go b/npm/pkg/controlplane/gobutils.go index 250a96db78..28867b2d7e 100644 --- a/npm/pkg/controlplane/gobutils.go +++ b/npm/pkg/controlplane/gobutils.go @@ -9,6 +9,9 @@ import ( ) func EncodeString(name string) (*bytes.Buffer, error) { + if name == "" { + return nil, npmerrors.SimpleError("failed to encode, name is empty") + } var payloadBuffer bytes.Buffer err := gob.NewEncoder(&payloadBuffer).Encode(&name) if err != nil { @@ -18,6 +21,9 @@ func EncodeString(name string) (*bytes.Buffer, error) { } func DecodeString(payload *bytes.Buffer) (string, error) { + if payload == nil { + return "", npmerrors.SimpleError("failed to decode, payload is nil") + } var name string err := gob.NewDecoder(payload).Decode(&name) if err != nil { @@ -27,6 +33,9 @@ func DecodeString(payload *bytes.Buffer) (string, error) { } func EncodeControllerIPSet(ipset *ControllerIPSets) (*bytes.Buffer, error) { + if ipset == nil { + return nil, npmerrors.SimpleError("failed to encode, ipset is nil") + } var payloadBuffer bytes.Buffer err := gob.NewEncoder(&payloadBuffer).Encode(&ipset) if err != nil { @@ -36,6 +45,9 @@ func EncodeControllerIPSet(ipset *ControllerIPSets) (*bytes.Buffer, error) { } func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { + if payload == nil { + return nil, npmerrors.SimpleError("failed to decode, payload is nil") + } var ipset ControllerIPSets err := gob.NewDecoder(payload).Decode(&ipset) if err != nil { @@ -45,6 +57,9 @@ func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { } func EncodeNPMNetworkPolicy(netpol *policies.NPMNetworkPolicy) (*bytes.Buffer, error) { + if netpol == nil { + return nil, npmerrors.SimpleError("failed to encode, netpol is nil") + } var payloadBuffer bytes.Buffer enc := gob.NewEncoder(&payloadBuffer) err := enc.Encode(netpol) @@ -55,6 +70,9 @@ func EncodeNPMNetworkPolicy(netpol *policies.NPMNetworkPolicy) (*bytes.Buffer, e } func DecodeNPMNetworkPolicy(payload *bytes.Buffer) (*policies.NPMNetworkPolicy, error) { + if payload == nil { + return nil, npmerrors.SimpleError("failed to decode, payload is nil") + } var netpol policies.NPMNetworkPolicy err := gob.NewDecoder(payload).Decode(&netpol) if err != nil { diff --git a/npm/pkg/controlplane/types.go b/npm/pkg/controlplane/types.go index c51d9d1b77..609bf95f36 100644 --- a/npm/pkg/controlplane/types.go +++ b/npm/pkg/controlplane/types.go @@ -19,7 +19,7 @@ type ControllerIPSets struct { // IPPodMetadata is used for setMaps to store Ips and ports as keys // and podMetadata as value IPPodMetadata map[string]*dp.PodMetadata - // This is used for listMaps to store child IP Sets + // MemberIPSets is used for listMaps to store child IP Sets MemberIPSets map[string]*ipsets.IPSetMetadata } diff --git a/npm/pkg/dataplane/ipsets/ipset.go b/npm/pkg/dataplane/ipsets/ipset.go index e2f0458d9b..b8df3d8736 100644 --- a/npm/pkg/dataplane/ipsets/ipset.go +++ b/npm/pkg/dataplane/ipsets/ipset.go @@ -181,9 +181,11 @@ const ( ) type IPSet struct { + // Name is prefixed name of original set Name string unprefixedName string - HashedName string + // HashedName is AzureNpmPrefix (azure-npm-) + hash of prefixed name + HashedName string // SetProperties embedding set properties SetProperties // IpPodKey is used for setMaps to store Ips and ports as keys @@ -232,6 +234,7 @@ func NewIPSet(setMetadata *IPSetMetadata) *IPSet { return set } +// GetSetMetadata returns set metadata with unprefixed original name and SetType func (set *IPSet) GetSetMetadata() *IPSetMetadata { return NewIPSetMetadata(set.unprefixedName, set.Type) } @@ -241,6 +244,7 @@ func (set *IPSet) String() string { set.Name, set.HashedName, setTypeName[set.Type], string(set.Kind)) } +// GetSetContents returns members of set as string slice func (set *IPSet) GetSetContents() ([]string, error) { switch set.Kind { case HashSet: diff --git a/npm/pkg/transport/dataplane.go b/npm/pkg/transport/dataplane.go index c673b5e8d6..b83735f432 100644 --- a/npm/pkg/transport/dataplane.go +++ b/npm/pkg/transport/dataplane.go @@ -64,9 +64,6 @@ func (c *DataplaneEventsClient) run(ctx context.Context, connectClient protos.Da "Received event %s \n", e.String(), ) - for _, d := range e.GetData() { - fmt.Printf("%b\n", d) - } } } } From 17a6417bb28ad58f55f2612fd860dac9b95e54e4 Mon Sep 17 00:00:00 2001 From: Vamsi Kalapala Date: Fri, 14 Jan 2022 12:53:48 -0800 Subject: [PATCH 8/8] fixing a lint issue --- npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go index b3dcc0cb4b..19f75daf71 100644 --- a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -267,12 +267,12 @@ func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalSta if err != nil { return npmerrors.SimpleErrorWrapper("failed to decode Policy apply event", err) } - klog.Infof("Processing %s Policy ADD event", netpol.Name) if netpol == nil { klog.Warningf("Empty Policy apply event") continue } + klog.Infof("Processing %s Policy ADD event", netpol.Name) err = gsp.dp.UpdatePolicy(netpol) if err != nil {