diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go new file mode 100644 index 0000000000..19f75daf71 --- /dev/null +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go @@ -0,0 +1,316 @@ +package goalstateprocessor + +import ( + "bytes" + "context" + "fmt" + + cp "github.com/Azure/azure-container-networking/npm/pkg/controlplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + "github.com/Azure/azure-container-networking/npm/pkg/protos" + npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" + "k8s.io/klog" +) + +type GoalStateProcessor struct { + ctx context.Context + cancel context.CancelFunc + nodeID string + podName string + dp dataplane.GenericDataplane + inputChannel chan *protos.Events + backoffChannel chan *protos.Events +} + +func NewGoalStateProcessor( + ctx context.Context, + nodeID string, + podName string, + inputChan chan *protos.Events, + dp dataplane.GenericDataplane) *GoalStateProcessor { + klog.Infof("Creating GoalStateProcessor for node %s", nodeID) + return &GoalStateProcessor{ + ctx: ctx, + nodeID: nodeID, + podName: podName, + dp: dp, + inputChannel: inputChan, + backoffChannel: make(chan *protos.Events), + } +} + +// Start kicks off the GoalStateProcessor +func (gsp *GoalStateProcessor) Start() { + klog.Infof("Starting GoalStateProcessor for node %s", gsp.nodeID) + go gsp.run() +} + +// Stop stops the GoalStateProcessor +func (gsp *GoalStateProcessor) Stop() { + klog.Infof("Stopping GoalStateProcessor for node %s", gsp.nodeID) + gsp.cancel() +} + +func (gsp *GoalStateProcessor) run() { + klog.Infof("Starting dataplane for node %s", gsp.nodeID) + + for { + gsp.processNext() + } +} + +func (gsp *GoalStateProcessor) processNext() { + select { + case <-gsp.ctx.Done(): + klog.Infof("GoalStateProcessor for node %s stopped", gsp.nodeID) + return + case inputEvents := <-gsp.inputChannel: + // TODO remove this large print later + klog.Infof("Received event %s", inputEvents) + gsp.process(inputEvents) + case backoffEvents := <-gsp.backoffChannel: + // For now keep it simple. Do not worry about backoff events + // but if we need to handle them, we can do it here. + // TODO remove this large print later + klog.Infof("Received backoff event %s", backoffEvents) + gsp.process(backoffEvents) + default: + } +} + +func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) { + klog.Infof("Processing event") + // apply dataplane after syncing + defer func() { + dperr := gsp.dp.ApplyDataPlane() + if dperr != nil { + klog.Errorf("Apply Dataplane failed with %v", dperr) + } + }() + + // Process these individual buckkets in order + // 1. Apply IPSET + // 2. Apply POLICY + // 3. Remove POLICY + // 4. Remove IPSET + + // TODO need to handle first connect stream of all GoalStates + payload := inputEvent.GetPayload() + + if !validatePayload(payload) { + klog.Warningf("Empty payload in event %s", inputEvent) + return + } + + if ipsetApplyPayload, ok := payload[cp.IpsetApply]; ok { + err := gsp.processIPSetsApplyEvent(ipsetApplyPayload) + if err != nil { + klog.Errorf("Error processing IPSET apply event %s", err) + } + } + + if policyApplyPayload, ok := payload[cp.PolicyApply]; ok { + err := gsp.processPolicyApplyEvent(policyApplyPayload) + if err != nil { + klog.Errorf("Error processing POLICY apply event %s", err) + } + } + + if policyRemovePayload, ok := payload[cp.PolicyRemove]; ok { + err := gsp.processPolicyRemoveEvent(policyRemovePayload) + if err != nil { + klog.Errorf("Error processing POLICY remove event %s", err) + } + } + + if ipsetRemovePayload, ok := payload[cp.IpsetRemove]; ok { + err := gsp.processIPSetsRemoveEvent(ipsetRemovePayload) + if err != nil { + klog.Errorf("Error processing IPSET remove event %s", err) + } + } +} + +func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + ipset, err := cp.DecodeControllerIPSet(payload) + if err != nil { + return npmerrors.SimpleErrorWrapper("failed to decode IPSet apply event", err) + } + + if ipset == nil { + klog.Warningf("Empty IPSet apply event") + continue + } + + ipsetName := ipset.GetPrefixName() + klog.Infof("Processing %s IPSET apply event", ipsetName) + + cachedIPSet := gsp.dp.GetIPSet(ipsetName) + if cachedIPSet == nil { + klog.Infof("IPSet %s not found in cache, adding to cache", ipsetName) + } + + switch ipset.GetSetKind() { + case ipsets.HashSet: + err = gsp.applySets(ipset, cachedIPSet) + if err != nil { + return err + } + case ipsets.ListSet: + err = gsp.applyLists(ipset, cachedIPSet) + if err != nil { + return err + } + case ipsets.UnknownKind: + return npmerrors.SimpleError( + fmt.Sprintf("failed to decode IPSet apply event: Unknown IPSet kind %s", cachedIPSet.Kind), + ) + } + } + return nil +} + +func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { + if len(ipSet.IPPodMetadata) == 0 { + gsp.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipSet.GetMetadata()}) + return nil + } + + setMetadata := ipSet.GetMetadata() + for _, podMetadata := range ipSet.IPPodMetadata { + err := gsp.dp.AddToSets([]*ipsets.IPSetMetadata{setMetadata}, podMetadata) + if err != nil { + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at AddToSet.", err) + } + } + + if cachedIPSet != nil { + for podIP, podKey := range cachedIPSet.IPPodKey { + if _, ok := ipSet.IPPodMetadata[podIP]; !ok { + err := gsp.dp.RemoveFromSets([]*ipsets.IPSetMetadata{setMetadata}, dataplane.NewPodMetadata(podIP, podKey, "")) + if err != nil { + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at RemoveFromSets.", err) + } + } + } + } + return nil +} + +func (gsp *GoalStateProcessor) applyLists(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error { + if len(ipSet.MemberIPSets) == 0 { + gsp.dp.CreateIPSets([]*ipsets.IPSetMetadata{ipSet.GetMetadata()}) + return nil + } + + setMetadata := ipSet.GetMetadata() + membersToAdd := make([]*ipsets.IPSetMetadata, len(ipSet.MemberIPSets)) + idx := 0 + for _, memberIPSet := range ipSet.MemberIPSets { + membersToAdd[idx] = memberIPSet + idx++ + } + err := gsp.dp.AddToLists([]*ipsets.IPSetMetadata{setMetadata}, membersToAdd) + if err != nil { + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at AddToLists.", err) + } + + if cachedIPSet != nil { + toDeleteMembers := make([]*ipsets.IPSetMetadata, 0) + for _, memberSet := range cachedIPSet.MemberIPSets { + if _, ok := ipSet.MemberIPSets[memberSet.Name]; !ok { + toDeleteMembers = append(toDeleteMembers, memberSet.GetSetMetadata()) + } + } + + if len(toDeleteMembers) > 0 { + err := gsp.dp.RemoveFromList(setMetadata, toDeleteMembers) + if err != nil { + return npmerrors.SimpleErrorWrapper("IPSet apply event, failed at RemoveFromList.", err) + } + } + } + return nil +} + +func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + ipsetName, err := cp.DecodeString(payload) + if err != nil { + return npmerrors.SimpleErrorWrapper("failed to decode IPSet remove event", err) + } + if ipsetName == "" { + klog.Warningf("Empty IPSet remove event") + continue + } + klog.Infof("Processing %s IPSET remove event", ipsetName) + + cachedIPSet := gsp.dp.GetIPSet(ipsetName) + if cachedIPSet == nil { + klog.Infof("IPSet %s not found in cache, ignoring delete call.", ipsetName) + return nil + } + + gsp.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedIPSet.Name, cachedIPSet.Type)) + } + return nil +} + +func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + netpol, err := cp.DecodeNPMNetworkPolicy(payload) + if err != nil { + return npmerrors.SimpleErrorWrapper("failed to decode Policy apply event", err) + } + + if netpol == nil { + klog.Warningf("Empty Policy apply event") + continue + } + klog.Infof("Processing %s Policy ADD event", netpol.Name) + + err = gsp.dp.UpdatePolicy(netpol) + if err != nil { + klog.Errorf("Error applying policy %s to dataplane with error: %s", netpol.Name, err.Error()) + return npmerrors.SimpleErrorWrapper("failed update policy event", err) + } + } + return nil +} + +func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalState) error { + for _, gs := range goalState.GetData() { + payload := bytes.NewBuffer(gs) + netpolName, err := cp.DecodeString(payload) + if err != nil { + return npmerrors.SimpleErrorWrapper("failed to decode Policy remove event", err) + } + klog.Infof("Processing %s Policy remove event", netpolName) + + if netpolName == "" { + klog.Warningf("Empty Policy remove event") + continue + } + + err = gsp.dp.RemovePolicy(netpolName) + if err != nil { + klog.Errorf("Error removing policy %s from dataplane with error: %s", netpolName, err.Error()) + return npmerrors.SimpleErrorWrapper("failed remove policy event", err) + } + } + return nil +} + +func validatePayload(payload map[string]*protos.GoalState) bool { + for _, v := range payload { + if len(v.GetData()) != 0 { + return true + } + } + return false +} diff --git a/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go new file mode 100644 index 0000000000..ee5c7e1470 --- /dev/null +++ b/npm/pkg/controlplane/goalstateprocessor/goalstateprocessor_test.go @@ -0,0 +1,214 @@ +package goalstateprocessor + +import ( + "context" + "testing" + "time" + + "github.com/Azure/azure-container-networking/npm/pkg/controlplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + dpmocks "github.com/Azure/azure-container-networking/npm/pkg/dataplane/mocks" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" + "github.com/Azure/azure-container-networking/npm/pkg/protos" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +const ( + sleepAfterChanSent = time.Millisecond * 100 +) + +var ( + testNSSet = ipsets.NewIPSetMetadata("test-ns-set", ipsets.Namespace) + testNSCPSet = controlplane.NewControllerIPSets(testNSSet) + testKeyPodSet = ipsets.NewIPSetMetadata("test-keyPod-set", ipsets.KeyLabelOfPod) + testKeyPodCPSet = controlplane.NewControllerIPSets(testKeyPodSet) + testNestedKeyPodSet = ipsets.NewIPSetMetadata("test-nestedkeyPod-set", ipsets.NestedLabelOfPod) + testNestedKeyPodCPSet = controlplane.NewControllerIPSets(testNestedKeyPodSet) + testNetPol = &policies.NPMNetworkPolicy{ + Name: "test-netpol", + NameSpace: "x", + PolicyKey: "x/test-netpol", + PodSelectorIPSets: []*ipsets.TranslatedIPSet{ + { + Metadata: testNSSet, + }, + { + Metadata: testKeyPodSet, + }, + }, + RuleIPSets: []*ipsets.TranslatedIPSet{ + { + Metadata: testNSSet, + }, + { + Metadata: testKeyPodSet, + }, + }, + ACLs: []*policies.ACLPolicy{ + { + PolicyID: "azure-acl-123", + Target: policies.Dropped, + Direction: policies.Ingress, + }, + { + PolicyID: "azure-acl-234", + Target: policies.Allowed, + Direction: policies.Ingress, + SrcList: []policies.SetInfo{ + { + IPSet: testNSSet, + Included: true, + MatchType: policies.SrcMatch, + }, + { + IPSet: testKeyPodSet, + Included: true, + MatchType: policies.SrcMatch, + }, + }, + }, + }, + PodEndpoints: map[string]string{ + "10.0.0.1": "1234", + }, + } +) + +func TestPolicyApplyEvent(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + // Verify that the policy was applied + dp.EXPECT().UpdatePolicy(gomock.Any()).Times(1) + dp.EXPECT().ApplyDataPlane().Times(1) + + inputChan := make(chan *protos.Events) + payload, err := controlplane.EncodeNPMNetworkPolicy(testNetPol) + assert.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) + + go func() { + inputChan <- &protos.Events{ + Payload: map[string]*protos.GoalState{ + controlplane.PolicyApply: { + Data: [][]byte{payload.Bytes()}, + }, + }, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() +} + +func TestIPSetsApply(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + // Verify that the policy was applied + dp.EXPECT().GetIPSet(gomock.Any()).Times(3) + dp.EXPECT().CreateIPSets(gomock.Any()).Times(3) + dp.EXPECT().ApplyDataPlane().Times(1) + + inputChan := make(chan *protos.Events) + + goalState := getGoalStateForControllerSets(t, + []*controlplane.ControllerIPSets{ + testNSCPSet, + testKeyPodCPSet, + testNestedKeyPodCPSet, + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) + go func() { + inputChan <- &protos.Events{ + Payload: goalState, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() +} + +func TestIPSetsApplyUpdateMembers(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dp := dpmocks.NewMockGenericDataplane(ctrl) + // Verify that the policy was applied + dp.EXPECT().GetIPSet(gomock.Any()).Times(4) + dp.EXPECT().CreateIPSets(gomock.Any()).Times(1) + dp.EXPECT().AddToSets(gomock.Any(), gomock.Any()).Times(2) + dp.EXPECT().AddToLists(gomock.Any(), gomock.Any()).Times(1) + dp.EXPECT().ApplyDataPlane().Times(2) + + inputChan := make(chan *protos.Events) + + testNSCPSet.IPPodMetadata = map[string]*dataplane.PodMetadata{ + "10.0.0.1": dataplane.NewPodMetadata("test", "10.0.0.1", "1234"), + } + testNestedKeyPodCPSet.MemberIPSets = map[string]*ipsets.IPSetMetadata{ + testNSSet.GetPrefixName(): testNSSet, + } + goalState := getGoalStateForControllerSets(t, + []*controlplane.ControllerIPSets{ + testNSCPSet, + testKeyPodCPSet, + testNestedKeyPodCPSet, + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gsp := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp) + go func() { + inputChan <- &protos.Events{ + Payload: goalState, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() + + // Update one of the ipsets and send another event + testNSCPSet.IPPodMetadata = map[string]*dataplane.PodMetadata{ + "10.0.0.2": dataplane.NewPodMetadata("test2", "10.0.0.2", "1234"), + } + goalState = getGoalStateForControllerSets(t, + []*controlplane.ControllerIPSets{ + testNSCPSet, + }, + ) + go func() { + inputChan <- &protos.Events{ + Payload: goalState, + } + }() + time.Sleep(sleepAfterChanSent) + + gsp.processNext() +} + +func getGoalStateForControllerSets(t *testing.T, sets []*controlplane.ControllerIPSets) map[string]*protos.GoalState { + goalState := map[string]*protos.GoalState{ + controlplane.IpsetApply: { + Data: [][]byte{}, + }, + } + for _, set := range sets { + payload, err := controlplane.EncodeControllerIPSet(set) + assert.NoError(t, err) + goalState[controlplane.IpsetApply].Data = append(goalState[controlplane.IpsetApply].Data, payload.Bytes()) + } + return goalState +} diff --git a/npm/pkg/controlplane/gobutils.go b/npm/pkg/controlplane/gobutils.go new file mode 100644 index 0000000000..28867b2d7e --- /dev/null +++ b/npm/pkg/controlplane/gobutils.go @@ -0,0 +1,82 @@ +package controlplane + +import ( + "bytes" + "encoding/gob" + + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" + npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" +) + +func EncodeString(name string) (*bytes.Buffer, error) { + if name == "" { + return nil, npmerrors.SimpleError("failed to encode, name is empty") + } + var payloadBuffer bytes.Buffer + err := gob.NewEncoder(&payloadBuffer).Encode(&name) + if err != nil { + return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) + } + return &payloadBuffer, nil +} + +func DecodeString(payload *bytes.Buffer) (string, error) { + if payload == nil { + return "", npmerrors.SimpleError("failed to decode, payload is nil") + } + var name string + err := gob.NewDecoder(payload).Decode(&name) + if err != nil { + return "", npmerrors.SimpleErrorWrapper("failed to decode", err) + } + return name, nil +} + +func EncodeControllerIPSet(ipset *ControllerIPSets) (*bytes.Buffer, error) { + if ipset == nil { + return nil, npmerrors.SimpleError("failed to encode, ipset is nil") + } + var payloadBuffer bytes.Buffer + err := gob.NewEncoder(&payloadBuffer).Encode(&ipset) + if err != nil { + return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) + } + return &payloadBuffer, nil +} + +func DecodeControllerIPSet(payload *bytes.Buffer) (*ControllerIPSets, error) { + if payload == nil { + return nil, npmerrors.SimpleError("failed to decode, payload is nil") + } + var ipset ControllerIPSets + err := gob.NewDecoder(payload).Decode(&ipset) + if err != nil { + return nil, npmerrors.SimpleErrorWrapper("failed to decode", err) + } + return &ipset, nil +} + +func EncodeNPMNetworkPolicy(netpol *policies.NPMNetworkPolicy) (*bytes.Buffer, error) { + if netpol == nil { + return nil, npmerrors.SimpleError("failed to encode, netpol is nil") + } + var payloadBuffer bytes.Buffer + enc := gob.NewEncoder(&payloadBuffer) + err := enc.Encode(netpol) + if err != nil { + return nil, npmerrors.SimpleErrorWrapper("failed to encode", err) + } + return &payloadBuffer, nil +} + +func DecodeNPMNetworkPolicy(payload *bytes.Buffer) (*policies.NPMNetworkPolicy, error) { + if payload == nil { + return nil, npmerrors.SimpleError("failed to decode, payload is nil") + } + var netpol policies.NPMNetworkPolicy + err := gob.NewDecoder(payload).Decode(&netpol) + if err != nil { + return nil, npmerrors.SimpleErrorWrapper("failed to decode", err) + } + return &netpol, nil +} diff --git a/npm/pkg/controlplane/types.go b/npm/pkg/controlplane/types.go new file mode 100644 index 0000000000..609bf95f36 --- /dev/null +++ b/npm/pkg/controlplane/types.go @@ -0,0 +1,36 @@ +package controlplane + +import ( + dp "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" +) + +const ( + IpsetApply string = "IPSETAPPLY" + IpsetRemove string = "IPSETREMOVE" + PolicyApply string = "POLICYAPPLY" + PolicyRemove string = "POLICYREMOVE" +) + +// ControllerIPSets is used in fan-out design for controller pod to calculate +// and push to daemon pod +type ControllerIPSets struct { + *ipsets.IPSetMetadata + // IPPodMetadata is used for setMaps to store Ips and ports as keys + // and podMetadata as value + IPPodMetadata map[string]*dp.PodMetadata + // MemberIPSets is used for listMaps to store child IP Sets + MemberIPSets map[string]*ipsets.IPSetMetadata +} + +func NewControllerIPSets(metadata *ipsets.IPSetMetadata) *ControllerIPSets { + return &ControllerIPSets{ + IPSetMetadata: metadata, + IPPodMetadata: make(map[string]*dp.PodMetadata), + MemberIPSets: make(map[string]*ipsets.IPSetMetadata), + } +} + +func (c *ControllerIPSets) GetMetadata() *ipsets.IPSetMetadata { + return c.IPSetMetadata +} diff --git a/npm/pkg/dataplane/dataplane.go b/npm/pkg/dataplane/dataplane.go index b4b6800bbd..19ad894c07 100644 --- a/npm/pkg/dataplane/dataplane.go +++ b/npm/pkg/dataplane/dataplane.go @@ -82,6 +82,10 @@ func (dp *DataPlane) ResetDataPlane() error { return dp.bootupDataPlane() } +func (dp *DataPlane) GetIPSet(setName string) *ipsets.IPSet { + return dp.ipsetMgr.GetIPSet(setName) +} + // CreateIPSets takes in a set object and updates local cache with this set func (dp *DataPlane) CreateIPSets(setMetadata []*ipsets.IPSetMetadata) { dp.ipsetMgr.CreateIPSets(setMetadata) diff --git a/npm/pkg/dataplane/dpshim/dpshim.go b/npm/pkg/dataplane/dpshim/dpshim.go new file mode 100644 index 0000000000..ab356eba0b --- /dev/null +++ b/npm/pkg/dataplane/dpshim/dpshim.go @@ -0,0 +1,67 @@ +package dpshim + +import ( + "github.com/Azure/azure-container-networking/npm/pkg/dataplane" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets" + "github.com/Azure/azure-container-networking/npm/pkg/dataplane/policies" + "github.com/Azure/azure-container-networking/npm/pkg/protos" +) + +// TODO setting this up to unblock another workitem +type DPShim struct { + outChannel chan *protos.Events +} + +func NewDPSim(outChannel chan *protos.Events) *DPShim { + return &DPShim{outChannel: outChannel} +} + +func (dp *DPShim) InitializeDataPlane() error { + return nil +} + +func (dp *DPShim) ResetDataPlane() error { + return nil +} + +func (dp *DPShim) GetIPSet(setName string) *ipsets.IPSet { + return nil +} + +func (dp *DPShim) CreateIPSets(setNames []*ipsets.IPSetMetadata) { +} + +func (dp *DPShim) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) { +} + +func (dp *DPShim) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { + return nil +} + +func (dp *DPShim) RemoveFromSets(setNames []*ipsets.IPSetMetadata, podMetadata *dataplane.PodMetadata) error { + return nil +} + +func (dp *DPShim) AddToLists(listName, setNames []*ipsets.IPSetMetadata) error { + return nil +} + +func (dp *DPShim) RemoveFromList(listName *ipsets.IPSetMetadata, setNames []*ipsets.IPSetMetadata) error { + return nil +} + +func (dp *DPShim) ApplyDataPlane() error { + return nil +} + +func (dp *DPShim) AddPolicy(networkpolicies *policies.NPMNetworkPolicy) error { + return nil +} + +func (dp *DPShim) RemovePolicy(policyName string) error { + return nil +} + +func (dp *DPShim) UpdatePolicy(networkpolicies *policies.NPMNetworkPolicy) error { + return nil +} diff --git a/npm/pkg/dataplane/ipsets/ipset.go b/npm/pkg/dataplane/ipsets/ipset.go index 7d38f627a0..b8df3d8736 100644 --- a/npm/pkg/dataplane/ipsets/ipset.go +++ b/npm/pkg/dataplane/ipsets/ipset.go @@ -181,7 +181,10 @@ const ( ) type IPSet struct { - Name string + // Name is prefixed name of original set + Name string + unprefixedName string + // HashedName is AzureNpmPrefix (azure-npm-) + hash of prefixed name HashedName string // SetProperties embedding set properties SetProperties @@ -207,8 +210,9 @@ type IPSet struct { func NewIPSet(setMetadata *IPSetMetadata) *IPSet { prefixedName := setMetadata.GetPrefixName() set := &IPSet{ - Name: prefixedName, - HashedName: util.GetHashedName(prefixedName), + Name: prefixedName, + unprefixedName: setMetadata.Name, + HashedName: util.GetHashedName(prefixedName), SetProperties: SetProperties{ Type: setMetadata.Type, Kind: setMetadata.GetSetKind(), @@ -230,11 +234,17 @@ func NewIPSet(setMetadata *IPSetMetadata) *IPSet { return set } +// GetSetMetadata returns set metadata with unprefixed original name and SetType +func (set *IPSet) GetSetMetadata() *IPSetMetadata { + return NewIPSetMetadata(set.unprefixedName, set.Type) +} + func (set *IPSet) String() string { return fmt.Sprintf("Name: %s HashedNamed: %s Type: %s Kind: %s", set.Name, set.HashedName, setTypeName[set.Type], string(set.Kind)) } +// GetSetContents returns members of set as string slice func (set *IPSet) GetSetContents() ([]string, error) { switch set.Kind { case HashSet: diff --git a/npm/pkg/dataplane/mocks/genericdataplane_generated.go b/npm/pkg/dataplane/mocks/genericdataplane_generated.go index 5dd6243e6e..a309d6d57a 100644 --- a/npm/pkg/dataplane/mocks/genericdataplane_generated.go +++ b/npm/pkg/dataplane/mocks/genericdataplane_generated.go @@ -2,7 +2,7 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: /home/nitishm/github/nitishm/azure-container-networking/npm/pkg/dataplane/types.go +// Source: /mnt/c/Users/vamsi/Desktop/Microsoft_ws/azure-container-networking/npm/pkg/dataplane/types.go // Package mocks is a generated GoMock package. package mocks @@ -119,6 +119,20 @@ func (mr *MockGenericDataplaneMockRecorder) DeleteIPSet(setMetadata interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteIPSet", reflect.TypeOf((*MockGenericDataplane)(nil).DeleteIPSet), setMetadata) } +// GetIPSet mocks base method. +func (m *MockGenericDataplane) GetIPSet(setName string) *ipsets.IPSet { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetIPSet", setName) + ret0, _ := ret[0].(*ipsets.IPSet) + return ret0 +} + +// GetIPSet indicates an expected call of GetIPSet. +func (mr *MockGenericDataplaneMockRecorder) GetIPSet(setName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetIPSet", reflect.TypeOf((*MockGenericDataplane)(nil).GetIPSet), setName) +} + // InitializeDataPlane mocks base method. func (m *MockGenericDataplane) InitializeDataPlane() error { m.ctrl.T.Helper() diff --git a/npm/pkg/dataplane/types.go b/npm/pkg/dataplane/types.go index 88a48a1165..d24210c240 100644 --- a/npm/pkg/dataplane/types.go +++ b/npm/pkg/dataplane/types.go @@ -8,6 +8,7 @@ import ( type GenericDataplane interface { InitializeDataPlane() error ResetDataPlane() error + GetIPSet(setName string) *ipsets.IPSet CreateIPSets(setNames []*ipsets.IPSetMetadata) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) AddToSets(setNames []*ipsets.IPSetMetadata, podMetadata *PodMetadata) error diff --git a/npm/pkg/protos/transport.pb.go b/npm/pkg/protos/transport.pb.go index ebc6980c31..345a189e82 100644 --- a/npm/pkg/protos/transport.pb.go +++ b/npm/pkg/protos/transport.pb.go @@ -9,7 +9,6 @@ package protos import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - structpb "google.golang.org/protobuf/types/known/structpb" reflect "reflect" sync "sync" ) @@ -64,102 +63,6 @@ func (DatapathPodMetadata_APIVersion) EnumDescriptor() ([]byte, []int) { return file_transport_proto_rawDescGZIP(), []int{0, 0} } -// The following events are supported: -type Events_EventType int32 - -const ( - // Send a CREATE/UPDATE event to a datapath Pod - Events_APPLY Events_EventType = 0 - // Send a DELETE event to a datapath Pod - Events_REMOVE Events_EventType = 1 -) - -// Enum value maps for Events_EventType. -var ( - Events_EventType_name = map[int32]string{ - 0: "APPLY", - 1: "REMOVE", - } - Events_EventType_value = map[string]int32{ - "APPLY": 0, - "REMOVE": 1, - } -) - -func (x Events_EventType) Enum() *Events_EventType { - p := new(Events_EventType) - *p = x - return p -} - -func (x Events_EventType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Events_EventType) Descriptor() protoreflect.EnumDescriptor { - return file_transport_proto_enumTypes[1].Descriptor() -} - -func (Events_EventType) Type() protoreflect.EnumType { - return &file_transport_proto_enumTypes[1] -} - -func (x Events_EventType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Events_EventType.Descriptor instead. -func (Events_EventType) EnumDescriptor() ([]byte, []int) { - return file_transport_proto_rawDescGZIP(), []int{1, 0} -} - -// Supported objects are IPSet and NetworkPolicy objects. -type Events_ObjectType int32 - -const ( - Events_IPSET Events_ObjectType = 0 - Events_POLICY Events_ObjectType = 1 -) - -// Enum value maps for Events_ObjectType. -var ( - Events_ObjectType_name = map[int32]string{ - 0: "IPSET", - 1: "POLICY", - } - Events_ObjectType_value = map[string]int32{ - "IPSET": 0, - "POLICY": 1, - } -) - -func (x Events_ObjectType) Enum() *Events_ObjectType { - p := new(Events_ObjectType) - *p = x - return p -} - -func (x Events_ObjectType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (Events_ObjectType) Descriptor() protoreflect.EnumDescriptor { - return file_transport_proto_enumTypes[2].Descriptor() -} - -func (Events_ObjectType) Type() protoreflect.EnumType { - return &file_transport_proto_enumTypes[2] -} - -func (x Events_ObjectType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use Events_ObjectType.Descriptor instead. -func (Events_ObjectType) EnumDescriptor() ([]byte, []int) { - return file_transport_proto_rawDescGZIP(), []int{1, 1} -} - // DatapathPodMetadata is the metadata for a datapath pod type DatapathPodMetadata struct { state protoimpl.MessageState @@ -232,10 +135,8 @@ type Events struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type Events_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=protos.Events_EventType" json:"type,omitempty"` - Object Events_ObjectType `protobuf:"varint,2,opt,name=object,proto3,enum=protos.Events_ObjectType" json:"object,omitempty"` // Payload can contain one or more Event objects. - Event []*Event `protobuf:"bytes,3,rep,name=event,proto3" json:"event,omitempty"` + Payload map[string]*GoalState `protobuf:"bytes,1,rep,name=payload,proto3" json:"payload,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` } func (x *Events) Reset() { @@ -270,41 +171,27 @@ func (*Events) Descriptor() ([]byte, []int) { return file_transport_proto_rawDescGZIP(), []int{1} } -func (x *Events) GetType() Events_EventType { - if x != nil { - return x.Type - } - return Events_APPLY -} - -func (x *Events) GetObject() Events_ObjectType { - if x != nil { - return x.Object - } - return Events_IPSET -} - -func (x *Events) GetEvent() []*Event { +func (x *Events) GetPayload() map[string]*GoalState { if x != nil { - return x.Event + return x.Payload } return nil } // Event is a generic object that can be Created, // Updated, Deleted by the controlplane. -type Event struct { +type GoalState struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields // Data can contain one or more instances of IPSet or NetworkPolicy // objects. - Data []*structpb.Struct `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"` + Data [][]byte `protobuf:"bytes,1,rep,name=data,proto3" json:"data,omitempty"` } -func (x *Event) Reset() { - *x = Event{} +func (x *GoalState) Reset() { + *x = GoalState{} if protoimpl.UnsafeEnabled { mi := &file_transport_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -312,13 +199,13 @@ func (x *Event) Reset() { } } -func (x *Event) String() string { +func (x *GoalState) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Event) ProtoMessage() {} +func (*GoalState) ProtoMessage() {} -func (x *Event) ProtoReflect() protoreflect.Message { +func (x *GoalState) ProtoReflect() protoreflect.Message { mi := &file_transport_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -330,12 +217,12 @@ func (x *Event) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Event.ProtoReflect.Descriptor instead. -func (*Event) Descriptor() ([]byte, []int) { +// Deprecated: Use GoalState.ProtoReflect.Descriptor instead. +func (*GoalState) Descriptor() ([]byte, []int) { return file_transport_proto_rawDescGZIP(), []int{2} } -func (x *Event) GetData() []*structpb.Struct { +func (x *GoalState) GetData() [][]byte { if x != nil { return x.Data } @@ -346,46 +233,39 @@ var File_transport_proto protoreflect.FileDescriptor var file_transport_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, - 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xab, 0x01, 0x0a, 0x13, 0x44, 0x61, 0x74, 0x61, - 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, - 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x6e, 0x6f, - 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6e, - 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, - 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x26, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, - 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, - 0x14, 0x0a, 0x0a, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x06, 0x0a, - 0x02, 0x56, 0x31, 0x10, 0x00, 0x22, 0xd7, 0x01, 0x0a, 0x06, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x12, 0x2c, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x18, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x31, - 0x0a, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x4f, - 0x62, 0x6a, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x6f, 0x62, 0x6a, 0x65, 0x63, - 0x74, 0x12, 0x23, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, - 0x32, 0x0d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, - 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x22, 0x0a, 0x09, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, - 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x41, 0x50, 0x50, 0x4c, 0x59, 0x10, 0x00, 0x12, 0x0a, - 0x0a, 0x06, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x10, 0x01, 0x22, 0x23, 0x0a, 0x0a, 0x4f, 0x62, - 0x6a, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x49, 0x50, 0x53, 0x45, - 0x54, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x50, 0x4f, 0x4c, 0x49, 0x43, 0x59, 0x10, 0x01, 0x22, - 0x34, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2b, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, - 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x4b, 0x0a, 0x0f, 0x44, 0x61, 0x74, 0x61, 0x70, 0x6c, 0x61, - 0x6e, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x38, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x44, 0x61, 0x74, - 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x1a, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x30, 0x01, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x41, 0x7a, 0x75, 0x72, 0x65, 0x2f, 0x61, 0x7a, 0x75, 0x72, 0x65, 0x2d, 0x63, 0x6f, 0x6e, - 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x2d, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x69, 0x6e, - 0x67, 0x2f, 0x6e, 0x70, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, - 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x12, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x22, 0xab, 0x01, 0x0a, 0x13, 0x44, 0x61, + 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x12, 0x19, 0x0a, 0x08, 0x70, 0x6f, 0x64, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x6f, 0x64, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, + 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x6e, 0x6f, 0x64, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x0a, 0x61, 0x70, 0x69, + 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x26, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, + 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x41, 0x50, 0x49, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x0a, 0x61, 0x70, 0x69, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x22, 0x14, 0x0a, 0x0a, 0x41, 0x50, 0x49, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, + 0x06, 0x0a, 0x02, 0x56, 0x31, 0x10, 0x00, 0x22, 0x8e, 0x01, 0x0a, 0x06, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x12, 0x35, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x73, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x4d, 0x0a, 0x0c, 0x50, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x27, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x73, 0x2e, 0x47, 0x6f, 0x61, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x1f, 0x0a, 0x09, 0x47, 0x6f, 0x61, 0x6c, + 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x4b, 0x0a, 0x0f, 0x44, 0x61, 0x74, + 0x61, 0x70, 0x6c, 0x61, 0x6e, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x38, 0x0a, 0x07, + 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, + 0x2e, 0x44, 0x61, 0x74, 0x61, 0x70, 0x61, 0x74, 0x68, 0x50, 0x6f, 0x64, 0x4d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x1a, 0x0e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x30, 0x01, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x41, 0x7a, 0x75, 0x72, 0x65, 0x2f, 0x61, 0x7a, 0x75, 0x72, 0x65, + 0x2d, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x2d, 0x6e, 0x65, 0x74, 0x77, 0x6f, + 0x72, 0x6b, 0x69, 0x6e, 0x67, 0x2f, 0x6e, 0x70, 0x6d, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x73, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -400,30 +280,26 @@ func file_transport_proto_rawDescGZIP() []byte { return file_transport_proto_rawDescData } -var file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_transport_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_transport_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_transport_proto_goTypes = []interface{}{ (DatapathPodMetadata_APIVersion)(0), // 0: protos.DatapathPodMetadata.APIVersion - (Events_EventType)(0), // 1: protos.Events.EventType - (Events_ObjectType)(0), // 2: protos.Events.ObjectType - (*DatapathPodMetadata)(nil), // 3: protos.DatapathPodMetadata - (*Events)(nil), // 4: protos.Events - (*Event)(nil), // 5: protos.Event - (*structpb.Struct)(nil), // 6: google.protobuf.Struct + (*DatapathPodMetadata)(nil), // 1: protos.DatapathPodMetadata + (*Events)(nil), // 2: protos.Events + (*GoalState)(nil), // 3: protos.GoalState + nil, // 4: protos.Events.PayloadEntry } var file_transport_proto_depIdxs = []int32{ 0, // 0: protos.DatapathPodMetadata.apiVersion:type_name -> protos.DatapathPodMetadata.APIVersion - 1, // 1: protos.Events.type:type_name -> protos.Events.EventType - 2, // 2: protos.Events.object:type_name -> protos.Events.ObjectType - 5, // 3: protos.Events.event:type_name -> protos.Event - 6, // 4: protos.Event.data:type_name -> google.protobuf.Struct - 3, // 5: protos.DataplaneEvents.Connect:input_type -> protos.DatapathPodMetadata - 4, // 6: protos.DataplaneEvents.Connect:output_type -> protos.Events - 6, // [6:7] is the sub-list for method output_type - 5, // [5:6] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 4, // 1: protos.Events.payload:type_name -> protos.Events.PayloadEntry + 3, // 2: protos.Events.PayloadEntry.value:type_name -> protos.GoalState + 1, // 3: protos.DataplaneEvents.Connect:input_type -> protos.DatapathPodMetadata + 2, // 4: protos.DataplaneEvents.Connect:output_type -> protos.Events + 4, // [4:5] is the sub-list for method output_type + 3, // [3:4] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_transport_proto_init() } @@ -457,7 +333,7 @@ func file_transport_proto_init() { } } file_transport_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Event); i { + switch v := v.(*GoalState); i { case 0: return &v.state case 1: @@ -474,8 +350,8 @@ func file_transport_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_transport_proto_rawDesc, - NumEnums: 3, - NumMessages: 3, + NumEnums: 1, + NumMessages: 4, NumExtensions: 0, NumServices: 1, }, diff --git a/npm/pkg/protos/transport.proto b/npm/pkg/protos/transport.proto index 68388b134f..6f1740a0ce 100644 --- a/npm/pkg/protos/transport.proto +++ b/npm/pkg/protos/transport.proto @@ -2,8 +2,6 @@ syntax = "proto3"; package protos; option go_package = "github.com/Azure/azure-container-networking/npm/pkg/protos;protos"; -import "google/protobuf/struct.proto"; - // DataplaneEvents represents the Service RPC exposed by the gRPC server. service DataplaneEvents{ rpc Connect(DatapathPodMetadata) returns (stream Events); @@ -22,30 +20,15 @@ message DatapathPodMetadata { // Events defines the operation (event type) and object type being // streamed to the datapath client. A events message may carry one or // more Event objects. -message Events{ - EventType type = 1; - // The following events are supported: - enum EventType { - // Send a CREATE/UPDATE event to a datapath Pod - APPLY = 0; - // Send a DELETE event to a datapath Pod - REMOVE = 1; - } - - ObjectType object = 2; - // Supported objects are IPSet and NetworkPolicy objects. - enum ObjectType { - IPSET = 0; - POLICY = 1; - } +message Events { // Payload can contain one or more Event objects. - repeated Event event = 3; + map payload = 1; } // Event is a generic object that can be Created, // Updated, Deleted by the controlplane. -message Event { +message GoalState { // Data can contain one or more instances of IPSet or NetworkPolicy // objects. - repeated google.protobuf.Struct data = 3; + repeated bytes data = 1; } diff --git a/npm/pkg/transport/controlplane.go b/npm/pkg/transport/controlplane.go index 4a5c3f5e67..b1752cb69d 100644 --- a/npm/pkg/transport/controlplane.go +++ b/npm/pkg/transport/controlplane.go @@ -1,15 +1,18 @@ package transport import ( + "bytes" "context" + "encoding/gob" "fmt" "net" + cp "github.com/Azure/azure-container-networking/npm/pkg/controlplane" "github.com/Azure/azure-container-networking/npm/pkg/protos" + npmerrors "github.com/Azure/azure-container-networking/npm/util/errors" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/grpc/stats" - "google.golang.org/protobuf/types/known/structpb" "k8s.io/klog/v2" ) @@ -97,15 +100,18 @@ func (m *Manager) start() error { } } case msg := <-m.inCh: + var payload bytes.Buffer + enc := gob.NewEncoder(&payload) + + err := enc.Encode(msg) + if err != nil { + return npmerrors.SimpleErrorWrapper("failed to encode event", err) + } for _, client := range m.Registrations { if err := client.stream.SendMsg(&protos.Events{ - Type: *protos.Events_APPLY.Enum(), - Object: *protos.Events_IPSET.Enum(), - Event: []*protos.Event{ - { - Data: []*structpb.Struct{ - msg.(*structpb.Struct), - }, + Payload: map[string]*protos.GoalState{ + cp.IpsetApply: { + Data: [][]byte{payload.Bytes()}, }, }, }); err != nil { diff --git a/npm/pkg/transport/dataplane.go b/npm/pkg/transport/dataplane.go index 82b32f52f3..b83735f432 100644 --- a/npm/pkg/transport/dataplane.go +++ b/npm/pkg/transport/dataplane.go @@ -17,6 +17,7 @@ type DataplaneEventsClient struct { } func NewDataplaneEventsClient(ctx context.Context, pod, node, addr string) (*DataplaneEventsClient, error) { + // TODO Make this secure cc, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { return nil, fmt.Errorf("failed to dial %s: %w", addr, err) @@ -56,19 +57,13 @@ func (c *DataplaneEventsClient) run(ctx context.Context, connectClient protos.Da break } - // TODO: REMOVE ME - // This is for debugging purposes only - fmt.Printf( - "Received event type %s object type %s: \n", - event.GetType(), - event.GetObject(), - ) - - for _, e := range event.GetEvent() { - for _, d := range e.GetData() { - eventAsMap := d.AsMap() - fmt.Printf("%s: %s\n", eventAsMap["Type"], eventAsMap["Payload"]) - } + for _, e := range event.GetPayload() { + // TODO: REMOVE ME + // This is for debugging purposes only + fmt.Printf( + "Received event %s \n", + e.String(), + ) } } } diff --git a/test/integration/npm/main.go b/test/integration/npm/main.go index c7ff556aed..726b02e7fd 100644 --- a/test/integration/npm/main.go +++ b/test/integration/npm/main.go @@ -93,7 +93,7 @@ func main() { panicOnError(dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.TestNSSet.Metadata}, podMetadataB)) podMetadataC := &dataplane.PodMetadata{ PodKey: "c", - PodIP: "10.240.0.28", + PodIP: "10.240.0.83", NodeName: nodeName, } panicOnError(dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.TestKeyPodSet.Metadata, ipsets.TestNSSet.Metadata}, podMetadataC)) @@ -120,6 +120,8 @@ func main() { dp.DeleteIPSet(ipsets.TestKVPodSet.Metadata) panicOnError(dp.ApplyDataPlane()) + panicOnError(dp.AddToLists([]*ipsets.IPSetMetadata{ipsets.TestNestedLabelList.Metadata}, []*ipsets.IPSetMetadata{ipsets.TestKVPodSet.Metadata, ipsets.TestNSSet.Metadata})) + printAndWait(true) panicOnError(dp.RemoveFromSets([]*ipsets.IPSetMetadata{ipsets.TestNSSet.Metadata}, podMetadata)) @@ -138,7 +140,7 @@ func main() { podMetadataD = &dataplane.PodMetadata{ PodKey: "d", - PodIP: "10.240.0.27", + PodIP: "10.240.0.91", NodeName: nodeName, } panicOnError(dp.AddToSets([]*ipsets.IPSetMetadata{ipsets.TestKeyPodSet.Metadata, ipsets.TestNSSet.Metadata}, podMetadataD))