Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (nsc *NamespaceController) cleanDeletedNamespace(cachedNsKey string) error
}

// Delete ipset for the namespace.
nsc.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedNsKey, ipsets.Namespace))
nsc.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedNsKey, ipsets.Namespace), util.SoftDelete)

delete(nsc.npmNamespaceCache.NsMap, cachedNsKey)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func TestDeleteNamespace(t *testing.T) {
for i := 1; i < len(setsToAddNamespaceTo); i++ {
dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[i], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
}
dp.EXPECT().DeleteIPSet(setsToAddNamespaceTo[0]).Return().Times(1)
dp.EXPECT().DeleteIPSet(setsToAddNamespaceTo[0], util.SoftDelete).Return().Times(1)

deleteNamespace(t, f, nsObj, DeletedFinalStateknownObject)

Expand Down Expand Up @@ -702,7 +702,7 @@ func TestDeleteNamespaceWithTombstoneAfterAddingNameSpace(t *testing.T) {
for i := 1; i < len(setsToAddNamespaceTo); i++ {
dp.EXPECT().RemoveFromList(setsToAddNamespaceTo[i], setsToAddNamespaceTo[:1]).Return(nil).Times(1)
}
dp.EXPECT().DeleteIPSet(setsToAddNamespaceTo[0]).Return().Times(1)
dp.EXPECT().DeleteIPSet(setsToAddNamespaceTo[0], util.SoftDelete).Return().Times(1)

deleteNamespace(t, f, nsObj, DeletedFinalStateUnknownObject)
testCases := []expectedNsValues{
Expand Down
166 changes: 122 additions & 44 deletions npm/pkg/controlplane/goalstateprocessor/goalstateprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/Azure/azure-container-networking/npm/pkg/dataplane"
"github.com/Azure/azure-container-networking/npm/pkg/dataplane/ipsets"
"github.com/Azure/azure-container-networking/npm/pkg/protos"
"github.com/Azure/azure-container-networking/npm/util"
npmerrors "github.com/Azure/azure-container-networking/npm/util/errors"
"k8s.io/klog"
)
Expand Down Expand Up @@ -69,6 +70,8 @@ func (gsp *GoalStateProcessor) run(stopCh <-chan struct{}) {

func (gsp *GoalStateProcessor) processNext(stopCh <-chan struct{}) bool {
select {
// TODO benchmark how many events can stay in pipeline as we work
// on a previous event
case inputEvents := <-gsp.inputChannel:
// TODO remove this large print later
klog.Infof("Received event %s", inputEvents)
Expand All @@ -92,9 +95,6 @@ func (gsp *GoalStateProcessor) processNext(stopCh <-chan struct{}) bool {
}

func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) {
// TODO (Vamsi) differentiate between hydration event and a normal event
// in hydration event, any thing in local cache and not in event should be deleted.

klog.Infof("Processing event")
// apply dataplane after syncing
defer func() {
Expand All @@ -104,58 +104,145 @@ func (gsp *GoalStateProcessor) process(inputEvent *protos.Events) {
}
}()

// Process these individual buckkets in order
// 1. Apply IPSET
// 2. Apply POLICY
// 3. Remove POLICY
// 4. Remove IPSET

// TODO need to handle first connect stream of all GoalStates
payload := inputEvent.GetPayload()

if !validatePayload(payload) {
klog.Warningf("Empty payload in event %s", inputEvent)
return
}

switch inputEvent.GetEventType() {
case protos.Events_Hydration:
// in hydration event, any thing in local cache and not in event should be deleted.
klog.Infof("Received hydration event")
gsp.processHydrationEvent(payload)
case protos.Events_GoalState:
klog.Infof("Received goal state event")
gsp.processGoalStateEvent(payload)
default:
klog.Errorf("Received unknown event type %s", inputEvent.GetEventType())
}
}

func (gsp *GoalStateProcessor) processHydrationEvent(payload map[string]*protos.GoalState) {
// Hydration events are sent when the daemon first starts up, or a reconnection to controller happens.
// In this case, the controller will send a current state of the cache down to daemon.
// Daemon will need to calculate what updates and deleted have been missed and send them to the dataplane.

// Sequence of processing will be:
// Apply IPsets
// Apply Policies
// Get all existing IPSets and policies in the dataplane
// Delete cached Policies not in event
// Delete cached IPSets (without references) not in the event

var appendedIPSets map[string]struct{}
var appendedPolicies map[string]struct{}
var err error

if ipsetApplyPayload, ok := payload[cp.IpsetApply]; ok {
err := gsp.processIPSetsApplyEvent(ipsetApplyPayload)
appendedIPSets, err = gsp.processIPSetsApplyEvent(ipsetApplyPayload)
if err != nil {
klog.Errorf("Error processing IPSET apply HYDRATION event %s", err)
}
}

if policyApplyPayload, ok := payload[cp.PolicyApply]; ok {
appendedPolicies, err = gsp.processPolicyApplyEvent(policyApplyPayload)
if err != nil {
klog.Errorf("Error processing POLICY apply HYDRATION event %s", err)
}
}

cachedPolicyKeys := gsp.dp.GetAllPolicies()
toDeletePolicies := make([]string, 0)
if appendedPolicies == nil {
toDeletePolicies = cachedPolicyKeys
} else {
for _, policy := range cachedPolicyKeys {
if _, ok := appendedPolicies[policy]; !ok {
toDeletePolicies = append(toDeletePolicies, policy)
}
}
}

if len(toDeletePolicies) > 0 {
klog.Infof("Deleting %d policies", len(toDeletePolicies))
err = gsp.processPolicyRemoveEvent(toDeletePolicies)
if err != nil {
klog.Errorf("Error processing POLICY remove HYDRATION event %s", err)
}
}

cachedIPSetNames := gsp.dp.GetAllIPSets()
toDeleteIPSets := make([]string, 0)

if appendedIPSets == nil {
toDeleteIPSets = cachedIPSetNames
} else {
for _, ipset := range cachedIPSetNames {
if _, ok := appendedIPSets[ipset]; !ok {
toDeleteIPSets = append(toDeleteIPSets, ipset)
}
}
}

if len(toDeleteIPSets) > 0 {
klog.Infof("Deleting %d ipsets", len(toDeleteIPSets))
gsp.processIPSetsRemoveEvent(toDeleteIPSets, util.ForceDelete)
}
}

func (gsp *GoalStateProcessor) processGoalStateEvent(payload map[string]*protos.GoalState) {
// Process these individual buckets in order
// 1. Apply IPSET
// 2. Apply POLICY
// 3. Remove POLICY
// 4. Remove IPSET
if ipsetApplyPayload, ok := payload[cp.IpsetApply]; ok {
_, err := gsp.processIPSetsApplyEvent(ipsetApplyPayload)
if err != nil {
klog.Errorf("Error processing IPSET apply event %s", err)
}
}

if policyApplyPayload, ok := payload[cp.PolicyApply]; ok {
err := gsp.processPolicyApplyEvent(policyApplyPayload)
_, err := gsp.processPolicyApplyEvent(policyApplyPayload)
if err != nil {
klog.Errorf("Error processing POLICY apply event %s", err)
}
}

if policyRemovePayload, ok := payload[cp.PolicyRemove]; ok {
err := gsp.processPolicyRemoveEvent(policyRemovePayload)
payload := bytes.NewBuffer(policyRemovePayload.GetData())
netpolNames, err := cp.DecodeStrings(payload)
if err != nil {
klog.Errorf("Error processing POLICY remove event, failed to decode Policy remove event %s", err)
}
err = gsp.processPolicyRemoveEvent(netpolNames)
if err != nil {
klog.Errorf("Error processing POLICY remove event %s", err)
}
}

if ipsetRemovePayload, ok := payload[cp.IpsetRemove]; ok {
err := gsp.processIPSetsRemoveEvent(ipsetRemovePayload)
payload := bytes.NewBuffer(ipsetRemovePayload.GetData())
ipsetNames, err := cp.DecodeStrings(payload)
if err != nil {
klog.Errorf("Error processing IPSET remove event %s", err)
klog.Errorf("Error processing IPSET remove event, failed to decode IPSet remove event: %s", err)
}
gsp.processIPSetsRemoveEvent(ipsetNames, util.SoftDelete)
}
}

func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) error {
func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalState) (map[string]struct{}, error) {
payload := bytes.NewBuffer(goalState.GetData())
payloadIPSets, err := cp.DecodeControllerIPSets(payload)
if err != nil {
return npmerrors.SimpleErrorWrapper("failed to decode IPSet apply event", err)
return nil, npmerrors.SimpleErrorWrapper("failed to decode IPSet apply event", err)
}

klog.Infof("Processing IPSet apply event %v", payloadIPSets)

appendedIPSets := make(map[string]struct{}, len(payloadIPSets))
for _, ipset := range payloadIPSets {
if ipset == nil {
klog.Warningf("Empty IPSet apply event")
Expand All @@ -176,20 +263,21 @@ func (gsp *GoalStateProcessor) processIPSetsApplyEvent(goalState *protos.GoalSta
case ipsets.HashSet:
err = gsp.applySets(ipset, cachedIPSet)
if err != nil {
return err
return nil, err
}
case ipsets.ListSet:
err = gsp.applyLists(ipset, cachedIPSet)
if err != nil {
return err
return nil, err
}
case ipsets.UnknownKind:
return npmerrors.SimpleError(
return nil, npmerrors.SimpleError(
fmt.Sprintf("failed to decode IPSet apply event: Unknown IPSet kind %s", cachedIPSet.Kind),
)
}
appendedIPSets[ipsetName] = struct{}{}
}
return nil
return appendedIPSets, nil
}

func (gsp *GoalStateProcessor) applySets(ipSet *cp.ControllerIPSets, cachedIPSet *ipsets.IPSet) error {
Expand Down Expand Up @@ -255,13 +343,7 @@ func (gsp *GoalStateProcessor) applyLists(ipSet *cp.ControllerIPSets, cachedIPSe
return nil
}

func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalState) error {
payload := bytes.NewBuffer(goalState.GetData())
ipsetNames, err := cp.DecodeStrings(payload)
if err != nil {
return npmerrors.SimpleErrorWrapper("failed to decode IPSet remove event", err)
}

func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(ipsetNames []string, forceDelete util.DeleteOption) {
for _, ipsetName := range ipsetNames {
if ipsetName == "" {
klog.Warningf("Empty IPSet remove event")
Expand All @@ -272,21 +354,21 @@ func (gsp *GoalStateProcessor) processIPSetsRemoveEvent(goalState *protos.GoalSt
cachedIPSet := gsp.dp.GetIPSet(ipsetName)
if cachedIPSet == nil {
klog.Infof("IPSet %s not found in cache, ignoring delete call.", ipsetName)
return nil
continue
}

gsp.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedIPSet.Name, cachedIPSet.Type))
gsp.dp.DeleteIPSet(ipsets.NewIPSetMetadata(cachedIPSet.Name, cachedIPSet.Type), forceDelete)
}
return nil
}

func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalState) error {
func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalState) (map[string]struct{}, error) {
payload := bytes.NewBuffer(goalState.GetData())
netpols, err := cp.DecodeNPMNetworkPolicies(payload)
if err != nil {
return npmerrors.SimpleErrorWrapper("failed to decode Policy apply event", err)
return nil, npmerrors.SimpleErrorWrapper("failed to decode Policy apply event", err)
}

appendedPolicies := make(map[string]struct{}, len(netpols))
for _, netpol := range netpols {
if netpol == nil {
klog.Warningf("Empty Policy apply event")
Expand All @@ -298,18 +380,14 @@ func (gsp *GoalStateProcessor) processPolicyApplyEvent(goalState *protos.GoalSta
err = gsp.dp.UpdatePolicy(netpol)
if err != nil {
klog.Errorf("Error applying policy %s to dataplane with error: %s", netpol.Name, err.Error())
return npmerrors.SimpleErrorWrapper("failed update policy event", err)
return nil, npmerrors.SimpleErrorWrapper("failed update policy event", err)
}
appendedPolicies[netpol.PolicyKey] = struct{}{}
}
return nil
return appendedPolicies, nil
}

func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalState) error {
payload := bytes.NewBuffer(goalState.GetData())
netpolNames, err := cp.DecodeStrings(payload)
if err != nil {
return npmerrors.SimpleErrorWrapper("failed to decode Policy remove event", err)
}
func (gsp *GoalStateProcessor) processPolicyRemoveEvent(netpolNames []string) error {
for _, netpolName := range netpolNames {
klog.Infof("Processing %s Policy remove event", netpolName)

Expand All @@ -318,7 +396,7 @@ func (gsp *GoalStateProcessor) processPolicyRemoveEvent(goalState *protos.GoalSt
continue
}

err = gsp.dp.RemovePolicy(netpolName)
err := gsp.dp.RemovePolicy(netpolName)
if err != nil {
klog.Errorf("Error removing policy %s from dataplane with error: %s", netpolName, err.Error())
return npmerrors.SimpleErrorWrapper("failed remove policy event", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestPolicyApplyEvent(t *testing.T) {

go func() {
inputChan <- &protos.Events{
EventType: protos.Events_GoalState,
Payload: map[string]*protos.GoalState{
controlplane.PolicyApply: {
Data: payload.Bytes(),
Expand Down Expand Up @@ -174,7 +175,8 @@ func TestIPSetsApplyUpdateMembers(t *testing.T) {
gsp, _ := NewGoalStateProcessor(ctx, "node1", "pod1", inputChan, dp)
go func() {
inputChan <- &protos.Events{
Payload: goalState,
EventType: protos.Events_GoalState,
Payload: goalState,
}
}()
time.Sleep(sleepAfterChanSent)
Expand All @@ -192,7 +194,8 @@ func TestIPSetsApplyUpdateMembers(t *testing.T) {
)
go func() {
inputChan <- &protos.Events{
Payload: goalState,
EventType: protos.Events_GoalState,
Payload: goalState,
}
}()
time.Sleep(sleepAfterChanSent)
Expand Down
14 changes: 11 additions & 3 deletions npm/pkg/dataplane/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (dp *DataPlane) CreateIPSets(setMetadata []*ipsets.IPSetMetadata) {

// DeleteSet checks for members and references of the given "set" type ipset
// if not used then will delete it from cache
func (dp *DataPlane) DeleteIPSet(setMetadata *ipsets.IPSetMetadata) {
dp.ipsetMgr.DeleteIPSet(setMetadata.GetPrefixName())
func (dp *DataPlane) DeleteIPSet(setMetadata *ipsets.IPSetMetadata, forceDelete util.DeleteOption) {
dp.ipsetMgr.DeleteIPSet(setMetadata.GetPrefixName(), forceDelete)
}

// AddToSets takes in a list of IPSet names along with IP member
Expand Down Expand Up @@ -276,6 +276,14 @@ func (dp *DataPlane) UpdatePolicy(policy *policies.NPMNetworkPolicy) error {
return nil
}

func (dp *DataPlane) GetAllIPSets() []string {
return dp.ipsetMgr.GetAllIPSets()
}

func (dp *DataPlane) GetAllPolicies() []string {
return dp.policyMgr.GetAllPolicies()
}

func (dp *DataPlane) createIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, netpolName string, referenceType ipsets.ReferenceType) error {
// Create IPSets first along with reference updates
npmErrorString := npmerrors.AddSelectorReference
Expand Down Expand Up @@ -367,7 +375,7 @@ func (dp *DataPlane) deleteIPSetsAndReferences(sets []*ipsets.TranslatedIPSet, n
}

// Try to delete these IPSets
dp.ipsetMgr.DeleteIPSet(set.Metadata.GetPrefixName())
dp.ipsetMgr.DeleteIPSet(set.Metadata.GetPrefixName(), false)
}
return nil
}
Loading